Python concurrent.futures 模块,ThreadPoolExecutor() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用concurrent.futures.ThreadPoolExecutor()

项目:bitrader    作者:jr-minnaar    | 项目源码 | 文件源码
def __init__(self, key, secret, options=None):
        """Store credentials, connection settings and create the worker pool.

        :param key: API key; kept together with ``secret`` as ``self.auth``.
        :param secret: API secret.
        :param options: optional dict of overrides. The keys read here are
            ``hostname``, ``port``, ``pair``, ``ca`` and ``timeout``; any
            missing key falls back to the default used below.
        """
        # Build a fresh dict per call. A ``{}`` default would be created once,
        # stored on every instance via ``self.options`` and shared by all.
        if options is None:
            options = {}
        self.options = options
        self.auth = (key, secret)
        self.hostname = options.get('hostname', 'api.mybitx.com')
        self.port = options.get('port', 443)
        self.pair = options.get('pair', 'XBTZAR')
        self.ca = options.get('ca', None)
        self.timeout = options.get('timeout', 30)
        self.headers = {
            'Accept': 'application/json',
            'Accept-Charset': 'utf-8',
            'User-Agent': 'py-bitx v' + __version__
        }
        self._executor = ThreadPoolExecutor(max_workers=5)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def run_in_executor(f):
    """
    Decorator that schedules the wrapped method on ``self.executor``.

    The wrapper always returns ``None``: nothing is submitted when
    ``self.is_shutdown`` is true, and a failure while submitting is logged
    rather than raised.
    """

    @wraps(f)
    def submit_to_executor(self, *args, **kwargs):
        if not self.is_shutdown:
            try:
                pending = self.executor.submit(f, self, *args, **kwargs)
                pending.add_done_callback(_future_completed)
            except Exception:
                log.exception("Failed to submit task to executor")

    return submit_to_executor
项目:sauna    作者:NicolasLM    | 项目源码 | 文件源码
def __init__(self, config=None):
        """Initialise state, the optional thread pool and load extensions.

        :param config: configuration mapping; an empty dict is used when
            omitted. A ``concurrency`` value above 1 enables a thread pool of
            that size, and ``extra_plugins`` lists directories whose modules
            are imported.
        """
        self.config = {} if config is None else config
        self.must_stop = threading.Event()
        self._consumers_queues = []

        # Only build a pool when more than one worker is requested.
        workers = self.config.get("concurrency", 1)
        self._thread_pool = (
            ThreadPoolExecutor(max_workers=workers) if workers > 1 else None
        )

        for suffix in ('.plugins.ext', '.consumers.ext'):
            self.import_submodules(__name__ + suffix)
        for extra_plugin_path in self.config.get('extra_plugins', []):
            self.import_directory_modules(extra_plugin_path)

        self._current_checks = []
        self._current_checks_lock = threading.Lock()
项目:girlfriend    作者:chihongze    | 项目源码 | 文件源码
def __init__(
            self, name, sub_jobs, pool=None,
            pool_type=ThreadPoolExecutor, join=None,
            error_action="stop", error_handler=None, error_default_value=None,
            goto=None):
        """
        Check that the error action is either "stop" or "continue".

        NOTE(review): the original parameter descriptions were destroyed by
        an encoding error (only '?' placeholders survive). The notes below
        are inferred from the parameter names and the surviving ASCII
        fragments; confirm them against the upstream project.

        :param name: presumably the name of this unit
        :param sub_jobs: presumably the jobs to run
        :param pool: presumably an existing pool to use; when None a pool is
                     presumably created from pool_type
        :param pool_type: pool class, ThreadPoolExecutor by default
        :param join: presumably a callable that merges results into the
                     context
        :param error_action: "stop" or "continue"
        :param error_handler: presumably used together with "continue"
        :param error_default_value: presumably the value substituted for a
                                    failed job under "continue"
        :param goto: presumably the next unit to jump to
        :raises InvalidArgumentException: when self._error_action is neither
                                          "stop" nor "continue"
        """
        # NOTE(review): self._error_action is read here but never assigned in
        # the visible body; the assignments appear to be missing from this
        # excerpt.
        if self._error_action != "stop" and self._error_action != "continue":
            raise InvalidArgumentException(u"?????????stop??continue??")
项目:girlfriend    作者:chihongze    | 项目源码 | 文件源码
def __init__(self, name, thread_num, pool=None,
                 pool_type=ThreadPoolExecutor,
                 start_point=None, end_point=None, context_factory=Context,
                 extends_listeners=False, listeners=None, goto=None):
        """
        Default the listener list to an empty list when none is set.

        NOTE(review): the original parameter descriptions were destroyed by
        an encoding error (only '?' placeholders survive). The notes below
        are inferred from the parameter names and the surviving ASCII
        fragments; confirm them against the upstream project.

        :param name: name of the Fork unit
        :param thread_num: presumably the number of threads to fork
        :param pool: presumably an existing pool to use
        :param pool_type: pool class, ThreadPoolExecutor by default;
                          presumably used when no pool is supplied
        :param start_point: presumably the first unit each fork executes
        :param end_point: presumably the last unit each fork executes
        :param context_factory: factory for contexts, Context by default
        :param extends_listeners: presumably whether parent listeners are
                                  inherited
        :param listeners: presumably the listeners attached to the forks
        :param goto: presumably the unit to continue with; the original text
                     mentions a join
        """
        # NOTE(review): self._listeners is read before any visible assignment;
        # the assignments appear to be missing from this excerpt.
        if self._listeners is None:
            self._listeners = []
项目:son-mano-framework    作者:sonata-nfv    | 项目源码 | 文件源码
def __init__(self, app_id, **kwargs):
        """
        Set up the broker settings, start the connection and the workers.

        :param app_id: string that identifies application
        :param kwargs: an optional ``url`` entry overrides the broker URL that
                       would otherwise come from the environment
        """
        self.app_id = app_id

        # Broker location: explicit keyword first, then the environment,
        # then the module-level fallback.
        self.rabbitmq_url = (
            kwargs['url'] if "url" in kwargs
            else os.environ.get("broker_host", RABBITMQ_URL_FALLBACK)
        )
        self.rabbitmq_exchange = os.environ.get(
            "broker_exchange", RABBITMQ_EXCHANGE_FALLBACK)
        self.rabbitmq_exchange_type = "topic"

        # The connection handle exists before setup is triggered.
        self._connection = None
        self.setup_connection()

        # Worker pool and the list used to keep track of submitted work.
        self.thrd_pool = pool.ThreadPoolExecutor(max_workers=100)
        self.tasks = []
