我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用concurrent.futures.as_completed()。
def test_zero_timeout(self):
    """as_completed(timeout=0) must yield only already-finished futures.

    A still-running future may not appear in the iteration; once the
    zero-length deadline expires the iterator raises TimeoutError.
    """
    running = self.executor.submit(time.sleep, 2)
    collected = set()
    candidates = [CANCELLED_AND_NOTIFIED_FUTURE,
                  EXCEPTION_FUTURE,
                  SUCCESSFUL_FUTURE,
                  running]
    try:
        for fut in futures.as_completed(candidates, timeout=0):
            collected.add(fut)
    except futures.TimeoutError:
        pass
    self.assertEqual(
        set([CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE]),
        collected)
def get_elb_bucket_locations(self):
    """Yield the S3 access-log targets configured on the account's ELBs.

    Fetches all ELBs via the resource manager, queries their attributes
    in batches of 100 on a two-worker pool, and yields each log target;
    a failed batch is logged and skipped (best effort).
    """
    elbs = self.manager.get_resource_manager('elb').resources()
    get_elb_attrs = functools.partial(
        _query_elb_attrs, self.manager.session_factory)
    with self.executor_factory(max_workers=2) as w:
        futures = []
        for elb_set in chunks(elbs, 100):
            futures.append(w.submit(get_elb_attrs, elb_set))
        for f in as_completed(futures):
            if f.exception():
                # Skip a failed batch rather than aborting the generator.
                log.error("Error while scanning elb log targets: %s" % (
                    f.exception()))
                continue
            for tgt in f.result():
                yield tgt
def _process_with_futures(self, helper, buckets, max_workers=3):
    """Run ``helper`` over ``buckets`` concurrently and collect results.

    A bucket whose helper raised is logged and recorded in
    ``self.denied_buckets``; truthy helper results are returned.
    """
    collected = []
    with self.executor_factory(max_workers) as pool:
        pending = {pool.submit(helper, bucket): bucket
                   for bucket in buckets}
        for done in as_completed(pending):
            exc = done.exception()
            if exc:
                bucket = pending[done]
                self.log.error(
                    "Error on bucket:%s region:%s policy:%s error: %s",
                    bucket['Name'], bucket.get('Location', 'unknown'),
                    self.manager.data.get('name'), exc)
                self.denied_buckets.add(bucket['Name'])
                continue
            outcome = done.result()
            if outcome:
                collected.append(outcome)
    return collected
def process(self, buckets):
    """Apply the lifecycle modification to each bucket concurrently.

    Returns the truthy results of ``process_bucket``; buckets whose
    worker raised are logged and skipped.
    """
    with self.executor_factory(max_workers=3) as w:
        futures = {}
        results = []
        for b in buckets:
            futures[w.submit(self.process_bucket, b)] = b
        for future in as_completed(futures):
            if future.exception():
                bucket = futures[future]
                self.log.error('error modifying bucket lifecycle: %s\n%s',
                               bucket['Name'], future.exception())
                # BUG FIX: previously fell through to future.result(),
                # which re-raises the exception just logged and aborts
                # the whole run; skip the failed bucket instead (as the
                # sibling bucket-processing actions in this file do).
                continue
            results += filter(None, [future.result()])
    return results
def main(workers=None):
    """Benchmark ``arcfour_test`` across a process pool.

    ``workers`` (str/int, optional) caps the pool size; when falsy,
    ProcessPoolExecutor picks its default (one per CPU).
    """
    if workers:
        workers = int(workers)
    t0 = time.time()
    with futures.ProcessPoolExecutor(workers) as executor:
        # NOTE(review): _max_workers is a private attribute, read only
        # to report the actual pool size in the summary line.
        actual_workers = executor._max_workers
        to_do = []
        for i in range(JOBS, 0, -1):
            # Vary each job's size around SIZE so the jobs are not uniform.
            size = SIZE + int(SIZE / JOBS * (i - JOBS/2))
            job = executor.submit(arcfour_test, size, KEY)
            to_do.append(job)
        for future in futures.as_completed(to_do):
            res = future.result()
            print('{:.1f} KB'.format(res/2**10))
    print(STATUS.format(actual_workers, time.time() - t0))
def download_many(cc_list):
    """Download flags for the first five country codes, concurrently.

    Submits one task per sorted code to a three-worker thread pool,
    printing each scheduled future, then consumes results in completion
    order.  Returns the number of results obtained.
    """
    selected = cc_list[:5]  # <1>
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # <2>
        scheduled = []
        for code in sorted(selected):  # <3>
            fut = executor.submit(download_one, code)  # <4>
            scheduled.append(fut)  # <5>
            print('Scheduled for {}: {}'.format(code, fut))  # <6>
        results = []
        for fut in futures.as_completed(scheduled):  # <7>
            res = fut.result()  # <8>
            print('{} result: {!r}'.format(fut, res))  # <9>
            results.append(res)
    return len(results)
