Python concurrent.futures module: as_completed() code examples

The following 50 code examples, extracted from open-source Python projects, illustrate how to use concurrent.futures.as_completed().

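Before the project samples, here is a minimal self-contained sketch of the canonical as_completed() pattern: submit the jobs, map each future back to its input, then consume results in completion order. The fetch_page worker and the URL list are hypothetical, not taken from any project below.

import concurrent.futures
import urllib.request

URLS = ['http://example.com/', 'http://example.org/']

def fetch_page(url, timeout=10):
    # hypothetical worker: download one page and return its size in bytes
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return len(conn.read())

def demo():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        # map each future back to its URL so results and errors can be attributed
        future_to_url = {executor.submit(fetch_page, url): url for url in URLS}
        # as_completed() yields futures in completion order, not submission order
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            if future.exception() is not None:
                print('%s raised %r' % (url, future.exception()))
            else:
                print('%s is %d bytes' % (url, future.result()))
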
Project: deb-python-concurrent.futures    Author: openstack
def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)
Project: Cloud-Custodian    Author: jtroberts83
def get_elb_bucket_locations(self):
        elbs = self.manager.get_resource_manager('elb').resources()
        get_elb_attrs = functools.partial(
            _query_elb_attrs, self.manager.session_factory)

        with self.executor_factory(max_workers=2) as w:
            futures = []
            for elb_set in chunks(elbs, 100):
                futures.append(w.submit(get_elb_attrs, elb_set))
            for f in as_completed(futures):
                if f.exception():
                    log.error("Error while scanning elb log targets: %s" % (
                        f.exception()))
                    continue
                for tgt in f.result():
                    yield tgt
Project: Cloud-Custodian    Author: jtroberts83
def _process_with_futures(self, helper, buckets, max_workers=3):
        results = []
        with self.executor_factory(max_workers) as w:
            futures = {}
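            # map each future back to its bucket so failures can be attributed below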
            for b in buckets:
                futures[w.submit(helper, b)] = b
            for f in as_completed(futures):
                if f.exception():
                    b = futures[f]
                    self.log.error(
                        "Error on bucket:%s region:%s policy:%s error: %s",
                        b['Name'], b.get('Location', 'unknown'),
                        self.manager.data.get('name'), f.exception())
                    self.denied_buckets.add(b['Name'])
                    continue
                result = f.result()
                if result:
                    results.append(result)
        return results
Project: Cloud-Custodian    Author: jtroberts83
def get_elb_bucket_locations(self):
        elbs = self.manager.get_resource_manager('elb').resources()
        get_elb_attrs = functools.partial(
            _query_elb_attrs, self.manager.session_factory)

        with self.executor_factory(max_workers=2) as w:
            futures = []
            for elb_set in chunks(elbs, 100):
                futures.append(w.submit(get_elb_attrs, elb_set))
            for f in as_completed(futures):
                if f.exception():
                    log.error("Error while scanning elb log targets: %s" % (
                        f.exception()))
                    continue
                for tgt in f.result():
                    yield tgt
Project: Cloud-Custodian    Author: jtroberts83
def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {}
            results = []

            for b in buckets:
                futures[w.submit(self.process_bucket, b)] = b

            for future in as_completed(futures):
                if future.exception():
                    bucket = futures[future]
                    self.log.error('error modifying bucket lifecycle: %s\n%s',
                                   bucket['Name'], future.exception())
                    continue
                results += filter(None, [future.result()])

            return results
Project: notebooks    Author: fluentpython
def main(workers=None):
    if workers:
        workers = int(workers)
    t0 = time.time()

    with futures.ProcessPoolExecutor(workers) as executor:
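        # _max_workers is a private attribute; read here only to report the actual pool size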
        actual_workers = executor._max_workers
        to_do = []
        for i in range(JOBS, 0, -1):
            size = SIZE + int(SIZE / JOBS * (i - JOBS/2))
            job = executor.submit(arcfour_test, size, KEY)
            to_do.append(job)

        for future in futures.as_completed(to_do):
            res = future.result()
            print('{:.1f} KB'.format(res/2**10))

    print(STATUS.format(actual_workers, time.time() - t0))
Project: notebooks    Author: fluentpython
def download_many(cc_list):
    cc_list = cc_list[:5]  # <1>
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # <2>
        to_do = []
        for cc in sorted(cc_list):  # <3>
            future = executor.submit(download_one, cc)  # <4>
            to_do.append(future)  # <5>
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))  # <6>

        results = []
        for future in futures.as_completed(to_do):  # <7>
            res = future.result()  # <8>
            msg = '{} result: {!r}'
            print(msg.format(future, res)) # <9>
            results.append(res)

    return len(results)
# END FLAGS_THREADPOOL_AS_COMPLETED
Project: notebooks    Author: fluentpython
def save_month(year_month, verbose):
    year, month = [int(s) for s in year_month.split('-')]
    total_size = 0
    img_count = 0
    dates = potd.list_days_of_month(year, month)

    with futures.ProcessPoolExecutor(max_workers=100) as executor:
        downloads = dict((executor.submit(potd.save_one, date, verbose), date)
                             for date in dates)

        for future in futures.as_completed(downloads):
            date = downloads[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (date,
                                                         future.exception()))
            else:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))

    return img_count, total_size
Project: notebooks    Author: fluentpython
def save_month(year_month, verbose):
    year, month = [int(s) for s in year_month.split('-')]
    total_size = 0
    img_count = 0
    dates = potd.list_days_of_month(year, month)

    with futures.ThreadPoolExecutor(max_workers=100) as executor:
        downloads = dict((executor.submit(potd.save_one, date, verbose), date)
                             for date in dates)

        for future in futures.as_completed(downloads):
            date = downloads[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (date,
                                                         future.exception()))
            else:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))

    return img_count, total_size
Project: qinling    Author: openstack
def test_execution_concurrency_no_scale(self):
        self._create_function(name='test_python_sleep.py')

        def _create_execution():
            resp, body = self.client.create_execution(self.function_id)
            return resp, body

        futs = []
        with futurist.ThreadPoolExecutor(max_workers=10) as executor:
            for _ in range(3):
                fut = executor.submit(_create_execution)
                futs.append(fut)
            for f in futures.as_completed(futs):
                # Wait until we get the response
                resp, body = f.result()

                self.assertEqual(201, resp.status)
                self.addCleanup(self.client.delete_resource, 'executions',
                                body['id'], ignore_notfound=True)
                self.assertEqual('success', body['status'])

        resp, body = self.admin_client.get_function_workers(self.function_id)

        self.assertEqual(200, resp.status)
        self.assertEqual(1, len(body['workers']))
Project: qinling    Author: openstack
def test_execution_concurrency_scale_up(self):
        self.await_runtime_available(self.runtime_id)
        self._create_function(name='test_python_sleep.py')

        def _create_execution():
            resp, body = self.client.create_execution(self.function_id)
            return resp, body

        futs = []
        with futurist.ThreadPoolExecutor(max_workers=10) as executor:
            for _ in range(6):
                fut = executor.submit(_create_execution)
                futs.append(fut)
            for f in futures.as_completed(futs):
                # Wait until we get the response
                resp, body = f.result()

                self.assertEqual(201, resp.status)
                self.addCleanup(self.client.delete_resource, 'executions',
                                body['id'], ignore_notfound=True)
                self.assertEqual('success', body['status'])

        resp, body = self.admin_client.get_function_workers(self.function_id)
        self.assertEqual(200, resp.status)
        self.assertEqual(2, len(body['workers']))
Project: MeetNet    Author: dimgold
def _check_executor(self, dt):
        start = time()
        try:
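            # with timeout=0, as_completed() yields only futures that are already done,
            # so this poll never blocks; leftover futures raise TimeoutError, caught below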
            for future in as_completed(self._futures[:], 0):
                self._futures.remove(future)
                try:
                    result = future.result()
                except Exception:
                    traceback.print_exc()
                    # make an error tile?
                    continue
                if result is None:
                    continue
                callback, args = result
                callback(*args)

                # cap executor time per call to prevent excessive slowness;
                # seems to work quite well with big zoom-in/out
                if time() - start > self.cap_time:
                    break
        except TimeoutError:
            pass
Project: mobike    Author: wangshulingxiao
def getloc():
    allloc = []
    u"""??????api???????????
    http://lbs.amap.com/api/webservice/guide/api/search/#text
    """
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'http://lbs.amap.com/api/webservice/guide/api/search/#text'
        param = {
            'key': '22d6f93f929728c10ed86258653ae14a',
            'keywords': u'??',
            'city': '027',
            'citylimit': 'true',
            'output': 'json',
            'page': '',
        }
        future_to_url = {executor.submit(load_url, url, merge_dicts(param, {'page': i}), 60): url for i in range(1, 46)}
        for future in futures.as_completed(future_to_url):
            if future.exception() is not None:
                print(future.exception())
            elif future.done():
                data = future.result()['pois']
                allloc.extend([x['location'] for x in data])
        with open('allloc1.pk', 'wb') as f:
            pickle.dump(allloc, f, True)
Project: mobike    Author: wangshulingxiao
def mobai(loc):
    allmobai = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do'
        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Mobile/14E304 MicroMessenger/6.5.7 NetType/WIFI Language/zh_CN',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Referer': 'https://servicewechat.com/wx80f809371ae33eda/23/page-frame.html',
        }
        data = {
            'longitude': '',
            'latitude': '',
            'citycode': '027',
        }
        future_to_url = {
            executor.submit(load_url, url, merge_dicts(data, {'longitude': i.split(',')[0]}, {'latitude': i.split(',')[1]}), 60, headers): url for i in loc}
        for future in futures.as_completed(future_to_url):
            if future.exception() is not None:
                print(future.exception())
            elif future.done():
                data = future.result()['object']
                allmobai.extend(data)
                # write this batch to MongoDB
                result = collection.insert_many(data)
Project: Requester    Author: kylebebak
def run(self):
        """Concurrently invoke `get_response` for all of instance's `requests`.
        """
        with futures.ThreadPoolExecutor(
            max_workers=min(self.max_workers, len(self.requests))
        ) as executor:
            to_do = []
            for i, request in enumerate(self.requests):
                future = executor.submit(self.get_response, request, i)
                to_do.append(future)

            for future in futures.as_completed(to_do):
                result = future.result()
                # `responses` and `pending_requests` are instance properties, which means
                # client code can inspect instance to read responses as they are completed
                if result.req.error is not None or result.err == 'skwarg':
                    continue
                try:
                    self.pending_requests.remove(result.req)
                except KeyError:
                    print('{} was not in pending requests, this is weird...'.format(result.req))
                self.responses.append(result)
        self.is_done = True
Project: zippy    Author: securesystemslab
def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)
Project: multitaskLauncher    Author: 1a1a11a
def get_node_health_mt(nodes_dict, check_type="normal", n_threads=8, print_out=False):
    """use multithreading to check each node health

    Arguments:
        nodes_dict {dict} -- [nodesIP(domainName)->(username, mem, CPU)]

    Keyword Arguments:
        check_type {str} -- [description] (default: {"normal"})
        n_threads {number} -- [description] (default: {8})
    """ 

    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        futures = {executor.submit(check_node_health, nodeinfo[0], node, check_type, print_out): node 
                            for node, nodeinfo in nodes_dict.items()} 
        for future in as_completed(futures):
            node = futures[future]
            nodeinfo = nodes_dict[node]
            result = future.result()
            nodes_dict[node] = (nodeinfo[0], result)

            # print("{} {}".format(node, nodes_dict[node]))
Project: pydelhi_mobile    Author: pydelhi
def _check_executor(self, dt):
        start = time()
        try:
            for future in as_completed(self._futures[:], 0):
                self._futures.remove(future)
                try:
                    result = future.result()
                except Exception:
                    traceback.print_exc()
                    # make an error tile?
                    continue
                if result is None:
                    continue
                callback, args = result
                callback(*args)

                # cap executor time per call to prevent excessive slowness;
                # seems to work quite well with big zoom-in/out
                if time() - start > self.cap_time:
                    break
        except TimeoutError:
            pass
Project: HAB2017    Author: LBCC-SpaceClub
def _check_executor(self, dt):
        start = time()
        try:
            for future in as_completed(self._futures[:], 0):
                self._futures.remove(future)
                try:
                    result = future.result()
                except Exception:
                    traceback.print_exc()
                    # make an error tile?
                    continue
                if result is None:
                    continue
                callback, args = result
                callback(*args)

                # cap executor time per call to prevent excessive slowness;
                # seems to work quite well with big zoom-in/out
                if time() - start > self.cap_time:
                    break
        except TimeoutError:
            pass
Project: web_ctp    Author: molebot
def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)
Project: cloud-custodian    Author: capitalone
def process(self, resources):
        client = local_session(self.manager.session_factory).client('rds')
        # restore up to 10 in parallel, we have to wait on each.
        with self.executor_factory(
                max_workers=min(10, len(resources) or 1)) as w:
            futures = {}
            for r in resources:
                tags = {t['Key']: t['Value'] for t in r['Tags']}
                if not set(tags).issuperset(self.restore_keys):
                    self.log.warning(
                        "snapshot:%s missing restore tags",
                        r['DBSnapshotIdentifier'])
                    continue
                futures[w.submit(self.process_instance, client, r)] = r
            for f in as_completed(futures):
                r = futures[f]
                if f.exception():
                    self.log.warning(
                        "Error restoring db:%s from:%s error:\n%s",
                        r['DBInstanceIdentifier'], r['DBSnapshotIdentifier'],
                        f.exception())
                    continue
Project: cloud-custodian    Author: capitalone
def _process_with_futures(self, helper, buckets, max_workers=3):
        results = []
        with self.executor_factory(max_workers) as w:
            futures = {}
            for b in buckets:
                futures[w.submit(helper, b)] = b
            for f in as_completed(futures):
                if f.exception():
                    b = futures[f]
                    self.log.error(
                        "Error on bucket:%s region:%s policy:%s error: %s",
                        b['Name'], b.get('Location', 'unknown'),
                        self.manager.data.get('name'), f.exception())
                    self.denied_buckets.add(b['Name'])
                    continue
                result = f.result()
                if result:
                    results.append(result)
        return results
Project: cloud-custodian    Author: capitalone
def get_elb_bucket_locations(self):
        elbs = self.manager.get_resource_manager('elb').resources()
        get_elb_attrs = functools.partial(
            _query_elb_attrs, self.manager.session_factory)

        with self.executor_factory(max_workers=2) as w:
            futures = []
            for elb_set in chunks(elbs, 100):
                futures.append(w.submit(get_elb_attrs, elb_set))
            for f in as_completed(futures):
                if f.exception():
                    log.error("Error while scanning elb log targets: %s" % (
                        f.exception()))
                    continue
                for tgt in f.result():
                    yield tgt
Project: cloud-custodian    Author: capitalone
def process(self, buckets, event=None):
        results = []
        with self.executor_factory(max_workers=2) as w:
            futures = {}
            for b in buckets:
                futures[w.submit(self.process_bucket, b)] = b

            for f in as_completed(futures):
                b = futures[f]
                if f.exception():
                    self.log.error(
                        "Error processing bucket: %s error: %s",
                        b['Name'], f.exception())
                    continue
                if f.result():
                    results.append(b)
        return results
Project: cloud-custodian    Author: capitalone
def process(self, buckets):
        with self.executor_factory(max_workers=3) as w:
            futures = {}
            results = []

            for b in buckets:
                futures[w.submit(self.process_bucket, b)] = b

            for future in as_completed(futures):
                if future.exception():
                    bucket = futures[future]
                    self.log.error('error modifying bucket lifecycle: %s\n%s',
                                   bucket['Name'], future.exception())
                    continue
                results += filter(None, [future.result()])

            return results
Project: cloud-custodian    Author: capitalone
def process(self, resources, event=None):
        self.accounts = self.get_accounts()
        results = []
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for resource_set in chunks(resources, 50):
                futures.append(w.submit(
                    self.process_resource_set, resource_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception checking cross account access \n %s" % (
                            f.exception()))
                    continue
                results.extend(f.result())
        return results
Project: cloud-custodian    Author: capitalone
def process(self, snapshots):
        self.image_snapshots = set()
        # Be careful with image snapshots: we filter them out by default
        # to keep things safe, since we'd get an error if we tried to
        # delete a snapshot associated with an image.
        pre = len(snapshots)
        snapshots = list(filter(None, _filter_ami_snapshots(self, snapshots)))
        post = len(snapshots)
        log.info("Deleting %d snapshots, auto-filtered %d ami-snapshots",
                 post, pre - post)

        with self.executor_factory(max_workers=2) as w:
            futures = []
            for snapshot_set in chunks(reversed(snapshots), size=50):
                futures.append(
                    w.submit(self.process_snapshot_set, snapshot_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception deleting snapshot set \n %s" % (
                            f.exception()))
        return snapshots
Project: cloud-custodian    Author: capitalone
def process(self, volumes):
        vol_count = len(volumes)
        volumes = [v for v in volumes if v['Attachments']]
        if len(volumes) != vol_count:
            self.log.warning(
                "ebs copy tags action implicitly filtered from %d to %d",
                vol_count, len(volumes))
        self.initialize(volumes)
        with self.executor_factory(max_workers=10) as w:
            futures = []
            for instance_set in chunks(sorted(
                    self.instance_map.keys(), reverse=True), size=100):
                futures.append(
                    w.submit(self.process_instance_set, instance_set))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception copying instance tags \n %s" % (
                            f.exception()))
Project: cloud-custodian    Author: capitalone
def create_elb_active_attributes_tuples(self, elb_policy_tuples):
        """
        Creates a list of tuples for all attributes that are marked
        as "true" in the load balancer's policies, e.g.
        (myelb,['Protocol-SSLv1','Protocol-SSLv2'])
        """
        active_policy_attribute_tuples = []
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for elb_policy_set in chunks(elb_policy_tuples, 50):
                futures.append(
                    w.submit(self.process_elb_policy_set, elb_policy_set))

            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception processing elb policies \n %s" % (
                            f.exception()))
                    continue
                for elb_policies in f.result():
                    active_policy_attribute_tuples.append(elb_policies)

        return active_policy_attribute_tuples
Project: cloud-custodian    Author: capitalone
def process(self, asgs):
        error = False
        key = self.data.get('key', DEFAULT_TAG)
        with self.executor_factory(max_workers=3) as w:
            futures = {}
            for asg_set in chunks(asgs, self.batch_size):
                futures[w.submit(self.process_asg_set, asg_set, key)] = asg_set
            for f in as_completed(futures):
                asg_set = futures[f]
                if f.exception():
                    error = f.exception()
                    self.log.exception(
                        "Exception untagging asg:%s tag:%s error:%s" % (
                            ", ".join([a['AutoScalingGroupName']
                                       for a in asg_set]),
                            self.data.get('key', DEFAULT_TAG),
                            f.exception()))
        if error:
            raise error
Project: cloud-custodian    Author: capitalone
def tag(self, asgs, key, value):
        error = None
        with self.executor_factory(max_workers=3) as w:
            futures = {}
            for asg_set in chunks(asgs, self.batch_size):
                futures[w.submit(
                    self.process_asg_set, asg_set, key, value)] = asg_set
            for f in as_completed(futures):
                asg_set = futures[f]
                if f.exception():
                    error = f.exception()
                    self.log.exception(
                        "Exception tagging tag:%s error:%s asg:%s" % (
                            self.data.get('key', DEFAULT_TAG),
                            f.exception(),
                            ", ".join([a['AutoScalingGroupName']
                                       for a in asg_set])))
        if error:
            raise error
Project: cloud-custodian    Author: capitalone
def process(self, resources, event=None):
        client = local_session(self.manager.session_factory).client('iam')
        with self.executor_factory(max_workers=2) as w:
            futures = []
            for user_set in chunks(
                    [r for r in resources if 'c7n:Groups' not in r], size=50):
                futures.append(
                    w.submit(self.get_user_groups, client, user_set))
            for f in as_completed(futures):
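                # nothing to collect per future: get_user_groups annotates
                # each user record in place with 'c7n:Groups'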
                pass

        matched = []
        for r in resources:
            for p in r['c7n:Groups']:
                if self.match(p) and r not in matched:
                    matched.append(r)
        return matched
Project: cloud-custodian    Author: capitalone
def process(self, clusters):
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for cluster in clusters:
                if not _cluster_eligible_for_snapshot(cluster):
                    continue
                futures.append(w.submit(
                    self.process_cluster_snapshot,
                    cluster))

            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception creating cache cluster snapshot \n %s",
                        f.exception())
        return clusters
Project: cloud-custodian    Author: capitalone
def _common_tag_processer(executor_factory, batch_size, concurrency,
                          process_resource_set, id_key, resources, tags,
                          log):

    with executor_factory(max_workers=concurrency) as w:
        futures = {}
        for resource_set in utils.chunks(resources, size=batch_size):
            # map each future to its chunk so errors report the right resources
            futures[w.submit(process_resource_set, resource_set, tags)] = resource_set

        for f in as_completed(futures):
            if f.exception():
                resource_set = futures[f]
                log.error(
                    "Exception with tags: %s on resources: %s \n %s" % (
                        tags,
                        ", ".join([r[id_key] for r in resource_set]),
                        f.exception()))
Project: cloud-custodian    Author: capitalone
def process(self, resources):
        count = len(resources)
        resources = self.filter_resources(resources)
        self.log.info(
            "Filtered from %s resources to %s" % (count, len(resources)))
        self.id_key = self.manager.get_model().id
        resource_set = self.create_set(resources)
        with self.executor_factory(max_workers=3) as w:
            futures = []
            for r in resource_set:
                futures.append(
                    w.submit(self.process_rename, r, resource_set[r]))
            for f in as_completed(futures):
                if f.exception():
                    self.log.error(
                        "Exception renaming tag set \n %s" % (
                            f.exception()))
        return resources
Project: cloud-custodian    Author: capitalone
def run(config, start, end, accounts):
    """run export across accounts and log groups specified in config."""
    config = validate.callback(config)
    destination = config.get('destination')
    start = start and parse(start) or start
    end = end and parse(end) or datetime.now()
    executor = debug and MainThreadExecutor or ThreadPoolExecutor
    with executor(max_workers=32) as w:
        futures = {}
        for account in config.get('accounts', ()):
            if accounts and account['name'] not in accounts:
                continue
            futures[
                w.submit(process_account, account, start, end, destination)] = account
        for f in as_completed(futures):
            account = futures[f]
            if f.exception():
                log.error("Error on account %s err: %s",
                          account['name'], f.exception())
            log.info("Completed %s", account['name'])
Project: mobileinsight-mobile    Author: mobile-insight
def _check_executor(self, dt):
        start = time()
        try:
            for future in as_completed(self._futures[:], 0):
                self._futures.remove(future)
                try:
                    result = future.result()
                except Exception:
                    traceback.print_exc()
                    # make an error tile?
                    continue
                if result is None:
                    continue
                callback, args = result
                callback(*args)

                # cap executor time per call to prevent excessive slowness;
                # seems to work quite well with big zoom-in/out
                if time() - start > self.cap_time:
                    break
        except TimeoutError:
            pass
Project: youtube-bb    Author: mbuckler
def sched_downloads(d_set,dl_dir,num_threads,vids):
  d_set_dir = dl_dir+'/'+d_set+'/'

  # Make the directory for this dataset
  check_call(' '.join(['mkdir', '-p', d_set_dir]), shell=True)

  # Tell the user when downloads were started
  print('Downloads started at ' + datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

  # Download and cut in parallel worker processes
  with futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
    fs = [executor.submit(dl_and_cut,vid) for vid in vids]
    for i, f in enumerate(futures.as_completed(fs), 1):
      # Write progress to stderr so that it can be seen
      sys.stderr.write( \
        "Downloaded video: {} / {} \r".format(i, len(vids)))

  print( d_set+': All videos downloaded' )
Project: slinkie    Author: segfaultsourcery
def parallelize(self, fn, number_of_threads=None):
        """
        Parallelize a function call. Number of threads defaults to your cpu count + 1.
        """

        number_of_threads = number_of_threads or (cpu_count() + 1)

        def _inner():
            with ThreadPoolExecutor(number_of_threads) as tpe:
                tasks = [tpe.submit(fn, item) for item in self._items]
                for future in as_completed(tasks):
                    try:
                        yield future.result()
                    except Exception as exception:
                        yield exception

        return Slinkie(_inner())
Project: azure-cli    Author: Azure
def upload_template_files(*args):

    print('\n== UPLOAD ARM TEMPLATES ==')

    parser = argparse.ArgumentParser(description='Upload ARM Templates')
    parser.add_argument('--name', metavar='NAME', required=True, help='Name of the thing being uploaded (in CamelCase)')
    parser.add_argument('--src', metavar='PATH', required=True, help='Path to the directory containing ARM templates to upload. Subdirectories will automatically be crawled.')
    parser.add_argument('--api-version', metavar='VERSION', required=True, help='API version for the templates being uploaded in yyyy-MM-dd format. (ex: 2016-07-01)')
    args = parser.parse_args(args)

    name = args.name
    api_version = args.api_version
    src = args.src

    _upload_templates(name, api_version, src)

    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor(max_workers=40) as executor:
        tasks = [executor.submit(lambda cmd: os.system(cmd), u) for u in uploads]
        for t in as_completed(tasks):
            t.result() # don't use the result but expose exceptions from the threads
Project: azure-cli    Author: Azure
def cli_build(args):
    assert check_output(['docker', 'ps']), "Docker required."
    build_types = args.build_types
    git_url = args.git_clone_url
    git_branch = args.git_clone_branch
    cli_version = args.cli_version
    artifact_dir = tempfile.mkdtemp(prefix='cli-build-{}-'.format(datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')), dir=os.getcwd())
    if len(build_types) == 1 and build_types[0] == '*':
        build_types = BUILD_TYPES
    print_heading('Building for {} from branch {} of {} '
                  'and version number will be {}\n'
                  'Build artifacts will be in {}'.format(', '.join(build_types), git_branch, git_url, cli_version, artifact_dir))
    from concurrent.futures import ThreadPoolExecutor, as_completed
    with ThreadPoolExecutor(max_workers=len(build_types)) as executor:
        tasks = {executor.submit(build_dispatch, bt, git_url, git_branch, cli_version, artifact_dir, arg_ns=args) for bt in build_types}
        for t in as_completed(tasks):
            t.result()
    print('Done.')
Project: falsy    Author: pingf
def run(self, data, max=4):
        results = []
        with futures.ThreadPoolExecutor(max_workers=max) as executor:
            future_to_url = {}
            for i, payload in enumerate(data):
                payload['chrome_id'] = i
                future_to_url[executor.submit(self.run1, payload)] = payload
                # future_to_url[executor.submit(self.run1_core, payload, browser, begin_time)] = payload
            for future in futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result()
                except Exception as exc:
                    print('%r generated an exception: %s' % (url, exc))
                else:
                    data['chrome_id'] = url['chrome_id']
                    results.append(data)

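        # as_completed() yields results in completion order; sort by chrome_id
        # to restore the original submission order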
        sorted_results = sorted(results, key=lambda tup: tup['chrome_id'])
        return sorted_results
Project: fmrif_tools    Author: nih-fmrif
def map_to_bids(self, bids_map, bids_dir, dicom_dir, biopac_dir, nthreads, overwrite):

        # Parse bids_map csv table, and create execution list for BIDS generation
        mapping = pd.read_csv(bids_map, header=0, index_col=None)
        mapping.replace(np.nan, '', regex=True, inplace=True)

        with ThreadPoolExecutor(max_workers=nthreads) as executor:

            futures = []

            for _, row in mapping.iterrows():
                futures.append(executor.submit(self._process_map_row, row, bids_dir, dicom_dir, self.conversion_tool,
                                               biopac_dir, overwrite))

            success = True

            for future in as_completed(futures):

                if not future.result():
                    success = False
                    break

            if not success:
                self.log.error("There were errors converting the provided datasets to BIDS format. See log for more" 
                               " information.")
Project: ultrachronic    Author: yoavram
def repeat(f, reps, cpus, **kwargs):
    if reps == 1:
        f(**kwargs)
        return
    fname = f.__name__
    print("Starting {} {} times with:".format(fname, reps))
    print(kwargs)
    if cpus == 1:
        for _ in range(reps):
            try:
                f(**kwargs)
            except Exception as e:
                warnings.warn(str(e))
    else:
        from multiprocessing import cpu_count
        from concurrent.futures import ProcessPoolExecutor, as_completed
        if cpus < 1:
            cpus = cpu_count()
        with ProcessPoolExecutor(cpus) as executor:
            futures = [executor.submit(f, **kwargs) for _ in range(reps)]
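        # exiting the with-block waits for all workers, so as_completed()
        # below only iterates futures that have already finished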
        for fut in as_completed(futures):
            if fut.exception():
                warnings.warn(str(fut.exception()))
    print("Finished")
Project: ouroboros    Author: pybee
def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)
Project: easypy    Author: weka-io
def concurrent_find(func, params, **kw):
    timeout = kw.pop("concurrent_timeout", None)
    with async(func, list(params), **kw) as futures:
        future = None
        try:
            for future in futures.as_completed(timeout=timeout):
                if not future.exception() and future.result():
                    futures.kill()
                    return future.result()
            else:
                if future:
                    return future.result()
        except FutureTimeoutError as exc:
            if not timeout:
                # ??
                raise
            futures.kill()
            _logger.warning("Concurrent future timed out (%s)", exc)
Project: kbe_server    Author: xiaohaoppy
def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)
Project: yQuant    Author: yoonbae81
def main(keys):
    t0 = time.time()

    executor = futures.ThreadPoolExecutor(max_workers=len(keys))

    to_do = []
    for key in keys:
        config = copy.deepcopy(krx.load_config(key))
        future = executor.submit(download, key, config)
        to_do.append(future)

    done = 0
    total_records = 0
    for future in futures.as_completed(to_do):
        done += 1
        key, records = future.result()
        total_records += records
        print(f'[{done:,d}/{len(keys):,d}] {key} fetched {records:>8,} records', file=sys.stderr)

    elapsed = time.time() - t0
    print(f'{total_records:,} Records fetched in {elapsed:.2f}s', file=sys.stderr)
Project: yQuant    Author: yoonbae81
def main(keys):
    t0 = time.time()

    ARGS = dict(zip(['command', 'date'], sys.argv))
    date = ARGS.get('date', f'{datetime.date.today():%Y%m%d}')

    executor = futures.ThreadPoolExecutor(max_workers=len(keys))

    to_do = []
    for key in keys:
        config = copy.deepcopy(krx.load_config(key))
        config['contents']['data']['schdate'] = date
        future = executor.submit(download, key, config)
        to_do.append(future)

    done = 0
    total_records = 0
    for future in futures.as_completed(to_do):
        done += 1
        key, records = future.result()
        total_records += records
        print(f'[{done:,d}/{len(keys):,d}] {key} fetched {records:>8,} records', file=sys.stderr)

    elapsed = time.time() - t0
    print(f'{total_records:,} Records fetched in {elapsed:.2f}s', file=sys.stderr)
Project: llama    Author: dropbox
def collect(self, count, dst_port=util.DEFAULT_DST_PORT,
                timeout=util.DEFAULT_TIMEOUT):
        """Collects latency against a set of hosts.

        Args:
            count: (int) number of datagrams to send each host
            timeout: (float) seconds to wait for probes to return
        """
        jobs = []
        with futures.ThreadPoolExecutor(max_workers=50) as executor:
            for host in self.metrics.keys():
                logging.info('Assigning target host: %s', host)
                jobs.append(executor.submit(self.method, host,
                                            count=count,
                                            port=dst_port,
                                            timeout=timeout,
                                           ))
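        # the with-block above has already waited for every probe;
        # as_completed() here just iterates the finished jobs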
        for job in futures.as_completed(jobs):
            loss, rtt, host = job.result()
            self.metrics[host].loss = loss
            self.metrics[host].rtt = rtt
            logging.info('Summary {:16}:{:>3}% loss, {:>4} ms rtt'.format(
                host, loss, rtt))