Python scipy 模块,signal() 实例源码


项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def applyFilter(data, b, a, padding=100, bidir=True):
    """Apply a linear filter with coefficients a, b. Optionally pad the data before filtering
    and/or run the filter in both directions."""
        import scipy.signal
    except ImportError:
        raise Exception("applyFilter() requires the package scipy.signal.")

    d1 = data.view(np.ndarray)

    if padding > 0:
        d1 = np.hstack([d1[:padding], d1, d1[-padding:]])

    if bidir:
        d1 = scipy.signal.lfilter(b, a, scipy.signal.lfilter(b, a, d1)[::-1])[::-1]
        d1 = scipy.signal.lfilter(b, a, d1)

    if padding > 0:
        d1 = d1[padding:-padding]

    if (hasattr(data, 'implements') and data.implements('MetaArray')):
        return MetaArray(d1, info=data.infoCopy())
        return d1
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def applyFilter(data, b, a, padding=100, bidir=True):
    """Apply a linear filter with coefficients a, b. Optionally pad the data before filtering
    and/or run the filter in both directions."""
        import scipy.signal
    except ImportError:
        raise Exception("applyFilter() requires the package scipy.signal.")

    d1 = data.view(np.ndarray)

    if padding > 0:
        d1 = np.hstack([d1[:padding], d1, d1[-padding:]])

    if bidir:
        d1 = scipy.signal.lfilter(b, a, scipy.signal.lfilter(b, a, d1)[::-1])[::-1]
        d1 = scipy.signal.lfilter(b, a, d1)

    if padding > 0:
        d1 = d1[padding:-padding]

    if (hasattr(data, 'implements') and data.implements('MetaArray')):
        return MetaArray(d1, info=data.infoCopy())
        return d1
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def besselFilter(data, cutoff, order=1, dt=None, btype='low', bidir=True):
    """return data passed through bessel filter"""
        import scipy.signal
    except ImportError:
        raise Exception("besselFilter() requires the package scipy.signal.")

    if dt is None:
            tvals = data.xvals('Time')
            dt = (tvals[-1]-tvals[0]) / (len(tvals)-1)
            dt = 1.0

    b,a = scipy.signal.bessel(order, cutoff * dt, btype=btype) 

    return applyFilter(data, b, a, bidir=bidir)
    #base = data.mean()
    #d1 = scipy.signal.lfilter(b, a, data.view(ndarray)-base) + base
    #if (hasattr(data, 'implements') and data.implements('MetaArray')):
        #return MetaArray(d1, info=data.infoCopy())
    #return d1
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def butterworthFilter(data, wPass, wStop=None, gPass=2.0, gStop=20.0, order=1, dt=None, btype='low', bidir=True):
    """return data passed through bessel filter"""
        import scipy.signal
    except ImportError:
        raise Exception("butterworthFilter() requires the package scipy.signal.")

    if dt is None:
            tvals = data.xvals('Time')
            dt = (tvals[-1]-tvals[0]) / (len(tvals)-1)
            dt = 1.0

    if wStop is None:
        wStop = wPass * 2.0
    ord, Wn = scipy.signal.buttord(wPass*dt*2., wStop*dt*2., gPass, gStop)
    #print "butterworth ord %f   Wn %f   c %f   sc %f" % (ord, Wn, cutoff, stopCutoff)
    b,a = scipy.signal.butter(ord, Wn, btype=btype) 

    return applyFilter(data, b, a, bidir=bidir)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def butterworthFilter(data, wPass, wStop=None, gPass=2.0, gStop=20.0, order=1, dt=None, btype='low', bidir=True):
    """return data passed through bessel filter"""
        import scipy.signal
    except ImportError:
        raise Exception("butterworthFilter() requires the package scipy.signal.")

    if dt is None:
            tvals = data.xvals('Time')
            dt = (tvals[-1]-tvals[0]) / (len(tvals)-1)
            dt = 1.0

    if wStop is None:
        wStop = wPass * 2.0
    ord, Wn = scipy.signal.buttord(wPass*dt*2., wStop*dt*2., gPass, gStop)
    #print "butterworth ord %f   Wn %f   c %f   sc %f" % (ord, Wn, cutoff, stopCutoff)
    b,a = scipy.signal.butter(ord, Wn, btype=btype) 

    return applyFilter(data, b, a, bidir=bidir)
项目:pyrsss    作者:butala    | 项目源码 | 文件源码
def differentiator(n, Hz=1):
    Return linear phase impulse response for a length *n* filter that
    approximates the differential operator. The sampling frequency is

    The filter length *n* must be even. The remez function returns
    type 3 (for *n* odd) and 4 (for *n* even) linear phase
    filters. However, type 3 linear phase filters are 0 at $\omega =
    0$ and $\omega = \pi$.
    if n % 2 == 1:
        raise ValueError('the filter length n must be even')
    return scipy.signal.remez(n,
                              [0, Hz / 2],
                              type='differentiator') * Hz * 2 * NP.pi
