Python mmap 模块,ACCESS_READ 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 mmap.ACCESS_READ。

项目:ipfirst    作者:cutd    | 项目源码 | 文件源码
def __init__(self,file_name):
        """Open an IP database file and index its prefix table.

        The file is memory-mapped read-only when the ``mmap`` module is
        available; otherwise its whole content is read into ``self.data``.
        Header fields are decoded with ``self.int_from_4byte``: offset 0 goes
        to ``self.start`` and offsets 8 and 12 delimit the prefix table.
        Each table record is 9 bytes (1-byte prefix followed by two 4-byte
        indexes) and is stored in ``self.dict`` keyed by its prefix.

        On any failure an error is printed and the process exits.

        :param file_name: path of the database file to load.
        """
        try:
            path = os.path.abspath(file_name)
            self._handle = open(path, "rb")
            if mmap is not None:
                self.data = mmap.mmap(self._handle.fileno(), 0, access=mmap.ACCESS_READ)
            else:
                self.data = self._handle.read()

            self.dict = {}
            self.start = self.int_from_4byte(0)
            # Read but not used by this constructor.
            index_last_offset = self.int_from_4byte(4)
            prefix_start_offset = self.int_from_4byte(8)
            prefix_end_offset = self.int_from_4byte(12)
            i = prefix_start_offset
            while i <= prefix_end_offset:
                prefix = self.int_from_1byte(i)
                self.dict[prefix] = {
                    'prefix': prefix,
                    'start_index': self.int_from_4byte(i + 1),
                    'end_index': self.int_from_4byte(i + 5),
                }
                i += 9

        except Exception as ex:
            # Report the path that was actually requested (the original
            # formatted the builtin ``file``), using the call form of print
            # so the message works on both Python 2 and Python 3.
            print("cannot open file %s" % file_name)
            print(ex)
            # NOTE(review): exits with status 0 even though loading failed;
            # kept as-is because callers may rely on it.
            exit(0)
项目:Windows-Prefetch-Carver    作者:PoorBillionaire    | 项目源码 | 文件源码
def main():
    """Command-line entry point: carve Prefetch records out of a file.

    The input file (``-f``) is memory-mapped read-only and handed, together
    with the output file (``-o``, opened in binary mode), to
    ``prefetchCarve``.  ``-t`` selects tln output (optionally with ``-s``),
    ``-c`` csv output (a header row is written first) and ``-m`` mactime
    output; with none of them ``prefetchCarve`` is called with its default
    output type.  When several are given, tln wins over csv, csv over mactime.
    """
    p = ArgumentParser()
    p.add_argument('-f', '--file', help='Carve Prefetch files from the given file', required=True)
    p.add_argument('-o', '--outfile', help='Write results to the given file', required=True)
    p.add_argument('-c', '--csv', help='Output results in csv format', action='store_true')
    p.add_argument('-m', '--mactime', help='Output results in mactime format', action='store_true')
    p.add_argument('-t', '--tln', help='Output results in tln format', action='store_true')
    p.add_argument('-s', '--system', help='System name (use with -t)')

    args = p.parse_args()

    with open(args.file, 'rb') as infile:
        with contextlib.closing(mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)) as m:
            with open(args.outfile, 'wb') as o:
                if args.tln:
                    prefetchCarve(m, o, "tln", system_name=args.system)
                elif args.csv:
                    # The output file is binary, so the header must be bytes:
                    # a unicode literal raises TypeError on Python 3.
                    o.write(b'last_run_time,prefetch_file_name,run_count\n')
                    prefetchCarve(m, o, output_type="csv")
                elif args.mactime:
                    prefetchCarve(m, o, output_type="mactime")
                else:
                    prefetchCarve(m, o)
项目:build    作者:fuchsia-mirror    | 项目源码 | 文件源码
def mmapper(filename):
    """A context manager that yields (fd, file_contents) given a file name.

    ``file_contents`` is a read-only mmap of the whole file, or the empty
    string ``''`` when the file is empty (mmap can't handle empty files).
    The mmap and file objects are closed at the end of the 'with' statement,
    and the file is also closed if sizing or mapping it fails.
    """
    with open(filename, 'rb') as fileobj:
        fd = fileobj.fileno()
        if os.fstat(fd).st_size == 0:
            # mmap can't handle empty files.
            yield fd, ''
        else:
            mmapobj = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
            try:
                yield fd, mmapobj
            finally:
                mmapobj.close()


# elf_info objects are only created by `get_elf_info` or the `copy` or
# `rename` methods.
项目:cas_python_sdk    作者:tencentyun    | 项目源码 | 文件源码
def compute_etag_from_file_obj(file_obj, offset=0, size=None, chunk_size=1024 * 1024):
    """Return the hex SHA-256 digest of a byte range of ``file_obj``.

    :param file_obj: binary file object exposing ``fileno``, ``seek``, ``read``.
    :param offset: position of the first byte to hash.
    :param size: number of bytes to hash; a falsy value (``None`` or 0) means
        "from ``offset`` to the end of the file".
    :param chunk_size: maximum number of bytes read per iteration.
    :return: hexadecimal digest string.

    The range is memory-mapped when ``offset`` is a multiple of
    ``mmap.ALLOCATIONGRANULARITY``; otherwise it is read from ``file_obj``
    directly, whose position is reset to ``offset`` afterwards.
    """
    etag = hashlib.sha256()

    size = size or os.fstat(file_obj.fileno()).st_size - offset

    if size != 0 and offset % mmap.ALLOCATIONGRANULARITY == 0:
        target = mmap.mmap(file_obj.fileno(), length=size,
                           offset=offset,
                           access=mmap.ACCESS_READ)
    else:
        target = file_obj
        target.seek(offset)

    try:
        while size > 0:
            data = target.read(min(chunk_size, size))
            if not data:
                # Premature EOF: without this check the loop never ends,
                # because ``size`` stops decreasing.
                break
            etag.update(data)
            size -= len(data)
    finally:
        # Release the map (or restore the file position) even on error.
        if target is file_obj:
            file_obj.seek(offset)
        else:
            target.close()
    return etag.hexdigest()
项目:cas_python_sdk    作者:tencentyun    | 项目源码 | 文件源码
def compute_tree_etag_from_file_obj(file_obj, offset=0, size=None,
                                    chunk_size=1024 * 1024):
    """Return the tree-hash digest of a byte range of ``file_obj``.

    The bytes are fed chunk by chunk to a ``TreeHashGenerator`` and the
    result of ``generate().digest()`` is returned.

    :param file_obj: binary file object exposing ``fileno``, ``seek``, ``read``.
    :param offset: position of the first byte to hash.
    :param size: number of bytes to hash; a falsy value (``None`` or 0) means
        "from ``offset`` to the end of the file".
    :param chunk_size: maximum number of bytes read per iteration.

    The range is memory-mapped when ``offset`` is a multiple of
    ``mmap.ALLOCATIONGRANULARITY``; otherwise it is read from ``file_obj``
    directly, whose position is reset to ``offset`` afterwards.
    """
    generator = TreeHashGenerator()

    size = size or os.fstat(file_obj.fileno()).st_size - offset
    if size != 0 and offset % mmap.ALLOCATIONGRANULARITY == 0:
        target = mmap.mmap(file_obj.fileno(), length=size,
                           offset=offset,
                           access=mmap.ACCESS_READ)
    else:
        target = file_obj
        target.seek(offset)

    try:
        while size > 0:
            data = target.read(min(chunk_size, size))
            if not data:
                # Premature EOF: without this check the loop never ends,
                # because ``size`` stops decreasing.
                break
            generator.update(data)
            size -= len(data)
    finally:
        # Release the map (or restore the file position) even on error.
        if target is file_obj:
            file_obj.seek(offset)
        else:
            target.close()
    return generator.generate().digest()