项目:Learning-Concurrency-in-Python    作者:PacktPublishing    | 项目源码 | 文件源码
def main():
    """Time ``is_prime`` over ``PRIMES`` with processes, threads and serially.

    Each strategy prints one line per number followed by the seconds it took.
    """

    def timed_pool_run(pool_cls):
        # Run the whole batch through a 4-worker pool and return the seconds
        # elapsed once the pool has been shut down.
        began = timeit.default_timer()
        with pool_cls(max_workers=4) as executor:
            for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
                print('%d is prime: %s' % (number, prime))
        return timeit.default_timer() - began

    print("{} Seconds Needed for ProcessPoolExecutor".format(
        timed_pool_run(ProcessPoolExecutor)))
    print("{} Seconds Needed for ThreadPoolExecutor".format(
        timed_pool_run(ThreadPoolExecutor)))

    serial_start = timeit.default_timer()
    for number in PRIMES:
        print("{} is prime: {}".format(number, is_prime(number)))
    print("{} Seconds needed for single threaded execution".format(
        timeit.default_timer() - serial_start))
项目:deb-python-fasteners    作者:openstack    | 项目源码 | 文件源码
def test_double_reader_abort(self):
        """Readers that fail while holding nested read locks must not block writers.

        Twenty jobs are submitted, alternating failing readers and writers;
        all ten writers are expected to record 'w' as the lock owner.
        """
        lock = fasteners.ReaderWriterLock()
        owners_seen = collections.deque()

        def double_bad_reader():
            # Acquire the read lock twice, then fail while holding both.
            with lock.read_lock(), lock.read_lock():
                raise RuntimeError("Broken")

        def happy_writer():
            with lock.write_lock():
                owners_seen.append(lock.owner)

        with futures.ThreadPoolExecutor(max_workers=20) as pool:
            for index in range(20):
                # Even slots get the failing reader, odd slots the writer.
                job = happy_writer if index % 2 else double_bad_reader
                pool.submit(job)

        writer_hits = [owner for owner in owners_seen if owner == 'w']
        self.assertEqual(10, len(writer_hits))
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def download_many(cc_list):
    """Download the first five entries of ``cc_list`` with three workers.

    Every submission and every completed result is printed. Returns the
    number of results collected.
    """
    cc_list = cc_list[:5]
    with futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Schedule in sorted order, reporting each future as it is created.
        pending = []
        for cc in sorted(cc_list):
            task = executor.submit(download_one, cc)
            pending.append(task)
            print('Scheduled for {}: {}'.format(cc, task))

        # Collect in completion order, which may differ from the order above.
        collected = []
        for task in futures.as_completed(pending):
            outcome = task.result()
            print('{} result: {!r}'.format(task, outcome))
            collected.append(outcome)

    return len(collected)
# END FLAGS_THREADPOOL_AS_COMPLETED
项目:notebooks    作者:fluentpython    | 项目源码 | 文件源码
def save_month(year_month, verbose):
    """Save one item per day of the given month using a thread pool.

    :param year_month: string of the form ``YYYY-MM``
    :param verbose: forwarded unchanged to ``potd.save_one``
    :returns: ``(img_count, total_size)`` over the downloads that succeeded
    """
    year, month = map(int, year_month.split('-'))
    dates = potd.list_days_of_month(year, month)
    img_count = 0
    total_size = 0

    with futures.ThreadPoolExecutor(max_workers=100) as executor:
        date_by_future = {
            executor.submit(potd.save_one, date, verbose): date
            for date in dates
        }

        for finished in futures.as_completed(date_by_future):
            date = date_by_future[finished]
            error = finished.exception()
            if error is None:
                img_size = finished.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))
            else:
                print('%r generated an exception: %s' % (date, error))

    return img_count, total_size
项目:sbds    作者:steemit    | 项目源码 | 文件源码
def upload_blocks(bucket, chunk_size, max_threads, lines):
    """Upload every block in ``lines`` to S3 and report the throughput.

    :param bucket: name of the destination bucket
    :param chunk_size: accepted for interface compatibility; not used here
    :param max_threads: size of the upload thread pool
    :param lines: iterable of raw lines, each parsed by ``load_json_block``
        into ``(raw_block, key)``
    :returns: ``(completed, rate)`` where ``completed`` is the number of
        finished upload futures (including any that finished by raising) and
        ``rate`` is completed uploads per second, truncated to an int.
        ``(0, 0)`` is returned when nothing was uploaded.
    """
    session = botocore.session.get_session()
    client = session.create_client('s3')
    start = time.perf_counter()
    uploads = []
    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        for line in lines:
            raw_block, key = load_json_block(line)
            uploads.append(executor.submit(client.put_object,
                                           Bucket=bucket,
                                           Key=key,
                                           Body=raw_block,
                                           ContentEncoding='UTF-8',
                                           ContentType='application/json'))

    # The pool has already drained on leaving the ``with`` block; wait() is
    # used only to obtain the set of finished futures.
    done, _ = concurrent.futures.wait(uploads)
    elapsed = time.perf_counter() - start

    # No input (or a clock that did not advance) used to end in a
    # ZeroDivisionError; report a zero rate instead.
    if not done or elapsed <= 0:
        return len(done), 0
    rate = 1 / (elapsed / len(done))
    return len(done), int(rate)
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def __init__(self, log_client, shard_id, consumer_name, processor, cursor_position, cursor_start_time,
                 max_workers=2):
        """Record the shard/consumer identity and reset all runtime state.

        :param log_client: client handed to the checkpoint tracker
        :param shard_id: shard this consumer works on
        :param consumer_name: name of this consumer
        :param processor: processor object stored for later use
        :param cursor_position: stored as given
        :param cursor_start_time: stored as given
        :param max_workers: size of the consumer's thread pool
        """
        # Identity and collaborators.
        self.log_client = log_client
        self.shard_id = shard_id
        self.consumer_name = consumer_name
        self.processor = processor
        self.cursor_position = cursor_position
        self.cursor_start_time = cursor_start_time
        self.checkpoint_tracker = ConsumerCheckpointTracker(
            self.log_client, self.consumer_name, self.shard_id)
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

        # Lifecycle and task bookkeeping.
        self.consumer_status = ConsumerStatus.INITIALIZING
        self.shutdown = False
        self.current_task_exist = False
        self.task_future = None
        self.fetch_data_future = None

        # Fetch progress.
        self.next_fetch_cursor = ''
        self.last_fetch_log_group = None
        self.last_fetch_time = 0
        self.last_fetch_count = 0
        self.last_log_error_time = 0

        self.logger = logging.getLogger(__name__)
