Python mmap 模块,PROT_WRITE 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用mmap.PROT_WRITE

项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, *args, **kwds):
        ## Create shared memory for rendered image
        #pg.dbg(namespace={'r': self})
        if sys.platform.startswith('win'):
            self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
            self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
        else:
            self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
            self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
            fd = self.shmFile.fileno()
            self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
        atexit.register(self.close)

        GraphicsView.__init__(self, *args, **kwds)
        self.scene().changed.connect(self.update)
        self.img = None
        self.renderTimer = QtCore.QTimer()
        self.renderTimer.timeout.connect(self.renderView)
        self.renderTimer.start(16)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def __init__(self, *args, **kwds):
        ## Create shared memory for rendered image
        #pg.dbg(namespace={'r': self})
        if sys.platform.startswith('win'):
            self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
            self.shm = mmap.mmap(-1, mmap.PAGESIZE, self.shmtag) # use anonymous mmap on windows
        else:
            self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
            self.shmFile.write(b'\x00' * (mmap.PAGESIZE+1))
            fd = self.shmFile.fileno()
            self.shm = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
        atexit.register(self.close)

        GraphicsView.__init__(self, *args, **kwds)
        self.scene().changed.connect(self.update)
        self.img = None
        self.renderTimer = QtCore.QTimer()
        self.renderTimer.timeout.connect(self.renderView)
        self.renderTimer.start(16)
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def wipe(self):
        filter_bitmap_fd = os.open("/dev/shm/kafl_filter0", os.O_RDWR | os.O_SYNC | os.O_CREAT)
        os.ftruncate(filter_bitmap_fd, self.config.config_values['BITMAP_SHM_SIZE'])
        filter_bitmap = mmap.mmap(filter_bitmap_fd, self.config.config_values['BITMAP_SHM_SIZE'], mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
        for i in range(self.config.config_values['BITMAP_SHM_SIZE']):
            filter_bitmap[i] = '\x00'
        filter_bitmap.close()
        os.close(filter_bitmap_fd)

        filter_bitmap_fd = os.open("/dev/shm/kafl_tfilter", os.O_RDWR | os.O_SYNC | os.O_CREAT)
        os.ftruncate(filter_bitmap_fd, 0x1000000)
        filter_bitmap = mmap.mmap(filter_bitmap_fd, 0x1000000, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
        for i in range(0x1000000):
            filter_bitmap[i] = '\x00'
        filter_bitmap.close()
        os.close(filter_bitmap_fd)
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def __set_binary(self, filename, binaryfile, max_size):
        shm_fd = os.open(filename, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        os.ftruncate(shm_fd, max_size)
        shm = mmap.mmap(shm_fd, max_size, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
        shm.seek(0x0)
        shm.write('\x00' * max_size)
        shm.seek(0x0)

        f = open(binaryfile, "rb")
        bytes = f.read(1024)
        if bytes:
            shm.write(bytes)
        while bytes != "":
            bytes = f.read(1024)
            if bytes:
                shm.write(bytes)

        f.close()
        shm.close()
        os.close(shm_fd)
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def init(self):
        self.control = socket.socket(socket.AF_UNIX)
        while True:
            try:
                self.control.connect(self.control_filename)
                #self.control.connect(self.control_filename)
                break
            except socket_error:
                pass
                #time.sleep(0.01)

        self.kafl_shm_f     = os.open(self.bitmap_filename, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        self.fs_shm_f       = os.open(self.payload_filename, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        #argv_fd             = os.open(self.argv_filename, os.O_RDWR | os.O_SYNC | os.O_CREAT)
        os.ftruncate(self.kafl_shm_f, self.bitmap_size)
        os.ftruncate(self.fs_shm_f, (128 << 10))
        #os.ftruncate(argv_fd, (4 << 10))

        self.kafl_shm       = mmap.mmap(self.kafl_shm_f, self.bitmap_size, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
        self.fs_shm         = mmap.mmap(self.fs_shm_f, (128 << 10),  mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)

        return True
项目:senic-hub    作者:getsenic    | 项目源码 | 文件源码
def __init__(self, ha_api_url, ble_adapter_name, mac_address, components):
        super().__init__()

        self.components = []
        self.active_component = None
        component_instances = get_component_instances(components, mac_address)
        self.set_components(component_instances)

        self.manager = None
        self.ble_adapter_name = ble_adapter_name
        self.controller = None
        self.mac_address = mac_address
        self.battery_level = None

        # memory map using mmap to store nuimo battery level
        fd = os.open('/tmp/' + self.mac_address.replace(':', '-'), os.O_CREAT | os.O_TRUNC | os.O_RDWR)
        assert os.write(fd, b'\x00' * mmap.PAGESIZE) == mmap.PAGESIZE
        buf = mmap.mmap(fd, mmap.PAGESIZE, mmap.MAP_SHARED, mmap.PROT_WRITE)
        self.bl = ctypes.c_int.from_buffer(buf)
        self.bl.value = 0
        offset = struct.calcsize(self.bl._type_)
        assert buf[offset] == 0
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __init__(self, transfer_size):
        fd, self.filename = tempfile.mkstemp()
        os.ftruncate(fd, 20)
        self.buf = mmap.mmap(fd, 20, mmap.MAP_SHARED, mmap.PROT_WRITE)
        os.close(fd)
        self.total_bytes = ctypes.c_uint64.from_buffer(self.buf)
        self.total_bytes.value = 0
        self.average_time = ctypes.c_double.from_buffer(self.buf, 8)
        self.average_time.value = 0.0
        self.transfer_size = ctypes.c_uint32.from_buffer(self.buf, 16)
        self.transfer_size.value = transfer_size
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def __get_shm(self, type_id, slave_id):
        if slave_id in self.tmp_shm[type_id]:
            shm = self.tmp_shm[type_id][slave_id]
        else:
            shm_fd = os.open(self.files[type_id] + str(slave_id), os.O_RDWR | os.O_SYNC)
            shm = mmap.mmap(shm_fd, self.sizes[type_id]*self.tasks_per_requests, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
            self.tmp_shm[type_id][slave_id] = shm
        return shm
项目:kAFL    作者:RUB-SysSec    | 项目源码 | 文件源码
def open_global_bitmap(self):
        self.global_bitmap_fd = os.open(self.config.argument_values['work_dir'] + "/bitmap", os.O_RDWR | os.O_SYNC | os.O_CREAT)
        os.ftruncate(self.global_bitmap_fd, self.bitmap_size)
        self.global_bitmap = mmap.mmap(self.global_bitmap_fd, self.bitmap_size, mmap.MAP_SHARED, mmap.PROT_WRITE | mmap.PROT_READ)
项目:PythonForWindows    作者:hakril    | 项目源码 | 文件源码
def get_map(cls, size):
        prot = mmap.PROT_EXEC | mmap.PROT_WRITE | mmap.PROT_READ
        return cls(-1, size, prot=prot)
项目:python-wayland    作者:sde1000    | 项目源码 | 文件源码
def resize(self, width, height):
        # Drop previous buffer and shm data if necessary
        if self.buffer:
            self.buffer.destroy()
            self.shm_data.close()

        wl_shm_format, cairo_shm_format = self._w.shm_formats[0]

        stride = cairo.ImageSurface.format_stride_for_width(
            cairo_shm_format, width)
        size = stride * height

        with AnonymousFile(size) as fd:
            self.shm_data = mmap.mmap(
                fd, size, prot=mmap.PROT_READ | mmap.PROT_WRITE,
                flags=mmap.MAP_SHARED)
            pool = self._w.shm.create_pool(fd, size)
            self.buffer = pool.create_buffer(
                0, width, height, stride, wl_shm_format)
            pool.destroy()
        self.s = cairo.ImageSurface(cairo_shm_format, width, height,
                                    data=self.shm_data, stride=stride)
        self.surface.attach(self.buffer, 0, 0)
        self.width = width
        self.height = height
        if self.redraw_func:
            self.redraw_func(self)
        self.surface.commit()
项目:TISP    作者:kaayy    | 项目源码 | 文件源码
def __init__(self, knowledgebase):
        if _sanity_check:
            ExprGenerator.setup()
            State.ExtraInfoGen = ExprGenerator
        Perceptron.model = Model()
        Perceptron.indepKB = IndepKnowledgeBase()
        Perceptron.KB = knowledgebase
        knowledgebase.postedit_indepkb(Perceptron.indepKB)
        Perceptron.c = 0
        Perceptron.iter = FLAGS.iter
        Perceptron.beamsize = FLAGS.beam
        Perceptron.parser = Parser(Perceptron.indepKB, Perceptron.KB, Perceptron.model, State)

        Perceptron.ncpus = FLAGS.ncpus

        Perceptron.ontheflyfd = FLAGS.ontheflyfd
        Perceptron.single_gold = FLAGS.singlegold
        Perceptron.output_prefix = FLAGS.outputprefix
        Perceptron.fdbeamsize = FLAGS.fdbeam

        if Perceptron.ncpus > 0:
            Perceptron.shared_memo_size = int(1024 * 1024 * 1024)  # 1G shared memory
            Perceptron.shared_memo = mmap.mmap(-1, Perceptron.shared_memo_size,
                                               mmap.MAP_SHARED, mmap.PROT_READ | mmap.PROT_WRITE)

        Perceptron.ref_beams = {}
        if FLAGS.ref:
            print >> LOGS, "loading refs",
            hgs = pickle.load(open(FLAGS.ref))
            self.load_hgs(hgs)

        if FLAGS.extraref:
            print >> LOGS, "loading extra refs",
            hgs = pickle.load(open(FLAGS.extraref))
            self.load_hgs(hgs)
项目:PyShellCode    作者:thomaskeck    | 项目源码 | 文件源码
def create_function_from_shellcode(shell_code, restype=ctypes.c_int64, argtypes=()):
    mm = mmap.mmap(-1, len(shell_code), flags=mmap.MAP_SHARED | mmap.MAP_ANONYMOUS, prot=mmap.PROT_WRITE | mmap.PROT_READ | mmap.PROT_EXEC)
    mm.write(shell_code)
    ctypes_buffer = ctypes.c_int.from_buffer(mm)
    function = ctypes.CFUNCTYPE(restype, *argtypes)(ctypes.addressof(ctypes_buffer))
    function._avoid_gc_for_mmap = mm
    return function
项目:PyShellCode    作者:thomaskeck    | 项目源码 | 文件源码
def from_ShellCode(cls, shell_code, restype=ctypes.c_int64, *argtypes):
        if not isinstance(shell_code, bytes):
            shell_code = bytes(shell_code, 'utf-8')
        mm = mmap.mmap(-1, len(shell_code), flags=mmap.MAP_SHARED | mmap.MAP_ANONYMOUS, prot=mmap.PROT_WRITE | mmap.PROT_READ | mmap.PROT_EXEC)
        mm.write(shell_code)
        return cls(mm, restype, *argtypes)
项目:PyCNC    作者:Nikolay-Kha    | 项目源码 | 文件源码
def __init__(self, phys_address, size=PAGE_SIZE):
        """ Create object which maps physical memory to Python's mmap object.
        :param phys_address: based address of physical memory
        """
        self._size = size
        phys_address -= phys_address % PAGE_SIZE
        fd = self._open_dev("/dev/mem")
        self._memmap = mmap.mmap(fd, size, flags=mmap.MAP_SHARED,
                                 prot=mmap.PROT_READ | mmap.PROT_WRITE,
                                 offset=phys_address)
        self._close_dev(fd)
        atexit.register(self.cleanup)
项目:manticore    作者:trailofbits    | 项目源码 | 文件源码
def _mmap(fd, offset, size):
        prot = MMAP.PROT_READ | MMAP.PROT_WRITE
        flags = MMAP.MAP_PRIVATE

        if size &0xfff !=0:
            size = (size & ~0xfff) + 0x1000

        assert size > 0

        result = mmap_function(0, size, prot, flags, fd, offset)
        return ctypes.cast(result, ctypes.POINTER(ctypes.c_char))
项目:tracer    作者:trnila    | 项目源码 | 文件源码
def fn(self, syscall):
        if syscall.result is not None:
            return

        proc = syscall.process
        addr = InjectedMemory(syscall.process, 1024, prot=mmap.PROT_READ | mmap.PROT_WRITE | mmap.PROT_EXEC)

        proc.write_bytes(addr.addr, self.EXIT_CODE)

        p = syscall.process.tracer.backend.debugger[syscall.process.pid]
        p.setInstrPointer(addr.addr)
        regs = p.getregs()
        regs.orig_rax = SYSCALL_NOOP
        p.setregs(regs)
项目:tracer    作者:trnila    | 项目源码 | 文件源码
def __init__(
            self,
            process,
            length,
            prot=mmap.PROT_READ | mmap.PROT_WRITE,
            flags=mmap.MAP_ANONYMOUS | mmap.MAP_PRIVATE):
        self.length = length
        self.process = process
        self.mapped = True
        self.caller_frame = inspect.stack()[1]

        parameters = [0, length, prot, flags, -1, 0]
        self.addr = inject_syscall(process, SYSCALL_MMAP, parameters)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def renderView(self):
        if self.img is None:
            ## make sure shm is large enough and get its address
            if self.width() == 0 or self.height() == 0:
                return
            size = self.width() * self.height() * 4
            if size > self.shm.size():
                if sys.platform.startswith('win'):
                    ## windows says "WindowsError: [Error 87] the parameter is incorrect" if we try to resize the mmap
                    self.shm.close()
                    ## it also says (sometimes) 'access is denied' if we try to reuse the tag.
                    self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
                    self.shm = mmap.mmap(-1, size, self.shmtag)
                elif sys.platform == 'darwin':
                    self.shm.close()
                    self.shmFile.close()
                    self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
                    self.shmFile.write(b'\x00' * (size + 1))
                    self.shmFile.flush()
                    self.shm = mmap.mmap(self.shmFile.fileno(), size, mmap.MAP_SHARED, mmap.PROT_WRITE)
                else:
                    self.shm.resize(size)

            ## render the scene directly to shared memory
            if USE_PYSIDE:
                ch = ctypes.c_char.from_buffer(self.shm, 0)
                #ch = ctypes.c_char_p(address)
                self.img = QtGui.QImage(ch, self.width(), self.height(), QtGui.QImage.Format_ARGB32)
            else:
                address = ctypes.addressof(ctypes.c_char.from_buffer(self.shm, 0))

                # different versions of pyqt have different requirements here..
                try:
                    self.img = QtGui.QImage(sip.voidptr(address), self.width(), self.height(), QtGui.QImage.Format_ARGB32)
                except TypeError:
                    try:
                        self.img = QtGui.QImage(memoryview(buffer(self.shm)), self.width(), self.height(), QtGui.QImage.Format_ARGB32)
                    except TypeError:
                        # Works on PyQt 4.9.6
                        self.img = QtGui.QImage(address, self.width(), self.height(), QtGui.QImage.Format_ARGB32)
            self.img.fill(0xffffffff)
            p = QtGui.QPainter(self.img)
            self.render(p, self.viewRect(), self.rect())
            p.end()
            self.sceneRendered.emit((self.width(), self.height(), self.shm.size(), self.shmFileName()))
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def renderView(self):
        if self.img is None:
            ## make sure shm is large enough and get its address
            if self.width() == 0 or self.height() == 0:
                return
            size = self.width() * self.height() * 4
            if size > self.shm.size():
                if sys.platform.startswith('win'):
                    ## windows says "WindowsError: [Error 87] the parameter is incorrect" if we try to resize the mmap
                    self.shm.close()
                    ## it also says (sometimes) 'access is denied' if we try to reuse the tag.
                    self.shmtag = "pyqtgraph_shmem_" + ''.join([chr((random.getrandbits(20)%25) + 97) for i in range(20)])
                    self.shm = mmap.mmap(-1, size, self.shmtag)
                elif sys.platform == 'darwin':
                    self.shm.close()
                    self.shmFile.close()
                    self.shmFile = tempfile.NamedTemporaryFile(prefix='pyqtgraph_shmem_')
                    self.shmFile.write(b'\x00' * (size + 1))
                    self.shmFile.flush()
                    self.shm = mmap.mmap(self.shmFile.fileno(), size, mmap.MAP_SHARED, mmap.PROT_WRITE)
                else:
                    self.shm.resize(size)

            ## render the scene directly to shared memory
            if USE_PYSIDE:
                ch = ctypes.c_char.from_buffer(self.shm, 0)
                #ch = ctypes.c_char_p(address)
                self.img = QtGui.QImage(ch, self.width(), self.height(), QtGui.QImage.Format_ARGB32)
            else:
                address = ctypes.addressof(ctypes.c_char.from_buffer(self.shm, 0))

                # different versions of pyqt have different requirements here..
                try:
                    self.img = QtGui.QImage(sip.voidptr(address), self.width(), self.height(), QtGui.QImage.Format_ARGB32)
                except TypeError:
                    try:
                        self.img = QtGui.QImage(memoryview(buffer(self.shm)), self.width(), self.height(), QtGui.QImage.Format_ARGB32)
                    except TypeError:
                        # Works on PyQt 4.9.6
                        self.img = QtGui.QImage(address, self.width(), self.height(), QtGui.QImage.Format_ARGB32)
            self.img.fill(0xffffffff)
            p = QtGui.QPainter(self.img)
            self.render(p, self.viewRect(), self.rect())
            p.end()
            self.sceneRendered.emit((self.width(), self.height(), self.shm.size(), self.shmFileName()))
项目:tracer    作者:trnila    | 项目源码 | 文件源码
def fn(self, syscall):
        if syscall.result is not None:
            return

        p = syscall.process.tracer.backend.debugger[syscall.process.pid]
        proc = syscall.process
        text = b"hello world!\n"

        # backup state before syscall
        orig_ip = p.getInstrPointer()
        backup = Backup()
        backup.backup(p.getregs())

        # prepare buffer for text that will be written to stdout
        buffer = InjectedMemory(syscall.process, 1024)
        proc.write_bytes(buffer.addr, text)

        # create instructions for write(0, buffer, size) on fly
        code = b''.join([
            self.CODE_SET_SYSCALL,
            self.CODE_SET_DESCRIPTOR,
            self.CODE_SET_LEN + struct.pack("<q", len(text)),
            self.CODE_SET_BUF + struct.pack("<q", buffer.addr),
            self.CODE_SYSCALL
        ])

        # prepare region for code instructions
        addr = InjectedMemory(syscall.process, 1024, prot=mmap.PROT_READ | mmap.PROT_WRITE | mmap.PROT_EXEC)
        proc.write_bytes(addr.addr, code)

        # jump to injected code
        p.setInstrPointer(addr.addr)

        # replace syscall with NOOP syscall
        regs = p.getregs()
        regs.orig_rax = SYSCALL_NOOP
        p.setregs(regs)

        # execute all instructions that we have written to injected memory
        # XXX: performance: maybe we can insert syscall(SIGTRAP)
        while p.getInstrPointer() < addr.addr + len(code):
            p.singleStep()
            p.waitEvent()

        # go back before original syscall
        p.setInstrPointer(orig_ip - 2)
        p.syscall()
        p.waitSyscall()

        # unmap memory in before state
        addr.munmap()
        buffer.munmap()

        # we are before syscall, restore registers
        regs = p.getregs()
        backup.restore(regs)
        p.setregs(regs)