项目:cas_python_sdk    作者:tencentyun    | 项目源码 | 文件源码
def compute_hash_from_file_obj(file_obj, offset=0, size=None, chunk_size=1024 * 1024):
    """Compute the SHA-256 etag and the tree hash of a byte range in one pass.

    :param file_obj: binary file object exposing ``fileno``, ``seek``, ``read``.
    :param offset: position of the first byte to hash.
    :param size: number of bytes to hash; a falsy value (``None`` or 0) means
        "from ``offset`` to the end of the file".
    :param chunk_size: maximum number of bytes read per iteration.
    :return: ``(etag_hexdigest, tree_digest)`` where the second item is
        ``TreeHashGenerator.generate().digest()``.

    The range is memory-mapped when ``offset`` is a multiple of
    ``mmap.ALLOCATIONGRANULARITY``; otherwise it is read from ``file_obj``
    directly, whose position is reset to ``offset`` afterwards.
    """
    etag = hashlib.sha256()
    generator = TreeHashGenerator()

    size = size or os.fstat(file_obj.fileno()).st_size - offset

    if size != 0 and offset % mmap.ALLOCATIONGRANULARITY == 0:
        target = mmap.mmap(file_obj.fileno(), length=size,
                           offset=offset,
                           access=mmap.ACCESS_READ)
    else:
        target = file_obj
        target.seek(offset)

    try:
        while size > 0:
            data = target.read(min(chunk_size, size))
            if not data:
                # Premature EOF: without this check the loop never ends,
                # because ``size`` stops decreasing.
                break
            generator.update(data)
            etag.update(data)
            size -= len(data)
    finally:
        # Release the map (or restore the file position) even on error.
        if target is file_obj:
            file_obj.seek(offset)
        else:
            target.close()
    return etag.hexdigest(), generator.generate().digest()
项目:transvar    作者:zwdzwd    | 项目源码 | 文件源码
def __init__(self, fasta_file):
        """Open a FASTA reference plus its ``.fai`` index and load the index.

        The reference is memory-mapped read-only into ``self.fasta_handle``
        and the index file is opened as ``self.faidx_handle``.  An ``IOError``
        from either step is only reported on stdout; ``load_faidx`` is called
        afterwards regardless.

        :param fasta_file: path of the FASTA file; the index is expected at
            ``fasta_file + ".fai"``.
        """
        self.fasta_file = fasta_file
        self.faidx = {}

        # Reference sequence: keep both the file object and its mapping.
        try:
            self.fasta_fd = open(fasta_file)
            self.fasta_handle = mmap.mmap(
                self.fasta_fd.fileno(), 0, access=mmap.ACCESS_READ)
        except IOError:
            print("Reference sequence doesn't exist")

        # Index file written next to the reference.
        index_path = fasta_file + ".fai"
        try:
            self.faidx_handle = open(index_path)
        except IOError:
            print("samtools faidx file doesn't exist for reference")

        self.load_faidx()

    # Function to cache fasta index in dictionary
    # faidx format contains the following columns:
    ##.the name of the sequence
    ##.the length of the sequence
    ##.the offset of the first base in the file
    ##.the number of bases in each fasta line
    ##.the number of bytes in each fasta line
项目:SmartVHDL    作者:TheClams    | 项目源码 | 文件源码
def get_list_file(self, projname, callback=None):
        """Rebuild the list of VHDL files that mention an entity or component.

        Walks every folder of the active Sublime window, looks at files ending
        in ``.vhd``, ``.vho`` or ``.vhdl`` (case-insensitive) and keeps those
        whose raw bytes contain ``entity`` or ``component``.  The result is
        stored in the global ``list_module_files[projname]``; the global
        ``lmf_update_ongoing`` is True while the scan runs.

        :param projname: key under which the file list is stored.
        :param callback: optional callable invoked with no arguments at the end.
        """
        global list_module_files
        global lmf_update_ongoing
        lmf_update_ongoing = True
        lmf = []
        for folder in sublime.active_window().folders():
            for root, dirs, files in os.walk(folder):
                for fn in files:
                    if not fn.lower().endswith(('.vhd','.vho','.vhdl')):
                        continue
                    ffn = os.path.join(root,fn)
                    # Empty files cannot be memory-mapped, so skip them.
                    if not os.stat(ffn).st_size:
                        continue
                    # Close both the file and the map once the file has been
                    # searched; previously each scanned file stayed open.
                    with open(ffn) as f:
                        s = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
                        try:
                            if s.find(b'entity') != -1 or s.find(b'component') != -1:
                                lmf.append(ffn)
                        finally:
                            s.close()
        sublime.status_message('List of module files updated')
        list_module_files[projname] = lmf[:]
        lmf_update_ongoing = False
        if callback:
            callback()
项目:janus    作者:nks5295    | 项目源码 | 文件源码
def create(self, access='write'):
        """Allocate a named block of shared memory through ``mmap``.

        The block is named ``conque_<mem_type>_<mem_key>`` and spans
        ``mem_size * char_width`` bytes.  ``access='write'`` requests
        ``ACCESS_WRITE``; any other value requests ``ACCESS_READ``.  The
        mapping is stored in ``self.shm``.

        Returns the truthiness of the new mapping as a bool.
        """
        # NOTE(review): the positional (fileno, length, tagname, access) call
        # matches the Windows mmap signature; confirm this only runs there.
        mmap_access = mmap.ACCESS_WRITE if access == 'write' else mmap.ACCESS_READ
        name = "conque_%s_%s" % (self.mem_type, self.mem_key)
        byte_count = self.mem_size * self.char_width

        self.shm = mmap.mmap(0, byte_count, name, mmap_access)
        return bool(self.shm)
项目:packages    作者:fuchsia-mirror    | 项目源码 | 文件源码
def mmapper(filename):
    """A context manager that yields (fd, file_contents) given a file name.

    ``file_contents`` is a read-only mmap of the whole file, or the empty
    string ``''`` when the file is empty (mmap can't handle empty files).
    The mmap and file objects are closed at the end of the 'with' statement,
    and the file is also closed if sizing or mapping it fails.
    """
    with open(filename, 'rb') as fileobj:
        fd = fileobj.fileno()
        if os.fstat(fd).st_size == 0:
            # mmap can't handle empty files.
            yield fd, ''
        else:
            mmapobj = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
            try:
                yield fd, mmapobj
            finally:
                mmapobj.close()


