Python multiprocessing.pool 模块,map() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用multiprocessing.pool.map()

项目:planetplanet    作者:rodluger    | 项目源码 | 文件源码
def map(self, func, iterable, chunksize=None):
        """
        Apply `func` to every item of `iterable` through `map_async`,
        polling for the result so that `KeyboardInterrupt` is not swallowed.

        :param func:
            The function to apply to the items.

        :param iterable:
            An iterable of items that will have `func` applied to them.

        :param chunksize:
            Forwarded unchanged to `map_async`.

        """
        # Waiting with a timeout is the whole trick: a Condition.wait()
        # without one swallows KeyboardInterrupts.
        pending = self.map_async(func, iterable, chunksize)

        while True:
            try:
                outcome = pending.get(self.wait_timeout)
            except multiprocessing.TimeoutError:
                # Result not ready yet; poll again.
                continue
            except KeyboardInterrupt:
                # Stop the workers before letting the interrupt propagate.
                self.terminate()
                self.join()
                raise
            else:
                return outcome
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_imap_unordered_handle_iterable_exception(self):
        """Check that ``imap_unordered`` surfaces ``SayWhenError`` while iterating.

        Every value obtained before the error must be one of the expected
        squares and must not be seen twice. Skipped when ``self.TYPE`` is
        ``'manager'``.

        NOTE(review): ``exception_throwing_generator`` is not visible here;
        presumably it raises ``SayWhenError`` part-way through -- confirm.
        """
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        # Materialise as a list: on Python 3 ``map`` is a one-shot iterator
        # that has no ``remove`` method.
        expected_values = list(map(sqr, range(10)))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, range(20)))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_imap_unordered_handle_iterable_exception(self):
        """Check that ``imap_unordered`` surfaces ``SayWhenError`` while iterating.

        Every value obtained before the error must be one of the expected
        squares and must not be seen twice. Skipped when ``self.TYPE`` is
        ``'manager'``.

        NOTE(review): ``exception_throwing_generator`` is not visible here;
        presumably it raises ``SayWhenError`` part-way through -- confirm.
        """
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        # Materialise as a list: on Python 3 ``map`` is a one-shot iterator
        # that has no ``remove`` method.
        expected_values = list(map(sqr, range(10)))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, range(20)))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)
项目:mist.io-api-examples    作者:mistio    | 项目源码 | 文件源码
def list_all_machines(cloud_ids, headers):
    """Query every cloud in parallel and return all machines as one flat list.

    :param cloud_ids: iterable of cloud ids; each one is substituted into
        the ``/clouds/<id>/machines`` URL.
    :param headers: passed as ``headers=`` to every ``requests.get`` call.
    :returns: list of the machine entries from every cloud that answered
        with HTTP 200, each tagged with a ``'cloud'`` key holding its cloud
        id. A cloud answering with any other status contributes nothing.
    """
    def list_one_cloud(cloud_id):
        cloud_machines = requests.get('https://mist.io/clouds/%s/machines' % cloud_id, headers=headers)
        if cloud_machines.status_code != 200:
            return []
        machines = cloud_machines.json()
        for machine in machines:
            machine['cloud'] = cloud_id
        return machines

    pool = multiprocessing.pool.ThreadPool(8)
    try:
        results = pool.map(list_one_cloud, cloud_ids)
    finally:
        # Always release the worker threads, even when a request raised.
        pool.terminate()
        pool.join()

    machines = []
    for result in results:
        machines.extend(result)
    return machines
项目:planetplanet    作者:rodluger    | 项目源码 | 文件源码
def map(self, *args, **kwargs):
        '''
        Placeholder for ``map``; a subclass is expected to supply the real
        implementation.

        :raises NotImplementedError: always.

        '''

        # Raise (not return) the exception so a missing override fails loudly
        # instead of handing the caller an exception object as a "result".
        raise NotImplementedError('Method ``map`` must be called from subclasses.')
