Python win32file 模块,CreateFile() 实例源码

我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用win32file.CreateFile()

项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def SimpleFileDemo():
    testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file")
    if os.path.exists(testName): os.unlink(testName)
    # Open the file for writing.
    handle = win32file.CreateFile(testName, 
                                  win32file.GENERIC_WRITE, 
                                  0, 
                                  None, 
                                  win32con.CREATE_NEW, 
                                  0, 
                                  None)
    test_data = "Hello\0there".encode("ascii")
    win32file.WriteFile(handle, test_data)
    handle.Close()
    # Open it for reading.
    handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
    rc, data = win32file.ReadFile(handle, 1024)
    handle.Close()
    if data == test_data:
        print "Successfully wrote and read a file"
    else:
        raise Exception("Got different data back???")
    os.unlink(testName)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def load(self, drive=''):
        '''Closes cd drive door and waits until cd is readable.'''
        drive = drive or self.drive
        device = self.__getDeviceHandle(drive)
        hdevice = win32file.CreateFile(device, GENERIC_READ,
                                        FILE_SHARE_READ, None, OPEN_EXISTING, 0, 0)
        win32file.DeviceIoControl(hdevice,2967564,"", 0, None)
        win32file.CloseHandle(hdevice)
        # Poll drive for loaded and give up after timeout period
        i=0
        while i < 20:
            if self.__is_cd_inserted(drive) == 1:
                return 1
            else:
                time.sleep(1)
            i = i+1
        return 0
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def CopyFileToCe(src_name, dest_name, progress = None):
    sh = win32file.CreateFile(src_name, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
    bytes=0
    try:
        dh = wincerapi.CeCreateFile(dest_name, win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None)
        try:
            while 1:
                hr, data = win32file.ReadFile(sh, 2048)
                if not data:
                    break
                wincerapi.CeWriteFile(dh, data)
                bytes = bytes + len(data)
                if progress is not None: progress(bytes)
        finally:
            pass
            dh.Close()
    finally:
        sh.Close()
    return bytes
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testTransactNamedPipeBlocking(self):
        event = threading.Event()
        self.startPipeServer(event)
        open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE

        hpipe = win32file.CreateFile(self.pipename,
                                     open_mode,
                                     0, # no sharing
                                     None, # default security
                                     win32con.OPEN_EXISTING,
                                     0, # win32con.FILE_FLAG_OVERLAPPED,
                                     None)

        # set to message mode.
        win32pipe.SetNamedPipeHandleState(
                        hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)

        hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), 1024, None)
        self.failUnlessEqual(got, str2bytes("bar\0foo"))
        event.wait(5)
        self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testTransactNamedPipeBlockingBuffer(self):
        # Like testTransactNamedPipeBlocking, but a pre-allocated buffer is
        # passed (not really that useful, but it exercises the code path)
        event = threading.Event()
        self.startPipeServer(event)
        open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE

        hpipe = win32file.CreateFile(self.pipename,
                                     open_mode,
                                     0, # no sharing
                                     None, # default security
                                     win32con.OPEN_EXISTING,
                                     0, # win32con.FILE_FLAG_OVERLAPPED,
                                     None)

        # set to message mode.
        win32pipe.SetNamedPipeHandleState(
                        hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)

        buffer = win32file.AllocateReadBuffer(1024)
        hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, None)
        self.failUnlessEqual(got, str2bytes("bar\0foo"))
        event.wait(5)
        self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testSimpleFiles(self):
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        os.unlink(filename)
        handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None)
        test_data = str2bytes("Hello\0there")
        try:
            win32file.WriteFile(handle, test_data)
            handle.Close()
            # Try and open for read
            handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
            rc, data = win32file.ReadFile(handle, 1024)
            self.assertEquals(data, test_data)
        finally:
            handle.Close()
            try:
                os.unlink(filename)
            except os.error:
                pass

    # A simple test using normal read/write operations.
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def setUp(self):
        self.watcher_threads = []
        self.watcher_thread_changes = []
        self.dir_names = []
        self.dir_handles = []
        for i in range(self.num_test_dirs):
            td = tempfile.mktemp("-test-directory-changes-%d" % i)
            os.mkdir(td)
            self.dir_names.append(td)
            hdir = win32file.CreateFile(td, 
                                        ntsecuritycon.FILE_LIST_DIRECTORY,
                                        win32con.FILE_SHARE_READ,
                                        None, # security desc
                                        win32con.OPEN_EXISTING,
                                        win32con.FILE_FLAG_BACKUP_SEMANTICS |
                                        win32con.FILE_FLAG_OVERLAPPED,
                                        None)
            self.dir_handles.append(hdir)

            changes = []
            t = threading.Thread(target=self._watcherThreadOverlapped,
                                 args=(td, hdir, changes))
            t.start()
            self.watcher_threads.append(t)
            self.watcher_thread_changes.append(changes)
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def CopyFileToCe(src_name, dest_name, progress = None):
    sh = win32file.CreateFile(src_name, win32con.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
    bytes=0
    try:
        dh = wincerapi.CeCreateFile(dest_name, win32con.GENERIC_WRITE, 0, None, win32con.OPEN_ALWAYS, 0, None)
        try:
            while 1:
                hr, data = win32file.ReadFile(sh, 2048)
                if not data:
                    break
                wincerapi.CeWriteFile(dh, data)
                bytes = bytes + len(data)
                if progress is not None: progress(bytes)
        finally:
            pass
            dh.Close()
    finally:
        sh.Close()
    return bytes
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def SimpleFileDemo():
    testName = os.path.join( win32api.GetTempPath(), "win32file_demo_test_file")
    if os.path.exists(testName): os.unlink(testName)
    # Open the file for writing.
    handle = win32file.CreateFile(testName, 
                                  win32file.GENERIC_WRITE, 
                                  0, 
                                  None, 
                                  win32con.CREATE_NEW, 
                                  0, 
                                  None)
    test_data = "Hello\0there".encode("ascii")
    win32file.WriteFile(handle, test_data)
    handle.Close()
    # Open it for reading.
    handle = win32file.CreateFile(testName, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
    rc, data = win32file.ReadFile(handle, 1024)
    handle.Close()
    if data == test_data:
        print("Successfully wrote and read a file")
    else:
        raise Exception("Got different data back???")
    os.unlink(testName)
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testTransactNamedPipeBlocking(self):
        event = threading.Event()
        self.startPipeServer(event)
        open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE

        hpipe = win32file.CreateFile(self.pipename,
                                     open_mode,
                                     0, # no sharing
                                     None, # default security
                                     win32con.OPEN_EXISTING,
                                     0, # win32con.FILE_FLAG_OVERLAPPED,
                                     None)

        # set to message mode.
        win32pipe.SetNamedPipeHandleState(
                        hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)

        hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), 1024, None)
        self.failUnlessEqual(got, str2bytes("bar\0foo"))
        event.wait(5)
        self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testSimpleFiles(self):
        fd, filename = tempfile.mkstemp()
        os.close(fd)
        os.unlink(filename)
        handle = win32file.CreateFile(filename, win32file.GENERIC_WRITE, 0, None, win32con.CREATE_NEW, 0, None)
        test_data = str2bytes("Hello\0there")
        try:
            win32file.WriteFile(handle, test_data)
            handle.Close()
            # Try and open for read
            handle = win32file.CreateFile(filename, win32file.GENERIC_READ, 0, None, win32con.OPEN_EXISTING, 0, None)
            rc, data = win32file.ReadFile(handle, 1024)
            self.assertEquals(data, test_data)
        finally:
            handle.Close()
            try:
                os.unlink(filename)
            except os.error:
                pass

    # A simple test using normal read/write operations.
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def setUp(self):
        self.watcher_threads = []
        self.watcher_thread_changes = []
        self.dir_names = []
        self.dir_handles = []
        for i in range(self.num_test_dirs):
            td = tempfile.mktemp("-test-directory-changes-%d" % i)
            os.mkdir(td)
            self.dir_names.append(td)
            hdir = win32file.CreateFile(td, 
                                        ntsecuritycon.FILE_LIST_DIRECTORY,
                                        win32con.FILE_SHARE_READ,
                                        None, # security desc
                                        win32con.OPEN_EXISTING,
                                        win32con.FILE_FLAG_BACKUP_SEMANTICS |
                                        win32con.FILE_FLAG_OVERLAPPED,
                                        None)
            self.dir_handles.append(hdir)

            changes = []
            t = threading.Thread(target=self._watcherThreadOverlapped,
                                 args=(td, hdir, changes))
            t.start()
            self.watcher_threads.append(t)
            self.watcher_thread_changes.append(changes)
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def _OpenFileForRead(self, path):
        try:
            fhandle = self.fhandle = win32file.CreateFile(
                path,
                win32file.GENERIC_READ,
                win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
                None,
                win32file.OPEN_EXISTING,
                win32file.FILE_ATTRIBUTE_NORMAL,
                None)

            self._closer = weakref.ref(
                self, lambda x: win32file.CloseHandle(fhandle))

            self.write_enabled = False
            return fhandle

        except pywintypes.error as e:
            raise IOError("Unable to open %s: %s" % (path, e))
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def _OpenFileForWrite(self, path):
        try:
            fhandle = self.fhandle = win32file.CreateFile(
                path,
                win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
                None,
                win32file.OPEN_EXISTING,
                win32file.FILE_ATTRIBUTE_NORMAL,
                None)
            self.write_enabled = True
            self._closer = weakref.ref(
                self, lambda x: win32file.CloseHandle(fhandle))

            return fhandle

        except pywintypes.error as e:
            raise IOError("Unable to open %s: %s" % (path, e))