项目:pyrsss    作者:butala    | 项目源码 | 文件源码
def __init__(self, n, axis, order=2, mode='valid'):
        Construct gradient operator for signal of dimension *n* for
        dimension *axis*. Use a filter kernel of length *order* (must
        be even). Use convolution type *mode*.
        self.n = n
        self.ndim = len(self.n)
        self.axis = axis
        if axis < 0 or axis >= self.ndim:
            raise ValueError('0 <= axis (= {0}) < ndim = {1}'.format(axis, self.ndim))

        self.d = differentiator(order)

        h_list = []
        for i in reversed(range(self.ndim)):
            if i == axis:

        super(GradientFilter, self).__init__(n, h_list, mode=mode)
项目:droppy    作者:BV-DR    | 项目源码 | 文件源码
def reSample( df , dt = None , xAxis = None , n = None , kind = 'linear') :
   """ re-sample the signal """

   if type(df) == pd.Series : df = pd.DataFrame(df)

   f = interp1d( df.index, np.transpose(df.values) , kind=kind, axis=-1, copy=True, bounds_error=True, assume_sorted=True)
   if dt :
      end = int(+(df.index[-1] - df.index[0] ) / dt)  * dt +  df.index[0]
      xAxis = np.linspace( df.index[0] , end , 1+int(+(end - df.index[0] ) / dt) )
   elif n :
      xAxis = np.linspace( df.index[0] ,  df.index[-1] , n )
   elif xAxis == None :
      raise(Exception("reSample : either dt or xAxis should be provided" ))

   #For rounding issue, ensure that xAxis is within ts.xAxis
   #xAxis[ np.where( xAxis > np.max(df.index[:]) ) ] = df.index[ np.where( xAxis > np.max(df.index[:]) ) ]
   return pd.DataFrame( data = np.transpose(f(xAxis)), index = xAxis , columns = map( lambda x : "reSample("+ x +")" , df.columns  ) )
项目:droppy    作者:BV-DR    | 项目源码 | 文件源码
def getPSD( df , dw = 0.05, roverlap = 0.5, window='hanning', detrend='constant') :
      Compute the power spectral density

   if type(df) == pd.Series : df = pd.DataFrame(df)

   nfft = int ( (2*pi / dw) / dx(df) )
   nperseg = 2**int(log(nfft)/log(2))
   noverlap = nperseg * roverlap

   """ Return the PSD of a time signal """
   try : 
      from scipy.signal import welch
   except :
      raise Exception("Welch function not found, please install scipy > 0.12")

   data = []
   for iSig in range(df.shape[1]) :
      test = welch( df.values[:,iSig]  , fs = 1. / dx(df) , window=window, nperseg=nperseg, noverlap=noverlap, nfft=nfft, detrend=detrend, return_onesided=True, scaling='density')
      data.append( test[1] / (2*pi) )
   xAxis = test[0][:] * 2*pi
   return pd.DataFrame( data = np.transpose(data), index = xAxis , columns = [ "psd("+ str(x) +")" for  x in df.columns ]  )
项目:droppy    作者:BV-DR    | 项目源码 | 文件源码
def derivFFT(df, n=1  ) :
   """ Deriv a signal trought FFT, warning, edge can be a bit noisy...
   indexList : channel to derive
   n : order of derivation
   deriv = []
   for iSig in range(df.shape[1]) :
      fft = np.fft.fft( df.values[:,iSig] )   #FFT
      freq = np.fft.fftfreq( df.shape[0] , dx(df) )

      from copy import deepcopy
      fft0 = deepcopy(fft)
      if n>0 :
         fft *= (1j * 2*pi* freq[:])**n                    #Derivation in frequency domain
      else :
         fft[-n:] *= (1j * 2*pi* freq[-n:])**n
         fft[0:-n] = 0.

      tts = np.real(np.fft.ifft(fft))
      tts -= tts[0]
      deriv.append( tts )    #Inverse FFT

   return pd.DataFrame( data = np.transpose(deriv), index = df.index , columns = [ "DerivFFT("+ x +")" for x in df.columns ]  )
项目:kaggle-seizure-prediction    作者:sics-lm    | 项目源码 | 文件源码
def load_segment(segment_path, old_segment_format=True, normalize_signal=False, resample_frequency=None):
    Convienience function for loading segments
    :param segment_path: Path to the segment file to load.
    :param old_segment_format: If True, the old format will be used. If False, the format backed by a pandas
                               dataframe will be used.
    :param normalize_signal: If True, the signal will be normalized using the subject median and median absolute
    :param resample_frequency: If this is set to a number, the signal will be resampled to that frequency.
    :return: A Segment or DFSegment object with the data from the segment in *segment_path*.
    if normalize_signal:
        return load_and_standardize(segment_path, old_segment_format=old_segment_format)
        if old_segment_format:
            segment = Segment(segment_path)
            segment = DFSegment.from_mat_file(segment_path)
        if resample_frequency is not None:
            segment.resample_frequency(resample_frequency, inplace=True)
        return segment
项目:braindecode    作者:robintibor    | 项目源码 | 文件源码
def highpass_cnt(data, low_cut_hz, fs, filt_order=3, axis=0):
     Highpass signal applying **causal** butterworth filter of given order.

    data: 2d-array
        Time x channels
    low_cut_hz: float
    fs: float
    filt_order: int

    highpassed_data: 2d-array
        Data after applying highpass filter.
    if (low_cut_hz is None) or (low_cut_hz == 0):"Not doing any highpass, since low 0 or None")
        return data.copy()
    b, a = scipy.signal.butter(filt_order, low_cut_hz / (fs / 2.0),
    assert filter_is_stable(a)
    data_highpassed = scipy.signal.lfilter(b, a, data, axis=axis)
    return data_highpassed
项目:tango    作者:LLNL    | 项目源码 | 文件源码
def _damp_flux(flux, taperwidth):
    """Apply damping to the sides of the input flux.  The damping applies to a width of
    size taperwidth on each side of the domain.

    Currently, uses a Tukey window.

      flux              input signal on which to damp sides (array)
      taperwidth        width over which to apply damping at each boundary, as a fraction of domain size (scalar)
      dampedFlux        signal with edges damped (array)
    alpha = taperwidth * 2      # alpha sets the width of the damping for the entire signal, accounting for both left and right boundaries
    tukeyWindow = scipy.signal.tukey(len(flux), alpha)
    dampedFlux = flux * tukeyWindow
    return dampedFlux
