Baby Cry Recognition with PaddleSpeech

2022-08-26 12:47:36
Table of Contents

I. Baby Cry Recognition Based on PaddleSpeech
    1. Project background
    2. Data description
II. PaddleSpeech Environment Setup
III. Data Preprocessing
    1. Unpacking the data
    2. Inspecting the audio files
    3. Normalizing audio length
    4. Custom dataset
IV. Model Training
    1. Selecting a pretrained model
    2. Building the classifier
    3. Fine-tuning
V. Model Prediction
VI. Notes

I. Baby Cry Recognition Based on PaddleSpeech

1. Project background

For an infant, crying is a form of communication, a very limited one, yet analogous to the way adults communicate. It is also a biological alarm, conveying the infant's physiological and psychological needs to the outside world. The information carried in the sound waves of a cry is what allows an infant's physical condition to be assessed and illness to be detected. Effectively recognizing cries, successfully "translating" them into "adult language" so that we can read their meaning, therefore has great practical significance.

2. Data description

1. The training set contains six classes of cries, with noise artificially added:

    A: awake (just woke up)

    B: diaper (needs a diaper change)

    C: hug (wants to be held)

    D: hungry

    E: sleepy

    F: uncomfortable

2. The noise data come from the NOISEX-92 standard database.
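
After unpacking (see Section III below), each class sits in its own subdirectory, e.g. train/awake/*.wav, which is the layout the rest of this article relies on. A quick way to check the per-class sample counts (a minimal sketch, not part of the original pipeline):

      import glob
      import os

      # Count the wav files available for each of the six classes
      for label in ['awake', 'diaper', 'hug', 'hungry', 'sleepy', 'uncomfortable']:
          n = len(glob.glob(os.path.join('train', label, '*.wav')))
          print(f'{label}: {n} files')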

II. PaddleSpeech Environment Setup

      # Environment setup: install paddlespeech and paddleaudio
      !python -m pip install -q -U pip --user
      !pip install paddlespeech paddleaudio -U -q
      
      !pip list|grep paddle
      
      import warnings
      warnings.filterwarnings("ignore")
      import IPython
      import numpy as np
      import matplotlib.pyplot as plt
      import paddle
      %matplotlib inline
      

III. Data Preprocessing

1. Unpacking the data

      # !unzip -qoa data/data41960/dddd.zip
      

2. Inspecting the audio files

      from paddleaudio import load
      data, sr = load(file='train/awake/awake_0.wav', mono=True, dtype='float32')  # mono, float32 samples
      print('wav shape: {}'.format(data.shape))
      print('sample rate: {}'.format(sr))
      # Plot the waveform
      plt.figure()
      plt.plot(data)
      plt.show()
      
      data, sr = load(file='train/diaper/diaper_0.wav', mono=True, dtype='float32')  # mono, float32 samples
      print('wav shape: {}'.format(data.shape))
      print('sample rate: {}'.format(sr))
      # Plot the waveform
      plt.figure()
      plt.plot(data)
      plt.show()
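
Beyond the raw waveform, a spectrogram view often makes class differences easier to eyeball. A small sketch using matplotlib's built-in specgram, applied to the clip loaded above (not part of the original pipeline):

      # Time-frequency view of the last loaded clip
      plt.figure()
      plt.specgram(data, Fs=sr)
      plt.xlabel('time (s)')
      plt.ylabel('frequency (Hz)')
      plt.show()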
      
      !paddlespeech cls --input train/awake/awake_0.wav
      
      !paddlespeech help
      

3. Normalizing audio length

      # Get a wav file's duration in seconds
      import contextlib
      import wave
      def get_sound_len(file_path):
          with contextlib.closing(wave.open(file_path, 'r')) as f:
              frames = f.getnframes()
              rate = f.getframerate()
              wav_length = frames / float(rate)
          return wav_length
      
      # Enumerate the wav files
      import glob
      sound_files = glob.glob('train/*/*.wav')
      print(sound_files[0])
      print(len(sound_files))
      
      # Longest and shortest clips in the training set
      sounds_len = []
      for sound in sound_files:
          sounds_len.append(get_sound_len(sound))
      print("max duration:", max(sounds_len), "s")
      print("min duration:", min(sounds_len), "s")
      
      !cp train/hungry/hungry_0.wav ~/
      
      !pip install pydub -q
      
      # Inspect audio metadata
      import soundfile as sf
      data, samplerate = sf.read('hungry_0.wav')
      channels = len(data.shape)
      length_s = len(data) / float(samplerate)
      format_rate = 16000  # target sample rate for the model
      print(f"channels: {channels}")
      print(f"length_s: {length_s}")
      print(f"samplerate: {samplerate}")
      
      # Pad to a uniform 34 s: repeat the clip until it exceeds 34 s, then trim
      from pydub import AudioSegment
      audio = AudioSegment.from_wav('hungry_0.wav')
      print(str(audio.duration_seconds))
      i = 1
      padded = audio
      while padded.duration_seconds * 1000 < 34000:
          i = i + 1
          padded = audio * i
      padded[0:34000].set_frame_rate(16000).export('padded-file.wav', format='wav')
      
      # Verify the padded file
      import soundfile as sf
      data, samplerate = sf.read('padded-file.wav')
      channels = len(data.shape)
      length_s = len(data) / float(samplerate)
      print(f"channels: {channels}")
      print(f"length_s: {length_s}")
      print(f"samplerate: {samplerate}")
      
      # Helper: if a clip is shorter than 34 s, repeat it until it is longer,
      # then trim to exactly 34 s at 16 kHz (the file is overwritten in place)
      from pydub import AudioSegment
      def convert_sound_len(filename):
          audio = AudioSegment.from_wav(filename)
          i = 1
          padded = audio * i
          while padded.duration_seconds * 1000 < 34000:
              i = i + 1
              padded = audio * i
          padded[0:34000].set_frame_rate(16000).export(filename, format='wav')
      
      # Normalize every clip in the training set to the fixed length
      for sound in sound_files:
          convert_sound_len(sound)
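
After the conversion, every file should report exactly 34 s. A quick re-check with the get_sound_len helper defined earlier:

      # Verify that all clips now share the same duration
      sounds_len = [get_sound_len(sound) for sound in sound_files]
      print("max duration:", max(sounds_len), "s")
      print("min duration:", min(sounds_len), "s")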
      

4. Custom dataset

      import os
      from paddlespeech.audio.datasets.dataset import AudioClassificationDataset
      class CustomDataset(AudioClassificationDataset):
          # List all the class labels
          label_list = [
              'awake',
              'diaper',
              'hug',
              'hungry',
              'sleepy',
              'uncomfortable'
          ]
          train_data_dir = './train/'
          def __init__(self, **kwargs):
              files, labels = self._get_data()
              super(CustomDataset, self).__init__(
                  files=files, labels=labels, feat_type='raw', **kwargs)
          # Return audio file paths and their label indices
          def _get_data(self):
              '''
              This method offers the wave file paths and their labels.
              '''
              files = []
              labels = []
              for i in range(len(self.label_list)):
                  single_class_path = os.path.join(self.train_data_dir, self.label_list[i])
                  for sound in os.listdir(single_class_path):
                      if 'wav' in sound:
                          sound = os.path.join(single_class_path, sound)
                          files.append(sound)
                          labels.append(i)
              return files, labels
      
      # Build the dataloader
      import paddle
      from paddlespeech.audio.features import LogMelSpectrogram
      # The feature config must be aligned with the pretrained model
      sample_rate = 16000
      feat_conf = {
        'sr': sample_rate,
        'n_fft': 1024,
        'hop_length': 320,
        'window': 'hann',
        'win_length': 1024,
        'f_min': 50.0,
        'f_max': 14000.0,
        'n_mels': 64,
      }
      train_ds = CustomDataset(sample_rate=sample_rate)
      feature_extractor = LogMelSpectrogram(**feat_conf)
      train_sampler = paddle.io.DistributedBatchSampler(
          train_ds, batch_size=64, shuffle=True, drop_last=False)
      train_loader = paddle.io.DataLoader(
          train_ds,
          batch_sampler=train_sampler,
          return_list=True,
          use_buffer_reader=True)
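
A quick sanity check on the loader before training: with 34 s clips at 16 kHz, each waveform should contain 544,000 samples (a sketch, not part of the original article):

      # Fetch one batch and inspect its shapes
      for waveforms, labels in train_loader:
          print('waveforms:', waveforms.shape)  # expected [64, 544000]
          print('labels:', labels.shape)        # expected [64]
          break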
      

IV. Model Training

1. Selecting a pretrained model

cnn14 is selected as the backbone model, used to extract embeddings from the audio:

      from paddlespeech.cls.models import cnn14
      backbone = cnn14(pretrained=True, extract_embedding=True)
      

2. Building the classifier

SoundClassifier takes cnn14 as the backbone model and builds the downstream classification network on top of it:

      import paddle.nn as nn
      class SoundClassifier(nn.Layer):
          def __init__(self, backbone, num_class, dropout=0.1):
              super().__init__()
              self.backbone = backbone
              self.dropout = nn.Dropout(dropout)
              self.fc = nn.Linear(self.backbone.emb_size, num_class)
          def forward(self, x):
              x = x.unsqueeze(1)    # add channel dim: [B, T, N] -> [B, 1, T, N]
              x = self.backbone(x)  # cnn14 embedding, shape [B, emb_size]
              x = self.dropout(x)
              logits = self.fc(x)
              return logits
      model = SoundClassifier(backbone, num_class=len(train_ds.label_list))
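
Out of curiosity, the size of the combined model can be checked in a couple of lines (a sketch; np was imported in Section II):

      # Total number of parameters in backbone + classification head
      num_params = sum(int(np.prod(p.shape)) for p in model.parameters())
      print(f'total parameters: {num_params:,}')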
      

3. Fine-tuning

      # Define the optimizer and loss
      optimizer = paddle.optimizer.Adam(learning_rate=1e-4, parameters=model.parameters())
      criterion = paddle.nn.loss.CrossEntropyLoss()
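
The training loop below already steps optimizer._learning_rate whenever it is an lr.LRScheduler, so swapping the fixed rate for a decaying schedule is a two-line change. An optional sketch (not used for the training logs shown later):

      # Optional: exponential learning-rate decay instead of a fixed 1e-4
      scheduler = paddle.optimizer.lr.ExponentialDecay(learning_rate=1e-4, gamma=0.95)
      optimizer = paddle.optimizer.Adam(learning_rate=scheduler, parameters=model.parameters())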
      
      from paddleaudio.utils import logger
      epochs = 20
      steps_per_epoch = len(train_loader)
      log_freq = 10
      eval_freq = 10
      for epoch in range(1, epochs + 1):
          model.train()
          avg_loss = 0
          num_corrects = 0
          num_samples = 0
          for batch_idx, batch in enumerate(train_loader):
              waveforms, labels = batch
              feats = feature_extractor(waveforms)
              feats = paddle.transpose(feats, [0, 2, 1])  # [B, N, T] -> [B, T, N]
              logits = model(feats)
              loss = criterion(logits, labels)
              loss.backward()
              optimizer.step()
              if isinstance(optimizer._learning_rate,
                            paddle.optimizer.lr.LRScheduler):
                  optimizer._learning_rate.step()
              optimizer.clear_grad()
              # Calculate loss
              avg_loss += loss.numpy()[0]
              # Calculate metrics
              preds = paddle.argmax(logits, axis=1)
              num_corrects += (preds == labels).numpy().sum()
              num_samples += feats.shape[0]
              if (batch_idx + 1) % log_freq == 0:
                  lr = optimizer.get_lr()
                  avg_loss /= log_freq
                  avg_acc = num_corrects / num_samples
                  print_msg = 'Epoch={}/{}, Step={}/{}'.format(
                      epoch, epochs, batch_idx + 1, steps_per_epoch)
                  print_msg += ' loss={:.4f}'.format(avg_loss)
                  print_msg += ' acc={:.4f}'.format(avg_acc)
                  print_msg += ' lr={:.6f}'.format(lr)
                  logger.train(print_msg)
                  avg_loss = 0
                  num_corrects = 0
                  num_samples = 0
      

      [2022-08-24 02:20:49,381] [   TRAIN] - Epoch=17/20, Step=10/15 loss=1.3319 acc=0.4875 lr=0.000100
      [2022-08-24 02:21:08,107] [   TRAIN] - Epoch=18/20, Step=10/15 loss=1.3222 acc=0.4719 lr=0.000100
      [2022-08-24 02:21:26,884] [   TRAIN] - Epoch=19/20, Step=10/15 loss=1.2539 acc=0.5125 lr=0.000100
      [2022-08-24 02:21:45,579] [   TRAIN] - Epoch=20/20, Step=10/15 loss=1.2021 acc=0.5281 lr=0.000100
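
Before moving on to inference, it is worth persisting the fine-tuned weights so prediction does not depend on the live session. A minimal sketch (the file name is arbitrary):

      # Save the fine-tuned parameters
      paddle.save(model.state_dict(), 'sound_classifier.pdparams')
      # To restore later:
      # model.set_state_dict(paddle.load('sound_classifier.pdparams'))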

V. Model Prediction

      top_k = 3
      wav_file = 'test/test_0.wav'
      # Feature config must match the one used during training
      n_fft = 1024
      win_length = 1024
      hop_length = 320
      f_min = 50.0
      f_max = 14000.0
      waveform, sr = load(wav_file, sr=sample_rate)
      feature_extractor = LogMelSpectrogram(
          sr=sr, 
          n_fft=n_fft, 
          hop_length=hop_length, 
          win_length=win_length, 
          window='hann', 
          f_min=f_min, 
          f_max=f_max, 
          n_mels=64)
      model.eval()
      feats = feature_extractor(paddle.to_tensor(waveform).unsqueeze(0))
      feats = paddle.transpose(feats, [0, 2, 1])  # [B, N, T] -> [B, T, N]
      logits = model(feats)
      probs = nn.functional.softmax(logits, axis=1).numpy()
      sorted_indices = probs[0].argsort()
      msg = f'[{wav_file}]\n'
      for idx in sorted_indices[-1:-top_k - 1:-1]:
          msg += f'{train_ds.label_list[idx]}: {probs[0][idx]:.5f}\n'
      print(msg)
      

       [test/test_0.wav]
      diaper: 0.50155
      sleepy: 0.41397
      hug: 0.05912
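
The same pipeline extends naturally to scoring the whole test folder. A sketch, assuming test/*.wav files that have been length-normalized like the training data:

      # Predict a label for every clip under test/
      import glob
      model.eval()
      for wav_file in sorted(glob.glob('test/*.wav')):
          waveform, _ = load(wav_file, sr=sample_rate)
          feats = feature_extractor(paddle.to_tensor(waveform).unsqueeze(0))
          feats = paddle.transpose(feats, [0, 2, 1])  # [B, N, T] -> [B, T, N]
          pred = paddle.argmax(model(feats), axis=1).numpy()[0]
          print(os.path.basename(wav_file), '->', train_ds.label_list[pred])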

VI. Notes

1. For a custom dataset, follow the format described in the PaddleSpeech documentation.
2. Normalize the audio dimensions (e.g., clip length and sample rate) before training.

That concludes this walkthrough of baby cry recognition with PaddleSpeech.