项目:BrainDamage    作者:mehulj94    | 项目源码 | 文件源码
def changeTimestamp(path,fileName,daylimit):
    print '[*] Inside changeTimestamp'
    dates = {}
    fileName = os.path.join(path, fileName)
    dates['tdata'] = getDate(fileName,daylimit)
    dates['ctime'] = datetime.utcfromtimestamp(os.path.getctime(fileName))

    # print "[*] Original time: ",str(dates['ctime'])

    if __use_win_32:
        filehandle = win32file.CreateFile(fileName, win32file.GENERIC_WRITE, 0, None, win32con.OPEN_EXISTING, 0, None)
        win32file.SetFileTime(filehandle, dates['tdata'],dates['tdata'],dates['tdata'])
        filehandle.close()
        print "[*] Timestamps changed!!"
    else:
        os.utime(fileName, (time.mktime(dates['tdata'].utctimetuple()),)*2)

    dates['mtime'] = datetime.utcfromtimestamp(os.path.getmtime(fileName))    
    # print "[*] Modified time: ",str(dates['mtime'])
项目:ptterm    作者:jonathanslenders    | 项目源码 | 文件源码
def __init__(self):
        self.pty = Pty()
        self.ready_f = Future()
        self._input_ready_callbacks = []
        self.loop = get_event_loop()

        self.stdout_handle = win32file.CreateFile(
            self.pty.conout_name(),
            win32con.GENERIC_READ,
            0,
            win32security.SECURITY_ATTRIBUTES(),
            win32con.OPEN_EXISTING,
            win32con.FILE_FLAG_OVERLAPPED,
            0)

        self.stdin_handle = win32file.CreateFile(
            self.pty.conin_name(),
            win32con.GENERIC_WRITE,
            0,
            win32security.SECURITY_ATTRIBUTES(),
            win32con.OPEN_EXISTING,
            win32con.FILE_FLAG_OVERLAPPED,
            0)
        self._buffer = []
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def pipe(bufsize=8192):
        """Creates overlapped (asynchronous) pipe.
        """
        name = r'\\.\pipe\pycos-pipe-%d-%d' % (os.getpid(), next(_pipe_id))
        openmode = (win32pipe.PIPE_ACCESS_INBOUND | win32file.FILE_FLAG_OVERLAPPED |
                    FILE_FLAG_FIRST_PIPE_INSTANCE)
        pipemode = (win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE)
        rh = wh = None
        try:
            rh = win32pipe.CreateNamedPipe(
                name, openmode, pipemode, 1, bufsize, bufsize,
                win32pipe.NMPWAIT_USE_DEFAULT_WAIT, None)

            wh = win32file.CreateFile(
                name, win32file.GENERIC_WRITE | winnt.FILE_READ_ATTRIBUTES, 0, None,
                win32file.OPEN_EXISTING, win32file.FILE_FLAG_OVERLAPPED, None)

            overlapped = pywintypes.OVERLAPPED()
            # 'yield' can't be used in constructor so use sync wait
            # (in this case it is should be okay)
            overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
            rc = win32pipe.ConnectNamedPipe(rh, overlapped)
            if rc == winerror.ERROR_PIPE_CONNECTED:
                win32event.SetEvent(overlapped.hEvent)
            rc = win32event.WaitForSingleObject(overlapped.hEvent, 1000)
            overlapped = None
            if rc != win32event.WAIT_OBJECT_0:
                pycos.logger.warning('connect failed: %s' % rc)
                raise Exception(rc)
            return (rh, wh)
        except:
            if rh is not None:
                win32file.CloseHandle(rh)
            if wh is not None:
                win32file.CloseHandle(wh)
            raise
项目:pycos    作者:pgiri    | 项目源码 | 文件源码
def pipe(bufsize=8192):
        """Creates overlapped (asynchronous) pipe.
        """
        name = r'\\.\pipe\pycos-pipe-%d-%d' % (os.getpid(), next(_pipe_id))
        openmode = (win32pipe.PIPE_ACCESS_INBOUND | win32file.FILE_FLAG_OVERLAPPED |
                    FILE_FLAG_FIRST_PIPE_INSTANCE)
        pipemode = (win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE)
        rh = wh = None
        try:
            rh = win32pipe.CreateNamedPipe(
                name, openmode, pipemode, 1, bufsize, bufsize,
                win32pipe.NMPWAIT_USE_DEFAULT_WAIT, None)

            wh = win32file.CreateFile(
                name, win32file.GENERIC_WRITE | winnt.FILE_READ_ATTRIBUTES, 0, None,
                win32file.OPEN_EXISTING, win32file.FILE_FLAG_OVERLAPPED, None)

            overlapped = pywintypes.OVERLAPPED()
            # 'yield' can't be used in constructor so use sync wait
            # (in this case it is should be okay)
            overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
            rc = win32pipe.ConnectNamedPipe(rh, overlapped)
            if rc == winerror.ERROR_PIPE_CONNECTED:
                win32event.SetEvent(overlapped.hEvent)
            rc = win32event.WaitForSingleObject(overlapped.hEvent, 1000)
            overlapped = None
            if rc != win32event.WAIT_OBJECT_0:
                pycos.logger.warning('connect failed: %s' % rc)
                raise Exception(rc)
            return (rh, wh)
        except:
            if rh is not None:
                win32file.CloseHandle(rh)
            if wh is not None:
                win32file.CloseHandle(wh)
            raise
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def opencom1():
    # returns a handle to the Windows file
    return win32file.CreateFile(u'COM1:', win32file.GENERIC_READ, \
                                0, None, win32file.OPEN_EXISTING, 0, None)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def eject(self, drive=''):
        '''Opens the cd drive door.'''
        drive = drive or self.drive
        device = self.__getDeviceHandle(drive)
        hdevice = win32file.CreateFile(device, GENERIC_READ,
                                        FILE_SHARE_READ, None, OPEN_EXISTING, 0, 0)
        win32file.DeviceIoControl(hdevice,2967560,"", 0, None)
        win32file.CloseHandle(hdevice)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def close(self, drive=''):
        '''Closes the cd drive door.'''
        drive = drive or self.drive
        device = self.__getDeviceHandle(drive)
        hdevice = win32file.CreateFile(device, GENERIC_READ,
                                        FILE_SHARE_READ, None, OPEN_EXISTING, 0, 0)
        win32file.DeviceIoControl(hdevice,2967564,"", 0, None)
        win32file.CloseHandle(hdevice)
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def watchos():
    #get path or maintain current path of app
    FILE_LIST_DIRECTORY = 0x0001
    try: path_to_watch = myos.get() or "."
    except: path_to_watch = "."
    path_to_watch = os.path.abspath(path_to_watch)

    textbox.insert(END, "Watching %s at %s" % (path_to_watch, time.asctime()) + "\n\n")
    # FindFirstChangeNotification sets up a handle for watching
    #  file changes.
    while 1:

        hDir = win32file.CreateFile (
            path_to_watch,
            FILE_LIST_DIRECTORY,
            win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
            None,
            win32con.OPEN_EXISTING,
            win32con.FILE_FLAG_BACKUP_SEMANTICS,
            None
            )

        change_handle = win32file.ReadDirectoryChangesW (
            hDir,
            1024,
            True,#Heap Size include_subdirectories,
            win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
            win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
            win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
            win32con.FILE_NOTIFY_CHANGE_SIZE |
            win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
            win32con.FILE_NOTIFY_CHANGE_SECURITY,
            None,
            None
            )

        # Loop forever, listing any file changes. The WaitFor... will
        #  time out every half a second allowing for keyboard interrupts
        #  to terminate the loop.
        ACTIONS = {
            1 : "Created",
            2 : "Deleted",
            3 : "Updated",
            4 : "Renamed from something",
            5 : "Renamed to something"
            }
        results = change_handle
        for action, files in results:
            full_filename = os.path.join(path_to_watch, files)
            theact = ACTIONS.get(action, "Unknown")
            textbox.insert(END, str(full_filename) + "\t" + str(theact) +"\n")
项目:gpvdm    作者:roderickmackenzie    | 项目源码 | 文件源码
def run(self):
        if running_on_linux()==True:
            print("thread: start")
            wm = pyinotify.WatchManager()
            print("wathcing path",self.watch_path)
            ret=wm.add_watch(self.watch_path, pyinotify.IN_CLOSE_WRITE, self.onChange,False,False)
            print(ret)
            print("thread: start notifyer",self.notifier)
            self.notifier = pyinotify.Notifier(wm)
            try:
                while 1:
                    self.notifier.process_events()
                    if self.notifier.check_events():
                        self.notifier.read_events()
            #self.notifier.loop()
            except:
                print("error in notify",sys.exc_info()[0])
        else:
            hDir = win32file.CreateFile (self.watch_path,FILE_LIST_DIRECTORY,win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,None,win32con.OPEN_EXISTING,win32con.FILE_FLAG_BACKUP_SEMANTICS,None)

            while 1:
                results = win32file.ReadDirectoryChangesW (hDir,1024,True,
                win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
                win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
                win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
                win32con.FILE_NOTIFY_CHANGE_SIZE |
                win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
                win32con.FILE_NOTIFY_CHANGE_SECURITY,
                None,
                None)

                for action, file in results:
                    full_filename = os.path.join (self.watch_path, file)
                    self.onChange(full_filename)
