# ultimatevocalremovergui/inference_v5.py
# 2022-04-07 03:30:15 -05:00
#
# 440 lines
# 18 KiB
# Python

import pprint
import argparse
import os
import importlib
import cv2
import librosa
import math
import numpy as np
import soundfile as sf
from tqdm import tqdm
from lib_v5 import dataset
from lib_v5 import spec_utils
from lib_v5.model_param_init import ModelParameters
import torch
# Command line text parsing and widget manipulation
from collections import defaultdict
import tkinter as tk
import traceback # Error Message Recent Calls
import time # Timer
class VocalRemover(object):
    """Load the network named in ``data`` and run windowed inference with it.

    Constructing an instance immediately loads the model.  As a side effect
    ``_load_models`` publishes two module-level globals that ``main`` reads:
    ``args`` (the parsed argparse namespace) and ``model_params_d`` (path of
    the model-parameter JSON chosen for the model).
    """

    def __init__(self, data, text_widget: 'tk.Text'):
        """Store the settings and log widget, then load the model.

        Args:
            data: settings dict; this class reads the keys
                ``'instrumentalModel'``, ``'agg'``, ``'gpu'`` and
                ``'window_size'``.
            text_widget: object with a ``write(str)`` method used for
                status messages.
        """
        self.data = data
        self.text_widget = text_widget
        # Unknown keys yield None rather than raising KeyError.
        self.models = defaultdict(lambda: None)
        self.devices = defaultdict(lambda: None)
        self._load_models()

    @staticmethod
    def _resolve_model_params(model_name, candidates):
        """Return the parameter file whose tag occurs in ``model_name``.

        Args:
            model_name: model file name/path to search for tags.
            candidates: ordered ``(tag, param_path)`` pairs.  When several
                tags match, the LAST matching pair wins, so more specific
                tags must come after the generic ones they contain.

        Raises:
            ValueError: if no tag occurs in ``model_name``.
        """
        chosen = None
        for tag, param_path in candidates:
            if tag in model_name:
                chosen = param_path
        if chosen is None:
            known = ', '.join(tag for tag, _ in candidates)
            raise ValueError(
                'Cannot determine model parameters for "{}": the name '
                'contains none of the known tags ({})'.format(model_name, known))
        return chosen

    def _load_models(self):
        """Parse options, pick the network architecture and load the weights.

        Sets the module globals ``args`` and ``model_params_d`` and fills
        ``self.models['instrumental']`` / ``self.devices['instrumental']``
        when the model file exists.

        Raises:
            ValueError: if the model name matches no known parameter tag.
        """
        global args
        global model_params_d

        self.text_widget.write('Loading models...\n')  # nopep8 Write Command Text

        nn_arch_sizes = [
            31191,  # default
            33966, 123821, 123812, 537238  # custom
        ]
        # Use the settings handed to this instance, not the module-level dict.
        model_path = self.data['instrumentalModel']

        p = argparse.ArgumentParser()
        p.add_argument('--paramone', type=str, default='lib_v5/modelparams/4band_44100.json')
        p.add_argument('--paramtwo', type=str, default='lib_v5/modelparams/4band_v2.json')
        p.add_argument('--paramthree', type=str, default='lib_v5/modelparams/3band_44100_msb2.json')
        p.add_argument('--paramfour', type=str, default='lib_v5/modelparams/4band_v2_sn.json')
        p.add_argument('--aggressiveness', type=float, default=self.data['agg'] / 100)
        p.add_argument('--nn_architecture', type=str,
                       choices=['auto'] + ['{}KB'.format(s) for s in nn_arch_sizes],
                       default='auto')
        p.add_argument('--high_end_process', type=str, default='mirroring')
        args = p.parse_args()

        if 'auto' == args.nn_architecture:
            # Pick the architecture whose reference size (KB) is closest to
            # the size of the model file on disk.
            model_size = math.ceil(os.stat(model_path).st_size / 1024)
            args.nn_architecture = '{}KB'.format(
                min(nn_arch_sizes, key=lambda x: abs(x - model_size)))

        # The default architecture lives in ``lib_v5.nets``; every other one
        # in ``lib_v5.nets_<size>KB``.
        module_suffix = f'_{args.nn_architecture}'.replace(
            '_{}KB'.format(nn_arch_sizes[0]), '')
        nets = importlib.import_module('lib_v5.nets' + module_suffix, package=None)

        # Order matters: "4BAND_44100" is a substring of the "_B" and "_SN"
        # tags, so the generic tag is listed first and later matches override.
        model_params_d = self._resolve_model_params(model_path, (
            ('4BAND_44100', args.paramone),
            ('4BAND_44100_B', args.paramtwo),
            ('MSB2', args.paramthree),
            ('4BAND_44100_SN', args.paramfour),
        ))
        print(model_params_d)
        mp = ModelParameters(model_params_d)

        # -Instrumental-
        if os.path.isfile(model_path):
            # Load on CPU first, then move to the GPU if one was requested.
            device = torch.device('cpu')
            model = nets.CascadedASPPNet(mp.param['bins'] * 2)
            model.load_state_dict(torch.load(model_path, map_location=device))
            if torch.cuda.is_available() and self.data['gpu'] >= 0:
                device = torch.device('cuda:{}'.format(self.data['gpu']))
                model.to(device)
            self.models['instrumental'] = model
            self.devices['instrumental'] = device

        self.text_widget.write('Done!\n')

    def _execute(self, X_mag_pad, roi_size, n_window, device, model, aggressiveness):
        """Run ``model.predict`` over consecutive windows of ``X_mag_pad``.

        Window ``i`` starts at ``i * roi_size`` on the last axis and is
        ``self.data['window_size']`` wide.  The per-window predictions are
        concatenated along axis 2 and returned as a NumPy array.
        """
        model.eval()
        window_size = self.data['window_size']
        with torch.no_grad():
            preds = []
            for i in tqdm(range(n_window)):
                start = i * roi_size
                # ``None`` adds a leading axis of length 1 for the model.
                X_mag_window = X_mag_pad[None, :, :, start:start + window_size]
                X_mag_window = torch.from_numpy(X_mag_window).to(device)
                pred = model.predict(X_mag_window, aggressiveness)
                preds.append(pred.detach().cpu().numpy()[0])
            return np.concatenate(preds, axis=2)

    def preprocess(self, X_spec):
        """Split a complex spectrogram into ``(magnitude, phase)`` arrays."""
        X_mag = np.abs(X_spec)
        X_phase = np.angle(X_spec)
        return X_mag, X_phase

    def _prepare(self, X_spec, model):
        """Normalise the magnitude and compute the windowing layout.

        Returns ``(X_mag, X_phase, coef, X_mag_pre, n_frame, pad_l, pad_r,
        roi_size, n_window)`` where ``X_mag_pre`` is ``X_mag`` divided by its
        maximum ``coef`` and the padding values come from
        ``dataset.make_padding``.
        """
        X_mag, X_phase = self.preprocess(X_spec)
        coef = X_mag.max()
        X_mag_pre = X_mag / coef
        n_frame = X_mag_pre.shape[2]
        pad_l, pad_r, roi_size = dataset.make_padding(
            n_frame, self.data['window_size'], model.offset)
        n_window = int(np.ceil(n_frame / roi_size))
        return X_mag, X_phase, coef, X_mag_pre, n_frame, pad_l, pad_r, roi_size, n_window

    def _predict_padded(self, X_mag_pre, pad_l, pad_r, roi_size, n_window,
                        device, model, aggressiveness):
        """Zero-pad the last axis by ``(pad_l, pad_r)`` and run ``_execute``."""
        X_mag_pad = np.pad(
            X_mag_pre, ((0, 0), (0, 0), (pad_l, pad_r)), mode='constant')
        return self._execute(X_mag_pad, roi_size, n_window,
                             device, model, aggressiveness)

    def inference(self, X_spec, device, model, aggressiveness):
        """Predict in a single pass.

        Returns:
            ``(pred * coef, X_mag, exp(1j * X_phase))``: the prediction
            rescaled by the input's peak magnitude, the input magnitude and
            the unit-magnitude complex phase.
        """
        (X_mag, X_phase, coef, X_mag_pre, n_frame,
         pad_l, pad_r, roi_size, n_window) = self._prepare(X_spec, model)

        pred = self._predict_padded(X_mag_pre, pad_l, pad_r, roi_size, n_window,
                                    device, model, aggressiveness)
        pred = pred[:, :, :n_frame]

        return pred * coef, X_mag, np.exp(1.j * X_phase)

    def inference_tta(self, X_spec, device, model, aggressiveness):
        """Predict twice, the second time shifted by half a window, and average.

        The second pass pads an extra ``roi_size // 2`` frames on both sides
        and uses one more window; its output is re-aligned by dropping the
        first ``roi_size // 2`` frames before averaging with the first pass.
        Return value has the same layout as ``inference``.
        """
        (X_mag, X_phase, coef, X_mag_pre, n_frame,
         pad_l, pad_r, roi_size, n_window) = self._prepare(X_spec, model)

        pred = self._predict_padded(X_mag_pre, pad_l, pad_r, roi_size, n_window,
                                    device, model, aggressiveness)
        pred = pred[:, :, :n_frame]

        half = roi_size // 2
        pred_tta = self._predict_padded(X_mag_pre, pad_l + half, pad_r + half,
                                        roi_size, n_window + 1,
                                        device, model, aggressiveness)
        pred_tta = pred_tta[:, :, half:]
        pred_tta = pred_tta[:, :, :n_frame]

        return (pred + pred_tta) * 0.5 * coef, X_mag, np.exp(1.j * X_phase)