# END FLAGS_THREADPOOL_AS_COMPLETED
def save_month(year_month, verbose):
    """Download every picture-of-the-day for ``year_month`` ('YYYY-MM').

    Fans per-date downloads out over a process pool; a failed date is
    reported but does not abort the month.  Returns
    ``(image_count, total_bytes)``.
    """
    year, month = [int(s) for s in year_month.split('-')]
    total_size = 0
    img_count = 0
    dates = potd.list_days_of_month(year, month)
    # NOTE(review): 100 processes is heavy for I/O-bound downloads;
    # compare the otherwise-identical thread-pool variant below.
    with futures.ProcessPoolExecutor(max_workers=100) as executor:
        # Map each future back to the date it is downloading.
        downloads = dict((executor.submit(potd.save_one, date, verbose),
                          date) for date in dates)
        for future in futures.as_completed(downloads):
            date = downloads[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (
                    date, future.exception()))
            else:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))
    return img_count, total_size
def save_month(year_month, verbose):
    """Download every picture-of-the-day for ``year_month`` ('YYYY-MM').

    Thread-pool twin of the process-pool variant above: fans per-date
    downloads out over 100 threads, reports failures per date, and
    returns ``(image_count, total_bytes)``.
    """
    year, month = [int(s) for s in year_month.split('-')]
    total_size = 0
    img_count = 0
    dates = potd.list_days_of_month(year, month)
    with futures.ThreadPoolExecutor(max_workers=100) as executor:
        # Map each future back to the date it is downloading.
        downloads = dict((executor.submit(potd.save_one, date, verbose),
                          date) for date in dates)
        for future in futures.as_completed(downloads):
            date = downloads[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (
                    date, future.exception()))
            else:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))
    return img_count, total_size
def test_execution_concurrency_no_scale(self):
    """Three concurrent executions should fit on a single worker."""
    self._create_function(name='test_python_sleep.py')

    def _create_execution():
        resp, body = self.client.create_execution(self.function_id)
        return resp, body

    futs = []
    with futurist.ThreadPoolExecutor(max_workers=10) as executor:
        for _ in range(3):
            fut = executor.submit(_create_execution)
            futs.append(fut)
        for f in futures.as_completed(futs):
            # Wait until we get the response
            resp, body = f.result()
            self.assertEqual(201, resp.status)
            self.addCleanup(self.client.delete_resource, 'executions',
                            body['id'], ignore_notfound=True)
            self.assertEqual('success', body['status'])

    # Only 3 concurrent executions: no scale-up expected, one worker.
    resp, body = self.admin_client.get_function_workers(self.function_id)
    self.assertEqual(200, resp.status)
    self.assertEqual(1, len(body['workers']))
def test_execution_concurrency_scale_up(self):
    """Six concurrent executions should force a second worker."""
    self.await_runtime_available(self.runtime_id)
    self._create_function(name='test_python_sleep.py')

    def _create_execution():
        resp, body = self.client.create_execution(self.function_id)
        return resp, body

    futs = []
    with futurist.ThreadPoolExecutor(max_workers=10) as executor:
        for _ in range(6):
            fut = executor.submit(_create_execution)
            futs.append(fut)
        for f in futures.as_completed(futs):
            # Wait until we get the response
            resp, body = f.result()
            self.assertEqual(201, resp.status)
            self.addCleanup(self.client.delete_resource, 'executions',
                            body['id'], ignore_notfound=True)
            self.assertEqual('success', body['status'])

    # Six concurrent executions exceed one worker's capacity: expect two.
    resp, body = self.admin_client.get_function_workers(self.function_id)
    self.assertEqual(200, resp.status)
    self.assertEqual(2, len(body['workers']))