# elf_info objects are only created by `get_elf_info` or the `copy` or
# `rename` methods.
项目:python-sense-emu    作者:RPi-Distro    | 项目源码 | 文件源码
def __init__(self, settings):
        """Attach to the IMU data source and reset the cached readings.

        ``init_imu()`` supplies a file-like object that is memory-mapped
        read-only into ``self._map``.  ``self._imu_data`` starts with zeroed
        vectors, NaN scalar readings, every ``*Valid`` flag False and a zero
        timestamp.

        :param settings: stored unchanged on ``self.settings``.
        """
        self.settings = settings
        self._fd = init_imu()
        self._map = mmap.mmap(self._fd.fileno(), 0, access=mmap.ACCESS_READ)
        self._last_data = None

        zero3 = (0.0, 0.0, 0.0)
        readings = {
            'accel': zero3,
            'accelValid': False,
            'compass': zero3,
            'compassValid': False,
            'fusionPose': zero3,
            'fusionPoseValid': False,
            'fusionQPose': (0.0, 0.0, 0.0, 0.0),
            'fusionQPoseValid': False,
            'gyro': zero3,
            'gyroValid': False,
            'humidity': float('nan'),
            'humidityValid': False,
            'pressure': float('nan'),
            'pressureValid': False,
            'temperature': float('nan'),
            'temperatureValid': False,
            'timestamp': 0,
        }
        self._imu_data = readings
项目:python-sense-emu    作者:RPi-Distro    | 项目源码 | 文件源码
def __init__(self):
        """Attach to the screen buffer and start the touch thread.

        ``init_screen()`` supplies a file-like object that is memory-mapped
        read-only.  Two numpy views are built on the map: ``_screen`` (64
        uint16 values shaped 8x8, the LED states) and ``_gamma`` (32 uint8
        values at byte offset 128, the user-controlled gamma lookup table).
        """
        self._fd = init_screen()
        self._map = mmap.mmap(self._fd.fileno(), 0, access=mmap.ACCESS_READ)

        # Views over the shared map: LED states and user gamma table.
        led_words = np.frombuffer(self._map, dtype=np.uint16, count=64)
        self._screen = led_words.reshape((8, 8))
        self._gamma = np.frombuffer(self._map, dtype=np.uint8, count=32, offset=128)

        # Final gamma correction lookup table. This is equivalent to gamma
        # correction of 1/4 (*much* brighter) because the HAT's RGB LEDs are
        # much brighter than a corresponding LCD display. The ramp starts
        # above zero so that LEDs that are off appear gray.
        ramp = np.linspace(0.05, 1, 32)
        self._gamma_rgbled = (np.sqrt(np.sqrt(ramp)) * 255).astype(np.uint8)

        self._touch_stop = Event()
        worker = Thread(target=self._touch_run)
        worker.daemon = True
        self._touch_thread = worker
        worker.start()
项目:qgis_resources_sharing    作者:akbargumbira    | 项目源码 | 文件源码
def _load_file_contents(f, size=None):
    """Return ``(contents, size)`` for ``f``, memory-mapping it when possible.

    :param f: file-like object; mmap is only attempted if it has a usable
        ``fileno()``.
    :param size: number of bytes to map; when None the size reported by
        ``os.fstat`` is used.
    :return: a tuple of the contents (an mmap object, or whatever
        ``f.read()`` returns on the fallback path) and its size.
    """
    try:
        fd = f.fileno()
    except (UnsupportedOperation, AttributeError):
        fd = None
    # Attempt to use mmap if possible
    if fd is not None:
        if size is None:
            size = os.fstat(fd).st_size
        if has_mmap:
            try:
                contents = mmap.mmap(fd, size, access=mmap.ACCESS_READ)
            except (mmap.error, ValueError):
                # mmap.error: perhaps a socket?  ValueError: mmap refuses the
                # request (e.g. an empty file).  Either way fall back to a
                # plain read instead of propagating.
                pass
            else:
                return contents, size
    contents = f.read()
    size = len(contents)
    return contents, size
项目:pgn2anki    作者:asdfjkl    | 项目源码 | 文件源码
def init_mmap(self):
        """Lazily open the table file and map it read-only.

        The path is ``directory/filename`` with ``suffix`` appended.  The
        descriptor goes to ``self.fd`` and the mapping to ``self.data``; each
        is created only while still None.  When the mapping is created,
        ``self.read_ubyte`` is bound to a callable returning the integer value
        of the byte at a given index.
        """
        if self.fd is None:
            path = os.path.join(self.directory, self.filename) + self.suffix
            # O_BINARY only exists on some platforms.
            flags = os.O_RDONLY
            if hasattr(os, "O_BINARY"):
                flags |= os.O_BINARY
            self.fd = os.open(path, flags)

        if self.data is not None:
            return

        self.data = mmap.mmap(self.fd, 0, access=mmap.ACCESS_READ)

        if sys.version_info >= (3, ):
            # Indexing an mmap already yields an int on Python 3.
            self.read_ubyte = self.data.__getitem__
            return

        def read_ubyte(data_ptr):
            return ord(self.data[data_ptr])

        self.read_ubyte = read_ubyte
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def __init__(self, database):
        """Reader for the MaxMind DB file format

        Arguments:
        database -- A path to a valid MaxMind DB file such as a GeoIP2
                    database file.

        Raises InvalidDatabaseError when the metadata start marker cannot be
        found in the last 128 KiB of the file.
        """
        with open(database, 'rb') as db_file:
            self._buffer = mmap.mmap(
                db_file.fileno(), 0, access=mmap.ACCESS_READ)

        # Search only the tail of the file.  Clamp at 0: for files smaller
        # than 128 KiB the difference is negative, and a negative start is
        # interpreted relative to the end of the buffer, which can skip the
        # region holding the marker.
        search_from = max(0, self._buffer.size() - 128 * 1024)
        metadata_start = self._buffer.rfind(self._METADATA_START_MARKER,
                                            search_from)

        if metadata_start == -1:
            # Do not leave the mapping open behind a failed constructor.
            self._buffer.close()
            raise InvalidDatabaseError('Error opening database file ({0}). '
                                       'Is this a valid MaxMind DB file?'
                                       ''.format(database))

        metadata_start += len(self._METADATA_START_MARKER)
        metadata_decoder = Decoder(self._buffer, metadata_start)
        (metadata, _) = metadata_decoder.decode(metadata_start)
        self._metadata = Metadata(**metadata)  # pylint: disable=star-args

        self._decoder = Decoder(self._buffer, self._metadata.search_tree_size
                                + self._DATA_SECTION_SEPARATOR_SIZE)
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def map_file(cls, fileobj, offset = 0, size = None):
        """Map the structure stored at ``offset`` in ``fileobj`` and wrap it.

        The first field unpacked by ``cls._Header`` at ``offset`` is taken as
        the total size to map.  The mapping starts at the closest multiple of
        ``mmap.ALLOCATIONGRANULARITY`` at or below ``offset``, and ``cls`` is
        given a ``buffer`` view starting at the real offset.  The returned
        instance keeps the file in ``_file`` and the map in ``_mmap``.
        ``size`` is accepted but not used.
        """
        fileobj.seek(offset)
        raw_header = fileobj.read(cls._Header.size)
        total_size = cls._Header.unpack(raw_header)[0]

        # Distance between the aligned map start and the requested offset.
        lead = offset % mmap.ALLOCATIONGRANULARITY
        aligned = offset - lead
        mapped = mmap.mmap(fileobj.fileno(), total_size + lead,
            access = mmap.ACCESS_READ, offset = aligned)

        instance = cls(buffer(mapped, lead))
        instance._file = fileobj
        instance._mmap = mapped
        return instance
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def map_file(cls, fileobj, offset = 0, size = None):
        """Map ``fileobj`` read-only from an aligned start and wrap it in ``cls``.

        The mapping begins at the closest multiple of
        ``mmap.ALLOCATIONGRANULARITY`` at or below ``offset`` and runs to the
        end of the file; ``cls`` receives the map and the position of
        ``offset`` inside it.  The file is left positioned at the aligned
        start and stored on the instance as ``_file``.  ``size`` is accepted
        but not used.
        """
        lead = offset % mmap.ALLOCATIONGRANULARITY
        aligned = offset - lead
        fileobj.seek(aligned)
        mapped = mmap.mmap(fileobj.fileno(), 0, access = mmap.ACCESS_READ, offset = aligned)
        instance = cls(mapped, lead)
        instance._file = fileobj
        return instance