项目:cmdchallenge-site    作者:jarv    | 项目源码 | 文件源码
def connect(self, address):
        win32pipe.WaitNamedPipe(address, self._timeout)
        try:
            handle = win32file.CreateFile(
                address,
                win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                0,
                None,
                win32file.OPEN_EXISTING,
                cSECURITY_ANONYMOUS | cSECURITY_SQOS_PRESENT,
                0
            )
        except win32pipe.error as e:
            # See Remarks:
            # https://msdn.microsoft.com/en-us/library/aa365800.aspx
            if e.winerror == cERROR_PIPE_BUSY:
                # Another program or thread has grabbed our pipe instance
                # before we got to it. Wait for availability and attempt to
                # connect again.
                win32pipe.WaitNamedPipe(address, RETRY_WAIT_TIMEOUT)
                return self.connect(address)
            raise e

        self.flags = win32pipe.GetNamedPipeInfo(handle)[0]

        self._handle = handle
        self._address = address
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testMoreFiles(self):
        # Create a file in the %TEMP% directory.
        testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
        desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE
        # Set a flag to delete the file automatically when it is closed.
        fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE
        h = win32file.CreateFile( testName, desiredAccess, win32file.FILE_SHARE_READ, None, win32file.CREATE_ALWAYS, fileFlags, 0)

        # Write a known number of bytes to the file.
        data = str2bytes("z") * 1025

        win32file.WriteFile(h, data)

        self.failUnless(win32file.GetFileSize(h) == len(data), "WARNING: Written file does not have the same size as the length of the data in it!")

        # Ensure we can read the data back.
        win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN)
        hr, read_data = win32file.ReadFile(h, len(data)+10) # + 10 to get anything extra
        self.failUnless(hr==0, "Readfile returned %d" % hr)

        self.failUnless(read_data == data, "Read data is not what we wrote!")

        # Now truncate the file at 1/2 its existing size.
        newSize = len(data)//2
        win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN)
        win32file.SetEndOfFile(h)
        self.failUnlessEqual(win32file.GetFileSize(h), newSize)

        # GetFileAttributesEx/GetFileAttributesExW tests.
        self.failUnlessEqual(win32file.GetFileAttributesEx(testName), win32file.GetFileAttributesExW(testName))

        attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName)
        self.failUnless(size==newSize, 
                        "Expected GetFileAttributesEx to return the same size as GetFileSize()")
        self.failUnless(attr==win32file.GetFileAttributes(testName), 
                        "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes")

        h = None # Close the file by removing the last reference to the handle!

        self.failUnless(not os.path.isfile(testName), "After closing the file, it still exists!")
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testFilePointer(self):
        # via [ 979270 ] SetFilePointer fails with negative offset

        # Create a file in the %TEMP% directory.
        filename = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )

        f = win32file.CreateFile(filename,
                                win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                0,
                                None,
                                win32file.CREATE_ALWAYS,
                                win32file.FILE_ATTRIBUTE_NORMAL,
                                0)
        try:
            #Write some data
            data = str2bytes('Some data')
            (res, written) = win32file.WriteFile(f, data)

            self.failIf(res)
            self.assertEqual(written, len(data))

            #Move at the beginning and read the data
            win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN)
            (res, s) = win32file.ReadFile(f, len(data))

            self.failIf(res)
            self.assertEqual(s, data)

            #Move at the end and read the data
            win32file.SetFilePointer(f, -len(data), win32file.FILE_END)
            (res, s) = win32file.ReadFile(f, len(data))

            self.failIf(res)
            self.failUnlessEqual(s, data)
        finally:
            f.Close()
            os.unlink(filename)
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testFileTimesTimezones(self):
        if not issubclass(pywintypes.TimeType, datetime.datetime):
            # maybe should report 'skipped', but that's not quite right as
            # there is nothing you can do to avoid it being skipped!
            return
        filename = tempfile.mktemp("-testFileTimes")
        now_utc = win32timezone.utcnow()
        now_local = now_utc.astimezone(win32timezone.TimeZoneInfo.local())
        h = win32file.CreateFile(filename,
                                 win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                 0, None, win32file.CREATE_ALWAYS, 0, 0)
        try:
            win32file.SetFileTime(h, now_utc, now_utc, now_utc)
            ct, at, wt = win32file.GetFileTime(h)
            self.failUnlessEqual(now_local, ct)
            self.failUnlessEqual(now_local, at)
            self.failUnlessEqual(now_local, wt)
            # and the reverse - set local, check against utc
            win32file.SetFileTime(h, now_local, now_local, now_local)
            ct, at, wt = win32file.GetFileTime(h)
            self.failUnlessEqual(now_utc, ct)
            self.failUnlessEqual(now_utc, at)
            self.failUnlessEqual(now_utc, wt)
        finally:
            h.close()
            os.unlink(filename)
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def HttpExtensionProc(self, ecb):
        # NOTE: If you use a ThreadPoolExtension, you must still perform
        # this check in HttpExtensionProc - raising the exception from
        # The "Dispatch" method will just cause the exception to be
        # rendered to the browser.
        if self.reload_watcher.change_detected:
            print "Doing reload"
            raise InternalReloadException

        if ecb.GetServerVariable("UNICODE_URL").endswith("test.py"):
            file_flags = win32con.FILE_FLAG_SEQUENTIAL_SCAN  | win32con.FILE_FLAG_OVERLAPPED
            hfile = win32file.CreateFile(__file__, win32con.GENERIC_READ,
                                         0, None, win32con.OPEN_EXISTING,
                                         file_flags, None)
            flags = isapicon.HSE_IO_ASYNC | isapicon.HSE_IO_DISCONNECT_AFTER_SEND | \
                    isapicon.HSE_IO_SEND_HEADERS
            # We pass hFile to the callback simply as a way of keeping it alive
            # for the duration of the transmission
            try:
                ecb.TransmitFile(TransmitFileCallback, hfile,
                                 int(hfile),
                                 "200 OK",
                                 0, 0, None, None, flags)
            except:
                # Errors keep this source file open!
                hfile.Close()
                raise
        else:
            # default response
            ecb.SendResponseHeaders("200 OK", "Content-Type: text/html\r\n\r\n", 0)
            print >> ecb, "<HTML><BODY>"
            print >> ecb, "The root of this site is at", ecb.MapURLToPath("/")
            print >> ecb, "</BODY></HTML>"
            ecb.close()
        return isapicon.HSE_STATUS_SUCCESS
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def HttpExtensionProc(self, ecb):
        # NOTE: If you use a ThreadPoolExtension, you must still perform
        # this check in HttpExtensionProc - raising the exception from
        # The "Dispatch" method will just cause the exception to be
        # rendered to the browser.
        if self.reload_watcher.change_detected:
            print("Doing reload")
            raise InternalReloadException

        if ecb.GetServerVariable("UNICODE_URL").endswith("test.py"):
            file_flags = win32con.FILE_FLAG_SEQUENTIAL_SCAN  | win32con.FILE_FLAG_OVERLAPPED
            hfile = win32file.CreateFile(__file__, win32con.GENERIC_READ,
                                         0, None, win32con.OPEN_EXISTING,
                                         file_flags, None)
            flags = isapicon.HSE_IO_ASYNC | isapicon.HSE_IO_DISCONNECT_AFTER_SEND | \
                    isapicon.HSE_IO_SEND_HEADERS
            # We pass hFile to the callback simply as a way of keeping it alive
            # for the duration of the transmission
            try:
                ecb.TransmitFile(TransmitFileCallback, hfile,
                                 int(hfile),
                                 "200 OK",
                                 0, 0, None, None, flags)
            except:
                # Errors keep this source file open!
                hfile.Close()
                raise
        else:
            # default response
            ecb.SendResponseHeaders("200 OK", "Content-Type: text/html\r\n\r\n", 0)
            print("<HTML><BODY>", file=ecb)
            print("The root of this site is at", ecb.MapURLToPath("/"), file=ecb)
            print("</BODY></HTML>", file=ecb)
            ecb.close()
        return isapicon.HSE_STATUS_SUCCESS
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def TestDeviceNotifications(dir_names):
    wc = win32gui.WNDCLASS()
    wc.lpszClassName = 'test_devicenotify'
    wc.style =  win32con.CS_GLOBALCLASS|win32con.CS_VREDRAW | win32con.CS_HREDRAW
    wc.hbrBackground = win32con.COLOR_WINDOW+1
    wc.lpfnWndProc={win32con.WM_DEVICECHANGE:OnDeviceChange}
    class_atom=win32gui.RegisterClass(wc)
    hwnd = win32gui.CreateWindow(wc.lpszClassName,
        'Testing some devices',
        # no need for it to be visible.
        win32con.WS_CAPTION,
        100,100,900,900, 0, 0, 0, None)

    hdevs = []
    # Watch for all USB device notifications
    filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE(
                                        GUID_DEVINTERFACE_USB_DEVICE)
    hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
                                               win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
    hdevs.append(hdev)
    # and create handles for all specified directories
    for d in dir_names:
        hdir = win32file.CreateFile(d, 
                                    winnt.FILE_LIST_DIRECTORY, 
                                    winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE,
                                    None, # security attributes
                                    win32con.OPEN_EXISTING,
                                    win32con.FILE_FLAG_BACKUP_SEMANTICS | # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME.
                                    win32con.FILE_FLAG_OVERLAPPED,
                                    None)

        filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir)
        hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
                                          win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
        hdevs.append(hdev)

    # now start a message pump and wait for messages to be delivered.
    print("Watching", len(hdevs), "handles - press Ctrl+C to terminate, or")
    print("add and remove some USB devices...")
    if not dir_names:
        print("(Note you can also pass paths to watch on the command-line - eg,")
        print("pass the root of an inserted USB stick to see events specific to")
        print("that volume)")
    while 1:
        win32gui.PumpWaitingMessages()
        time.sleep(0.01)
    win32gui.DestroyWindow(hwnd)
    win32gui.UnregisterClass(wc.lpszClassName, None)
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testTransactNamedPipeAsync(self):
        event = threading.Event()
        overlapped = pywintypes.OVERLAPPED()
        overlapped.hEvent = win32event.CreateEvent(None, 0, 0, None)
        self.startPipeServer(event, 0.5)
        open_mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE

        hpipe = win32file.CreateFile(self.pipename,
                                     open_mode,
                                     0, # no sharing
                                     None, # default security
                                     win32con.OPEN_EXISTING,
                                     win32con.FILE_FLAG_OVERLAPPED,
                                     None)

        # set to message mode.
        win32pipe.SetNamedPipeHandleState(
                        hpipe, win32pipe.PIPE_READMODE_MESSAGE, None, None)

        buffer = win32file.AllocateReadBuffer(1024)
        hr, got = win32pipe.TransactNamedPipe(hpipe, str2bytes("foo\0bar"), buffer, overlapped)
        self.failUnlessEqual(hr, winerror.ERROR_IO_PENDING)
        nbytes = win32file.GetOverlappedResult(hpipe, overlapped, True)
        got = buffer[:nbytes]
        self.failUnlessEqual(got, str2bytes("bar\0foo"))
        event.wait(5)
        self.failUnless(event.isSet(), "Pipe server thread didn't terminate")
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testMoreFiles(self):
        # Create a file in the %TEMP% directory.
        testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
        desiredAccess = win32file.GENERIC_READ | win32file.GENERIC_WRITE
        # Set a flag to delete the file automatically when it is closed.
        fileFlags = win32file.FILE_FLAG_DELETE_ON_CLOSE
        h = win32file.CreateFile( testName, desiredAccess, win32file.FILE_SHARE_READ, None, win32file.CREATE_ALWAYS, fileFlags, 0)

        # Write a known number of bytes to the file.
        data = str2bytes("z") * 1025

        win32file.WriteFile(h, data)

        self.failUnless(win32file.GetFileSize(h) == len(data), "WARNING: Written file does not have the same size as the length of the data in it!")

        # Ensure we can read the data back.
        win32file.SetFilePointer(h, 0, win32file.FILE_BEGIN)
        hr, read_data = win32file.ReadFile(h, len(data)+10) # + 10 to get anything extra
        self.failUnless(hr==0, "Readfile returned %d" % hr)

        self.failUnless(read_data == data, "Read data is not what we wrote!")

        # Now truncate the file at 1/2 its existing size.
        newSize = len(data)//2
        win32file.SetFilePointer(h, newSize, win32file.FILE_BEGIN)
        win32file.SetEndOfFile(h)
        self.failUnlessEqual(win32file.GetFileSize(h), newSize)

        # GetFileAttributesEx/GetFileAttributesExW tests.
        self.failUnlessEqual(win32file.GetFileAttributesEx(testName), win32file.GetFileAttributesExW(testName))

        attr, ct, at, wt, size = win32file.GetFileAttributesEx(testName)
        self.failUnless(size==newSize, 
                        "Expected GetFileAttributesEx to return the same size as GetFileSize()")
        self.failUnless(attr==win32file.GetFileAttributes(testName), 
                        "Expected GetFileAttributesEx to return the same attributes as GetFileAttributes")

        h = None # Close the file by removing the last reference to the handle!

        self.failUnless(not os.path.isfile(testName), "After closing the file, it still exists!")
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testFilePointer(self):
        # via [ 979270 ] SetFilePointer fails with negative offset

        # Create a file in the %TEMP% directory.
        filename = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )

        f = win32file.CreateFile(filename,
                                win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                0,
                                None,
                                win32file.CREATE_ALWAYS,
                                win32file.FILE_ATTRIBUTE_NORMAL,
                                0)
        try:
            #Write some data
            data = str2bytes('Some data')
            (res, written) = win32file.WriteFile(f, data)

            self.failIf(res)
            self.assertEqual(written, len(data))

            #Move at the beginning and read the data
            win32file.SetFilePointer(f, 0, win32file.FILE_BEGIN)
            (res, s) = win32file.ReadFile(f, len(data))

            self.failIf(res)
            self.assertEqual(s, data)

            #Move at the end and read the data
            win32file.SetFilePointer(f, -len(data), win32file.FILE_END)
            (res, s) = win32file.ReadFile(f, len(data))

            self.failIf(res)
            self.failUnlessEqual(s, data)
        finally:
            f.Close()
            os.unlink(filename)
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testFileTimesTimezones(self):
        if not issubclass(pywintypes.TimeType, datetime.datetime):
            # maybe should report 'skipped', but that's not quite right as
            # there is nothing you can do to avoid it being skipped!
            return
        filename = tempfile.mktemp("-testFileTimes")
        now_utc = win32timezone.utcnow()
        now_local = now_utc.astimezone(win32timezone.TimeZoneInfo.local())
        h = win32file.CreateFile(filename,
                                 win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                 0, None, win32file.CREATE_ALWAYS, 0, 0)
        try:
            win32file.SetFileTime(h, now_utc, now_utc, now_utc)
            ct, at, wt = win32file.GetFileTime(h)
            self.failUnlessEqual(now_local, ct)
            self.failUnlessEqual(now_local, at)
            self.failUnlessEqual(now_local, wt)
            # and the reverse - set local, check against utc
            win32file.SetFileTime(h, now_local, now_local, now_local)
            ct, at, wt = win32file.GetFileTime(h)
            self.failUnlessEqual(now_utc, ct)
            self.failUnlessEqual(now_utc, at)
            self.failUnlessEqual(now_utc, wt)
        finally:
            h.close()
            os.unlink(filename)
