Python multiprocessing module: Pool() example source code

The following 50 code examples, extracted from open-source Python projects, illustrate how to use multiprocessing.Pool() and the multiprocessing.pool module.
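
Before the project examples, here is a minimal, self-contained Python 3 sketch (not drawn from any of the projects below) of the two call styles that recur throughout this page: the blocking Pool.map() and the non-blocking Pool.apply_async().

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    # Blocking: map() distributes the iterable over the worker processes
    # and returns the results in input order.
    with multiprocessing.Pool(processes=4) as pool:
        print(pool.map(square, range(10)))

    # Non-blocking: apply_async() returns an AsyncResult immediately;
    # get() blocks until that single result is ready.
    with multiprocessing.Pool(processes=4) as pool:
        result = pool.apply_async(square, (7,))
        print(result.get(timeout=10))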

Project: ceph-lcm    Author: Mirantis
def get_container_id_mapping(pool, compose_cmd):
    service_names = subprocess.check_output(
        compose_cmd + ["config", "--services"]
    )
    service_names = service_names.strip().decode("utf-8").split("\n")
    id_mapping = {
        name: pool.apply_async(pool_container_id, (name, compose_cmd))
        for name in service_names
    }

    while not all(future.ready() for future in id_mapping.values()):
        time.sleep(0.1)
    for name, future in list(id_mapping.items()):
        if not future.successful():
            raise RuntimeError("Cannot get ID of service {0}".format(name))
        id_mapping[name] = future.get()

    return id_mapping
Project: ouroboros    Author: pybee
def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))

#
# Test of creating a customized manager class
#
Project: pytorch-dist    Author: apaszke
def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            # changed worker -> clean_worker
            args = (self._inqueue, self._outqueue,
                    self._initializer,
                    self._initargs, self._maxtasksperchild)
            if hasattr(self, '_wrap_exception'):
                args += (self._wrap_exception,)
            w = self.Process(target=clean_worker, args=args)
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')
Project: planetplanet    Author: rodluger
def __init__(self, comm=None, loadbalance=False, debug=False,
                 wait_on_start = True, exit_on_end = True, 
                 cores_per_task = 1, **kwargs):
        if MPI is None:
            raise ImportError("Please install mpi4py")

        self.comm = MPI.COMM_WORLD if comm is None else comm
        self.rank = self.comm.Get_rank()
        if cores_per_task > 1:
            self.size = max(1, self.comm.Get_size() // cores_per_task)
        else:
            self.size = self.comm.Get_size() - 1
        self.function = _error_function
        self.loadbalance = loadbalance
        self.debug = debug
        if self.size == 0:
            raise ValueError("Tried to create an MPI pool, but there "
                             "was only one MPI process available. "
                             "Need at least two.")
        self.exit_on_end = exit_on_end

        # Enter main loop for workers?
        if wait_on_start:
            if self.is_worker():
                self.wait()
Project: planetplanet    Author: rodluger
def Pool(pool = 'AnyPool', **kwargs):
    '''
    Chooses between the different pools.
    If ``pool == 'AnyPool'``, chooses based on availability.

    '''

    if pool == 'MPIPool':
        return MPIPool(**kwargs)  
    elif pool == 'MultiPool':
        return MultiPool(**kwargs)
    elif pool == 'SerialPool':
        return SerialPool(**kwargs)
    elif pool == 'AnyPool':
        if MPIPool.enabled():
            return MPIPool(**kwargs)  
        elif MultiPool.enabled():
            return MultiPool(**kwargs)
        else:
            return SerialPool(**kwargs)
    else:
        raise ValueError('Invalid pool ``%s``.' % pool)
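
A hypothetical usage sketch of the factory above: _square and the input range are invented names, and the assumption that all three pool classes expose map() and close() follows the usual emcee-style pool interface rather than anything confirmed by this snippet.

def _square(x):
    return x * x

pool = Pool('AnyPool')    # picks MPIPool, MultiPool, or SerialPool, whichever is available
try:
    results = list(pool.map(_square, range(16)))
finally:
    pool.close()          # assumed to exist on all three pool types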
Project: pytorch-skipthoughts    Author: kaniblu
def load_embeddings_mp(path, word_dim, processes=None):

    if processes is None:
        processes = multiprocessing.cpu_count()

    pool = mp.Pool(processes, initializer=_mp_initialize,
                   initargs=(word_dim,))

    with open(path, "r") as f:
        iterator = chunks(f, n=processes,
                          k=processes * 10000)
        ret = {}
        for batches in iterator:
            results = pool.map_async(_mp_process, batches)
            results = results.get()
            results = aggregate_dicts(*results)

            ret.update(results)

        return ret
Project: isar    Author: ilbers
def process_profilelog(fn, pout = None):
    # Call either with a list of filenames (pout must be given) or with a single filename (pout optional).
    if not pout:
        pout = fn + '.processed'
    pout = open(pout, 'w')

    import pstats
    if isinstance(fn, list):
        p = pstats.Stats(*fn, stream=pout)
    else:
        p = pstats.Stats(fn, stream=pout)
    p.sort_stats('time')
    p.print_stats()
    p.print_callers()
    p.sort_stats('cumulative')
    p.print_stats()

    pout.flush()
    pout.close()  

#
# Was present to work around multiprocessing pool bugs in python < 2.7.3
#
Project: isar    Author: ilbers
def multiprocessingpool(*args, **kwargs):

    import multiprocessing.pool
    #import multiprocessing.util
    #multiprocessing.util.log_to_stderr(10)
    # Deal with a multiprocessing bug where signals to the processes would be delayed until the work
    # completes. Putting in a timeout means the signals (like SIGINT/SIGTERM) get processed.
    def wrapper(func):
        def wrap(self, timeout=None):
            return func(self, timeout=timeout if timeout is not None else 1e100)
        return wrap
    multiprocessing.pool.IMapIterator.next = wrapper(multiprocessing.pool.IMapIterator.next)

    return multiprocessing.Pool(*args, **kwargs)
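
The wrapper above patches only IMapIterator.next, so the prompt signal handling applies when results are consumed through imap()/imap_unordered(). A hedged driver sketch; parse_recipe and recipes are illustrative names, not part of the original code.

pool = multiprocessingpool(processes=4)
try:
    # Each next() call now uses a finite timeout, so SIGINT/SIGTERM are
    # handled promptly instead of being deferred until a result arrives.
    for parsed in pool.imap(parse_recipe, recipes):
        print(parsed)
finally:
    pool.close()
    pool.join()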
Project: pytorch    Author: tylergenter
def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            # changed worker -> clean_worker
            args = (self._inqueue, self._outqueue,
                    self._initializer,
                    self._initargs, self._maxtasksperchild)
            if hasattr(self, '_wrap_exception'):
                args += (self._wrap_exception,)
            w = self.Process(target=clean_worker, args=args)
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')
Project: oil    Author: oilshell
def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)
Project: oil    Author: oilshell
def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = map(sqr, range(10))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = map(sqr, range(20))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)
Project: oil    Author: oilshell
def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))


#
# Test that manager has expected number of shared objects left
#
Project: python2-tracer    Author: extremecoders-re
def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.next)
Project: python2-tracer    Author: extremecoders-re
def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = map(sqr, range(10))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = map(sqr, range(20))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)
Project: python2-tracer    Author: extremecoders-re
def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))


#
# Test that manager has expected number of shared objects left
#
Project: pytorch-coriander    Author: hughperkins
def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            # changed worker -> clean_worker
            args = (self._inqueue, self._outqueue,
                    self._initializer,
                    self._initargs, self._maxtasksperchild)
            if hasattr(self, '_wrap_exception'):
                args += (self._wrap_exception,)
            w = self.Process(target=clean_worker, args=args)
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')
Project: pytorch    Author: ezyang
def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            # changed worker -> clean_worker
            args = (self._inqueue, self._outqueue,
                    self._initializer,
                    self._initargs, self._maxtasksperchild)
            if hasattr(self, '_wrap_exception'):
                args += (self._wrap_exception,)
            w = self.Process(target=clean_worker, args=args)
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')
Project: ouroboros    Author: pybee
def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
Project: ouroboros    Author: pybee
def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = list(map(sqr, list(range(10))))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, list(range(20))))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)
Project: ouroboros    Author: pybee
def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(unpickleable_result, error_callback=errback)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = scratchpad[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        p.close()
        p.join()
Project: pytorch    Author: pytorch
def _repopulate_pool(self):
        """Bring the number of pool processes up to the specified number,
        for use after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            # changed worker -> clean_worker
            args = (self._inqueue, self._outqueue,
                    self._initializer,
                    self._initargs, self._maxtasksperchild)
            if hasattr(self, '_wrap_exception'):
                args += (self._wrap_exception,)
            w = self.Process(target=clean_worker, args=args)
            self._pool.append(w)
            w.name = w.name.replace('Process', 'PoolWorker')
            w.daemon = True
            w.start()
            util.debug('added worker')
Project: pscheduler    Author: perfsonar
def as_bulk_resolve(candidates, threads=50):
    """
    Resolve a list of IPs to AS information.

    Returns a map of each result as a tuple of (ASN, owner) keyed to
    its candidate.  Returns None if no ASN could be found or (ASN,
    None) if an ASN was found but no owner is available.

    WARNING: This function will create a pool of up to 'threads'
    threads.
    """

    result = {}

    pool = multiprocessing.pool.ThreadPool(
        processes=min(len(candidates), threads))

    for ip, as_ in pool.imap(
            __asresolve__,
            candidates,
            chunksize=1):
        result[ip] = as_
    pool.close()
    return result
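
multiprocessing.pool.ThreadPool, as used above, offers the same interface as the process-based Pool but runs the work in threads of the calling process, which suits I/O-bound jobs such as these lookups. A minimal Python 3 sketch, independent of the project code; socket.gethostbyname stands in for the project's __asresolve__ helper.

import multiprocessing.pool
import socket

def resolve(hostname):
    # I/O-bound lookup: threads avoid process start-up and argument pickling.
    try:
        return hostname, socket.gethostbyname(hostname)
    except socket.gaierror:
        return hostname, None

hostnames = ['localhost', 'example.com', 'no-such-host.invalid']
with multiprocessing.pool.ThreadPool(processes=min(len(hostnames), 50)) as tp:
    results = dict(tp.imap_unordered(resolve, hostnames, chunksize=1))
print(results)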
Project: deb-python-cassandra-driver    Author: openstack
def build_extensions(self):
        if build_concurrency > 1:
            self.check_extensions_list(self.extensions)

            import multiprocessing.pool
            multiprocessing.pool.ThreadPool(processes=build_concurrency).map(self.build_extension, self.extensions)
        else:
            build_ext.build_extensions(self)
Project: deb-python-cassandra-driver    Author: openstack
def _setup_extensions(self):
        # We defer extension setup until this command to leverage 'setup_requires' pulling in Cython before we
        # attempt to import anything
        self.extensions = []

        if try_murmur3:
            self.extensions.append(murmur3_ext)

        if try_libev:
            self.extensions.append(libev_ext)

        if try_cython:
            try:
                from Cython.Build import cythonize
                cython_candidates = ['cluster', 'concurrent', 'connection', 'cqltypes', 'metadata',
                                     'pool', 'protocol', 'query', 'util']
                compile_args = [] if is_windows else ['-Wno-unused-function']
                self.extensions.extend(cythonize(
                    [Extension('cassandra.%s' % m, ['cassandra/%s.py' % m],
                               extra_compile_args=compile_args)
                        for m in cython_candidates],
                    nthreads=build_concurrency,
                    exclude_failures=True))

                self.extensions.extend(cythonize(NoPatchExtension("*", ["cassandra/*.pyx"], extra_compile_args=compile_args),
                                                 nthreads=build_concurrency))
            except Exception:
                sys.stderr.write("Failed to cythonize one or more modules. These will not be compiled as extensions (optional).\n")
Project: speechless    Author: JuliusKunze
def fill_cache(self, repair_incorrect: bool = False) -> None:
        with Pool(processes=multiprocessing.cpu_count()) as pool:
            total = len(self.labeled_spectrograms)
            not_yet_cached = [s for s in self.labeled_spectrograms if not s.is_cached()]

            to_calculate = self.labeled_spectrograms if repair_incorrect else not_yet_cached

            log("Filling cache with {} spectrograms: {} already cached, {} to calculate.".format(
                total, total - len(not_yet_cached), len(to_calculate)))
            for index, labeled_spectrogram in enumerate(to_calculate):
                pool.apply_async(_repair_cached_spectrogram_if_incorrect if repair_incorrect else _cache_spectrogram,
                                 (labeled_spectrogram,))

            pool.close()
            pool.join()
Project: pytorch-dist    Author: apaszke
def clean_worker(*args, **kwargs):
    import gc
    multiprocessing.pool.worker(*args, **kwargs)
    # Regular multiprocessing workers don't fully clean up after themselves,
    # so we have to explicitly trigger garbage collection to make sure that all
    # destructors are called...
    gc.collect()
Project: planetplanet    Author: rodluger
def __repr__(self):
        return "<Close pool message>"
Project: planetplanet    Author: rodluger
def close(self):
        """
        Just send a message off to all the pool members which contains
        the special :class:`_close_pool_message` sentinel.

        """
        if self.is_master():
            for i in range(self.size):
                self.comm.isend(_close_pool_message(), dest=i + 1)
Project: tasker    Author: wavenator
def main():
    process_pool_context = multiprocessing.get_context('spawn')
    pool = multiprocessing.pool.Pool(
        processes=2,
        context=process_pool_context,
    )

    multiprocessing_manager = multiprocessing.Manager()
    multiprocessing_queue = multiprocessing_manager.Queue(
        maxsize=test_queue_size,
    )

    start = time.time()
    for i in range(test_queue_size):
        multiprocessing_queue.put(b'1')
    end = time.time()

    print('queue INSERTION:')
    print(end-start)

    pool.apply(func=consume_queue, args=(multiprocessing_queue,), kwds={})

    regular_queue = queue.Queue()
    start = time.time()
    for i in range(test_queue_size):
        regular_queue.put(b'1')
    end = time.time()

    print('queue INSERTION:')
    print(end-start)
    consume_queue(regular_queue)
Project: pytorch    Author: tylergenter
def clean_worker(*args, **kwargs):
    import gc
    multiprocessing.pool.worker(*args, **kwargs)
    # Regular multiprocessing workers don't fully clean up after themselves,
    # so we have to explicitly trigger garbage collection to make sure that all
    # destructors are called...
    gc.collect()
Project: oil    Author: oilshell
def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
Project: oil    Author: oilshell
def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))
Project: oil    Author: oilshell
def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)
Project: oil    Author: oilshell
def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")
Project: oil    Author: oilshell
def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
Project: oil    Author: oilshell
def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)
Project: oil    Author: oilshell
def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))
Project: oil    Author: oilshell
def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):
            res = p.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, res.get)

        p.close()
        p.join()
Project: oil    Author: oilshell
def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print self.manager._debug_info()
            print debug_info

        self.assertEqual(refs, EXPECTED_NUMBER)

#
# Test of creating a customized manager class
#
Project: oil    Author: oilshell
def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )

#
# Quick test that logging works -- does not test logging output
#
Project: oil    Author: oilshell
def pool_in_process():
    pool = multiprocessing.Pool(processes=4)
    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Project: python2-tracer    Author: extremecoders-re
def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
Project: python2-tracer    Author: extremecoders-re
def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))
Project: python2-tracer    Author: extremecoders-re
def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)
Project: python2-tracer    Author: extremecoders-re
def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")
Project: python2-tracer    Author: extremecoders-re
def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
Project: python2-tracer    Author: extremecoders-re
def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)
Project: python2-tracer    Author: extremecoders-re
def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))
Project: python2-tracer    Author: extremecoders-re
def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):
            res = p.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, res.get)

        p.close()
        p.join()
Project: python2-tracer    Author: extremecoders-re
def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print self.manager._debug_info()
            print debug_info

        self.assertEqual(refs, EXPECTED_NUMBER)

#
# Test of creating a customized manager class
#