def _check_executor(self, dt): start = time() try: for future in as_completed(self._futures[:], 0): self._futures.remove(future) try: result = future.result() except: traceback.print_exc() # make an error tile? continue if result is None: continue callback, args = result callback(*args) # capped executor in time, in order to prevent too much slowiness. # seems to works quite great with big zoom-in/out if time() - start > self.cap_time: break except TimeoutError: pass
def getloc():
    """Query the AMap (Gaode) place-search web service for locations.

    See http://lbs.amap.com/api/webservice/guide/api/search/#text
    Fetches 45 result pages concurrently (5 threads) and pickles the
    collected 'location' strings to ``allloc1.pk``.
    (Docstring translated from mojibake Chinese.)
    """
    allloc = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'http://lbs.amap.com/api/webservice/guide/api/search/#text'
        param = {
            'key': '22d6f93f929728c10ed86258653ae14a',
            # NOTE(review): value is mojibake — originally a Chinese
            # search keyword; left byte-identical.
            'keywords': u'??',
            'city': '027',
            'citylimit': 'true',
            'output': 'json',
            'page': '',
        }
        # One future per result page, 1..45 inclusive.
        future_to_url = {executor.submit(load_url, url,
                                         merge_dicts(param, {'page': i}),
                                         60): url
                         for i in range(1, 46)}
        for future in futures.as_completed(future_to_url):
            if future.exception() is not None:
                print(future.exception())
            elif future.done():
                data = future.result()['pois']
                allloc.extend([x['location'] for x in data])
    with open('allloc1.pk', 'wb') as f:
        pickle.dump(allloc, f, True)
def mobai(loc):
    """Fetch nearby Mobike bikes for each 'lng,lat' string in ``loc``.

    Issues one POST per coordinate on a five-thread pool and inserts
    each successful response batch into the module-level MongoDB
    ``collection``.  ``allmobai`` accumulates all records but is never
    returned — the side effect on MongoDB is the real output.
    """
    allmobai = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do'
        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Mobile/14E304 MicroMessenger/6.5.7 NetType/WIFI Language/zh_CN',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Referer': 'https://servicewechat.com/wx80f809371ae33eda/23/page-frame.html',
        }
        data = {
            'longitude': '',
            'latitude': '',
            'citycode': '027',
        }
        # Each coordinate string "lng,lat" is split into the POST body.
        future_to_url = {
            executor.submit(load_url, url,
                            merge_dicts(data,
                                        {'longitude': i.split(',')[0]},
                                        {'latitude': i.split(',')[1]}),
                            60, headers): url
            for i in loc}
        for future in futures.as_completed(future_to_url):
            if future.exception() is not None:
                print(future.exception())
            elif future.done():
                data = future.result()['object']
                allmobai.extend(data)
                # Write this batch to MongoDB (comment was mojibake).
                result = collection.insert_many(data)
def run(self):
    """Concurrently invoke `get_response` for all of instance's
    `requests`.

    Completed responses are appended to ``self.responses`` and their
    requests removed from ``self.pending_requests`` as they finish;
    ``self.is_done`` is set once the pool drains.
    """
    with futures.ThreadPoolExecutor(
        max_workers=min(self.max_workers, len(self.requests))
    ) as executor:
        to_do = []
        for i, request in enumerate(self.requests):
            future = executor.submit(self.get_response, request, i)
            to_do.append(future)
        for future in futures.as_completed(to_do):
            result = future.result()
            # `responses` and `pending_requests` are instance properties, which means
            # client code can inspect instance to read responses as they are completed
            # NOTE(review): the 'skwarg' sentinel's meaning is not
            # visible from this file — confirm against get_response.
            if result.req.error is not None or result.err == 'skwarg':
                continue
            try:
                self.pending_requests.remove(result.req)
            except KeyError:
                print('{} was not in pending requests, this is weird...'.format(result.req))
            self.responses.append(result)
    self.is_done = True
def get_node_health_mt(nodes_dict, check_type="normal", n_threads=8, print_out=False):
    """Check each node's health on a thread pool, updating ``nodes_dict``
    in place.

    Arguments:
        nodes_dict {dict} -- maps node IP/domain name -> (username, ...)

    Keyword Arguments:
        check_type {str} -- kind of check to run (default: {"normal"})
        n_threads {number} -- pool size (default: {8})
        print_out {bool} -- forwarded to check_node_health
    """
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        pending = {}
        for node, info in nodes_dict.items():
            fut = pool.submit(check_node_health, info[0], node,
                              check_type, print_out)
            pending[fut] = node
        for fut in as_completed(pending):
            node = pending[fut]
            username = nodes_dict[node][0]
            # Replace the tuple in place: keep username, refresh status.
            nodes_dict[node] = (username, fut.result())
            # print("{} {}".format(node, nodes_dict[node]))
