fastpitch和fastspeech2_模型检验的方法

fastpitch和fastspeech2_模型检验的方法本文件主要是定义Variance Adaptor,其中主要包括Duration Predictor、Length Regulator、Pitch

FastSpeech2模型搭建主要涉及的两个文件为fastspeech.py和model路径下的modules.py文件。

1.model/modules.py

本文件主要是定义Variance Adaptor,其中主要包括Duration Predictor、Length Regulator、Pitch Predictor和Energy Predictor,详细代码和注释解析如下所示

 import os
 import json
 import copy
 import math
 from collections import OrderedDict
 
 import torch
 import torch.nn as nn
 import numpy as np
 import torch.nn.functional as F
 
 from utils.tools import get_mask_from_lengths, pad
 
 device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 
 # 完整Variance Adaptor
 class VarianceAdaptor(nn.Module):
     """Variance Adaptor"""
 
     def __init__(self, preprocess_config, model_config):
         super(VarianceAdaptor, self).__init__()
         self.duration_predictor = VariancePredictor(model_config)
         self.length_regulator = LengthRegulator()
         self.pitch_predictor = VariancePredictor(model_config)
         self.energy_predictor = VariancePredictor(model_config)
 
         # 设置pitch和energy的级别
         self.pitch_feature_level = preprocess_config["preprocessing"]["pitch"]["feature"]
         self.energy_feature_level = preprocess_config["preprocessing"]["energy"]["feature"]
         assert self.pitch_feature_level in ["phoneme_level", "frame_level"]
         assert self.energy_feature_level in ["phoneme_level", "frame_level"]
 
         # 设置pitch何energy的量化方式
         pitch_quantization = model_config["variance_embedding"]["pitch_quantization"]
         energy_quantization = model_config["variance_embedding"]["energy_quantization"]
         n_bins = model_config["variance_embedding"]["n_bins"]
         assert pitch_quantization in ["linear", "log"]
         assert energy_quantization in ["linear", "log"]
 
         # 加载pitch和energy正则化所需参数
         with open(
             os.path.join(preprocess_config["path"]["preprocessed_path"], "stats.json")
         ) as f:
             stats = json.load(f)
             pitch_min, pitch_max = stats["pitch"][:2]
             energy_min, energy_max = stats["energy"][:2]
 
         # if量化参数为log,表示在处理过程中没有经过量化,正常情况下量化方式为linear
         if pitch_quantization == "log":
             # torch.exp() 表示e^{input}
             # torch.linsapce(x,y,num)表示返回x和y之间的等间隔的区间,共num个
             self.pitch_bins = nn.Parameter(
                 torch.exp(
                     torch.linspace(np.log(pitch_min), np.log(pitch_max), n_bins - 1) # 255
                 ),
                 requires_grad=False,
             )
         else:
             self.pitch_bins = nn.Parameter(
                 torch.linspace(pitch_min, pitch_max, n_bins - 1),
                 requires_grad=False,
             )
         if energy_quantization == "log":
             self.energy_bins = nn.Parameter(
                 torch.exp(
                     torch.linspace(np.log(energy_min), np.log(energy_max), n_bins - 1)
                 ),
                 requires_grad=False,
             )
         else:
             self.energy_bins = nn.Parameter(
                 torch.linspace(energy_min, energy_max, n_bins - 1),
                 requires_grad=False,
             )
         # pitch和energy的嵌入层
         self.pitch_embedding = nn.Embedding(
             n_bins, model_config["transformer"]["encoder_hidden"]
         )
         self.energy_embedding = nn.Embedding(
             n_bins, model_config["transformer"]["encoder_hidden"]
         )
 
     # 计算pitch嵌入层
     def get_pitch_embedding(self, x, target, mask, control):
         prediction = self.pitch_predictor(x, mask)  # pitch预测器预测的数值
         if target is not None:  # target存在,训练过程,使用target计算embedding
             embedding = self.pitch_embedding(torch.bucketize(target, self.pitch_bins))
         else:  # target不存在,预测过程,使用prediction计算embedding
             prediction = prediction * control   # control是用于控制的系数
             embedding = self.pitch_embedding(torch.bucketize(prediction, self.pitch_bins))
         return prediction, embedding  # prediction用于训练过程计算损失,embedding与x相加进行后续计算
 
     # 计算energy嵌入层
     def get_energy_embedding(self, x, target, mask, control):
         prediction = self.energy_predictor(x, mask)
         if target is not None:
             embedding = self.energy_embedding(torch.bucketize(target, self.energy_bins))
         else:
             prediction = prediction * control
             embedding = self.energy_embedding(torch.bucketize(prediction, self.energy_bins))
         return prediction, embedding
 
     def forward(
         self,
         x,
         src_mask,
         mel_mask=None,
         max_len=None,
         pitch_target=None,
         energy_target=None,
         duration_target=None,
         p_control=1.0,
         e_control=1.0,
         d_control=1.0,
     ):
 
         log_duration_prediction = self.duration_predictor(x, src_mask)  # 对音素序列预测的持续时间
         if self.pitch_feature_level == "phoneme_level":
             pitch_prediction, pitch_embedding = self.get_pitch_embedding(
                 x, pitch_target, src_mask, p_control
             )
             x = x + pitch_embedding  # 累加pitch嵌入层
         if self.energy_feature_level == "phoneme_level":
             energy_prediction, energy_embedding = self.get_energy_embedding(
                 x, energy_target, src_mask, p_control
             )
             x = x + energy_embedding  # 累加energy嵌入层
 
         if duration_target is not None:  # duration_target,训练过程,使用duration_target计算
             x, mel_len = self.length_regulator(x, duration_target, max_len)  # 使用duration_target调整x
             duration_rounded = duration_target
         else:  # 预测过程
             # 基于log_duration_prediction构建duration_rounded,用于调整x
             # torch.clamp() 将输入input张量每个元素的夹紧到区间 [min,max][min,max],并返回结果到一个新张量
             # torch.round() 四舍五入
             duration_rounded = torch.clamp(
                 (torch.round(torch.exp(log_duration_prediction) - 1) * d_control),
                 min=0,
             )
             x, mel_len = self.length_regulator(x, duration_rounded, max_len)
             mel_mask = get_mask_from_lengths(mel_len)
 
         if self.pitch_feature_level == "frame_level":
             pitch_prediction, pitch_embedding = self.get_pitch_embedding(
                 x, pitch_target, mel_mask, p_control
             )
             x = x + pitch_embedding
         if self.energy_feature_level == "frame_level":
             energy_prediction, energy_embedding = self.get_energy_embedding(
                 x, energy_target, mel_mask, p_control
             )
             x = x + energy_embedding
 
         return (  # 此处三个prediction用于后续计算损失
             x,
             pitch_prediction,
             energy_prediction,
             log_duration_prediction,
             duration_rounded,
             mel_len,
             mel_mask,
         )
 
 # 长度调节器
 class LengthRegulator(nn.Module):
     """Length Regulator"""
 
     def __init__(self):
         super(LengthRegulator, self).__init__()
 
     # 对输入的音素序列x进行长度调正
     def LR(self, x, duration, max_len):
         """
         基于音素持续时间将音素序列长度与mel谱图长度对齐
         @param x: 经过FFT块转换后的音素序列,[batch_size, max_sequence_len, encoder_dim]
         @param duration: 音素持续时间矩阵,[batch_size, max_sequence_len]
         @param max_len: 音素谱图序列中最大长度
         @return: 长度经过调整后的音素序列,[batch_size, max_len, encoder_dim]
         """
         output = list()
         mel_len = list()
         for batch, expand_target in zip(x, duration):
             expanded = self.expand(batch, expand_target)  # 获得一个长度完整调整之后音素序列
             output.append(expanded)
             mel_len.append(expanded.shape[0])  # 记录mel谱图长度大小,方便后续生成mask
 
         # 如果传入max_len就按其进行pad,如果没有就以output中最长序列大小进行pad
         if max_len is not None:
             output = pad(output, max_len)
         else:
             output = pad(output)
 
         return output, torch.LongTensor(mel_len).to(device)
 
     def expand(self, batch, predicted):
         """
         将输入的一个音素序列的长度按其对应的持续时间调整
         @param batch:一个音频对应文本的音素序列,[max_sequence_len, encoder_dim]
         @param predicted:音素序列中每个音素对应的持续序列,长度为max_sequence_len
         @return:长度调整后的音素序列,长度与mel谱图长度一致
         """
 
         out = list()
 
         for i, vec in enumerate(batch):
             expand_size = predicted[i].item()  # i对应的音素对应持续时间,即需要重复的次数
             out.append(vec.expand(max(int(expand_size), 0), -1))  # 将i对应的音素的表征向量vec重复expand_size次
         out = torch.cat(out, 0)  # 将整个音素序列cat起来
 
         return out
 
     def forward(self, x, duration, max_len):
         output, mel_len = self.LR(x, duration, max_len)
         return output, mel_len
 
 
 class VariancePredictor(nn.Module):
     """Duration, Pitch and Energy Predictor"""
 
     def __init__(self, model_config):
         super(VariancePredictor, self).__init__()
 
         self.input_size = model_config["transformer"]["encoder_hidden"]  # 输入尺寸 256
         self.filter_size = model_config["variance_predictor"]["filter_size"]  # 输出尺寸 256
         self.kernel = model_config["variance_predictor"]["kernel_size"]  # 卷积核大小 3
         self.conv_output_size = model_config["variance_predictor"]["filter_size"]
         self.dropout = model_config["variance_predictor"]["dropout"]
 
         # 定义一个包含激活函数和正则项的卷积序列,即[Con1D+Relu+LN+Dropout]+[Con1D+Relu+LN+Dropout]
         self.conv_layer = nn.Sequential(
             OrderedDict(
                 [
                     (
                         "conv1d_1",
                         Conv(
                             self.input_size,
                             self.filter_size,
                             kernel_size=self.kernel,
                             padding=(self.kernel - 1) // 2,
                         ),
                     ),
                     ("relu_1", nn.ReLU()),
                     ("layer_norm_1", nn.LayerNorm(self.filter_size)),
                     ("dropout_1", nn.Dropout(self.dropout)),
                     (
                         "conv1d_2",
                         Conv(
                             self.filter_size,
                             self.filter_size,
                             kernel_size=self.kernel,
                             padding=1,
                         ),
                     ),
                     ("relu_2", nn.ReLU()),
                     ("layer_norm_2", nn.LayerNorm(self.filter_size)),
                     ("dropout_2", nn.Dropout(self.dropout)),
                 ]
             )
         )
 
         self.linear_layer = nn.Linear(self.conv_output_size, 1)
 
     def forward(self, encoder_output, mask):
         out = self.conv_layer(encoder_output)  # [Con1D+Relu+LN+Dropout]+[Con1D+Relu+LN+Dropout]
         out = self.linear_layer(out)  # 最后输出前的线性层
         out = out.squeeze(-1)  # 因为线性层返回的是1,即输出的尺寸的最后一维是1,将其压缩掉
 
         if mask is not None:  # 将mask对应地方设置为0
             out = out.masked_fill(mask, 0.0)
 
         return out
 
 # 自定义的一维卷积网络
 class Conv(nn.Module):
     """
     Convolution Module
     """
 
     def __init__(
         self,
         in_channels,
         out_channels,
         kernel_size=1,
         stride=1,
         padding=0,
         dilation=1,
         bias=True,
         w_init="linear",
     ):
         """
         :param in_channels: dimension of input
         :param out_channels: dimension of output
         :param kernel_size: size of kernel
         :param stride: size of stride
         :param padding: size of padding
         :param dilation: dilation rate
         :param bias: boolean. if True, bias is included.
         :param w_init: str. weight inits with xavier initialization.
         """
         super(Conv, self).__init__()
 
         self.conv = nn.Conv1d(
             in_channels,
             out_channels,
             kernel_size=kernel_size,
             stride=stride,
             padding=padding,
             dilation=dilation,
             bias=bias,
         )
 
     def forward(self, x):
         x = x.contiguous().transpose(1, 2)
         x = self.conv(x)
         x = x.contiguous().transpose(1, 2)
 
         return x
 

