Python multiprocessing 模块,dummy() 实例源码

我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用multiprocessing.dummy()

项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def concurrent_get_release_status(targets, timeout=4):
    """
    Args:
        target (list of tuples): a list of (host, target_path)
    """
    if len(targets) == 0:
        return []

    # workaround for http://bugs.python.org/issue7980
    import _strptime   # noqa

    pool = multiprocessing.dummy.Pool(min(20, len(targets)))

    def _inner_get_release_status(target):
        host, path = target
        return get_release_status(host, path, timeout)

    try:
        return pool.map(_inner_get_release_status, targets, chunksize=1)
    finally:
        pool.close()
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def dns_bulk_resolve(candidates, reverse=False, ip_version=None, threads=50):
    """
    Resolve a list of host names to IPs or, if reverse is true, IPs to
    host names.  Return a map of each result keyed to its candidate.

    WARNING: This function will create a pool of up to 'threads'
    threads.
    """

    # This is based loosely on http://stackoverflow.com/a/34377198

    if reverse and ip_version is not None:
        raise ValueError("Unable to force IP version when reverse-resolving")

    if ip_version is None:
        ip_version = 4
    __check_ip_version__(ip_version)

    result = {}

    if len(candidates) == 0:
        return result

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    pool = multiprocessing.dummy.Pool(
        processes=min(len(candidates), threads) )

    candidate_args = [ (candidate, ip_version) for candidate in candidates ]

    for ip, name in pool.imap(
        __reverser__ if reverse else __forwarder__,
        candidate_args,
        chunksize=1):
        result[ip] = name
    pool.close()
    return result
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def api_ping_list(hosts, bind=None, timeout=None, threads=10):
    """
    Ping a list of hosts and return a list of their statuses.
    """

    if len(hosts) == 0:
        return {}

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    pool = multiprocessing.dummy.Pool(processes=min(len(hosts), threads))

    pool_args = [(host, timeout) for host in hosts]
    result = {}

    def ping_one(arg):
        host, timeout = arg
        up, _ = api_ping(host, bind=bind, timeout=timeout)
        return (host, up)

    for host, state in pool.imap(
            ping_one,
            pool_args,
            chunksize=1):
        result[host] = state
    pool.close()
    return result
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def api_has_services(hosts, timeout=5, bind=None, threads=10):
    """
    Do a parallel rendition of the two functions above.

    Returns a hash of host names and results
    """

    # Work around a bug in 2.6
    # TODO: Get rid of this when 2.6 is no longer in the picture.
    if not hasattr(threading.current_thread(), "_children"):
        threading.current_thread()._children = weakref.WeakKeyDictionary()

    pool = multiprocessing.dummy.Pool(processes=min(len(hosts), threads))

    def check_one(arg):
        host, service, function = arg
        return (host, service, function(host, timeout=timeout, bind=bind))

    args = []
    result = {}
    for host in hosts:
        args.extend([
            (host, "bwctl", api_has_bwctl),
            (host, "pscheduler", api_has_pscheduler)
            ])
        result[host] = {
            "bwctl": None,
            "pscheduler": None
        }


    for host, service, state in pool.imap(check_one, args, chunksize=1):
        result[host][service] = state
    pool.close()
    return result
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def _all(func, hosts):
    '''
    Internal function that allow function to perform in all hosts
    '''
    all_instances = []
    # threads should likely scale with cores or interfaces
    cpus = multiprocessing.cpu_count()
    threads = 4 * cpus
    log.debug('multi._all cpus count={}, thread count={}'.format(cpus, threads))
    pool = multiprocessing.dummy.Pool(threads)
    for instance in pool.map(func, hosts):
        all_instances.append(instance)

    return all_instances
项目:mxbox    作者:Lyken17    | 项目源码 | 文件源码
def __init__(self, dataset, feedin_shape, collate_fn=default_collate, threads=1, shuffle=False):
        super(DataLoader, self).__init__()

        self.dataset = dataset
        self.threads = threads
        self.collate_fn = collate_fn(feedin_shape)
        # self.collate_fn = self.default_collate_fn

        # shape related variables

        self.data_shapes = feedin_shape['data']
        self.label_shapes = feedin_shape['label']
        self.batch_size = feedin_shape['batch_size']

        # loader related variables
        self.current = 0
        self.total = len(self.dataset)
        self.shuflle = shuffle
        self.map_index = list(range(self.total))

        # prepare for loading
        self.get_batch = self.get_batch_single_thread
        if self.threads > 1:  # multi process read
            from multiprocessing.dummy import Pool as ThreadPool
            # self.pool = multiprocessing.Pool(self.threads)
            self.pool = ThreadPool(self.threads)
            self.get_batch = self.get_batch_multi_thread

        self.reset()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_main(run=None):
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
        testcases_other
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
    # module during these tests is at least platform dependent and possibly
    # non-deterministic on any given platform. So we don't mind if the listed
    # warnings aren't actually raised.
    with test_support.check_py3k_warnings(
            (".+__(get|set)slice__ has been removed", DeprecationWarning),
            (r"sys.exc_clear\(\) not supported", DeprecationWarning),
            quiet=True):
        run(suite)

    ThreadsMixin.pool.terminate()
    ProcessesMixin.pool.terminate()
    ManagerMixin.pool.terminate()
    ManagerMixin.manager.shutdown()

    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_main(run=None):
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
        testcases_other
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
    # module during these tests is at least platform dependent and possibly
    # non-deterministic on any given platform. So we don't mind if the listed
    # warnings aren't actually raised.
    with test_support.check_py3k_warnings(
            (".+__(get|set)slice__ has been removed", DeprecationWarning),
            (r"sys.exc_clear\(\) not supported", DeprecationWarning),
            quiet=True):
        run(suite)

    ThreadsMixin.pool.terminate()
    ProcessesMixin.pool.terminate()
    ManagerMixin.pool.terminate()
    ManagerMixin.manager.shutdown()

    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool