import os
import librosa
import torch
import numpy as np
import soundfile as sf
import math
import json
import hashlib
import threading
from tqdm import tqdm
def crop_center(h1, h2):
    """Center-crop ``h1`` along axis 3 so its width matches ``h2``.

    Both arguments must provide ``.size()`` and ``h1`` must support
    four-axis slicing.

    Returns:
        ``h1`` itself when the widths already match, otherwise the centered
        slice of ``h1`` whose axis-3 length equals that of ``h2``.

    Raises:
        ValueError: if ``h1`` is narrower than ``h2`` on axis 3.
    """
    src_width = h1.size()[3]
    dst_width = h2.size()[3]

    if src_width < dst_width:
        raise ValueError('h1_shape[3] must be greater than h2_shape[3]')
    if src_width == dst_width:
        return h1

    # Drop the same number of columns (rounded down) from the left side.
    left = (src_width - dst_width) // 2
    return h1[:, :, :, left:left + dst_width]
def wave_to_spectrogram(wave, hop_length, n_fft, mp, multithreading):
    """Compute the STFT of both channels of a two-channel waveform.

    Args:
        wave: indexable holding the left channel at ``wave[0]`` and the
            right channel at ``wave[1]``.
        hop_length: forwarded to ``librosa.stft``.
        n_fft: forwarded to ``librosa.stft``.
        mp: not used by this function; kept for interface compatibility.
        multithreading: when true, the left channel is transformed in a
            worker thread while the right channel is transformed in the
            calling thread.

    Returns:
        Fortran-ordered array stacking ``[left_spec, right_spec]``.
    """
    wave_left = np.asfortranarray(wave[0])
    wave_right = np.asfortranarray(wave[1])
    # Keyword arguments: librosa >= 0.10 no longer accepts n_fft positionally.
    stft_kwargs = {'n_fft': n_fft, 'hop_length': hop_length}

    if multithreading:
        # The worker reports through a local dict rather than a module
        # global, so concurrent callers cannot clobber each other's result.
        outcome = {}

        def run_thread():
            try:
                outcome['spec'] = librosa.stft(wave_left, **stft_kwargs)
            except Exception as exc:  # re-raised in the calling thread below
                outcome['error'] = exc

        worker = threading.Thread(target=run_thread)
        worker.start()
        try:
            spec_right = librosa.stft(wave_right, **stft_kwargs)
        finally:
            worker.join()

        if 'error' in outcome:
            raise outcome['error']
        spec_left = outcome['spec']
    else:
        spec_left = librosa.stft(wave_left, **stft_kwargs)
        spec_right = librosa.stft(wave_right, **stft_kwargs)

    return np.asfortranarray([spec_left, spec_right])
def convert_channels(spec, mp, band):
    """Apply the channel conversion selected by the model parameters.

    The conversion is chosen from the global flags ``reverse``,
    ``mid_side_b``, ``mid_side_b2`` and ``mid_side`` in ``mp.param`` and
    from the optional per-band ``convert_channels`` entry, checked in that
    priority order (``mid_side_c`` is available per band only).

    Args:
        spec: array-like whose first axis holds the two channels.
        mp: object exposing the model parameters as the dict ``mp.param``.
        band: key of the band inside ``mp.param['band']``.

    Returns:
        ``spec`` itself when no conversion applies, otherwise a new
        Fortran-ordered array ``[converted_left, converted_right]``.
    """
    cc = mp.param['band'][band].get('convert_channels')

    if mp.param['reverse']:
        spec_left = np.flip(spec[0])
        spec_right = np.flip(spec[1])
    elif mp.param['mid_side_b'] or 'mid_side_b' == cc:
        spec_left = np.add(spec[0], spec[1] * .5)
        spec_right = np.subtract(spec[1], spec[0] * .5)
    elif mp.param['mid_side_b2'] or 'mid_side_b2' == cc:
        spec_left = np.add(spec[1], spec[0] * .5)
        spec_right = np.subtract(spec[0], spec[1] * .5)
    elif 'mid_side_c' == cc:
        spec_left = np.add(spec[0], spec[1] * .25)
        spec_right = np.subtract(spec[1], spec[0] * .25)
    elif mp.param['mid_side'] or 'mid_side' == cc:
        spec_left = np.add(spec[0], spec[1]) / 2
        spec_right = np.subtract(spec[0], spec[1])
    else:
        # No conversion requested: hand the input back untouched.
        return spec

    return np.asfortranarray([spec_left, spec_right])
def combine_spectrograms(specs, mp):
    """Stack the cropped bins of every band into one combined spectrogram.

    Args:
        specs: mapping from band number (1..N) to a 3-axis array whose last
            axis is trimmed to the shortest band.
        mp: object exposing the model parameters as the dict ``mp.param``
            (``bins``, ``band``, ``pre_filter_start``, ``pre_filter_stop``).

    Returns:
        Fortran-ordered ``complex64`` array of shape
        ``(2, bins + 1, shortest_length)``.

    Raises:
        ValueError: if the cropped bands together exceed ``bins``.
    """
    param = mp.param
    frames = min(specs[key].shape[2] for key in specs)
    combined = np.zeros(shape=(2, param['bins'] + 1, frames), dtype=np.complex64)

    cursor = 0
    for d in range(1, len(param['band']) + 1):
        band = param['band'][d]
        start, stop = band['crop_start'], band['crop_stop']
        height = stop - start
        # Bands are laid out back to back along the bin axis.
        combined[:, cursor:cursor + height, :frames] = specs[d][:, start:stop, :frames]
        cursor += height

    if cursor > param['bins']:
        raise ValueError('Too much bins')

    pre_start = param['pre_filter_start']
    if pre_start > 0:
        pre_stop = param['pre_filter_stop']
        combined *= get_lp_filter_mask(combined.shape[1], pre_start, pre_stop)

        # Additional per-bin attenuation; each gain depends on the previous one.
        prev_gain = 1
        for b in range(pre_start + 1, pre_stop):
            gain = math.pow(10, -(b - pre_start) * (3.5 - prev_gain) / 20.0)
            prev_gain = gain
            combined[:, b, :] *= gain

    return np.asfortranarray(combined)
def spectrogram_to_image(spec, mode='magnitude'):
    """Render a spectrogram as an 8-bit image array.

    Args:
        spec: real or complex array. In ``'magnitude'`` mode the log of the
            squared magnitude is rendered; in ``'phase'`` mode the angle of
            a complex input (or a real input as-is) is rendered.
        mode: ``'magnitude'`` or ``'phase'``.

    Returns:
        ``uint8`` array scaled so the minimum maps to 0 and the maximum to
        255. A 3-axis input is transposed to ``(axis1, axis2, axis0)`` and
        gets one extra leading channel holding the per-pixel maximum.

    Raises:
        ValueError: if ``mode`` is not one of the supported values.
    """
    if mode == 'magnitude':
        y = np.abs(spec) if np.iscomplexobj(spec) else np.asarray(spec)
        y = np.log10(y ** 2 + 1e-8)
    elif mode == 'phase':
        y = np.angle(spec) if np.iscomplexobj(spec) else np.asarray(spec)
    else:
        raise ValueError("mode must be 'magnitude' or 'phase', got {!r}".format(mode))

    # Out-of-place arithmetic so a real-valued input is never modified.
    y = y - y.min()
    if not np.issubdtype(y.dtype, np.floating):
        y = y.astype(np.float64)

    peak = y.max()
    if peak > 0:  # a constant image stays all-zero instead of dividing by 0
        y = y * (255 / peak)
    img = np.uint8(y)

    if y.ndim == 3:
        img = img.transpose(1, 2, 0)
        img = np.concatenate([
            np.max(img, axis=2, keepdims=True), img
        ], axis=2)

    return img
def reduce_vocal_aggressively(X, y, softmask):
    """Attenuate ``y`` wherever the residual ``X - y`` is louder than ``y``.

    Args:
        X: complex spectrogram.
        y: complex spectrogram with the same shape as ``X``.
        softmask: scale applied to the residual magnitude before it is
            subtracted from the magnitude of ``y``.

    Returns:
        Complex array with the phase of ``y`` and the reduced magnitude,
        clipped so it never goes below zero.
    """
    residual_mag = np.abs(X - y)
    inst_mag = np.abs(y)

    # Only bins where the residual dominates are attenuated.
    dominant = residual_mag > inst_mag
    reduced_mag = np.clip(inst_mag - residual_mag * dominant * softmask, 0, np.inf)

    return reduced_mag * np.exp(1.j * np.angle(y))
