我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用win32file.ReadFile()。
def SimpleFileDemo():
    """Write a small file through win32file, read it back and compare.

    The file ``win32file_demo_test_file`` is created in the directory
    returned by ``win32api.GetTempPath()`` and removed again on success.

    Raises:
        Exception: if the bytes read back differ from the bytes written.
    """
    testName = os.path.join(win32api.GetTempPath(), "win32file_demo_test_file")
    # Start from a clean slate: the file is opened with CREATE_NEW below.
    if os.path.exists(testName):
        os.unlink(testName)
    test_data = "Hello\0there".encode("ascii")

    # Open the file for writing; close the handle even if the write fails.
    handle = win32file.CreateFile(testName,
                                  win32file.GENERIC_WRITE,
                                  0,
                                  None,
                                  win32con.CREATE_NEW,
                                  0,
                                  None)
    try:
        win32file.WriteFile(handle, test_data)
    finally:
        handle.Close()

    # Open it for reading; close the handle even if the read fails.
    handle = win32file.CreateFile(testName,
                                  win32file.GENERIC_READ,
                                  0,
                                  None,
                                  win32con.OPEN_EXISTING,
                                  0,
                                  None)
    try:
        rc, data = win32file.ReadFile(handle, 1024)
    finally:
        handle.Close()

    if data == test_data:
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print("Successfully wrote and read a file")
    else:
        raise Exception("Got different data back???")
    os.unlink(testName)
def checkWork(self):
    """Drain the data currently available on ``self.pipe`` and deliver it.

    Repeatedly peeks the pipe and reads however many bytes are reported as
    available, until nothing is pending or a ``win32api.error`` is raised.
    Any collected data is passed to ``self.receivedCallback`` in one piece;
    if an error was raised, ``self.cleanup()`` is called afterwards.

    Returns:
        The number of bytes delivered (0 when nothing was read).
    """
    finished = 0
    fullDataRead = []
    while 1:
        try:
            _peeked, bytesToRead, _result = win32pipe.PeekNamedPipe(self.pipe, 1)
            if not bytesToRead:
                break
            _hr, data = win32file.ReadFile(self.pipe, bytesToRead, None)
            fullDataRead.append(data)
        except win32api.error:
            finished = 1
            break

    # ReadFile yields byte strings; joining with b'' works on both Python 2
    # (where b'' is str) and Python 3 (where ''.join would raise TypeError).
    dataBuf = b''.join(fullDataRead)
    if dataBuf:
        self.receivedCallback(dataBuf)
    if finished:
        self.cleanup()
    return len(dataBuf)
def rod(self,p):
    """Read from handle ``p`` in a loop and emit each chunk as decoded text.

    Each successful read of up to 4096 bytes is decoded as UTF-8 and sent
    through ``self.new_data.emit``. The loop ends when ReadFile reports
    ``winerror.ERROR_MORE_DATA`` or when any exception is raised.
    """
    while True:
        try:
            res, data = win32file.ReadFile(p, 4096)
            if res == winerror.ERROR_MORE_DATA:
                # NOTE(review): the bytes returned alongside ERROR_MORE_DATA
                # are dropped here and reading stops - confirm that this is
                # intended rather than emitting the partial data.
                print("no more data")
                break
            self.new_data.emit(data.decode("utf-8"))
        except Exception:
            # Was a bare ``except:``; Exception keeps the best-effort
            # behaviour without swallowing KeyboardInterrupt/SystemExit.
            print("pipe closed")
            break
    # win32pipe.DisconnectNamedPipe(p)
