Python concurrent.futures module: ProcessPoolExecutor() example source code

The following 50 code examples, extracted from open-source Python projects, illustrate how to use concurrent.futures.ProcessPoolExecutor().
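
Before the project excerpts, here is a minimal, self-contained sketch (not taken from any of the projects below) of the pattern most of them follow; the cpu_bound function and its inputs are purely illustrative:

from concurrent.futures import ProcessPoolExecutor, as_completed

def cpu_bound(n):
    # Illustrative CPU-bound task: sum of squares up to n.
    return sum(i * i for i in range(n))

if __name__ == '__main__':  # required on platforms that start workers with spawn
    inputs = [100000, 200000, 300000]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # map() returns results in input order
        for n, result in zip(inputs, executor.map(cpu_bound, inputs)):
            print(n, result)
        # submit() plus as_completed() yields futures as the workers finish
        futures = {executor.submit(cpu_bound, n): n for n in inputs}
        for future in as_completed(futures):
            print(futures[future], future.result())

Exiting the with block waits for all submitted work and shuts the pool down.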

Project: jack    Author: uclmr    | Project source code | File source code
def convert_dataset(path, filemap, name, num_processes, max_num_support, max_tokens, is_web=True):
    with open(path, 'rb') as f:
        dataset = pickle.load(f)

    if num_processes == 1:
        instances = process((dataset, filemap, max_num_support, max_tokens, is_web), True)
    else:
        chunk_size = 1000
        executor = ProcessPoolExecutor(num_processes)
        instances = []
        i = 0
        for processed in executor.map(
                process, [(dataset[i * chunk_size:(i + 1) * chunk_size], filemap, max_num_support, max_tokens, is_web)
                          for i in range(len(dataset) // chunk_size + 1)]):
            instances.extend(processed)
            i += chunk_size
            print("%d/%d done" % (min(len(dataset), i), len(dataset)))

    return {"meta": {"source": name}, 'instances': instances}
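
As a side note, the manual slicing above can usually be expressed with executor.map's chunksize parameter (honoured by ProcessPoolExecutor since Python 3.5); a hedged sketch, where process_one stands in for a hypothetical per-instance version of process:

from concurrent.futures import ProcessPoolExecutor

def process_one(instance):
    # Stand-in for per-instance preprocessing.
    return instance

def convert_all(dataset, num_processes, chunk_size=1000):
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        # chunksize ships items to workers in batches, much like the manual slices above.
        return list(executor.map(process_one, dataset, chunksize=chunk_size))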
Project: django-gateone    Author: jimmy201602    | Project source code | File source code
def run(self):
        cls = MultiprocessRunner
        started = False
        if self not in cls.running_instances:
            cls.running_instances.add(self)
        if not cls.executor_instance:
            self.executor = futures.ProcessPoolExecutor(
                max_workers=self.max_workers)
            cls.executor_instance = self
            started = True
        elif not cls.executor_instance.running:
            cls.executor_instance.executor = futures.ProcessPoolExecutor(
                max_workers=self.max_workers)
            started = True
        self.executor = cls.executor_instance.executor
        if started:
            workers = self.executor._max_workers # Derived from cpu_count()
            logging.debug(
                _("Starting the MultiprocessRunner executor with %s worker "
                "processes.") % workers)
Project: Learning-Concurrency-in-Python    Author: PacktPublishing    | Project source code | File source code
def main():
    t1 = timeit.default_timer()
    with ProcessPoolExecutor(max_workers=4) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

    print("{} Seconds Needed for ProcessPoolExecutor".format(timeit.default_timer() - t1))

    t2 = timeit.default_timer()
    with ThreadPoolExecutor(max_workers=4) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
    print("{} Seconds Needed for ThreadPoolExecutor".format(timeit.default_timer() - t2))

    t3 = timeit.default_timer()
    for number in PRIMES:
        isPrime = is_prime(number)
        print("{} is prime: {}".format(number, isPrime))
    print("{} Seconds needed for single threaded execution".format(timeit.default_timer() - t3))
Project: demos    Author: dfirence    | Project source code | File source code
def run_concurrently( queue ):
    start = time.time()
    cpus  = mp.cpu_count()
    qsize = queue.qsize()
    procs = []
    with ProcessPoolExecutor( cpus ) as executor:
        for n in xrange( qsize ):
            proc = mp.Process( target=run_plugin, args=( queue.get(),) )
            procs.append( proc )
            proc.start()
            time.sleep( 0.05 )
        for proc in procs:
            proc.join()
            time.sleep( 0.05 )
    #end = '[+] Ends  {:30} {}: {:.2f}s'.format( 'Concurrency of', qsize, 'tasks',time.time() - start)
    t = '{:.2f}s'.format( time.time() - start )
    end = '[+] Ends  [ {} ] Concurrent Tasks'.format( qsize )

    print ('\033[1;32;40m' + '{:35}--> {}{}'.format(end, t, '\n'))
    print '{}{}'.format( '-' * 48, '\n' )
    #print '{}{}{}{}'.format( end, '\n', '-' * 48, '\n' )
    return
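
For comparison, a minimal sketch (not from the demos project) of the same fan-out driven by the pool itself, so the executor's worker processes run the plugins rather than manually started multiprocessing.Process objects; run_plugin here is only a stand-in for the original plugin runner:

import time
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed

def run_plugin(task):
    # Stand-in for the original plugin runner.
    return task

def run_concurrently(tasks):
    start = time.time()
    with ProcessPoolExecutor(max_workers=mp.cpu_count()) as executor:
        futs = [executor.submit(run_plugin, task) for task in tasks]
        for fut in as_completed(futs):
            fut.result()  # re-raises any exception from the worker process
    print('[+] Ends  [ {} ] Concurrent Tasks --> {:.2f}s'.format(len(tasks), time.time() - start))

if __name__ == '__main__':
    run_concurrently(range(8))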
Project: notebooks    Author: fluentpython    | Project source code | File source code
def main(workers=None):
    if workers:
        workers = int(workers)
    t0 = time.time()

    with futures.ProcessPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers
        to_do = []
        for i in range(JOBS, 0, -1):
            size = SIZE + int(SIZE / JOBS * (i - JOBS/2))
            job = executor.submit(arcfour_test, size, KEY)
            to_do.append(job)

        for future in futures.as_completed(to_do):
            res = future.result()
            print('{:.1f} KB'.format(res/2**10))

    print(STATUS.format(actual_workers, time.time() - t0))
Project: notebooks    Author: fluentpython    | Project source code | File source code
def save_month(year_month, verbose):
    year, month = [int(s) for s in year_month.split('-')]
    total_size = 0
    img_count = 0
    dates = potd.list_days_of_month(year, month)

    with futures.ProcessPoolExecutor(max_workers=100) as executor:
        downloads = dict((executor.submit(potd.save_one, date, verbose), date)
                             for date in dates)

        for future in futures.as_completed(downloads):
            date = downloads[future]
            if future.exception() is not None:
                print('%r generated an exception: %s' % (date,
                                                         future.exception()))
            else:
                img_size = future.result()
                total_size += img_size
                img_count += 1
                print('%r OK: %r' % (date, img_size))

    return img_count, total_size
Project: SmallReptileTraining    Author: yanbober    | Project source code | File source code
def runner(self):
        process_pool = ProcessPoolExecutor(max_workers=4)
        futures = dict()
        for url in self.urls:
            future = process_pool.submit(self.get_web_content, url)
            futures[future] = url

        for future in concurrent.futures.as_completed(futures):
            url = futures[future]
            try:
                data = future.result()
            except Exception as e:
                print('Run process url ('+url+') error. '+str(e))
            else:
                print(url+'Request data ok. size='+str(len(data)))
        print('Finished!')
Project: Y8M    Author: mpekalski    | Project source code | File source code
def combine_pred_csv(fn1, fn2, fn_out='/tmp/combo.csv', wgts=None):
    """ linear add the probabilities from two prediction.csv files.
    inputs:
        fn1, fn2: files to be combined.
        fn_out: output file name
        wgts: a list of two values, for example, [0.5, 0.5]
    output:
        no return values
    """

    executor = futures.ProcessPoolExecutor(max_workers=2)

    t1 = datetime.now()
    print('start combination at ', t1)

    preds1, preds2 = executor.map(file_2_series, (fn1, fn2))       

    t2 = datetime.now()
    print('files read by', t2)

    return combine_preds_2_csv(preds1, preds2, fn_out, wgts)
Project: indy-node    Author: hyperledger    | Project source code | File source code
def hard_blow():
    # Note: This method might perform more tasks than `num_tasks`,
    # if the values of `num_tasks` and `num_threads` are chosen such that
    # they are not multiples of `num_procs`

    # TODO: WIP

    num_tasks = 10000
    num_procs = 4
    threads_per_proc = 10

    tasks_per_proc = int(math.ceil(num_tasks / num_procs))

    futrs = []
    with ProcessPoolExecutor(max_workers=num_procs) as pe:
        for _ in range(num_procs):
            fut = pe.submit(_task_for_proc, (threads_per_proc, tasks_per_proc))
            futrs.append(fut)
        print('Waiting for futures: main')
        concurrent.futures.wait(futrs)
Project: aesthetics    Author: shubhamchaudhary    | Project source code | File source code
def get_fisher_vectors_from_folder(self, folder, limit):
        """
        :param str folder: Folder Name
        :param int limit: Number of images to read from each folder
        :return: fisher vectors for images in given folder
        :rtype: np.array
        """
        files = glob.glob(folder + "/*.jpg")[:limit]

        with ProcessPoolExecutor() as pool:
            futures = pool.map(self._worker, files)
            desc = 'Creating Fisher Vectors for {} images of folder {}'.format(len(files), os.path.split(folder)[-1])
            futures = tqdm.tqdm(futures, total=len(files), desc=desc, unit='image', ncols=120)
            vectors = [f for f in futures if f is not None and len(f) > 0]
            max_shape = np.array([v.shape[0] for v in vectors]).max()
            vectors = [v for v in vectors if v.shape[0] == max_shape]
        # return np.array(vectors)    # Can't do np.float32, because all images may not have same number of features
        return np.float32(vectors)
Project: aesthetics    Author: shubhamchaudhary    | Project source code | File source code
def folder(self, folder, limit):
        """
        :param folder: Name of the folder containing images
        :type folder: str
        :param limit: Number of images to be read from given folder
        :type limit: int
        :return: List of descriptors of the given images
        :rtype: np.array
        """
        files = glob.glob(folder + "/*.jpg")[:limit]
        with ProcessPoolExecutor() as executor:
            futures = executor.map(self.image_file, files)
            futures = tqdm.tqdm(futures, total=len(files), desc='Calculating descriptors')
            descriptors = [f for f in futures]
            # descriptors = [self.image_file(file) for file in files]
        descriptors = list(filter(lambda x: x is not None, descriptors))
        return np.concatenate(descriptors)
Project: GulpIO    Author: TwentyBN    | Project source code | File source code
def __call__(self):
        os.makedirs(self.output_folder, exist_ok=True)
        chunk_slices = calculate_chunk_slices(self.videos_per_chunk,
                                              len(self.adapter))
        gulp_directory = GulpDirectory(self.output_folder)
        new_chunks = gulp_directory.new_chunks(len(chunk_slices))
        chunk_writer = ChunkWriter(self.adapter)
        with ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            result = executor.map(chunk_writer.write_chunk,
                                  new_chunks,
                                  chunk_slices)
            for r in tqdm(result,
                          desc='Chunks finished',
                          unit='chunk',
                          dynamic_ncols=True,
                          total=len(chunk_slices)):
                pass
Project: PhasorToolBox    Author: sonusz    | Project source code | File source code
def __init__(
        self,
        device_list=[],
        connection_list=[],
        loop: asyncio.AbstractEventLoop = None,
        executor: futures.Executor = None
    ):
        if loop:
            self.loop = loop
        else:
            self.loop = asyncio.get_event_loop()
        if executor:
            self.executor = executor
        else:
            self.executor = futures.ProcessPoolExecutor()
        self.loop.set_default_executor(self.executor)
        self.device_list = device_list
        self.connection_list = connection_list
Project: pylearning    Author: amstuta    | Project source code | File source code
def fit(self, features, targets):
        """
        Trains self.nb_trees number of decision trees.
        :param features:    Array-like object of shape (nb_samples, nb_features)
                            containing the training examples
        :param targets:     Array-like object of shape (nb_samples) containing the
                            training targets.
        """
        if not self.nb_samples:
            self.nb_samples = int(len(features) / 10)
        with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            random_features = []
            for x in range(self.nb_trees):
                idxs = np.random.choice(np.arange(len(features)), self.nb_samples, replace=True)
                try:
                    chosen_features = itemgetter(*idxs)(features)
                    chosen_targets = itemgetter(*idxs)(targets)
                except:
                    chosen_features = features.iloc[idxs].as_matrix()
                    chosen_targets = targets.iloc[idxs].as_matrix()
                random_features.append((x, chosen_features, chosen_targets))
            self.trees = list(executor.map(self.train_tree, random_features))
Project: youtube-bb    Author: mbuckler    | Project source code | File source code
def sched_downloads(d_set,dl_dir,num_threads,vids):
  d_set_dir = dl_dir+'/'+d_set+'/'

  # Make the directory for this dataset
  check_call(' '.join(['mkdir', '-p', d_set_dir]), shell=True)

  # Tell the user when downloads were started
  print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

  # Download and cut in parallel worker processes
  with futures.ProcessPoolExecutor(max_workers=num_threads) as executor:
    fs = [executor.submit(dl_and_cut,vid) for vid in vids]
    for i, f in enumerate(futures.as_completed(fs)):
      # Write progress to error so that it can be seen
      sys.stderr.write( \
        "Downloaded video: {} / {} \r".format(i, len(vids)))

  print( d_set+': All videos downloaded' )
Project: struc2vec    Author: leoribeiro    | Project source code | File source code
def simulate_walks(self,num_walks,walk_length):

        # For large graphs, this is executed serially because of memory use.
        if(len(self.G) > 500000):

            with ProcessPoolExecutor(max_workers=1) as executor:
                job = executor.submit(generate_random_walks_large_graphs,num_walks,walk_length,self.workers,self.G.keys())

                job.result()

        else:

            with ProcessPoolExecutor(max_workers=1) as executor:
                job = executor.submit(generate_random_walks,num_walks,walk_length,self.workers,self.G.keys())

                job.result()


        return
Project: ultrachronic    Author: yoavram    | Project source code | File source code
def repeat(f, reps, cpus, **kwargs):
    if reps == 1:
        f(**kwargs)
        return
    fname = f.__name__
    print("Starting {} {} times with:".format(fname, reps))
    print(kwargs)
    if cpus == 1:
        for _ in range(reps):
            try:
                f(**kwargs)
            except Exception as e:
                warnings.warn(str(e))
    else:
        from multiprocessing import cpu_count
        from concurrent.futures import ProcessPoolExecutor, as_completed
        if cpus < 1:
            cpus = cpu_count()
        with ProcessPoolExecutor(cpus) as executor:
            futures = [executor.submit(f, **kwargs) for _ in range(reps)]
        for fut in as_completed(futures):
            if fut.exception():
                warnings.warn(str(fut.exception()))
    print("Finished")
Project: research-dist    Author: DrawML    | Project source code | File source code
def start(self, slave_addr, task):
        self._task = task

        def _start(id, slave_addr, task):
            from multiprocessing import Process
            import multiprocessing
            #multiprocessing.set_start_method('spawn')
            Process(target=_worker_main, args=(id, slave_addr, task)).start()

        from concurrent.futures import ProcessPoolExecutor
        print("[Worker {0}] Create".format(self.id))
        _start(self.id, slave_addr, task)
        #executor = ProcessPoolExecutor()
        #loop = asyncio.get_event_loop()
        #asyncio.ensure_future(loop.run_in_executor(ProcessPoolExecutor(), _worker_main, self.id, slave_addr, task))
        #asyncio.ensure_future(_start(self.id, slave_addr, task))
        #yield from asyncio.sleep(10)
        print("***")
Project: cess    Author: frnsys    | Project source code | File source code
def start(self, arbiter_host, arbiter_port, start_port, cores):
        logger = logging.getLogger('cluster.worker')
        logger.addHandler(MPLogHandler('/tmp/node.log'))

        cpus = multiprocessing.cpu_count()
        if cores > 0:
            cores = min(cpus, cores)
        else:
            cores = max(1, cpus - cores)

        logger.info('starting {} workers'.format(cores))
        with ProcessPoolExecutor(max_workers=cores) as executor:
            port = start_port
            for _ in range(cores):
                executor.submit(start_worker, arbiter_host, arbiter_port, port)
                port += 1

            while True:
                pass
Project: isambard    Author: woolfson-group    | Project source code | File source code
def assign_fitnesses(self, targets):
        self._params['evals'] = len(targets)
        px_parameters = zip([self._params['specification']] * len(targets),
                            [self._params['sequence']] * len(targets),
                            [self.parse_individual(x) for x in targets])
        if (self._params['processors'] == 1) or (sys.platform == 'win32'):
            fitnesses = map(self.evaluation_function, px_parameters)
        else:
            with futures.ProcessPoolExecutor(
                    max_workers=self._params['processors']) as executor:
                fitnesses = executor.map(
                    self.evaluation_function, px_parameters)
        tars_fits = list(zip(targets, fitnesses))
        if 'log_params' in self._params:
            if self._params['log_params']:
                self.parameter_log.append(
                    [(self.parse_individual(x[0]), x[1]) for x in tars_fits])
        for ind, fit in tars_fits:
            ind.fitness.values = (fit,)
Project: Concurrency-With-Python    Author: elliotforbes    | Project source code | File source code
def main():
    t1 = timeit.default_timer()
    with ProcessPoolExecutor(max_workers=4) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

    print("{} Seconds Needed for ProcessPoolExecutor".format(timeit.default_timer() - t1))

    t2 = timeit.default_timer()
    with ThreadPoolExecutor(max_workers=4) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))
    print("{} Seconds Needed for ThreadPoolExecutor".format(timeit.default_timer() - t2))

    t3 = timeit.default_timer()
    for number in PRIMES:
        isPrime = is_prime(number)
        print("{} is prime: {}".format(number, isPrime))
    print("{} Seconds needed for single threaded execution".format(timeit.default_timer() - t3))
Project: tacotron    Author: keithito    | Project source code | File source code
def build_from_path(in_dir, out_dir, num_workers=1, tqdm=lambda x: x):
  executor = ProcessPoolExecutor(max_workers=num_workers)
  futures = []
  index = 1
  for book in books:
    with open(os.path.join(in_dir, book, 'sentence_index.txt')) as f:
      for line in f:
        parts = line.strip().split('\t')
        if line[0] != '#' and len(parts) == 8 and float(parts[3]) > _min_confidence:
          wav_path = os.path.join(in_dir, book, 'wav', '%s.wav' % parts[0])
          labels_path = os.path.join(in_dir, book, 'lab', '%s.lab' % parts[0])
          text = parts[5]
          task = partial(_process_utterance, out_dir, index, wav_path, labels_path, text)
          futures.append(executor.submit(task))
          index += 1
  results = [future.result() for future in tqdm(futures)]
  return [r for r in results if r is not None]
Project: snmpexporter    Author: dhtech    | Project source code | File source code
def __init__(self, config_file, poller_pool, annotator_pool):
    super(PollerResource).__init__()
    # Use process pollers as netsnmp is not behaving well using just threads
    logging.debug('Starting poller pool ...')
    self.poller_executor = futures.ProcessPoolExecutor(
        max_workers=poller_pool)
    # Start MIB resolver after processes above (or it will fork it as well)
    logging.debug('Initializing MIB resolver ...')
    import mibresolver
    self.resolver = mibresolver

    logging.debug('Starting annotation pool ...')
    # .. but annotators are just CPU, so use lightweight threads.
    self.annotator_executor = futures.ThreadPoolExecutor(
        max_workers=annotator_pool)
    self.config_file = config_file
Project: twentybn-dl    Author: TwentyBN    | Project source code | File source code
def download_chunks(self, max_workers=30):
        print('Will now download chunks.')
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            result = list(executor.map(self.download_chunk,
                                       zip(self.urls,
                                           self.md5sums,
                                           self.output_files)))
        DownloadResultProcessor.process_and_print(result)
Project: gap_statistic    Author: milesgranger    | Project source code | File source code
def _process_with_multiprocessing(self, X: Union[pd.DataFrame, np.ndarray], n_refs: int, cluster_array: np.ndarray):
        """
        Process calling of .calculate_gap() method using the multiprocessing library
        """
        with ProcessPoolExecutor(max_workers=self.n_jobs) as executor:

            jobs = [executor.submit(self._calculate_gap, X, n_refs, n_clusters)
                    for n_clusters in cluster_array
                    ]

            for future in as_completed(jobs):
                gap_value, k = future.result()
                yield (gap_value, k)
Project: astrobase    Author: waqasbhatti    | Project source code | File source code
def parallel_varfeatures(lclist,
                         outdir,
                         maxobjects=None,
                         timecols=None,
                         magcols=None,
                         errcols=None,
                         mindet=1000,
                         lcformat='hat-sql',
                         nworkers=None):
    '''
    This runs varfeatures in parallel for all light curves in lclist.

    '''
    # make sure to make the output directory if it doesn't exist
    if not os.path.exists(outdir):
        os.makedirs(outdir)

    if maxobjects:
        lclist = lclist[:maxobjects]

    tasks = [(x, outdir, timecols, magcols, errcols, mindet, lcformat)
             for x in lclist]

    with ProcessPoolExecutor(max_workers=nworkers) as executor:
        resultfutures = executor.map(varfeatures_worker, tasks)

    results = [x for x in resultfutures]
    resdict = {os.path.basename(x):y for (x,y) in zip(lclist, results)}

    return resdict
Project: astrobase    Author: waqasbhatti    | Project source code | File source code
def parallel_cp(pfpicklelist,
                outdir,
                lcbasedir,
                lclistpkl=None,
                nbrradiusarcsec=30.0,
                maxobjects=None,
                lcformat='hat-sql',
                timecols=None,
                magcols=None,
                errcols=None,
                nworkers=32):
    '''This drives the parallel execution of runcp for a list of periodfinding
    result pickles.

    '''

    if not os.path.exists(outdir):
        os.mkdir(outdir)

    if maxobjects:
        pfpicklelist = pfpicklelist[:maxobjects]

    tasklist = [(x, outdir, lcbasedir,
                 {'lcformat':lcformat,
                  'timecols':timecols,
                  'magcols':magcols,
                  'errcols':errcols,
                  'lclistpkl':lclistpkl,
                  'nbrradiusarcsec':nbrradiusarcsec}) for
                x in pfpicklelist]

    resultfutures = []
    results = []

    with ProcessPoolExecutor(max_workers=nworkers) as executor:
        resultfutures = executor.map(runcp_worker, tasklist)

    results = [x for x in resultfutures]

    executor.shutdown()
    return results
Project: hidi    Author: VEVO    | Project source code | File source code
def transform(self, io, **kwargs):
        return self.executor_fork(ProcessPoolExecutor, io, **kwargs)
Project: oshino    Author: CodersOfTheNight    | Project source code | File source code
def test_custom_executor_class(self):
        cfg = load("tests/data/test_config.yml")
        obj = cfg.executor_class()
        assert isinstance(obj, ProcessPoolExecutor)
Project: deb-python-concurrent.futures    Author: openstack    | Project source code | File source code
def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes:
            p.join()
Project: deb-python-concurrent.futures    Author: openstack    | Project source code | File source code
def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        del executor
        gc.collect()

        queue_management_thread.join()
        for p in processes:
            p.join()
Project: deb-python-concurrent.futures    Author: openstack    | Project source code | File source code
def with_process_pool_executor():
    with ProcessPoolExecutor(10) as executor:
        return list(executor.map(is_prime, PRIMES))
Project: DataProperty    Author: thombashi    | Project source code | File source code
def __to_dp_matrix_mt(self, value_matrix):
        from concurrent import futures

        col_data_mapping = {}
        try:
            with futures.ProcessPoolExecutor(self.max_workers) as executor:
                future_list = [
                    executor.submit(
                        _to_dp_list_helper, self, col_idx,
                        data_list, self.__get_col_type_hint(col_idx),
                        self.strip_str_value
                    )
                    for col_idx, data_list
                    in enumerate(zip(*value_matrix))
                ]

                for future in futures.as_completed(future_list):
                    col_idx, value_dp_list = future.result()
                    col_data_mapping[col_idx] = value_dp_list
        finally:
            logger.debug("shutdown ProcessPoolExecutor")
            executor.shutdown()

        return list(zip(*[
            col_data_mapping[col_idx] for col_idx in sorted(col_data_mapping)
        ]))
Project: Learning-Concurrency-in-Python    Author: PacktPublishing    | Project source code | File source code
def main():
  print("Starting ThreadPoolExecutor")
  with ProcessPoolExecutor(max_workers=3) as executor:
    future = executor.submit(task, (2))
    future = executor.submit(task, (3))
    future = executor.submit(task, (4))

  print("All tasks complete")
Project: Learning-Concurrency-in-Python    Author: PacktPublishing    | Project source code | File source code
def main():
  executor = ProcessPoolExecutor(max_workers=3)
  task1 = executor.submit(task)
  task2 = executor.submit(task)
Project: structured-query-engine    Author: apsdehal    | Project source code | File source code
def future_flush(self):
        with ProcessPoolExecutor() as executor:
            executor.submit(self.flush_to_file)  # pass the callable itself; calling it here would run it in this process before submitting
Project: sru    Author: taolei87    | Project source code | File source code
def flatten_json(file, proc_func):
    '''A multi-processing wrapper for loading SQuAD data file.'''
    with open(file) as f:
        data = json.load(f)['data']
    with ProcessPoolExecutor(max_workers=args.threads) as executor:
        rows = executor.map(proc_func, data)
    rows = sum(rows, [])
    return rows
Project: peony-twitter    Author: odrling    | Project source code | File source code
def executor():
    return ProcessPoolExecutor()
Project: notebooks    Author: fluentpython    | Project source code | File source code
def main(workers=None):
    if workers:
        workers = int(workers)
    t0 = time.time()

    with futures.ProcessPoolExecutor(workers) as executor:
        actual_workers = executor._max_workers
        to_do = (executor.submit(sha, SIZE) for i in range(JOBS))
        for future in futures.as_completed(to_do):
            res = future.result()
            print(res)

    print(STATUS.format(actual_workers, time.time() - t0))
Project: notebooks    Author: fluentpython    | Project source code | File source code
def download_many(cc_list):
    with futures.ProcessPoolExecutor() as executor:  # <1>
        res = executor.map(download_one, sorted(cc_list))

    return len(list(res))
Project: squishy    Author: tmehlinger    | Project source code | File source code
def process_messages(self, messages):
        future_to_message = {}
        processed = []

        self.logger.debug('processing %d messages', len(messages))
        for message in messages:
            # ThreadPoolExecutor/ProcessPoolExecutor will throw a
            # RuntimeError if we try to submit while it's shutting down.
            # If we encounter a RuntimeError, immediately stop trying to
            # submit new tasks; they will get requeued after the interval
            # configured on the queue's policy.
            try:
                future = self.pool.submit(self.func, message)
            except RuntimeError:
                self.logger.exception('cannot submit jobs to pool')
                raise
            else:
                future_to_message[future] = message

        for future in futures.as_completed(future_to_message,
                                           timeout=self.timeout):
            message = future_to_message.pop(future)
            try:
                future.result()
            except:
                self.logger.exception('exception processing message %s',
                                      message.message_id)
            else:
                processed.append(message)

        return processed
Project: squishy    Author: tmehlinger    | Project source code | File source code
def __init__(self, func, pool_size=4, timeout=None):
        super(ProcessPoolWorker, self).__init__(func, pool_size=pool_size,
                                                timeout=timeout)
        self.pool = futures.ProcessPoolExecutor(max_workers=pool_size)
        self.logger = get_logger(__name__)
Project: SmallReptileTraining    Author: yanbober    | Project source code | File source code
def __init__(self):
        self.crawl = CrawlProcess()
        self.output = OutPutProcess()
        self.crawl_pool = ProcessPoolExecutor(max_workers=8)
        self.crawl_deep = 100   # crawl depth
        self.crawl_cur_count = 0
Project: Y8M    Author: mpekalski    | Project source code | File source code
def combine_preds_2_csv(preds1, preds2, fn_out='/tmp/combo.csv', wgts=None):
    t1 = datetime.now()
    print('start combination at ', t1)
    executor = futures.ProcessPoolExecutor(max_workers=6)
    add_pred_series_wgts = functools.partial(add_pred_series, wgts=wgts)

    preds1 = sorted(preds1, key=lambda x: x[0])
    preds2 = sorted(preds2, key=lambda x: x[0])
    t2 = datetime.now()
    print('sorted preds2 at ', t2)

    lines = executor.map(add_pred_series_wgts, zip(preds1, preds2))

    t2 = datetime.now()
    print('finished adding lines at ', t2)
    #print('Lines processed: {}'.format(len(lines)))

    cnt = 0             
    with open(fn_out, 'w') as fout:
        fout.write('VideoId,LabelConfidencePairs\n')
        for line in lines:
            fout.write(line+'\n')
            cnt += 1

    print('{} prediction lines were written to {}'.format(cnt, fn_out))
    t3 = datetime.now()
    print('finished combination at', t3)
    print('Total run time: {}'.format(t3 - t1))
    return None
Project: Y8M    Author: mpekalski    | Project source code | File source code
def main(unused_argv):

  print("tensorflow version: %s" % tf.__version__)

  all_frame_files = gfile.Glob(FLAGS.input_data_pattern)
  f_fullpath = all_frame_files[FLAGS.file_from : FLAGS.file_to]
  f_fns = [x.split('/')[-1] for x in f_fullpath]

  exist_files = gfile.Glob(FLAGS.output_path + "C*tfrecord")
  exist_fn = [x.split('/')[-1].replace('CAtr', 'Atr')  for x in exist_files]

  yet_2_split = [x for x,y in zip(f_fullpath, f_fns) if y not in exist_fn]

  vf = [FLAGS.output_path + 'C' + x.split('/')[-1] for x in yet_2_split]

  mylog('number of files suggested: %d'%len(f_fullpath))
  mylog('number of files yet to process: %d'%len(yet_2_split))

  if FLAGS.parallel:
    from concurrent import futures
    executor = futures.ProcessPoolExecutor(max_workers=2)
    executor.map(process_one_file, zip(yet_2_split, vf))
  else: 
    for filenames in zip(yet_2_split, vf):
        #mylog('processing: {}'.format(filenames))
        process_one_file(filenames)

  mylog("done")
Project: Safe-RL-Benchmark    Author: befelix    | Project source code | File source code
def _benchmark_par(self):
        n_jobs = config.n_jobs
        with ProcessPoolExecutor(max_workers=n_jobs) as ex:
            fs = [ex.submit(_dispatch_wrap, run) for run in self.runs]
            self.runs = [f.result() for f in fs]
Project: python-cookbook-3rd    Author: tuanavu    | Project source code | File source code
def find_all_robots(logdir):
    '''
    Find all hosts across and entire sequence of files
    '''
    files = glob.glob(logdir+"/*.log.gz")
    all_robots = set()
    with futures.ProcessPoolExecutor() as pool:
        for robots in pool.map(find_robots, files):
            all_robots.update(robots)
    return all_robots
Project: zippy    Author: securesystemslab    | Project source code | File source code
def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes:
            p.join()
Project: zippy    Author: securesystemslab    | Project source code | File source code
def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        del executor
        test.support.gc_collect()

        queue_management_thread.join()
        for p in processes:
            p.join()
Project: DLink_Harvester    Author: MikimotoH    | Project source code | File source code
def main():
    os.makedirs(localstor, exist_ok=True)
    with ProcessPoolExecutor() as executor:
        with ftputil.FTPHost('files.dlink.com.au', 'anonymous', '') as host:
            with open('au_dlink_ftp_filelist.csv', 'w') as fout:
                cw = csv.writer(fout)
                cw.writerow(["ftpurl", "fsize", "fdate", "file_sha1", "file_md5"])

            models = host.listdir('/Products/')
            for model in models:
                if not host.path.isdir('/Products/%(model)s'%locals()):
                    continue
                if not re.match(r'[A-Z]+', model, re.I):
                    continue
                revs = host.listdir('/Products/%(model)s/'%locals())
                for rev in revs:
                    if not re.match(r'REV_\w+', rev, re.I):
                        continue
                    try:
                        fwitems = host.listdir('/Products/%(model)s/%(rev)s/Firmware/'%locals())
                    except:
                        continue
                    try:
                        for fwitem in fwitems:
                            print('visiting /Products/%(model)s/%(rev)s/Firmware/%(fwitem)s/'%locals())
                            try:
                                fw_files = host.path.listdir('/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s/'%locals())
                                for fw_file in fw_files:
                                    host.keep_alive()
                                    executor.submit(download, '/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s/%(fw_file)s'%locals())
                            except:
                                if host.path.isfile('/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s'%locals()):
                                    executor.submit(download,'/Products/%(model)s/%(rev)s/Firmware/%(fwitem)s'%locals())
                    except Exception as ex:
                        print(ex)