def mask_silence(mag, ref, thres=0.2, min_range=64, fade_size=32):
    """Add ``ref`` back into ``mag`` over long quiet stretches of ``ref``.

    A frame (index on the last axis) counts as quiet when the mean of
    ``ref`` over axes 0 and 1 is below ``thres``. Runs of consecutive quiet
    frames longer than ``min_range`` receive ``ref`` added onto ``mag``,
    with linear fades of ``fade_size`` frames at run edges.

    Args:
        mag: 3-axis array; it is copied, never modified in place.
        ref: 3-axis array with the same shape as ``mag``.
        thres: quietness threshold on the per-frame mean of ``ref``.
        min_range: minimum run length (in frames) to be processed.
        fade_size: length of the linear fade at each run edge.

    Returns:
        The modified copy of ``mag``.

    Raises:
        ValueError: if ``min_range < fade_size * 2``.
    """
    if min_range < fade_size * 2:
        raise ValueError('min_range must be >= fade_area * 2')

    mag = mag.copy()

    idx = np.where(ref.mean(axis=(0, 1)) < thres)[0]
    if idx.size == 0:
        # No quiet frame at all; idx[0] below would raise IndexError.
        return mag

    # Positions where consecutive quiet frames stop being adjacent.
    breaks = np.where(np.diff(idx) != 1)[0]
    starts = np.insert(idx[breaks + 1], 0, idx[0])
    ends = np.append(idx[breaks], idx[-1])

    uninformative = np.where(ends - starts > min_range)[0]
    if len(uninformative) > 0:
        starts = starts[uninformative]
        ends = ends[uninformative]
        old_e = None
        for s, e in zip(starts, ends):
            if old_e is not None and s - old_e < fade_size:
                s = old_e - fade_size * 2

            if s != 0:
                weight = np.linspace(0, 1, fade_size)
                mag[:, :, s:s + fade_size] += weight * ref[:, :, s:s + fade_size]
            else:
                # Run starts at frame 0: no fade-in, so shift the start
                # back to let the full-weight region below begin at 0.
                s -= fade_size

            if e != mag.shape[2]:
                weight = np.linspace(1, 0, fade_size)
                mag[:, :, e - fade_size:e] += weight * ref[:, :, e - fade_size:e]
            else:
                e += fade_size

            mag[:, :, s + fade_size:e - fade_size] += ref[:, :, s + fade_size:e - fade_size]
            old_e = e

    return mag
def trim_specs(a, b):
    """Truncate two 3-axis arrays to their common length on the last axis.

    Returns:
        Tuple ``(a_trimmed, b_trimmed)`` sliced to the shorter of the two
        axis-2 lengths.
    """
    frames = min(a.shape[2], b.shape[2])
    trimmed_a = a[:, :, :frames]
    trimmed_b = b[:, :, :frames]
    return trimmed_a, trimmed_b
def cache_or_load(mix_path, inst_path, mp):
    """Return the combined spectrograms of a mix/instrumental pair, cached.

    The spectrograms are stored as ``.npy`` files under
    ``cache/mp<sha1 of the normalised model parameters>/``. When both cache
    files exist they are loaded; otherwise the spectrograms are computed
    with ``spec_from_file``, trimmed to a common length, saved and returned.

    Args:
        mix_path: path of the mixture audio file.
        inst_path: path of the instrumental audio file.
        mp: object exposing the model parameters as the dict ``mp.param``.

    Returns:
        Tuple ``(X_spec_m, y_spec_m)``.

    Raises:
        ValueError: if the two freshly computed spectrograms differ in shape.
    """
    mix_basename = os.path.splitext(os.path.basename(mix_path))[0]
    inst_basename = os.path.splitext(os.path.basename(inst_path))[0]

    # The cache is shared by model types that differ only in their channel
    # conversion, so those settings are neutralised before hashing.
    # NOTE(review): dict() is a shallow copy, so clearing 'convert_channels'
    # below also clears it inside mp.param['band'] for the caller. That
    # side effect is kept as-is because later calls in this module read it;
    # confirm whether it is intended before switching to a deep copy.
    mpp2 = dict(mp.param)
    mpp2.update(dict.fromkeys(['mid_side', 'mid_side_b', 'mid_side_b2', 'reverse'], False))
    for d in mpp2['band']:
        mpp2['band'][d]['convert_channels'] = ''

    cache_dir = 'mp{}'.format(hashlib.sha1(json.dumps(mpp2, sort_keys=True).encode('utf-8')).hexdigest())
    mix_cache_dir = os.path.join('cache', cache_dir)
    inst_cache_dir = os.path.join('cache', cache_dir)

    os.makedirs(mix_cache_dir, exist_ok=True)
    os.makedirs(inst_cache_dir, exist_ok=True)

    mix_cache_path = os.path.join(mix_cache_dir, mix_basename + '.npy')
    inst_cache_path = os.path.join(inst_cache_dir, inst_basename + '.npy')

    if os.path.exists(mix_cache_path) and os.path.exists(inst_cache_path):
        # Cache hit: nothing to compute or write.
        return np.load(mix_cache_path), np.load(inst_cache_path)

    X_spec_m = spec_from_file(mix_path, mp)
    y_spec_m = spec_from_file(inst_path, mp)
    X_spec_m, y_spec_m = trim_specs(X_spec_m, y_spec_m)

    if X_spec_m.shape != y_spec_m.shape:
        raise ValueError('The combined spectrograms are different: ' + mix_path)

    np.save(mix_cache_path, X_spec_m)
    np.save(inst_cache_path, y_spec_m)

    return X_spec_m, y_spec_m
