Spaces:
Sleeping
Sleeping
| """ | |
| File: model.py | |
| Author: Elena Ryumina and Dmitry Ryumin | |
| Description: This module provides model architectures. | |
| License: MIT License | |
| """ | |
| import torch | |
| import torch.nn as nn | |
| import torch.nn.functional as F | |
| import math | |
| import numpy as np | |
| from transformers.models.wav2vec2.modeling_wav2vec2 import ( | |
| Wav2Vec2Model, | |
| Wav2Vec2PreTrainedModel, | |
| ) | |
| from typing import Optional | |
| class Bottleneck(nn.Module): | |
| expansion = 4 | |
| def __init__(self, in_channels, out_channels, i_downsample=None, stride=1): | |
| super(Bottleneck, self).__init__() | |
| self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, padding=0, bias=False) | |
| self.batch_norm1 = nn.BatchNorm2d(out_channels, eps=0.001, momentum=0.99) | |
| self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding='same', bias=False) | |
| self.batch_norm2 = nn.BatchNorm2d(out_channels, eps=0.001, momentum=0.99) | |
| self.conv3 = nn.Conv2d(out_channels, out_channels*self.expansion, kernel_size=1, stride=1, padding=0, bias=False) | |
| self.batch_norm3 = nn.BatchNorm2d(out_channels*self.expansion, eps=0.001, momentum=0.99) | |
| self.i_downsample = i_downsample | |
| self.stride = stride | |
| self.relu = nn.ReLU() | |
| def forward(self, x): | |
| identity = x.clone() | |
| x = self.relu(self.batch_norm1(self.conv1(x))) | |
| x = self.relu(self.batch_norm2(self.conv2(x))) | |
| x = self.conv3(x) | |
| x = self.batch_norm3(x) | |
| #downsample if needed | |
| if self.i_downsample is not None: | |
| identity = self.i_downsample(identity) | |
| #add identity | |
| x+=identity | |
| x=self.relu(x) | |
| return x | |
| class Conv2dSame(torch.nn.Conv2d): | |
| def calc_same_pad(self, i: int, k: int, s: int, d: int) -> int: | |
| return max((math.ceil(i / s) - 1) * s + (k - 1) * d + 1 - i, 0) | |
| def forward(self, x: torch.Tensor) -> torch.Tensor: | |
| ih, iw = x.size()[-2:] | |
| pad_h = self.calc_same_pad(i=ih, k=self.kernel_size[0], s=self.stride[0], d=self.dilation[0]) | |
| pad_w = self.calc_same_pad(i=iw, k=self.kernel_size[1], s=self.stride[1], d=self.dilation[1]) | |
| if pad_h > 0 or pad_w > 0: | |
| x = F.pad( | |
| x, [pad_w // 2, pad_w - pad_w // 2, pad_h // 2, pad_h - pad_h // 2] | |
| ) | |
| return F.conv2d( | |
| x, | |
| self.weight, | |
| self.bias, | |
| self.stride, | |
| self.padding, | |
| self.dilation, | |
| self.groups, | |
| ) | |
| class ResNet(nn.Module): | |
| def __init__(self, ResBlock, layer_list, num_classes, num_channels=3): | |
| super(ResNet, self).__init__() | |
| self.in_channels = 64 | |
| self.conv_layer_s2_same = Conv2dSame(num_channels, 64, 7, stride=2, groups=1, bias=False) | |
| self.batch_norm1 = nn.BatchNorm2d(64, eps=0.001, momentum=0.99) | |
| self.relu = nn.ReLU() | |
| self.max_pool = nn.MaxPool2d(kernel_size = 3, stride=2) | |
| self.layer1 = self._make_layer(ResBlock, layer_list[0], planes=64, stride=1) | |
| self.layer2 = self._make_layer(ResBlock, layer_list[1], planes=128, stride=2) | |
| self.layer3 = self._make_layer(ResBlock, layer_list[2], planes=256, stride=2) | |
| self.layer4 = self._make_layer(ResBlock, layer_list[3], planes=512, stride=2) | |
| self.avgpool = nn.AdaptiveAvgPool2d((1,1)) | |
| self.fc1 = nn.Linear(512*ResBlock.expansion, 512) | |
| self.relu1 = nn.ReLU() | |
| self.fc2 = nn.Linear(512, num_classes) | |
| def extract_features(self, x): | |
| x = self.relu(self.batch_norm1(self.conv_layer_s2_same(x))) | |
| x = self.max_pool(x) | |
| # print(x.shape) | |
| x = self.layer1(x) | |
| x = self.layer2(x) | |
| x = self.layer3(x) | |
| x = self.layer4(x) | |
| x = self.avgpool(x) | |
| x = x.reshape(x.shape[0], -1) | |
| x = self.fc1(x) | |
| return x | |
| def forward(self, x): | |
| x = self.extract_features(x) | |
| x = self.relu1(x) | |
| x = self.fc2(x) | |
| return x | |
| def _make_layer(self, ResBlock, blocks, planes, stride=1): | |
| ii_downsample = None | |
| layers = [] | |
| if stride != 1 or self.in_channels != planes*ResBlock.expansion: | |
| ii_downsample = nn.Sequential( | |
| nn.Conv2d(self.in_channels, planes*ResBlock.expansion, kernel_size=1, stride=stride, bias=False, padding=0), | |
| nn.BatchNorm2d(planes*ResBlock.expansion, eps=0.001, momentum=0.99) | |
| ) | |
| layers.append(ResBlock(self.in_channels, planes, i_downsample=ii_downsample, stride=stride)) | |
| self.in_channels = planes*ResBlock.expansion | |
| for i in range(blocks-1): | |
| layers.append(ResBlock(self.in_channels, planes)) | |
| return nn.Sequential(*layers) | |
| def ResNet50(num_classes, channels=3): | |
| return ResNet(Bottleneck, [3,4,6,3], num_classes, channels) | |
| class LSTMPyTorch(nn.Module): | |
| def __init__(self): | |
| super(LSTMPyTorch, self).__init__() | |
| self.lstm1 = nn.LSTM(input_size=512, hidden_size=512, batch_first=True, bidirectional=False) | |
| self.lstm2 = nn.LSTM(input_size=512, hidden_size=256, batch_first=True, bidirectional=False) | |
| self.fc = nn.Linear(256, 7) | |
| # self.softmax = nn.Softmax(dim=1) | |
| def forward(self, x): | |
| x, _ = self.lstm1(x) | |
| x, _ = self.lstm2(x) | |
| x = self.fc(x[:, -1, :]) | |
| # x = self.softmax(x) | |
| return x | |
| class ExprModelV3(Wav2Vec2PreTrainedModel): | |
| def __init__(self, config) -> None: | |
| super().__init__(config) | |
| self.config = config | |
| self.wav2vec2 = Wav2Vec2Model(config) | |
| self.tl1 = TransformerLayer( | |
| input_dim=1024, num_heads=32, dropout=0.1, positional_encoding=True | |
| ) | |
| self.tl2 = TransformerLayer( | |
| input_dim=1024, num_heads=16, dropout=0.1, positional_encoding=True | |
| ) | |
| self.f_size = 1024 | |
| self.time_downsample = torch.nn.Sequential( | |
| torch.nn.Conv1d( | |
| self.f_size, self.f_size, kernel_size=5, stride=3, dilation=2 | |
| ), | |
| torch.nn.BatchNorm1d(self.f_size), | |
| torch.nn.MaxPool1d(5), | |
| torch.nn.ReLU(), | |
| torch.nn.Conv1d(self.f_size, self.f_size, kernel_size=3), | |
| torch.nn.BatchNorm1d(self.f_size), | |
| torch.nn.AdaptiveAvgPool1d(1), | |
| torch.nn.ReLU(), | |
| ) | |
| self.feature_downsample = nn.Linear(self.f_size, 8) | |
| self.init_weights() | |
| self.unfreeze_last_n_blocks(4) | |
| def freeze_conv_only(self): | |
| # freeze conv | |
| for param in self.wav2vec2.feature_extractor.conv_layers.parameters(): | |
| param.requires_grad = False | |
| def unfreeze_last_n_blocks(self, num_blocks: int) -> None: | |
| # freeze all wav2vec | |
| for param in self.wav2vec2.parameters(): | |
| param.requires_grad = False | |
| # unfreeze last n transformer blocks | |
| for i in range(0, num_blocks): | |
| for param in self.wav2vec2.encoder.layers[-1 * (i + 1)].parameters(): | |
| param.requires_grad = True | |
| def forward(self, x): | |
| x = self.wav2vec2(x)[0] | |
| x = self.tl1(query=x, key=x, value=x) | |
| x = self.tl2(query=x, key=x, value=x) | |
| x = x.permute(0, 2, 1) | |
| x = self.time_downsample(x) | |
| x = x.squeeze() | |
| x = self.feature_downsample(x) | |
| return x | |
| class ScaledDotProductAttention_MultiHead(nn.Module): | |
| def __init__(self): | |
| super(ScaledDotProductAttention_MultiHead, self).__init__() | |
| self.softmax = nn.Softmax(dim=-1) | |
| def forward(self, query, key, value, mask=None): | |
| if mask is not None: | |
| raise ValueError("Mask is not supported yet") | |
| # key, query, value shapes: [batch_size, num_heads, seq_len, dim] | |
| emb_dim = key.shape[-1] | |
| # Calculate attention weights | |
| attention_weights = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt( | |
| emb_dim | |
| ) | |
| # masking | |
| if mask is not None: | |
| raise ValueError("Mask is not supported yet") | |
| # Softmax | |
| attention_weights = self.softmax(attention_weights) | |
| # modify value | |
| value = torch.matmul(attention_weights, value) | |
| return value, attention_weights | |
| class PositionWiseFeedForward(nn.Module): | |
| def __init__(self, input_dim, hidden_dim, dropout: float = 0.1): | |
| super().__init__() | |
| self.layer_1 = nn.Linear(input_dim, hidden_dim) | |
| self.layer_2 = nn.Linear(hidden_dim, input_dim) | |
| self.layer_norm = nn.LayerNorm(input_dim) | |
| self.dropout = nn.Dropout(dropout) | |
| def forward(self, x): | |
| # feed-forward network | |
| x = self.layer_1(x) | |
| x = self.dropout(x) | |
| x = F.relu(x) | |
| x = self.layer_2(x) | |
| return x | |
| class Add_and_Norm(nn.Module): | |
| def __init__(self, input_dim, dropout: Optional[float] = 0.1): | |
| super().__init__() | |
| self.layer_norm = nn.LayerNorm(input_dim) | |
| if dropout is not None: | |
| self.dropout = nn.Dropout(dropout) | |
| def forward(self, x1, residual): | |
| x = x1 | |
| # apply dropout of needed | |
| if hasattr(self, "dropout"): | |
| x = self.dropout(x) | |
| # add and then norm | |
| x = x + residual | |
| x = self.layer_norm(x) | |
| return x | |
| class MultiHeadAttention(nn.Module): | |
| def __init__(self, input_dim, num_heads, dropout: Optional[float] = 0.1): | |
| super().__init__() | |
| self.input_dim = input_dim | |
| self.num_heads = num_heads | |
| if input_dim % num_heads != 0: | |
| raise ValueError("input_dim must be divisible by num_heads") | |
| self.head_dim = input_dim // num_heads | |
| self.dropout = dropout | |
| # initialize weights | |
| self.query_w = nn.Linear(input_dim, self.num_heads * self.head_dim, bias=False) | |
| self.keys_w = nn.Linear(input_dim, self.num_heads * self.head_dim, bias=False) | |
| self.values_w = nn.Linear(input_dim, self.num_heads * self.head_dim, bias=False) | |
| self.ff_layer_after_concat = nn.Linear( | |
| self.num_heads * self.head_dim, input_dim, bias=False | |
| ) | |
| self.attention = ScaledDotProductAttention_MultiHead() | |
| if self.dropout is not None: | |
| self.dropout = nn.Dropout(dropout) | |
| def forward(self, queries, keys, values, mask=None): | |
| # query, keys, values shapes: [batch_size, seq_len, input_dim] | |
| batch_size, len_query, len_keys, len_values = ( | |
| queries.size(0), | |
| queries.size(1), | |
| keys.size(1), | |
| values.size(1), | |
| ) | |
| # linear transformation before attention | |
| queries = ( | |
| self.query_w(queries) | |
| .view(batch_size, len_query, self.num_heads, self.head_dim) | |
| .transpose(1, 2) | |
| ) # [batch_size, num_heads, seq_len, dim] | |
| keys = ( | |
| self.keys_w(keys) | |
| .view(batch_size, len_keys, self.num_heads, self.head_dim) | |
| .transpose(1, 2) | |
| ) # [batch_size, num_heads, seq_len, dim] | |
| values = ( | |
| self.values_w(values) | |
| .view(batch_size, len_values, self.num_heads, self.head_dim) | |
| .transpose(1, 2) | |
| ) # [batch_size, num_heads, seq_len, dim] | |
| # attention itself | |
| values, attention_weights = self.attention( | |
| queries, keys, values, mask=mask | |
| ) # values shape:[batch_size, num_heads, seq_len, dim] | |
| # concatenation | |
| out = ( | |
| values.transpose(1, 2) | |
| .contiguous() | |
| .view(batch_size, len_values, self.num_heads * self.head_dim) | |
| ) # [batch_size, seq_len, num_heads * dim = input_dim] | |
| # go through last linear layer | |
| out = self.ff_layer_after_concat(out) | |
| return out | |
| class EncoderLayer(nn.Module): | |
| def __init__( | |
| self, | |
| input_dim, | |
| num_heads, | |
| dropout: Optional[float] = 0.1, | |
| positional_encoding: bool = True, | |
| ): | |
| super(EncoderLayer, self).__init__() | |
| self.positional_encoding = positional_encoding | |
| self.input_dim = input_dim | |
| self.num_heads = num_heads | |
| self.head_dim = input_dim // num_heads | |
| self.dropout = dropout | |
| # initialize layers | |
| self.self_attention = MultiHeadAttention(input_dim, num_heads, dropout=dropout) | |
| self.feed_forward = PositionWiseFeedForward( | |
| input_dim, input_dim, dropout=dropout | |
| ) | |
| self.add_norm_after_attention = Add_and_Norm(input_dim, dropout=dropout) | |
| self.add_norm_after_ff = Add_and_Norm(input_dim, dropout=dropout) | |
| # calculate positional encoding | |
| if self.positional_encoding: | |
| self.positional_encoding = PositionalEncoding(input_dim) | |
| def forward(self, x): | |
| # x shape: [batch_size, seq_len, input_dim] | |
| # positional encoding | |
| if self.positional_encoding: | |
| x = self.positional_encoding(x) | |
| # multi-head attention | |
| residual = x | |
| x = self.self_attention(x, x, x) | |
| x = self.add_norm_after_attention(x, residual) | |
| # feed forward | |
| residual = x | |
| x = self.feed_forward(x) | |
| x = self.add_norm_after_ff(x, residual) | |
| return x | |
| class PositionalEncoding(nn.Module): | |
| def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000): | |
| super().__init__() | |
| self.dropout = nn.Dropout(p=dropout) | |
| position = torch.arange(max_len).unsqueeze(1) | |
| div_term = torch.exp( | |
| torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model) | |
| ) | |
| pe = torch.zeros(max_len, 1, d_model) | |
| pe[:, 0, 0::2] = torch.sin(position * div_term) | |
| pe[:, 0, 1::2] = torch.cos(position * div_term) | |
| pe = pe.permute( | |
| 1, 0, 2 | |
| ) # [seq_len, batch_size, embedding_dim] -> [batch_size, seq_len, embedding_dim] | |
| self.register_buffer("pe", pe) | |
| def forward(self, x: torch.Tensor) -> torch.Tensor: | |
| """ | |
| Args: | |
| x: Tensor, shape [batch_size, seq_len, embedding_dim] | |
| """ | |
| x = x + self.pe[:, : x.size(1)] | |
| return self.dropout(x) | |
| class TransformerLayer(nn.Module): | |
| def __init__( | |
| self, | |
| input_dim, | |
| num_heads, | |
| dropout: Optional[float] = 0.1, | |
| positional_encoding: bool = True, | |
| ): | |
| super(TransformerLayer, self).__init__() | |
| self.positional_encoding = positional_encoding | |
| self.input_dim = input_dim | |
| self.num_heads = num_heads | |
| self.head_dim = input_dim // num_heads | |
| self.dropout = dropout | |
| # initialize layers | |
| self.self_attention = MultiHeadAttention(input_dim, num_heads, dropout=dropout) | |
| self.feed_forward = PositionWiseFeedForward( | |
| input_dim, input_dim, dropout=dropout | |
| ) | |
| self.add_norm_after_attention = Add_and_Norm(input_dim, dropout=dropout) | |
| self.add_norm_after_ff = Add_and_Norm(input_dim, dropout=dropout) | |
| # calculate positional encoding | |
| if self.positional_encoding: | |
| self.positional_encoding = PositionalEncoding(input_dim) | |
| def forward(self, key, value, query, mask=None): | |
| # key, value, and query shapes: [batch_size, seq_len, input_dim] | |
| # positional encoding | |
| if self.positional_encoding: | |
| key = self.positional_encoding(key) | |
| value = self.positional_encoding(value) | |
| query = self.positional_encoding(query) | |
| # multi-head attention | |
| residual = query | |
| x = self.self_attention(queries=query, keys=key, values=value, mask=mask) | |
| x = self.add_norm_after_attention(x, residual) | |
| # feed forward | |
| residual = x | |
| x = self.feed_forward(x) | |
| x = self.add_norm_after_ff(x, residual) | |
| return x | |