Update spec_utils.py

This commit is contained in:
aufr33 2021-06-30 16:45:27 +03:00 committed by GitHub
parent cedc6b9790
commit 68a63c7e91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,11 +1,12 @@
import os import os
import librosa import librosa
import numpy as np import numpy as np
import soundfile as sf import soundfile as sf
import math import math
import json import json
import hashlib import hashlib
import threading
from tqdm import tqdm from tqdm import tqdm
@ -26,53 +27,39 @@ def crop_center(h1, h2):
return h1 return h1
def wave_to_spectrogram(wave, hop_length, n_fft, mid_side=False, reverse=False): def wave_to_spectrogram(wave, hop_length, n_fft, mp, multithreading):
if reverse: if mp.param['reverse']:
wave_left = np.flip(np.asfortranarray(wave[0])) wave_left = np.flip(np.asfortranarray(wave[0]))
wave_right = np.flip(np.asfortranarray(wave[1])) wave_right = np.flip(np.asfortranarray(wave[1]))
elif mid_side: elif mp.param['mid_side_b']:
wave_left = np.asfortranarray(np.add(wave[0], wave[1] * .5))
wave_right = np.asfortranarray(np.subtract(wave[1], wave[0] * .5))
elif mp.param['mid_side']:
wave_left = np.asfortranarray(np.add(wave[0], wave[1]) / 2) wave_left = np.asfortranarray(np.add(wave[0], wave[1]) / 2)
wave_right = np.asfortranarray(np.subtract(wave[0], wave[1])) wave_right = np.asfortranarray(np.subtract(wave[0], wave[1]))
else: else:
wave_left = np.asfortranarray(wave[0]) wave_left = np.asfortranarray(wave[0])
wave_right = np.asfortranarray(wave[1]) wave_right = np.asfortranarray(wave[1])
if multithreading:
def run_thread(**kwargs):
global spec_left_mt
spec_left_mt = librosa.stft(**kwargs)
spec_left = librosa.stft(wave_left, n_fft, hop_length=hop_length) thread = threading.Thread(target=run_thread, kwargs={'y': wave_left, 'n_fft': n_fft, 'hop_length': hop_length})
spec_right = librosa.stft(wave_right, n_fft, hop_length=hop_length) thread.start()
spec_right = librosa.stft(wave_right, n_fft, hop_length=hop_length)
spec = np.asfortranarray([spec_left, spec_right]) thread.join()
spec = np.asfortranarray([spec_left_mt, spec_right])
return spec
def wave_to_spectrogram_mt(wave, hop_length, n_fft, mid_side=False, reverse=False):
import threading
if reverse:
wave_left = np.flip(np.asfortranarray(wave[0]))
wave_right = np.flip(np.asfortranarray(wave[1]))
elif mid_side:
wave_left = np.asfortranarray(np.add(wave[0], wave[1]) / 2)
wave_right = np.asfortranarray(np.subtract(wave[0], wave[1]))
else: else:
wave_left = np.asfortranarray(wave[0]) spec_left = librosa.stft(wave_left, n_fft, hop_length=hop_length)
wave_right = np.asfortranarray(wave[1]) spec_right = librosa.stft(wave_right, n_fft, hop_length=hop_length)
spec = np.asfortranarray([spec_left, spec_right])
def run_thread(**kwargs):
global spec_left
spec_left = librosa.stft(**kwargs)
thread = threading.Thread(target=run_thread, kwargs={'y': wave_left, 'n_fft': n_fft, 'hop_length': hop_length})
thread.start()
spec_right = librosa.stft(wave_right, n_fft, hop_length=hop_length)
thread.join()
spec = np.asfortranarray([spec_left, spec_right])
return spec return spec
def combine_spectrograms(specs, mp): def combine_spectrograms(specs, mp):
l = min([specs[i].shape[2] for i in specs]) l = min([specs[i].shape[2] for i in specs])
spec_c = np.zeros(shape=(2, mp.param['bins'] + 1, l), dtype=np.complex64) spec_c = np.zeros(shape=(2, mp.param['bins'] + 1, l), dtype=np.complex64)
@ -99,7 +86,7 @@ def combine_spectrograms(specs, mp):
spec_c[:, b, :] *= g spec_c[:, b, :] *= g
return np.asfortranarray(spec_c) return np.asfortranarray(spec_c)
def spectrogram_to_image(spec, mode='magnitude'): def spectrogram_to_image(spec, mode='magnitude'):
if mode == 'magnitude': if mode == 'magnitude':
@ -214,8 +201,8 @@ def cache_or_load(mix_path, inst_path, mp):
X_wave[d], y_wave[d] = align_wave_head_and_tail(X_wave[d], y_wave[d]) X_wave[d], y_wave[d] = align_wave_head_and_tail(X_wave[d], y_wave[d])
X_spec_s[d] = wave_to_spectrogram(X_wave[d], bp['hl'], bp['n_fft'], mp.param['mid_side'], mp.param['reverse']) X_spec_s[d] = wave_to_spectrogram(X_wave[d], bp['hl'], bp['n_fft'], mp, False)
y_spec_s[d] = wave_to_spectrogram(y_wave[d], bp['hl'], bp['n_fft'], mp.param['mid_side'], mp.param['reverse']) y_spec_s[d] = wave_to_spectrogram(y_wave[d], bp['hl'], bp['n_fft'], mp, False)
del X_wave, y_wave del X_wave, y_wave
@ -232,45 +219,36 @@ def cache_or_load(mix_path, inst_path, mp):
return X_spec_m, y_spec_m return X_spec_m, y_spec_m
def spectrogram_to_wave(spec, hop_length, mid_side, reverse):
spec_left = np.asfortranarray(spec[0])
spec_right = np.asfortranarray(spec[1])
wave_left = librosa.istft(spec_left, hop_length=hop_length)
wave_right = librosa.istft(spec_right, hop_length=hop_length)
if reverse:
return np.asfortranarray([np.flip(wave_left), np.flip(wave_right)])
elif mid_side:
return np.asfortranarray([np.add(wave_left, wave_right / 2), np.subtract(wave_left, wave_right / 2)])
else:
return np.asfortranarray([wave_left, wave_right])
def spectrogram_to_wave(spec, hop_length, mp, multithreading):
def spectrogram_to_wave_mt(spec, hop_length, mid_side, reverse):
import threading import threading
spec_left = np.asfortranarray(spec[0]) spec_left = np.asfortranarray(spec[0])
spec_right = np.asfortranarray(spec[1]) spec_right = np.asfortranarray(spec[1])
def run_thread(**kwargs): if multithreading:
global wave_left def run_thread(**kwargs):
wave_left = librosa.istft(**kwargs) global wave_left
wave_left = librosa.istft(**kwargs)
thread = threading.Thread(target=run_thread, kwargs={'stft_matrix': spec_left, 'hop_length': hop_length})
thread.start() thread = threading.Thread(target=run_thread, kwargs={'stft_matrix': spec_left, 'hop_length': hop_length})
wave_right = librosa.istft(spec_right, hop_length=hop_length) thread.start()
thread.join() wave_right = librosa.istft(spec_right, hop_length=hop_length)
thread.join()
else:
wave_left = librosa.istft(spec_left, hop_length=hop_length)
wave_right = librosa.istft(spec_right, hop_length=hop_length)
if reverse: if mp.param['reverse']:
return np.asfortranarray([np.flip(wave_left), np.flip(wave_right)]) return np.asfortranarray([np.flip(wave_left), np.flip(wave_right)])
elif mid_side: elif mp.param['mid_side_b']:
return np.asfortranarray([np.subtract(wave_left / 1.25, .4 * wave_right), np.add(wave_right / 1.25, .4 * wave_left)])
elif mp.param['mid_side']:
return np.asfortranarray([np.add(wave_left, wave_right / 2), np.subtract(wave_left, wave_right / 2)]) return np.asfortranarray([np.add(wave_left, wave_right / 2), np.subtract(wave_left, wave_right / 2)])
else: else:
return np.asfortranarray([wave_left, wave_right]) return np.asfortranarray([wave_left, wave_right])
def cmb_spectrogram_to_wave(spec_m, mp, extra_bins_h=None, extra_bins=None): def cmb_spectrogram_to_wave(spec_m, mp, extra_bins_h=None, extra_bins=None):
wave_band = {} wave_band = {}
bands_n = len(mp.param['band']) bands_n = len(mp.param['band'])
@ -290,18 +268,18 @@ def cmb_spectrogram_to_wave(spec_m, mp, extra_bins_h=None, extra_bins=None):
if bp['hpf_start'] > 0: if bp['hpf_start'] > 0:
spec_s = fft_hp_filter(spec_s, bp['hpf_start'], bp['hpf_stop'] - 1) spec_s = fft_hp_filter(spec_s, bp['hpf_start'], bp['hpf_stop'] - 1)
if bands_n == 1: if bands_n == 1:
wave = spectrogram_to_wave(spec_s, bp['hl'], mp.param['mid_side'], mp.param['reverse']) wave = spectrogram_to_wave(spec_s, bp['hl'], mp, False)
else: else:
wave = np.add(wave, spectrogram_to_wave(spec_s, bp['hl'], mp.param['mid_side'], mp.param['reverse'])) wave = np.add(wave, spectrogram_to_wave(spec_s, bp['hl'], mp, False))
else: else:
sr = mp.param['band'][d+1]['sr'] sr = mp.param['band'][d+1]['sr']
if d == 1: # lower if d == 1: # lower
spec_s = fft_lp_filter(spec_s, bp['lpf_start'], bp['lpf_stop']) spec_s = fft_lp_filter(spec_s, bp['lpf_start'], bp['lpf_stop'])
wave = librosa.resample(spectrogram_to_wave(spec_s, bp['hl'], mp.param['mid_side'], mp.param['reverse']), bp['sr'], sr, res_type="sinc_fastest") wave = librosa.resample(spectrogram_to_wave(spec_s, bp['hl'], mp, False), bp['sr'], sr, res_type="sinc_fastest")
else: # mid else: # mid
spec_s = fft_hp_filter(spec_s, bp['hpf_start'], bp['hpf_stop'] - 1) spec_s = fft_hp_filter(spec_s, bp['hpf_start'], bp['hpf_stop'] - 1)
spec_s = fft_lp_filter(spec_s, bp['lpf_start'], bp['lpf_stop']) spec_s = fft_lp_filter(spec_s, bp['lpf_start'], bp['lpf_stop'])
wave2 = np.add(wave, spectrogram_to_wave(spec_s, bp['hl'], mp.param['mid_side'], mp.param['reverse'])) wave2 = np.add(wave, spectrogram_to_wave(spec_s, bp['hl'], mp, False))
wave = librosa.resample(wave2, bp['sr'], sr, res_type="sinc_fastest") wave = librosa.resample(wave2, bp['sr'], sr, res_type="sinc_fastest")
return wave.T return wave.T
@ -401,7 +379,7 @@ if __name__ == "__main__":
else: # lower bands else: # lower bands
wave[d] = librosa.resample(wave[d+1], mp.param['band'][d+1]['sr'], bp['sr'], res_type=bp['res_type']) wave[d] = librosa.resample(wave[d+1], mp.param['band'][d+1]['sr'], bp['sr'], res_type=bp['res_type'])
spec[d] = wave_to_spectrogram(wave[d], bp['hl'], bp['n_fft'], mp.param['mid_side'], mp.param['reverse']) spec[d] = wave_to_spectrogram(wave[d], bp['hl'], bp['n_fft'], mp, False)
specs[i] = combine_spectrograms(spec, mp) specs[i] = combine_spectrograms(spec, mp)