项目:BAG_framework    作者:ucb-art    | 项目源码 | 文件源码
def get_transfer_function(self, in_name, out_name, in_type='v', atol=0.0):
        # type: (str, str, str, float) -> TransferFunctionContinuous
        """Compute the transfer function between the two given nodes.

        in_name : str
            the input voltage/current node name.
        out_name : Union[str, List[str]]
            the output voltage node name.
        in_type : str
            set to 'v' for input voltage sources.  Otherwise, current sources.
        atol : float
            absolute tolerance for checking zeros in the numerator.  Used to filter out scipy warnings.

        system : TransferFunctionContinuous
            the scipy transfer function object.  See scipy.signal package on how to use this object.
        num, den = self.get_num_den(in_name, out_name, in_type=in_type, atol=atol)
        return TransferFunctionContinuous(num, den)
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def test_scipy(pyi_builder):
        from distutils.version import LooseVersion

        # Test top-level SciPy importability.
        import scipy
        from scipy import *

        # Test hooked SciPy modules.
        import scipy.sparse.csgraph

        # Test problematic SciPy modules.
        import scipy.linalg
        import scipy.signal

        # SciPy >= 0.16 privatized the previously public "scipy.lib" package as
        # "scipy._lib". Since this package is problematic, test its
        # importability regardless of SciPy version.
        if LooseVersion(scipy.__version__) >= LooseVersion('0.16.0'):
            import scipy._lib
            import scipy.lib
项目:pygeotools    作者:dshean    | 项目源码 | 文件源码
def butter_low(dt_list, val, lowpass=1.0):
    import scipy.signal
    #val_mask =
    #dt is 300 s, 5 min
    dt_diff = np.diff(dt_list)
    if isinstance(dt_diff[0], float):
        dt_diff *= 86400.
        dt_diff = np.array([dt.total_seconds() for dt in dt_diff])
    dt = malib.fast_median(dt_diff)
    #f is 0.00333 Hz
    #288 samples/day
    fs = 1./dt
    nyq = fs/2.
    order = 3
    f_max = (1./(86400*lowpass)) / nyq
    b, a = scipy.signal.butter(order, f_max, btype='lowpass')
    #b, a = sp.signal.butter(order, (f_min, f_max), btype='bandstop')
    w, h = scipy.signal.freqz(b, a, worN=2000)
    # w_f = (nyq/np.pi)*w
    # w_f_days = 1/w_f/86400.
    #plt.plot(w_f_days, np.abs(h))

    val_f = scipy.signal.filtfilt(b, a, val)
    return val_f
项目:AutismVoicePrint    作者:opraveen    | 项目源码 | 文件源码
def is_periodic(aud_sample, SAMPLING_RATE = 8000):
    :param aud_sample: Numpy 1D array rep of audio sample
    :param SAMPLING_RATE: Used to focus on human speech freq range
    :return: True if periodic, False if aperiodic

    threshold = 20

    # Use auto-correlation to find if there is enough periodicity in [50-400] Hz range
    values = signal.correlate(aud_sample, aud_sample, mode='full')

    # [50-400 Hz] corresponds to [2.5-20] ms OR [20-160] samples for 8 KHz sampling rate
    l_idx = int(SAMPLING_RATE*2.5/1000)
    r_idx = int(SAMPLING_RATE*20/1000)
    values = values[len(values)/2:]

    subset_values = values[l_idx:r_idx]

    if np.argmax(subset_values) < threshold:
        return False
        return True
项目:Thrifty    作者:swkrueger    | 项目源码 | 文件源码
def test_despreader_peaks(pos):
    """Test correlation peaks with signal at different positions."""
    estimator = soa_estimator.SoaEstimator(TEMPLATE, None,
    block = gen_block(pos)
    fft = np.fft.fft(block)
    corr = estimator.despread(fft)
    corr_abs = np.abs(corr)

    assert len(corr) == BLOCK_LEN - len(TEMPLATE) + 1

    if pos <= BLOCK_LEN - len(TEMPLATE):
        peak_idx = np.argmax(corr_abs)
        non_peak = np.delete(corr_abs, peak_idx)
        assert peak_idx == pos
        assert corr_abs[peak_idx] >= PEAK_MAG - 0.1
        np.testing.assert_array_less(non_peak, MAX_SIDEBAND + 0.1)
        np.testing.assert_array_less(corr_abs, MAX_SIDEBAND + 0.1)
项目:ASP    作者:TUIlmenauAMS    | 项目源码 | 文件源码
def DCT4(samples):
        Method to create DCT4 transformation using DCT3

        Arguments   :

            samples : (1D Array) Input samples to be transformed

        Returns     :

            y       :  (1D Array) Transformed output samples


    # Initialize
    samplesup=np.zeros(2*N, dtype = np.float32)
    # Upsample signal:


    return y[0:N]

#The DST4 transform:
项目:ASP    作者:TUIlmenauAMS    | 项目源码 | 文件源码
def DST4(samples):
        Method to create DST4 transformation using DST3

        Arguments   :
            samples : (1D Array) Input samples to be transformed

        Returns     :
            y       :  (1D Array) Transformed output samples


    # Initialize
    samplesup=np.zeros(2*N, dtype = np.float32)

    # Upsample signal
    # Reverse order to obtain DST4 out of DCT4:
    samplesup[0::2] = samples
    y = spfft.dst(samplesup,type=3,norm='ortho')*np.sqrt(2)#/2

    # Flip sign of every 2nd subband to obtain DST4 out of DCT4
    #y=(y[0:N])*(((-1)*np.ones(N, dtype = np.float32))**range(N))

    return y[0: N]
项目:ASP    作者:TUIlmenauAMS    | 项目源码 | 文件源码
def x2polyphase(x, N):
        Method to convert input signal x (a row vector) into a polphase row vector
        of blocks with length N.

        Arguments   :
            x       : (1D Array) Input samples
            N       : (int)  Number of subbands

        Returns     :
            y       :  (3D Array) Polyphase representation of the input signal

        Author : Gerald Schuller ('shl'), Dec/2/15

    #Number of blocks in the signal:

    xp=np.zeros((1,N,L), dtype = np.float32)

    for m in range(L):
        xp[0,:,m] = x[m*N: (m*N+N)]

    return xp
项目:SamuROI    作者:samuroi    | 项目源码 | 文件源码
def bandstop(data, fs, start, stop, order = 3):
    Apply bandstop filter on data.

    :param data: 3D video data
    :param fs: sampling frequency
    :param order: the order of butter filter used.
    :param start: lower frequency where band starts
    :param stop: higher frequency where band ends
    :return: the filtered 3d data set.
    nyq = 0.5 * fs
    high = stop / nyq
    low = start / nyq
    order = order
    b, a = scipy.signal.butter(order, [low, high], btype='bandstop')
    # zi = scipy.signal.lfiltic(b, a, y=[0.])
    dataf = scipy.signal.lfilter(b, a, data)
    return dataf
项目:signal-filter-visualizer    作者:jsantero    | 项目源码 | 文件源码
def updateUi(self):
        if self.previewCheckBox.checkState():  # Draw preview of filtered signal
            currentlySelected = self.stackedWidget.currentWidget()
            function = currentlySelected.returnFunction()
            input_data = self.centralWidget.getData()

            previewElement = ChainElement(name="Preview")
            previewElement.function = function
            previewElement.input_ = input_data
            output_data = previewElement.output
            data = (*output_data, input_data[1])
            if self.previewPlot:  # PlotWidget created already
            else:  # PlotWidget not yet created
                self.previewPlot = PlotWidget(data)
                self.previewDlg = QDialog()
                self.previewDlg.setWindowTitle("Signal Preview")
                grid = QGridLayout()
项目:weakmon    作者:rtmrtmrtmrtm    | 项目源码 | 文件源码
def freq_shift_ramp(x, hza, dt):
    N_orig = len(x)
    N_padded = 2**nextpow2(N_orig)
    t = numpy.arange(0, N_padded)
    f_shift = numpy.linspace(hza[0], hza[1], len(x))
    f_shift = numpy.append(f_shift, hza[1]*numpy.ones(N_padded-len(x)))

    pc1 = f_shift[:-1] - f_shift[1:]
    phase_correction = numpy.add.accumulate(
      t * dt * numpy.append(numpy.zeros(1), 2*numpy.pi*pc1))

    lo = numpy.exp(1j*(2*numpy.pi*dt*f_shift*t + phase_correction))
    x0 = numpy.append(x, numpy.zeros(N_padded-N_orig, x.dtype))
    h = scipy.signal.hilbert(x0)*lo
    ret = h[:N_orig].real
    return ret

# avoid most of the round-up-to-power-of-two penalty by
# doing log-n shifts. discontinuity at boundaries,
# but that's OK for JT65 2048-sample symbols.
项目:weakmon    作者:rtmrtmrtmrtm    | 项目源码 | 文件源码
def __init__(self, from_rate, to_rate):
        self.from_rate = from_rate
        self.to_rate = to_rate

        if self.from_rate > self.to_rate:
            # prepare a filter to precede resampling.
            self.filter = butter_lowpass(0.45 * self.to_rate,
            self.zi = scipy.signal.lfiltic(self.filter[0],

        # total number of input and output samples,
        # so we can insert/delete to keep long-term
        # rates correct.
        self.nin = 0
        self.nout = 0

    # how much will output be delayed?
    # in units of output samples.
项目:WaveletQuotes    作者:JobyKK    | 项目源码 | 文件源码
def cwt_time(data, wavelet, widths, dt, axis):
    # wavelets can be complex so output is complex
    output = np.zeros((len(widths),) + data.shape, dtype=np.complex)

    # compute in time
    slices = [None for _ in data.shape]
    slices[axis] = slice(None)
    for ind, width in enumerate(widths):
        # number of points needed to capture wavelet
        M = 10 * width / dt
        # times to use, centred at zero
        t = np.arange((-M + 1) / 2., (M + 1) / 2.) * dt
        # sample wavelet and normalise
        norm = (dt / width) ** .5
        wavelet_data = norm * wavelet(t, width)
        output[ind, :] = scipy.signal.fftconvolve(data,
    return output
项目:WaveletQuotes    作者:JobyKK    | 项目源码 | 文件源码
def coi(self):
        """The Cone of Influence is the region near the edges of the
        input signal in which edge effects may be important.

        Return a tuple (T, S) that describes the edge of the cone
        of influence as a single line in (time, scale).
        Tmin = self.time.min()
        Tmax = self.time.max()
        Tmid = Tmin + (Tmax - Tmin) / 2
        s = np.logspace(np.log10(self.scales.min()),
        c1 = Tmin + self.wavelet.coi(s)
        c2 = Tmax - self.wavelet.coi(s)

        C = np.hstack((c1[np.where(c1 < Tmid)], c2[np.where(c2 > Tmid)]))
        S = np.hstack((s[np.where(c1 < Tmid)], s[np.where(c2 > Tmid)]))

        # sort w.r.t time
        iC = C.argsort()
        sC = C[iC]
        sS = S[iC]

        return sC, sS
项目:WaveletQuotes    作者:JobyKK    | 项目源码 | 文件源码
def cwt_time(data, wavelet, widths, dt, axis):
    # wavelets can be complex so output is complex
    output = np.zeros((len(widths),) + data.shape, dtype=np.complex)

    # compute in time
    slices = [None for _ in data.shape]
    slices[axis] = slice(None)
    for ind, width in enumerate(widths):
        # number of points needed to capture wavelet
        M = 10 * width / dt
        # times to use, centred at zero
        t = np.arange((-M + 1) / 2., (M + 1) / 2.) * dt
        # sample wavelet and normalise
        norm = (dt / width) ** .5
        wavelet_data = norm * wavelet(t, width)
        output[ind, :] = scipy.signal.fftconvolve(data,
    return output
项目:nept    作者:vandermeerlab    | 项目源码 | 文件源码
def mean_csd(perievent_lfp1, perievent_lfp2, window, fs):
    """Computes the mean Cross-Spectral Density (CSD) between perievent slices

    perievent_lfp1 : nept.AnalogSignal
    perievent_lfp2 : nept.AnalogSignal
    window : int
    fs : int

    freq : np.array
    power : np.array

    freq, power = scipy.signal.csd(,,
                                   fs=fs, nperseg=window, nfft=int(window*2))

    return freq, np.mean(power, axis=0)
项目:nept    作者:vandermeerlab    | 项目源码 | 文件源码
def mean_coherence(perievent_lfp1, perievent_lfp2, window, fs):
    """Computes the mean coherence between perievent slices

    perievent_lfp1 : nept.AnalogSignal
    perievent_lfp2 : nept.AnalogSignal
    window : int
    fs : int

    freq : np.array
    coherence : np.array

    freq, coherence = scipy.signal.coherence(,, fs=fs, nperseg=window, nfft=int(window*2))

    return freq, np.mean(coherence, axis=0)
项目:tensor2tensor    作者:tensorflow    | 项目源码 | 文件源码
def add_delta_deltas(filterbanks, name=None):
  """Compute time first and second-order derivative channels.

    filterbanks: float32 tensor with shape [batch_size, len, num_bins, 1]
    name: scope name

    float32 tensor with shape [batch_size, len, num_bins, 3]
  delta_filter = np.array([2, 1, 0, -1, -2])
  delta_delta_filter = scipy.signal.convolve(delta_filter, delta_filter, "full")

  delta_filter_stack = np.array(
      [[0] * 4 + [1] + [0] * 4, [0] * 2 + list(delta_filter) + [0] * 2,
      dtype=np.float32).T[:, None, None, :]

  delta_filter_stack /= np.sqrt(
      np.sum(delta_filter_stack**2, axis=0, keepdims=True))

  filterbanks = tf.nn.conv2d(
      filterbanks, delta_filter_stack, [1, 1, 1, 1], "SAME", data_format="NHWC",
  return filterbanks
项目:mac-package-build    作者:persepolisdm    | 项目源码 | 文件源码
def test_scipy(pyi_builder):
        from distutils.version import LooseVersion

        # Test top-level SciPy importability.
        import scipy
        from scipy import *

        # Test hooked SciPy modules.
        import scipy.sparse.csgraph

        # Test problematic SciPy modules.
        import scipy.linalg
        import scipy.signal

        # SciPy >= 0.16 privatized the previously public "scipy.lib" package as
        # "scipy._lib". Since this package is problematic, test its
        # importability regardless of SciPy version.
        if LooseVersion(scipy.__version__) >= LooseVersion('0.16.0'):
            import scipy._lib
            import scipy.lib
项目:bird-species-classification    作者:johnmartinsson    | 项目源码 | 文件源码
def read_wave_file(filename):
    """ Read a wave file from disk
    # Arguments
        filename : the name of the wave file
    # Returns
        (fs, x)  : (sampling frequency, signal)
    if (not os.path.isfile(filename)):
        raise ValueError("File does not exist")

    s =, 'rb')

    if (s.getnchannels() != 1):
        raise ValueError("Wave file should be mono")
    # if (s.getframerate() != 22050):
        # raise ValueError("Sampling rate of wave file should be 16000")

    strsig = s.readframes(s.getnframes())
    x = np.fromstring(strsig, np.short)
    fs = s.getframerate()

    x = x/32768.0

    return fs, x
项目:bird-species-classification    作者:johnmartinsson    | 项目源码 | 文件源码
def read_wave_file_not_normalized(filename):
    """ Read a wave file from disk
    # Arguments
        filename : the name of the wave file
    # Returns
        (fs, x)  : (sampling frequency, signal)
    if (not os.path.isfile(filename)):
        raise ValueError("File does not exist")

    s =, 'rb')

    if (s.getnchannels() != 1):
        raise ValueError("Wave file should be mono")
    # if (s.getframerate() != 22050):
        # raise ValueError("Sampling rate of wave file should be 16000")

    strsig = s.readframes(s.getnframes())
    x = np.fromstring(strsig, np.short)
    fs = s.getframerate()

    return fs, x
项目:GRIPy    作者:giruenf    | 项目源码 | 文件源码
def cwt_time(data, wavelet, widths, dt, axis):
    # wavelets can be complex so output is complex
    output = np.zeros((len(widths),) + data.shape, dtype=np.complex)

    # compute in time
    slices = [None for _ in data.shape]
    slices[axis] = slice(None)
    for ind, width in enumerate(widths):
        # number of points needed to capture wavelet
        M = 10 * width / dt
        # times to use, centred at zero
        t = np.arange((-M + 1) / 2., (M + 1) / 2.) * dt
        # sample wavelet and normalise
        norm = (dt / width) ** .5
        wavelet_data = norm * wavelet(t, width)
        output[ind, :] = scipy.signal.fftconvolve(data,
    return output
项目:GRIPy    作者:giruenf    | 项目源码 | 文件源码
def coi(self):
        """The Cone of Influence is the region near the edges of the
        input signal in which edge effects may be important.

        Return a tuple (T, S) that describes the edge of the cone
        of influence as a single line in (time, scale).
        Tmin = self.time.min()
        Tmax = self.time.max()
        Tmid = Tmin + (Tmax - Tmin) / 2
        s = np.logspace(np.log10(self.scales.min()),
        c1 = Tmin + self.wavelet.coi(s)
        c2 = Tmax - self.wavelet.coi(s)

        C = np.hstack((c1[np.where(c1 < Tmid)], c2[np.where(c2 > Tmid)]))
        S = np.hstack((s[np.where(c1 < Tmid)], s[np.where(c2 > Tmid)]))

        # sort w.r.t time
        iC = C.argsort()
        sC = C[iC]
        sS = S[iC]

        return sC, sS
项目:GRIPy    作者:giruenf    | 项目源码 | 文件源码
def frequency_phase_rotation(values, angle, deg=False):
    window_size = 64
    noverlap = 32
    window = signal.hann(window_size, sym=False)
    if not signal.check_COLA(window, len(window), noverlap):
        raise Exception('check_COLA failed.')
    f, t, Zxx = signal.stft(values, window=window, nperseg=window_size, 
    Zxx_rotated = np.zeros(Zxx.shape, dtype=np.complex)
    for freq_idx in range(Zxx.shape[0]): # Loop over all frequencies
        Zxx_rotated[freq_idx] = phase_rotation(Zxx[freq_idx], angle, deg)
    t, x = signal.istft(Zxx_rotated, window=window, nperseg=window_size, 
    return t, x
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def adaptiveDetrend(data, x=None, threshold=3.0):
    """Return the signal with baseline removed. Discards outliers from baseline measurement."""
        import scipy.signal
    except ImportError:
        raise Exception("adaptiveDetrend() requires the package scipy.signal.")

    if x is None:
        x = data.xvals(0)

    d = data.view(np.ndarray)

    d2 = scipy.signal.detrend(d)

    stdev = d2.std()
    mask = abs(d2) < stdev*threshold
    #d3 = where(mask, 0, d2)
    #d4 = d2 - lowPass(d3, cutoffs[1], dt=dt)

    lr = scipy.stats.linregress(x[mask], d[mask])
    base = lr[1] + lr[0]*x
    d4 = d - base

    if (hasattr(data, 'implements') and data.implements('MetaArray')):
        return MetaArray(d4, info=data.infoCopy())
    return d4
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def adaptiveDetrend(data, x=None, threshold=3.0):
    """Return the signal with baseline removed. Discards outliers from baseline measurement."""
        import scipy.signal
    except ImportError:
        raise Exception("adaptiveDetrend() requires the package scipy.signal.")

    if x is None:
        x = data.xvals(0)

    d = data.view(np.ndarray)

    d2 = scipy.signal.detrend(d)

    stdev = d2.std()
    mask = abs(d2) < stdev*threshold
    #d3 = where(mask, 0, d2)
    #d4 = d2 - lowPass(d3, cutoffs[1], dt=dt)

    lr = scipy.stats.linregress(x[mask], d[mask])
    base = lr[1] + lr[0]*x
    d4 = d - base

    if (hasattr(data, 'implements') and data.implements('MetaArray')):
        return MetaArray(d4, info=data.infoCopy())
    return d4
项目:pynufft    作者:jyhmiinlin    | 项目源码 | 文件源码
def checker(input_var, desire_size):
    check if debug = 1

    if input_var is None:
        print('input_variable does not exist!')
    if desire_size is None:
        print('desire_size does not exist!')

    dd = numpy.size(desire_size)
    dims = numpy.shape(input_var)
#     print('dd=',dd,'dims=',dims)
    if numpy.isnan(numpy.sum(input_var[:])):
        print('input has NaN')

    if numpy.ndim(input_var) < dd:
        print('input signal has too few dimensions')

    if dd > 1:
        if dims[0:dd] != desire_size[0:dd]:
            print('input signal has wrong size1')
    elif dd == 1:
        if dims[0] != desire_size:
            print('input signal has wrong size2')

    if numpy.mod(, != 0:
        print('input signal shape is not multiples of desired size!')
项目:pyrsss    作者:butala    | 项目源码 | 文件源码
def __init__(self, n, axis, order=3, mode='valid'):
        Construct a second-order gradient operator for signal of dimension
        *n* for dimension *axis*. Use a filter kernel of length
        *order* (must be odd). Use convolution type *mode*.
        # assert that the filter length is odd
        assert(order % 2 == 1)
        self.n = n
        self.ndim = len(self.n)
        self.axis = axis
        if axis < 0 or axis >= self.ndim:
            raise ValueError('0 <= axis (= {0}) < ndim = {1}'.format(axis, self.ndim))

        self.d = differentiator(int(order/2) + 1)
        self.d2 = NP.convolve(self.d, self.d)

        self.mode = mode

        h_list = []
        m = []
        for i in reversed(range(self.ndim)):
            if i == axis:
        self.m = m

        if mode == 'circ':
            n_prime = array(n) - m + 1
            super(Gradient2Filter, self).__init__(n_prime, h_list, mode=mode)
            super(Gradient2Filter, self).__init__(n, h_list, mode=mode)
项目:droppy    作者:BV-DR    | 项目源码 | 文件源码
def rampDf(df , rStart, rEnd) :
   """ Ramp the signal between rStart and rEnd (in place) """
   a =  ramp_v( df.index[:] , rStart, rEnd )
   for c in df.columns :
      df[c][:] *= a[:]
   return df
项目:droppy    作者:BV-DR    | 项目源码 | 文件源码
def fillCos( A = 1.0, T = 10. , tMin = 0. , tMax = 50. , n = 200) :
   """ for testing purpose, fill signal with a cosine """
   xAxis = np.linspace( tMin , tMax , n )
   data = np.zeros( (n,2) )
   data[:,0] = A * np.cos(2*pi*xAxis/T)
   data[:,1] = A * np.sin(2*pi*xAxis/T)
   return  pd.DataFrame( data=data , index = xAxis  , columns =  [ "Cos" , "Sin" ]  )
项目:droppy    作者:BV-DR    | 项目源码 | 文件源码
def slidingFFT( se, T  ,  n = 1 , tStart = None , preSample = False , nHarmo = 5 , kind = abs , phase = None) :
   Harmonic analysis on a sliding windows
   se : Series to analyse
   T : Period
   tStart : start _xAxis
   n : size of the sliding windows in period.
   reSample : reSample the signal so that a period correspond to a integer number of time step
   nHarmo : number of harmonics to return
   kind : module, real,  imaginary part, as a function (abs, np.imag, np.real ...)
   phase : phase shift (for instance to extract in-phase with cos or sin)

   if (type(se) == pd.DataFrame) :
      if len(se.columns) == 1 : se = se.iloc[:,0]

   nWin = int(0.5 + n*T / dx(se) )
   #ReSample to get round number of time step per period
   if preSample :
      new = reSample( se, dt = n*T / (nWin) )
   else :
      new = se
   signal = new.values[:]
   #Allocate results
   res = np.zeros( (new.shape[0] , nHarmo ) )
   for iWin in range(new.shape[0] - nWin) :
      sig = signal[ iWin : iWin+nWin  ]  #windows
      fft = np.fft.fft( sig )            #FTT
      if phase !=None  :                 #Phase shift
         fft *= np.exp( 1j* ( 2*pi*(iWin*1./nWin) + phase ))
      fftp = kind( fft )       #Take module, real or imaginary part
      spectre = 2*fftp/(nWin)  #Scale
      for ih in range(nHarmo):
         res[iWin, ih] = spectre[ih*n]
         if ih == 0 : res[iWin, ih] /= 2.0
         #if ih == 0 : res[iWin, ih] = 2.0
   return pd.DataFrame( data = res , index = new.index , columns = map( lambda x : "Harmo {:} ({:})".format(x ,  ) , range(nHarmo)  ) )
项目:droppy    作者:BV-DR    | 项目源码 | 文件源码
def deriv(df, n=1) :
   """ Deriv a signal through finite difference
   from signalTreatment import der  , der2
   deriv = []
   if n == 1  :
      for iSig in range(df.shape[1]) :
         deriv.append(  der( df.values[:,iSig] , df.index ) )
   elif n == 2 :
      for iSig in range(df.shape[1]) :
         deriv.append(  der2( df.values[:,iSig] , df.index[:] ) )
   return pd.DataFrame( data = np.transpose(deriv), index = df.index , columns = [ "Deriv("+ x +")" for x in df.columns ]  )
项目:droppy    作者:BV-DR    | 项目源码 | 文件源码
def testPSD(display = True) :
   """ Read a signal, compute PSD and compare standard deviation to m0 """
   df = read( r'../../testData/motion.dat' )

   RsSig = np.std( df.values[:,0] ) * 4
   print ("Rs from sigma " , RsSig)
   psd = getPSD(df  )
   RsM0 = (np.sum(psd.values[:,0])* dx(psd) ) **0.5 * 4.004
   print ("Rs from m0    ",RsM0)

   psd = psd[0.1 : 2.0]
   if display :
      df.plot( )
项目:OASIS    作者:j-friedrich    | 项目源码 | 文件源码
def GetSn(y, range_ff=[0.25, 0.5], method='mean'):
    Estimate noise power through the power spectral density over the range of large frequencies

    y : array, shape (T,)
        One dimensional array containing the fluorescence intensities with
        one entry per time-bin.
    range_ff : (1,2) array, nonnegative, max value <= 0.5
        range of frequency (x Nyquist rate) over which the spectrum is averaged
    method : string, optional, default 'mean'
        method of averaging: Mean, median, exponentiated mean of logvalues

    sn : noise standard deviation

    ff, Pxx = scipy.signal.welch(y)
    ind1 = ff > range_ff[0]
    ind2 = ff < range_ff[1]
    ind = np.logical_and(ind1, ind2)
    Pxx_ind = Pxx[ind]
    sn = {
        'mean': lambda Pxx_ind: np.sqrt(np.mean(Pxx_ind / 2)),
        'median': lambda Pxx_ind: np.sqrt(np.median(Pxx_ind / 2)),
        'logmexp': lambda Pxx_ind: np.sqrt(np.exp(np.mean(np.log(Pxx_ind / 2))))

    return sn
项目:third_person_im    作者:bstadie    | 项目源码 | 文件源码
def discount_cumsum(x, discount):
    # See
    # Here, we have y[t] - discount*y[t+1] = x[t]
    # or rev(y)[t] - discount*rev(y)[t-1] = rev(x)[t]
    return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1]
项目:Auspex    作者:BBN-Q    | 项目源码 | 文件源码
def filter_records_fir(coeffs,
                           num_taps, # ignored
                           record_length, # ignored (uses shape of recs)
                           num_records, # ignored (uses shape of recs)
        # split out a, b coefficients
        b = coeffs[:len(coeffs)//2]
        a = coeffs[len(coeffs)//2:]

        filtered_signal = scipy.signal.lfilter(b, a, recs)
        result[:] = np.copy(filtered_signal[:, ::decim_factor], order="C")
项目:Auspex    作者:BBN-Q    | 项目源码 | 文件源码
def filter_records_iir(coeffs,
                           order, # ignored
                           record_length, # ignored (uses shape of recs)
                           num_records, # ignored (uses shape of recs)
        # split out a, b coefficients
        b = coeffs[:len(coeffs)//2]
        a = coeffs[len(coeffs)//2:]

        result[:] = scipy.signal.lfilter(b, a, recs)