项目:sharedbuffers    作者:jampp    | 项目源码 | 文件源码
def map_file(cls, fileobj, offset = 0, size = None):
        """Map ``fileobj`` read-only from an aligned start and wrap it in ``cls``.

        The mapping begins at the closest multiple of
        ``mmap.ALLOCATIONGRANULARITY`` at or below ``offset`` and runs to the
        end of the file; ``cls`` receives the map and the position of
        ``offset`` inside it.  The file is left positioned at the aligned
        start and stored on the instance as ``_file``.  ``size`` is accepted
        but not used.
        """
        lead = offset % mmap.ALLOCATIONGRANULARITY
        aligned = offset - lead
        fileobj.seek(aligned)
        mapped = mmap.mmap(fileobj.fileno(), 0, access = mmap.ACCESS_READ, offset = aligned)
        instance = cls(mapped, lead)
        instance._file = fileobj
        return instance
项目:abe-bootstrap    作者:TryCoin-Team    | 项目源码 | 文件源码
def map_file(self, file, start):  # Initialize with bytes from file
    """Map ``file`` read-only as the input and set the read cursor to ``start``."""
    mapped = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
    self.input = mapped
    self.read_cursor = start
项目:abe-bootstrap    作者:TryCoin-Team    | 项目源码 | 文件源码
def map_file(self, file, start):  # Initialize with bytes from file
    """Map ``file`` read-only as the input and set the read cursor to ``start``."""
    mapped = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
    self.input = mapped
    self.read_cursor = start
项目:electrum-martexcoin-server    作者:martexcoin    | 项目源码 | 文件源码
def map_file(self, file, start):    # Initialize with bytes from file
        """Map ``file`` read-only as the input and set the read cursor to ``start``."""
        mapped = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
        self.input = mapped
        self.read_cursor = start
项目:krait    作者:lmdu    | 项目源码 | 文件源码
def _read_fasta(self):
        """Return a readable binary handle on ``self.fasta_file``.

        Files whose name ends in ``.gz`` are opened with ``gzip.open``; any
        other file is returned as a read-only memory map of its content.
        """
        if self.fasta_file.endswith('.gz'):
            return gzip.open(self.fasta_file, 'rb')

        # mmap duplicates the descriptor, so the map stays valid after the
        # file object is closed; closing it here avoids leaking the handle.
        with open(self.fasta_file, 'rb') as f:
            return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def __init__(self, filename, flags=STANDARD, cache=True):
        """
        Create and return an GeoIP instance.

        :arg filename: File path to a GeoIP database
        :arg flags: Flags that affect how the database is processed.
            Currently supported flags are STANDARD (default),
            MEMORY_CACHE (preload the whole file into memory) and
            MMAP_CACHE (access the file via mmap)
        :arg cache: Used in tests to skip instance caching

        The database handle ends up in ``self._fp`` and the selected mode
        name in ``self._type``; ``_setup_segments`` then runs under
        ``self._lock``.
        """
        self._lock = Lock()
        self._flags = flags
        self._netmask = None

        if self._flags & const.MMAP_CACHE and mmap is None:  # pragma: no cover
            import warnings
            warnings.warn("MMAP_CACHE cannot be used without a mmap module")
            self._flags &= ~const.MMAP_CACHE

        if self._flags & const.MMAP_CACHE:
            f = codecs.open(filename, 'rb', ENCODING)
            try:
                self._fp = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            finally:
                # Close the source file even when mapping fails.
                f.close()
            self._type = 'MMAP_CACHE'
        elif self._flags & const.MEMORY_CACHE:
            f = codecs.open(filename, 'rb', ENCODING)
            try:
                self._memory = f.read()
            finally:
                # Close the source file even when reading fails.
                f.close()
            self._fp = util.str2fp(self._memory)
            self._type = 'MEMORY_CACHE'
        else:
            self._fp = codecs.open(filename, 'rb', ENCODING)
            self._type = 'STANDARD'

        # ``with`` only releases a lock that was actually acquired.
        with self._lock:
            self._setup_segments()
项目:nstock    作者:ybenitezf    | 项目源码 | 文件源码
def __init__(self, dbfile, use_mmap=True, basepos=0):
        """Read the directory of a table file and optionally memory-map it.

        Starting at ``basepos`` the directory offset (``read_long``) and
        length (``read_int``) are read, then the directory and options are
        unpickled from the directory offset.  When mapping is requested and
        possible, the whole file is mapped read-only into ``self._source`` and
        the file object is closed (``self._file`` becomes None); otherwise
        ``self._source`` stays None and the file is kept.

        :param dbfile: structured file object to read from.
        :param use_mmap: whether mapping may be attempted.
        :param basepos: position at which the header is read.
        """
        self._file = dbfile
        self.is_closed = False

        # Total size, needed to decide whether mapping is feasible.
        dbfile.seek(0, os.SEEK_END)
        filesize = dbfile.tell()
        dbfile.seek(basepos)

        self._diroffset = dbfile.read_long()
        self._dirlength = dbfile.read_int()
        dbfile.seek(self._diroffset)
        self._dir = dbfile.read_pickle()
        self._options = dbfile.read_pickle()
        self._locks = {}
        self._source = None

        # Needs a real file descriptor and a size that fits on 32-bit Python.
        mappable = (
            use_mmap
            and hasattr(dbfile, "fileno")
            and filesize < sys.maxsize
        )
        if not (mmap and mappable):
            return

        try:
            self._source = mmap.mmap(dbfile.fileno(), 0, access=mmap.ACCESS_READ)
        except (mmap.error, OSError) as e:
            # Not enough memory to open the map: keep the file and use the
            # (slower) "sub-file" implementation.  Anything else is an error.
            if e.errno != errno.ENOMEM:
                raise
            return

        # The map replaces the file handle we were given.
        dbfile.close()
        self._file = None