def _recv(self, which, maxsize):
    """Read whatever is available (up to ``maxsize``) from one child pipe.

    Args:
        which: Selector passed to ``self.get_conn_maxsize`` and
            ``self._close`` to identify the pipe.
        maxsize: Upper bound on the number of bytes to read, as normalised
            by ``self.get_conn_maxsize``.

    Returns:
        ``None`` if there is no connection, the result of
        ``self._close(which)`` if the pipe turned out to be closed, otherwise
        the data read (possibly empty).
    """
    conn, maxsize = self.get_conn_maxsize(which, maxsize)
    if conn is None:
        return None

    try:
        x = msvcrt.get_osfhandle(conn.fileno())
        # Peek first so the ReadFile below only asks for bytes that exist.
        (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
        if maxsize < nAvail:
            nAvail = maxsize
        if nAvail > 0:
            (errCode, read) = ReadFile(x, nAvail, None)
    except ValueError:
        return self._close(which)
    except Exception:
        # The original clause was ``(subprocess.pywintypes.error, Exception)``.
        # Exception alone catches the same errors without evaluating
        # ``subprocess.pywintypes``, which raises AttributeError when the
        # subprocess module does not expose pywintypes.
        if geterror()[0] in (109, errno.ESHUTDOWN):
            return self._close(which)
        raise

    if self.universal_newlines:
        # Translate newlines. For Python 3.x assume read is text.
        # If bytes then another solution is needed.
        read = read.replace("\r\n", "\n").replace("\r", "\n")
    return read
def serialReadEvent(self):
    """Handle completion of the pending one-byte overlapped read.

    Collects the byte that the pending read delivered into ``self.read_buf``,
    then reads whatever else the driver reports as queued, hands the combined
    data to ``self.protocol.dataReceived`` and finally issues the next
    one-byte overlapped read.
    """
    port = self._serial.hComPort
    pending = self._overlappedRead

    # Result of the one-byte read that was set up earlier.
    count = win32file.GetOverlappedResult(port, pending, 0)
    if count:
        received = str(self.read_buf[:count])
        # Now pick up everything that is already sitting in the input queue.
        _flags, status = win32file.ClearCommError(port)
        if status.cbInQue:
            win32event.ResetEvent(pending.hEvent)
            _rc, extra = win32file.ReadFile(
                port,
                win32file.AllocateReadBuffer(status.cbInQue),
                pending)
            count = win32file.GetOverlappedResult(port, pending, 1)
            received = received + str(extra[:count])
        # Deliver all the received data in a single call.
        self.protocol.dataReceived(received)

    # Set up the next one-byte read. Attributes are looked up again here,
    # after the protocol callback has run, exactly as the original did.
    win32event.ResetEvent(self._overlappedRead.hEvent)
    _rc, self.read_buf = win32file.ReadFile(
        self._serial.hComPort,
        win32file.AllocateReadBuffer(1),
        self._overlappedRead)
def _child_reader(self, handle):
    """INTERNAL: Reader thread that reads stdout/stderr of the child process.

    Reads up to ``self.maxread`` bytes at a time from ``handle`` and puts a
    ``(handle, status, data)`` tuple on ``self.child_output`` for every read.
    ``status`` is ``'data'`` for a normal read, ``'eof'`` (with empty data)
    when the read fails with ``ERROR_BROKEN_PIPE``, and ``'error'`` (with the
    Windows error code as data) for any other ``WindowsError``. The loop
    stops after the first non-``'data'`` item.
    """
    status = 'data'
    while True:
        try:
            err, data = ReadFile(handle, self.maxread)
            assert err == 0  # not expecting error w/o overlapped io
        except WindowsError as e:
            # ``as`` form is valid on Python 2.6+ and Python 3; the previous
            # ``except WindowsError, e`` is a syntax error on Python 3.
            if e.winerror == ERROR_BROKEN_PIPE:
                status = 'eof'
                data = ''
            else:
                status = 'error'
                data = e.winerror
        self.child_output.put((handle, status, data))
        if status != 'data':
            break
def _recv(self, which, maxsize):
    """Read whatever is available (up to ``maxsize``) from one child pipe.

    Args:
        which: Selector passed to ``self.get_conn_maxsize`` and
            ``self._close`` to identify the pipe.
        maxsize: Upper bound on the number of bytes to read, as normalised
            by ``self.get_conn_maxsize``.

    Returns:
        ``None`` if there is no connection, the result of
        ``self._close(which)`` if the pipe turned out to be closed, otherwise
        the data read (possibly empty), newline-translated when
        ``self.universal_newlines`` is set.
    """
    conn, maxsize = self.get_conn_maxsize(which, maxsize)
    if conn is None:
        return None

    try:
        x = msvcrt.get_osfhandle(conn.fileno())
        # Peek first so the ReadFile below only asks for bytes that exist.
        (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
        if maxsize < nAvail:
            nAvail = maxsize
        if nAvail > 0:
            (errCode, read) = ReadFile(x, nAvail, None)
    except (ValueError, NameError):
        return self._close(which)
    except Exception as why:
        # ``Exception`` alone replaces ``(subprocess.pywintypes.error,
        # Exception)``: same coverage, but no AttributeError when the
        # subprocess module does not expose pywintypes.
        # ``why.args`` is used because exceptions cannot be indexed on
        # Python 3; an exception without args simply does not match.
        code = why.args[0] if why.args else None
        if code in (109, errno.ESHUTDOWN):
            return self._close(which)
        raise

    if self.universal_newlines:
        read = self._translate_newlines(read)
    return read
def CopyFileToCe(src_name, dest_name, progress = None):
    """Copy ``src_name`` to ``dest_name`` through the wincerapi file calls.

    The source is opened with ``win32file.CreateFile`` and read in 2048-byte
    chunks; each chunk is written with ``wincerapi.CeWriteFile`` to a handle
    obtained from ``wincerapi.CeCreateFile``.

    Args:
        src_name: Path of the file to read.
        dest_name: Path passed to ``wincerapi.CeCreateFile``.
        progress: Optional callable invoked after every chunk with the
            running total of bytes copied.

    Returns:
        Total number of bytes copied.
    """
    source = win32file.CreateFile(src_name, win32con.GENERIC_READ, 0, None,
                                  win32con.OPEN_EXISTING, 0, None)
    copied = 0
    try:
        target = wincerapi.CeCreateFile(dest_name, win32con.GENERIC_WRITE, 0, None,
                                        win32con.OPEN_ALWAYS, 0, None)
        try:
            while True:
                _hr, chunk = win32file.ReadFile(source, 2048)
                if not chunk:
                    break
                wincerapi.CeWriteFile(target, chunk)
                copied += len(chunk)
                if progress is not None:
                    progress(copied)
        finally:
            target.Close()
    finally:
        source.Close()
    return copied
def testSimpleFiles(self):
    """Write bytes with WriteFile and check ReadFile returns the same bytes."""
    # mkstemp is only used to obtain a unique name; the file itself is
    # removed again because it is opened with CREATE_NEW below.
    fd, filename = tempfile.mkstemp()
    os.close(fd)
    os.unlink(filename)
    handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None,
                                  win32con.CREATE_NEW, 0, None)
    test_data = str2bytes("Hello\0there")
    try:
        win32file.WriteFile(handle, test_data)
        handle.Close()
        # Try and open for read
        handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None,
                                      win32con.OPEN_EXISTING, 0, None)
        rc, data = win32file.ReadFile(handle, 1024)
        # assertEqual replaces the assertEquals alias, which was removed
        # from unittest in Python 3.12.
        self.assertEqual(data, test_data)
    finally:
        handle.Close()
        try:
            os.unlink(filename)
        except os.error:
            pass
