我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.python.threadpool.ThreadPool()。
def _threadpoolTest(self, method): # This is a schizophrenic test: it seems to be trying to test # both the dispatch() behavior of the ThreadPool as well as # the serialization behavior of threadable.synchronize(). It # would probably make more sense as two much simpler tests. N = 10 tp = threadpool.ThreadPool() tp.start() try: waiting = threading.Lock() waiting.acquire() actor = Synchronization(N, waiting) for i in xrange(N): tp.dispatch(actor, actor.run) self._waitForLock(waiting) self.failIf(actor.failures, "run() re-entered %d times" % (actor.failures,)) finally: tp.stop()
def setupService(self): #self.log(log.info, u'Setting up') self.settings = self.parent.settings # Configure metrics to be collected each X seconds metrics_interval = int(self.channel.get('metrics_logger_interval', 60)) self.metrics = Bunch(tx_count=0, starttime=time.time(), interval=metrics_interval) subscriptions = read_list(self.channel.mqtt_topics) self.mqtt_service = MqttAdapter( name = u'mqtt-' + self.channel.realm, broker_host = self.settings.mqtt.host, broker_port = int(self.settings.mqtt.port), broker_username = self.settings.mqtt.username, broker_password = self.settings.mqtt.password, callback = self.mqtt_receive, subscriptions = subscriptions) self.registerService(self.mqtt_service) self.influx = InfluxDBAdapter(settings = self.settings.influxdb) # Perform MQTT message processing using a different thread pool self.threadpool = ThreadPool() self.thimble = Thimble(reactor, self.threadpool, self, ["process_message"])
def test_wsgi(self): """ The I{--wsgi} option takes the fully-qualifed Python name of a WSGI application object and creates a L{WSGIResource} at the root which serves that application. """ options = Options() options.parseOptions(['--wsgi', __name__ + '.application']) root = options['root'] self.assertTrue(root, WSGIResource) self.assertIdentical(root._reactor, reactor) self.assertTrue(isinstance(root._threadpool, ThreadPool)) self.assertIdentical(root._application, application) # The threadpool should start and stop with the reactor. self.assertFalse(root._threadpool.started) reactor.fireSystemEvent('startup') self.assertTrue(root._threadpool.started) self.assertFalse(root._threadpool.joined) reactor.fireSystemEvent('shutdown') self.assertTrue(root._threadpool.joined)
def test_callInThreadException(self): """ L{ThreadPool.callInThread} logs exceptions raised by the callable it is passed. """ class NewError(Exception): pass def raiseError(): raise NewError() tp = threadpool.ThreadPool(0, 1) tp.callInThread(raiseError) tp.start() tp.stop() errors = self.flushLoggedErrors(NewError) self.assertEqual(len(errors), 1)
def test_callbackThread(self): """ L{ThreadPool.callInThreadWithCallback} calls the function it is given and the C{onResult} callback in the same thread. """ threadIds = [] event = threading.Event() def onResult(success, result): threadIds.append(threading.currentThread().ident) event.set() def func(): threadIds.append(threading.currentThread().ident) tp = threadpool.ThreadPool(0, 1) tp.callInThreadWithCallback(onResult, func) tp.start() self.addCleanup(tp.stop) event.wait(self.getTimeout()) self.assertEqual(len(threadIds), 2) self.assertEqual(threadIds[0], threadIds[1])
def test_existingWork(self): """ Work added to the threadpool before its start should be executed once the threadpool is started: this is ensured by trying to release a lock previously acquired. """ waiter = threading.Lock() waiter.acquire() tp = threadpool.ThreadPool(0, 1) tp.callInThread(waiter.release) # before start() tp.start() try: self._waitForLock(waiter) finally: tp.stop()
def __init__(self, coordinator, failTest, newWorker, *args, **kwargs): """ Initialize this L{MemoryPool} with a test case. @param coordinator: a worker used to coordinate work in the L{Team} underlying this threadpool. @type coordinator: L{twisted._threads.IExclusiveWorker} @param failTest: A 1-argument callable taking an exception and raising a test-failure exception. @type failTest: 1-argument callable taking (L{Failure}) and raising L{unittest.FailTest}. @param newWorker: a 0-argument callable that produces a new L{twisted._threads.IWorker} provider on each invocation. @type newWorker: 0-argument callable returning L{twisted._threads.IWorker}. """ self._coordinator = coordinator self._failTest = failTest self._newWorker = newWorker threadpool.ThreadPool.__init__(self, *args, **kwargs)
def __init__(self, testCase, *args, **kwargs): """ Create a L{PoolHelper}. @param testCase: a test case attached to this helper. @type args: The arguments passed to a L{threadpool.ThreadPool}. @type kwargs: The arguments passed to a L{threadpool.ThreadPool} """ coordinator, self.performCoordination = createMemoryWorker() self.workers = [] def newWorker(): self.workers.append(createMemoryWorker()) return self.workers[-1][0] self.threadpool = MemoryPool(coordinator, testCase.fail, newWorker, *args, **kwargs)
def run(self, handler): from twisted.web import server, wsgi from twisted.python.threadpool import ThreadPool from twisted.internet import reactor thread_pool = ThreadPool() thread_pool.start() reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop) factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler)) reactor.listenTCP(self.port, factory, interface=self.host) if not reactor.running: reactor.run()
def _initThreadPool(self): from twisted.python import threadpool self.threadpool = threadpool.ThreadPool(0, 10, 'twisted.internet.reactor') self.callWhenRunning(self.threadpool.start) self.addSystemEventTrigger('during', 'shutdown', self.threadpool.stop)
def testPersistence(self): tp = threadpool.ThreadPool(7, 20) tp.start() # XXX Sigh - race condition: start should return a Deferred # which fires when all the workers it started have fully # started up. time.sleep(0.1) self.assertEquals(len(tp.threads), 7) self.assertEquals(tp.min, 7) self.assertEquals(tp.max, 20) # check that unpickled threadpool has same number of threads s = pickle.dumps(tp) tp2 = pickle.loads(s) tp2.start() # XXX As above time.sleep(0.1) self.assertEquals(len(tp2.threads), 7) self.assertEquals(tp2.min, 7) self.assertEquals(tp2.max, 20) tp.stop() tp2.stop()
def testExistingWork(self): waiter = threading.Lock() waiter.acquire() tp = threadpool.ThreadPool(0, 1) tp.callInThread(waiter.release) # before start() tp.start() try: self._waitForLock(waiter) finally: tp.stop()
def setUp(self): self.event = threading.Event() self.threadpool = threadpool.ThreadPool(0, 10) self.threadpool.start()
def run(self, handler): from twisted.web import server, wsgi from twisted.python.threadpool import ThreadPool from twisted.internet import reactor thread_pool = ThreadPool() thread_pool.start() reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop) factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, handler)) reactor.listenTCP(self.port, factory, interface=self.host) reactor.run()
def twisted(app, address, **options): from twisted.web import server, wsgi from twisted.python.threadpool import ThreadPool from twisted.internet import reactor thread_pool = ThreadPool() thread_pool.start() reactor.addSystemEventTrigger('after', 'shutdown', thread_pool.stop) factory = server.Site(wsgi.WSGIResource(reactor, thread_pool, app)) reactor.listenTCP(address[1], factory, interface=address[0]) reactor.run()
def setupFacade(config): """Get the L{Facade} instance to use in the API service.""" from fluiddb.api.facade import Facade from fluiddb.util.transact import Transact maxThreads = int(config.get('service', 'max-threads')) threadpool = ThreadPool(minthreads=0, maxthreads=maxThreads) reactor.callWhenRunning(threadpool.start) reactor.addSystemEventTrigger('during', 'shutdown', threadpool.stop) transact = Transact(threadpool) factory = FluidinfoSessionFactory('API-%s' % config.get('service', 'port')) return Facade(transact, factory)
def make(self, dependency_resources): """Create and start a new thread pool.""" from twisted.internet import reactor global _threadPool if _threadPool is None: _threadPool = ThreadPool(minthreads=1, maxthreads=1) reactor.callWhenRunning(_threadPool.start) reactor.addSystemEventTrigger('during', 'shutdown', _threadPool.stop) return _threadPool
def __init__(self): super(ClockWithThreads, self).__init__() self._pool = ThreadPool()