我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用win32event.QS_ALLINPUT。
def test(fn): print "The main thread is %d" % (win32api.GetCurrentThreadId()) GIT = CreateGIT() interp = win32com.client.Dispatch("Python.Interpreter") cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch) events = fn(4, cookie) numFinished = 0 while 1: try: rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT) if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events): numFinished = numFinished + 1 if numFinished >= len(events): break elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message # This is critical - whole apartment model demo will hang. pythoncom.PumpWaitingMessages() else: # Timeout print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()) except KeyboardInterrupt: break GIT.RevokeInterfaceFromGlobal(cookie) del interp del GIT
def test(fn): print("The main thread is %d" % (win32api.GetCurrentThreadId())) GIT = CreateGIT() interp = win32com.client.Dispatch("Python.Interpreter") cookie = GIT.RegisterInterfaceInGlobal(interp._oleobj_, pythoncom.IID_IDispatch) events = fn(4, cookie) numFinished = 0 while 1: try: rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT) if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events): numFinished = numFinished + 1 if numFinished >= len(events): break elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message # This is critical - whole apartment model demo will hang. pythoncom.PumpWaitingMessages() else: # Timeout print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())) except KeyboardInterrupt: break GIT.RevokeInterfaceFromGlobal(cookie) del interp del GIT
def _DoTestMarshal(self, fn, bCoWait = 0): #print "The main thread is %d" % (win32api.GetCurrentThreadId()) threads, events = fn(2) numFinished = 0 while 1: try: if bCoWait: rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events) else: # Specifying "bWaitAll" here will wait for messages *and* all events # (which is pretty useless) rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT) if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events): numFinished = numFinished + 1 if numFinished >= len(events): break elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message # This is critical - whole apartment model demo will hang. pythoncom.PumpWaitingMessages() else: # Timeout print "Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount()) except KeyboardInterrupt: break for t in threads: t.join(2) self.failIf(t.isAlive(), "thread failed to stop!?") threads = None # threads hold references to args # Seems to be a leak here I can't locate :( #self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0) #self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)
def _DoTestMarshal(self, fn, bCoWait = 0): #print "The main thread is %d" % (win32api.GetCurrentThreadId()) threads, events = fn(2) numFinished = 0 while 1: try: if bCoWait: rc = pythoncom.CoWaitForMultipleHandles(0, 2000, events) else: # Specifying "bWaitAll" here will wait for messages *and* all events # (which is pretty useless) rc = win32event.MsgWaitForMultipleObjects(events, 0, 2000, win32event.QS_ALLINPUT) if rc >= win32event.WAIT_OBJECT_0 and rc < win32event.WAIT_OBJECT_0+len(events): numFinished = numFinished + 1 if numFinished >= len(events): break elif rc==win32event.WAIT_OBJECT_0 + len(events): # a message # This is critical - whole apartment model demo will hang. pythoncom.PumpWaitingMessages() else: # Timeout print("Waiting for thread to stop with interfaces=%d, gateways=%d" % (pythoncom._GetInterfaceCount(), pythoncom._GetGatewayCount())) except KeyboardInterrupt: break for t in threads: t.join(2) self.failIf(t.isAlive(), "thread failed to stop!?") threads = None # threads hold references to args # Seems to be a leak here I can't locate :( #self.failUnlessEqual(pythoncom._GetInterfaceCount(), 0) #self.failUnlessEqual(pythoncom._GetGatewayCount(), 0)