numpyでスペクトル包絡の算出(ケプストラム分析)

・引用/データ

メル周波数ケプストラム(MFCC) - Miyazawa’s Pukiwiki

 

・スペクトル

import numpy as np
from scipy.io import wavfile
import matplotlib.pyplot as plt

# waveファイル入力
sample_rate, wave_data_orig = wavfile.read('a.wav')

# 切り抜き
cuttime = 0.04 # 切り出し長さ0.04[s]
N = int(sample_rate * cuttime) # サンプル数1764
center = int(wave_data_orig.shape[0] / 2)
wave_data = wave_data_orig[center - int(N / 2): center + int(N / 2)]

# 離散フーリエ変換
fourier = np.fft.fft(wave_data)
amp = np.abs(fourier)

# 表示
freq = np.linspace(0, sample_rate, N)
plt.plot(freq[:int(N/10)], amp[:int(N/10)])
plt.show()

f:id:LeMU_Research:20201102220955p:plain

・対数スペクトル

解析対象の音声が、音源と声道フィルタのたたみ込みで生成されているとする。

時間領域のたたみ込みは、周波数領域では積になる。

対数をとると積は和になり、音源特性と声道特性の重ね合わせを可視化できる。

# スペクトルのlog
log_amp = np.log10(amp)
plt.plot(freq[:int(N/10)], log_amp[:int(N/10)])
plt.show()

f:id:LeMU_Research:20201102221753p:plain

・ケプストラム/スペクトル包絡

対数スペクトルをフーリエ変換したものがケプストラム。

 ※スペクトルのフーリエ変換=逆フーリエ変換

ケプストラムの高周波成分をカットして周波数成分に戻し、対数を外すと、声道特性(スペクトル包絡)が抽出できる。

# ケプストラム/スペクトル包絡
cps = np.fft.ifft(log_amp).real
cps[120:cps.shape[0] - 119] = 0
envelop = abs(10 ** np.fft.fft(cps))
plt.plot(freq[:int(N/10)], envelop[:int(N/10)])
plt.show()

f:id:LeMU_Research:20201102223324p:plain

 

離散フーリエ変換への最短経路

サンプリング数Nの離散時間信号をフーリエ変換したい。

 

f:id:LeMU_Research:20201102003214p:plain

引用:6. 離散フーリエ変換 (やる夫で学ぶディジタル信号処理)

 

入出力が離散信号でないといけないので、離散フーリエ変換を利用する。

本来、離散フーリエ変換は周期時間信号にしか適用できないが、周期Nとして適用。

周波数領域(角速度)の間隔は2πf=2π/N。周期はNに依らず2π。

 

周波数f[Hz]のsin波をサンプリングして入力したい。

サンプリングレートをrとすると、y=sin(w*t)=sin(2πf*t)=sin(2πf*r*n)

 

コードは以下参照。

【NumPy】高速フーリエ変換 (FFT)で振幅スペクトルを計算 | 西住工房

 

最後に、周波数領域を角速度ではなく、周波数[Hz]で表したい。

角速度の範囲は[0, 2π]、角速度2πということは1サンプル(r)で1周期。

f=1/T=1/r。r=0.01[秒]だとすると、周波数の最大値は1/0.01=100[Hz]。

Juliusの認識結果をpythonで受信する

・Juliusサーバの起動

.\bin\windows\julius.exe -C main.jconf -C am-dnn.jconf -dnnconf julius.dnnconf -module

 

Pythonクライアントコード

import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('127.0.0.1', 10500))

while True:
# 音声認識結果の終わりまでデータ(XML形式)を受信して溜めておく
data = ""
while data.find("</RECOGOUT>\n.") == -1:
soc = s.recv(1024)
data = data + soc.decode('utf-8')

# データから単語部分のみを抜き出して連結
recog_text = ""
for line in data.split('\n'):
idx = line.find('WORD="')
if idx != -1:
start_idx = idx + 6
end_idx = line.find('"', start_idx)
recog_text += line[start_idx:end_idx]

print("認識結果: " + recog_text)

 

参考ページ

techacademy.jp

pythonでsocket通信

・サーバ

import socket

