2022-12-18 21:18:56 -06:00

448 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#
# Created on 2018/12
# Author: Kaituo XU
# Modified on 2019/11 by Alexandre Defossez, added support for multiple output channels
# Here is the original license:
# The MIT License (MIT)
#
# Copyright (c) 2018 Kaituo XU
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from .utils import capture_init
EPS = 1e-8
def overlap_and_add(signal, frame_step):
outer_dimensions = signal.size()[:-2]
frames, frame_length = signal.size()[-2:]
subframe_length = math.gcd(frame_length, frame_step) # gcd=Greatest Common Divisor
subframe_step = frame_step // subframe_length
subframes_per_frame = frame_length // subframe_length
output_size = frame_step * (frames - 1) + frame_length
output_subframes = output_size // subframe_length
subframe_signal = signal.view(*outer_dimensions, -1, subframe_length)
frame = torch.arange(0, output_subframes,
device=signal.device).unfold(0, subframes_per_frame, subframe_step)
frame = frame.long() # signal may in GPU or CPU
frame = frame.contiguous().view(-1)
result = signal.new_zeros(*outer_dimensions, output_subframes, subframe_length)
result.index_add_(-2, frame, subframe_signal)
result = result.view(*outer_dimensions, -1)
return result
class ConvTasNet(nn.Module):
@capture_init
def __init__(self,
N=256,
L=20,
B=256,
H=512,
P=3,
X=8,
R=4,
C=4,
audio_channels=1,
samplerate=44100,
norm_type="gLN",
causal=False,
mask_nonlinear='relu'):
"""
Args:
N: Number of filters in autoencoder
L: Length of the filters (in samples)
B: Number of channels in bottleneck 1 × 1-conv block
H: Number of channels in convolutional blocks
P: Kernel size in convolutional blocks
X: Number of convolutional blocks in each repeat
R: Number of repeats
C: Number of speakers
norm_type: BN, gLN, cLN
causal: causal or non-causal
mask_nonlinear: use which non-linear function to generate mask
"""
super(ConvTasNet, self).__init__()
# Hyper-parameter
self.N, self.L, self.B, self.H, self.P, self.X, self.R, self.C = N, L, B, H, P, X, R, C
self.norm_type = norm_type
self.causal = causal
self.mask_nonlinear = mask_nonlinear
self.audio_channels = audio_channels
self.samplerate = samplerate
# Components
self.encoder = Encoder(L, N, audio_channels)
self.separator = TemporalConvNet(N, B, H, P, X, R, C, norm_type, causal, mask_nonlinear)
self.decoder = Decoder(N, L, audio_channels)
# init
for p in self.parameters():
if p.dim() > 1:
nn.init.xavier_normal_(p)
def valid_length(self, length):
return length
def forward(self, mixture):
"""
Args:
mixture: [M, T], M is batch size, T is #samples
Returns:
est_source: [M, C, T]
"""
mixture_w = self.encoder(mixture)
est_mask = self.separator(mixture_w)
est_source = self.decoder(mixture_w, est_mask)
# T changed after conv1d in encoder, fix it here
T_origin = mixture.size(-1)
T_conv = est_source.size(-1)
est_source = F.pad(est_source, (0, T_origin - T_conv))
return est_source
class Encoder(nn.Module):
"""Estimation of the nonnegative mixture weight by a 1-D conv layer.
"""
def __init__(self, L, N, audio_channels):
super(Encoder, self).__init__()
# Hyper-parameter
self.L, self.N = L, N
# Components
# 50% overlap
self.conv1d_U = nn.Conv1d(audio_channels, N, kernel_size=L, stride=L // 2, bias=False)
def forward(self, mixture):
"""
Args:
mixture: [M, T], M is batch size, T is #samples
Returns:
mixture_w: [M, N, K], where K = (T-L)/(L/2)+1 = 2T/L-1
"""
mixture_w = F.relu(self.conv1d_U(mixture)) # [M, N, K]
return mixture_w
class Decoder(nn.Module):
def __init__(self, N, L, audio_channels):
super(Decoder, self).__init__()
# Hyper-parameter
self.N, self.L = N, L
self.audio_channels = audio_channels
# Components
self.basis_signals = nn.Linear(N, audio_channels * L, bias=False)
def forward(self, mixture_w, est_mask):
"""
Args:
mixture_w: [M, N, K]
est_mask: [M, C, N, K]
Returns:
est_source: [M, C, T]
"""
# D = W * M
source_w = torch.unsqueeze(mixture_w, 1) * est_mask # [M, C, N, K]
source_w = torch.transpose(source_w, 2, 3) # [M, C, K, N]
# S = DV
est_source = self.basis_signals(source_w) # [M, C, K, ac * L]
m, c, k, _ = est_source.size()
est_source = est_source.view(m, c, k, self.audio_channels, -1).transpose(2, 3).contiguous()
est_source = overlap_and_add(est_source, self.L // 2) # M x C x ac x T
return est_source
class TemporalConvNet(nn.Module):
def __init__(self, N, B, H, P, X, R, C, norm_type="gLN", causal=False, mask_nonlinear='relu'):
"""
Args:
N: Number of filters in autoencoder
B: Number of channels in bottleneck 1 × 1-conv block
H: Number of channels in convolutional blocks
P: Kernel size in convolutional blocks
X: Number of convolutional blocks in each repeat
R: Number of repeats
C: Number of speakers
norm_type: BN, gLN, cLN
causal: causal or non-causal
mask_nonlinear: use which non-linear function to generate mask
"""
super(TemporalConvNet, self).__init__()
# Hyper-parameter
self.C = C
self.mask_nonlinear = mask_nonlinear
# Components
# [M, N, K] -> [M, N, K]
layer_norm = ChannelwiseLayerNorm(N)
# [M, N, K] -> [M, B, K]
bottleneck_conv1x1 = nn.Conv1d(N, B, 1, bias=False)
# [M, B, K] -> [M, B, K]
repeats = []
for r in range(R):
blocks = []
for x in range(X):
dilation = 2**x
padding = (P - 1) * dilation if causal else (P - 1) * dilation // 2
blocks += [
TemporalBlock(B,
H,
P,
stride=1,
padding=padding,
dilation=dilation,
norm_type=norm_type,
causal=causal)
]
repeats += [nn.Sequential(*blocks)]
temporal_conv_net = nn.Sequential(*repeats)
# [M, B, K] -> [M, C*N, K]
mask_conv1x1 = nn.Conv1d(B, C * N, 1, bias=False)
# Put together
self.network = nn.Sequential(layer_norm, bottleneck_conv1x1, temporal_conv_net,
mask_conv1x1)
def forward(self, mixture_w):
"""
Keep this API same with TasNet
Args:
mixture_w: [M, N, K], M is batch size
returns:
est_mask: [M, C, N, K]
"""
M, N, K = mixture_w.size()
score = self.network(mixture_w) # [M, N, K] -> [M, C*N, K]
score = score.view(M, self.C, N, K) # [M, C*N, K] -> [M, C, N, K]
if self.mask_nonlinear == 'softmax':
est_mask = F.softmax(score, dim=1)
elif self.mask_nonlinear == 'relu':
est_mask = F.relu(score)
else:
raise ValueError("Unsupported mask non-linear function")
return est_mask
class TemporalBlock(nn.Module):
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride,
padding,
dilation,
norm_type="gLN",
causal=False):
super(TemporalBlock, self).__init__()
# [M, B, K] -> [M, H, K]
conv1x1 = nn.Conv1d(in_channels, out_channels, 1, bias=False)
prelu = nn.PReLU()
norm = chose_norm(norm_type, out_channels)
# [M, H, K] -> [M, B, K]
dsconv = DepthwiseSeparableConv(out_channels, in_channels, kernel_size, stride, padding,
dilation, norm_type, causal)
# Put together
self.net = nn.Sequential(conv1x1, prelu, norm, dsconv)
def forward(self, x):
"""
Args:
x: [M, B, K]
Returns:
[M, B, K]
"""
residual = x
out = self.net(x)
# TODO: when P = 3 here works fine, but when P = 2 maybe need to pad?
return out + residual # look like w/o F.relu is better than w/ F.relu
# return F.relu(out + residual)
class DepthwiseSeparableConv(nn.Module):
def __init__(self,
in_channels,
out_channels,
kernel_size,
stride,
padding,
dilation,
norm_type="gLN",
causal=False):
super(DepthwiseSeparableConv, self).__init__()
# Use `groups` option to implement depthwise convolution
# [M, H, K] -> [M, H, K]
depthwise_conv = nn.Conv1d(in_channels,
in_channels,
kernel_size,
stride=stride,
padding=padding,
dilation=dilation,
groups=in_channels,
bias=False)
if causal:
chomp = Chomp1d(padding)
prelu = nn.PReLU()
norm = chose_norm(norm_type, in_channels)
# [M, H, K] -> [M, B, K]
pointwise_conv = nn.Conv1d(in_channels, out_channels, 1, bias=False)
# Put together
if causal:
self.net = nn.Sequential(depthwise_conv, chomp, prelu, norm, pointwise_conv)
else:
self.net = nn.Sequential(depthwise_conv, prelu, norm, pointwise_conv)
def forward(self, x):
"""
Args:
x: [M, H, K]
Returns:
result: [M, B, K]
"""
return self.net(x)
class Chomp1d(nn.Module):
"""To ensure the output length is the same as the input.
"""
def __init__(self, chomp_size):
super(Chomp1d, self).__init__()
self.chomp_size = chomp_size
def forward(self, x):
"""
Args:
x: [M, H, Kpad]
Returns:
[M, H, K]
"""
return x[:, :, :-self.chomp_size].contiguous()
def chose_norm(norm_type, channel_size):
"""The input of normlization will be (M, C, K), where M is batch size,
C is channel size and K is sequence length.
"""
if norm_type == "gLN":
return GlobalLayerNorm(channel_size)
elif norm_type == "cLN":
return ChannelwiseLayerNorm(channel_size)
elif norm_type == "id":
return nn.Identity()
else: # norm_type == "BN":
# Given input (M, C, K), nn.BatchNorm1d(C) will accumulate statics
# along M and K, so this BN usage is right.
return nn.BatchNorm1d(channel_size)
# TODO: Use nn.LayerNorm to impl cLN to speed up
class ChannelwiseLayerNorm(nn.Module):
"""Channel-wise Layer Normalization (cLN)"""
def __init__(self, channel_size):
super(ChannelwiseLayerNorm, self).__init__()
self.gamma = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1]
self.beta = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1]
self.reset_parameters()
def reset_parameters(self):
self.gamma.data.fill_(1)
self.beta.data.zero_()
def forward(self, y):
"""
Args:
y: [M, N, K], M is batch size, N is channel size, K is length
Returns:
cLN_y: [M, N, K]
"""
mean = torch.mean(y, dim=1, keepdim=True) # [M, 1, K]
var = torch.var(y, dim=1, keepdim=True, unbiased=False) # [M, 1, K]
cLN_y = self.gamma * (y - mean) / torch.pow(var + EPS, 0.5) + self.beta
return cLN_y
class GlobalLayerNorm(nn.Module):
"""Global Layer Normalization (gLN)"""
def __init__(self, channel_size):
super(GlobalLayerNorm, self).__init__()
self.gamma = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1]
self.beta = nn.Parameter(torch.Tensor(1, channel_size, 1)) # [1, N, 1]
self.reset_parameters()
def reset_parameters(self):
self.gamma.data.fill_(1)
self.beta.data.zero_()
def forward(self, y):
"""
Args:
y: [M, N, K], M is batch size, N is channel size, K is length
Returns:
gLN_y: [M, N, K]
"""
# TODO: in torch 1.0, torch.mean() support dim list
mean = y.mean(dim=1, keepdim=True).mean(dim=2, keepdim=True) # [M, 1, 1]
var = (torch.pow(y - mean, 2)).mean(dim=1, keepdim=True).mean(dim=2, keepdim=True)
gLN_y = self.gamma * (y - mean) / torch.pow(var + EPS, 0.5) + self.beta
return gLN_y
if __name__ == "__main__":
torch.manual_seed(123)
M, N, L, T = 2, 3, 4, 12
K = 2 * T // L - 1
B, H, P, X, R, C, norm_type, causal = 2, 3, 3, 3, 2, 2, "gLN", False
mixture = torch.randint(3, (M, T))
# test Encoder
encoder = Encoder(L, N)
encoder.conv1d_U.weight.data = torch.randint(2, encoder.conv1d_U.weight.size())
mixture_w = encoder(mixture)
print('mixture', mixture)
print('U', encoder.conv1d_U.weight)
print('mixture_w', mixture_w)
print('mixture_w size', mixture_w.size())
# test TemporalConvNet
separator = TemporalConvNet(N, B, H, P, X, R, C, norm_type=norm_type, causal=causal)
est_mask = separator(mixture_w)
print('est_mask', est_mask)
# test Decoder
decoder = Decoder(N, L)
est_mask = torch.randint(2, (B, K, C, N))
est_source = decoder(mixture_w, est_mask)
print('est_source', est_source)
# test Conv-TasNet
conv_tasnet = ConvTasNet(N, L, B, H, P, X, R, C, norm_type=norm_type)
est_source = conv_tasnet(mixture)
print('est_source', est_source)
print('est_source size', est_source.size())