데이터 준비하기2

4 minute read

이전 포스팅에서는 시간 영역에서 파형을 관찰하는 방법을 다뤘었고, 이번 포스팅에서는 음성데이터를 배열로 받아서 악기소리(노이즈)가 상대적으로 강한 구간과 약한 구간을 관찰하는 내용을 다룬다.

train data, teacher data 준비방법

  • Vocal remover를 통해 Original -> Instruments, Vocals 파일로 분리한다.
  • Vocal remover는 악기소리(Instruments)에는 굉장히 민감하게 반응하므로 필터링 된 Instruments 파일로부터 특정 임계값을 설정한 후 해당 값보다 큰 구간(악기소리가 강한 구간)은 1로, 해당 값보다 작은 구간(악기소리가 약한 구간)은 0으로 나타낸다.
  • Original 파일로부터 악기소리가 약한 구간(0인 구간)만을 파싱하고, 이어붙인 후 새로운 음성 데이터로 추출한다. 그리고 이 음성데이터가 teacher data가 된다.
  • train data는 teacher data에 noise를 입히는 과정을 통해 제작한다. 이번에는 Instruments 파일로부터 악기소리가 강한 구간(1인 구간)을 파싱해서 teacher data와 합치는 아이디어로 제작했다. train data와 noise 의 시간대역이 서로 겹치는 일이 없기 때문에 original data와도 차별점이 있고 다양성도 만족시킬 수 있다고 판단했다.

소스코드


import argparse
import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import wave
from pydub import AudioSegment