项目:thesaurus_query.vim    作者:Ron89    | 项目源码 | 文件源码
def query(word):
    """Look ``word`` up in the index/data file pair.

    :param word: unicode word to search for.
    :return: ``[-1, []]`` when ``_oo_file_access()`` is falsy, ``[1, []]``
        when the word is not in the index, otherwise ``[0, groups]`` where
        each group is ``[label, [items...]]`` read from the data file.
    """
    if not _oo_file_access():
        return [-1, []]
    index_file, data_file = _data_pair()
    with open(index_file) as index_opened:
        index = mmap.mmap(
            index_opened.fileno(), 0, access=mmap.ACCESS_READ
        )
        # One cleanup point: the map is released on the not-found path, on
        # success, and also when parsing the index line raises.
        try:
            decorated = encode_utf_8(u''.join([u'\n',word,u'|']))
            found_loc = index.find(decorated)
            if found_loc==-1:
                return [1, []]
            # Skip the leading newline of the match, then read "word|offset".
            index.seek(found_loc+1)
            item_index = int(index.readline().strip().split(
                encode_utf_8('|'))[1])
        finally:
            index.close()
    query_result = list()
    with open(data_file) as data:
        data.seek(item_index)
        header = decode_utf_8(data.readline())
        num_groups = int(header.strip().split(u'|')[1])
        for i in range(num_groups):
            group = decode_utf_8(
                data.readline()).strip().split(u'|')
            # group[0] has its first and last characters stripped to form
            # the label; the remaining fields are the entries.
            query_result.append(
                [group[0][1:-1], [
                    _strip_descriptor(group_item)
                    for group_item in group[1:]]])
    return [0, query_result]
项目:lbryum-server    作者:lbryio    | 项目源码 | 文件源码
def map_file(self, file, start):  # Initialize with bytes from file
        """Map ``file`` read-only as the input and set the read cursor to ``start``."""
        mapped = mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ)
        self.input = mapped
        self.read_cursor = start
项目:NetStorageKit-Python    作者:akamai    | 项目源码 | 文件源码
def _upload_data_to_request(self, source):
        """Return a read-only memory map of the file at ``source``.

        Any exception raised while opening or mapping is re-raised as
        ``NetstorageError``; a truthy mapping that was already created is
        closed first.
        """
        payload = None
        try:
            with open(source, 'rb') as handle:
                payload = mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)
        except Exception as e:
            if payload:
                payload.close()
            raise NetstorageError(e)
        else:
            return payload
项目:paperless    作者:lrnt    | 项目源码 | 文件源码
def contains_text(self, text):
        """Return True if any ``*.txt`` page contains ``text``.

        The search is literal (``text`` is regex-escaped), case-insensitive
        and done on the UTF-8 encoding of ``text`` against the raw bytes of
        every ``*.txt`` file in ``self.path_pages``.  Empty page files are
        skipped.
        """
        # Build the pattern once rather than once per page.
        pattern = re.compile(br'(?i)%b' % bytes(re.escape(text), 'utf-8'))
        for page in glob.iglob(path.join(self.path_pages, '*.txt')):
            # mmap raises ValueError on a zero-length file, and an empty page
            # cannot contain the text anyway.
            if path.getsize(page) == 0:
                continue
            with open(page, 'rb', 0) as f:
                with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as s:
                    if pattern.search(s):
                        return True
        return False
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def __init__(self, filename, flags=0):
        """
        Open a geoip database and prepare it for lookups.

        @param filename: path to a geoip database. If MEMORY_CACHE is used,
            the file can be gzipped.
        @type filename: str
        @param flags: flags that affect how the database is processed.
            Currently the only supported flags are STANDARD (the default),
            MEMORY_CACHE (preload the whole file into memory), and
            MMAP_CACHE (access the file via mmap).
        @type flags: int
        """
        self._filename = filename
        self._flags = flags

        if flags & const.MMAP_CACHE:
            # Read-only map of the whole file.
            with open(filename, 'rb') as f:
                self._filehandle = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        elif flags & const.MEMORY_CACHE:
            # Whole file in memory; ".gz" names go through gzip.
            opener = gzip.open if filename.endswith('.gz') else open
            with opener(filename, 'rb') as f:
                self._memoryBuffer = f.read()
                self._filehandle = StringIO(self._memoryBuffer)
        else:
            self._filehandle = codecs.open(filename, 'rb','latin_1')

        self._setup_segments()
项目:seismic-python    作者:malcolmw    | 项目源码 | 文件源码
def _initialize_mmap(self, ttdir):
        """Memory-map every travel-time file found in ``ttdir``.

        File names are split on ``.``; the first two parts are taken as the
        station and phase.  The maps are stored as
        ``self.mmttf[station][phase]``.  The first file seen is passed to
        ``_initialize_grid``; every later file must start with the same nine
        header values (3 ints followed by 6 floats) as the grid attributes
        ``nr, nlat, nlon, dr, dlat, dlon, r0, lat0, lon0``.

        :param ttdir: directory containing the travel-time files.
        :raises ValueError: if a file header differs from the grid.
        """
        mmttf = {}
        init = True
        for fname in os.listdir(ttdir):
            station, phase = fname.split(".")[:2]
            fpath = os.path.abspath(os.path.join(ttdir, fname))
            # mmap duplicates the descriptor, so the file object can be closed
            # as soon as the map exists instead of leaking one per file.
            with open(fpath, 'rb') as infile:
                mmf = mmap.mmap(infile.fileno(), 0, access=mmap.ACCESS_READ)
            mmttf.setdefault(station, {})[phase] = mmf
            if init:
                init = False
                self._initialize_grid(mmf)
                continue
            header = (struct.unpack("3i", mmf.read(12))
                      + struct.unpack("3f", mmf.read(12))
                      + struct.unpack("3f", mmf.read(12)))
            expected = (self.nr, self.nlat, self.nlon,
                        self.dr, self.dlat, self.dlon,
                        self.r0, self.lat0, self.lon0)
            if any(not got == want for got, want in zip(header, expected)):
                raise ValueError("travel-time headers do not match")
        self.mmttf = mmttf
项目:archive-Holmes-Totem-Service-Library    作者:HolmesProcessing    | 项目源码 | 文件源码
def __init__ (self, filename):
        """Open ``filename`` in binary mode and map it read-only.

        Keeps the file object (``file``), the mapping (``datamap``), a
        position starting at 0 (``offset``) and the size reported by the
        mapping (``filesize``).
        """
        handle = open(filename, "rb")
        self.file = handle
        mapping = mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ)
        self.datamap = mapping
        self.offset = 0
        self.filesize = mapping.size()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __init__(self, dbfile, use_mmap=True, basepos=0):
        """Read the directory of a table file and optionally memory-map it.

        Starting at ``basepos`` the directory offset (``read_long``) and
        length (``read_int``) are read, then the directory and options are
        unpickled from the directory offset.  When mapping is requested and
        possible, the whole file is mapped read-only into ``self._source`` and
        the file object is closed (``self._file`` becomes None); otherwise
        ``self._source`` stays None and the file is kept.

        :param dbfile: structured file object to read from.
        :param use_mmap: whether mapping may be attempted.
        :param basepos: position at which the header is read.
        """
        self._file = dbfile
        self.is_closed = False

        # Total size, needed to decide whether mapping is feasible.
        dbfile.seek(0, os.SEEK_END)
        filesize = dbfile.tell()
        dbfile.seek(basepos)

        self._diroffset = dbfile.read_long()
        self._dirlength = dbfile.read_int()
        dbfile.seek(self._diroffset)
        self._dir = dbfile.read_pickle()
        self._options = dbfile.read_pickle()
        self._locks = {}
        self._source = None

        # Needs a real file descriptor and a size that fits on 32-bit Python.
        mappable = (
            use_mmap
            and hasattr(dbfile, "fileno")
            and filesize < sys.maxsize
        )
        if not (mmap and mappable):
            return

        try:
            self._source = mmap.mmap(dbfile.fileno(), 0, access=mmap.ACCESS_READ)
        except (mmap.error, OSError) as e:
            # Not enough memory to open the map: keep the file and use the
            # (slower) "sub-file" implementation.  Anything else is an error.
            if e.errno != errno.ENOMEM:
                raise
            return

        # The map replaces the file handle we were given.
        dbfile.close()
        self._file = None
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        """Create the test file and map it read-only.

        ``b"asdf"`` is written at offset ``_4G`` of ``support.TESTFN`` and the
        whole file is mapped into ``self.mapping``.
        """
        with open(support.TESTFN, "wb+") as scratch:
            # Seek past the offset first, then write the marker bytes.
            scratch.seek(_4G)
            scratch.write(b"asdf")
            scratch.flush()
            descriptor = scratch.fileno()
            self.mapping = mmap.mmap(descriptor, 0, access=mmap.ACCESS_READ)