# ソケット作成
# AF_INET : IPv4
# SOCK_STREAM : TCP
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

# 接続待ちするIPアドレスとポートを指定
s.bind(('127.0.0.1', 50007))

# 接続待ち数1
s.listen(1)

# 接続待ち
connection, address = s.accept()

with connection:
while True:
# データ受信
data = connection.recv(1024)
if not data:
break

# 受信データを表示
print('data : {}, address: {}'.format(data.decode('utf-8'), address))

# 返信
connection.sendall(b'Received: ' + data)

 

・クライアント

import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

# サーバのIPアドレスとポートを指定
s.connect(('127.0.0.1', 50007))

# サーバにデータを送る
s.sendall(b'hello')

# サーバからのデータ待ち
data = s.recv(1024)

# 受信データを表示
print(data.decode('utf-8'))

 

参考ページ

pykakasiでcsvファイルの中身をローマ字に変換

from pykakasi import kakasi
import pandas as pd
import re

kakasi = kakasi()
kakasi.setMode("H", "a") # ひらがな
kakasi.setMode("K", "a") # カタカナ
kakasi.setMode("J", "a") # 漢字
kakasi.setMode("r", "Hepburn") # ヘボン式ローマ字
conv = kakasi.getConverter()

df = pd.read_csv('Event01_karina.csv', header=None).dropna()
for idx in range(len(df)):
result = conv.do(df[1][idx])
result = re.sub('\W', '', result) # 記号を消す(正規表現)
print(result)

ESC50音声分類をシンプルなCNNでやってみた

※改良記事書きました。

メルスペクトラムとデータ水増しでESC50の精度を上げる - LeMU_Researchの日記

 

・npy化

import os
import pandas as pd
import numpy as np
from scipy.io import wavfile
import pyworld as pw
import cv2

data_dir = 'data'
df = pd.read_csv(os.path.join(data_dir, 'meta/esc50.csv'))

for idx in range(len(df)):
fname = df['filename'][idx]
sample_rate, signal_int = wavfile.read(os.path.join(data_dir, 'audio/' + fname))

# float化、正規化(スペクトルのオーバーフロー防止)
signal_float = signal_int.astype(np.float)
signal_float /= max(signal_float)

# スペクトル包絡
f0, t = pw.dio(signal_float, sample_rate) # 基本周波数の抽出
f0 = pw.stonemask(signal_float, f0, t, sample_rate) # refinement
sp = pw.cheaptrick(signal_float, f0, t, sample_rate) # スペクトル包絡の抽出

# リサイズ、正規化、3次元化
sp = cv2.resize(sp, (512, 512), interpolation=cv2.INTER_LINEAR)
sp /= np.max(sp)
sp = sp[np.newaxis, ...].astype('float32')

np.save('data/npy/' + fname[:-4], sp)

 

・学習

import os
import pandas as pd
import numpy as np
import torch
from torch import optim, nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm


class Down(nn.Module):

def __init__(self, in_channels, out_channels):
super().__init__()
self.maxpool_conv = nn.Sequential(
nn.MaxPool2d(2),
nn.Conv2d(in_channels, out_channels, kernel_size=3, dilation=1, padding=1),
nn.BatchNorm2d(out_channels),
nn.ReLU(inplace=True)
)

def forward(self, x):
return self.maxpool_conv(x)


class EscNet(nn.Module):

def __init__(self):
super().__init__()
self.features = nn.Sequential(
nn.Conv2d(1, 32, kernel_size=3, dilation=1, padding=1),
nn.BatchNorm2d(32),
nn.ReLU(inplace=True),
Down(32, 64),
Down(64, 128),
Down(128, 256),
Down(256, 512),
Down(512, 1024)
)
self.fc1 = nn.Linear(1024, 256)
self.fc2 = nn.Linear(256, 50)

def forward(self, x):
x = self.features(x)
x = F.avg_pool2d(x, kernel_size=x.size()[2:])
x = x.view(x.shape[0], -1)
x = F.relu(F.dropout2d(self.fc1(x)))
return self.fc2(x)


class EscDataset(Dataset):

def __init__(self, data_dir, train_flag):