# A simple test using normal read/write operations.
def _IOCPServerThread(self, handle, port, drop_overlapped_reference):
    """Server side of the completion-port pipe test.

    Starts an overlapped ``ConnectNamedPipe`` on ``handle`` and then waits
    on ``port`` with ``GetQueuedCompletionStatus``.

    Args:
        handle: Pipe handle to connect and serve.
        port: Completion port passed to ``GetQueuedCompletionStatus``.
        drop_overlapped_reference: When true, the OVERLAPPED object is
            dropped before the completion is dequeued and the dequeue is
            expected to raise ``RuntimeError``; the handle is closed and the
            method returns without echoing data.
    """
    overlapped = pywintypes.OVERLAPPED()
    win32pipe.ConnectNamedPipe(handle, overlapped)
    if drop_overlapped_reference:
        # Be naughty - the overlapped object is now dead, but
        # GetQueuedCompletionStatus will still find it.  Our check of
        # reference counting should catch that error.
        overlapped = None
        # even if we fail, be sure to close the handle; prevents hangs
        # on Vista 64...
        try:
            # assertRaises replaces failUnlessRaises (removed in 3.12).
            self.assertRaises(RuntimeError,
                              win32file.GetQueuedCompletionStatus, port, -1)
        finally:
            handle.Close()
        return

    result = win32file.GetQueuedCompletionStatus(port, -1)
    ol2 = result[-1]
    # assertTrue replaces failUnless (removed in 3.12).
    self.assertTrue(ol2 is overlapped)
    # Echo back whatever was received.
    data = win32file.ReadFile(handle, 512)[1]
    win32file.WriteFile(handle, data)
def SimpleFileDemo():
    """Round-trip a short byte string through a temp file using win32file.

    Raises:
        Exception: if the bytes read back differ from the bytes written.
    """
    path = os.path.join(win32api.GetTempPath(), "win32file_demo_test_file")
    if os.path.exists(path):
        os.unlink(path)
    payload = "Hello\0there".encode("ascii")

    # Write phase.
    writer = win32file.CreateFile(path, win32file.GENERIC_WRITE, 0, None,
                                  win32con.CREATE_NEW, 0, None)
    win32file.WriteFile(writer, payload)
    writer.Close()

    # Read phase.
    reader = win32file.CreateFile(path, win32file.GENERIC_READ, 0, None,
                                  win32con.OPEN_EXISTING, 0, None)
    _rc, echoed = win32file.ReadFile(reader, 1024)
    reader.Close()

    if echoed != payload:
        raise Exception("Got different data back???")
    print("Successfully wrote and read a file")
    os.unlink(path)
def checkWork(self):
    """Collect all data currently readable from ``self.pipe`` and deliver it.

    Data is handed to ``self.receivedCallback`` as one byte string. If a
    ``win32api.error`` interrupted the reading, ``self.cleanup()`` is called
    after delivery.

    Returns:
        Number of bytes delivered.
    """
    pipe_failed = False
    pieces = []
    while True:
        try:
            _peeked, pending, _result = win32pipe.PeekNamedPipe(self.pipe, 1)
            if pending:
                _hr, piece = win32file.ReadFile(self.pipe, pending, None)
                pieces.append(piece)
                continue
        except win32api.error:
            pipe_failed = True
        # Reached when nothing is pending or the pipe raised an error.
        break

    payload = b''.join(pieces)
    if payload:
        self.receivedCallback(payload)
    if pipe_failed:
        self.cleanup()
    return len(payload)