项目:honeyd-python    作者:sookyp    | 项目源码 | 文件源码
def __init__(self, fingerprint_file, mac_file):
        """Function initializes the parser and memory-maps both database files
        Args:
            fingerprint_file : nmap-os-db file
            mac_file : nmap-mac-prefixes file

        Each file is mapped read-only (``self.fingerprint_file`` and
        ``self.mac_file``).  The file objects used to create the maps are
        closed right away; the maps stay usable because mmap duplicates the
        descriptor.
        """
        logger.debug('Initializing Nmap fingerprint parser.')
        with open(fingerprint_file) as fd_fingerprint_file:
            self.fingerprint_file = mmap.mmap(fd_fingerprint_file.fileno(), 0, access=mmap.ACCESS_READ)

        with open(mac_file) as fd_mac_file:
            self.mac_file = mmap.mmap(fd_mac_file.fileno(), 0, access=mmap.ACCESS_READ)
项目:Moderat    作者:Swordf1sh    | 项目源码 | 文件源码
def __init__(self, filename, flags=STANDARD, cache=True):
        """
        Create and return an GeoIP instance.

        :arg filename: File path to a GeoIP database
        :arg flags: Flags that affect how the database is processed.
            Currently supported flags are STANDARD (default),
            MEMORY_CACHE (preload the whole file into memory) and
            MMAP_CACHE (access the file via mmap)
        :arg cache: Used in tests to skip instance caching

        The database handle ends up in ``self._fp`` and the selected mode
        name in ``self._type``; ``_setup_segments`` then runs under
        ``self._lock``.
        """
        self._lock = Lock()
        self._flags = flags
        self._netmask = None

        if self._flags & const.MMAP_CACHE and mmap is None:  # pragma: no cover
            import warnings
            warnings.warn("MMAP_CACHE cannot be used without a mmap module")
            self._flags &= ~const.MMAP_CACHE

        if self._flags & const.MMAP_CACHE:
            f = codecs.open(filename, 'rb', ENCODING)
            try:
                self._fp = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            finally:
                # Close the source file even when mapping fails.
                f.close()
            self._type = 'MMAP_CACHE'
        elif self._flags & const.MEMORY_CACHE:
            f = codecs.open(filename, 'rb', ENCODING)
            try:
                self._memory = f.read()
            finally:
                # Close the source file even when reading fails.
                f.close()
            self._fp = util.str2fp(self._memory)
            self._type = 'MEMORY_CACHE'
        else:
            self._fp = codecs.open(filename, 'rb', ENCODING)
            self._type = 'STANDARD'

        # ``with`` only releases a lock that was actually acquired.
        with self._lock:
            self._setup_segments()
项目:WhooshSearch    作者:rokartnaz    | 项目源码 | 文件源码
def __init__(self, dbfile, use_mmap=True, basepos=0):
        """Read the directory of a table file and optionally memory-map it.

        Starting at ``basepos`` the directory offset (``read_long``) and
        length (``read_int``) are read, then the directory and options are
        unpickled from the directory offset.  When mapping is requested and
        possible, the whole file is mapped read-only into ``self._source`` and
        the file object is closed (``self._file`` becomes None); otherwise
        ``self._source`` stays None and the file is kept.

        :param dbfile: structured file object to read from.
        :param use_mmap: whether mapping may be attempted.
        :param basepos: position at which the header is read.
        """
        self._file = dbfile
        self.is_closed = False

        # Total size, needed to decide whether mapping is feasible.
        dbfile.seek(0, os.SEEK_END)
        filesize = dbfile.tell()
        dbfile.seek(basepos)

        self._diroffset = dbfile.read_long()
        self._dirlength = dbfile.read_int()
        dbfile.seek(self._diroffset)
        self._dir = dbfile.read_pickle()
        self._options = dbfile.read_pickle()
        self._locks = {}
        self._source = None

        # Needs a real file descriptor and a size that fits on 32-bit Python.
        mappable = (
            use_mmap
            and hasattr(dbfile, "fileno")
            and filesize < sys.maxsize
        )
        if not (mmap and mappable):
            return

        try:
            self._source = mmap.mmap(dbfile.fileno(), 0, access=mmap.ACCESS_READ)
        except (mmap.error, OSError) as e:
            # Not enough memory to open the map: keep the file and use the
            # (slower) "sub-file" implementation.  Anything else is an error.
            if e.errno != errno.ENOMEM:
                raise
            return

        # The map replaces the file handle we were given.
        dbfile.close()
        self._file = None
项目:oyente    作者:ethereum    | 项目源码 | 文件源码
def run_re_file(re_str, fn):
    """Return ``re.findall(re_str, ...)`` applied to the raw content of ``fn``.

    :param re_str: pattern handed to ``re.findall``; it is matched against
        bytes, so on Python 3 it must be a bytes pattern.
    :param fn: path of the file to search.
    :return: the list produced by ``re.findall`` (empty for an empty file).
    """
    size = os.stat(fn).st_size
    with open(fn, 'rb') as tf:
        if size == 0:
            # An empty file cannot be memory-mapped; search empty content.
            return re.findall(re_str, b'')
        data = mmap.mmap(tf.fileno(), size, access=mmap.ACCESS_READ)
        try:
            return re.findall(re_str, data)
        finally:
            # findall returns copies of the matches, so the map can go.
            data.close()
项目:oyente    作者:ethereum    | 项目源码 | 文件源码
def run_re_file(re_str, fn):
    """Return ``re.findall(re_str, ...)`` applied to the raw content of ``fn``.

    :param re_str: pattern handed to ``re.findall``; it is matched against
        bytes, so on Python 3 it must be a bytes pattern.
    :param fn: path of the file to search.
    :return: the list produced by ``re.findall`` (empty for an empty file).
    """
    size = os.stat(fn).st_size
    with open(fn, 'rb') as tf:
        if size == 0:
            # An empty file cannot be memory-mapped; search empty content.
            return re.findall(re_str, b'')
        data = mmap.mmap(tf.fileno(), size, access=mmap.ACCESS_READ)
        try:
            return re.findall(re_str, data)
        finally:
            # findall returns copies of the matches, so the map can go.
            data.close()
