국비 데이터 분석 3차 _ 서비스 3_ 모델링
https://tech.kakao.com/2020/04/29/kakaoarena-3rd-part1/
멜론에서 음악 추천을 어떻게 할까? – 카카오 아레나 3회 대회(Part.1)
이 글은 멜론에서 음악 추천을 어떻게 할까? 카카오 아레나 3회 대회(Part.1)라는 이름으로 카카오 정책산업연구 브런치에 동시 개제되었습니다. 자신이 잘 모르는 분야에서 새로운 도전을 할 때
tech.kakao.com
위의 글을 보면 오디오 분석시, 그림처럼 직접 형태를 변환해 멜스펙트로그램을 만든 후 이를 end to end 형태로 넣으면 분류를 할 수 있다고 한다.
(이 부분은 ipynb 파일이 통채로 날라가서 말로만 설명한다)
이를 이용해 먼저 음원 데이터를 받아 wav 파일로 바꾸고, 순서와 상관이 없는 lstm에 넣어 분석시켰다.
학습을 하기 위해 곡 제목, 곡 아티스트, score가 있는 dataframe을 먼저 가져왔다.
이를 spotify에서 제대로 검색을 하기 위해, 약간의 사전처리를 해주고(알파벳 첨자 통일화, 소문자 일괄화 등)
스포티파이 api로 미리듣기 소스를 검색해준다.
client_credentials_manager = SpotifyClientCredentials(client_id=self.client_id, client_secret=self.client_secret)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)
song = sp.search(q=f'track:{노래제목} artist:{가수명}', type='track')
id_ =song['id']
track = sp.track(id_)
src = track['preview_url']
wget으로 mp3를 받고
import wget
file = wget.download(src, out=out_path)
이 mp3 파일을 wav로 바꾼 뒤 signal을 뽑아낸다.
import numpy
import scipy.io.wavfile
from scipy.fftpack import dct
file_path = out_path
_, signal = scipy.io.wavfile.read(file_path) # wav 파일 가져오기
signal = signal[int(0 * sample_rate):int(180 * sample_rate)]
이 신호를 melspectrogram으로 변환하여 lstm 모델에 돌리면 된다.
melspectrogram은 librosa를 이용했다.
df를 불러오고
model을 불러오고
#스포티파이
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from spotipy.oauth2 import SpotifyOAuth
# mp3 저장
import wget
# pandas, numpy
import pandas as pd
import numpy as np
import os
from pydub import AudioSegment
import scipy.io.wavfile
from scipy.fftpack import dct
import librosa
import tensorflow as tf
from tensorflow.python.keras.models import Sequential
from tensorflow.python.keras.layers import LSTM, Dense
from keras.callbacks import EarlyStopping
signal을 뽑아 pre-emp-filter 통과시키고, 이 데이터를 mel spactrogram으로 바꾼다. lstm의 input 형태가 (sample, time_step, features) 이므로 이 모양을 만들기 위해 shape를 변환시켜준다.
미리듣기가 대부분의 곡이 30초이지만, 가끔 30초가 안되는 곡이 있기 때문에 이런 곡들은 padding작업을 통해 뒷부분을 0으로 채워준다.
# pre-emphasis filter 생성
def preEmpFilter(signal):
# 맥시마이저 되어있는 소리를 pre-emphasis filter 로 풀어주기
pre_emphasis = 0.97 # 또는 0.95 0.9357
return np.append(signal[0], signal[1:] - pre_emphasis * signal[:-1])
# 서비스4를 위한 전처리
def Service4(out_path):
wav_info = {}
# 0초 ~ 16초 : 최저 4마디 기준
sound = AudioSegment.from_file(out_path)
# start_time = 0 * 1000
# end_time = (1*29) * 1000
# sound = sound[start_time:end_time]
input_wav = out_path[:-4] + '.wav'
#wav 파일 생성
sound.export(input_wav, format='wav')
# signal 뽑기
_, signal = scipy.io.wavfile.read(input_wav)
# preEmp 필터 통과
signal = preEmpFilter(signal)
wav_info = {
'signal' : signal,
}
# dataframe으로 생성
df = pd.DataFrame([wav_info])
# 멜스펙트럼으로 바꾸기
df['signal'] = df['signal'].apply(lambda x : librosa.feature.melspectrogram(y=x, sr=44100//2))
# LSTM input shape에 맞춰서 signal 변환
df['signal'] = df['signal'].apply(lambda x : np.array(x.T.tolist()))
return df
def paddingData(df):
data = []
max_time_steps = 5119
# (samples, time_steps, features)
# 샘플 수, 시간, 128
signal = df.iloc[0]['signal']
time_steps, features = signal.shape
# if time_steps > max_time_steps:
# max_time_steps = time_steps
signal = np.expand_dims(signal, axis=0) # samples=1로 만듦
data.append(signal)
samples = len(data)
features = data[0].shape[2]
# 초기화
X = np.zeros((samples, max_time_steps, features))
# sample만큼 반복
for k in range(samples):
# 패딩 작업
time_steps = data[k].shape[1]
padding_size = max_time_steps - time_steps
if padding_size > 0:
padded_signal = np.pad(data[k], ((0, 0), (0, padding_size), (0, 0)), mode='constant')
else:
padded_signal = data[k][:,:max_time_steps,:]
X[k] = padded_signal
return X
lstm 모델을 만들고 한번에 돌리면 메모리 오류가 났다. lstm은 이전 기억을 가져와서 학습시키기 때문에 곡들을 하나씩 받아서 fit만 돌리는 형태로 새로 작업을 했다. 0보다 작아지는 경우가 없기 떄문에 sigmoid를 사용했다.(relu 사용시 결과 뻥튀기가 심해졌다.)
model4 = Sequential()
model4.add(LSTM(64, input_shape=(5119, 128), return_sequences=True))
model4.add(LSTM(32))
model4.add(Dense(1, activation='sigmoid'))
model4.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
early_stopping = EarlyStopping(monitor='loss', min_delta=0.3)
# model.fit(X4data, y, epochs=30, batch_size=1, verbose=2, callbacks=[early_stopping])
미리보기 하나 검색 후 mp3 받아 wav로 바꾼후, 필요없어진 mp3,wav파일을 지우는 동작을 중간에 넣었다.
%%time
for idx, v in enumerate(df.values):
try:
print('='*20)
print(f'{idx} :: -- 시 작 --')
song = sp.search(q=f'track:{v[0]} artist:{v[1]}', type='track')
title = v[0]
artist = v[1]
print(f'{v[0]} // {v[1]} :: 시도')
src=song['tracks']['items'][0]['preview_url']
out_path = f'./0510_last/{title}_{artist}.mp3'
wget.download(src, out=out_path)
df4 = Service4(out_path)
X4data = paddingData(df4)
print('='*20)
y = np.array([v[2]])
# validation 넣은
model4.fit(X4data, y, epochs=13, batch_size=1, verbose=2)
model4.save('end_to_end_final512.h5')
os.remove(out_path)
os.remove(out_path[:-4]+'.wav')
print('*'*20)
print(f'{idx} ::-- 완 료 --')
except Exception as e:
print(e)
print(f'{v[0]} // {v[1]} // {idx} :: 에러 났음')
print('-'*20)
이렇게 early_stopping은 loss로 두면 사실상 의미가 없고, 데이터를 하나씩 넣어가면서 학습 시키기 때문에 test나 validation을 나눌 수 없는 상황이라서 빼주었다. 나중에는 분할을 적게, 그리고 한번에(집 컴퓨터로 아슬아슬하게 돌아갔다.) 새로 모델을 돌렸는데 전자가 제일 괜찮은 결과가 나와서 최종적으로 사용했다.
%%time
# model = Sequential()
# model.add(LSTM(64, input_shape=(3001, 128), return_sequences=True))
# model.add(LSTM(32))
# model.add(Dense(1, activation='sigmoid'))
# model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
early_stopping = EarlyStopping(monitor='val_loss', patience=2)
for i in range(7):
data = []
print(i,'번 분할 파일 작업 중')
for iii in range(6, len(os.listdir(f'{path}/{i}'))//90 + 1):
df_all = pd.DataFrame()
data = []
print(f'{iii*90}~{(iii+1)*90} 까지')
for ls in os.listdir(f'{path}/{i}')[iii*90:(iii+1)*90]:
# load
with open(f'{path}\\{i}\\{ls}', 'rb') as f:
df = pickle.load(f)
df_all = pd.concat([df_all, df])
df_all.reset_index(drop=True,inplace=True)
# 'signal' 컬럼의 데이터를 3D 텐서로 변환
print('파일 불러오기 완료!')
print('-'*30)
print('데이터 전처리 중')
max_time_steps = 0
for j in range(len(df_all)):
signal = df_all.iloc[j]['signal']
time_steps, features = signal.shape
if time_steps > max_time_steps:
max_time_steps = time_steps
signal = np.expand_dims(signal, axis=0) # samples=1로 만듦
data.append(signal)
print('데이터 전처리 완료')
print('-'*30)
# max_time_steps 값 출력
print(f'max_time_steps: {max_time_steps}')
print('-'*30)
# 모든 3D 텐서를 수직으로 쌓아서 (samples, time_steps, features) shape의 3D 텐서로 만듦
# 906, 시간, 128
# print(data)
samples = len(data)
print(samples)
features = data[0].shape[2]
# print(samples)
# print(features)
X = np.zeros((samples, max_time_steps, features))
for k in range(samples):
time_steps = data[k].shape[1]
padding_size = max_time_steps - time_steps
padded_signal = np.pad(data[k], ((0, 0), (0, padding_size), (0, 0)), mode='constant')
X[k] = padded_signal
# print(X)
df_all['score'] = df_all['score'].fillna(0)
# 'score' 컬럼은 레이블로 사용
y = df_all['score'].values
print('학습모델 분할 완료')
print('-'*30)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=4)
print('='*30)
print('학습 시작!')
print('='*30)
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=30, batch_size=1, verbose=2, callbacks=[early_stopping])
print('*'*30)
print('저장 중!')
model.save(f'end_to_end_final32.h5')
print('='*30)
print('저장 완료!')
print('*'*30)
print('*'*30)