Python numpy module: dtype() example source code

The following code examples, extracted from open-source Python projects, illustrate how to use numpy.dtype().

Project: IntroToDeepLearning    Author: robb-brown    | project source | file source
def extract_images(filename):
  """Extract the images into a 4D uint8 numpy array [index, y, x, depth]."""
  print('Extracting', filename)
  with gzip.open(filename) as bytestream:
    magic = _read32(bytestream)
    if magic != 2051:
      raise ValueError(
          'Invalid magic number %d in MNIST image file: %s' %
          (magic, filename))
    num_images = _read32(bytestream)
    rows = _read32(bytestream)
    cols = _read32(bytestream)
    buf = bytestream.read(rows * cols * num_images)
    data = numpy.frombuffer(buf, dtype=numpy.uint8)
    data = data.reshape(num_images, rows, cols, 1)
    return data
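
The numpy.dtype() usage here is numpy.frombuffer(buf, dtype=numpy.uint8), which reinterprets the raw byte string as unsigned 8-bit pixels without copying. A minimal self-contained sketch of the same pattern, with synthetic bytes standing in for real MNIST data:

import numpy

# two fake "images" of 3x3 pixels with 1 channel, as a raw byte string
buf = bytes(range(2 * 3 * 3))
data = numpy.frombuffer(buf, dtype=numpy.uint8)  # zero-copy view of the bytes
data = data.reshape(2, 3, 3, 1)                  # [index, y, x, depth]
print(data.shape, data.dtype)                    # (2, 3, 3, 1) uint8
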
Project: mpiFFT4py    Author: spectralDNS    | project source | file source
def __keytransform__(self, key):
        if isinstance(key[0], np.ndarray):
            shape = key[0].shape
            dtype = key[0].dtype
            i = key[1]
            zero = True if len(key) == 2 else key[2]

        elif isinstance(key[0], tuple):
            if len(key) == 3:
                shape, dtype, i = key
                zero = True

            elif len(key) == 4:
                shape, dtype, i, zero = key

        else:
            raise TypeError("Wrong type of key for work array")

        assert isinstance(zero, bool)
        assert isinstance(i, int)
        self.fillzero = zero
        return (shape, np.dtype(dtype), i)
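
The np.dtype(dtype) call in the returned key is what makes the work-array cache keys stable: strings, Python types, and NumPy scalar types describing the same type all normalize to equal (and equally hashed) dtype objects. A small illustration:

import numpy as np

# 'float64', float and np.float64 all normalize to the same dtype object,
# so equivalent keys compare and hash identically
assert np.dtype('float64') == np.dtype(float) == np.dtype(np.float64)
key1 = ((4, 4), np.dtype('float64'), 0)
key2 = ((4, 4), np.dtype(float), 0)
assert key1 == key2 and hash(key1) == hash(key2)
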
Project: seq2seq    Author: google    | project source | file source
def accumulate_strings(values, name="strings"):
  """Accumulates strings into a vector.

  Args:
    values: A 1-d string tensor that contains values to add to the accumulator.
    name: Name of the accumulator variable.

  Returns:
    A tuple (value_tensor, update_op).
  """
  tf.assert_type(values, tf.string)
  strings = tf.Variable(
      name=name,
      initial_value=[],
      dtype=tf.string,
      trainable=False,
      collections=[],
      validate_shape=True)
  value_tensor = tf.identity(strings)
  update_op = tf.assign(
      ref=strings, value=tf.concat([strings, values], 0), validate_shape=False)
  return value_tensor, update_op
Project: zipline-chinese    Author: zhanghan1990    | project source | file source
def test_expect_dtypes_with_tuple(self):

        allowed_dtypes = (dtype('datetime64[ns]'), dtype('float'))

        @expect_dtypes(a=allowed_dtypes)
        def foo(a, b):
            return a, b

        for d in allowed_dtypes:
            good_a = arange(3).astype(d)
            good_b = object()
            ret_a, ret_b = foo(good_a, good_b)
            self.assertIs(good_a, ret_a)
            self.assertIs(good_b, ret_b)

        with self.assertRaises(TypeError) as e:
            foo(arange(3, dtype='uint32'), object())

        expected_message = (
            "{qualname}() expected a value with dtype 'datetime64[ns]' "
            "or 'float64' for argument 'a', but got 'uint32' instead."
        ).format(qualname=qualname(foo))
        self.assertEqual(e.exception.args[0], expected_message)
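
The test expects 'float64' in the message even though the decorator was given dtype('float'), because NumPy canonicalizes the Python float type to the platform double:

from numpy import dtype

# dtype('float') is an alias for float64, which is why the error
# message above reports 'float64' rather than 'float'
assert dtype('float') == dtype('float64')
assert dtype('float').name == 'float64'
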
Project: cellranger    Author: 10XGenomics    | project source | file source
def _classify_gems(counts0, counts1):
        """ Infer number of distinct transcriptomes present in each GEM (1 or 2) and
            report cr_constants.GEM_CLASS_GENOME0 for a single cell w/ transcriptome 0,
            report cr_constants.GEM_CLASS_GENOME1 for a single cell w/ transcriptome 1,
            report cr_constants.GEM_CLASS_MULTIPLET for multiple transcriptomes """
        # Assumes that most of the GEMs are single-cell; model counts independently
        thresh0, thresh1 = [cr_constants.DEFAULT_MULTIPLET_THRESHOLD] * 2
        if sum(counts0 > counts1) >= 1 and sum(counts1 > counts0) >= 1:
            thresh0 = np.percentile(counts0[counts0 > counts1], cr_constants.MULTIPLET_PROB_THRESHOLD)
            thresh1 = np.percentile(counts1[counts1 > counts0], cr_constants.MULTIPLET_PROB_THRESHOLD)

        doublet = np.logical_and(counts0 >= thresh0, counts1 >= thresh1)
        dtype = np.dtype('|S%d' % max(len(cls) for cls in cr_constants.GEM_CLASSES))
        result = np.where(doublet, cr_constants.GEM_CLASS_MULTIPLET, cr_constants.GEM_CLASS_GENOME0).astype(dtype)
        result[np.logical_and(np.logical_not(result == cr_constants.GEM_CLASS_MULTIPLET), counts1 > counts0)] = cr_constants.GEM_CLASS_GENOME1

        return result
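
The np.dtype('|S%d' % ...) line builds a fixed-width byte-string dtype sized to the longest class label, so the astype() cannot silently truncate labels. A standalone sketch with hypothetical labels standing in for cr_constants.GEM_CLASSES:

import numpy as np

classes = [b'genome0', b'genome1', b'multiplet']  # hypothetical labels
dt = np.dtype('|S%d' % max(len(c) for c in classes))  # dtype('S9')
doublet = np.array([False, True, False])
result = np.where(doublet, b'multiplet', b'genome0').astype(dt)
result[~doublet] = b'genome1'  # relabel the non-multiplet entries
print(result)  # [b'genome1' b'multiplet' b'genome1']
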
Project: cellranger    Author: 10XGenomics    | project source | file source
def widen_cat_column(old_ds, new_type):
    name = old_ds.name
    tmp_name = "__tmp_" + old_ds.name
    grp = old_ds.parent

    ds = grp.create_dataset(tmp_name,
            data = old_ds[:],
            shape = old_ds.shape,
            maxshape = (None,),
            dtype = new_type,
            compression = COMPRESSION,
            shuffle = True,
            chunks = (CHUNK_SIZE,))

    del grp[name]
    grp.move(tmp_name, name)
    return ds
Project: cellranger    Author: 10XGenomics    | project source | file source
def create_levels(ds, levels):
    # Create a dataset in the LEVEL_GROUP
    # and store as native numpy / h5py types
    level_grp = ds.file.get(LEVEL_GROUP)
    if level_grp is None:
        # Create a LEVEL_GROUP
        level_grp = ds.file.create_group(LEVEL_GROUP)
    ds_name = ds.name.split("/")[-1]
    dt = h5py.special_dtype(vlen=str)

    level_grp.create_dataset(ds_name,
                       shape = [len(levels)],
                       maxshape = (None,),
                       dtype = dt,
                       data  = levels,
                       compression = COMPRESSION,
                       chunks = (CHUNK_SIZE,))
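
h5py.special_dtype(vlen=str) produces a NumPy object dtype tagged so that h5py stores the column as HDF5 variable-length strings. A minimal sketch, assuming h5py is available (the 'core' driver keeps the file in memory, so nothing touches disk):

import h5py

dt = h5py.special_dtype(vlen=str)  # variable-length string dtype
with h5py.File('levels_demo.h5', 'w', driver='core', backing_store=False) as f:
    ds = f.create_dataset('levels', shape=(3,), maxshape=(None,), dtype=dt,
                          data=['low', 'medium', 'high'])
    print(ds[:])  # the three stored labels (returned as bytes in h5py >= 3)
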
Project: cellranger    Author: 10XGenomics    | project source | file source
def reg2bin_vector(begin, end):
    '''Vectorized tabix reg2bin -- much faster than reg2bin'''
    result = np.zeros(begin.shape)

    # Entries filled
    done = np.zeros(begin.shape, dtype=np.bool)

    for (bits, bins) in rev_bit_bins:
        begin_shift = begin >> bits
        new_done = (begin >> bits) == (end >> bits)
        mask = np.logical_and(new_done, np.logical_not(done))
        offset = ((1 << (29 - bits)) - 1) / 7
        result[mask] = offset + begin_shift[mask]

        done = new_done

    return result.astype(np.int32)
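
For reference, the scalar tabix reg2bin that this function vectorizes runs the same (begin >> bits) == (end >> bits) test level by level, with bin offsets of ((1 << (29 - bits)) - 1) // 7. A sketch of the standard algorithm, which rev_bit_bins presumably encodes as (bits, bins) pairs (note end is exclusive, hence the end - 1):

def reg2bin(begin, end):
    """Scalar tabix reg2bin: the smallest bin containing [begin, end)."""
    end -= 1  # convert to an inclusive end
    for bits in (14, 17, 20, 23, 26):
        if begin >> bits == end >> bits:
            return ((1 << (29 - bits)) - 1) // 7 + (begin >> bits)
    return 0

assert reg2bin(0, 1) == 4681       # the smallest (16 kb) bins start at 4681
assert reg2bin(0, 1 << 29) == 0    # the whole range maps to the root bin
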
Project: gee-bridge    Author: francbartoli    | project source | file source
def flip_code(code):
    if isinstance(code, (numpy.dtype,type)):
        # since several things map to complex64 we must carefully select
        # the opposite that is an exact match (ticket 1518)
        if code == numpy.int8:
            return gdalconst.GDT_Byte
        if code == numpy.complex64:
            return gdalconst.GDT_CFloat32

        for key, value in codes.items():
            if value == code:
                return key
        return None
    else:
        try:
            return codes[code]
        except KeyError:
            return None
Project: j3dview    Author: blank63    | project source | file source
def gl_init(self,array_table):
        self.gl_hide = False

        self.gl_vertex_array = gl.VertexArray()
        glBindVertexArray(self.gl_vertex_array)

        self.gl_vertex_buffer = gl.Buffer()
        glBindBuffer(GL_ARRAY_BUFFER,self.gl_vertex_buffer)

        self.gl_element_count = 3*gl_count_triangles(self)
        self.gl_element_buffer = gl.Buffer()
        glBindBuffer(GL_ELEMENT_ARRAY_BUFFER,self.gl_element_buffer)

        vertex_type =  numpy.dtype([array_table[attribute].field() for attribute in self.attributes])
        vertex_count = sum(len(primitive.vertices) for primitive in self.primitives)
        vertex_array = numpy.empty(vertex_count,vertex_type)

        for attribute in self.attributes:
            array_table[attribute].load(self,vertex_array)

        vertex_array,element_map = numpy.unique(vertex_array,return_inverse=True)
        element_array = gl_create_element_array(self,element_map,self.gl_element_count)

        glBufferData(GL_ARRAY_BUFFER,vertex_array.nbytes,vertex_array,GL_STATIC_DRAW)
        glBufferData(GL_ELEMENT_ARRAY_BUFFER,element_array.nbytes,element_array,GL_STATIC_DRAW)
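
numpy.dtype([...]) built from the per-attribute fields yields one record per vertex, and numpy.unique(..., return_inverse=True) then deduplicates whole records while returning the mapping needed to rebuild the element array. The same mechanics in isolation, with scalar fields for simplicity:

import numpy

# a minimal sketch: a structured record dtype for vertices
vertex_type = numpy.dtype([('x', numpy.float32), ('y', numpy.float32),
                           ('color', numpy.uint8)])
vertex_array = numpy.zeros(4, vertex_type)
vertex_array[1] = (1.0, 0.0, 255)
vertex_array[3] = vertex_array[1]  # an exact duplicate vertex

unique, element_map = numpy.unique(vertex_array, return_inverse=True)
print(len(unique), element_map)  # 2 [0 1 0 1]
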
Project: pointnet    Author: charlesq34    | project source | file source
def make2d(array, cols=None, dtype=None):
    '''
    Make a 2D array from an array of arrays.  The `cols' and `dtype'
    arguments can be omitted if the array is not empty.

    '''
    if (cols is None or dtype is None) and not len(array):
        raise RuntimeError("cols and dtype must be specified for empty "
                           "array")

    if cols is None:
        cols = len(array[0])

    if dtype is None:
        dtype = array[0].dtype

    return _np.fromiter(array, [('_', dtype, (cols,))],
                        count=len(array))['_']
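
Usage sketch for make2d(): the fromiter trick packs each row into a single-field record whose field is a subarray, then unwraps that field to get an (n, cols) array. Subarray dtypes in numpy.fromiter need a reasonably recent NumPy (support was widened around 1.23); numpy.stack is the simple alternative when every row already has equal length:

import numpy as _np

rows = [_np.array([1.0, 2.0]), _np.array([3.0, 4.0])]
out = _np.fromiter(rows, [('_', float, (2,))], count=len(rows))['_']
print(out.shape)  # (2, 2)
assert (_np.stack(rows) == out).all()  # the straightforward equivalent
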
Project: pointnet    Author: charlesq34    | project source | file source
def _read(self, stream, text, byte_order):
        '''
        Read the actual data from a PLY file.

        '''
        if text:
            self._read_txt(stream)
        else:
            if self._have_list:
                # There are list properties, so a simple load is
                # impossible.
                self._read_bin(stream, byte_order)
            else:
                # There are no list properties, so loading the data is
                # much more straightforward.
                self._data = _np.fromfile(stream,
                                          self.dtype(byte_order),
                                          self.count)

        if len(self._data) < self.count:
            k = len(self._data)
            del self._data
            raise PlyParseError("early end-of-file", self, k)

        self._check_sanity()
Project: pointnet    Author: charlesq34    | project source | file source
def _read_bin(self, stream, byte_order):
        '''
        Load a PLY element from a binary PLY file.  The element may
        contain list properties.

        '''
        self._data = _np.empty(self.count, dtype=self.dtype(byte_order))

        for k in _range(self.count):
            for prop in self.properties:
                try:
                    self._data[prop.name][k] = \
                        prop._read_bin(stream, byte_order)
                except StopIteration:
                    raise PlyParseError("early end-of-file",
                                        self, k, prop)
Project: sharedbuffers    Author: jampp    | project source | file source
def _merge_all(parts, dtype):
    if len(parts) == 1:
        return parts[0]
    else:
        nparts = []
        for i in xrange(0, len(parts), 2):
            if i+1 < len(parts):
                npart = numpy.empty((len(parts[i])+len(parts[i+1]), 2), dtype)
                merge_elements = index_merge(parts[i], parts[i+1], npart)
                if merge_elements != len(npart):
                    npart = npart[:merge_elements]
                nparts.append(npart)
            else:
                nparts.append(parts[i])
        del parts
        return _merge_all(nparts, dtype)
Project: sharedbuffers    Author: jampp    | project source | file source
def __init__(self, buf, offset = 0):
        # Accelerate class attributes
        self._encode = self.encode
        self._dtype = self.dtype
        self._xxh = self.xxh

        # Initialize buffer
        if offset:
            self._buf = self._likebuf = buffer(buf, offset)
        else:
            self._buf = buf
            self._likebuf = _likebuffer(buf)

        # Parse header and map index
        self.index_elements, self.index_offset = self._Header.unpack_from(self._buf, 0)

        self.index = numpy.ndarray(buffer = self._buf, 
            offset = self.index_offset, 
            dtype = self.dtype, 
            shape = (self.index_elements, 3))
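
Calling numpy.ndarray directly with buffer, offset, dtype and shape builds a zero-copy view over an existing buffer, which is how the index above is mapped straight out of shared memory. A tiny standalone example:

import numpy

buf = bytes(range(12))
# a 2x3 uint8 view starting 6 bytes into the buffer, no copy made
index = numpy.ndarray(buffer=buf, offset=6, dtype=numpy.uint8, shape=(2, 3))
print(index.tolist())  # [[6, 7, 8], [9, 10, 11]]
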
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def test_rescaleData():
    dtypes = map(np.dtype, ('ubyte', 'uint16', 'byte', 'int16', 'int', 'float'))
    for dtype1 in dtypes:
        for dtype2 in dtypes:
            data = (np.random.random(size=10) * 2**32 - 2**31).astype(dtype1)
            for scale, offset in [(10, 0), (10., 0.), (1, -50), (0.2, 0.5), (0.001, 0)]:
                if dtype2.kind in 'iu':
                    lim = np.iinfo(dtype2)
                    lim = lim.min, lim.max
                else:
                    lim = (-np.inf, np.inf)
                s1 = np.clip(float(scale) * (data-float(offset)), *lim).astype(dtype2)
                s2 = pg.rescaleData(data, scale, offset, dtype2)
                assert s1.dtype == s2.dtype
                if dtype2.kind in 'iu':
                    assert np.all(s1 == s2)
                else:
                    assert np.allclose(s1, s2)
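
The dtype.kind test ('i' signed, 'u' unsigned integers) decides whether clipping limits come from np.iinfo; float kinds get infinite limits. Standalone:

import numpy as np

for dt in map(np.dtype, ('uint16', 'int16', 'float')):
    if dt.kind in 'iu':    # integer kinds have finite representable limits
        info = np.iinfo(dt)
        print(dt.name, info.min, info.max)
    else:                  # float kinds: effectively unbounded, no clipping
        print(dt.name, -np.inf, np.inf)
# uint16 0 65535
# int16 -32768 32767
# float64 -inf inf
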
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def solve3DTransform(points1, points2):
    """
    Find a 3D transformation matrix that maps points1 onto points2.
    Points must be specified as either lists of 4 Vectors or 
    (4, 3) arrays.
    """
    import numpy.linalg
    pts = []
    for inp in (points1, points2):
        if isinstance(inp, np.ndarray):
            A = np.empty((4,4), dtype=float)
            A[:,:3] = inp[:,:3]
            A[:,3] = 1.0
        else:
            A = np.array([[inp[i].x(), inp[i].y(), inp[i].z(), 1] for i in range(4)])
        pts.append(A)

    ## solve 3 sets of linear equations to determine transformation matrix elements
    matrix = np.zeros((4,4))
    for i in range(3):
        ## solve Ax = B; x is one row of the desired transformation matrix
        matrix[i] = numpy.linalg.solve(pts[0], pts[1][:,i])  

    return matrix
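
A quick check of the idea: build a known affine map, apply it to four non-coplanar homogeneous points, and recover its first three rows with numpy.linalg.solve (the fourth row of a 3D affine matrix is always [0, 0, 0, 1]):

import numpy as np

pts1 = np.array([[0., 0., 0., 1.],   # four non-coplanar points,
                 [1., 0., 0., 1.],   # homogeneous coordinate last
                 [0., 1., 0., 1.],
                 [0., 0., 1., 1.]])
M = np.array([[2., 0., 0., 5.],      # scale x by 2, translate by (5, -1, 3)
              [0., 1., 0., -1.],
              [0., 0., 1., 3.],
              [0., 0., 0., 1.]])
pts2 = pts1.dot(M.T)                 # the transformed points

matrix = np.zeros((4, 4))
matrix[3, 3] = 1.0
for i in range(3):
    # row i of M satisfies pts1 . M[i] = pts2[:, i]
    matrix[i] = np.linalg.solve(pts1, pts2[:, i])
assert np.allclose(matrix, M)
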
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __init__(self, index, channel_names=None, channel_ids=None,
                 name=None, description=None, file_origin=None,
                 coordinates=None, **annotations):
        '''
        Initialize a new :class:`ChannelIndex` instance.
        '''
        # Inherited initialization
        # Sets universally recommended attributes, and places all others
        # in annotations
        super(ChannelIndex, self).__init__(name=name,
                                           description=description,
                                           file_origin=file_origin,
                                           **annotations)

        # Defaults
        if channel_names is None:
            channel_names = np.array([], dtype='S')
        if channel_ids is None:
            channel_ids = np.array([], dtype='i')

        # Store recommended attributes
        self.channel_names = np.array(channel_names)
        self.channel_ids = np.array(channel_ids)
        self.index = np.array(index)
        self.coordinates = coordinates
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def load_bytes(self, data_blocks, dtype='<i1', start=None, end=None, expected_size=None):
        """
        Return list of bytes contained
        in the specified set of blocks.

        NB : loads all the data at once, as files cannot exceed 4 GB;
             a more memory-efficient approach may be added later.
        """
        chunks = list()
        raw = ''
        # keep only data blocks having
        # a size greater than zero
        blocks = [k for k in data_blocks if k.size > 0]
        for data_block in blocks :
            self.file.seek(data_block.start)
            raw = self.file.read(data_block.size)[0:expected_size]
            databytes = np.frombuffer(raw, dtype=dtype)
            chunks.append(databytes)
        # concatenate all chunks and return
        # the specified slice
        if len(chunks)>0 :
            databytes = np.concatenate(chunks)
            return databytes[start:end]
        else :
            return np.array([])
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def load_channel_data(self, ep, ch):
        """
        Return a numpy array containing the
        list of bytes corresponding to the
        specified episode and channel.
        """
        #memorise the sample size and symbol
        sample_size = self.sample_size(ep, ch)
        sample_symbol = self.sample_symbol(ep, ch)

        #create a bit mask to define which
        #sample to keep from the file
        bit_mask = self.create_bit_mask(ep, ch)

        #load all bytes contained in an episode
        data_blocks = self.get_data_blocks(ep)
        databytes = self.load_bytes(data_blocks)
        raw = self.filter_bytes(databytes, bit_mask)

        #reshape bytes from the sample size
        dt = np.dtype(numpy_map[sample_symbol])
        #NB: newbyteorder() returns a new dtype instead of modifying
        #the existing one in place, so the result must be reassigned
        dt = dt.newbyteorder('<')
        return np.frombuffer(raw.reshape([len(raw) / sample_size, sample_size]), dt)
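
The reassignment matters because dtype objects are immutable; newbyteorder() hands back a new dtype instead of changing the receiver:

import numpy as np

dt = np.dtype('>i4')        # big-endian 32-bit int
le = dt.newbyteorder('<')   # a NEW dtype; dt itself is unchanged
print(dt.str, le.str)       # '>i4' '<i4'
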
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def get_signal_data(self, ep, ch):
        """
        Return a numpy array containing all samples of a
        signal, acquired on an Elphy analog channel, formatted
        as a list of (time, value) tuples.
        """
        #get data from the file
        y_data = self.load_encoded_data(ep, ch)
        x_data = np.arange(0, len(y_data))

        #create a recarray
        data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_float)])

        #put in the recarray the scaled data
        x_factors = self.x_scale_factors(ep, ch)
        y_factors = self.y_scale_factors(ep, ch)
        data['x'] = x_factors.scale(x_data)
        data['y'] = y_factors.scale(y_data)

        return data
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def get_tag_data(self, ep, tag_ch):
        """
        Return a numpy array containing all samples of a
        signal, acquired on an Elphy tag channel, formatted
        as a list of (time, value) tuples.
        """
        #get data from the file
        y_data = self.load_encoded_tags(ep, tag_ch)
        x_data = np.arange(0, len(y_data))

        #create a recarray
        data = np.recarray(len(y_data), dtype=[('x', b_float), ('y', b_int)])

        #put in the recarray the scaled data
        factors = self.x_tag_scale_factors(ep)
        data['x'] = factors.scale(x_data)
        data['y'] = y_data
        return data
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def get_event(self, ep, ch, marked_ks):
        """
        Return a :class:`ElphyEvent` which is a
        descriptor of the specified event channel.
        """
        assert ep in range(1, self.n_episodes + 1)
        assert ch in range(1, self.n_channels + 1)

        # find the event channel number
        evt_channel = np.where(marked_ks == -1)[0][0]
        assert evt_channel in range(1, self.n_events(ep) + 1)

        block = self.episode_block(ep)
        ep_blocks = self.get_blocks_stored_in_episode(ep)
        evt_blocks = [k for k in ep_blocks if k.identifier == 'REVT']
        n_events = np.sum([k.n_events[evt_channel - 1] for k in evt_blocks], dtype=int)
        x_unit = block.ep_block.x_unit

        return ElphyEvent(self, ep, evt_channel, x_unit, n_events, ch_number=ch)
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def load_encoded_events(self, episode, evt_channel, identifier):
        """
        Return times stored as a 4-bytes integer
        in the specified event channel.
        """
        data_blocks = self.group_blocks_of_type(episode, identifier)
        ep_blocks = self.get_blocks_stored_in_episode(episode)
        evt_blocks = [k for k in ep_blocks if k.identifier == identifier]

        #compute events on each channel
        n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)
        pre_events = np.sum(n_events[0:evt_channel - 1], dtype=int)
        start = pre_events
        end = start + n_events[evt_channel - 1]
        expected_size = 4 * np.sum(n_events, dtype=int)
        return self.load_bytes(data_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size)
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def load_encoded_spikes(self, episode, evt_channel, identifier):
        """
        Return times stored as a 4-bytes integer
        in the specified spike channel.
        NB: this is meant for Blackrock-type data, which has an additional byte for each event time as a spike-sorting label.
            These additional bytes are appended after the times.
        """
        # to load the requested spikes for the specified episode and event channel:
        # get all the elphy blocks having as identifier 'RSPK' (or whatever)
        all_rspk_blocks = [k for k in self.blocks if k.identifier == identifier]
        rspk_block = all_rspk_blocks[episode-1]
        # RDATA(h?dI) REVT(NbVeV:I, NbEv:256I ... spike data are 4byte integers
        rspk_header = 4*( rspk_block.size - rspk_block.data_size-2 + len(rspk_block.n_events))
        pre_events = np.sum(rspk_block.n_events[0:evt_channel-1], dtype=int, axis=0)
        # the real start is after the header, the preceding events (4 bytes each) and the preceding labels (1 byte each)
        start = rspk_header + (4*pre_events) + pre_events
        end = start + 4*rspk_block.n_events[evt_channel-1]
        raw = self.load_bytes( [rspk_block], dtype='<i1', start=start, end=end, expected_size=rspk_block.size )
        # re-encoding after reading byte by byte
        res = np.frombuffer(raw[0:(4*rspk_block.n_events[evt_channel-1])], dtype='<i4')
        res.sort() # sometimes timings are not sorted
        #print "load_encoded_data() - spikes:",res
        return res
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def get_waveform_data(self, episode, electrode_id):
        """
        Return waveforms corresponding to the specified
        spike channel. This function is triggered when the
        ``waveforms`` property of an :class:`Spike` descriptor
        instance is accessed.
        """
        block = self.episode_block(episode)
        times, databytes = self.load_encoded_waveforms(episode, electrode_id)
        n_events, = databytes.shape
        wf_samples = databytes['waveform'].shape[1]
        dtype = [
            ('time', float),
            ('electrode_id', int),
            ('unit_id', int),
            ('waveform', float, (wf_samples, 2))
        ]
        data = np.empty(n_events, dtype=dtype)
        data['electrode_id'] = databytes['channel_id'][:, 0]
        data['unit_id'] = databytes['unit_id'][:, 0]
        data['time'] = databytes['elphy_time'][:, 0] * block.ep_block.dX
        data['waveform'][:, :, 0] = times * block.ep_block.dX
        data['waveform'][:, :, 1] = databytes['waveform'] * block.ep_block.dY_wf + block.ep_block.Y0_wf
        return data
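
The dtype here mixes scalar fields with a subarray field, so data['waveform'] is a per-record 2D array that can be filled slice by slice. A reduced sketch:

import numpy as np

# 3 fake events, 5 samples per waveform, (time, value) pairs per sample
dtype = [('time', float), ('unit_id', int), ('waveform', float, (5, 2))]
data = np.empty(3, dtype=dtype)
data['time'] = [0.1, 0.2, 0.3]
data['unit_id'] = [1, 1, 2]
data['waveform'][:, :, 0] = np.arange(5) * 0.001  # sample times, broadcast
data['waveform'][:, :, 1] = 0.0                   # sample values
print(data['waveform'].shape)  # (3, 5, 2)
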
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def get_rspk_data(self, spk_channel):
        """
        Return times stored as a 4-bytes integer
        in the specified event channel.
        """
        evt_blocks = self.get_blocks_of_type('RSPK')
        #compute events on each channel
        n_events = np.sum([k.n_events for k in evt_blocks], dtype=int, axis=0)
        pre_events = np.sum(n_events[0:spk_channel], dtype=int)  # events on channels preceding spk_channel
        start = pre_events + (7 + len(n_events))  # skip the RSPK header
        end = start + n_events[spk_channel]
        expected_size = 4 * np.sum(n_events, dtype=int) # constant
        return self.load_bytes(evt_blocks, dtype='<i4', start=start, end=end, expected_size=expected_size)


Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __mmap_ncs_packet_headers(self, filename):
        """
        Memory map of the Neuralynx .ncs file optimized for extraction of
        data packet headers
        Reading standard dtype improves speed, but timestamps need to be
        reconstructed
        """
        filesize = getsize(self.sessiondir + sep + filename)  # in byte
        if filesize > 16384:
            data = np.memmap(self.sessiondir + sep + filename,
                             dtype='<u4',
                             shape=(int((filesize - 16384) / 4 / 261), 261),
                             mode='r', offset=16384)

            ts = data[:, 0:2]
            multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
                              axis=0)
            timestamps = np.sum(ts * multi, axis=1)
            # timestamps = data[:,0] + (data[:,1] *2**32)
            header_u4 = data[:, 2:5]

            return timestamps, header_u4
        else:
            return None
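
The reconstruction works because each 64-bit timestamp is stored as two little-endian uint32 words: low word plus high word times 2**32. In isolation:

import numpy as np

ts = np.array([[1, 0],        # low word 1, high word 0 -> 1
               [0, 1]],       # low word 0, high word 1 -> 2**32
              dtype=np.uint32)
multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(ts), axis=0)
timestamps = np.sum(ts * multi, axis=1)
print(timestamps)  # [1 4294967296]
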
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __mmap_ncs_packet_timestamps(self, filename):
        """
        Memory map of the Neuralynx .ncs file optimized for extraction of
        data packet headers
        Reading standard dtype improves speed, but timestamps need to be
        reconstructed
        """
        filesize = getsize(self.sessiondir + sep + filename)  # in byte
        if filesize > 16384:
            data = np.memmap(self.sessiondir + sep + filename,
                             dtype='<u4',
                             shape=(int((filesize - 16384) / 4 / 261), 261),
                             mode='r', offset=16384)

            ts = data[:, 0:2]
            multi = np.repeat(np.array([1, 2 ** 32], ndmin=2), len(data),
                              axis=0)
            timestamps = np.sum(ts * multi, axis=1)
            # timestamps = data[:,0] + data[:,1]*2**32

            return timestamps
        else:
            return None
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __mmap_nev_file(self, filename):
        """ Memory map the Neuralynx .nev file """
        nev_dtype = np.dtype([
            ('reserved', '<i2'),
            ('system_id', '<i2'),
            ('data_size', '<i2'),
            ('timestamp', '<u8'),
            ('event_id', '<i2'),
            ('ttl_input', '<i2'),
            ('crc_check', '<i2'),
            ('dummy1', '<i2'),
            ('dummy2', '<i2'),
            ('extra', '<i4', (8,)),
            ('event_string', 'a128'),
        ])

        if getsize(self.sessiondir + sep + filename) > 16384:
            return np.memmap(self.sessiondir + sep + filename,
                             dtype=nev_dtype, mode='r', offset=16384)
        else:
            return None
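
A structured np.dtype has a fixed itemsize, which is what lets np.memmap carve the file into whole records; these fields add up to 184 bytes per packet (note 'a128' is an older spelling of the 'S128' byte-string type):

import numpy as np

nev_dtype = np.dtype([
    ('reserved', '<i2'), ('system_id', '<i2'), ('data_size', '<i2'),
    ('timestamp', '<u8'), ('event_id', '<i2'), ('ttl_input', '<i2'),
    ('crc_check', '<i2'), ('dummy1', '<i2'), ('dummy2', '<i2'),
    ('extra', '<i4', (8,)), ('event_string', 'S128'),
])
# 8 x int16 + 1 x uint64 + 8 x int32 + 128 bytes = 184 bytes per record
print(nev_dtype.itemsize)  # 184
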
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __extract_nev_file_spec(self):
        """
        Extract file specification from an .nsx file
        """
        filename = '.'.join([self._filenames['nsx'], 'nev'])
        # Header structure of files specification 2.2 and higher. For files 2.1
        # and lower, the entries ver_major and ver_minor are not supported.
        dt0 = [
            ('file_id', 'S8'),
            ('ver_major', 'uint8'),
            ('ver_minor', 'uint8')]

        nev_file_id = np.fromfile(filename, count=1, dtype=dt0)[0]
        if nev_file_id['file_id'].decode() == 'NEURALEV':
            spec = '{0}.{1}'.format(
                nev_file_id['ver_major'], nev_file_id['ver_minor'])
        else:
            raise IOError('NEV file type {0} is not supported'.format(
                nev_file_id['file_id']))

        return spec
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __read_nsx_data_variant_a(self, nsx_nb):
        """
        Extract nsx data from a 2.1 .nsx file
        """
        filename = '.'.join([self._filenames['nsx'], 'ns%i' % nsx_nb])

        # get shape of data
        shape = (
            self.__nsx_databl_param['2.1']('nb_data_points', nsx_nb),
            self.__nsx_basic_header[nsx_nb]['channel_count'])
        offset = self.__nsx_params['2.1']('bytes_in_headers', nsx_nb)

        # read nsx data
        # store as dict for compatibility with higher file specs
        data = {1: np.memmap(
            filename, dtype='int16', shape=shape, offset=offset)}

        return data
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __read_nev_data(self, nev_data_masks, nev_data_types):
        """
        Extract nev data from a 2.1 or 2.2 .nev file
        """
        filename = '.'.join([self._filenames['nev'], 'nev'])
        data_size = self.__nev_basic_header['bytes_in_data_packets']
        header_size = self.__nev_basic_header['bytes_in_headers']

        # read all raw data packets and markers
        dt0 = [
            ('timestamp', 'uint32'),
            ('packet_id', 'uint16'),
            ('value', 'S{0}'.format(data_size - 6))]

        raw_data = np.memmap(filename, offset=header_size, dtype=dt0)

        masks = self.__nev_data_masks(raw_data['packet_id'])
        types = self.__nev_data_types(data_size)

        data = {}
        for k, v in nev_data_masks.items():
            data[k] = raw_data.view(types[k][nev_data_types[k]])[masks[k][v]]

        return data
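
raw_data.view(...) reinterprets the same packet bytes under a more specific structured dtype of equal itemsize, without copying. A minimal illustration of the pattern:

import numpy as np

# generic packets: an id plus 6 opaque payload bytes (8 bytes per record)
generic = np.zeros(2, dtype=[('packet_id', '<u2'), ('value', 'S6')])
# reinterpret the payload as three little-endian int16 samples
specific = generic.view([('packet_id', '<u2'), ('samples', '<i2', (3,))])
print(specific['samples'].shape)  # (2, 3)
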
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __get_nev_rec_times(self):
        """
        Extracts minimum and maximum time points from a nev file.
        """
        filename = '.'.join([self._filenames['nev'], 'nev'])

        dt = [('timestamp', 'uint32')]
        offset = \
            self.__get_file_size(filename) - \
            self.__nev_params('bytes_in_data_packets')
        last_data_packet = np.memmap(filename, offset=offset, dtype=dt)[0]

        n_starts = [0 * self.__nev_params('event_unit')]
        n_stops = [
            last_data_packet['timestamp'] * self.__nev_params('event_unit')]

        return n_starts, n_stops
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __get_waveforms_dtype(self):
        """
        Extracts the actual waveform dtype set for each channel.
        """
        # Blackrock code giving the appropriate dtype
        conv = {0: 'int8', 1: 'int8', 2: 'int16', 4: 'int32'}

        # get all electrode ids from nev ext header
        all_el_ids = self.__nev_ext_header[b'NEUEVWAV']['electrode_id']

        # get the dtype of waveform (this is stupidly complicated)
        if self.__is_set(
                np.array(self.__nev_basic_header['additionnal_flags']), 0):
            dtype_waveforms = dict((k, 'int16') for k in all_el_ids)
        else:
            # extract bytes per waveform
            waveform_bytes = \
                self.__nev_ext_header[b'NEUEVWAV']['bytes_per_waveform']
            # extract the dtype for waveforms of each electrode
            # (conv is keyed by scalar byte counts, so look them up per element)
            dtype_waveforms = dict((k, conv[b]) for k, b in
                                   zip(all_el_ids, waveform_bytes))

        return dtype_waveforms
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def __read_comment(self,n_start,n_stop,data,lazy=False):
        event_unit = self.__nev_params('event_unit')
        if lazy:
            times = []
            labels = np.array([], dtype='S')  # 'S' (bytes); lowercase 's' is not a valid dtype code
        else:
            times = data['timestamp']*event_unit
            labels = data['comment'].astype(str)
        # mask for given time interval
        mask = (times >= n_start) & (times < n_stop)

        if np.sum(mask)>0:
            ev = Event(
                times = times[mask].astype(float),
                labels = labels[mask],
                name = 'comment')
            if lazy:
                ev.lazy_shape = np.sum(mask)
        else:
            ev = None
        return ev
    # --------------end------added by zhangbo 20170926--------
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def reformat_integer_v1(data, nbchannel, header):
    """
    reformat when dtype is int16 for ABF version 1
    """
    chans = [chan_num for chan_num in
             header['nADCSamplingSeq'] if chan_num >= 0]
    for n, i in enumerate(chans[:nbchannel]):  # respect SamplingSeq
        data[:, n] /= header['fInstrumentScaleFactor'][i]
        data[:, n] /= header['fSignalGain'][i]
        data[:, n] /= header['fADCProgrammableGain'][i]
        if header['nTelegraphEnable'][i]:
            data[:, n] /= header['fTelegraphAdditGain'][i]
        data[:, n] *= header['fADCRange']
        data[:, n] /= header['lADCResolution']
        data[:, n] += header['fInstrumentOffset'][i]
        data[:, n] -= header['fSignalOffset'][i]
Project: NeoAnalysis    Author: neoanalysis    | project source | file source
def get_spiketrain(self, episode, electrode_id):
        """
        Return a :class:`Spike` which is a
        descriptor of the specified spike channel.
        """
        assert episode in range(1, self.n_episodes + 1)
        assert electrode_id in range(1, self.n_spiketrains(episode) + 1)
        # get some properties stored in the episode sub-block
        block = self.episode_block(episode)
        x_unit = block.ep_block.x_unit
        x_unit_wf = getattr(block.ep_block, 'x_unit_wf', None)
        y_unit_wf = getattr(block.ep_block, 'y_unit_wf', None)
        # number of spikes in the entire episode
        spk_blocks = [k for k in self.blocks if k.identifier == 'RSPK']
        n_events = np.sum([k.n_events[electrode_id - 1] for k in spk_blocks], dtype=int)
        # number of samples in a waveform
        wf_sampling_frequency = 1.0 / block.ep_block.dX
        wf_blocks = [k for k in self.blocks if k.identifier == 'RspkWave']
        if wf_blocks :
            wf_samples = wf_blocks[0].wavelength
            t_start = wf_blocks[0].pre_trigger * block.ep_block.dX
        else:
            wf_samples = 0
            t_start = 0
        return ElphySpikeTrain(self, episode, electrode_id, x_unit, n_events, wf_sampling_frequency, wf_samples, x_unit_wf, y_unit_wf, t_start)