代码100分

2.model/fastspeech2.py

本文件将Encoder, Decoder, PostNet和Variance Adaptor模块集成在一起,完成FastSpeech2模型搭建

代码100分 import os
 import json
 
 import torch
 import torch.nn as nn
 import torch.nn.functional as F
 
 from transformer import Encoder, Decoder, PostNet
 from .modules import VarianceAdaptor
 from utils.tools import get_mask_from_lengths
 
 
 class FastSpeech2(nn.Module):
     """ FastSpeech2 """
 
     def __init__(self, preprocess_config, model_config):
         super(FastSpeech2, self).__init__()
         self.model_config = model_config
 
         self.encoder = Encoder(model_config)  # variance adaptor之前encoder
         self.variance_adaptor = VarianceAdaptor(preprocess_config, model_config)
         self.decoder = Decoder(model_config)  # variance adaptor之后decoder
         self.mel_linear = nn.Linear(
             model_config["transformer"]["decoder_hidden"],  # 256
             preprocess_config["preprocessing"]["mel"]["n_mel_channels"],  # 80
         )
         self.postnet = PostNet()
 
         self.speaker_emb = None
         # 如果为多说话人
         if model_config["multi_speaker"]: # True
             # 加载speaker文件
             with open(os.path.join(preprocess_config["path"]["preprocessed_path"], "speakers.json"),"r",) \
                     as f:
                 n_speaker = len(json.load(f))
             # 构建speaker embedding
             self.speaker_emb = nn.Embedding(
                 n_speaker,
                 model_config["transformer"]["encoder_hidden"],  # 256
             )
 
     def forward(
         self,
         speakers,
         texts,
         src_lens,
         max_src_len,
         mels=None,
         mel_lens=None,
         max_mel_len=None,
         p_targets=None,
         e_targets=None,
         d_targets=None,
         p_control=1.0,  # 控制系数
         e_control=1.0,
         d_control=1.0,
     ):
         src_masks = get_mask_from_lengths(src_lens, max_src_len)  # 原始文本序列mask
         mel_masks = (
             get_mask_from_lengths(mel_lens, max_mel_len)
             if mel_lens is not None
             else None
         ) # mel谱图序列mask
 
         output = self.encoder(texts, src_masks) # 编码
 
         if self.speaker_emb is not None:  # 如果存在speaker嵌入层,将其和output相加
             output = output + self.speaker_emb(speakers).unsqueeze(1).expand(
                 -1, max_src_len, -1
             )
 
         # 通过Variance Adaptor模块计算
         (
             output,
             p_predictions,
             e_predictions,
             log_d_predictions,
             d_rounded,
             mel_lens,
             mel_masks,
         ) = self.variance_adaptor(
             output,
             src_masks,
             mel_masks,
             max_mel_len,
             p_targets,
             e_targets,
             d_targets,
             p_control,
             e_control,
             d_control,
         )
 
         output, mel_masks = self.decoder(output, mel_masks)  # 解码
         output = self.mel_linear(output)  # 线性转换
 
         postnet_output = self.postnet(output) + output  # 后处理
 
         return (
             output,
             postnet_output,
             p_predictions,
             e_predictions,
             log_d_predictions,
             d_rounded,
             src_masks,
             mel_masks,
             src_lens,
             mel_lens,
         )

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
转载请注明出处: https://daima100.com/4184.html

(0)
上一篇 2023-11-12
下一篇 2023-04-01

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注