def _recv(self, which, maxsize):
    """Read whatever is available (up to ``maxsize``) from one child pipe.

    Args:
        which: Selector passed to ``self.get_conn_maxsize`` and
            ``self._close`` to identify the pipe.
        maxsize: Upper bound on the number of bytes to read, as normalised
            by ``self.get_conn_maxsize``.

    Returns:
        ``None`` if there is no connection, the result of
        ``self._close(which)`` if the pipe turned out to be closed, otherwise
        the data read (possibly empty). No newline translation is applied.
    """
    conn, maxsize = self.get_conn_maxsize(which, maxsize)
    if conn is None:
        return None

    try:
        x = msvcrt.get_osfhandle(conn.fileno())
        # Peek first so the ReadFile below only asks for bytes that exist.
        (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
        if maxsize < nAvail:
            nAvail = maxsize
        if nAvail > 0:
            (errCode, read) = ReadFile(x, nAvail, None)
    except ValueError:
        return self._close(which)
    except Exception as why:
        # ``Exception`` alone replaces ``(subprocess.pywintypes.error,
        # Exception)``: same coverage, but no AttributeError when the
        # subprocess module does not expose pywintypes.
        # ``why.args`` is used because exceptions cannot be indexed on
        # Python 3; an exception without args simply does not match.
        code = why.args[0] if why.args else None
        if code in (109, errno.ESHUTDOWN):
            return self._close(which)
        raise

    #if self.universal_newlines:
    #    read = self._translate_newlines(read)
    return read
def add_input_ready_callback(self, callback):
    """
    Register ``callback`` to be invoked whenever new output has been read.

    A polling function is submitted to ``self.loop.run_in_executor``. It
    reads from ``self.stdout_handle`` in a loop, appends each decoded chunk
    to ``self._buffer`` and schedules ``callback`` through
    ``self.loop.call_from_executor``. When a read raises, ``self.ready_f``
    is resolved with ``None`` and the polling stops.
    """
    def _pump():
        reading = True
        while reading:
            try:
                _status, raw = win32file.ReadFile(self.stdout_handle, 65536)
            except Exception:
                # The pipe has ended.
                self.ready_f.set_result(None)
                reading = False
            else:
                self._buffer.append(raw.decode('utf-8', 'ignore'))
                self.loop.call_from_executor(callback)

    self.loop.run_in_executor(_pump, _daemon=True)
    # NOTE(review): list.append returns None, so this always returns None;
    # confirm the callback registration here is intended.
    return self._input_ready_callbacks.append(callback)
def _recv(self, which, maxsize):
    """Read whatever is available (up to ``maxsize``) from one child pipe.

    Args:
        which: Selector passed to ``self.get_conn_maxsize`` and
            ``self._close`` to identify the pipe.
        maxsize: Upper bound on the number of bytes to read, as normalised
            by ``self.get_conn_maxsize``.

    Returns:
        ``None`` if there is no connection, the result of
        ``self._close(which)`` if the pipe turned out to be closed, otherwise
        the data read (possibly empty), newline-translated when
        ``self.universal_newlines`` is set.
    """
    conn, maxsize = self.get_conn_maxsize(which, maxsize)
    if conn is None:
        return None

    try:
        x = msvcrt.get_osfhandle(conn.fileno())
        # Peek first so the ReadFile below only asks for bytes that exist.
        (read, nAvail, nMessage) = PeekNamedPipe(x, 0)
        if maxsize < nAvail:
            nAvail = maxsize
        if nAvail > 0:
            (errCode, read) = ReadFile(x, nAvail, None)
    except ValueError:
        return self._close(which)
    except Exception as why:
        # ``Exception`` alone replaces ``(subprocess.pywintypes.error,
        # Exception)``: same coverage, but no AttributeError when the
        # subprocess module does not expose pywintypes.
        # ``why[0]`` raised TypeError on Python 3 because exceptions are not
        # subscriptable there; ``why.args`` works on both major versions.
        code = why.args[0] if why.args else None
        if code in (109, errno.ESHUTDOWN):
            return self._close(which)
        raise

    if self.universal_newlines:
        read = self._translate_newlines(read)
    return read
def ReadFile(handle, desired_bytes, ol = None):
    """Call kernel32 ``ReadFile`` through ctypes.

    Args:
        handle: Handle passed straight to kernel32 ``ReadFile``.
        desired_bytes: Maximum number of bytes to read.
        ol: Optional OVERLAPPED argument passed through unchanged.

    Returns:
        ``(error_code, data)`` where ``error_code`` is 0 when the call
        reported success and the thread's last-error value otherwise, and
        ``data`` is ``decode()`` applied to exactly the bytes that were read.
    """
    c_read = DWORD()
    buffer = ctypes.create_string_buffer(desired_bytes)
    success = ctypes.windll.kernel32.ReadFile(
        handle, buffer, desired_bytes, ctypes.byref(c_read), ol)
    # Only consult the last-error value when the call failed; after a
    # successful call it may still hold a stale code from an earlier API.
    error = 0 if success else ctypes.windll.kernel32.GetLastError()
    # Slice by the reported byte count instead of relying on ``.value``,
    # which stops at the first NUL byte and would truncate binary data.
    return error, decode(buffer.raw[:c_read.value])
def readcom1line(comhandle):
    """Read one character at a time from ``comhandle`` up to a carriage return.

    Newline characters are skipped. Reading stops after a ``'\\r'`` has been
    appended or as soon as ReadFile returns a non-zero error code.

    Returns:
        The characters collected so far (including the trailing ``'\\r'``
        when one was read).
    """
    collected = ''
    while True:
        err, char = win32file.ReadFile(comhandle, 1, None)
        if err != 0:
            break
        if char != '\n':  # eat newlines
            collected = collected + char
        if char == '\r':
            break
    return collected
def __init__(self, protocol, deviceNameOrPortNumber, reactor, baudrate = 9600, bytesize = EIGHTBITS, parity = PARITY_NONE, stopbits = STOPBITS_ONE, xonxoff = 0, rtscts = 0):
    """Open the serial port and wire it into the reactor.

    Args:
        protocol: Object that receives ``makeConnection(self)`` once the
            port is set up.
        deviceNameOrPortNumber: Passed to ``serial.Serial`` as the port.
        reactor: Object providing ``addEvent``; the read and write events
            are registered with it under the names ``'serialReadEvent'`` and
            ``'serialWriteEvent'``.
        baudrate, bytesize, parity, stopbits, xonxoff, rtscts: Forwarded to
            ``serial.Serial``.
    """
    self._serial = serial.Serial(deviceNameOrPortNumber,
                                 baudrate=baudrate,
                                 bytesize=bytesize,
                                 parity=parity,
                                 stopbits=stopbits,
                                 timeout=None,
                                 xonxoff=xonxoff,
                                 rtscts=rtscts)
    self.flushInput()
    self.flushOutput()
    self.reactor = reactor
    self.protocol = protocol  # previously assigned twice; once is enough
    self.outQueue = []
    self.closed = 0
    self.closedNotifies = 0
    self.writeInProgress = 0

    # The two events are created with different second arguments (1 for
    # reading, 0 for writing), exactly as before.
    self._overlappedRead = win32file.OVERLAPPED()
    self._overlappedRead.hEvent = win32event.CreateEvent(None, 1, 0, None)
    self._overlappedWrite = win32file.OVERLAPPED()
    self._overlappedWrite.hEvent = win32event.CreateEvent(None, 0, 0, None)

    self.reactor.addEvent(self._overlappedRead.hEvent, self, 'serialReadEvent')
    self.reactor.addEvent(self._overlappedWrite.hEvent, self, 'serialWriteEvent')

    self.protocol.makeConnection(self)

    # The return value was never used; the call itself is kept.
    win32file.ClearCommError(self._serial.hComPort)
    # Start the first one-byte overlapped read; only the buffer is kept.
    self.read_buf = win32file.ReadFile(self._serial.hComPort,
                                       win32file.AllocateReadBuffer(1),
                                       self._overlappedRead)[1]
def ReadFile(handle, size):
    """Call ``_ReadFile`` and return its data decoded as ASCII text.

    Returns:
        ``(err, text)`` where ``err`` is passed through from ``_ReadFile``
        and ``text`` is the returned data decoded with the ``'ascii'`` codec.
    """
    err, raw = _ReadFile(handle, size)
    text = raw.decode('ascii')
    return (err, text)
def _read_header(handle, bufsize=4096):
    """INTERNAL: read a stub header from a handle.

    Keeps reading chunks of up to ``bufsize`` from ``handle`` and appending
    them until the accumulated text contains a blank line (``'\\n\\n'``).

    Returns:
        Everything read so far, which may extend past the blank line.
    """
    collected = ''
    while True:
        if '\n\n' in collected:
            return collected
        _err, chunk = ReadFile(handle, bufsize)
        collected = collected + chunk
def write_outconfig(fileName, pipe):
    """Copy everything readable from ``pipe`` into the file ``fileName``.

    Reads chunks of up to 4096 bytes and writes them to the file until a
    read returns a non-zero code, returns no data, or any error occurs.
    Errors are deliberately swallowed: the copy is best-effort and the file
    keeps whatever was written before the failure.

    Args:
        fileName: Path of the output file; opened in text write mode.
        pipe: Handle passed to ``win32file.ReadFile``.
    """
    # ``with`` guarantees the file is closed on every exit path.
    with open(fileName, "w") as tmpFile:
        while True:
            try:
                (ret, line) = win32file.ReadFile(pipe, 4096, None)
                # ``not line`` also recognises an empty bytes object, which
                # the former ``line == ""`` comparison missed on Python 3.
                if ret != 0 or not line:
                    break
                tmpFile.write(line)
            except Exception:
                # Was a bare ``except:``; Exception keeps the best-effort
                # behaviour without trapping KeyboardInterrupt/SystemExit.
                break
def recv(self, bufsize, flags=0):
    """Read up to ``bufsize`` bytes from ``self._handle`` and return them.

    ``flags`` is accepted but not used.
    """
    _err, payload = win32file.ReadFile(self._handle, bufsize)
    return payload
def recv_into(self, buf, nbytes=0):
    """Read from ``self._handle`` directly into ``buf``.

    On Python 2 the work is delegated to ``self._recv_into_py2``. Otherwise
    ``buf`` is wrapped in a ``memoryview`` if it is not one already and,
    when ``nbytes`` is non-zero, only its first ``nbytes`` bytes are offered
    to ``ReadFile``.

    Returns:
        ``len()`` of the data object returned by ``ReadFile``.
    """
    if six.PY2:
        return self._recv_into_py2(buf, nbytes)

    view = buf if isinstance(buf, memoryview) else memoryview(buf)
    target = view[:nbytes] if nbytes else view
    _err, filled = win32file.ReadFile(self._handle, target)
    return len(filled)
def _recv_into_py2(self, buf, nbytes):
    """Read from ``self._handle`` and copy the result into ``buf``.

    Requests ``nbytes`` bytes, or ``len(buf)`` when ``nbytes`` is zero.

    Returns:
        Number of bytes copied into the start of ``buf``.
    """
    wanted = nbytes or len(buf)
    _err, chunk = win32file.ReadFile(self._handle, wanted)
    count = len(chunk)
    buf[:count] = chunk
    return count
def ReadFile(handle, desired_bytes, ol = None):
    """Call kernel32 ``ReadFile`` through ctypes.

    Args:
        handle: Handle passed straight to kernel32 ``ReadFile``.
        desired_bytes: Maximum number of bytes to read.
        ol: Optional OVERLAPPED argument passed through unchanged.

    Returns:
        ``(error_code, data)`` where ``error_code`` is 0 when the call
        reported success and the thread's last-error value otherwise, and
        ``data`` is ``decode()`` applied to exactly the bytes that were read.
    """
    c_read = DWORD()
    buffer = ctypes.create_string_buffer(desired_bytes)
    success = ctypes.windll.kernel32.ReadFile(
        handle, buffer, desired_bytes, ctypes.byref(c_read), ol)
    # Only consult the last-error value when the call failed; after a
    # successful call it may still hold a stale code from an earlier API.
    error = 0 if success else ctypes.windll.kernel32.GetLastError()
    # Slicing ``raw`` by the reported byte count replaces the former
    # ``buffer[n] = '\0'`` + ``.value`` approach, which fails on Python 3
    # (str assigned into a bytes buffer) and truncates at the first NUL.
    return error, decode(buffer.raw[:c_read.value])
def _serverThread(self, pipe_handle, event, wait_time):
    """Serve a single pipe connection, then signal ``event``.

    Waits for a client, expects to read ``foo\\0bar``, sleeps for
    ``wait_time`` seconds, replies with ``bar\\0foo``, closes the pipe and
    sets ``event``.
    """
    # just do one connection and terminate.
    hr = win32pipe.ConnectNamedPipe(pipe_handle)
    # assertTrue/assertEqual replace failUnless/failUnlessEqual, which were
    # removed from unittest in Python 3.12.
    self.assertTrue(hr in (0, winerror.ERROR_PIPE_CONNECTED),
                    "Got error code 0x%x" % (hr,))
    hr, got = win32file.ReadFile(pipe_handle, 100)
    self.assertEqual(got, str2bytes("foo\0bar"))
    time.sleep(wait_time)
    win32file.WriteFile(pipe_handle, str2bytes("bar\0foo"))
    pipe_handle.Close()
    event.set()
def testMoreFiles(self):
    """Exercise write, read-back, truncation and attribute queries on one file.

    The file is opened with FILE_FLAG_DELETE_ON_CLOSE, so the final check
    verifies that it has disappeared once the last handle reference is gone.
    The deprecated ``failUnless``/``failUnlessEqual`` aliases (removed from
    unittest in Python 3.12) are replaced by ``assertTrue``/``assertEqual``.
    """
    # Create a file in the %TEMP% directory.
    testName = os.path.join(win32api.GetTempPath(), "win32filetest.dat")
    desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE
    # Set a flag to delete the file automatically when it is closed.
    fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE
    h = win32file.CreateFile(testName, desiredAccess, win32file.FILE_SHARE_READ,
                             None, win32file.CREATE_ALWAYS, fileFlags, 0)

    # Write a known number of bytes to the file.
    data = str2bytes("z") * 1025

    win32file.WriteFile(h, data)

    self.assertTrue(win32file.GetFileSize(h) == len(data),
                    "WARNING: Written file does not have the same size as the length of the data in it!")

    # Ensure we can read the data back.
    win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN)
    hr, read_data = win32file.ReadFile(h, len(data)+10)  # + 10 to get anything extra
    self.assertTrue(hr==0, "Readfile returned %d" % hr)

    self.assertTrue(read_data == data, "Read data is not what we wrote!")

    # Now truncate the file at 1/2 its existing size.
    newSize = len(data)//2
    win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN)
    win32file.SetEndOfFile(h)
    self.assertEqual(win32file.GetFileSize(h), newSize)

    # GetFileAttributesEx/GetFileAttributesExW tests.
    self.assertEqual(win32file.GetFileAttributesEx(testName),
                     win32file.GetFileAttributesExW(testName))

    attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName)
    self.assertTrue(size==newSize,
                    "Expected GetFileAttributesEx to return the same size as GetFileSize()")
    self.assertTrue(attr==win32file.GetFileAttributes(testName),
                    "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes")

    h = None  # Close the file by removing the last reference to the handle!

    self.assertTrue(not os.path.isfile(testName), "After closing the file, it still exists!")