项目:CodeReader    作者:jasonrbr    | 项目源码 | 文件源码
def testSimpleOverlapped(self):
        # Create a file in the %TEMP% directory.
        import win32event
        testName = os.path.join( win32api.GetTempPath(), "win32filetest.dat" )
        desiredAccess = win32file.GENERIC_WRITE
        overlapped = pywintypes.OVERLAPPED()
        evt = win32event.CreateEvent(None, 0, 0, None)
        overlapped.hEvent = evt
        # Create the file and write shit-loads of data to it.
        h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.CREATE_ALWAYS, 0, 0)
        chunk_data = str2bytes("z") * 0x8000
        num_loops = 512
        expected_size = num_loops * len(chunk_data)
        for i in range(num_loops):
            win32file.WriteFile(h, chunk_data, overlapped)
            win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
            overlapped.Offset = overlapped.Offset + len(chunk_data)
        h.Close()
        # Now read the data back overlapped
        overlapped = pywintypes.OVERLAPPED()
        evt = win32event.CreateEvent(None, 0, 0, None)
        overlapped.hEvent = evt
        desiredAccess = win32file.GENERIC_READ
        h = win32file.CreateFile( testName, desiredAccess, 0, None, win32file.OPEN_EXISTING, 0, 0)
        buffer = win32file.AllocateReadBuffer(0xFFFF)
        while 1:
            try:
                hr, data = win32file.ReadFile(h, buffer, overlapped)
                win32event.WaitForSingleObject(overlapped.hEvent, win32event.INFINITE)
                overlapped.Offset = overlapped.Offset + len(data)
                if not data is buffer:
                    self.fail("Unexpected result from ReadFile - should be the same buffer we passed it")
            except win32api.error:
                break
        h.Close()