def spectrogram_to_wave(spec, hop_length, mp, band, multithreading):
    """Invert a two-channel spectrogram and undo the channel conversion.

    Args:
        spec: indexable holding the left spectrogram at ``spec[0]`` and the
            right one at ``spec[1]``.
        hop_length: forwarded to ``librosa.istft``.
        mp: object exposing the model parameters as the dict ``mp.param``.
        band: key of the band inside ``mp.param['band']``; its optional
            ``convert_channels`` entry selects the inverse conversion.
        multithreading: when true, the left channel is inverted in a worker
            thread while the right channel is inverted in the caller.

    Returns:
        Fortran-ordered array ``[left_wave, right_wave]`` after the inverse
        of the configured channel conversion has been applied.
    """
    import threading

    spec_left = np.asfortranarray(spec[0])
    spec_right = np.asfortranarray(spec[1])
    cc = mp.param['band'][band].get('convert_channels')

    if multithreading:
        # The worker reports through a local dict rather than a module
        # global, so concurrent callers cannot clobber each other's result.
        outcome = {}

        def run_thread():
            try:
                outcome['wave'] = librosa.istft(spec_left, hop_length=hop_length)
            except Exception as exc:  # re-raised in the calling thread below
                outcome['error'] = exc

        worker = threading.Thread(target=run_thread)
        worker.start()
        try:
            wave_right = librosa.istft(spec_right, hop_length=hop_length)
        finally:
            worker.join()

        if 'error' in outcome:
            raise outcome['error']
        wave_left = outcome['wave']
    else:
        wave_left = librosa.istft(spec_left, hop_length=hop_length)
        wave_right = librosa.istft(spec_right, hop_length=hop_length)

    if mp.param['reverse']:
        return np.asfortranarray([np.flip(wave_left), np.flip(wave_right)])
    elif mp.param['mid_side_b'] or 'mid_side_b' == cc:
        return np.asfortranarray([np.subtract(wave_left / 1.25, .4 * wave_right), np.add(wave_right / 1.25, .4 * wave_left)])
    elif mp.param['mid_side_b2'] or 'mid_side_b2' == cc:
        return np.asfortranarray([np.add(wave_right / 1.25, .4 * wave_left), np.subtract(wave_left / 1.25, .4 * wave_right)])
    elif 'mid_side_c' == cc:
        return np.asfortranarray([np.subtract(wave_left / 1.0625, wave_right / 4.25), np.add(wave_right / 1.0625, wave_left / 4.25)])
    elif mp.param['mid_side'] or 'mid_side' == cc:
        return np.asfortranarray([np.add(wave_left, wave_right / 2), np.subtract(wave_left, wave_right / 2)])

    return np.asfortranarray([wave_left, wave_right])
