import numpy as np import matplotlib.pyplot as plt import scipy from scipy import signal #---------------------Time domain NLMS adaptive filter--------------- def nlms(x, d, h, step_size0.5): N len(x) M len(h) u np.zeros(N) e np.zeros(N) # x前补零让x的第一个值也可以加入计算 x_padded np.pad(x,(M-1,0),modeconstant) for i in range(N): x_input x_padded[i:iM][::-1] power np.sum(x_input*x_input)1e-10 u[i] np.dot(x_input,h) e[i] d[i]-u[i] h step_size*e[i]*x_input/power return e,u,h #---------------------Frequency domain block NLMS adaptive filter--------------- def bnlms_freq(x, d, h, step_size0.5): M len(h) #滤波器长度 L 2*M #输入帧大小2*MM个历史数据M个新数据重叠大小也为M nb min(len(x),len(d))//M #计算块数向下取整 x x[:nb*M] #只保留整数块 d d[:nb*M] #只保留整数块 #初始化误差信号和输出信号 e np.zeros(len(x),dtypenp.float32) u np.zeros(len(x),dtypenp.float32) #滤波器频域初始化 Hf np.fft.rfft(h,nL) x_block np.zeros(L,dtypenp.float32) d_block np.zeros(M,dtypenp.float32) for n in range(nb): start_index n*M end_index (n1)*M #构建输入块 x_block[:M] x_block[M:] x_block[M:] x[start_index:end_index] #更新期望信号 d_block d[start_index:end_index] #FFT加速计算h和xblock的卷积u_block,取后M个有效点 Xf_block np.fft.rfft(x_block,nL) u_block np.fft.irfft(Xf_block*Hf)[M:] e_block d_block-u_block e[start_index:end_index] e_block u[start_index:end_index] u_block #通过e_block在频域计算梯度FFT加速,计算循环互相关 e_block_padded np.concatenate((np.zeros(M),e_block)) Ef_block np.fft.rfft(e_block_padded,nL) Gradf Ef_block*np.conj(Xf_block) power_norm np.abs(Xf_block)**21e-10 #频域更新滤波器系数 Hf step_size*Gradf/power_norm #parseval时域能量等于1/N频域能量 h np.fft.irfft(Hf,nL).real #将h后一半置0下一次循环使用 h[M:]0 Hf np.fft.rfft(h,nL) h np.fft.irfft(Hf,nL)[:M] return e,u,h #---------------------Partitioned Frequency domain block NLMS adaptive filter--------------- class PFDAF: def __init__(self,M,p,step_size): self.P p #滤波器分块个数 self.L 2*M #输入信号block size self.M M #输入信号block中的有效输入,滤波器分块后长度也为M self.N_fft 2*M self.N_freqs M1 self.step step_size self.x np.zeros(shape(2*self.M),dtypenp.float32) #滤波器分块后对应的不同延时的输入块频域延时为滤波器分块后的长度M self.Xf np.zeros(shape(p,self.N_freqs),dtypenp.complex64) self.Hf np.zeros(shape(p, self.N_freqs), dtypenp.complex64) self.h np.zeros(shape(self.P,self.M),dtypenp.float32) def filter(self,x,d): x:有效输入信号长度为M d:有效目标信号长度为M self.x[self.M:] x Xf np.fft.rfft(self.x) self.x[:self.M] self.x[self.M:] self.Xf[1:] self.Xf[:-1] self.Xf[0] Xf u np.fft.irfft(np.sum(self.Xf*self.Hf,axis0))[self.M:] e d - u return e,u def update(self,e): e_n np.concatenate((np.zeros(self.M),e)) Ef_n np.fft.rfft(e_n) Xf_power np.sum(np.abs(self.Xf)**2,axis0) 1e-10 grad Ef_n*self.Xf.conj()*self.step/Xf_power self.Hf grad for p in range(self.P): h np.fft.irfft(self.Hf[p]) h[self.M:] 0 self.h[p] h[:self.M] self.Hf[p] np.fft.rfft(h) def pfdaf(self,x,d,M,p,step_size): ft PFDAF(M,p,step_size) nb min(len(x),len(d)) // M #初始化误差和输出信号 e_total np.zeros(shape(nb*M),dtypenp.float32) u_total np.zeros(shape(nb*M),dtypenp.float32) for n in range(nb): x_n x[n*M:(n1)*M] d_n d[n*M:(n1)*M] e_n,u_n ft.filter(x_n,d_n) ft.update(e_n) e_total[n*M:(n1)*M] e_n u_total[n*M:(n1)*M] u_n h_total np.ravel(ft.h) return e_total, u_total, h_total #----------------------------------------辅助函数------------------------------------- def make_path(delay,length): 产生指数衰减的脉冲响应 Args: delay (_type_): 脉冲响应延迟 length (_type_): 脉冲响应总长度 Returns: _type_: _description_ path_length length-delay h0 np.zeros(shape(length),dtypenp.float32) h0[delay:] np.random.randn(path_length) * np.exp(np.linspace(0,-4,path_length)) h0 h0/np.sqrt(np.sum(h0**2)) return h0 def plot_converge(y, u, label): plot_converge绘制未知系统输出y和自适应滤波器输出u之间的误差 Args: y (_type_): 目标信号 u (_type_): 滤波器生成信号 label (str, optional): _description_. Defaults to . size len(u) avg_number 200 e np.power(y[:size] - u, 2) tmp e[:int(size/avg_number)*avg_number] tmp.shape -1, avg_number avg np.average( tmp, axis1 ) plt.plot(np.linspace(0, size, len(avg)), 10*np.log10(avg), linewidth2.0, labellabel) def diff_db(h0, h): 直接计算未知系统的传递函数h0和自适应滤波器的传递函数h之间的误差 Args: h0 (_type_): 未知系统传递函数 h (_type_): 自适应滤波器的传递函数 Returns: _type_: 误差 return 10*np.log10(np.sum((h0-h)*(h0-h)) / np.sum(h0*h0)) def sim_system_identify(filter, x, h0, step_size, noise_scale): _summary_ Args: filter (_type_): nlms算法的实现函数 x (_type_): 参照信号 h0 (_type_): 未知系统的传递函数 step_size (_type_): nlms算法的步长 noise_scale (_type_): 外部干扰的系数此系数决定外部干扰的大小0表示没有外部干扰 Returns: _type_: _description_ y np.convolve(x, h0) d y np.random.standard_normal(len(y)) * noise_scale # 添加白色噪声外部干扰的目标信号 h np.zeros(len(h0),dtypenp.float32) e,u,h filter(x, d, h, step_size) y y[:len(u)] return y, u, h def system_identify_test1(): h0 make_path(32, 256) # 随机产生一个未知系统的传递函数 x np.random.standard_normal(10240) y, u, h sim_system_identify(filterbnlms_freq, xx, h0h0, step_size0.25, noise_scale0.1) print (diff_db(h0, h)) plt.figure( figsize(8, 6) ) plt.subplot(211) plt.subplots_adjust(hspace0.4) plt.plot(h0, cr) plt.plot(h, cg,linestyle-) plt.title(u未知系统和收敛后的滤波器的系数比较) plt.subplot(212) plot_converge(y, u) plt.title(u自适应滤波器收敛特性) plt.xlabel(Iterations (samples)) plt.ylabel(Converge Level (dB)) plt.show() def system_identify_test2(): h0 make_path(32, 256) x np.random.standard_normal(10240) y np.convolve(x, h0) d y np.random.standard_normal(len(y)) * 0.1 pfdaf_instance PFDAF(64, 4, 0.3) e, u, h pfdaf_instance.pfdaf(x, d, 64, 4, 0.3) y y[:len(u)] print(diff_db(h0, h)) plt.figure( figsize(8, 6) ) plt.subplot(211) plt.subplots_adjust(hspace0.4) plt.plot(h0, cr) plt.plot(h, cg,linestyle-) plt.title(u未知系统和收敛后的滤波器的系数比较) plt.subplot(212) plot_converge(y, u) plt.title(u自适应滤波器收敛特性) plt.xlabel(Iterations (samples)) plt.ylabel(Converge Level (dB)) plt.show() system_identify_test1()