项目:SDV-Summary    作者:Sketchy502    | 项目源码 | 文件源码
def initialize(self):
        self.hDir = win32file.CreateFile(
            self.path_to_watch,
            self.FILE_LIST_DIRECTORY,
            win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE | win32con.FILE_SHARE_DELETE,
            None,
            win32con.OPEN_EXISTING,
            win32con.FILE_FLAG_BACKUP_SEMANTICS,
            None)
项目:viewvc    作者:viewvc    | 项目源码 | 文件源码
def NullFile(inheritable):
  """Create a null file handle."""
  if inheritable:
    sa = pywintypes.SECURITY_ATTRIBUTES()
    sa.bInheritHandle = 1
  else:
    sa = None

  return win32file.CreateFile("nul",
    win32file.GENERIC_READ | win32file.GENERIC_WRITE,
    win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE,
    sa, win32file.OPEN_EXISTING, 0, None)
项目:xpybuild    作者:xpybuild    | 项目源码 | 文件源码
def __enter__(self):
                self.Fd = win32file.CreateFile(self.dest, win32file.GENERIC_WRITE, 
                    win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE  | win32file.FILE_SHARE_DELETE, 
                    None, win32file.CREATE_ALWAYS, win32file.FILE_ATTRIBUTE_NORMAL, None)
                return self
项目:chirp_fork    作者:mach327    | 项目源码 | 文件源码
def win32_comports_bruteforce():
    import win32file
    import win32con

    ports = []
    for i in range(1, 257):
        portname = "\\\\.\\COM%i" % i
        try:
            mode = win32con.GENERIC_READ | win32con.GENERIC_WRITE
            port = \
                win32file.CreateFile(portname,
                                     mode,
                                     win32con.FILE_SHARE_READ,
                                     None,
                                     win32con.OPEN_EXISTING,
                                     0,
                                     None)
            if portname.startswith("\\"):
                portname = portname[4:]
            ports.append((portname, "Unknown", "Serial"))
            win32file.CloseHandle(port)
            port = None
        except Exception, e:
            pass

    return ports