# Default settings.  ``main`` overlays the keyword arguments it receives on
# top of this dict before processing.
data = {
    # Paths
    'input_paths': None,
    'export_path': None,
    # Processing Options
    'gpu': -1,
    'postprocess': True,
    'tta': True,
    'output_image': True,
    # Model Test Mode: read by determineModelFolderName(); it previously had
    # no default, so a caller that omitted it triggered a KeyError.
    'modelFolder': False,
    # Models
    'instrumentalModel': None,
    'useModel': None,
    # Constants
    'window_size': 512,
    'agg': 10,
}
# Snapshots of the two tunables; ``main`` refreshes them on every run.
default_window_size = data['window_size']
default_agg = data['agg']
def update_progress(progress_var, total_files, file_num, step: float = 1):
    """Calculate the progress for the progress widget in the GUI

    Args:
        progress_var: object with a ``set(value)`` method; receives the
            overall progress as a percentage (0-100).
        total_files: number of files in the run.
        file_num: 1-based index of the file currently being processed.
        step: fraction (0-1) of the current file that is finished.
    """
    # Share of the whole bar that belongs to one file.
    base = (100 / total_files)
    progress = base * (file_num - 1)
    # ``step`` is a fraction of the current file, so scale it by that file's
    # share instead of adding it as raw percentage points.
    progress += base * step
    progress_var.set(progress)