def testSimpleOverlapped(self):
    """Write a large file with OVERLAPPED writes, then read it back the same way.

    Each ReadFile call must hand back the very buffer object that was passed
    in; the read loop ends when ReadFile raises ``win32api.error``.
    """
    import win32event

    # The scratch file lives in the %TEMP% directory.
    path = os.path.join(win32api.GetTempPath(), "win32filetest.dat")

    # --- write phase ---
    write_ol = pywintypes.OVERLAPPED()
    write_ol.hEvent = win32event.CreateEvent(None, 0, 0, None)
    h = win32file.CreateFile(path, win32file.GENERIC_WRITE, 0, None,
                             win32file.CREATE_ALWAYS, 0, 0)
    chunk = str2bytes("z") * 0x8000
    passes = 512
    expected_size = passes * len(chunk)
    for _ in range(passes):
        win32file.WriteFile(h, chunk, write_ol)
        win32event.WaitForSingleObject(write_ol.hEvent, win32event.INFINITE)
        # Advance the file position carried in the OVERLAPPED structure.
        write_ol.Offset = write_ol.Offset + len(chunk)
    h.Close()

    # --- read phase: fetch the data back, again overlapped ---
    read_ol = pywintypes.OVERLAPPED()
    read_ol.hEvent = win32event.CreateEvent(None, 0, 0, None)
    h = win32file.CreateFile(path, win32file.GENERIC_READ, 0, None,
                             win32file.OPEN_EXISTING, 0, 0)
    scratch = win32file.AllocateReadBuffer(0xFFFF)
    while True:
        try:
            _hr, got = win32file.ReadFile(h, scratch, read_ol)
            win32event.WaitForSingleObject(read_ol.hEvent, win32event.INFINITE)
            read_ol.Offset = read_ol.Offset + len(got)
            if got is not scratch:
                self.fail("Unexpected result from ReadFile - should be the same buffer we passed it")
        except win32api.error:
            break
    h.Close()