def cmb_spectrogram_to_wave(spec_m, mp, extra_bins_h=None, extra_bins=None):
    """Convert a combined multi-band spectrogram back into a waveform.

    Bands are processed from 1 (lowest) to N (highest). Each lower band is
    filtered, inverted, added to the running waveform and resampled to the
    sample rate of the next band; the highest band is added last.

    Args:
        spec_m: combined spectrogram; bands are stacked along axis 1.
        mp: object exposing the model parameters as the dict ``mp.param``.
        extra_bins_h: optional number of bins to copy from ``extra_bins``
            into the top of the highest band.
        extra_bins: source of those extra bins; only read when
            ``extra_bins_h`` is truthy.

    Returns:
        The transposed waveform (``wave.T``).
    """
    bands_n = len(mp.param['band'])
    offset = 0

    for d in range(1, bands_n + 1):
        bp = mp.param['band'][d]
        spec_s = np.zeros(shape=(2, bp['n_fft'] // 2 + 1, spec_m.shape[2]), dtype=complex)
        h = bp['crop_stop'] - bp['crop_start']
        spec_s[:, bp['crop_start']:bp['crop_stop'], :] = spec_m[:, offset:offset+h, :]

        offset += h
        if d == bands_n:  # high-end
            if extra_bins_h:
                max_bin = bp['n_fft'] // 2
                spec_s[:, max_bin-extra_bins_h:max_bin, :] = extra_bins[:, :extra_bins_h, :]
            if bp['hpf_start'] > 0:
                spec_s *= get_hp_filter_mask(spec_s.shape[1], bp['hpf_start'], bp['hpf_stop'] - 1)

            band_wave = spectrogram_to_wave(spec_s, bp['hl'], mp, d, False)
            # With a single band there is no lower-band sum to add to.
            wave = band_wave if bands_n == 1 else np.add(wave, band_wave)
        else:
            sr = mp.param['band'][d+1]['sr']
            if d == 1:  # low-end
                spec_s *= get_lp_filter_mask(spec_s.shape[1], bp['lpf_start'], bp['lpf_stop'])
                mixed = spectrogram_to_wave(spec_s, bp['hl'], mp, d, False)
            else:  # mid
                spec_s *= get_hp_filter_mask(spec_s.shape[1], bp['hpf_start'], bp['hpf_stop'] - 1)
                spec_s *= get_lp_filter_mask(spec_s.shape[1], bp['lpf_start'], bp['lpf_stop'])
                mixed = np.add(wave, spectrogram_to_wave(spec_s, bp['hl'], mp, d, False))
            # Keyword arguments: librosa >= 0.10 rejects positional rates.
            wave = librosa.resample(mixed, orig_sr=bp['sr'], target_sr=sr, res_type="sinc_fastest")

    return wave.T
def cmb_spectrogram_to_wave_ffmpeg(spec_m, mp, tmp_basename, extra_bins_h=None, extra_bins=None):
    """Convert a combined spectrogram to a waveform, resampling via ffmpeg.

    Every band except the highest is filtered, inverted and written to a
    WAV file under ``tmp/``; an ``ffmpeg`` process resamples each file to
    ``mp.param['sr']``. Once all processes have finished, the resampled
    files are loaded and added onto the waveform of the highest band.

    Args:
        spec_m: combined spectrogram; bands are stacked along axis 1.
        mp: object exposing the model parameters as the dict ``mp.param``.
        tmp_basename: prefix for the temporary file names.
        extra_bins_h: optional number of bins to copy from ``extra_bins``
            into the top of the highest band.
        extra_bins: source of those extra bins; only read when
            ``extra_bins_h`` is truthy.

    Returns:
        The transposed waveform (``wave.T``).

    Raises:
        RuntimeError: if an ffmpeg process exits with a non-zero status.
    """
    import subprocess

    bands_n = len(mp.param['band'])
    offset = 0
    ffmprc = {}
    os.makedirs('tmp', exist_ok=True)

    for d in range(1, bands_n + 1):
        bp = mp.param['band'][d]
        spec_s = np.zeros(shape=(2, bp['n_fft'] // 2 + 1, spec_m.shape[2]), dtype=complex)
        h = bp['crop_stop'] - bp['crop_start']
        spec_s[:, bp['crop_start']:bp['crop_stop'], :] = spec_m[:, offset:offset+h, :]
        tmp_wav = os.path.join('tmp', '{}_cstw_b{}_sr{}'.format(tmp_basename, d, str(bp['sr']) + '.wav'))
        tmp_wav2 = os.path.join('tmp', '{}_cstw_b{}_sr{}'.format(tmp_basename, d, str(mp.param['sr']) + '.wav'))

        offset += h
        if d == bands_n:  # high-end
            if extra_bins_h:
                max_bin = bp['n_fft'] // 2
                spec_s[:, max_bin-extra_bins_h:max_bin, :] = extra_bins[:, :extra_bins_h, :]
            if bp['hpf_start'] > 0:
                spec_s *= get_hp_filter_mask(spec_s.shape[1], bp['hpf_start'], bp['hpf_stop'] - 1)
            wave = spectrogram_to_wave(spec_s, bp['hl'], mp, d, True)
        else:
            if d == 1:  # low-end
                spec_s *= get_lp_filter_mask(spec_s.shape[1], bp['lpf_start'], bp['lpf_stop'])
            else:  # mid
                spec_s *= get_hp_filter_mask(spec_s.shape[1], bp['hpf_start'], bp['hpf_stop'] - 1)
                spec_s *= get_lp_filter_mask(spec_s.shape[1], bp['lpf_start'], bp['lpf_stop'])

            sf.write(tmp_wav, spectrogram_to_wave(spec_s, bp['hl'], mp, d, True).T, bp['sr'])
            # Launched without waiting so resampling overlaps with the
            # inversion of the remaining bands.
            ffmprc[d] = subprocess.Popen(['ffmpeg', '-hide_banner', '-loglevel', 'panic', '-y', '-i', tmp_wav, '-ar', str(mp.param['sr']), '-ac', '2', '-c:a', 'pcm_s16le', tmp_wav2])

    # Every resampled file must be complete before it is read back.
    exit_codes = {d: proc.wait() for d, proc in ffmprc.items()}
    failed = [d for d, code in exit_codes.items() if code != 0]
    if failed:
        raise RuntimeError('ffmpeg failed while resampling band(s): {}'.format(failed))

    for d in range(bands_n - 1, 0, -1):
        os.remove(os.path.join('tmp', f'{tmp_basename}_cstw_b{d}_sr' + str(mp.param['band'][d]['sr']) + '.wav'))
        tmp_wav2 = os.path.join('tmp', f'{tmp_basename}_cstw_b{d}_sr' + str(mp.param['sr']) + '.wav')
        wave2, _ = librosa.load(tmp_wav2, sr=mp.param['sr'], mono=False, dtype=np.float32, res_type="sinc_fastest")
        os.remove(tmp_wav2)  # do not leave resampled temp files behind
        wave = np.add(wave, wave2)

    return wave.T
def fft_lp_filter(spec, bin_start, bin_stop):
    """Apply a linear low-pass ramp to ``spec`` in place along axis 1.

    Bins in ``[bin_start, bin_stop)`` are scaled by a gain that decreases
    by ``1 / (bin_stop - bin_start)`` per bin; bins from ``bin_stop``
    upward are zeroed.

    Returns:
        The same ``spec`` object that was passed in.
    """
    gain = 1.0
    bin_idx = bin_start
    while bin_idx < bin_stop:
        gain -= 1 / (bin_stop - bin_start)
        spec[:, bin_idx, :] = spec[:, bin_idx, :] * gain
        bin_idx += 1

    spec[:, bin_stop:, :] *= 0

    return spec
def fft_hp_filter(spec, bin_start, bin_stop):
    """Apply a linear high-pass ramp to ``spec`` in place along axis 1.

    Walking downward from ``bin_start`` to just above ``bin_stop``, each
    bin is scaled by a gain that decreases by
    ``1 / (bin_start - bin_stop)`` per step; bins ``0..bin_stop`` are
    zeroed.

    Returns:
        The same ``spec`` object that was passed in.
    """
    gain = 1.0
    for bin_idx in reversed(range(bin_stop + 1, bin_start + 1)):
        gain -= 1 / (bin_start - bin_stop)
        spec[:, bin_idx, :] = spec[:, bin_idx, :] * gain

    spec[:, :bin_stop + 1, :] *= 0

    return spec
def get_lp_filter_mask(bins_n, bin_start, bin_stop):
    """Build a ``(bins_n, 1)`` low-pass gain column.

    The column holds ``bin_start - 1`` ones, then a linear ramp from 1 to 0
    over ``bin_stop - bin_start + 1`` entries, then ``bins_n - bin_stop``
    zeros.
    """
    passband = np.ones((bin_start - 1, 1))
    ramp = np.linspace(1, 0, bin_stop - bin_start + 1).reshape(-1, 1)
    stopband = np.zeros((bins_n - bin_stop, 1))

    return np.vstack([passband, ramp, stopband])
def get_hp_filter_mask(bins_n, bin_start, bin_stop):
    """Build a ``(bins_n, 1)`` high-pass gain column.

    The column holds ``bin_stop + 1`` zeros, then a linear ramp from 0 to 1
    over ``1 + bin_start - bin_stop`` entries, then
    ``bins_n - bin_start - 2`` ones.
    """
    stopband = np.zeros((bin_stop + 1, 1))
    ramp = np.linspace(0, 1, 1 + bin_start - bin_stop).reshape(-1, 1)
    passband = np.ones((bins_n - bin_start - 2, 1))

    return np.vstack([stopband, ramp, passband])
def mirroring(a, spec_m, input_high_end, mp):
    """Limit ``input_high_end`` using a mirrored slice of ``spec_m``.

    The slice ends 10 bins below ``mp.param['pre_filter_start']``, has the
    same axis-1 length as ``input_high_end`` and is flipped along axis 1.

    Args:
        a: ``'mirroring'`` combines the mirrored magnitudes with the phase
            of ``input_high_end``; ``'mirroring2'`` multiplies them by
            ``input_high_end * 1.7``.
        spec_m: spectrogram supplying the mirrored magnitudes.
        input_high_end: spectrogram section to be limited.
        mp: object exposing the model parameters as the dict ``mp.param``.

    Returns:
        Element-wise, ``input_high_end`` where its magnitude does not
        exceed the candidate's, else the candidate. ``None`` for any other
        value of ``a``.
    """
    if a not in ('mirroring', 'mirroring2'):
        return None

    stop = mp.param['pre_filter_start'] - 10
    start = stop - input_high_end.shape[1]
    mirror = np.flip(np.abs(spec_m[:, start:stop, :]), 1)

    if a == 'mirroring':
        candidate = mirror * np.exp(1.j * np.angle(input_high_end))
    else:
        candidate = np.multiply(mirror, input_high_end * 1.7)

    keep_input = np.abs(input_high_end) <= np.abs(candidate)
    return np.where(keep_input, input_high_end, candidate)
def adjust_aggr(mask, params):
    """Raise ``mask`` to aggressiveness-dependent powers, in place.

    Args:
        mask: tensor indexed as ``mask[:, channel, bins]``; channels 0 and
            1 are modified in place.
        params: dict read for ``aggr_value`` (default ``0.0``),
            ``is_vocal_model`` (optional), ``aggr_correction`` (``None`` or
            a dict with ``left`` / ``right`` offsets) and
            ``aggr_split_bin``.

    Returns:
        The same ``mask`` object. It is untouched when ``aggr_value`` is 0.
    """
    aggr = params.get('aggr_value', 0.0)
    if aggr == 0:
        return mask

    if params.get('is_vocal_model'):
        aggr = 1 - aggr

    # Index 0 is the left channel, index 1 the right channel.
    strengths = [aggr, aggr]
    correction = params['aggr_correction']
    if correction is not None:
        strengths[0] += correction['left']
        strengths[1] += correction['right']

    split = params['aggr_split_bin']
    for ch, strength in enumerate(strengths):
        # Bins below the split get a third of the exponent increase.
        mask[:, ch, :split] = torch.pow(mask[:, ch, :split], 1 + strength / 3)
        mask[:, ch, split:] = torch.pow(mask[:, ch, split:], 1 + strength)

    return mask
def ensembling(a, specs):
    """Fold several spectrograms into one by element-wise selection.

    Args:
        a: ``'min_mag'`` keeps, per element, the value with the smaller
            magnitude; ``'max_mag'`` keeps the larger one. With any other
            value the first spectrogram is returned, trimmed to the
            shortest length.
        specs: list, or dict keyed ``0..n-1``, of 3-axis spectrograms. The
            container is not modified.

    Returns:
        The combined spectrogram, trimmed along axis 2 to the shortest
        input.

    Raises:
        ValueError: if ``specs`` is empty.
    """
    count = len(specs)
    if count == 0:
        raise ValueError('ensembling requires at least one spectrogram')

    # Starting from the first entry lets a single input pass through
    # unchanged instead of leaving the result unbound.
    spec = specs[0]
    for i in range(1, count):
        other = specs[i]

        ln = min(spec.shape[2], other.shape[2])
        spec = spec[:, :, :ln]
        other = other[:, :, :ln]

        if 'min_mag' == a:
            spec = np.where(np.abs(other) <= np.abs(spec), other, spec)
        elif 'max_mag' == a:
            spec = np.where(np.abs(other) >= np.abs(spec), other, spec)

    return spec
def spec_from_file(filename, mp):
    """Load an audio file and build its combined multi-band spectrogram.

    The file is loaded at the sample rate of the highest band (a
    single-channel result is duplicated into two channels) and then
    resampled step by step for each lower band. Every band is transformed
    with ``wave_to_spectrogram`` and passed through ``convert_channels``
    before ``combine_spectrograms`` merges them.

    Args:
        filename: path of the audio file.
        mp: object exposing the model parameters as the dict ``mp.param``.

    Returns:
        The result of ``combine_spectrograms`` for all bands.
    """
    bands = mp.param['band']
    bands_n = len(bands)
    spec = {}
    wave = None

    for d in range(bands_n, 0, -1):
        bp = bands[d]

        # Keyword arguments throughout: librosa >= 0.10 rejects positional
        # sample-rate / mono arguments.
        if d == bands_n:  # high-end band
            wave, _ = librosa.load(
                filename, sr=bp['sr'], mono=False, dtype=np.float32, res_type=bp['res_type'])

            if wave.ndim == 1:  # mono to stereo
                wave = np.array([wave, wave])
        else:  # lower bands
            wave = librosa.resample(
                wave, orig_sr=bands[d+1]['sr'], target_sr=bp['sr'], res_type=bp['res_type'])

        spec[d] = wave_to_spectrogram(wave, bp['hl'], bp['n_fft'], mp, False)
        spec[d] = convert_channels(spec[d], mp, d)

    return combine_spectrograms(spec, mp)
if __name__ == "__main__":
    # Command-line entry point: combine, invert or align two or more audio
    # files according to --algorithm and write the results as WAV (and,
    # for the invert modes, PNG) files named after --output_name.
    import argparse
    import subprocess
    import sys

    from model_param_init import ModelParameters

    p = argparse.ArgumentParser()
    p.add_argument('--algorithm', '-a', type=str, choices=['invert', 'invert_p', 'min_mag', 'max_mag', 'deep', 'align'], default='min_mag')
    p.add_argument('--model_params', '-m', type=str, default=os.path.join('modelparams', '1band_sr44100_hl512.json'))
    p.add_argument('--output_name', '-o', type=str, default='output')
    p.add_argument('--vocals_only', '-v', action='store_true')
    p.add_argument('input', nargs='+')
    args = p.parse_args()

    is_invert = args.algorithm.startswith('invert')

    if is_invert and len(args.input) != 2:
        raise ValueError('There should be two input files.')

    if not is_invert and len(args.input) < 2:
        raise ValueError('There must be at least two input files.')

    mp = ModelParameters(args.model_params)
    specs = {i: spec_from_file(path, mp) for i, path in enumerate(args.input)}

    # The element-wise operations below need equally long spectrograms.
    specs[0], specs[1] = trim_specs(specs[0], specs[1])

    if args.algorithm == 'deep':
        d_spec = np.where(np.abs(specs[0]) <= np.abs(specs[1]), specs[0], specs[1])
        v_spec = d_spec - specs[1]
        sf.write(os.path.join('{}.wav'.format(args.output_name)), cmb_spectrogram_to_wave(v_spec, mp), mp.param['sr'])

    if is_invert:
        if 'invert_p' == args.algorithm:
            X_mag = np.abs(specs[0])
            y_mag = np.abs(specs[1])
            max_mag = np.where(X_mag >= y_mag, X_mag, y_mag)
            v_spec = specs[1] - max_mag * np.exp(1.j * np.angle(specs[0]))
        else:
            specs[1] = reduce_vocal_aggressively(specs[0], specs[1], 0.2)
            v_spec = specs[0] - specs[1]

        # NOTE(review): the original nesting of this diagnostic block was
        # lost; it is applied to both invert modes here. Confirm.
        if not args.vocals_only:
            import cv2  # only needed for the diagnostic images

            X_mag = np.abs(specs[0])
            y_mag = np.abs(specs[1])
            v_mag = np.abs(v_spec)

            X_image = spectrogram_to_image(X_mag)
            y_image = spectrogram_to_image(y_mag)
            v_image = spectrogram_to_image(v_mag)

            cv2.imwrite('{}_X.png'.format(args.output_name), X_image)
            cv2.imwrite('{}_y.png'.format(args.output_name), y_image)
            cv2.imwrite('{}_v.png'.format(args.output_name), v_image)

            sf.write('{}_X.wav'.format(args.output_name), cmb_spectrogram_to_wave(specs[0], mp), mp.param['sr'])
            sf.write('{}_y.wav'.format(args.output_name), cmb_spectrogram_to_wave(specs[1], mp), mp.param['sr'])

        sf.write('{}_v.wav'.format(args.output_name), cmb_spectrogram_to_wave(v_spec, mp), mp.param['sr'])
    elif args.algorithm != 'deep':
        os.makedirs('ensembled', exist_ok=True)
        sf.write(os.path.join('ensembled', '{}.wav'.format(args.output_name)), cmb_spectrogram_to_wave(ensembling(args.algorithm, specs), mp), mp.param['sr'])

    if args.algorithm == 'align':
        # NOTE(review): the contents of the original track list were lost;
        # pairing the first two inputs is assumed. Confirm against the
        # arguments lib/align_tracks.py expects.
        track_pairs = [(args.input[0], args.input[1])]

        for file1, file2 in tqdm(track_pairs, desc="Performing Alignment..."):
            # Argument list without a shell, so file names containing
            # spaces or shell metacharacters are passed through verbatim.
            subprocess.run([sys.executable, 'lib/align_tracks.py', file1, file2], check=False)