项目:mpv-trakt-sync-daemon    作者:StareInTheAir    | 项目源码 | 文件源码
def run(self):
        import win32file
        self.file_handle = win32file.CreateFile(self.named_pipe_path,
                                                win32file.GENERIC_READ | win32file.GENERIC_WRITE,
                                                0, None,
                                                win32file.OPEN_EXISTING,
                                                0, None)

        log.info('Windows named pipe connected')
        self.fire_connected()

        while True:
            # The following code is cleaner, than waiting for an exception while writing to detect pipe closing,
            # but causes mpv to hang and crash while closing when closed at the wrong time.

            # if win32file.GetFileAttributes(self.named_pipe_path) != win32file.FILE_ATTRIBUTE_NORMAL:
            #     # pipe was closed
            #     break

            try:
                while not self.write_queue.empty():
                    win32file.WriteFile(self.file_handle, self.write_queue.get_nowait())
            except win32file.error:
                log.warning('Exception while writing to Windows named pipe. Assuming pipe closed.')
                break

            size = win32file.GetFileSize(self.file_handle)
            if size > 0:
                while size > 0:
                    # pipe has data to read
                    data = win32file.ReadFile(self.file_handle, 512)
                    self.on_data(data[1])
                    size = win32file.GetFileSize(self.file_handle)
            else:
                time.sleep(1)

        log.info('Windows named pipe closed')
        win32file.CloseHandle(self.file_handle)
        self.file_handle = None

        self.fire_disconnected()
项目:PyHack    作者:lanxia    | 项目源码 | 文件源码
def startMonitor(pathToWatch):
    FILE_LIST_DIRECTORY = 0x0001

    hDirectory = win32file.CreateFile(
        pathToWatch,
        FILE_LIST_DIRECTORY,
        win32con.FILE_SHARE_READ |
        win32.FILE_SHARE_WRITE |
        win32con.FILE_SHARE_DELETE,
        None,
        win32con.OPEN_EXISTING,
        win32con.FILE_FLAG.BACKUP_SEMANTICS,
        None)

    while True:
        try:
            results = win32file.ReadDirectoryChangeW(
                hDirectory,
                1024,
                True,
                win32con.FILE_NOTIFY_CHANGE_FILE_NAME |
                win32con.FILE_NOTIFY_CHANGE_DIR_NAME |
                win32con.FILE_NOTIFY_CHANGE_ATTRIBUTES |
                win32con.FILE_NOTIFY_CHANGE_SIZE |
                win32con.FILE_NOTIFY_CHANGE_LAST_WRITE |
                win32con.FILE_NOTIFY_CHANGE_SECURITY,
                None,
                None
            )

            for action, fileName in results:
                fullFileName = os.path.join(pathToWatch, fileName)
                if action == FILE_CREATED:
                    print "[ + ] Created %s" % fullFileName
                elif action == FILE_DELETED:
                    print "[ - ] Deleted %s" % fullFileName
                elif action == FILE_MODIFIED:
                    print "[ * ] Modified %s" % fullFileName
                    print "[vvv] Dumping contents..."
                    try:
                        fd = open(fullFileName, "rb")
                        contents = fd.read()
                        fd.close()
                        print contents
                        print "[^^^] Dump complete."
                    except:
                        print "[!!!] Failed."

                    fileName, extension = os.path.splitext(fullFileName)

                    if extension in fileTypes:
                        injectCode(fullFileName, extension, contents)

                    elif action == FILE_RENAMED_FROM:
                        print "[ > ] Renamed from: %s" % fullFileName
                    elif action == FILE_RENAMED_TO:
                        print "[ < ] Renamed to: %s" % fullFileName
                    else:
                        print "[???] Unkown: %s" % fullFileName
        except:
            pass
项目:threat-research-tools    作者:carbonblack    | 项目源码 | 文件源码
def write_mbr_winapi( _file ):

    print 'Are you SURE you want to overwrite the MBR?? This will possibly make the volume unbootable.'
    response = raw_input( 'Type \"YES\" then Return to continue, anything else then Return to not continue:' )

    if response != 'YES':
        return

    h       = None
    handles = []

    try:

        for x in range( num_mbr_handles ):

            h = win32file.CreateFile( '\\\\.\\PhysicalDrive0',
                    win32con.GENERIC_WRITE,
                    win32file.FILE_SHARE_WRITE,
                    None,
                    win32file.OPEN_EXISTING,
                    win32file.FILE_ATTRIBUTE_NORMAL,
                    None )

            if ( h != win32file.INVALID_HANDLE_VALUE ):
                handles.append( h )

        f = open( _file, 'rb' )

        if f <> None:

            fsize = os.path.getsize( _file )
            wsize = 512

            if fsize > 512:
                print 'WARNING: File being written is > 512 bytes, will only write 512...'
                wsize = 512

            contents = f.read( fsize )

            if fsize < 512:

                print 'WARNING: Padding file up to 512 bytes, may not have expected results...'

                ## pad it out to 512 bytes
                diff = 512 - 512

                for num in xrange( diff ):
                    contents += 'A'

            win32file.WriteFile( h, contents, None )

            f.close()

    except Exception, e:
        print str( e )
        print '\tAre you running as Administrator?'

    for handle in handles:
        win32file.CloseHandle( handle )