def _finishPortSetup(self):
    """
    Complete the serial port setup by starting the first overlapped read.

    Kept as its own method so that tests can exercise or replace it.
    """
    port = self._serial.hComPort
    _flags, _comstat = win32file.ClearCommError(port)
    one_byte = win32file.AllocateReadBuffer(1)
    _rc, self.read_buf = win32file.ReadFile(port, one_byte, self._overlappedRead)
def read(self, offset, length):
    """Read ``length`` bytes from ``self.fhandle`` starting at ``offset``.

    Returns:
        The data returned by ``ReadFile``; if seeking or reading raises any
        ``Exception``, ``addrspace.ZEROER.GetZeros(length)`` is returned
        instead.
    """
    try:
        win32file.SetFilePointer(self.fhandle, offset, 0)
        _status, payload = win32file.ReadFile(self.fhandle, length)
    except Exception:
        payload = addrspace.ZEROER.GetZeros(length)
    return payload
def DumpWithRead(self, output_filename):
    """Read the image and write all the data to a raw file.

    Iterates over ``self.runs`` as ``(start, length)`` pairs. Gaps between
    the current offset and the next run are filled via
    ``self.PadWithNulls``; each run is then copied from ``self.fd`` in
    chunks of at most ``self.buffer_size`` bytes. A dot is printed per
    chunk and a megabyte marker whenever the offset, in whole megabytes, is
    a multiple of 50.

    Args:
        output_filename: Path of the raw output file (opened in binary mode).
    """
    with open(output_filename, "wb") as outfd:
        offset = 0
        for start, length in self.runs:
            if start > offset:
                print("\nPadding from 0x%X to 0x%X\n" % (offset, start))
                self.PadWithNulls(outfd, start - offset)

            offset = start
            end = start + length
            while offset < end:
                to_read = min(self.buffer_size, end - offset)
                win32file.SetFilePointer(self.fd, offset, 0)

                _, data = win32file.ReadFile(self.fd, to_read)
                outfd.write(data)

                offset += to_read

                # Floor division keeps this an integer on Python 3 as well;
                # with ``/`` the value becomes a float there and the
                # modulo test below no longer fires per whole megabyte.
                offset_in_mb = offset // 1024 // 1024
                if not offset_in_mb % 50:
                    sys.stdout.write("\n%04dMB\t" % offset_in_mb)

                sys.stdout.write(".")
                sys.stdout.flush()
