Improving ESC50 accuracy with mel spectrograms and data augmentation

This is an improved version of the article below.

Test accuracy improved from about 8% to about 30%.

ESC50音声分類をシンプルなCNNでやってみた - LeMU_Researchの日記
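
Before the full script, here is a minimal sketch of just the feature-extraction step, using the same parameters as the Dataset below (sr=44100, n_fft=1024, hop_length=128, n_mels=128). The wav path is only a placeholder for an arbitrary ESC-50 clip; for a 5-second clip the feature should come out at roughly 128 x 1723.

import numpy as np
import librosa

# placeholder path: any 5-second ESC-50 clip
data, sr = librosa.load('data/audio/1-100032-A-0.wav', sr=44100)

# power spectrogram: (n_fft/2 + 1, ~len(data)/hop_length) = (513, ~1723)
stft = np.abs(librosa.stft(data, n_fft=1024, hop_length=128)) ** 2

# log scale, then apply a 128-band mel filter bank
log_stft = librosa.power_to_db(stft)
melsp = librosa.feature.melspectrogram(S=log_stft, sr=sr, n_mels=128)

print(melsp.shape)  # (128, ~1723); the Dataset below adds a channel axis -> (1, 128, ~1723)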

 

import os
import pandas as pd
import numpy as np
import random
import librosa
import torch
from torch import optim, nn
from torch.nn.functional import avg_pool2d
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import torch.utils.tensorboard as tensorboard


def add_white_noise(x, rate=0.002):
    return x + rate * np.random.randn(len(x))


def shift_sound(x, rate=2):
    return np.roll(x, int(len(x) // rate))


def stretch_sound(x, rate=1.1):
    input_length = len(x)
    # rate is keyword-only in librosa >= 0.10
    x = librosa.effects.time_stretch(x, rate=rate)
    if len(x) > input_length:
        return x[:input_length]
    else:
        return np.pad(x, (0, max(0, input_length - len(x))), "constant")
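
# Rough intuition for the three augmentations above (illustrative comments only, not executed):
#   add_white_noise(x, rate) -> x plus Gaussian noise scaled by rate
#   shift_sound(x, rate=2)   -> np.roll by len(x) // rate, e.g. [0,1,2,3,4,5,6,7] -> [4,5,6,7,0,1,2,3]
#   stretch_sound(x, rate)   -> time-stretch by rate, then trim or zero-pad back to the original length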


def make_cba(in_channels, out_channels, kernel_size, stride):
    return nn.Sequential(
        nn.Conv2d(in_channels, out_channels, kernel_size, stride),
        nn.BatchNorm2d(out_channels),
        nn.ReLU(inplace=True)
    )


class EscDataset(Dataset):

    def __init__(self, data_dir, train_flag):

        self.data_dir = data_dir
        self.train_flag = train_flag

        self.df = pd.read_csv(os.path.join(self.data_dir, 'meta/esc50.csv'))
        if train_flag:
            self.df = self.df[self.df['fold'] != 5]
        else:
            self.df = self.df[self.df['fold'] == 5]
        self.df = self.df.reset_index()

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):

        # load the wav file
        fname = self.df['filename'][idx]
        data, sr = librosa.load(os.path.join(self.data_dir, 'audio/' + fname), sr=44100)

        # data augmentation (training only), each applied with 50% probability
        if self.train_flag:
            rand1 = random.randint(0, 49)
            rand2 = random.randint(0, 49)
            rand3 = random.randint(0, 49)
            if rand1 < 25:
                data = add_white_noise(data, 0.0002 * rand1)  # noise rate in [0, 0.005)
            if rand2 < 25:
                data = shift_sound(data, rand2 // 5 + 2)  # shift rate in [2, 6]
            if rand3 < 25:
                data = stretch_sound(data, 1.0 + (rand3 - 12.5) / 72.5)  # stretch rate roughly in [0.83, 1.16]

        # short-time Fourier transform
        # n_fft: window size, hop_length: hop between windows
        # stft shape: [frequency bins, time frames], frequency bins = n_fft/2 + 1, time frames ≈ len(data)/hop_length
        stft = np.abs(librosa.stft(data, n_fft=1024, hop_length=128)) ** 2

        # mel spectrogram
        # n_mels: number of mel bands
        # melsp shape: [n_mels, time frames]
        # sr must match the load sample rate so the mel filter bank covers the right frequency range
        log_stft = librosa.power_to_db(stft)
        melsp = librosa.feature.melspectrogram(S=log_stft, sr=sr, n_mels=128)[np.newaxis, ...]

        return melsp.astype('float32'), self.df['target'][idx]


class EscNet(nn.Module):

    def __init__(self):
        super().__init__()
        # three parallel branches, differing only in kernel length (8, 16, 32)
        self.cba1_1 = make_cba(1, 32, (1, 8), (1, 2))
        self.cba1_2 = make_cba(32, 32, (8, 1), (2, 1))
        self.cba1_3 = make_cba(32, 64, (1, 8), (1, 2))
        self.cba1_4 = make_cba(64, 64, (8, 1), (2, 1))
        self.cba2_1 = make_cba(1, 32, (1, 16), (1, 2))
        self.cba2_2 = make_cba(32, 32, (16, 1), (2, 1))
        self.cba2_3 = make_cba(32, 64, (1, 16), (1, 2))
        self.cba2_4 = make_cba(64, 64, (16, 1), (2, 1))
        self.cba3_1 = make_cba(1, 32, (1, 32), (1, 2))
        self.cba3_2 = make_cba(32, 32, (32, 1), (2, 1))
        self.cba3_3 = make_cba(32, 64, (1, 32), (1, 2))
        self.cba3_4 = make_cba(64, 64, (32, 1), (2, 1))
        self.fc = nn.Linear(64 * 3, 50)

    def forward(self, x):
        x1 = self.cba1_1(x)
        x1 = self.cba1_2(x1)
        x1 = self.cba1_3(x1)
        x1 = self.cba1_4(x1)
        # global average pooling collapses each branch to [N, 64, 1, 1]
        x1 = avg_pool2d(x1, kernel_size=x1.size()[2:])
        x2 = self.cba2_1(x)
        x2 = self.cba2_2(x2)
        x2 = self.cba2_3(x2)
        x2 = self.cba2_4(x2)
        x2 = avg_pool2d(x2, kernel_size=x2.size()[2:])
        x3 = self.cba3_1(x)
        x3 = self.cba3_2(x3)
        x3 = self.cba3_3(x3)
        x3 = self.cba3_4(x3)
        x3 = avg_pool2d(x3, kernel_size=x3.size()[2:])
        # concatenate the three pooled branches -> [N, 64, 1, 3], then flatten to [N, 192]
        x = torch.cat([x1, x2, x3], dim=3)
        x = x.view(x.shape[0], -1)
        return self.fc(x)


def train_epoch(data_loader, model, criterion, optimizer, epoch, writer):
    model.train()
    loss_sum = 0
    for data, target in tqdm(data_loader):
        data = data.cuda()
        target = target.cuda()

        output = model(data)
        loss = criterion(output, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loss_sum += loss.item()

    loss_epoch = loss_sum / len(data_loader)
    print('train_loss = ', loss_epoch)
    writer.add_scalar("train loss", loss_epoch, epoch)


def val_epoch(data_loader, model, criterion, epoch, writer):
    model.eval()
    loss_sum = 0
    correct = 0
    data_num = 0
    for data, target in tqdm(data_loader):
        data = data.cuda()
        target = target.cuda()

        output = model(data)
        loss = criterion(output, target)
        loss_sum += loss.item()

        _, preds = torch.max(output, dim=1)
        correct += (preds == target).sum().item()
        data_num += target.size(0)

    loss_epoch = loss_sum / len(data_loader)
    print('val_loss = ', loss_epoch)

    accuracy = float(correct) / data_num
    print('accuracy = ', accuracy)

    writer.add_scalar("val loss", loss_epoch, epoch)
    writer.add_scalar("val accuracy", accuracy, epoch)


def main():

    data_dir = 'data'
    train_dataset = EscDataset(data_dir, train_flag=True)
    val_dataset = EscDataset(data_dir, train_flag=False)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, num_workers=8)
    val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False, num_workers=8)

    model = EscNet().cuda()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    criterion = nn.CrossEntropyLoss()

    writer = tensorboard.SummaryWriter(log_dir="logs")

    # make sure the checkpoint directory exists before saving
    os.makedirs('checkpoints', exist_ok=True)

    for epoch in range(100):
        train_epoch(train_loader, model, criterion, optimizer, epoch, writer)
        val_epoch(val_loader, model, criterion, epoch, writer)

        state = {'state_dict': model.state_dict()}
        filename = 'checkpoints/{0:04d}.pth.tar'.format(epoch)
        torch.save(state, filename)


if __name__ == '__main__':
    main()
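
As a usage sketch (not part of the training script above, and assuming it runs in the same module so that EscNet is defined): load one of the saved checkpoints and classify a single clip. The checkpoint and wav paths are placeholders.

import numpy as np
import librosa
import torch

# placeholder paths: the last checkpoint written by main() and an arbitrary ESC-50 clip
model = EscNet().cuda()
state = torch.load('checkpoints/0099.pth.tar')
model.load_state_dict(state['state_dict'])
model.eval()

# same feature extraction as EscDataset, without augmentation
data, sr = librosa.load('data/audio/1-100032-A-0.wav', sr=44100)
stft = np.abs(librosa.stft(data, n_fft=1024, hop_length=128)) ** 2
melsp = librosa.feature.melspectrogram(S=librosa.power_to_db(stft), sr=sr, n_mels=128)

x = torch.from_numpy(melsp[np.newaxis, np.newaxis, ...].astype('float32')).cuda()
with torch.no_grad():
    pred = model(x).argmax(dim=1).item()
print(pred)  # predicted class index (0-49), matching the 'target' column in esc50.csv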

 

・Reference

ディープラーニングで音声分類 - Qiita