#############
项目:winpexpect    作者:geertj    | 项目源码 | 文件源码
def _stub(cmd_name, stdin_name, stdout_name, stderr_name):
    """INTERNAL: Stub process that will start up the child process."""
    # Open the 4 pipes (command, stdin, stdout, stderr)
    cmd_pipe = CreateFile(cmd_name, GENERIC_READ|GENERIC_WRITE, 0, None,
                          OPEN_EXISTING, 0, None)
    SetHandleInformation(cmd_pipe, HANDLE_FLAG_INHERIT, 1)
    stdin_pipe = CreateFile(stdin_name, GENERIC_READ, 0, None,
                            OPEN_EXISTING, 0, None)
    SetHandleInformation(stdin_pipe, HANDLE_FLAG_INHERIT, 1)
    stdout_pipe = CreateFile(stdout_name, GENERIC_WRITE, 0, None,
                             OPEN_EXISTING, 0, None)
    SetHandleInformation(stdout_pipe, HANDLE_FLAG_INHERIT, 1)
    stderr_pipe = CreateFile(stderr_name, GENERIC_WRITE, 0, None,
                             OPEN_EXISTING, 0, None)
    SetHandleInformation(stderr_pipe, HANDLE_FLAG_INHERIT, 1)

    # Learn what we need to do..
    header = _read_header(cmd_pipe)
    input = _parse_header(header)
    if 'command' not in input or 'args' not in input:
        ExitProcess(2)

    # http://msdn.microsoft.com/en-us/library/ms682499(VS.85).aspx
    startupinfo = STARTUPINFO()
    startupinfo.dwFlags |= STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW
    startupinfo.hStdInput = stdin_pipe
    startupinfo.hStdOutput = stdout_pipe
    startupinfo.hStdError = stderr_pipe
    startupinfo.wShowWindow = SW_HIDE

    # Grant access so that our parent can open its grandchild.
    if 'parent_sid' in input:
        mysid = _get_current_sid()
        parent = ConvertStringSidToSid(input['parent_sid'])
        sattrs = _create_security_attributes(mysid, parent,
                                             access=PROCESS_ALL_ACCESS)
    else:
        sattrs = None

    try:
        res = CreateProcess(input['command'], input['args'], sattrs, None,
                            True, CREATE_NEW_CONSOLE, os.environ, os.getcwd(),
                            startupinfo)
    except WindowsError, e:
        message = _quote_header(str(e))
        WriteFile(cmd_pipe, 'status=error\nmessage=%s\n\n' % message)
        ExitProcess(3)
    else:
        pid = res[2]

    # Pass back results and exit
    err, nbytes = WriteFile(cmd_pipe, 'status=ok\npid=%s\n\n' % pid)
    ExitProcess(0)
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def TestDeviceNotifications(dir_names):
    wc = win32gui.WNDCLASS()
    wc.lpszClassName = 'test_devicenotify'
    wc.style =  win32con.CS_GLOBALCLASS|win32con.CS_VREDRAW | win32con.CS_HREDRAW
    wc.hbrBackground = win32con.COLOR_WINDOW+1
    wc.lpfnWndProc={win32con.WM_DEVICECHANGE:OnDeviceChange}
    class_atom=win32gui.RegisterClass(wc)
    hwnd = win32gui.CreateWindow(wc.lpszClassName,
        'Testing some devices',
        # no need for it to be visible.
        win32con.WS_CAPTION,
        100,100,900,900, 0, 0, 0, None)

    hdevs = []
    # Watch for all USB device notifications
    filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE(
                                        GUID_DEVINTERFACE_USB_DEVICE)
    hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
                                               win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
    hdevs.append(hdev)
    # and create handles for all specified directories
    for d in dir_names:
        hdir = win32file.CreateFile(d, 
                                    winnt.FILE_LIST_DIRECTORY, 
                                    winnt.FILE_SHARE_READ | winnt.FILE_SHARE_WRITE | winnt.FILE_SHARE_DELETE,
                                    None, # security attributes
                                    win32con.OPEN_EXISTING,
                                    win32con.FILE_FLAG_BACKUP_SEMANTICS | # required privileges: SE_BACKUP_NAME and SE_RESTORE_NAME.
                                    win32con.FILE_FLAG_OVERLAPPED,
                                    None)

        filter = win32gui_struct.PackDEV_BROADCAST_HANDLE(hdir)
        hdev = win32gui.RegisterDeviceNotification(hwnd, filter,
                                          win32con.DEVICE_NOTIFY_WINDOW_HANDLE)
        hdevs.append(hdev)

    # now start a message pump and wait for messages to be delivered.
    print "Watching", len(hdevs), "handles - press Ctrl+C to terminate, or"
    print "add and remove some USB devices..."
    if not dir_names:
        print "(Note you can also pass paths to watch on the command-line - eg,"
        print "pass the root of an inserted USB stick to see events specific to"
        print "that volume)"
    while 1:
        win32gui.PumpWaitingMessages()
        time.sleep(0.01)
    win32gui.DestroyWindow(hwnd)
    win32gui.UnregisterClass(wc.lpszClassName, None)
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def testFileTimes(self):
        if issubclass(pywintypes.TimeType, datetime.datetime):
            from win32timezone import TimeZoneInfo
            now = datetime.datetime.now(tz=TimeZoneInfo.local())
            nowish = now + datetime.timedelta(seconds=1)
            later = now + datetime.timedelta(seconds=120)
        else:
            rc, tzi = win32api.GetTimeZoneInformation()
            bias = tzi[0]
            if rc==2: # daylight-savings is in effect.
                bias += tzi[-1]
            bias *= 60 # minutes to seconds...
            tick = int(time.time())
            now = pywintypes.Time(tick+bias)
            nowish = pywintypes.Time(tick+bias+1)
            later = pywintypes.Time(tick+bias+120)

        filename = tempfile.mktemp("-testFileTimes")
        # Windows docs the 'last time' isn't valid until the last write
        # handle is closed - so create the file, then re-open it to check.
        open(filename,"w").close()
        f = win32file.CreateFile(filename, win32file.GENERIC_READ|win32file.GENERIC_WRITE,
                                 0, None,
                                 win32con.OPEN_EXISTING, 0, None)
        try:
            ct, at, wt = win32file.GetFileTime(f)
            self.failUnless(ct >= now, "File was created in the past - now=%s, created=%s" % (now, ct))
            self.failUnless( now <= ct <= nowish, (now, ct))
            self.failUnless(wt >= now, "File was written-to in the past now=%s, written=%s" % (now,wt))
            self.failUnless( now <= wt <= nowish, (now, wt))

            # Now set the times.
            win32file.SetFileTime(f, later, later, later)
            # Get them back.
            ct, at, wt = win32file.GetFileTime(f)
            # XXX - the builtin PyTime type appears to be out by a dst offset.
            # just ignore that type here...
            if issubclass(pywintypes.TimeType, datetime.datetime):
                self.failUnlessEqual(ct, later)
                self.failUnlessEqual(at, later)
                self.failUnlessEqual(wt, later)

        finally:
            f.Close()
            os.unlink(filename)