项目:planetplanet    作者:rodluger    | 项目源码 | 文件源码
def map(self, function, iterable):
        '''
        Apply `function` to each item of `iterable`, in order and in the
        calling thread, and return the results as a list.

        :param function:
            Callable taking one item.

        :param iterable:
            The items to process.

        :returns:
            A list with one result per item.

        '''

        # A comprehension rather than the built-in ``map``: when this
        # function is defined at module scope the name ``map`` resolves to
        # the function itself, and the call would fail.
        return [function(item) for item in iterable]
项目:nojs    作者:chrisdickinson    | 项目源码 | 文件源码
def _ConvertToWebP(webp_binary, png_files):
  pool = multiprocessing.pool.ThreadPool(10)
  def convert_image(png_path):
    root = os.path.splitext(png_path)[0]
    webp_path = root + '.webp'
    args = [webp_binary, png_path] + _PNG_TO_WEBP_ARGS + [webp_path]
    subprocess.check_call(args)
    os.remove(png_path)
  # Android requires pngs for 9-patch images.
  pool.map(convert_image, [f for f in png_files if not f.endswith('.9.png')])
  pool.close()
  pool.join()
项目:chromium-build    作者:discordapp    | 项目源码 | 文件源码
def _ConvertToWebP(webp_binary, png_files):
  pool = multiprocessing.pool.ThreadPool(10)
  def convert_image(png_path):
    root = os.path.splitext(png_path)[0]
    webp_path = root + '.webp'
    args = [webp_binary, png_path] + _PNG_TO_WEBP_ARGS + [webp_path]
    subprocess.check_call(args)
    os.remove(png_path)
  # Android requires pngs for 9-patch images.
  pool.map(convert_image, [f for f in png_files if not f.endswith('.9.png')])
  pool.close()
  pool.join()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_map(self):
        """Compare ``pool.map`` with applying ``sqr`` locally, with and without a chunksize."""
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(self.pool.map(sqr, small), [sqr(n) for n in small])
        self.assertEqual(self.pool.map(sqr, large, chunksize=20),
                         [sqr(n) for n in large])
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_imap(self):
        """Check ``pool.imap``: whole-result equality, per-item values, and exhaustion."""
        def check_stepwise(results, count):
            # Each successive item must equal n*n; after ``count`` items the
            # iterator must be exhausted.
            for n in range(count):
                self.assertEqual(next(results), n * n)
            self.assertRaises(StopIteration, results.__next__)

        inputs = list(range(10))
        self.assertEqual(list(self.pool.imap(sqr, inputs)),
                         [sqr(n) for n in inputs])

        check_stepwise(self.pool.imap(sqr, list(range(10))), 10)
        check_stepwise(self.pool.imap(sqr, list(range(1000)), chunksize=100), 1000)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_imap_unordered(self):
        """Check that sorted ``imap_unordered`` output matches ``sqr`` applied in order."""
        # First pass uses the default chunking, second an explicit chunksize.
        for extra in ({}, {'chunksize': 53}):
            unordered = self.pool.imap_unordered(sqr, list(range(1000)), **extra)
            self.assertEqual(sorted(unordered), [sqr(n) for n in range(1000)])
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def pool_in_process():
    """Map ``_afunc`` over a fixed input list on a 4-process pool, then shut it down."""
    pool = multiprocessing.Pool(processes=4)
    pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    # Close and join so the worker processes are not left behind.
    pool.close()
    pool.join()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_map(self):
        """Compare ``pool.map`` with the built-in ``map``, with and without a chunksize."""
        pmap = self.pool.map
        # Wrap the expected value in list(): on Python 3 ``map`` returns an
        # iterator, which never compares equal to the list from ``pool.map``.
        self.assertEqual(pmap(sqr, range(10)), list(map(sqr, range(10))))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         list(map(sqr, range(100))))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_map_unplicklable(self):
        """Check that ``pool.map`` raises ``RuntimeError`` for items whose ``__reduce__`` raises it."""
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class Unpicklable(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')

        # One instance repeated ten times, as the work list.
        payload = [Unpicklable()] * 10
        self.assertRaises(RuntimeError, self.pool.map, sqr, payload)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_imap(self):
        """Check ``pool.imap``: whole-result equality, per-item values, and exhaustion."""
        it = self.pool.imap(sqr, range(10))
        # list(map(...)) so the comparison also holds where ``map`` is lazy.
        self.assertEqual(list(it), list(map(sqr, range(10))))

        # The built-in next() works on Python 2 and 3 iterators alike,
        # unlike the Python 2-only ``it.next()`` method.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_imap_unordered(self):
        """Check that sorted ``imap_unordered`` output matches the built-in ``map``."""
        it = self.pool.imap_unordered(sqr, range(1000))
        # list(map(...)): on Python 3 a bare ``map`` object never equals the
        # list returned by sorted().
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def pool_in_process():
    """Map ``_afunc`` over a fixed input list on a 4-process pool, then shut it down."""
    pool = multiprocessing.Pool(processes=4)
    pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    # Close and join so the worker processes are not left behind.
    pool.close()
    pool.join()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_map(self):
        """Compare ``pool.map`` with the built-in ``map``, with and without a chunksize."""
        pmap = self.pool.map
        # Wrap the expected value in list(): on Python 3 ``map`` returns an
        # iterator, which never compares equal to the list from ``pool.map``.
        self.assertEqual(pmap(sqr, range(10)), list(map(sqr, range(10))))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         list(map(sqr, range(100))))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_map_unplicklable(self):
        """Check that ``pool.map`` raises ``RuntimeError`` for items whose ``__reduce__`` raises it."""
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class Unpicklable(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')

        # One instance repeated ten times, as the work list.
        payload = [Unpicklable()] * 10
        self.assertRaises(RuntimeError, self.pool.map, sqr, payload)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_imap(self):
        """Check ``pool.imap``: whole-result equality, per-item values, and exhaustion."""
        it = self.pool.imap(sqr, range(10))
        # list(map(...)) so the comparison also holds where ``map`` is lazy.
        self.assertEqual(list(it), list(map(sqr, range(10))))

        # The built-in next() works on Python 2 and 3 iterators alike,
        # unlike the Python 2-only ``it.next()`` method.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_imap_unordered(self):
        """Check that sorted ``imap_unordered`` output matches the built-in ``map``."""
        it = self.pool.imap_unordered(sqr, range(1000))
        # list(map(...)): on Python 3 a bare ``map`` object never equals the
        # list returned by sorted().
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def pool_in_process():
    """Map ``_afunc`` over a fixed input list on a 4-process pool, then shut it down."""
    pool = multiprocessing.Pool(processes=4)
    pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    # Close and join so the worker processes are not left behind.
    pool.close()
    pool.join()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_map(self):
        """Compare ``pool.map`` with applying ``sqr`` locally, with and without a chunksize."""
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(self.pool.map(sqr, small), [sqr(n) for n in small])
        self.assertEqual(self.pool.map(sqr, large, chunksize=20),
                         [sqr(n) for n in large])
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_map_async(self):
        """Check that the result fetched from ``pool.map_async`` matches ``sqr`` applied locally."""
        inputs = list(range(10))
        pending = self.pool.map_async(sqr, inputs)
        self.assertEqual(pending.get(), [sqr(n) for n in inputs])
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_imap(self):
        """Check ``pool.imap``: whole-result equality, per-item values, and exhaustion."""
        def check_stepwise(results, count):
            # Each successive item must equal n*n; after ``count`` items the
            # iterator must be exhausted.
            for n in range(count):
                self.assertEqual(next(results), n * n)
            self.assertRaises(StopIteration, results.__next__)

        inputs = list(range(10))
        self.assertEqual(list(self.pool.imap(sqr, inputs)),
                         [sqr(n) for n in inputs])

        check_stepwise(self.pool.imap(sqr, list(range(10))), 10)
        check_stepwise(self.pool.imap(sqr, list(range(1000)), chunksize=100), 1000)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_imap_unordered(self):
        """Check that sorted ``imap_unordered`` output matches ``sqr`` applied in order."""
        # First pass uses the default chunking, second an explicit chunksize.
        for extra in ({}, {'chunksize': 53}):
            unordered = self.pool.imap_unordered(sqr, list(range(1000)), **extra)
            self.assertEqual(sorted(unordered), [sqr(n) for n in range(1000)])
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_empty_iterable(self):
        """Check that each mapping call on a ``self.Pool(1)`` pool yields ``[]`` for empty input."""
        # See Issue 12157
        workers = self.Pool(1)

        mapped = workers.map(sqr, [])
        self.assertEqual(mapped, [])
        lazily_mapped = list(workers.imap(sqr, []))
        self.assertEqual(lazily_mapped, [])
        unordered = list(workers.imap_unordered(sqr, []))
        self.assertEqual(unordered, [])
        deferred = workers.map_async(sqr, []).get()
        self.assertEqual(deferred, [])

        workers.close()
        workers.join()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_map(self):
        """Compare ``pool.map`` with the built-in ``map``, with and without a chunksize."""
        pmap = self.pool.map
        # Wrap the expected value in list(): on Python 3 ``map`` returns an
        # iterator, which never compares equal to the list from ``pool.map``.
        self.assertEqual(pmap(sqr, range(10)), list(map(sqr, range(10))))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         list(map(sqr, range(100))))
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_map_unplicklable(self):
        """Check that ``pool.map`` raises ``RuntimeError`` for items whose ``__reduce__`` raises it."""
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class Unpicklable(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')

        # One instance repeated ten times, as the work list.
        payload = [Unpicklable()] * 10
        self.assertRaises(RuntimeError, self.pool.map, sqr, payload)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_imap(self):
        """Check ``pool.imap``: whole-result equality, per-item values, and exhaustion."""
        it = self.pool.imap(sqr, range(10))
        # list(map(...)) so the comparison also holds where ``map`` is lazy.
        self.assertEqual(list(it), list(map(sqr, range(10))))

        # The built-in next() works on Python 2 and 3 iterators alike,
        # unlike the Python 2-only ``it.next()`` method.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_imap_unordered(self):
        """Check that sorted ``imap_unordered`` output matches the built-in ``map``."""
        it = self.pool.imap_unordered(sqr, range(1000))
        # list(map(...)): on Python 3 a bare ``map`` object never equals the
        # list returned by sorted().
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_empty_iterable(self):
        """Check that each mapping call on a ``self.Pool(1)`` pool yields ``[]`` for empty input."""
        # See Issue 12157
        workers = self.Pool(1)

        mapped = workers.map(sqr, [])
        self.assertEqual(mapped, [])
        lazily_mapped = list(workers.imap(sqr, []))
        self.assertEqual(lazily_mapped, [])
        unordered = list(workers.imap_unordered(sqr, []))
        self.assertEqual(unordered, [])
        deferred = workers.map_async(sqr, []).get()
        self.assertEqual(deferred, [])

        workers.close()
        workers.join()
项目:pencroft    作者:lukeyeager    | 项目源码 | 文件源码
def benchmark(path, threads=1, thread_library='threading'):
    """Open the dataset at `path` and read every key once, in random order.

    The run is wrapped in ``_Timer`` blocks labelled 'Total',
    'Initialization' and 'Read data' (``_Timer`` is defined elsewhere;
    presumably it reports elapsed time -- confirm).

    :param path: passed to ``pencroft.open``; also printed in the header.
    :param threads: number of workers. With 1 (the default) the keys are
        read in a plain loop; with more, through a pool's ``map``.
    :param thread_library: ``'threading'`` for a ``ThreadPool`` or
        ``'multiprocessing'`` for a process ``Pool``. Only consulted when
        `threads` > 1.
    :raises ValueError: if `threads` > 1 and `thread_library` is neither of
        the two supported names.
    """
    print('%20s: %s' % ('File', path))
    print('%20s: %d' % ('Threads', threads))
    if threads > 1:
        print('%20s: %s' % ('Library', thread_library))

    pool = None
    if threads > 1:
        if thread_library == 'threading':
            pool = multiprocessing.pool.ThreadPool(threads)
        elif thread_library == 'multiprocessing':
            pool = multiprocessing.pool.Pool(threads)
        else:
            # Fail here rather than with an unbound ``pool`` further down.
            raise ValueError('unknown thread_library: %r' % (thread_library,))

    try:
        with _Timer('Total'):
            with _Timer('Initialization'):
                loader = pencroft.open(path)
                if threads > 1:
                    if thread_library == 'threading':
                        loader.make_thread_safe()
                    elif thread_library == 'multiprocessing':
                        loader.make_mp_safe()

            keys = loader.keys()
            random.shuffle(keys)

            with _Timer('Read data'):
                if pool is not None:
                    pool.map(loader.get, keys)
                else:
                    for key in keys:
                        loader.get(key)
    finally:
        # Release the workers whether or not the run succeeded.
        if pool is not None:
            pool.close()
            pool.join()
    print('')
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_map(self):
        """Compare ``pool.map`` with applying ``sqr`` locally, with and without a chunksize."""
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(self.pool.map(sqr, small), [sqr(n) for n in small])
        self.assertEqual(self.pool.map(sqr, large, chunksize=20),
                         [sqr(n) for n in large])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_map_async(self):
        """Check that the result fetched from ``pool.map_async`` matches ``sqr`` applied locally."""
        inputs = list(range(10))
        pending = self.pool.map_async(sqr, inputs)
        self.assertEqual(pending.get(), [sqr(n) for n in inputs])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_map_unplicklable(self):
        """Check that ``pool.map`` raises ``RuntimeError`` for items whose ``__reduce__`` raises it."""
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class Unpicklable(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')

        # One instance repeated ten times, as the work list.
        payload = [Unpicklable()] * 10
        self.assertRaises(RuntimeError, self.pool.map, sqr, payload)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_imap(self):
        """Check ``pool.imap``: whole-result equality, per-item values, and exhaustion."""
        def check_stepwise(results, count):
            # Each successive item must equal n*n; after ``count`` items the
            # iterator must be exhausted.
            for n in range(count):
                self.assertEqual(next(results), n * n)
            self.assertRaises(StopIteration, results.__next__)

        inputs = list(range(10))
        self.assertEqual(list(self.pool.imap(sqr, inputs)),
                         [sqr(n) for n in inputs])

        check_stepwise(self.pool.imap(sqr, list(range(10))), 10)
        check_stepwise(self.pool.imap(sqr, list(range(1000)), chunksize=100), 1000)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_imap_unordered(self):
        """Check that sorted ``imap_unordered`` output matches ``sqr`` applied in order."""
        # First pass uses the default chunking, second an explicit chunksize.
        for extra in ({}, {'chunksize': 53}):
            unordered = self.pool.imap_unordered(sqr, list(range(1000)), **extra)
            self.assertEqual(sorted(unordered), [sqr(n) for n in range(1000)])
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_empty_iterable(self):
        """Check that each mapping call on a ``self.Pool(1)`` pool yields ``[]`` for empty input."""
        # See Issue 12157
        workers = self.Pool(1)

        mapped = workers.map(sqr, [])
        self.assertEqual(mapped, [])
        lazily_mapped = list(workers.imap(sqr, []))
        self.assertEqual(lazily_mapped, [])
        unordered = list(workers.imap_unordered(sqr, []))
        self.assertEqual(unordered, [])
        deferred = workers.map_async(sqr, []).get()
        self.assertEqual(deferred, [])

        workers.close()
        workers.join()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def pool_in_process():
    """Map ``_afunc`` over the integers 1 through 7 on a 4-process pool, then close and join it."""
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, list(range(1, 8)))
    workers.close()
    workers.join()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_map(self):
        """Compare ``pool.map`` with the built-in ``map``, with and without a chunksize."""
        pmap = self.pool.map
        # Wrap the expected value in list(): on Python 3 ``map`` returns an
        # iterator, which never compares equal to the list from ``pool.map``.
        self.assertEqual(pmap(sqr, range(10)), list(map(sqr, range(10))))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         list(map(sqr, range(100))))
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_imap(self):
        """Check ``pool.imap``: whole-result equality, per-item values, and exhaustion."""
        it = self.pool.imap(sqr, range(10))
        # list(map(...)) so the comparison also holds where ``map`` is lazy.
        self.assertEqual(list(it), list(map(sqr, range(10))))

        # The built-in next() works on Python 2 and 3 iterators alike,
        # unlike the Python 2-only ``it.next()`` method.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_imap_unordered(self):
        """Check that sorted ``imap_unordered`` output matches the built-in ``map``."""
        it = self.pool.imap_unordered(sqr, range(1000))
        # list(map(...)): on Python 3 a bare ``map`` object never equals the
        # list returned by sorted().
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_empty_iterable(self):
        """Check that each mapping call on a ``self.Pool(1)`` pool yields ``[]`` for empty input."""
        # See Issue 12157
        workers = self.Pool(1)

        mapped = workers.map(sqr, [])
        self.assertEqual(mapped, [])
        lazily_mapped = list(workers.imap(sqr, []))
        self.assertEqual(lazily_mapped, [])
        unordered = list(workers.imap_unordered(sqr, []))
        self.assertEqual(unordered, [])
        deferred = workers.map_async(sqr, []).get()
        self.assertEqual(deferred, [])

        workers.close()
        workers.join()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def pool_in_process():
    """Map ``_afunc`` over a fixed input list on a 4-process pool, then shut it down."""
    pool = multiprocessing.Pool(processes=4)
    pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    # Close and join so the worker processes are not left behind.
    pool.close()
    pool.join()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_map(self):
        """Compare ``pool.map`` with applying ``sqr`` locally, with and without a chunksize."""
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(self.pool.map(sqr, small), [sqr(n) for n in small])
        self.assertEqual(self.pool.map(sqr, large, chunksize=20),
                         [sqr(n) for n in large])
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_map_async(self):
        """Check that the result fetched from ``pool.map_async`` matches ``sqr`` applied locally."""
        inputs = list(range(10))
        pending = self.pool.map_async(sqr, inputs)
        self.assertEqual(pending.get(), [sqr(n) for n in inputs])
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_map_unplicklable(self):
        """Check that ``pool.map`` raises ``RuntimeError`` for items whose ``__reduce__`` raises it."""
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class Unpicklable(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')

        # One instance repeated ten times, as the work list.
        payload = [Unpicklable()] * 10
        self.assertRaises(RuntimeError, self.pool.map, sqr, payload)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_imap(self):
        """Check ``pool.imap``: whole-result equality, per-item values, and exhaustion."""
        def check_stepwise(results, count):
            # Each successive item must equal n*n; after ``count`` items the
            # iterator must be exhausted.
            for n in range(count):
                self.assertEqual(next(results), n * n)
            self.assertRaises(StopIteration, results.__next__)

        inputs = list(range(10))
        self.assertEqual(list(self.pool.imap(sqr, inputs)),
                         [sqr(n) for n in inputs])

        check_stepwise(self.pool.imap(sqr, list(range(10))), 10)
        check_stepwise(self.pool.imap(sqr, list(range(1000)), chunksize=100), 1000)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_imap_unordered(self):
        """Check that sorted ``imap_unordered`` output matches ``sqr`` applied in order."""
        # First pass uses the default chunking, second an explicit chunksize.
        for extra in ({}, {'chunksize': 53}):
            unordered = self.pool.imap_unordered(sqr, list(range(1000)), **extra)
            self.assertEqual(sorted(unordered), [sqr(n) for n in range(1000)])