import os
import random
import json
import hashlib

import numpy as np
import torch
import torch.utils.data
from tqdm import tqdm

from lib import spec_utils


class VocalRemoverValidationSet(torch.utils.data.Dataset):

    def __init__(self, patch_list):
        self.patch_list = patch_list

    def __len__(self):
        return len(self.patch_list)

    def __getitem__(self, idx):
        path = self.patch_list[idx]
        data = np.load(path)

        X, y = data['X'], data['y']

        X_mag = np.abs(X)
        y_mag = np.abs(y)

        return X_mag, y_mag


def make_pair(mix_dir, inst_dir):
    input_exts = ['.wav', '.m4a', '.mp3', '.mp4', '.flac']

    X_list = sorted([
        os.path.join(mix_dir, fname)
        for fname in os.listdir(mix_dir)
        if os.path.splitext(fname)[1] in input_exts])
    y_list = sorted([
        os.path.join(inst_dir, fname)
        for fname in os.listdir(inst_dir)
        if os.path.splitext(fname)[1] in input_exts])

    filelist = list(zip(X_list, y_list))

    return filelist


def train_val_split(dataset_dir, split_mode, val_rate, val_filelist):
    if split_mode == 'random':
        filelist = make_pair(
            os.path.join(dataset_dir, 'mixtures'),
            os.path.join(dataset_dir, 'instruments'))

        random.shuffle(filelist)

        if len(val_filelist) == 0:
            val_size = int(len(filelist) * val_rate)
            train_filelist = filelist[:-val_size]
            val_filelist = filelist[-val_size:]
        else:
            train_filelist = [
                pair for pair in filelist
                if list(pair) not in val_filelist]
    elif split_mode == 'subdirs':
        if len(val_filelist) != 0:
            raise ValueError('The `val_filelist` option is not available in `subdirs` mode')

        train_filelist = make_pair(
            os.path.join(dataset_dir, 'training/mixtures'),
            os.path.join(dataset_dir, 'training/instruments'))

        val_filelist = make_pair(
            os.path.join(dataset_dir, 'validation/mixtures'),
            os.path.join(dataset_dir, 'validation/instruments'))
    else:
        # fail fast instead of hitting a confusing NameError on return
        raise ValueError('Unknown `split_mode`: {}'.format(split_mode))

    return train_filelist, val_filelist


def augment(X, y, reduction_rate, reduction_mask, mixup_rate, mixup_alpha,
            mp, augment_path, is_karaoke, is_vocal):
    perm = np.random.permutation(len(X))
    for i, idx in enumerate(tqdm(perm)):
        if np.random.uniform() < reduction_rate:
            y[idx] = spec_utils.reduce_vocal_aggressively(X[idx], y[idx], reduction_mask)

        if np.random.uniform() < 0.5:
            # swap channels
            X[idx] = X[idx, ::-1]
            y[idx] = y[idx, ::-1]

        # if np.random.uniform() < 0.01:
        #     # vocal samples mixing
        #     spec_from_file(os.path.join(augment_path, random.choice(os.listdir(augment_path))), mp)

        if np.random.uniform() < 0.02:
            # downmix to mono (the mean is broadcast back onto both channels)
            X[idx] = X[idx].mean(axis=0, keepdims=True)
            y[idx] = y[idx].mean(axis=0, keepdims=True)

        if np.random.uniform() < 0.02:
            # vocal echo: add a delayed, attenuated copy of the vocal part
            d = np.random.randint(1, 10, size=2)
            v = X[idx] - y[idx]
            X[idx, 0, :, d[0]:] += v[0, :, :-d[0]] * np.random.uniform(0.1, 0.3)
            X[idx, 1, :, d[1]:] += v[1, :, :-d[1]] * np.random.uniform(0.1, 0.3)

        if np.random.uniform() < 0.02:
            # vocal panning: bias the vocal part toward one channel
            if is_karaoke:
                pan = np.random.uniform() * 0.1
            else:
                pan = np.random.uniform() * 0.5
            v = (X[idx] - y[idx]) * (1 + pan / 2)
            v[np.random.randint(0, 2)] *= (1 - pan)
            X[idx] = y[idx] + v

        if np.random.uniform() < 0.02 and not is_karaoke:
            # instrumental-only sample (mixture == instruments)
            X[idx] = y[idx]

        if is_vocal:
            # mix & inst -> mix & vocals
            y[idx] = X[idx] - y[idx]

        if np.random.uniform() < mixup_rate and i < len(perm) - 1:
            # mixup with the next (not yet augmented) sample in the permutation
            lam = np.random.beta(mixup_alpha, mixup_alpha)
            X[idx] = lam * X[idx] + (1 - lam) * X[perm[i + 1]]
            y[idx] = lam * y[idx] + (1 - lam) * y[perm[i + 1]]
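        # Note on the loop below: in the multiband setup the per-band
        # spectrograms are stacked along the frequency axis (axis 1), so
        # `offset` walks down that stack one band height at a time. What
        # spec_utils.convert_channels does per band (e.g. a mid/side
        # re-encoding) is configured in `mp.param['band']`; the exact
        # scheme depends on the model parameters, not on this module.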
        offset = 0
        for d in range(1, len(mp.param['band']) + 1):
            bp = mp.param['band'][d]
            h = bp['crop_stop'] - bp['crop_start']
            X[idx][:, offset:offset + h, :] = spec_utils.convert_channels(
                X[idx][:, offset:offset + h, :], mp, d)
            y[idx][:, offset:offset + h, :] = spec_utils.convert_channels(
                y[idx][:, offset:offset + h, :], mp, d)
            offset += h

    return X, y


def make_padding(width, cropsize, offset):
    left = offset
    roi_size = cropsize - left * 2
    if roi_size == 0:
        roi_size = cropsize
    right = roi_size - (width % roi_size) + left

    return left, right, roi_size


def make_training_set(filelist, cropsize, patches, mp, offset):
    len_dataset = patches * len(filelist)

    X_dataset = np.zeros(
        (len_dataset, 2, mp.param['bins'] + 1, cropsize), dtype=np.complex64)
    y_dataset = np.zeros(
        (len_dataset, 2, mp.param['bins'] + 1, cropsize), dtype=np.complex64)

    for i, (X_path, y_path) in enumerate(tqdm(filelist)):
        X, y = spec_utils.cache_or_load(X_path, y_path, mp)

        # normalize the pair by their joint peak magnitude
        coef = np.max([np.abs(X).max(), np.abs(y).max()])
        X, y = X / coef, y / coef

        l, r, roi_size = make_padding(X.shape[2], cropsize, offset)
        X_pad = np.pad(X, ((0, 0), (0, 0), (l, r)), mode='constant')
        y_pad = np.pad(y, ((0, 0), (0, 0), (l, r)), mode='constant')

        # sample random time crops from the padded spectrogram
        starts = np.random.randint(0, X_pad.shape[2] - cropsize, patches)
        ends = starts + cropsize
        for j in range(patches):
            idx = i * patches + j
            X_dataset[idx] = X_pad[:, :, starts[j]:ends[j]]
            y_dataset[idx] = y_pad[:, :, starts[j]:ends[j]]

    return X_dataset, y_dataset


def make_validation_set(filelist, cropsize, mp, offset):
    patch_list = []

    # cache directory keyed by cropsize, a hash of the model parameters,
    # and offset, so patches are never reused across configurations
    mp_hash = hashlib.sha1(
        json.dumps(mp.param, sort_keys=True).encode('utf-8')).hexdigest()
    patch_dir = 'cs{}_mph{}_of{}'.format(cropsize, mp_hash, offset)
    os.makedirs(patch_dir, exist_ok=True)

    for i, (X_path, y_path) in enumerate(tqdm(filelist)):
        basename = os.path.splitext(os.path.basename(X_path))[0]

        X, y = spec_utils.cache_or_load(X_path, y_path, mp)
        coef = np.max([np.abs(X).max(), np.abs(y).max()])
        X, y = X / coef, y / coef

        l, r, roi_size = make_padding(X.shape[2], cropsize, offset)
        X_pad = np.pad(X, ((0, 0), (0, 0), (l, r)), mode='constant')
        y_pad = np.pad(y, ((0, 0), (0, 0), (l, r)), mode='constant')

        len_dataset = int(np.ceil(X.shape[2] / roi_size))
        for j in range(len_dataset):
            outpath = os.path.join(patch_dir, '{}_p{}.npz'.format(basename, j))
            start = j * roi_size
            if not os.path.exists(outpath):
                np.savez(
                    outpath,
                    X=X_pad[:, :, start:start + cropsize],
                    y=y_pad[:, :, start:start + cropsize])
            patch_list.append(outpath)

    return VocalRemoverValidationSet(patch_list)
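
# ---------------------------------------------------------------------------
# Self-contained sanity check for the padding arithmetic (illustrative only;
# the numbers below are an example, not defaults used by the training code).
# make_padding sizes the pad so that crops of `cropsize` frames, advanced by
# roi_size = cropsize - 2 * offset each step (i.e. overlapping by an
# `offset`-frame margin on both sides), cover the whole spectrogram, as in
# make_validation_set above.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    width, cropsize, offset = 1000, 256, 32
    l, r, roi_size = make_padding(width, cropsize, offset)
    n_patches = int(np.ceil(width / roi_size))

    assert roi_size == cropsize - 2 * offset
    # for this example the padded width is consumed exactly by n_patches
    # crops, each starting roi_size frames after the previous one
    assert l + width + r == (n_patches - 1) * roi_size + cropsize
    print('left={}, right={}, roi_size={}, patches={}'.format(
        l, r, roi_size, n_patches))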