项目:coscup-line-bot    作者:ncuoolab    | 项目源码 | 文件源码
def __init__(self, bot_type, credentials, sheet_credentials, wit_tokens, db_url='redis://localhost:6379',
                 num_thread=4):
        """Wire up the bot: LINE API client, task pool, storage, controllers, queues and scheduler.

        :param bot_type: forwarded to ``api.LineApi``
        :param credentials: forwarded to ``api.LineApi``
        :param sheet_credentials: mapping read for ``credential_path`` and ``name``
        :param wit_tokens: forwarded to ``gen_nlp_message_controllers``
        :param db_url: URL used for the DAO, the sheet controller, the Redis
            connection pool and the info helper
        :param num_thread: number of workers in the task pool
        """
        self.bot_api = api.LineApi(bot_type, credentials)
        self.logger = logging.getLogger('CoscupBot')
        self.task_pool = ThreadPoolExecutor(num_thread)
        self.db_url = db_url
        self.dao = db.Dao(db_url)
        # Stored next-commands, contexts and sessions are deleted on every
        # construction.
        self.dao.del_all_next_command()
        self.dao.del_all_context()
        self.dao.del_all_session()
        self.nlp_message_controllers = self.gen_nlp_message_controllers(wit_tokens)
        self.command_message_controllers = self.gen_command_message_controllers(
            [LanguageCode.zh_tw, LanguageCode.en_us])
        self.sheet_message_controller = modules.SheetMessageController(db_url, sheet_credentials['credential_path'],
                                                                       sheet_credentials['name'], self)
        # Both queues share one Redis connection pool.
        self.__mq_conn_pool = redis.ConnectionPool.from_url(url=db_url)
        self.edison_queue = utils.RedisQueue('edison', 'queue', connection_pool=self.__mq_conn_pool)
        self.realtime_msg_queue = utils.RedisQueue('realmessage', 'queue', connection_pool=self.__mq_conn_pool)
        self.job_scheduler = BackgroundScheduler()
        self.coscup_api_helper = modules.CoscupInfoHelper(db_url)
        # NOTE(review): the scheduler is started before next_step_dic and
        # take_photo_sec are assigned below - confirm no scheduled job reads
        # them immediately.
        self.start_scheduler()
        self.next_step_dic = {}
        self.take_photo_sec = 6
项目:Chalutier    作者:LaBaleineFr    | 项目源码 | 文件源码
def optimiz(currencies, debug):
    """Fetch data for each currency and compute portfolio weights.

    :param currencies: iterable of 2 to 10 currency identifiers
    :param debug: forwarded to ``markowitz_optimization``; when true a plot
        is also written to ``chalu.png``
    :returns: ``{"result": {currency: weight}}`` on success, otherwise
        ``{"error": message}`` when the number of currencies is out of range
        or some currency data carries an ``error`` entry
    """
    currencies = sorted(currencies)
    if not 2 <= len(currencies) <= 10:
        return {"error": "2 to 10 currencies"}

    # Compare the full version tuple: looking only at the minor number
    # misclassifies any interpreter whose major version is not 3.
    max_workers = 4 if sys.version_info < (3, 5) else None

    # The context manager shuts the pool down even when a fetch raises; the
    # pool used to be created and then abandoned on every call.
    with ThreadPoolExecutor(max_workers) as executor:
        pending = [executor.submit(get_ochl, cur) for cur in currencies]
        data = dict(future.result() for future in wait(pending).done)

    data = [data[cur] for cur in currencies]
    errors = [x['error'] for x in data if 'error' in x]
    if errors:
        return {"error": "Currencies not found : " + str(errors)}

    weights, m, s, a, b = markowitz_optimization(data, debug)
    if debug:
        # Select the non-interactive backend before pyplot is imported.
        import matplotlib as mpl
        mpl.use('Agg')
        import matplotlib.pyplot as plt
        fig, ax = plt.subplots()
        plt.plot(s, m, 'o', markersize=1)
        plt.plot(b, a, 'or')
        fig.savefig("chalu.png")

    result = {cur: weights[i] for i, cur in enumerate(currencies)}
    return {"result": result}
项目:esi-routes    作者:ccpgames    | 项目源码 | 文件源码
def main():
    """Generate the jumpmap.json with ESI."""

    try:
        all_systems = retry_get(ESI.Universe.get_universe_systems)
    except NameError:
        raise SystemExit(1)

    total = len(all_systems)
    systems = {}
    with ThreadPoolExecutor(max_workers=100) as executor:
        # map() yields (system, result) pairs in input order.
        fetched = executor.map(system_get, all_systems)
        for finished, (system, result) in enumerate(fetched, start=1):
            systems[system] = result
            print("{}/{} systems complete".format(finished, total))

    payload = json.dumps(systems)
    with open("jumpmap.json", "w") as openjumpmap:
        openjumpmap.write(payload)
项目:gRPC-Makerboards    作者:PeridotYouClod    | 项目源码 | 文件源码
def serve():
    """Start the Arduino gRPC server and block until interrupted."""
    config = ProtoConfig.getConfig()
    board = config.arduinos[0]

    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    sensors_pb2.add_ArduinoServicer_to_server(Arduino(board), grpc_server)
    port = config.ports.arduinoPort
    grpc_server.add_insecure_port('[::]:%s' % port)
    grpc_server.start()
    print('Started Arduino Server on Port %s ' % port)

    # start() returns immediately, so keep the main thread alive until
    # Ctrl-C arrives.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)
项目:gRPC-Makerboards    作者:PeridotYouClod    | 项目源码 | 文件源码
def serve():
    """Start the Dao gRPC server and block until interrupted."""
    config = ProtoConfig.getConfig()
    sensor_db = Mongo()
    sensor_db.GetClient()  # initialise the database client up front

    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    dao_pb2.add_DaoServicer_to_server(Dao(sensor_db), grpc_server)
    port = config.ports.daoPort
    grpc_server.add_insecure_port('[::]:%s' % port)
    grpc_server.start()
    print('Started Dao Server on Port %s ' % port)

    # start() returns immediately, so keep the main thread alive until
    # Ctrl-C arrives.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)
项目:gRPC-Makerboards    作者:PeridotYouClod    | 项目源码 | 文件源码
def serve():
    """Start the Push gRPC server, then run the Wio event websocket client."""
    config = ProtoConfig.getConfig()
    grpc_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    push_service = Push(accessToken=config.wioLinks['havok'].accessToken)
    sensors_pb2.add_PushServicer_to_server(push_service, grpc_server)
    port = config.ports.pushPort
    grpc_server.add_insecure_port('[::]:%s' % port)
    grpc_server.start()
    print('Started Push Server on Port %s ' % port)

    # The websocket callbacks are all handled by the same Push instance.
    websocket.enableTrace(True)
    socket_app = websocket.WebSocketApp(
        "wss://us.wio.seeed.io/v1/node/event",
        on_message=push_service.on_message,
        on_error=push_service.on_error,
        on_close=push_service.on_close,
    )
    socket_app.on_open = push_service.on_open
    socket_app.run_forever()

    # Reached only after run_forever() returns.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpc_server.stop(0)