def _check_executor(self, dt): start = time() try: for future in as_completed(self._futures[:], 0): self._futures.remove(future) try: result = future.result() except Exception: traceback.print_exc() # make an error tile? continue if result is None: continue callback, args = result callback(*args) # capped executor in time, in order to prevent too much # slowiness. # seems to works quite great with big zoom-in/out if time() - start > self.cap_time: break except TimeoutError: pass
def process(self, resources): client = local_session(self.manager.session_factory).client('rds') # restore up to 10 in parallel, we have to wait on each. with self.executor_factory( max_workers=min(10, len(resources) or 1)) as w: futures = {} for r in resources: tags = {t['Key']: t['Value'] for t in r['Tags']} if not set(tags).issuperset(self.restore_keys): self.log.warning( "snapshot:%s missing restore tags", r['DBSnapshotIdentifier']) continue futures[w.submit(self.process_instance, client, r)] = r for f in as_completed(futures): r = futures[f] if f.exception(): self.log.warning( "Error restoring db:%s from:%s error:\n%s", r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'], f.exception()) continue
def process(self, buckets, event=None):
    """Return the buckets for which ``process_bucket`` is truthy.

    Runs on a two-worker pool; a bucket whose worker raised is logged
    and omitted from the result.
    """
    matched = []
    with self.executor_factory(max_workers=2) as pool:
        by_future = {pool.submit(self.process_bucket, bucket): bucket
                     for bucket in buckets}
        for fut in as_completed(by_future):
            bucket = by_future[fut]
            if fut.exception():
                self.log.error(
                    "Error processing bucket: %s error: %s",
                    bucket['Name'], fut.exception())
                continue
            if fut.result():
                matched.append(bucket)
    return matched
def process(self, resources, event=None):
    """Check resources for cross-account access, 50 at a time.

    A failed chunk is logged and its resources are simply omitted from
    the returned match list.
    """
    self.accounts = self.get_accounts()
    results = []
    with self.executor_factory(max_workers=3) as w:
        futures = []
        for resource_set in chunks(resources, 50):
            futures.append(w.submit(
                self.process_resource_set, resource_set))
        for f in as_completed(futures):
            if f.exception():
                self.log.error(
                    "Exception checking cross account access \n %s" % (
                        f.exception()))
                continue
            results.extend(f.result())
    return results
def process(self, snapshots): self.image_snapshots = set() # Be careful re image snapshots, we do this by default # to keep things safe by default, albeit we'd get an error # if we did try to delete something associated to an image. pre = len(snapshots) snapshots = list(filter(None, _filter_ami_snapshots(self, snapshots))) post = len(snapshots) log.info("Deleting %d snapshots, auto-filtered %d ami-snapshots", post, pre - post) with self.executor_factory(max_workers=2) as w: futures = [] for snapshot_set in chunks(reversed(snapshots), size=50): futures.append( w.submit(self.process_snapshot_set, snapshot_set)) for f in as_completed(futures): if f.exception(): self.log.error( "Exception deleting snapshot set \n %s" % ( f.exception())) return snapshots
def process(self, volumes):
    """Copy instance tags onto their attached EBS volumes.

    Unattached volumes are filtered out (with a warning about the
    implicit narrowing); instances are processed 100 at a time over
    ten workers, with failures logged per batch.
    """
    vol_count = len(volumes)
    volumes = [v for v in volumes if v['Attachments']]
    if len(volumes) != vol_count:
        self.log.warning(
            "ebs copy tags action implicitly filtered from %d to %d",
            vol_count, len(volumes))
    # Builds self.instance_map from the attached volumes.
    self.initialize(volumes)
    with self.executor_factory(max_workers=10) as w:
        futures = []
        for instance_set in chunks(sorted(
                self.instance_map.keys(), reverse=True), size=100):
            futures.append(
                w.submit(self.process_instance_set, instance_set))
        for f in as_completed(futures):
            if f.exception():
                self.log.error(
                    "Exception copying instance tags \n %s" % (
                        f.exception()))
def create_elb_active_attributes_tuples(self, elb_policy_tuples):
    """Expand ELB policies into ``(elb, [active-attribute, ...])`` tuples.

    Only attributes marked "true" in each load balancer's policies are
    reported, e.g. ``(myelb, ['Protocol-SSLv1', 'Protocol-SSLv2'])``.
    Policy sets are processed 50 at a time on two workers; a failed
    batch is logged and skipped.
    """
    active = []
    with self.executor_factory(max_workers=2) as pool:
        submitted = [pool.submit(self.process_elb_policy_set, batch)
                     for batch in chunks(elb_policy_tuples, 50)]
        for fut in as_completed(submitted):
            if fut.exception():
                self.log.error(
                    "Exception processing elb policies \n %s" % (
                        fut.exception()))
                continue
            active.extend(fut.result())
    return active
