Commits

Michael Forbes  committed ccc8116

The filter.py file can be used to apply George's notch filter to a wave file.

  • Participants
  • Parent commits 12f19ba

Comments (0)

Files changed (2)

         self.max_freq = 14000 * _Hz
         self.dt = 1. / self.rate
 
-    def analyze(self, y, window='hamming'):
+    def analyze(self, y, window='hamming', log=True):
+        y = y.astype(complex)
         skip = max(1, int(np.ceil(self.max_freq * self.dt)))
         skip = 1
         print skip
 
         plt.subplot(122)
         inds = np.argsort(freqs)
-        plt.semilogy(freqs[inds], power[inds] + 1e-10, '-')
+        if log:
+            plt.semilogx(abs(freqs[inds]), 10 * np.log10(power[inds] + 1e-200),
+                         '-')
+        else:
+            plt.plot(abs(freqs[inds]), 10 * np.log10(power[inds] + 1e-200),
+                     '-')
         #plt.semilogy(freqs[inds], spower[inds] + 1e-10, '-')
         #plt.ylim([0, 1e-1])
         plt.xlabel('freq (Hz)')
-        plt.ylabel('power (rel)')
+        plt.ylabel('power (dB)')
         plt.draw()
 
     def get_t(self, T):
         self.b = [1.0]          # Redefine to define filter
         self.a = [1.0]          # Usually not changed.
 