def get_baseText(total_files, file_num):
    """Return the ``'File <n>/<total> '`` prefix shown before each status line."""
    return f'File {file_num}/{total_files} '
def determineModelFolderName():
    """
    Determine the name that is used for the folder and appended
    to the back of the music files

    Returns:
        ``'/<model file name without extension>'`` when Model Test Mode
        (``data['modelFolder']``) is enabled and the instrumental model file
        exists, otherwise an empty string.
    """
    # .get(): the key is optional, so a caller that never sets it means
    # "Model Test Mode not selected" instead of a KeyError.
    if not data.get('modelFolder'):
        return ''

    # -Instrumental-
    model_path = data.get('instrumentalModel')
    # Guard against the default None, which os.path.isfile() rejects.
    if not model_path or not os.path.isfile(model_path):
        return ''

    model_stem = os.path.splitext(os.path.basename(model_path))[0]
    return '/' + model_stem if model_stem else ''
def main(window: tk.Wm, text_widget: tk.Text, button_widget: tk.Button, progress_var: tk.Variable,
         **kwargs: dict):
    """Separate every file in ``data['input_paths']`` and write the results.

    Args:
        window: parent window for the error dialog.
        text_widget: log widget; must provide ``write(str)`` and ``clear()``.
        button_widget: button that is disabled while processing runs and
            re-enabled afterwards (also after an error).
        progress_var: variable receiving the progress percentage.
        **kwargs: settings merged into the module-level ``data`` dict.

    Any exception raised while loading the model or processing a file is
    reported in a message box and printed; the function then returns without
    processing the remaining files.
    """
    global default_window_size
    global default_agg

    def save_files(wav_instrument, wav_vocals):
        """Save output music files"""
        vocal_name = '(Vocals)'
        instrumental_name = '(Instrumental)'
        save_path = os.path.dirname(base_name)
        # Swap names if vocal model
        if 'Vocal' in model_name:
            vocal_name, instrumental_name = instrumental_name, vocal_name
        # Save Temp File
        # The primary output of the model is always written as the temp
        # file, whichever label it ends up with after the swap above.
        sf.write('temp.wav', wav_instrument, mp.param['sr'])

        appendModelFolderName = modelFolderName.replace('/', '_')
        file_stem = os.path.basename(base_name)
        # -Save files- (instrumental first, then vocal)
        for label, wav in ((instrumental_name, wav_instrument),
                           (vocal_name, wav_vocals)):
            out_path = f'{save_path}/{file_stem}_{label}{appendModelFolderName}.wav'
            sf.write(out_path, wav, mp.param['sr'])

    def spec_to_wave(spec, high_end, high_end_h):
        """Convert a combined spectrogram to audio, mirroring the high end if enabled."""
        if args.high_end_process.startswith('mirroring'):
            mirrored = spec_utils.mirroring(args.high_end_process, spec, high_end, mp)
            return spec_utils.cmb_spectrogram_to_wave(spec, mp, high_end_h, mirrored)
        return spec_utils.cmb_spectrogram_to_wave(spec, mp)

    data.update(kwargs)
    # Update default settings
    default_window_size = data['window_size']
    default_agg = data['agg']

    stime = time.perf_counter()
    progress_var.set(0)
    text_widget.clear()
    button_widget.configure(state=tk.DISABLED)  # Disable Button

    # Bound up front so the error handler can always report it.
    music_file = None
    try:
        # Model loading is inside the try block so that a failure is reported
        # and the button re-enabled instead of leaving the GUI locked.
        vocal_remover = VocalRemover(data, text_widget)
        modelFolderName = determineModelFolderName()
        if modelFolderName:
            folder_path = f'{data["export_path"]}{modelFolderName}'
            if not os.path.isdir(folder_path):
                os.mkdir(folder_path)

        # Separation Preperation
        total_files = len(data['input_paths'])
        for file_num, music_file in enumerate(data['input_paths'], start=1):
            # Determine File Name
            base_name = f'{data["export_path"]}{modelFolderName}/{file_num}_{os.path.splitext(os.path.basename(music_file))[0]}'
            # Start Separation
            model_name = os.path.basename(data[f'{data["useModel"]}Model'])
            model = vocal_remover.models[data['useModel']]
            device = vocal_remover.devices[data['useModel']]
            # -Get text and update progress-
            base_text = get_baseText(total_files=total_files,
                                     file_num=file_num)
            progress_kwargs = {'progress_var': progress_var,
                               'total_files': total_files,
                               'file_num': file_num}
            update_progress(**progress_kwargs, step=0)

            mp = ModelParameters(model_params_d)

            # -Go through the different steps of seperation-
            # Wave source
            text_widget.write(base_text + 'Loading wave source...\n')
            X_wave, X_spec_s = {}, {}
            # Only assigned below when high-end processing is enabled.
            input_high_end = input_high_end_h = None
            bands_n = len(mp.param['band'])
            # Bands are processed from the highest index down; each lower
            # band is resampled from the one above it.
            for d in range(bands_n, 0, -1):
                bp = mp.param['band'][d]
                if d == bands_n:  # high-end band
                    # Keyword arguments: newer librosa releases no longer
                    # accept sr/mono positionally.
                    X_wave[d], _ = librosa.load(
                        music_file, sr=bp['sr'], mono=False,
                        dtype=np.float32, res_type=bp['res_type'])
                    if X_wave[d].ndim == 1:
                        # Mono input: duplicate the channel.
                        X_wave[d] = np.asarray([X_wave[d], X_wave[d]])
                else:  # lower bands
                    X_wave[d] = librosa.resample(
                        X_wave[d + 1],
                        orig_sr=mp.param['band'][d + 1]['sr'],
                        target_sr=bp['sr'],
                        res_type=bp['res_type'])
                # Stft of wave source
                X_spec_s[d] = spec_utils.wave_to_spectrogram_mt(
                    X_wave[d], bp['hl'], bp['n_fft'], mp.param['mid_side'],
                    mp.param['mid_side_b2'], mp.param['reverse'])
                if d == bands_n and args.high_end_process != 'none':
                    # Keep the top bins of the highest band for mirroring.
                    input_high_end_h = (bp['n_fft'] // 2 - bp['crop_stop']) + (
                        mp.param['pre_filter_stop'] - mp.param['pre_filter_start'])
                    input_high_end = X_spec_s[d][
                        :, bp['n_fft'] // 2 - input_high_end_h:bp['n_fft'] // 2, :]
            text_widget.write(base_text + 'Done!\n')
            update_progress(**progress_kwargs, step=0.1)

            text_widget.write(base_text + 'Stft of wave source...\n')
            X_spec_m = spec_utils.combine_spectrograms(X_spec_s, mp)
            del X_wave, X_spec_s

            aggressiveness = {'value': args.aggressiveness,
                              'split_bin': mp.param['band'][1]['crop_stop']}
            run_inference = (vocal_remover.inference_tta if data['tta']
                             else vocal_remover.inference)
            pred, X_mag, X_phase = run_inference(X_spec_m, device, model,
                                                 aggressiveness)
            text_widget.write(base_text + 'Done!\n')
            update_progress(**progress_kwargs, step=0.6)

            # Postprocess
            if data['postprocess']:
                text_widget.write(base_text + 'Post processing...\n')
                pred_inv = np.clip(X_mag - pred, 0, np.inf)
                pred = spec_utils.mask_silence(pred, pred_inv)
                text_widget.write(base_text + 'Done!\n')
                update_progress(**progress_kwargs, step=0.65)

            # Inverse stft
            text_widget.write(base_text + 'Inverse stft of instruments and vocals...\n')  # nopep8
            y_spec_m = pred * X_phase
            v_spec_m = X_spec_m - y_spec_m
            wav_instrument = spec_to_wave(y_spec_m, input_high_end, input_high_end_h)
            wav_vocals = spec_to_wave(v_spec_m, input_high_end, input_high_end_h)
            text_widget.write(base_text + 'Done!\n')
            update_progress(**progress_kwargs, step=0.7)

            # Save output music files
            text_widget.write(base_text + 'Saving Files...\n')
            save_files(wav_instrument, wav_vocals)
            text_widget.write(base_text + 'Done!\n')
            update_progress(**progress_kwargs, step=0.8)

            # Save output image
            if data['output_image']:
                for label, spec in (('Instruments', y_spec_m),
                                    ('Vocals', v_spec_m)):
                    with open(f'{base_name}_{label}.jpg', mode='wb') as f:
                        image = spec_utils.spectrogram_to_image(spec)
                        _, bin_image = cv2.imencode('.jpg', image)
                        bin_image.tofile(f)

            text_widget.write(base_text + 'Completed Seperation!\n\n')
    except Exception as e:
        # ``import tkinter`` does not load the messagebox submodule, so
        # import it explicitly rather than relying on another module.
        from tkinter import messagebox

        traceback_text = ''.join(traceback.format_tb(e.__traceback__))
        message = f'Traceback Error: "{traceback_text}"\n{type(e).__name__}: "{e}"\nFile: {music_file}\nPlease contact the creator and attach a screenshot of this error with the file and settings that caused it!'
        messagebox.showerror(master=window,
                             title='Untracked Error',
                             message=message)
        print(traceback_text)
        print(type(e).__name__, e)
        print(message)
        progress_var.set(0)
        button_widget.configure(state=tk.NORMAL)  # Enable Button
        return

    # The temp file only exists if at least one file was processed.
    try:
        os.remove('temp.wav')
    except FileNotFoundError:
        pass

    progress_var.set(0)
    text_widget.write('Conversion(s) Completed and Saving all Files!\n')
    text_widget.write(f'Time Elapsed: {time.strftime("%H:%M:%S", time.gmtime(int(time.perf_counter() - stime)))}')  # nopep8
    torch.cuda.empty_cache()
    button_widget.configure(state=tk.NORMAL)  # Enable Button