项目:SmallReptileTraining    作者:yanbober    | 项目源码 | 文件源码
def runner(self):
        """Fetch every URL in ``self.urls`` concurrently and print each outcome.

        Each URL is passed to ``self.get_web_content`` on a two-worker pool.
        A line is printed per URL as it completes (size of the returned data
        on success, the error text on failure), followed by ``Finished!``.
        """
        url_by_future = dict()
        # The context manager shuts the pool down once reporting is done (or
        # fails); the pool used to be created and never released.
        with ThreadPoolExecutor(max_workers=2, thread_name_prefix='DEMO') as thread_pool:
            for url in self.urls:
                future = thread_pool.submit(self.get_web_content, url)
                url_by_future[future] = url

            for future in concurrent.futures.as_completed(url_by_future):
                url = url_by_future[future]
                try:
                    data = future.result()
                except Exception as e:
                    print('Run thread url ('+url+') error. '+str(e))
                else:
                    print(url+'Request data ok. size='+str(len(data)))
        print('Finished!')
项目:mobike    作者:wangshulingxiao    | 项目源码 | 文件源码
def getloc():
    """Request pages 1-45 concurrently, collect every ``location`` value and pickle the list to ``allloc1.pk``."""
    allloc = []
    # NOTE(review): the string below is a plain expression statement, not the
    # function docstring (it follows an assignment), and its text was lost to
    # an encoding error.
    u"""??????api???????????
    http://lbs.amap.com/api/webservice/guide/api/search/#text
    """
    with ThreadPoolExecutor(max_workers=5) as executor:
        # NOTE(review): this URL looks like the documentation page rather
        # than a service endpoint - confirm.
        url = 'http://lbs.amap.com/api/webservice/guide/api/search/#text'
        # NOTE(review): the API key is hard-coded in source, and the
        # 'keywords' value is encoding-garbled.
        param = {
            'key': '22d6f93f929728c10ed86258653ae14a',
            'keywords': u'??',
            'city': '027',
            'citylimit': 'true',
            'output': 'json',
            'page': '',
        }
        # One request per page number; every future maps to the same url.
        future_to_url = {executor.submit(load_url, url, merge_dicts(param, {'page': i}), 60): url for i in range(1, 46)}
        for future in futures.as_completed(future_to_url):
            # Failed requests are printed and skipped.
            if future.exception() is not None:
                print(future.exception())
            elif future.done():
                data = future.result()['pois']
                allloc.extend([x['location'] for x in data])
        # The third positional argument of pickle.dump is the protocol.
        with open('allloc1.pk', 'wb') as f:
            pickle.dump(allloc, f, True)
def mobai(loc):
    """Query nearby bikes for every "longitude,latitude" string in ``loc``.

    Each successful response's ``object`` list is appended to a local list
    and inserted into ``collection``; failed requests are printed.
    """
    allmobai = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        url = 'https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do'
        headers = {
            'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_3_1 like Mac OS X) AppleWebKit/603.1.30 (KHTML, like Gecko) Mobile/14E304 MicroMessenger/6.5.7 NetType/WIFI Language/zh_CN',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Referer': 'https://servicewechat.com/wx80f809371ae33eda/23/page-frame.html',
        }
        data = {
            'longitude': '',
            'latitude': '',
            'citycode': '027',
        }

        # One request per coordinate pair; every future maps to the same url.
        future_to_url = {}
        for i in loc:
            coords = i.split(',')
            payload = merge_dicts(data, {'longitude': coords[0]}, {'latitude': coords[1]})
            future_to_url[executor.submit(load_url, url, payload, 60, headers)] = url

        for future in futures.as_completed(future_to_url):
            error = future.exception()
            if error is not None:
                print(error)
            elif future.done():
                bikes = future.result()['object']
                allmobai.extend(bikes)
                # Persist this batch (the original comment mentions mongodb).
                collection.insert_many(bikes)
项目:loman    作者:janusassetallocation    | 项目源码 | 文件源码
def test_node_specific_thread_pool_executor():
    """Nodes bound to a named executor must run in parallel.

    Ten nodes that each sleep 0.2 s are assigned to executor 'foo'; the
    whole computation has to finish in less than nine sleeps.
    """
    sleep_time = 0.2
    n = 10

    def wait(c):
        sleep(sleep_time)
        return c

    comp = Computation(executor_map={'foo': ThreadPoolExecutor(n)})
    started = datetime.utcnow()
    for c in range(n):
        comp.add_node(c, wait, kwds={'c': C(c)}, executor='foo')
    comp.compute_all()
    finished = datetime.utcnow()

    elapsed = (finished - started).total_seconds()
    assert elapsed < (n-1) * sleep_time
项目:Requester    作者:kylebebak    | 项目源码 | 文件源码
def run(self):
        """Concurrently invoke `get_response` for all of instance's `requests`.

        Each completed result is appended to `self.responses` and its request
        is removed from `self.pending_requests`, unless the request carries an
        error or the result's `err` is 'skwarg', in which case it is skipped.
        `self.is_done` is set to True once everything has been processed.
        """
        # With no requests the pool size below would be 0, which
        # ThreadPoolExecutor rejects with ValueError. There is nothing to do,
        # so finish immediately.
        if not self.requests:
            self.is_done = True
            return

        with futures.ThreadPoolExecutor(
            max_workers=min(self.max_workers, len(self.requests))
        ) as executor:
            to_do = [
                executor.submit(self.get_response, request, i)
                for i, request in enumerate(self.requests)
            ]

            for future in futures.as_completed(to_do):
                result = future.result()
                # `responses` and `pending_requests` are instance properties, which means
                # client code can inspect instance to read responses as they are completed
                if result.req.error is not None or result.err == 'skwarg':
                    continue
                try:
                    self.pending_requests.remove(result.req)
                except KeyError:
                    print('{} was not in pending requests, this is weird...'.format(result.req))
                self.responses.append(result)
        self.is_done = True