def read_text(self, amount):
    """Return all buffered terminal output as one string and empty the buffer.

    ``amount`` is accepted but not used.
    """
    pending = self._buffer
    text = ''.join(pending)
    self._buffer = []
    return text
def run(self):
    """Connect to the named pipe and shuttle data until a write fails.

    Opens ``self.named_pipe_path`` for reading and writing, fires the
    connected notification, then loops: queued writes are flushed first,
    after which available data is read in 512-byte chunks and passed to
    ``self.on_data``. When nothing is available the loop sleeps for one
    second. A ``win32file.error`` during writing ends the loop; the handle
    is then closed and the disconnected notification is fired.
    """
    import win32file

    access = win32file.GENERIC_READ | win32file.GENERIC_WRITE
    self.file_handle = win32file.CreateFile(self.named_pipe_path, access, 0, None,
                                            win32file.OPEN_EXISTING, 0, None)

    log.info('Windows named pipe connected')
    self.fire_connected()

    while True:
        # Checking GetFileAttributes on the pipe path would be a cleaner way
        # to detect closing than waiting for a write error, but it makes mpv
        # hang and crash when the pipe is closed at the wrong moment.
        try:
            while not self.write_queue.empty():
                win32file.WriteFile(self.file_handle, self.write_queue.get_nowait())
        except win32file.error:
            log.warning('Exception while writing to Windows named pipe. Assuming pipe closed.')
            break

        available = win32file.GetFileSize(self.file_handle)
        if not available > 0:
            time.sleep(1)
            continue

        while available > 0:
            # pipe has data to read
            result = win32file.ReadFile(self.file_handle, 512)
            self.on_data(result[1])
            available = win32file.GetFileSize(self.file_handle)

    log.info('Windows named pipe closed')
    win32file.CloseHandle(self.file_handle)
    self.file_handle = None

    self.fire_disconnected()