def process(self, asgs):
    """Remove tag ``key`` from ASGs in batches; re-raise the last error.

    All batches are attempted even if some fail; if any batch raised,
    the most recent exception is re-raised after the pool drains.
    """
    error = False
    key = self.data.get('key', DEFAULT_TAG)
    with self.executor_factory(max_workers=3) as w:
        futures = {}
        for asg_set in chunks(asgs, self.batch_size):
            futures[w.submit(self.process_asg_set, asg_set, key)] = asg_set
        for f in as_completed(futures):
            asg_set = futures[f]
            if f.exception():
                # Remember the failure but keep draining other batches.
                error = f.exception()
                self.log.exception(
                    "Exception untagging asg:%s tag:%s error:%s" % (
                        ", ".join([a['AutoScalingGroupName']
                                   for a in asg_set]),
                        self.data.get('key', DEFAULT_TAG),
                        f.exception()))
    if error:
        raise error
def tag(self, asgs, key, value):
    """Apply tag ``key=value`` to ASGs in batches; re-raise on failure.

    All batches are attempted; if any batch raised, the most recent
    exception is re-raised once the pool drains (mirrors the untag
    action's behavior).
    """
    error = None
    with self.executor_factory(max_workers=3) as w:
        futures = {}
        for asg_set in chunks(asgs, self.batch_size):
            futures[w.submit(
                self.process_asg_set, asg_set, key, value)] = asg_set
        for f in as_completed(futures):
            asg_set = futures[f]
            if f.exception():
                # BUG FIX: ``error`` was initialized but never assigned,
                # so the final ``raise error`` could never fire and
                # failures were silently reduced to a log line.  Record
                # the exception as the sibling untag action does.
                # NOTE(review): log message says "untagging" although
                # this is the tag action — left byte-identical.
                error = f.exception()
                self.log.exception(
                    "Exception untagging tag:%s error:%s asg:%s" % (
                        self.data.get('key', DEFAULT_TAG),
                        f.exception(),
                        ", ".join([a['AutoScalingGroupName']
                                   for a in asg_set])))
    if error:
        raise error
def process(self, resources, event=None):
    """Return users belonging to at least one group matching this filter.

    Users lacking the cached 'c7n:Groups' annotation are hydrated in
    batches of 50 via ``get_user_groups``, which presumably annotates
    each user in place — confirm against its definition.
    """
    client = local_session(self.manager.session_factory).client('iam')
    with self.executor_factory(max_workers=2) as w:
        futures = []
        for user_set in chunks(
                [r for r in resources if 'c7n:Groups' not in r], size=50):
            futures.append(
                w.submit(self.get_user_groups, client, user_set))
        for f in as_completed(futures):
            # Drain purely for completion.  NOTE(review): exceptions
            # are silently discarded here, and an unhydrated user would
            # raise KeyError below — confirm this is intended.
            pass
    matched = []
    for r in resources:
        for p in r['c7n:Groups']:
            if self.match(p) and r not in matched:
                matched.append(r)
    return matched
def process(self, clusters):
    """Snapshot every snapshot-eligible cache cluster.

    Ineligible clusters are skipped; snapshot failures are logged.
    Returns the input cluster list unchanged.
    """
    with self.executor_factory(max_workers=3) as pool:
        jobs = [pool.submit(self.process_cluster_snapshot, cluster)
                for cluster in clusters
                if _cluster_eligible_for_snapshot(cluster)]
        for job in as_completed(jobs):
            exc = job.exception()
            if exc:
                self.log.error(
                    "Exception creating cache cluster snapshot \n %s",
                    exc)
    return clusters
def _common_tag_processer(executor_factory, batch_size, concurrency,
                          process_resource_set, id_key, resources,
                          tags, log):
    """Apply ``tags`` to ``resources`` in concurrent batches.

    Each batch of ``batch_size`` resources is handed to
    ``process_resource_set``; a failed batch is logged with the ids of
    the resources that actually failed.
    """
    with executor_factory(max_workers=concurrency) as w:
        # BUG FIX: the error path previously referenced the loop
        # variable ``resource_set``, which by the time a failure is
        # observed always holds the *last* submitted batch (late
        # binding); map each future back to its own batch instead so
        # the log names the right resources.
        futures = {}
        for resource_set in utils.chunks(resources, size=batch_size):
            futures[w.submit(process_resource_set,
                             resource_set, tags)] = resource_set
        for f in as_completed(futures):
            if f.exception():
                failed_set = futures[f]
                log.error(
                    "Exception with tags: %s on resources: %s \n %s" % (
                        tags,
                        ", ".join([r[id_key] for r in failed_set]),
                        f.exception()))
