We extracted the following 50 code examples from open source Python projects to illustrate how to use concurrent.futures.wait().
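Before the project examples, here is a minimal standalone sketch (not taken from any of the projects below) of the pattern they all build on: submit work to an executor, then block with concurrent.futures.wait() until a completion condition or a timeout is reached. wait() returns a named tuple of two sets, done and not_done, and return_when may be ALL_COMPLETED (the default), FIRST_COMPLETED, or FIRST_EXCEPTION.

import time
from concurrent import futures

def work(n):
    time.sleep(n)
    return n

with futures.ThreadPoolExecutor(max_workers=4) as executor:
    fs = [executor.submit(work, n) for n in (0.1, 0.2, 3)]

    # Block for at most one second, or until every future has finished.
    done, not_done = futures.wait(fs, timeout=1, return_when=futures.ALL_COMPLETED)

    # The two fast tasks are done; the 3-second task is still pending.
    print(len(done), "finished,", len(not_done), "pending")
    for f in done:
        print(f.result())

Note that leaving the with block still waits for the pending task, because ThreadPoolExecutor.__exit__ calls shutdown(wait=True).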
def test_all_completed(self):
    future1 = self.executor.submit(divmod, 2, 0)
    future2 = self.executor.submit(mul, 2, 21)

    finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             future1,
             future2],
            return_when=futures.ALL_COMPLETED)

    self.assertEqual(set([SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          future1,
                          future2]), finished)
    self.assertEqual(set(), pending)
def shutdown(self):
    """
    Close all connections. ``Session`` instances should not be used
    for any purpose after being shutdown.
    """
    with self._lock:
        if self.is_shutdown:
            return
        else:
            self.is_shutdown = True

    # PYTHON-673. If shutdown was called shortly after session init, avoid
    # a race by cancelling any initial connection attempts that haven't
    # started, then blocking on any that have.
    for future in self._initial_connect_futures:
        future.cancel()
    wait_futures(self._initial_connect_futures)

    for pool in tuple(self._pools.values()):
        pool.shutdown()
def test_timeout(self):
    future1 = self.executor.submit(mul, 6, 7)
    future2 = self.executor.submit(time.sleep, 3)

    finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2],
            timeout=1.5,
            return_when=futures.ALL_COMPLETED)

    self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          future1]), finished)
    self.assertEqual(set([future2]), pending)
def optimiz(currencies, debug):
    currencies = sorted(currencies)
    if len(currencies) < 2 or len(currencies) > 10:
        return {"error": "2 to 10 currencies"}
    max_workers = 4 if sys.version_info[1] < 5 else None
    executor = ThreadPoolExecutor(max_workers)
    data = dict(future.result() for future in
                wait([executor.submit(get_ochl, cur) for cur in currencies]).done)
    data = [data[cur] for cur in currencies]
    errors = [x['error'] for x in data if 'error' in x]
    if errors:
        return {"error": "Currencies not found : " + str(errors)}
    weights, m, s, a, b = markowitz_optimization(data, debug)
    if debug:
        import matplotlib as mpl
        mpl.use('Agg')
        import matplotlib.pyplot as plt
        fig, ax = plt.subplots()
        plt.plot(s, m, 'o', markersize=1)
        plt.plot(b, a, 'or')
        fig.savefig("chalu.png")
    result = dict()
    for i, cur in enumerate(currencies):
        result[cur] = weights[i]
    return {"result": result}
def stop_listen(self):
    """Stop listening for events"""
    self.set_traps(False)
    self.stop_request.set()
    nb_threads = len([f for f in self.futures if f.running()])
    if nb_threads:
        # ack current thread
        self.current_cont_event.set()
        # wait for current thread to terminate
        while len([f for f in self.futures if f.running()]) == nb_threads:
            time.sleep(0.1)
        # ack the rest of the threads
        while [f for f in self.futures if f.running()]:
            if self.queue.full():
                (*rest, continue_event) = self.queue.get()
                continue_event.set()
            # let the threads terminate
            time.sleep(0.1)
    # wait for threads to exit
    wait(self.futures)
def test_timeout(self):
    future1 = self.executor.submit(mul, 6, 7)
    future2 = self.executor.submit(time.sleep, 6)

    finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

    self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          future1]), finished)
    self.assertEqual(set([future2]), pending)
def doCrawl(self):
    flag = True
    while flag:
        futures = []
        # submit 5 pages to be fetched concurrently
        with ThreadPoolExecutor(max_workers=5) as executor:
            for i in range(self.context['currentPage'], self.context['currentPage'] + 5):
                futures.append(executor.submit(self.getMangaDataByPage, i))
            wait(futures)
        # collect the results page by page
        for f in futures:
            gmetadata = f.result()
            # stop when a page returns no data
            if gmetadata == None:
                flag = False
                break
            for data in gmetadata:
                # store new entries
                if int(data['posted']) < self.context['currentPosted']:
                    self.db.insertEromanga(data)
                    self.context['currentPosted'] = int(data['posted'])
            self.context['currentPage'] += 1
def doCrawlNewest(self):
    flag = True
    while flag:
        futures = []
        # submit 5 pages to be fetched concurrently
        with ThreadPoolExecutor(max_workers=5) as executor:
            for i in range(self.context['currentPage'], self.context['currentPage'] + 5):
                futures.append(executor.submit(self.getMangaDataByPage, i))
            wait(futures)
        # collect the results page by page
        for f in futures:
            gmetadata = f.result()
            for data in gmetadata:
                # stop once we reach data that has already been crawled
                if int(data['posted']) <= self.context['newestPosted']:
                    info("info", "excrawler.crawlNewest",
                         "reached already-crawled data, posted:" + data['posted'])
                    setConfig("app", "new_context", "")
                    flag = False
                    exit()
                # store new entries
                if int(data['posted']) < self.context['currentPosted']:
                    self.db.insertEromanga(data)
                    self.context['currentPosted'] = int(data['posted'])
            self.context['currentPage'] += 1
def run(self, funcs):
    """Run a set of functions in parallel, returning their results.

    Make sure any function you pass exits with a reasonable timeout. If it
    doesn't return within the timeout or the result is ignored due to an
    exception in a separate thread it will continue to stick around until it
    finishes, including blocking process exit.

    Args:
      funcs: An iterable of functions or iterable of args to functools.partial.

    Returns:
      A list of return values with the values matching the order in funcs.

    Raises:
      Propagates the first exception encountered in one of the functions.
    """
    funcs = [f if callable(f) else functools.partial(*f) for f in funcs]
    if len(funcs) == 1:  # Ignore threads if it's not needed.
        return [funcs[0]()]
    if len(funcs) > self._workers:  # Lazy init and grow as needed.
        self.shutdown()
        self._workers = len(funcs)
        self._executor = futures.ThreadPoolExecutor(self._workers)
    futs = [self._executor.submit(f) for f in funcs]
    done, not_done = futures.wait(futs, self._timeout, futures.FIRST_EXCEPTION)
    # Make sure to propagate any exceptions.
    for f in done:
        if not f.cancelled() and f.exception() is not None:
            if not_done:
                # If there are some calls that haven't finished, cancel and recreate
                # the thread pool. Otherwise we may have a thread running forever
                # blocking parallel calls.
                for nd in not_done:
                    nd.cancel()
                self.shutdown(False)  # Don't wait, they may be deadlocked.
            raise f.exception()
    # Either done or timed out, so don't wait again.
    return [f.result(timeout=0) for f in futs]
def _shutdown_instance(self, inst, port):
    force_shutdown_time = time.time() + _SHUTDOWN_TIMEOUT
    try:
        environ = self.build_request_environ(
            'GET', '/_ah/stop', [], '', '0.1.0.3', port, fake_login=True)
        self._handle_request(environ,
                             start_response_utils.null_start_response,
                             inst=inst,
                             request_type=instance.SHUTDOWN_REQUEST)
        logging.debug('Sent shutdown request: %s', inst)
    except:
        logging.exception('Internal error while handling shutdown request.')
    finally:
        time_to_wait = force_shutdown_time - time.time()
        self._quit_event.wait(time_to_wait)
        inst.quit(force=True)
def quit(self):
    """Stops the Module."""
    self._quit_event.set()
    # The instance adjustment thread depends on the balanced module and the
    # watcher so wait for it to exit before quitting them.
    if self._watcher:
        self._watcher.quit()
    self._change_watcher_thread.join()
    self._balanced_module.quit()
    for wsgi_servr in self._wsgi_servers:
        wsgi_servr.quit()
    with self._condition:
        instances = self._instances
        self._instances = []
        self._condition.notify_all()
    for inst in instances:
        inst.quit(force=True)
def quit(self):
    """Stops the Module."""
    self._quit_event.set()
    self._change_watcher_thread.join()
    # The instance adjustment thread depends on the balanced module and the
    # watcher so wait for it to exit before quitting them.
    if self._watcher:
        self._watcher.quit()
    self._balanced_module.quit()
    for wsgi_servr in self._wsgi_servers:
        wsgi_servr.quit()
    with self._condition:
        instances = self._instances
        self._instances = []
        self._condition.notify_all()
    for inst in instances:
        inst.quit(force=True)
def _choose_instance(self, timeout_time):
    """Returns an Instance to handle a request or None if all are busy."""
    with self._condition:
        while time.time() < timeout_time and not self._quit_event.is_set():
            for inst in self._instances:
                if inst.can_accept_requests:
                    return inst
            else:
                inst = self._start_any_instance()
                if inst:
                    break
            self._condition.wait(timeout_time - time.time())
        else:
            return None
    if inst:
        inst.wait(timeout_time)
    return inst
def live_migrate_servers_from_host(self, image, host_to_evacuate, flavor,
                                   block_migration=False, destination_host=None,
                                   disk_over_commit=False,
                                   number_of_parallel_migrations=1, **kwargs):
    """Live Migrate servers.

    This scenario migrates all the VMs on the specified compute host to
    another compute node in the same availability zone.

    :param image: image to be used to boot an instance
    :param flavor: flavor to be used to boot an instance
    :param block_migration: whether to use block migration for the migrated instances
    """
    servers_to_migrate = self._get_servers_from_compute(host_to_evacuate)
    print("migrating servers: " + str(servers_to_migrate))
    executor = ThreadPoolExecutor(max_workers=int(number_of_parallel_migrations))
    futures = []
    for server in servers_to_migrate:
        a = executor.submit(self._migrate_server, server, destination_host,
                            block_migration, disk_over_commit)
        futures.append(a)
    print(wait(futures))
def perform_requests(self):
    signal.signal(signal.SIGINT, self.exit_fast)
    signal.signal(signal.SIGTERM, self.exit_fast)

    self.state = b'E'
    for q_batch in self.get_batch():
        for (_, _) in self.split_batch(q_batch):
            if self.state != b"R":
                self.state = b'R'
            yield
            continue

    # wait for all batches to finish before returning
    self.state = b'W'
    while self.futures:
        f_len = len(self.futures)
        self.futures = [i for i in self.futures if not i.done()]
        if f_len != len(self.futures):
            self.ui.debug('Waiting for final requests to finish. '
                          'remaining requests: {}'
                          ''.format(len(self.futures)))
        wait(self.futures, return_when=FIRST_COMPLETED)
    self.state = b'D'
    yield True
def memdump(self, local_filename, remote_filename=None, compress=False):
    dump_object = self.start_memdump(remote_filename=remote_filename, compress=compress)
    dump_object.wait()
    dump_object.get(local_filename)
    dump_object.delete()
def get(self, local_filename):
    if not self._done:
        self.wait()
    if self._error:
        raise self._error
    src = self.lr_session.get_raw_file(self.remote_filename, timeout=3600, delay=5)
    dst = open(local_filename, "wb")
    shutil.copyfileobj(src, dst)
def wait(self):
    self.lr_session._poll_command(self.memdump_id, timeout=3600, delay=5)
    self._done = True
def run(self):
    log.debug("Starting Live Response Job Scheduler")

    while True:
        log.debug("Waiting for item on Scheduler Queue")
        item = self.schedule_queue.get(block=True)
        log.debug("Got item: {0}".format(item))
        if isinstance(item, WorkItem):
            # new WorkItem available
            self._unscheduled_jobs[item.sensor_id].append(item)
        elif isinstance(item, CompletionNotification):
            # job completed
            self._idle_workers.add(item.sensor_id)
        elif isinstance(item, WorkerStatus):
            if item.status == "error":
                log.error("Error encountered by JobWorker[{0}]: {1}".format(item.sensor_id,
                                                                            item.exception))
            elif item.status == "exiting":
                log.debug("JobWorker[{0}] has exited, waiting...".format(item.sensor_id))
                self._job_workers[item.sensor_id].join()
                log.debug("JobWorker[{0}] deleted".format(item.sensor_id))
                del self._job_workers[item.sensor_id]
                try:
                    self._idle_workers.remove(item.sensor_id)
                except KeyError:
                    pass
            elif item.status == "ready":
                log.debug("JobWorker[{0}] now ready to accept jobs, session established".format(item.sensor_id))
                self._idle_workers.add(item.sensor_id)
            else:
                log.debug("Unknown status from JobWorker[{0}]: {1}".format(item.sensor_id, item.status))
        else:
            log.debug("Received unknown item on the scheduler Queue, exiting")
            # exiting the scheduler if we get None
            # TODO: wait for all worker threads to exit
            return

        self._schedule_jobs()
def test_wait_for_all():
    def f(sleep_time: int):
        sleep(sleep_time)
        return sleep_time

    def calc(fs):
        fs_done = wait(fs).done
        r = sum(r.result() for r in fs_done)
        return r

    pool = ThreadPoolExecutor()
    fs = [pool.submit(f, arg) for arg in (3, 2, 5)]
    result = pool.submit(calc, fs).result()
    assert result == 10
def add_execution_profile(self, name, profile, pool_wait_timeout=5):
    """
    Adds an :class:`.ExecutionProfile` to the cluster. This makes it available for use by
    ``name`` in :meth:`.Session.execute` and :meth:`.Session.execute_async`. This method
    will raise if the profile already exists.

    Normally profiles will be injected at cluster initialization via
    ``Cluster(execution_profiles)``. This method provides a way of adding them dynamically.

    Adding a new profile updates the connection pools according to the specified
    ``load_balancing_policy``. By default, this method will wait up to five seconds for the
    pool creation to complete, so the profile can be used immediately upon return. This
    behavior can be controlled using ``pool_wait_timeout`` (see
    `concurrent.futures.wait <https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.wait>`_
    for timeout semantics).
    """
    if not isinstance(profile, ExecutionProfile):
        raise TypeError("profile must be an instance of ExecutionProfile")
    if self._config_mode == _ConfigMode.LEGACY:
        raise ValueError("Cannot add execution profiles when legacy parameters are set explicitly.")

    if name in self.profile_manager.profiles:
        raise ValueError("Profile %s already exists" % (name,))

    self.profile_manager.profiles[name] = profile
    profile.load_balancing_policy.populate(self, self.metadata.all_hosts())
    # on_up after populate allows things like DCA LBP to choose default local dc
    for host in filter(lambda h: h.is_up, self.metadata.all_hosts()):
        profile.load_balancing_policy.on_up(host)
    futures = set()
    for session in self.sessions:
        futures.update(session.update_created_pools())
    _, not_done = wait_futures(futures, pool_wait_timeout)
    if not_done:
        raise OperationTimedOut("Failed to create all new connection pools in the %ss timeout." % (pool_wait_timeout,))
def __init__(self, cluster, hosts, keyspace=None):
    self.cluster = cluster
    self.hosts = hosts
    self.keyspace = keyspace

    self._lock = RLock()
    self._pools = {}
    self._profile_manager = cluster.profile_manager
    self._metrics = cluster.metrics
    self._request_init_callbacks = []
    self._protocol_version = self.cluster.protocol_version

    self.encoder = Encoder()

    # create connection pools in parallel
    self._initial_connect_futures = set()
    for host in hosts:
        future = self.add_or_renew_pool(host, is_host_addition=False)
        if future:
            self._initial_connect_futures.add(future)

    futures = wait_futures(self._initial_connect_futures, return_when=FIRST_COMPLETED)
    while futures.not_done and not any(f.result() for f in futures.done):
        futures = wait_futures(futures.not_done, return_when=FIRST_COMPLETED)

    if not any(f.result() for f in self._initial_connect_futures):
        msg = "Unable to connect to any servers"
        if self.keyspace:
            msg += " using keyspace '%s'" % self.keyspace
        raise NoHostAvailable(msg, [h.address for h in hosts])
def tearDown(self):
    self.executor.shutdown(wait=True)
    dt = time.time() - self.t1
    if test_support.verbose:
        print("%.2fs" % dt)
    self.assertLess(dt, 60, "synchronization issue: test lasted too long")
def test_first_completed(self):
    future1 = self.executor.submit(mul, 21, 2)
    future2 = self.executor.submit(time.sleep, 1.5)

    done, not_done = futures.wait(
            [CANCELLED_FUTURE, future1, future2],
            return_when=futures.FIRST_COMPLETED)

    self.assertEqual(set([future1]), done)
    self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
def test_first_completed_some_already_completed(self):
    future1 = self.executor.submit(time.sleep, 1.5)

    finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
            return_when=futures.FIRST_COMPLETED)

    self.assertEqual(
            set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
            finished)
    self.assertEqual(set([future1]), pending)
def test_first_exception(self):
    future1 = self.executor.submit(mul, 2, 21)
    future2 = self.executor.submit(sleep_and_raise, 1.5)
    future3 = self.executor.submit(time.sleep, 3)

    finished, pending = futures.wait(
            [future1, future2, future3],
            return_when=futures.FIRST_EXCEPTION)

    self.assertEqual(set([future1, future2]), finished)
    self.assertEqual(set([future3]), pending)
def test_first_exception_some_already_complete(self):
    future1 = self.executor.submit(divmod, 21, 0)
    future2 = self.executor.submit(time.sleep, 1.5)

    finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             future1, future2],
            return_when=futures.FIRST_EXCEPTION)

    self.assertEqual(set([SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          future1]), finished)
    self.assertEqual(set([CANCELLED_FUTURE,
                          future2]), pending)
def test_pending_calls_race(self):
    # Issue #14406: multi-threaded race condition when waiting on all
    # futures.
    event = threading.Event()
    def future_func():
        event.wait()
    oldswitchinterval = sys.getcheckinterval()
    sys.setcheckinterval(1)
    try:
        fs = set(self.executor.submit(future_func) for i in range(100))
        event.set()
        futures.wait(fs, return_when=futures.ALL_COMPLETED)
    finally:
        sys.setcheckinterval(oldswitchinterval)
def test_map_submits_without_iteration(self):
    """Tests verifying issue 11777."""
    finished = []
    def record_finished(n):
        finished.append(n)

    self.executor.map(record_finished, range(10))
    self.executor.shutdown(wait=True)
    self.assertEqual(len(finished), 10)
def run(self):
    # init listeners, add them to the event loop
    for s in self.sockets:
        s.setblocking(False)
        self.poller.register(s, selectors.EVENT_READ, self.accept)

    timeout = self.cfg.timeout or 0.5

    while self.alive:
        # notify the arbiter we are alive
        self.notify()

        # can we accept more connections?
        if self.nr_conns < self.worker_connections:
            # wait for an event
            events = self.poller.select(0.02)
            for key, mask in events:
                callback = key.data
                callback(key.fileobj)

        if not self.is_parent_alive():
            break

        # handle keepalive timeouts
        self.murder_keepalived()

        # if the number of connections is < to the max we can handle at
        # the same time there is no need to wait for one
        if len(self.futures) < self.cfg.threads:
            continue

        result = futures.wait(self.futures, timeout=timeout,
                              return_when=futures.FIRST_COMPLETED)

        if not result.done:
            break
        else:
            [self.futures.remove(f) for f in result.done]

    self.tpool.shutdown(False)
    self.poller.close()
def listen_vcpu(self, vcpu_io, queue):
    """Listen to an individual virtual CPU"""
    logging.info('Start listening on VCPU %s', vcpu_io.vcpu_nb)
    # we need a per thread continue event
    continue_event = threading.Event()
    while not self.stop_request.is_set():
        try:
            nitro_raw_ev = vcpu_io.get_event()
        except ValueError as e:
            if not self.vm_io.syscall_filters:
                # if there are no filters, get_event should not timeout
                # since we capture all system calls
                # so log the error
                logging.debug(str(e))
        else:
            e = NitroEvent(nitro_raw_ev, vcpu_io)
            # put the event in the queue
            # and wait for the event to be processed,
            # when the main thread will set the continue_event
            item = (e, continue_event)
            queue.put(item)
            continue_event.wait()
            # reset continue_event
            continue_event.clear()
            vcpu_io.continue_vm()

    logging.debug('stop listening on VCPU %s', vcpu_io.vcpu_nb)
def tearDown(self):
    self.executor.shutdown(wait=True)
    dt = time.time() - self.t1
    if test.support.verbose:
        print("%.2fs" % dt, end=' ')
    self.assertLess(dt, 60, "synchronization issue: test lasted too long")