项目:oyente    作者:ethereum    | 项目源码 | 文件源码
def run_re_file(re_str, fn):
    """Return ``re.findall(re_str, ...)`` applied to the raw content of ``fn``.

    :param re_str: pattern handed to ``re.findall``; it is matched against
        bytes, so on Python 3 it must be a bytes pattern.
    :param fn: path of the file to search.
    :return: the list produced by ``re.findall`` (empty for an empty file).
    """
    size = os.stat(fn).st_size
    with open(fn, 'rb') as tf:
        if size == 0:
            # An empty file cannot be memory-mapped; search empty content.
            return re.findall(re_str, b'')
        data = mmap.mmap(tf.fileno(), size, access=mmap.ACCESS_READ)
        try:
            return re.findall(re_str, data)
        finally:
            # findall returns copies of the matches, so the map can go.
            data.close()
项目:autoinjection    作者:ChengWiLL    | 项目源码 | 文件源码
def __init__(self, buf = None, pos = 0, filename = None, fp = None):
        """Set up a scanning buffer from a string or from an open file.

        :param buf: buffer to scan; ignored when ``fp`` is given.
        :param pos: starting position in ``buf``; ignored when ``fp`` is given.
        :param filename: name reported for the input; defaults to ``fp.name``
            when ``fp`` is given and has one.
        :param fp: optional file object.  A non-empty real file is mapped
            read-only and ``pos`` becomes the current descriptor position; an
            empty file yields ``''``; if it cannot be mapped, ``fp.read()`` is
            used with ``pos`` 0.

        ``line`` and ``col`` always start at 1.
        """
        if fp is not None:
            try:
                fileno = fp.fileno()
                length = os.path.getsize(fp.name)
                import mmap
            except Exception:
                # Not a real file (or mmap is unavailable): read whole file
                # into memory.  Narrowed from a bare ``except`` so that
                # KeyboardInterrupt/SystemExit are no longer swallowed.
                buf = fp.read()
                pos = 0
            else:
                # map the whole file into memory
                if length:
                    # length must not be zero
                    buf = mmap.mmap(fileno, length, access = mmap.ACCESS_READ)
                    # whence 1: query the current position without moving
                    pos = os.lseek(fileno, 0, 1)
                else:
                    buf = ''
                    pos = 0

            if filename is None:
                filename = getattr(fp, 'name', None)

        self.buf = buf
        self.pos = pos
        self.line = 1
        self.col = 1
        self.filename = filename
项目:bokken    作者:thestr4ng3r    | 项目源码 | 文件源码
def __init__(self, buf = None, pos = 0, filename = None, fp = None):
        """Initialise the buffer, the position and the line/column counters.

        Parameters:
            buf      -- data to work on; replaced when ``fp`` is given.
            pos      -- starting offset into ``buf``; replaced when ``fp``
                        is given.
            filename -- name stored on the instance.  When it is None and
                        ``fp`` is given, ``fp.name`` is used if it exists.
            fp       -- optional open file.  When it has a descriptor, a
                        ``name`` whose size can be read, and ``mmap`` can be
                        imported, the whole file is mapped read-only and
                        ``pos`` becomes the descriptor's current OS-level
                        offset.  Otherwise the file is read into memory with
                        ``fp.read()`` and ``pos`` is 0.

        ``line`` and ``col`` always start at 1.
        """
        if fp is not None:
            try:
                fileno = fp.fileno()
                length = os.path.getsize(fp.name)
                import mmap
            except Exception:
                # Any ordinary failure (no descriptor, no name, size not
                # readable, mmap missing) falls back to reading the file into
                # memory.  KeyboardInterrupt and SystemExit are deliberately
                # not caught here and propagate to the caller.
                buf = fp.read()
                pos = 0
            else:
                # map the whole file into memory
                if length:
                    # length must not be zero
                    buf = mmap.mmap(fileno, length, access = mmap.ACCESS_READ)
                    # Ask the OS where the descriptor currently points.
                    pos = os.lseek(fileno, 0, os.SEEK_CUR)
                else:
                    buf = ''
                    pos = 0

            if filename is None:
                # fp.name when available, otherwise leave the name unset.
                filename = getattr(fp, 'name', None)

        self.buf = buf
        self.pos = pos
        self.line = 1
        self.col = 1
        self.filename = filename
项目:flare-floss    作者:fireeye    | 项目源码 | 文件源码
def test_mmap():
    """Check ``buf_filled_with`` against a read-only mmap of each test input.

    Every entry of ``tests`` is written to its own temporary file, the file
    is mapped read-only, and the result of ``buf_filled_with`` for the
    mapping and its first element is compared with the expectation.
    """
    for test, expectation in tests:
        # The context manager closes the temporary file even when the
        # assertion below fails, so no descriptors pile up across the loop.
        with tempfile.NamedTemporaryFile() as f:
            f.write(test)
            # Push the data to the file before mapping it by descriptor.
            f.flush()
            mapped = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            try:
                assert buf_filled_with(mapped, mapped[0]) == expectation
            finally:
                mapped.close()
项目:isf    作者:w3h    | 项目源码 | 文件源码
def __init__(self, source, bytelength=None, byteoffset=None):
        """Record the size of ``source`` and map the whole file read-only.

        source     -- open file object; it is left positioned at its end.
        bytelength -- stored as given; when None it is the file length
                      minus the offset.
        byteoffset -- stored as given; when None it is 0.
        """
        self.source = source

        # Seeking to the end and reading the position back yields the size.
        source.seek(0, os.SEEK_END)
        total = source.tell()
        self.filelength = total

        start = 0 if byteoffset is None else byteoffset
        span = total - start if bytelength is None else bytelength
        self.byteoffset, self.bytelength = start, span

        # Length 0 asks mmap to cover the entire file.
        self.filemap = mmap.mmap(source.fileno(), 0, access=mmap.ACCESS_READ)
项目:FightstickDisplay    作者:calexil    | 项目源码 | 文件源码
def __init__(self, filename):
        """Read the given TrueType file.

        :Parameters:
            `filename`
                The name of any Windows, OS2 or Macintosh Truetype file.

        The object must be closed (see `close`) after use.

        An exception will be raised if the file does not exist or cannot
        be read.  When that happens after the file has been opened, the
        descriptor and the mapping created here are released before the
        exception propagates, because the caller never receives an object
        it could close.
        """
        if not filename: filename = ''
        size = os.stat(filename).st_size
        self._fileno = os.open(filename, os.O_RDONLY)
        self._data = None
        try:
            # Map the file read-only; the available constants differ by
            # platform, so pick the call form the mmap module supports.
            if hasattr(mmap, 'MAP_SHARED'):
                self._data = mmap.mmap(self._fileno, size, mmap.MAP_SHARED,
                    mmap.PROT_READ)
            else:
                self._data = mmap.mmap(self._fileno, size, None,
                    mmap.ACCESS_READ)

            # Index the table directory entries by their tag.
            offsets = _read_offset_table(self._data, 0)
            self._tables = {}
            for table in _read_table_directory_entry.array(self._data,
                offsets.size, offsets.num_tables):
                self._tables[table.tag] = table

            # Values that start out unset.
            self._names = None
            self._horizontal_metrics = None
            self._character_advances = None
            self._character_kernings = None
            self._glyph_kernings = None
            self._character_map = None
            self._glyph_map = None
            self._font_selection_flags = None

            self.header = \
                _read_head_table(self._data, self._tables['head'].offset)
            self.horizontal_header = \
                _read_horizontal_header(self._data,
                    self._tables['hhea'].offset)
        except BaseException:
            # Construction failed: undo the open/mmap so nothing leaks.
            if self._data is not None:
                self._data.close()
            os.close(self._fileno)
            raise