-    def respose(self):
+    def response(self):
         r"""Compute the response of the filter."""
         w, a = sp.signal.freqz(self.a, self.b)
         return 2.0 * self.nyq * w / (2 * np.pi), a
         processing several consecutive signals, you should pass both the new
         signal and the initial conditions `zi`.  If `zi` is `None`, then assume
         that the signal was initially zero."""
+        if len(y.shape) == 2:
+            # Trivial vectorization.
+            ys = []
+            zis = []
+            _z = None
+            for _n in xrange(y.shape[1]):
+                if zi is not None:
+                    _z = zi[:, _n]
+                res = self.apply(y[:, _n], _z)
+                ys.append(res[0])
+                zis.append(res[1])
+            return np.asarray(ys).T, np.asarray(zis).T
+
         if zi is None:
             zi = sp.signal.lfiltic(b=self.b, a=self.a, y=0*y[:0], x=y)
         y_new, zi = sp.signal.lfilter(b=self.b, a=self.a, x=y, zi=zi)
-        return y_new, zi
+        return y_new.astype(y.dtype), zi
+
+    def test_float(self, T=0.1, seed=1):
+        r"""Test filtering of noise."""
+        import analyze
+        samples = int(T * self.rate)
+        np.random.seed(seed)
+        y1 = np.random.random((samples, 2)).astype(np.float32) - 0.5
+        y2, zi = self.apply(y1)
+
+        a1 = analyze.AnalyzeSignal(rate=self.rate, fignum=1)
+        a2 = analyze.AnalyzeSignal(rate=self.rate, fignum=2)
+        a1.analyze(y1[:, 0], log=False)
+        a2.analyze(y2[:, 0], log=False)
+        
+    def test_int(self, T=1.0, seed=1):
+        r"""Test filtering of noise."""
+        import analyze
+        samples = int(T * self.rate)
+        np.random.seed(seed)
+        int16max = 2.0**(15)
+        y1 = (np.random.random((samples, 2)) - 0.5) * int16max
+        y1 = y1.astype(np.int16)
+
+        # Apply filter at once
+        y2, zi = self.apply(y1)
+        
+        # Apply filter over chunks
+        chunk = 256*4
+        zi = None
+        n = 0
+        ys = []
+        while n < y1.shape[0]:
+            _y = y1[n:n+chunk, ::]
+            _yf, zi = self.apply(_y, zi)
+            ys.append(_yf)
+            n += chunk
+        y3 = np.vstack(ys)
+        assert y2.dtype == y1.dtype
+        assert y3.dtype == y1.dtype
+        assert np.allclose(y2, y3)
+
+        a1 = analyze.AnalyzeSignal(rate=self.rate, fignum=1)
+        a2 = analyze.AnalyzeSignal(rate=self.rate, fignum=2)
+        a1.analyze(y1[:, 0])
+        a2.analyze(y2[:, 0])
 
 
 class Notch(Filter):
-    def __init__(self, freq=7688.0, width=math.sqrt(2), numtaps=100,
-                 max_freq=9000, **kw):
+    def __init__(self, freq=7688.0, width=math.sqrt(2), numtaps=100, **kw):
         Filter.__init__(self, **kw)
         f0 = freq / width
         f1 = freq * width
         self.freq = np.array([0, f0, f0, f1, f1, self.nyq, self.nyq])
         #self.nyq = 1.0
         #self.freq = np.array([0, 0.5, 0.5, 0.7, 0.7, 1.0]) * self.nyq
-        self.gain = [1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0]
+        self.gain = [1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 1.0]
+        self.numtaps = numtaps + (numtaps + 1) % 2
+        self.b = sp.signal.firwin2(
+            numtaps=self.numtaps, freq=self.freq, gain=self.gain,
+            nyq=self.nyq)
+        self.a = [1.0]
+
+
+class George(Filter):
+    def __init__(self, numtaps=100, **kw):
+        Filter.__init__(self, **kw)
+        f_tin_low = 7400.0
+        f_tin_high = 9200.0
+        self.freq = np.array([0,
+                              f_tin_low, f_tin_low,
+                              f_tin_high, f_tin_high,
+                              self.nyq])
+        self.gain = [1.0, 1.0, 0.0, 0.0, 1.0, 1.0]
+
+        # Make odd so we can have gain at nyq.
+        self.numtaps = numtaps + (numtaps + 1) % 2
+        self.b = sp.signal.firwin2(
+            numtaps=self.numtaps, freq=self.freq, gain=self.gain,
+            nyq=self.nyq)
+        self.a = [1.0]
+
+
+class Notch2(Filter):
+    def __init__(self, freq=7688.0, width=math.sqrt(2),
+                 _offset=1.1, numtaps=100, **kw):
+        Filter.__init__(self, **kw)
+        w0 = 2.0*np.pi * freq / width
+        w1 = 2.0*np.pi * freq * width
+        
+        wn = 2.0 * np.pi * self.nyq
+        wp = [w0/_offset/wn, w1*_offset/wn]
+        ws = [w0*_offset/wn, w1/_offset/wn]
+        print wp, ws
+        self.b, self.a = scipy.signal.iirdesign(
+            #wp=[0.05, 0.3], ws=[0.02,0.35],
+            wp=wp, ws=ws,
+            gpass=1, gstop=20,
+            analog=0,
+            ftype='ellip', output='ba')
+        
+
+class LowPass(Filter):
+    def __init__(self, freq=7688.0, numtaps=100, **kw):
+        Filter.__init__(self, **kw)
+        self.freq = np.array([0, freq, freq, self.nyq])
+        self.gain = [1.0, 1.0, 1.0, 0.0]
         self.numtaps = numtaps
         self.b = sp.signal.firwin2(
             numtaps=self.numtaps, freq=self.freq, gain=self.gain,
         self.a = [1.0]
 
 
+class HighPass(Filter):
+    def __init__(self, freq=7688.0, numtaps=100, **kw):
+        Filter.__init__(self, **kw)
+        self.freq = np.array([0, freq, freq, self.nyq, self.nyq])
+        self.gain = [0.0, 0.0, 1.0, 1.0, 0.0]
+        self.numtaps = numtaps
+        self.b = sp.signal.firwin2(
+            numtaps=self.numtaps, freq=self.freq, gain=self.gain,
+            nyq=self.nyq)
+        self.a = [1.0]
+
+
+def _wav_to_array(data, params):
+    r"""Convert raw data from a wav file to a numpy int array."""
+    nchannels, sampwidth, framerate, nframes, comptype, compname = params
+    dtype = getattr(np, 'int%i' % (sampwidth*8, ))
+    frames = len(data) // sampwidth // nchannels
+    return np.fromstring(data, dtype=dtype).reshape((frames, nchannels))
+
+
+def _array_to_wav(y, params):
+    r"""Convert raw data from a wav file to a numpy int array."""
+    nchannels, sampwidth, framerate, nframes, comptype, compname = params
+    dtype = getattr(np, 'int%i' % (sampwidth*8, ))
+    assert y.dtype == dtype
+    assert y.shape[1] == nchannels
+    return y.tostring()
+
+
 def filter(input_filename, output_filename):
-    r"""Demo.  Applies a notch filter to a file.
-    """
+    r"""Demo.  Applies a notch filter to a file."""
     import wave
 
-    filter = Notch()
-    
     CHUNK = 1024
 
     input = wave.open(input_filename, 'rb')
-    (nchannels, sampwidth, framerate, nframes, comptype,
-     compname) = input.getparams()
+    params = input.getparams()
+
+    framerate = params[2]
+    print framerate
+    filter = George(rate=framerate)
 
     output = wave.open(output_filename, 'wb')
-    output.setparams((nchannels, sampwidth, framerate, nframes, comptype,
-                      compname))
+    output.setparams(params)
 
     data = input.readframes(CHUNK)
+    zi = None                   # Initial conditions.
     while data:
-        output.writeframes(data)
+        y = _wav_to_array(data, params=params)
+        yf, zi = filter.apply(y, zi=zi)
+        output.writeframes(_array_to_wav(yf, params=params))
         data = input.readframes(CHUNK)
     input.close()
     output.close()