self.data_dir = data_dir
self.df = pd.read_csv(os.path.join(self.data_dir, 'meta/esc50.csv'))
if train_flag:
self.df = self.df[self.df['fold'] != 5]
else:
self.df = self.df[self.df['fold'] == 5]
self.df = self.df.reset_index()

def __len__(self):
return len(self.df)

def __getitem__(self, idx):
fname = self.df['filename'][idx]
sp = np.load(os.path.join(self.data_dir, 'npy/' + fname[:-4] + '.npy'))
return sp, self.df['target'][idx]


def train_epoch(data_loader, model, criterion, optimizer):
model.train()
loss_sum = 0
for data, target in tqdm(data_loader):
data = data.cuda()
target = target.cuda()

output = model(data)
loss = criterion(output, target)

optimizer.zero_grad()
loss.backward()
optimizer.step()

loss_sum += loss.item()

loss_epoch = loss_sum / len(data_loader)
print('train_loss = ', loss_epoch)


def val_epoch(data_loader, model, criterion):
model.eval()
loss_sum = 0
correct = 0
data_num = 0
for data, target in tqdm(data_loader):
data = data.cuda()
target = target.cuda()

output = model(data)
loss = criterion(output, target)
loss_sum += loss.item()

_, preds = torch.max(output, axis=1)
correct += (preds == target).sum().item()
data_num += target.size(0)

loss_epoch = loss_sum / len(data_loader)
print('val_loss = ', loss_epoch)

accuracy = float(correct) / data_num
print('accuracy = ', accuracy)


def main():
data_dir = 'data'
train_dataset = EscDataset(data_dir, train_flag=True)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True, num_workers=0)
val_dataset = EscDataset(data_dir, train_flag=False)
val_loader = DataLoader(dataset=val_dataset, batch_size=16, shuffle=False, num_workers=0)

model = EscNet().cuda()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
criterion = nn.CrossEntropyLoss()

for epoch in range(10):
train_epoch(train_loader, model, criterion, optimizer)
val_epoch(val_loader, model, criterion)

state = {'state_dict': model.state_dict()}
filename = os.path.join(data_dir, 'checkpoints/{0:04d}.pth.tar'.format(epoch))
torch.save(state, filename)


if __name__ == '__main__':
main()

 

test accuracyは8%弱までしか上がりませんでした。

pyworldでピッチ・フォルマントシフト(+wav入力、音声再生、周波数プロット)

import numpy as np
from scipy.io import wavfile
import simpleaudio as sa
import pyworld as pw
import matplotlib.pyplot as plt

# wavファイルの入力
sample_rate, data_int = wavfile.read('input.wav')

# 基本周波数、スペクトル包絡、非周期性指標の抽出
data_float = data_int.astype(np.float)
f0, t = pw.dio(data_float, sample_rate) # 基本周波数の抽出
f0 = pw.stonemask(data_float, f0, t, sample_rate) # refinement
sp = pw.cheaptrick(data_float, f0, t, sample_rate) # スペクトル包絡の抽出
ap = pw.d4c(data_float, f0, t, sample_rate) # 非周期性指標の抽出

# ピッチシフト
f0_rate = 1.5
f0_modified = modified_f0 = f0_rate * f0

# フォルマントシフト
sp_rate = 0.75
sp_modified = np.zeros_like(sp)
sp_range = int(sp_modified.shape[1] * sp_rate)
for f in range(sp_modified.shape[1]):
sp_modified[:, f] = sp[:, int(sp_rate * f)]

# 再合成
synth_float = pw.synthesize(f0_modified, sp_modified, ap, sample_rate)
synth_float *= 32767 / max(abs(synth_float))
synth_int = synth_float.astype(np.int16)

# 再生
play_obj = sa.play_buffer(data_int, 1, 2, sample_rate)
play_obj.wait_done()
play_obj = sa.play_buffer(synth_int, 1, 2, sample_rate)
play_obj.wait_done()

# 基本周波数のプロット
plt.plot(f0)
plt.show()

 

参考ページ

PyAudioとPyWorldで音声を逐次分析合成しつづけるPythonスクリプト - 備忘録