def process(self, resources):
    """Rename a tag key across the filtered resource set.

    ``filter_resources`` narrows to resources actually carrying the
    tag; renames run per group from ``create_set`` on three workers,
    with failures logged.  Returns the filtered resources.
    """
    count = len(resources)
    resources = self.filter_resources(resources)
    self.log.info(
        "Filtered from %s resources to %s" % (count, len(resources)))
    self.id_key = self.manager.get_model().id
    resource_set = self.create_set(resources)
    with self.executor_factory(max_workers=3) as w:
        futures = []
        for r in resource_set:
            futures.append(
                w.submit(self.process_rename, r, resource_set[r]))
        for f in as_completed(futures):
            if f.exception():
                self.log.error(
                    "Exception renaming tag set \n %s" % (
                        f.exception()))
    return resources
def run(config, start, end, accounts):
    """run export across accounts and log groups specified in config.

    ``accounts`` optionally narrows the run to named accounts; per
    account failures are logged but do not stop the others.
    """
    config = validate.callback(config)
    destination = config.get('destination')
    # Parse optional time bounds; ``end`` defaults to "now".
    start = start and parse(start) or start
    end = end and parse(end) or datetime.now()
    # NOTE(review): ``debug`` is a module global; MainThreadExecutor
    # serializes work in-process for debugging.
    executor = debug and MainThreadExecutor or ThreadPoolExecutor
    with executor(max_workers=32) as w:
        futures = {}
        for account in config.get('accounts', ()):
            if accounts and account['name'] not in accounts:
                continue
            futures[
                w.submit(process_account, account, start, end,
                         destination)] = account
        for f in as_completed(futures):
            account = futures[f]
            if f.exception():
                log.error("Error on account %s err: %s",
                          account['name'], f.exception())
            # Logged for failed accounts too (matches original flow).
            log.info("Completed %s", account['name'])
def sched_downloads(d_set, dl_dir, num_threads, vids):
    """Download and cut all ``vids`` of dataset ``d_set`` in parallel.

    Creates ``dl_dir/d_set/`` then fans ``dl_and_cut`` out over a
    process pool, writing a progress line to stderr per completed video.
    """
    d_set_dir = dl_dir + '/' + d_set + '/'

    # Make the directory for this dataset
    check_call(' '.join(['mkdir', '-p', d_set_dir]), shell=True)

    # Tell the user when downloads were started
    # NOTE(review): the formatted timestamp is computed but never
    # printed or stored — confirm whether it was meant to be logged.
    datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Download and cut in parallel threads giving
    with futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
        fs = [executor.submit(dl_and_cut, vid) for vid in vids]
        # BUG FIX: enumerate started at 0, so the progress line lagged
        # one behind and never reached len(vids)/len(vids); start at 1.
        for i, f in enumerate(futures.as_completed(fs), 1):
            # Write progress to error so that it can be seen
            sys.stderr.write(
                "Downloaded video: {} / {} \r".format(i, len(vids)))
    print(d_set + ': All videos downloaded')
def parallelize(self, fn, number_of_threads=None):
    """
    Parallelize a function call. Number of threads defaults to your cpu count + 1.
    Yields results (or the exceptions raised) in completion order,
    wrapped in a new Slinkie.
    """
    workers = number_of_threads if number_of_threads else cpu_count() + 1

    def _results():
        with ThreadPoolExecutor(workers) as pool:
            submitted = [pool.submit(fn, item) for item in self._items]
            for done in as_completed(submitted):
                try:
                    yield done.result()
                except Exception as exc:
                    # Failures are yielded, not raised, so one bad item
                    # doesn't abort the stream.
                    yield exc

    return Slinkie(_results())
def upload_template_files(*args):
    """CLI entry point: upload ARM templates found under --src.

    Parses --name/--src/--api-version, stages uploads via
    ``_upload_templates`` (which presumably populates the module-level
    ``uploads`` command list — confirm), then runs every upload command
    on a 40-thread pool; ``t.result()`` re-raises thread failures.
    """
    print('\n== UPLOAD ARM TEMPLATES ==')
    parser = argparse.ArgumentParser(description='Upload ARM Templates')
    parser.add_argument('--name', metavar='NAME', required=True,
                        help='Name of the thing being uploaded (in CamelCase)')
    parser.add_argument('--src', metavar='PATH', required=True,
                        help='Path to the directory containing ARM templates to upload. Subdirectories will automatically be crawled.')
    parser.add_argument('--api-version', metavar='VERSION', required=True,
                        help='API version for the templates being uploaded in yyyy-MM-dd format. (ex: 2016-07-01)')
    args = parser.parse_args(args)
    name = args.name
    api_version = args.api_version
    src = args.src
    _upload_templates(name, api_version, src)
    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor(max_workers=40) as executor:
        # Shell out per upload command; trusted, locally-built commands.
        tasks = [executor.submit(lambda cmd: os.system(cmd), u) for u in uploads]
        for t in as_completed(tasks):
            t.result()  # don't use the result but expose exceptions from the threads