项目:rom-info    作者:drx    | 项目源码 | 文件源码
def __init__(self, file_name, open_mode='rb', mmap_access=mmap.ACCESS_READ, **kwargs):
        """Open ``file_name`` and store the requested mmap access mode.

        file_name   -- path handed to ``open``; also kept on the instance.
        open_mode   -- mode handed to ``open``.
        mmap_access -- access constant, stored only; no mapping is created
                       here and ``self.mmap`` starts out as None.
        kwargs      -- forwarded unchanged to ``open``.

        An OSError raised by ``open`` is converted into SystemExit.
        """
        try:
            handle = open(file_name, mode=open_mode, **kwargs)
        except OSError as error:
            raise SystemExit(error)
        else:
            self.file = handle

        self.mmap = None
        self.mmap_access = mmap_access
        self.file_name = file_name
项目:lsdj-wave-cruncher    作者:iLambda    | 项目源码 | 文件源码
def __init__(self, source, bytelength=None, byteoffset=None):
        """Measure ``source`` and create a read-only mapping of the file.

        source     -- open file object; the constructor seeks to its end.
        bytelength -- kept as passed, or the file length minus the offset
                      when None.
        byteoffset -- kept as passed, or 0 when None.
        """
        self.source = source

        # The end-of-file position doubles as the file length.
        source.seek(0, os.SEEK_END)
        size = source.tell()
        self.filelength = size

        offset = byteoffset if byteoffset is not None else 0
        length = bytelength if bytelength is not None else size - offset

        self.byteoffset = offset
        self.bytelength = length
        # A length of 0 maps the complete file.
        self.filemap = mmap.mmap(source.fileno(), 0, access=mmap.ACCESS_READ)
项目:lsdj-wave-cruncher    作者:iLambda    | 项目源码 | 文件源码
def __init__(self, source, bytelength=None, byteoffset=None):
        """Store ``source``, its length, the byte window, and a file mapping.

        source     -- open file object; its position ends up at end of file.
        bytelength -- window length; None means file length minus offset.
        byteoffset -- window start; None means 0.
        """
        self.source = source

        # File length = position reported after seeking to the end.
        source.seek(0, os.SEEK_END)
        self.filelength = end = source.tell()

        first = 0 if byteoffset is None else byteoffset
        count = (end - first) if bytelength is None else bytelength
        self.byteoffset = first
        self.bytelength = count

        # Passing 0 as the length maps the file in full, read-only.
        self.filemap = mmap.mmap(source.fileno(), 0, access=mmap.ACCESS_READ)
项目:cryptogram    作者:xinmingzhang    | 项目源码 | 文件源码
def __init__(self, filename):
        """Read the given TrueType file.

        :Parameters:
            `filename`
                The name of any Windows, OS2 or Macintosh Truetype file.

        The object must be closed (see `close`) after use.

        An exception will be raised if the file does not exist or cannot
        be read.  When that happens after the file has been opened, the
        descriptor and the mapping created here are released before the
        exception propagates, because the caller never receives an object
        it could close.
        """
        if not filename: filename = ''
        size = os.stat(filename).st_size
        self._fileno = os.open(filename, os.O_RDONLY)
        self._data = None
        try:
            # Map the file read-only; the available constants differ by
            # platform, so pick the call form the mmap module supports.
            if hasattr(mmap, 'MAP_SHARED'):
                self._data = mmap.mmap(self._fileno, size, mmap.MAP_SHARED,
                    mmap.PROT_READ)
            else:
                self._data = mmap.mmap(self._fileno, size, None,
                    mmap.ACCESS_READ)

            # Index the table directory entries by their tag.
            offsets = _read_offset_table(self._data, 0)
            self._tables = {}
            for table in _read_table_directory_entry.array(self._data,
                offsets.size, offsets.num_tables):
                self._tables[table.tag] = table

            # Values that start out unset.
            self._names = None
            self._horizontal_metrics = None
            self._character_advances = None
            self._character_kernings = None
            self._glyph_kernings = None
            self._character_map = None
            self._glyph_map = None
            self._font_selection_flags = None

            self.header = \
                _read_head_table(self._data, self._tables['head'].offset)
            self.horizontal_header = \
                _read_horizontal_header(self._data,
                    self._tables['hhea'].offset)
        except BaseException:
            # Construction failed: undo the open/mmap so nothing leaks.
            if self._data is not None:
                self._data.close()
            os.close(self._fileno)
            raise
项目:ucscsdk    作者:CiscoUcs    | 项目源码 | 文件源码
def encode_multipart_data(file_dir, file_name, progress=Progress()):
    """Build a single-file ``multipart/form-data`` request body.

    Args:
        file_dir: directory that contains the file to send.
        file_name: name of the file inside ``file_dir``; also sent as the
            ``filename`` of the form part.
        progress: accepted for interface compatibility; this function does
            not use it.  NOTE(review): the default instance is created once,
            when the function is defined, and shared by every call.

    Returns:
        A ``(body, headers)`` tuple: the encoded body as bytes, and a dict
        with the ``content-type`` (including the boundary) and the
        ``content-length`` of that body.
    """

    boundary = random_string(30)

    CRLF = '\r\n'
    file_path = os.path.join(file_dir, file_name)

    def encode_body_header(file_name):
        # Header lines of the single form part; the trailing '' produces the
        # blank line that separates the part headers from the file data.
        return ('--' + boundary,
                'Content-Disposition: form-data; name="Filedata"; '
                'filename="%s"' % file_name,
                'Content-Type: "application/x-compressed"',
                '')

    def encode_file_data(file_path):
        # The whole file ends up in memory as part of the body anyway, so a
        # plain read is enough: it leaves no mapping open and, unlike mmap,
        # also works for an empty file.
        with open(file_path, "rb") as f:
            return f.read()

    body_header = (CRLF.join(encode_body_header(file_name)) +
                   CRLF).encode('utf-8')
    body_footer = (CRLF + '--%s--' % boundary + CRLF).encode('utf-8')

    file_data = encode_file_data(file_path)

    body = body_header + file_data + body_footer

    headers = {'content-type': 'multipart/form-data; boundary=' + boundary,
               'content-length': str(len(body))}

    return body, headers
项目:androidjnipp    作者:naver    | 项目源码 | 文件源码
def findPackage(self, filepath):
        """Return the package declared in ``filepath`` as a slash-separated path.

        The file is mapped read-only and searched for the first
        ``package <name>;`` statement; every dot in the captured name is
        replaced with a slash (``com.example.app`` -> ``com/example/app``).

        Returns None when the file is empty or contains no such statement.
        The file and the mapping are always closed before returning.
        """
        with open(filepath, 'rb') as fp:
            size = os.fstat(fp.fileno()).st_size
            if not size:
                # mmap cannot map an empty file, and there is nothing to find.
                return None
            filedata = mmap.mmap(fp.fileno(), size, access=mmap.ACCESS_READ)
            try:
                # Bytes pattern: the mapping is a bytes-like object on
                # Python 3, where a str pattern would raise TypeError.
                match = re.search(b'package (.+);', filedata)
                package = match.group(1) if match else None
            finally:
                filedata.close()

        if package is None:
            return None
        if not isinstance(package, str):
            # Python 3 yields bytes here; return native text on both versions.
            package = package.decode('utf-8')
        return package.replace('.', '/')