class Signal_extractor(object):

    def __init__(self):
        pass

    def Instrument_PostProcessor(path_i, path_o, file_name, th_type):
        input_I = path_i + "\\" + file_name + "_Instruments_mono.wav"
        spf_I = wave.open(input_I, "r")
        signal_I = spf_I.readframes(-1) # 모든 프레임 읽어 들이기
        signal_I = np.fromstring(signal_I, np.int16)
        fr = spf_I.getframerate()
        l_size = signal_I.size
        
        input_origin = path_o + "\\" + file_name + "_mono.wav"
        spf_O = wave.open(input_origin, "r")
        signal_origin = spf_O.readframes(-1)
        signal_origin = np.fromstring(signal_origin, np.int16)


        #TI : Time Interval setting 0.1 second
        TI_size = int(signal_I.size/(fr/10))+1

        print("Instruments extraction")

        # Positive value filter
        pos_signal_I = np.array(signal_I, np.int16)

        for i in range(0, signal_I.size, 1):
            value = signal_I[i]
            if(value<0): pos_signal_I[i] = 0

        # Positive value 0.1s Summation
        TI_pos_signal_I = np.zeros(TI_size, np.int32)

        k=0
        index=0
        pos_v=0
        for i in range(0,signal_I.size,1):
            if (i==signal_I.size-1):
                pos_v=pos_v+pos_signal_I[i]
                TI_pos_signal_I[index]=pos_v
            elif (k<(fr/10)):
                k=k+1
                pos_v=pos_v+pos_signal_I[i]
            else: # k == fr/10 일때마다 TI_pos_signal_I 에 누적합(pos_v)을 더한다.
                k=0;
                TI_pos_signal_I[index]=pos_v
                pos_v=0
                index=index+1

        #TI Time Interval
        TI_pos_bool_I=np.zeros(TI_size,np.uint32)
        s_size=TI_pos_bool_I.size
        TI_th_I = 200000 # Threshold
        if (th_type==0):
            TI_th_I = np.mean(TI_pos_signal_I)*0.2 

        for i in range(0,TI_size,1):
            if (TI_pos_signal_I[i]<(TI_th_I)):
                TI_pos_bool_I[i]=0;
            else:
                TI_pos_bool_I[i]=1;

        #I_filtering
        f_index=np.array([]) #false index
        t_index=np.array([]) #true  index

        k=30 #number of counting
        for i in range(0,TI_size-k,k):
            num=0
            for j in range(i,i+k,1):
                if(TI_pos_bool_I[j]==1):
                    num=num+1
                if (num<(k/2)):
                    f_index=np.append(f_index,[i],0);
                else:
                    t_index=np.append(t_index,[i],0);    

        for i in range(0,f_index.size,1):
            for j in range(int(f_index[i]),int(f_index[i])+k,1):
                TI_pos_bool_I[j]=0
        for i in range(0,t_index.size,1):
            for j in range(int(t_index[i]),int(t_index[i])+k,1):
                TI_pos_bool_I[j]=1
        
        #####bool to original size
        TI_pos_bool_I_large=np.zeros(signal_I.size,np.int16)
        for i in range(0,l_size,1):
            TI_pos_bool_I_large[i]=TI_pos_bool_I[int((i/l_size)*s_size)];
        
        
        start_idx = 0
        end_idx = 0

        nchannels = spf_O.getnchannels()
        sampwidth = spf_O.getsampwidth()
        nframes = spf_O.getnframes()
        comptype = spf_O.getcomptype()
        compname = spf_O.getcompname()

        cnt = 0
        i = 0
        final_signal = np.array([], dtype=np.int16)
        while i < l_size-1:
            if TI_pos_bool_I_large[i]==0:
                cnt = cnt + 1
                start_idx = i
                i = i + 1
                while TI_pos_bool_I_large[i] == 0:
                    end_idx = i
                    if i >= l_size-1:
                        break
                    i = i + 1
            
                filtered_signal = signal_origin[start_idx:end_idx]
                final_signal = np.append(final_signal, filtered_signal, axis=0)
                
            else:
                i = i + 1

        if final_signal.size >= 190000: # 길이가 너무 짧은 영상은 추출x
            wav_file = wave.open(path_i+"\\dataset\\instruments_mono\\"+file_name+"_inst.wav", "w")
            wav_file.setparams((nchannels, sampwidth, fr, nframes, comptype, compname))
            wav_file.writeframes(final_signal)
            wav_file.close()

    def Noise_extractor(path_i, path_o, file_name, th_type):
        input_I = path_i + "\\" + file_name + "_Instruments_mono.wav"
        spf_I = wave.open(input_I, "r")
        signal_I = spf_I.readframes(-1) # 모든 프레임 읽어 들이기
        signal_I = np.fromstring(signal_I, np.int16)
        fr = spf_I.getframerate()
        l_size = signal_I.size
        
        input_origin = path_o + "\\" + file_name + "_mono.wav"
        spf_O = wave.open(input_origin, "r")
        signal_origin = spf_O.readframes(-1)
        signal_origin = np.fromstring(signal_origin, np.int16)


        #TI : Time Interval setting 0.1 second
        TI_size = int(signal_I.size/(fr/10))+1

        print("Instruments extraction")

        # Positive value filter
        pos_signal_I = np.array(signal_I, np.int16)

        for i in range(0, signal_I.size, 1):
            value = signal_I[i]
            if(value<0): pos_signal_I[i] = 0

        # Positive value 0.1s Summation
        TI_pos_signal_I = np.zeros(TI_size, np.int32)

        k=0
        index=0
        pos_v=0
        for i in range(0,signal_I.size,1):
            if (i==signal_I.size-1):
                pos_v=pos_v+pos_signal_I[i]
                TI_pos_signal_I[index]=pos_v
            elif (k<(fr/10)):
                k=k+1
                pos_v=pos_v+pos_signal_I[i]
            else: # k == fr/10 일때마다 TI_pos_signal_I 에 누적합(pos_v)을 더한다.
                k=0;
                TI_pos_signal_I[index]=pos_v
                pos_v=0
                index=index+1

        #TI Time Interval
        TI_pos_bool_I=np.zeros(TI_size,np.uint32)
        s_size=TI_pos_bool_I.size
        TI_th_I = 200000 # Threshold
        if (th_type==0):
            TI_th_I = np.mean(TI_pos_signal_I)*0.2 

        for i in range(0,TI_size,1):
            if (TI_pos_signal_I[i]<(TI_th_I)):
                TI_pos_bool_I[i]=0;
            else:
                TI_pos_bool_I[i]=1;

        #I_filtering
        f_index=np.array([]) #false index
        t_index=np.array([]) #true  index

        k=30 #number of counting
        for i in range(0,TI_size-k,k):
            num=0
            for j in range(i,i+k,1):
                if(TI_pos_bool_I[j]==1):
                    num=num+1
                if (num<(k/2)):
                    f_index=np.append(f_index,[i],0);
                else:
                    t_index=np.append(t_index,[i],0);    

        for i in range(0,f_index.size,1):
            for j in range(int(f_index[i]),int(f_index[i])+k,1):
                TI_pos_bool_I[j]=0
        for i in range(0,t_index.size,1):
            for j in range(int(t_index[i]),int(t_index[i])+k,1):
                TI_pos_bool_I[j]=1
        
        #####bool to original size
        TI_pos_bool_I_large=np.zeros(signal_I.size,np.int16)
        for i in range(0,l_size,1):
            TI_pos_bool_I_large[i]=TI_pos_bool_I[int((i/l_size)*s_size)];
        
        
        start_idx = 0
        end_idx = 0

        nchannels = spf_O.getnchannels()
        sampwidth = spf_O.getsampwidth()
        nframes = spf_O.getnframes()
        comptype = spf_O.getcomptype()
        compname = spf_O.getcompname()


        i = 0
        final_signal = np.array([], dtype=np.int16)
        while i < l_size-1:
            if TI_pos_bool_I_large[i]==1:
                start_idx = i
                i = i + 1
                while TI_pos_bool_I_large[i] == 1:
                    end_idx = i
                    if i >= l_size-1:
                        break
                    i = i + 1
            
                filtered_signal = signal_I[start_idx:end_idx]
                final_signal = np.append(final_signal, filtered_signal, axis=0)
                
            else:
                i = i + 1

        wav_file = wave.open(path_i+"\\dataset\\noises\\"+file_name+"_noise.wav", "w")
        wav_file.setparams((nchannels, sampwidth, fr, nframes, comptype, compname))
        wav_file.writeframes(final_signal)
        wav_file.close()

def main():
    p = argparse.ArgumentParser()
    p.add_argument('--pathi', '-P', type=str, default='.')
    p.add_argument('--patho', '-po', type=str, default='.')
    p.add_argument('--input', '-i', required=True)
    p.add_argument('--th', '-t', type=int, default=0)

    args = p.parse_args()
    path_i = args.pathi
    path_origin = args.patho
    file_name = args.input
    th_type = args.th

    Signal_extractor.Instrument_PostProcessor(path_i, path_origin, file_name, th_type)
    Signal_extractor.Noise_extractor(path_i, path_origin, file_name, th_type)

if __name__ == '__main__':
    main()

  • 노이즈를 섞는 소스코드는 다음 포스팅에서 확인할 수 있다.
  • 모든 소스코드는 Github에서 확인할 수 있다. Github

Leave a comment