def SpoolWorker(srcHandle, destHandle, outFiles, doneEvent):
    """Thread entry point for implementation of MakeSpyPipe

    Copies data read from ``srcHandle`` to every file object in
    ``outFiles`` and, when given, to ``destHandle``. The loop ends on an
    empty read or on ``ERROR_BROKEN_PIPE``; afterwards the source handle is
    closed, ``doneEvent`` is signalled (if given) and ``destHandle`` is
    closed (if given). Any exception is formatted and written to
    ``SPOOL_ERROR`` instead of propagating out of the thread.

    Args:
        srcHandle: Handle to read from.
        destHandle: Optional handle that receives a copy of the data.
        outFiles: Iterable of objects with a ``write`` method.
        doneEvent: Optional event handle set via ``win32event.SetEvent``.
    """
    try:
        buffer = win32file.AllocateReadBuffer(SPOOL_BYTES)

        while 1:
            try:
                hr, data = win32file.ReadFile(srcHandle, buffer)
                if hr != 0:
                    raise Exception("win32file.ReadFile returned %i, '%s'" % (hr, data))
                elif len(data) == 0:
                    break
            except pywintypes.error as e:
                # ``as`` form is valid on Python 2.6+ and Python 3.
                if e.args[0] == winerror.ERROR_BROKEN_PIPE:
                    break
                else:
                    # Bare raise re-raises the same exception and keeps its
                    # original traceback.
                    raise

            for f in outFiles:
                f.write(data)

            if destHandle:
                hr, bytes = win32file.WriteFile(destHandle, data)
                if hr != 0 or bytes != len(data):
                    raise Exception("win32file.WriteFile() passed %i bytes and returned %i, %i" % (len(data), hr, bytes))

        srcHandle.Close()

        if doneEvent:
            win32event.SetEvent(doneEvent)

        if destHandle:
            destHandle.Close()

    except:
        # Deliberately broad: this is the top of a worker thread and every
        # failure is reported through SPOOL_ERROR.
        info = sys.exc_info()
        # Argument unpacking replaces apply(), which does not exist on
        # Python 3. The call's arguments are otherwise unchanged.
        SPOOL_ERROR.writelines(traceback.format_exception(*info), '')
        SPOOL_ERROR.flush()
        del info