def cli_build(args):
    """Build CLI release artifacts for each requested build type.

    Requires a reachable docker daemon.  A build_types of ['*'] expands
    to every known BUILD_TYPES entry; one thread per build type runs
    ``build_dispatch``, and ``t.result()`` re-raises any build failure.
    """
    assert check_output(['docker', 'ps']), "Docker required."
    build_types = args.build_types
    git_url = args.git_clone_url
    git_branch = args.git_clone_branch
    cli_version = args.cli_version
    # Timestamped scratch dir under the CWD for all build artifacts.
    artifact_dir = tempfile.mkdtemp(
        prefix='cli-build-{}-'.format(
            datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')),
        dir=os.getcwd())
    if len(build_types) == 1 and build_types[0] == '*':
        build_types = BUILD_TYPES
    print_heading('Building for {} from branch {} of {} '
                  'and version number will be {}\n'
                  'Build artifacts will be in {}'.format(
                      ', '.join(build_types), git_branch, git_url,
                      cli_version, artifact_dir))
    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor(max_workers=len(build_types)) as executor:
        tasks = {executor.submit(build_dispatch, bt, git_url, git_branch,
                                 cli_version, artifact_dir, arg_ns=args)
                 for bt in build_types}
        for t in as_completed(tasks):
            t.result()
    print('Done.')
def run(self, data, max=4):
    """Run ``self.run1`` over each payload concurrently.

    Each payload is tagged with a 'chrome_id' equal to its input
    position so results can be re-sorted into submission order; the
    sorted result list is returned.  NOTE(review): the parameter name
    ``max`` shadows the builtin and ``data`` is rebound to each result
    inside the loop — kept as-is for interface compatibility.
    """
    results = []
    with futures.ThreadPoolExecutor(max_workers=max) as executor:
        future_to_url = {}
        for i, payload in enumerate(data):
            payload['chrome_id'] = i
            future_to_url[executor.submit(self.run1, payload)] = payload
            # future_to_url[executor.submit(self.run1_core, payload, browser, begin_time)] = payload
        for future in futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
            else:
                # Propagate the input's position onto the result.
                data['chrome_id'] = url['chrome_id']
                results.append(data)
    sorted_results = sorted(results, key=lambda tup: tup['chrome_id'])
    return sorted_results
def map_to_bids(self, bids_map, bids_dir, dicom_dir, biopac_dir,
                nthreads, overwrite):
    """Convert datasets to BIDS layout per a CSV mapping table.

    Each row of ``bids_map`` is converted on an ``nthreads`` pool; the
    first row reporting failure stops result collection (already
    submitted rows still run until the pool drains) and an error is
    logged afterwards.
    """
    # Parse bids_map csv table, and create execution list for BIDS generation
    mapping = pd.read_csv(bids_map, header=0, index_col=None)
    mapping.replace(np.nan, '', regex=True, inplace=True)
    with ThreadPoolExecutor(max_workers=nthreads) as executor:
        futures = []
        for _, row in mapping.iterrows():
            futures.append(executor.submit(self._process_map_row, row,
                                           bids_dir, dicom_dir,
                                           self.conversion_tool,
                                           biopac_dir, overwrite))
        success = True
        for future in as_completed(futures):
            # _process_map_row reports success as a truthy result.
            if not future.result():
                success = False
                break
    if not success:
        self.log.error("There were errors converting the provided datasets to BIDS format. See log for more"
                       " information.")
def repeat(f, reps, cpus, **kwargs):
    """Call ``f(**kwargs)`` ``reps`` times, optionally across processes.

    With reps == 1 the call is direct and exceptions propagate.  With
    cpus == 1 the calls run serially and each exception becomes a
    warning; otherwise a process pool of ``cpus`` workers (cpu_count()
    when cpus < 1) runs the repetitions, again warning per failure.
    """
    if reps == 1:
        f(**kwargs)
        return

    fname = f.__name__
    print("Starting {} {} times with:".format(fname, reps))
    print(kwargs)

    if cpus == 1:
        for _ in range(reps):
            try:
                f(**kwargs)
            except Exception as exc:
                warnings.warn(str(exc))
    else:
        from multiprocessing import cpu_count
        from concurrent.futures import ProcessPoolExecutor, as_completed
        worker_count = cpu_count() if cpus < 1 else cpus
        with ProcessPoolExecutor(worker_count) as pool:
            pending = [pool.submit(f, **kwargs) for _ in range(reps)]
            for done in as_completed(pending):
                if done.exception():
                    warnings.warn(str(done.exception()))
    print("Finished")
def concurrent_find(func, params, **kw):
    """Run ``func`` over ``params`` concurrently; return first truthy result.

    Falls back to the last completed future's result when no result is
    truthy; on timeout, kills outstanding work and returns None.
    NOTE(review): ``async`` is a project helper returning a killable
    futures collection — it is also a reserved word from Python 3.7,
    so this code only parses on older interpreters.  The local name
    ``futures`` shadows the usual module import.
    """
    timeout = kw.pop("concurrent_timeout", None)
    with async(func, list(params), **kw) as futures:
        future = None
        try:
            for future in futures.as_completed(timeout=timeout):
                if not future.exception() and future.result():
                    futures.kill()
                    return future.result()
            else:
                # for-else: no truthy result; return the last one seen.
                if future:
                    return future.result()
        except FutureTimeoutError as exc:
            if not timeout:
                # No timeout was requested, so this is unexpected —
                # propagate instead of swallowing.
                raise
            futures.kill()
            _logger.warning("Concurrent future timed out (%s)", exc)
def main(keys):
    """Download all KRX datasets named by ``keys`` concurrently.

    One thread per key; per-key progress and a final summary go to
    stderr.  NOTE(review): the executor is never shut down explicitly
    — acceptable for a short-lived script.
    """
    t0 = time.time()
    executor = futures.ThreadPoolExecutor(max_workers=len(keys))
    to_do = []
    for key in keys:
        # Deep-copy the config so concurrent downloads don't share state.
        config = copy.deepcopy(krx.load_config(key))
        future = executor.submit(download, key, config)
        to_do.append(future)
    done = 0
    total_records = 0
    for future in futures.as_completed(to_do):
        done += 1
        key, records = future.result()
        total_records += records
        print(f'[{done:,d}/{len(keys):,d}] {key} records fetched {records:>8,} records', file=sys.stderr)
    elapsed = time.time() - t0
    print(f'{total_records:,} Records fetched in {elapsed:.2f}s', file=sys.stderr)
def main(keys):
    """Download dated KRX datasets named by ``keys`` concurrently.

    The target date comes from argv[1] when given, else today
    ('YYYYMMDD'), and is injected into each key's config before
    download.  Progress and summary go to stderr.
    """
    t0 = time.time()
    # Positional argv parsing: [command, date].
    ARGS = dict(zip(['command', 'date'], sys.argv))
    date = ARGS.get('date', f'{datetime.date.today():%Y%m%d}')
    executor = futures.ThreadPoolExecutor(max_workers=len(keys))
    to_do = []
    for key in keys:
        # Deep-copy the config so concurrent downloads don't share state.
        config = copy.deepcopy(krx.load_config(key))
        config['contents']['data']['schdate'] = date
        future = executor.submit(download, key, config)
        to_do.append(future)
    done = 0
    total_records = 0
    for future in futures.as_completed(to_do):
        done += 1
        key, records = future.result()
        total_records += records
        print(f'[{done:,d}/{len(keys):,d}] {key} records fetched {records:>8,} records', file=sys.stderr)
    elapsed = time.time() - t0
    print(f'{total_records:,} Records fetched in {elapsed:.2f}s', file=sys.stderr)
def collect(self, count, dst_port=util.DEFAULT_DST_PORT,
            timeout=util.DEFAULT_TIMEOUT):
    """Collects latency against a set of hosts.

    Args:
        count: (int) number of datagrams to send each host
        dst_port: (int) destination port for the probes
        timeout: (float) seconds to wait for probes to return
    """
    jobs = []
    with futures.ThreadPoolExecutor(max_workers=50) as executor:
        for host in self.metrics.keys():
            logging.info('Assigning target host: %s', host)
            jobs.append(executor.submit(self.method,
                                        host,
                                        count=count,
                                        port=dst_port,
                                        timeout=timeout,
                                        ))
        for job in futures.as_completed(jobs):
            # Each probe returns (loss, rtt, host); store per host.
            loss, rtt, host = job.result()
            self.metrics[host].loss = loss
            self.metrics[host].rtt = rtt
            logging.info('Summary {:16}:{:>3}% loss, {:>4} ms rtt'.format(
                host, loss, rtt))