我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用concurrent.futures.ALL_COMPLETED。
def test_all_completed(self): future1 = self.executor.submit(divmod, 2, 0) future2 = self.executor.submit(mul, 2, 21) finished, pending = futures.wait( [SUCCESSFUL_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, future1, future2], return_when=futures.ALL_COMPLETED) self.assertEqual(set([SUCCESSFUL_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, future1, future2]), finished) self.assertEqual(set(), pending)
def stop(self): if self.renderer.running: tasks = [] if self.task is not None and not self.task.done(): self.task.cancel() tasks.append(self.task) if self.waiter is not None and not self.waiter.done(): self.waiter.cancel() tasks.append(self.waiter) await self.renderer._stop() if len(tasks) > 0: await asyncio.wait(tasks, return_when=futures.ALL_COMPLETED) self.renderer.finish(self._frame)
def _stop(self): """ Stop this AnimationLoop Shuts down the loop and triggers cleanup tasks. """ if not self.running: return False self.running = False for layer in self.layers[::-1]: await self.remove_layer(layer) if self._anim_task is not None and not self._anim_task.done(): self._anim_task.cancel() await asyncio.wait([self._anim_task], return_when=futures.ALL_COMPLETED) self._logger.info("AnimationLoop stopped")
def _close_input_devices(self): if not hasattr(self, '_opened') or not self._opened: return self._opened = False for event_device in self._event_devices: asyncio.get_event_loop().remove_reader(event_device.fileno()) event_device.close() tasks = [] for task in self._tasks: if not task.done(): task.cancel() tasks.append(task) await asyncio.wait(tasks, return_when=futures.ALL_COMPLETED) self._event_devices.clear()
def test_timeout(self): future1 = self.executor.submit(mul, 6, 7) future2 = self.executor.submit(time.sleep, 3) finished, pending = futures.wait( [CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1, future2], timeout=1.5, return_when=futures.ALL_COMPLETED) self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1]), finished) self.assertEqual(set([future2]), pending)
def test_timeout(self): future1 = self.executor.submit(mul, 6, 7) future2 = self.executor.submit(time.sleep, 6) finished, pending = futures.wait( [CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1, future2], timeout=5, return_when=futures.ALL_COMPLETED) self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE, future1]), finished) self.assertEqual(set([future2]), pending)
def _run(self): dm = UChromaDeviceManager() atexit.register(UChromaServer.exit, self._loop) dbus = DeviceManagerAPI(dm, self._logger) power = PowerMonitor() for sig in (signal.SIGINT, signal.SIGTERM): self._loop.add_signal_handler(sig, self._shutdown_callback) try: dbus.run() power.start() ensure_future(dm.monitor_start(), loop=self._loop) self._loop.run_forever() except KeyboardInterrupt: pass finally: for sig in (signal.SIGTERM, signal.SIGINT): self._loop.remove_signal_handler(sig) power.stop() self._loop.run_until_complete(asyncio.wait( \ [dm.close_devices(), dm.monitor_stop()], return_when=futures.ALL_COMPLETED))
def exit(loop): try: loop.run_until_complete(asyncio.wait( \ list(asyncio.Task.all_tasks()), return_when=futures.ALL_COMPLETED)) loop.close() except KeyboardInterrupt: pass
def test_pending_calls_race(self): # Issue #14406: multi-threaded race condition when waiting on all # futures. event = threading.Event() def future_func(): event.wait() oldswitchinterval = sys.getcheckinterval() sys.setcheckinterval(1) try: fs = set(self.executor.submit(future_func) for i in range(100)) event.set() futures.wait(fs, return_when=futures.ALL_COMPLETED) finally: sys.setcheckinterval(oldswitchinterval)
def run(self): self._load_config() for service in self.config.iter_instances("service", "Service"): self._start_service(service) listen = self.config.raw_data[0]["core"]["listen"] self.http = HTTP(self.loop, listen) self.http.add_route("*", r"/{dir:.*}", self._http_handler) for prov in self.config.iter_providers(): self.providers[prov.name] = prov yield from prov.start() yield from self.http.start() reload_fut = asyncio.async(self.reload(), loop=self.loop) yield from self.stop_event.wait() self.log.info("Interrupted.") reload_fut.cancel() yield from reload_fut for obj in self._running_objects: obj.cancel() yield from asyncio.wait(self._running_objects, return_when=futures.ALL_COMPLETED) if self._running_cleanups: yield from asyncio.wait(self._running_cleanups, return_when=futures.ALL_COMPLETED) for provider in self.providers.values(): yield from prov.stop() self.http.stop() yield from self.http.wait_closed() self.log.info("Exit.")
def test_pending_calls_race(self): # Issue #14406: multi-threaded race condition when waiting on all # futures. event = threading.Event() def future_func(): event.wait() oldswitchinterval = sys.getswitchinterval() sys.setswitchinterval(1e-6) try: fs = {self.executor.submit(future_func) for i in range(100)} event.set() futures.wait(fs, return_when=futures.ALL_COMPLETED) finally: sys.setswitchinterval(oldswitchinterval)
def pull_stats(config, storage_dir, loop, executor): """ Launch coroutines for pulling statistics from UNIX sockets. This a delegating routine. Arguments: config (obj): A configParser object which holds configuration. storage_dir (str): The absolute directory path to save the statistics. loop (obj): A base event loop. executor(obj): A ThreadPoolExecutor object. Returns: True if statistics from *all* UNIX sockets are fetched False otherwise. """ # absolute directory path which contains UNIX socket files. results = [] # stores the result of finished tasks socket_dir = config.get('pull', 'socket-dir') pull_timeout = config.getfloat('pull', 'pull-timeout') if int(pull_timeout) == 0: pull_timeout = None socket_files = [f for f in glob.glob(socket_dir + '/*') if is_unix_socket(f)] if not socket_files: log.error("found zero UNIX sockets under %s to connect to", socket_dir) return False log.debug('pull statistics') coroutines = [get(socket_file, cmd, storage_dir, loop, executor, config) for socket_file in socket_files for cmd in CMDS] # Launch all connections. done, pending = yield from asyncio.wait(coroutines, timeout=pull_timeout, return_when=ALL_COMPLETED) for task in done: log.debug('task status: %s', task) results.append(task.result()) log.debug('task report, done:%s pending:%s succeed:%s failed:%s', len(done), len(pending), results.count(True), results.count(False)) for task in pending: log.warning('cancelling task %s as it reached its timeout threshold of' ' %.2f seconds', task, pull_timeout) task.cancel() # only when all tasks are finished successfully we claim success return not pending and len(set(results)) == 1 and True in set(results)