项目:pynetsim    作者:jjo-sec    | 项目源码 | 文件源码
def main():
    """Start the TCP and UDP listeners on a two-worker pool, wait, then shut them down."""
    config = core.get_config()
    # The log level name comes from config["main"]["log_level"] ("debug" when
    # absent) and is resolved to the matching constant on the logging module.
    core.init_logging("pynetsim", log_level=getattr(logging, config.get("main").get("log_level", "debug").upper()))
    log.debug("Starting socket listeners")
    listener_pool = ThreadPoolExecutor(max_workers=2)
    futures = []
    tcp_listener = TCPSocketListener(config)
    udp_listener = UDPSocketListener(config)
    futures.append(listener_pool.submit(tcp_listener.start))
    futures.append(listener_pool.submit(udp_listener.start))
    # Blocks until core.wait() returns.
    core.wait()

    log.debug("Stopping socket listeners")
    tcp_listener.shutdown()
    udp_listener.shutdown()
    # NOTE(review): per the concurrent.futures documentation, cancel() cannot
    # stop a call that is already running, so this loop probably has no
    # effect - confirm.
    for future in futures:
        if future.running():
            future.cancel()
    # NOTE(review): both listeners are shut down a second time here, and
    # listener_pool itself is never shut down - confirm both are intended.
    tcp_listener.shutdown()
    udp_listener.shutdown()
    log.debug("Exiting...")
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header, then process every model from DBT-120 onwards.

    The module-level ``executor`` is created here and shut down (waiting for
    outstanding work) once every model has been handed to ``selectModel``.
    """
    global executor

    os.makedirs(localstor, exist_ok=True)
    header = ['model', 'rev', 'fw_ver', 'fw_url', 'date', 'fsize', 'sha1', 'md5']
    with open('tsd_dlink_filelist.csv', 'w') as fout:
        csv.writer(fout).writerow(header)
    executor = futures.ThreadPoolExecutor()

    models = parse_models()
    # Resume point: index of the first entry whose prefix/suffix is DBT/120.
    first = next(idx for idx, parts in enumerate(models)
                 if parts[0] == 'DBT' and parts[1] == '120')
    for model in models[first:]:
        selectModel(model[0], model[1])

    print('wait for Executor shutdown')
    executor.shutdown(True)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header and call ``main1`` until it reports completion.

    The starting category/family/product indices come from the first three
    command-line arguments (0 when absent). ``main1`` returns the indices to
    continue from; a ``None`` category index ends the run. Failures are
    deliberately best-effort: any exception, including KeyboardInterrupt, is
    printed and swallowed, and the pool is always shut down.
    """
    # Create the pool before entering ``try``. When it was created inside,
    # a failure there left ``executor`` unbound and the ``finally`` clause
    # raised UnboundLocalError on top of the real error.
    executor = ThreadPoolExecutor()
    try:
        os.makedirs(dlDir, exist_ok=True)
        with open('netgear_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fw_ver', 'fileName', 'fw_url', 'fw_date', 'fileSize', 'sha1', 'md5'])
        catIdx = int(sys.argv[1]) if len(sys.argv) > 1 else 0
        famIdx = int(sys.argv[2]) if len(sys.argv) > 2 else 0
        prdIdx = int(sys.argv[3]) if len(sys.argv) > 3 else 0
        while True:
            catIdx, famIdx, prdIdx = main1(catIdx, famIdx, prdIdx, executor)
            if catIdx is None:
                return
            assert famIdx is not None
            assert prdIdx is not None
            print("\n[main] Continue from cat,fam,prd=(%d,%d,%d)\n" %
                  (catIdx, famIdx, prdIdx))
    except BaseException:
        traceback.print_exc()
    finally:
        executor.shutdown(True)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header and walk the download listing of every Zyxel model.

    Models come from ``get_all_models`` on the landing page and are visited
    in sorted name order via ``walkFiles``. Failures are deliberately
    best-effort: any exception, including KeyboardInterrupt, is printed and
    swallowed, and the module-level ``executor`` is always shut down.
    """
    global executor
    # Bind the pool before ``try``. When it was created inside, any earlier
    # failure made the ``finally`` clause touch a missing ``executor`` and
    # bury the real error under a NameError.
    executor = ThreadPoolExecutor()
    try:
        session = requests.Session()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://www.zyxel.com/us/en/support/download_landing.shtml'
        with open('zyxel_us_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
        resp = session.get(url=url)
        root = html.fromstring(resp.text)
        models = get_all_models(root)

        for modelName in sorted(models):
            kbid = models[modelName]
            resp2 = session.get(url='http://www.zyxel.com/us/en/support/DownloadLandingSR.shtml',
                                params=dict(c="us", l="en", kbid=kbid, md=modelName))
            walkFiles(modelName, session, resp2)
    except BaseException:
        traceback.print_exc()
    finally:
        print('Wait for exeuctor shuddown')
        executor.shutdown(True)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header, open the advanced search form and walk the categories.

    The landing page's advanced-search link is turned into a form post and
    the response is handed to ``walkCategories``. Failures are deliberately
    best-effort: any exception, including KeyboardInterrupt, is printed and
    swallowed, and the module-level ``executor`` is always shut down.
    """
    global executor
    # Bind the pool before ``try``. When it was created inside, any earlier
    # failure made the ``finally`` clause touch a missing ``executor`` and
    # bury the real error under a NameError.
    executor = ThreadPoolExecutor()
    try:
        session = requests.Session()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://downloadcenter.netgear.com'
        with open('netgear_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fw_ver', 'fileName', 'fw_url', 'fw_date', 'fileSize', 'sha1', 'md5'])
        response = session.get(url=url)
        root = html.fromstring(response.text)
        href = root.xpath(".//a[@id='ctl00_ctl00_ctl00_mainContent_localizedContent_bodyCenter_BasicSearchPanel_btnAdvancedSearch']/@href")
        href = strip_js(href[0])
        formdata = {"__EVENTTARGET": href}
        resp2 = form_submit(session, root, url,
                            "aspnetForm",
                            formdata,
                            {"Referer": url})
        walkCategories(session, resp2)
    except BaseException:
        traceback.print_exc()
    finally:
        executor.shutdown(True)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header and walk the three Tenda download catalogue pages.

    Failures are deliberately best-effort: any exception, including
    KeyboardInterrupt, is printed and swallowed, and the module-level
    ``executor`` is always shut down.
    """
    global executor
    # Bind the pool before ``try``. When it was created inside, any earlier
    # failure made the ``finally`` clause touch a missing ``executor`` and
    # bury the real error under a NameError.
    executor = ThreadPoolExecutor()
    try:
        # NOTE(review): this session is not passed to walkFiles below -
        # confirm whether it is still needed.
        session = requests.Session()
        os.makedirs(dlDir, exist_ok=True)
        with open('tenda_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
        walkFiles('http://www.tendacn.com/en/service/download-cata-11.html')
        walkFiles('http://tendacn.com/en/service/download-cata-11-2.html')
        walkFiles('http://www.tendacn.com/en/service/download-cata-11-3.html')
    except BaseException:
        traceback.print_exc()
    finally:
        print('Wait for exeuctor shuddown')
        executor.shutdown(True)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header and crawl the product page of every listed model.

    Model names are read from the first table column of the all-products
    page. The module-level ``executor`` is created first and shut down
    (waiting for outstanding work) after the last product.
    """
    global executor
    executor = ThreadPoolExecutor()

    os.makedirs(localstor, exist_ok=True)

    header = ['model', 'rev', 'fw_ver', 'fw_url', 'fsize', 'fdate', 'sha1', 'md5']
    with open('us_dlink_filelist.csv', 'w') as fout:
        csv.writer(fout).writerow(header)

    listing = pq(url="http://support.dlink.com/AllPro.aspx?type=all")
    # all 442 models
    models = [cell.text_content().strip()
              for cell in listing('tr > td:nth-child(1) > .aRedirect')]

    for model in models:
        crawl_prod("http://support.dlink.com/ProductInfo.aspx?m=%s" % parse.quote(model),
                   model)
    executor.shutdown(True)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header and walk every product listed on the Netgear CN page.

    Product URLs come from the ``select`` drop-down on the landing page; the
    first option is skipped. Failures are deliberately best-effort: any
    exception, including KeyboardInterrupt, is printed and swallowed, and the
    module-level ``executor`` is always shut down.
    """
    global executor
    # Bind the pool before ``try``. When it was created inside, any earlier
    # failure made the ``finally`` clause touch a missing ``executor`` and
    # bury the real error under a NameError.
    executor = ThreadPoolExecutor()
    try:
        session = requests.Session()
        os.makedirs(dlDir, exist_ok=True)
        url = 'http://support.netgear.cn/'
        with open('netgear_cn_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
        resp = session.get(url=url)
        root = html.fromstring(resp.text)
        startProd = 1
        prods = root.xpath(".//select[@name='select']/option")
        for prod in prods[startProd:]:
            prodUrl = prod.xpath("./@value")[0].strip()
            walkProd(session, urljoin(resp.url, prodUrl))
    except BaseException:
        traceback.print_exc()
    finally:
        print('Wait for exeuctor shuddown')
        executor.shutdown(True)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header, then process every model returned by ``parse_models``.

    The module-level ``executor`` is created here and shut down (waiting for
    outstanding work) once every model has been handed to ``selectModel``.
    """
    global executor

    os.makedirs(localstor, exist_ok=True)
    header = ['model', 'rev', 'fw_ver', 'fw_url', 'date', 'fsize', 'sha1', 'md5']
    with open('tsd_dlink_filelist.csv', 'w') as fout:
        csv.writer(fout).writerow(header)
    executor = futures.ThreadPoolExecutor(None)

    models = parse_models()
    # Start from the first model; raise this index to resume a partial run.
    first = 0
    for model in models[first:]:
        selectModel(model[0], model[1])

    print('wait for Executor shutdown')
    executor.shutdown(True)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header and run ``walkSelects``.

    Failures are deliberately best-effort: any exception, including
    KeyboardInterrupt, is printed and swallowed, and the module-level
    ``executor`` is always shut down.
    """
    global executor
    # Bind the pool before ``try``. When it was created inside, any earlier
    # failure made the ``finally`` clause touch a missing ``executor`` and
    # bury the real error under a NameError.
    executor = ThreadPoolExecutor()
    try:
        sess = requests.Session()
        os.makedirs(dlDir, exist_ok=True)
        with open('tenda_us_filelist.csv', 'w') as fout:
            cw = csv.writer(fout)
            cw.writerow(['model', 'fver', 'fname', 'furl', 'fdate', 'fsize', 'sha1', 'md5'])
        walkSelects()
        # walkModels(sess, 'http://www.tendaus.com/Default.aspx?Module=WebsiteEN&Action=DownloadCenter')
        # for Id in range(1, 200):
        #     walkTables(sess, "http://www.tendaus.com/Default.aspx?Module=WebsiteEN&Action=DownloadCenter&Id=%(Id)s"%locals())
    except BaseException:
        traceback.print_exc()
    finally:
        print('Wait for exeuctor shutdown')
        executor.shutdown(True)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def main():
    """Write the CSV header and crawl the product page of every listed model.

    Model names are read from the first table column of the all-products
    page. The module-level ``executor`` is created here and shut down
    (waiting for outstanding work) after the last product.
    """
    global executor

    header = ['model', 'rev', 'fw_ver', 'fw_url', 'date', 'fsize', 'sha1', 'md5']
    with open('ca_dlink_filelist.csv', 'w') as fout:
        csv.writer(fout).writerow(header)
    executor = futures.ThreadPoolExecutor()

    listing = pq(url='http://support.dlink.ca/AllPro.aspx?type=all')
    # all 442 models
    models = [cell.text_content().strip()
              for cell in listing('tr > td:nth-child(1) > .aRedirect')]

    for model in models:
        crawl_prod('http://support.dlink.ca/ProductInfo.aspx?m=%s' % parse.quote(model),
                   model)
    print('wait for Executor shutdown')
    executor.shutdown(True)
项目:FCR    作者:facebookincubator    | 项目源码 | 文件源码
def __init__(self, app_name, args=None, loop=None):
        """Parse options and prepare the event loop, executor, logging and signals.

        :param app_name: application name, also used as the logger name
        :param args: forwarded to ``Option.parse_args``
        :param loop: event loop to use; the current asyncio loop when omitted
        """
        self._app_name = app_name
        self._shutting_down = False
        self._stats_mgr = None

        Option.parse_args(args)

        self._loop = loop or asyncio.get_event_loop()
        self._loop.set_debug(self.ASYNCIO_DEBUG)

        # Replace the loop's default executor with a pool of bounded size.
        self._loop.set_default_executor(
            ThreadPoolExecutor(max_workers=self.MAX_DEFAULT_EXECUTOR_THREADS))

        self._init_logging()

        # SIGINT and SIGTERM both trigger shutdown.
        for signum in (signal.SIGINT, signal.SIGTERM):
            self._loop.add_signal_handler(signum, self.shutdown)

        self.logger = logging.getLogger(self._app_name)
项目:sqlalchemy_aio    作者:RazerM    | 项目源码 | 文件源码
def __init__(self, pool, dialect, url, logging_name=None, echo=None,
                 execution_options=None, loop=None, **kwargs):
        """Wrap an ``Engine`` and create the executor its calls will run on.

        All arguments except ``loop`` are forwarded to ``Engine``; ``loop`` is
        stored as given.
        """
        self._engine = Engine(
            pool, dialect, url,
            logging_name=logging_name,
            echo=echo,
            execution_options=execution_options,
            **kwargs
        )
        self._loop = loop

        # https://www.python.org/dev/peps/pep-0249/#threadsafety
        # Below threadsafety level 2 a single worker is used. This might seem
        # overly-restrictive, but when an AsyncioResultProxy is instantiated
        # from AsyncioEngine.execute, subsequent fetchone calls could be in
        # different threads.
        single_threaded = dialect.dbapi.threadsafety < 2
        self._engine_executor = ThreadPoolExecutor(
            max_workers=1 if single_threaded else None)
项目:multitaskLauncher    作者:1a1a11a    | 项目源码 | 文件源码
def get_node_health_mt(nodes_dict, check_type="normal", n_threads=8, print_out=False):
    """Check the health of every node concurrently and store the results.

    Each entry of ``nodes_dict`` is replaced in place by
    ``(first element of the old value, check result)``. Nothing is returned.

    Arguments:
        nodes_dict {dict} -- [nodesIP(domainName)->(username, mem, CPU)]

    Keyword Arguments:
        check_type {str} -- forwarded to ``check_node_health`` (default: {"normal"})
        n_threads {number} -- size of the thread pool (default: {8})
        print_out {bool} -- forwarded to ``check_node_health`` (default: {False})
    """

    with ThreadPoolExecutor(max_workers=n_threads) as executor:
        node_by_task = {}
        for node, nodeinfo in nodes_dict.items():
            task = executor.submit(check_node_health, nodeinfo[0], node, check_type, print_out)
            node_by_task[task] = node

        for task in as_completed(node_by_task):
            node = node_by_task[task]
            previous = nodes_dict[node]
            outcome = task.result()
            nodes_dict[node] = (previous[0], outcome)
项目:loopchain    作者:theloopkr    | 项目源码 | 文件源码
def __init__(self, gRPC_module, inner_service_port=None):
        """Create the inner/outer gRPC servers and initialise service state.

        ``inner_service_port`` is ``None`` for the RS service; for a peer's
        CommonService it is the inner service port, from which the peer
        target address is derived.
        """
        if ObjectManager().peer_service is None:
            self.__peer_id = None
        else:
            self.__peer_id = ObjectManager().peer_service.peer_id

        # inner_server: peer_inner_service for peer_service,
        # rs_admin_service for rs_service.
        inner_pool = futures.ThreadPoolExecutor(max_workers=conf.MAX_WORKERS)
        self.inner_server = grpc.server(inner_pool)
        outer_pool = futures.ThreadPoolExecutor(max_workers=conf.MAX_WORKERS)
        self.outer_server = grpc.server(outer_pool)

        # Private members, declared up front to help code intelligence.
        self.__gRPC_module = gRPC_module
        self.__port = 0
        self.__inner_service_port = inner_service_port

        peer_target = None
        if inner_service_port is not None:
            # A port was given, so this is a Peer's CommonService, not RS.
            peer_port = inner_service_port - conf.PORT_DIFF_INNER_SERVICE
            peer_target = util.get_private_ip() + ":" + str(peer_port)
        self.__peer_target = peer_target

        # Queue of (channel, stub) tuples.
        self.__subscriptions = queue.Queue()
        self.__group_id = ""

        # Broadcast process.
        self.__broadcast_process = self.__run_broadcast_process()

        self.__loop_functions = []
项目:swarmci    作者:ghostsquad    | 项目源码 | 文件源码
def describe_threaded_runner():
    """Spec-style test group for ``ThreadedRunner``.

    The nested ``describe_``/``given_``/``when_``/``expect_`` functions are
    only defined here, never called or returned.
    NOTE(review): presumably they are collected by a describe-style pytest
    plugin -- confirm against the project's test configuration.
    """

    # One runner, backed by a two-worker thread pool, shared for the module.
    @pytest.fixture(scope='module')
    def runner_fixture():
        return ThreadedRunner(thread_pool_executor=ThreadPoolExecutor(max_workers=2))

    def describe_run_all_threaded_behavior():
        def given_many_tasks():
            def when_first_task_fails():
                def expect_later_tasks_still_run(runner_fixture):
                    # Two task mocks; the first one is marked as unsuccessful.
                    task1_mock, task2_mock = create_task_mock(count=2)
                    task1_mock.successful = False

                    # run_all is expected to raise TaskFailedError ...
                    with pytest.raises(TaskFailedError):
                        runner_fixture.run_all([task1_mock, task2_mock])

                    # ... yet both tasks must have been executed exactly once.
                    task1_mock.execute.assert_called_once()
                    task2_mock.execute.assert_called_once()
项目:swarmci    作者:ghostsquad    | 项目源码 | 文件源码
def build_tasks_hierarchy(swarmci_config, task_factory):
    """Turn the ``stages`` section of a .swarmci config into a BUILD task.

    The ``stages`` key is popped from ``swarmci_config``. Each stage's jobs
    and each job's commands are converted to tasks through ``task_factory``,
    and all stage tasks share one 25-worker thread pool.

    Raises:
        SwarmCIError: if ``stages`` is missing or is not a list.
    """
    stages = swarmci_config.pop('stages', None)
    if stages is None:
        raise SwarmCIError('Did not find "stages" key in the .swarmci file.')
    if type(stages) is not list:
        raise SwarmCIError('The value of the "stages" key should be a list in the .swarmci file.')

    shared_pool = ThreadPoolExecutor(max_workers=25)

    def _job_task(job):
        # Command tasks are created before the job task that owns them.
        command_tasks = [task_factory.create(TaskType.COMMAND, cmd=cmd)
                         for cmd in job['commands']]
        return task_factory.create(TaskType.JOB, job=job, commands=command_tasks)

    def _stage_task(stage):
        job_tasks = [_job_task(job) for job in stage['jobs']]
        return task_factory.create(
            TaskType.STAGE, stage=stage, jobs=job_tasks, thread_pool_executor=shared_pool)

    stage_tasks = [_stage_task(stage) for stage in stages]
    return task_factory.create(TaskType.BUILD, stages=stage_tasks)
项目:almond-nnparser    作者:Stanford-Mobisocial-IoT-Lab    | 项目源码 | 文件源码
def run():
    """Configure and start the server, then block in the Tornado IO loop.

    Steps, in order: seed NumPy's RNG, load ``./server.conf``, create the
    query thread pool and the ``Application``, start listening (with TLS
    when ``config.ssl_key`` is set), switch to ``config.user`` if one is
    configured, notify ``sd`` of readiness when it is available, start the
    tokenizer service, load every configured language and finally run the
    IO loop.
    """
    np.random.seed(42)
    config = ServerConfig.load(('./server.conf',))

    # ``thread_name_prefix`` was added to ThreadPoolExecutor in Python 3.6.
    # Compare (major, minor): index 2 of ``sys.version_info`` is the *micro*
    # version, so testing it would pick the wrong branch (e.g. TypeError on
    # 3.5.6, and the fallback on 3.6.0).
    if sys.version_info >= (3, 6):
        thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-')
    else:
        thread_pool = ThreadPoolExecutor(max_workers=32)
    app = Application(config, thread_pool)

    if config.ssl_key:
        ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key)
        app.listen(config.port, ssl_options=ssl_ctx)
    else:
        app.listen(config.port)

    if config.user:
        # Index 2 is the numeric id in both records (gr_gid / pw_uid).
        # The group is switched first, then the user.
        os.setgid(grp.getgrnam(config.user)[2])
        os.setuid(pwd.getpwnam(config.user)[2])

    # NOTE(review): ``sd`` is a module-level name, presumably an optional
    # systemd notifier that is falsy when unavailable -- confirm at its import.
    if sd:
        sd.notify('READY=1')

    tokenizer_service = TokenizerService()
    tokenizer_service.run()

    for language in config.languages:
        load_language(app, tokenizer_service, language, config.get_model_directory(language))

    sys.stdout.flush()
    tornado.ioloop.IOLoop.current().start()
项目:PowerSpikeGG    作者:PowerSpikeGG    | 项目源码 | 文件源码
def start_server(riot_api_token, listening_port, max_workers):
    """Create, bind and start the gRPC server.

    Returns the ``(server, service)`` pair, where ``service`` is the
    ``MatchFetcher`` registered on the server.
    """
    fetcher = MatchFetcher(riot_api_token)

    worker_pool = futures.ThreadPoolExecutor(max_workers=max_workers)
    grpc_server = grpc.server(worker_pool)
    service_pb2.add_MatchFetcherServicer_to_server(fetcher, grpc_server)

    # Insecure port on the IPv6 wildcard address.
    bind_address = '[::]:%s' % listening_port
    grpc_server.add_insecure_port(bind_address)
    grpc_server.start()

    return grpc_server, fetcher
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
async def test_explicit_loop_threaded(event_loop):
    """Deploy via a second event loop running in a worker thread.

    Opens a ``base.CleanModel``, creates a fresh event loop and, on a
    single-worker thread pool, runs ``_deploy_in_loop`` to completion on
    that loop. The test then waits for the new ``ubuntu`` application to
    show up in the original model.

    The function must be a coroutine (``async def``): it uses ``async with``
    and ``await``, which are a SyntaxError inside a plain ``def``.
    """
    async with base.CleanModel() as model:
        model_name = model.info.name
        new_loop = asyncio.new_event_loop()
        with ThreadPoolExecutor(1) as executor:
            f = executor.submit(
                new_loop.run_until_complete,
                _deploy_in_loop(new_loop, model_name, model._connector.jujudata))
            # Block until the deployment in the other thread has finished.
            f.result()
        await model._wait_for_new('application', 'ubuntu')
        assert 'ubuntu' in model.applications
项目:wen_spiders    作者:HughWen    | 项目源码 | 文件源码
def multi_thread(self, begin_id):
        """Start 20 threads that each run ``loop_parse_news_flashes``.

        The id set is prepared from ``begin_id`` first; every thread gets the
        same ``kr2.kr_flashes_multi`` collection as its only argument. The
        threads are started but not joined.
        """
        self.make_id_set(begin_id)
        collection = MONGO_CLIENT['kr2']['kr_flashes_multi']
        worker_count = 20
        for index in range(worker_count):
            worker = threading.Thread(
                target=self.loop_parse_news_flashes,
                name='thread%s' % index,
                args=(collection,),
            )
            worker.start()
项目:public-dns    作者:ssut    | 项目源码 | 文件源码
def main():
    """Benchmark 100 'A' lookups with dns.resolver and with PublicDNS.

    Domains are read from ``google_domains.txt`` next to this script and
    cycled through until 100 queries have been submitted to a 4-worker
    thread pool; the elapsed wall-clock time is printed for each client.
    Query results (and query errors) are deliberately not inspected.

    Raises:
        SystemExit: if the domain file contains no domains.
    """
    sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
    from publicdns.client import PublicDNS

    filename = os.path.join(os.path.dirname(__file__), 'google_domains.txt')
    with open(filename, 'r') as f:
        # Skip blank lines: a trailing newline would otherwise yield an empty
        # "domain" that is queried too and skews the timings.
        domains = [line.strip() for line in f if line.strip()]
    if not domains:
        raise SystemExit('no domains found in {}'.format(filename))
    size = len(domains)

    tqdmargs = {
        'total': 100,
        'unit': 'it',
        'unit_scale': True,
        'leave': True,
    }

    def make_resolver_query():
        # dns.resolver pinned to Google's public nameservers.
        resolver = dns_resolver.Resolver()
        resolver.nameservers = ['8.8.8.8', '8.8.4.4']
        return resolver.query

    def make_publicdns_query():
        return PublicDNS().query

    def bench(label, make_query, summary):
        # Time client construction plus 100 pooled queries, then print summary.
        with ThreadPoolExecutor(max_workers=4) as pool:
            print('- ' + label)
            started = timeit.default_timer()
            query = make_query()
            futures = [pool.submit(query, domains[i % size], 'A')
                       for i in range(100)]
            # Drain the iterator only to drive the progress bar.
            for _ in tqdm(as_completed(futures), **tqdmargs):
                pass
            elapsed = timeit.default_timer() - started
            print(summary.format(elapsed))

    bench('dns.resolver', make_resolver_query, 'dns.resolver * 100 - took {}s')
    bench('PublicDNS', make_publicdns_query, '\nPublicDNS * 100 - took {}s')
项目:bitcoin-arbitrage    作者:ucfyao    | 项目源码 | 文件源码
def __init__(self):
        """Set up empty state, load markets and observers, create the pool.

        Markets and observers are initialised from ``config.markets`` and
        ``config.observers``; the thread pool has 10 workers.
        """
        self.markets, self.observers, self.depths = [], [], {}
        self.init_markets(config.markets)
        self.init_observers(config.observers)
        self.threadpool = ThreadPoolExecutor(max_workers=10)
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def __init__(self, neo4j_client: Neo4jClient, max_workers: int = None):
        """Create the worker pool, then initialise the base class.

        ``max_workers`` is passed straight to ``ThreadPoolExecutor``;
        ``neo4j_client`` is passed to the parent initialiser.
        """
        worker_pool = ThreadPoolExecutor(max_workers=max_workers)
        self.executor = worker_pool
        super().__init__(neo4j_client)
项目:saapy    作者:ashapochka    | 项目源码 | 文件源码
def test_wait_for_all():
    """Check that a pooled task can wait on sibling futures and sum them.

    Three tasks that finish at staggered times are submitted, followed by a
    fourth task that waits for all of them and adds up their results.
    """
    # Seconds slept per unit of ``sleep_time``. The staggering (2 < 3 < 5) is
    # what matters, not the absolute duration, so keep the test fast.
    tick = 0.01

    def f(sleep_time: int):
        sleep(sleep_time * tick)
        return sleep_time

    def calc(fs):
        fs_done = wait(fs).done
        return sum(fut.result() for fut in fs_done)

    # The context manager shuts the pool down so no worker threads are leaked.
    with ThreadPoolExecutor() as pool:
        fs = [pool.submit(f, arg) for arg in (3, 2, 5)]
        result = pool.submit(calc, fs).result()
    assert result == 10
项目:QUANTAXIS    作者:yutiansut    | 项目源码 | 文件源码
def __init__(self, thread_num=2, *args, **kwargs):
        """Create the queue, the API object, the worker thread and the pool.

        The 'API Worker' thread (running ``self.api_worker``) is started
        here, before the executor is created. Extra positional and keyword
        arguments are accepted but not used.
        """
        self.thread_num = thread_num
        self._queue = queue.Queue(maxsize=200)
        self.api_no_connection = TdxHq_API()

        worker = Thread(target=self.api_worker, args=(), name='API Worker')
        self._api_worker = worker
        worker.start()

        self.executor = ThreadPoolExecutor(max_workers=self.thread_num)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Initialise the test double with mock metadata, scheduler and executor."""
        self.metadata = MockMetadata()
        self.added_hosts = []
        self.removed_hosts = []
        # spec= restricts each mock to the attributes of the real class.
        self.scheduler = Mock(spec=_Scheduler)
        self.executor = Mock(spec=ThreadPoolExecutor)
        # NOTE(review): ``profile_manager`` is not assigned in this method;
        # presumably it is a class-level attribute -- confirm on the class.
        self.profile_manager.profiles[EXEC_PROFILE_DEFAULT] = ExecutionProfile(RoundRobinPolicy())