Python multiprocessing module: get_context() code examples

The following 20 code examples, extracted from open-source Python projects, illustrate how to use multiprocessing.get_context().

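For orientation, here is a minimal, self-contained sketch of the API all of the examples below build on (the worker function is illustrative, not taken from any of the projects): get_context(method) returns a context object that exposes the same API as the multiprocessing module itself, but bound to a single start method.

import multiprocessing

def worker(q):
    q.put('hello from child')

if __name__ == '__main__':
    # A context behaves like the multiprocessing module, but with the
    # start method fixed (here: 'spawn')
    ctx = multiprocessing.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=worker, args=(q,))
    p.start()
    print(q.get())
    p.join()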
Project: ouroboros    Author: pybee    | project source | file source
def test_set_get(self):
        multiprocessing.set_forkserver_preload(PRELOAD)
        count = 0
        old_method = multiprocessing.get_start_method()
        try:
            for method in ('fork', 'spawn', 'forkserver'):
                try:
                    multiprocessing.set_start_method(method, force=True)
                except ValueError:
                    continue
                self.assertEqual(multiprocessing.get_start_method(), method)
                ctx = multiprocessing.get_context()
                self.assertEqual(ctx.get_start_method(), method)
                self.assertTrue(type(ctx).__name__.lower().startswith(method))
                self.assertTrue(
                    ctx.Process.__name__.lower().startswith(method))
                self.check_context(multiprocessing)
                count += 1
        finally:
            multiprocessing.set_start_method(old_method, force=True)
        self.assertGreaterEqual(count, 1)
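The test above exercises the interplay between the global start method and per-context state. A minimal sketch of the distinction it relies on: set_start_method() mutates process-wide state (and raises RuntimeError if called a second time without force=True), while get_context() returns an isolated context and leaves the global default untouched.

import multiprocessing

if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')  # local: global default unchanged
    multiprocessing.set_start_method('spawn', force=True)  # global, process-wide
    assert ctx.get_start_method() == 'spawn'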
Project: mujoco-py    Author: openai    | project source | file source
def test_multiprocess():
    '''
    Tests for importing mujoco_py from multiple processes.
    '''
    ctx = get_context('spawn')
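    # 'spawn' starts each child in a fresh interpreter, so every process
    # performs its own, independent import of mujoco_py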
    processes = []
    times = 3
    queue = ctx.Queue()
    for idx in range(times):
        processes.append(ctx.Process(target=import_process, args=(queue, )))
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    for _ in range(times):
        assert queue.get(), "One of the processes failed."
Project: face_recognition    Author: ageitgey    | project source | file source
def process_images_in_process_pool(images_to_check, known_names, known_face_encodings, number_of_cpus, tolerance, show_distance):
    if number_of_cpus == -1:
        processes = None
    else:
        processes = number_of_cpus

    # macOS will crash due to a bug in libdispatch if you don't use 'forkserver'
    context = multiprocessing
    if "forkserver" in multiprocessing.get_all_start_methods():
        context = multiprocessing.get_context("forkserver")

    pool = context.Pool(processes=processes)

    function_parameters = zip(
        images_to_check,
        itertools.repeat(known_names),
        itertools.repeat(known_face_encodings),
        itertools.repeat(tolerance),
        itertools.repeat(show_distance)
    )

    pool.starmap(test_image, function_parameters)
Project: coriolis    Author: cloudbase    | project source | file source
def _exec_task_process(self, ctxt, task_id, task_type, origin, destination,
                           instance, task_info):
        mp_ctx = multiprocessing.get_context('spawn')
        mp_q = mp_ctx.Queue()
        mp_log_q = mp_ctx.Queue()
        p = mp_ctx.Process(
            target=_task_process,
            args=(ctxt, task_id, task_type, origin, destination, instance,
                  task_info, mp_q, mp_log_q))

        p.start()
        LOG.info("Task process started: %s", task_id)
        self._rpc_conductor_client.set_task_host(
            ctxt, task_id, self._server, p.pid)

        self._handle_mp_log_events(p, mp_log_q)
        p.join()

        if mp_q.empty():
            raise exception.CoriolisException("Task canceled")
        result = mp_q.get(False)

        if isinstance(result, str):
            raise exception.TaskProcessException(result)
        return result
Project: pyrealtime    Author: ewhitmire    | project source | file source
def get_output(self):
        ctx = multiprocessing.get_context('spawn')
        # out_queue = ctx.Queue()
        out_queue = utils.Queue(ctx=ctx) # .get_context()
        self.out_queues.append(out_queue)
        return out_queue
Project: pyrealtime    Author: ewhitmire    | project source | file source
def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        ctx = multiprocessing.get_context('spawn')
        self.process = ctx.Process(target=self.run_proc)
        self.thread_layers = []
        LayerManager.session().add_layer(self)
Project: pyrealtime    Author: ewhitmire    | project source | file source
def __init__(self):
            # multiprocessing.set_start_method('spawn')
            self.layers = {}
            self.stop_event = multiprocessing.get_context('spawn').Event()
            self.input_prompts = multiprocessing.get_context('spawn').Queue()
            self.show_monitor = False
Project: pyrealtime    Author: ewhitmire    | project source | file source
def reset(self):
            self.layers = {}
            self.stop_event = multiprocessing.get_context('spawn').Event()
            self.input_prompts = multiprocessing.get_context('spawn').Queue()
Project: Url    Author: beiruan    | project source | file source
def Queue(maxsize=0):
            return MultiProcessingQueue(maxsize, ctx=multiprocessing.get_context())
Project: isar    Author: ilbers    | project source | file source
def __init__(self, maxsize):
        multiprocessing.queues.Queue.__init__(self, maxsize, ctx=multiprocessing.get_context())
        self.exit = False
        bb.utils.set_process_name("ProcessEQueue")
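Both the Url and isar examples above instantiate the low-level multiprocessing.queues.Queue directly; since Python 3.4 its constructor requires an explicit ctx keyword argument, and get_context() with no argument returns the default context. A minimal sketch of the same pattern (the subclass name is illustrative):

import multiprocessing
import multiprocessing.queues

class DefaultContextQueue(multiprocessing.queues.Queue):
    def __init__(self, maxsize=0):
        # The low-level Queue class requires an explicit context;
        # get_context() without arguments returns the default one
        super().__init__(maxsize, ctx=multiprocessing.get_context())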
Project: tasker    Author: wavenator    | project source | file source
def main():
    process_pool_context = multiprocessing.get_context('spawn')
    pool = multiprocessing.pool.Pool(
        processes=2,
        context=process_pool_context,
    )

    multiprocessing_manager = multiprocessing.Manager()
    multiprocessing_queue = multiprocessing_manager.Queue(
        maxsize=test_queue_size,
    )

    start = time.time()
    for i in range(test_queue_size):
        multiprocessing_queue.put(b'1')
    end = time.time()

    print('queue INSERTION:')
    print(end-start)

    pool.apply(func=consume_queue, args=(multiprocessing_queue,), kwds={})

    regular_queue = queue.Queue()
    start = time.time()
    for i in range(test_queue_size):
        regular_queue.put(b'1')
    end = time.time()

    print('queue INSERTION:')
    print(end-start)
    consume_queue(regular_queue)
Project: sockeye    Author: awslabs    | project source | file source
def __init__(self,
                 batch_size: int,
                 output_folder: str,
                 optimized_metric: str = C.PERPLEXITY,
                 use_tensorboard: bool = False,
                 cp_decoder: Optional[checkpoint_decoder.CheckpointDecoder] = None) -> None:
        self.output_folder = output_folder
        # stores dicts of metric names & values for each checkpoint
        self.metrics = []  # type: List[Dict]
        self.metrics_filename = os.path.join(output_folder, C.METRICS_NAME)
        self.best_checkpoint = 0
        self.start_tic = time.time()
        self.summary_writer = None
        if use_tensorboard:
            import tensorboard  # pylint: disable=import-error
            log_dir = os.path.join(output_folder, C.TENSORBOARD_NAME)
            if os.path.exists(log_dir):
                logger.info("Deleting existing tensorboard log dir %s", log_dir)
                shutil.rmtree(log_dir)
            logger.info("Logging training events for Tensorboard at '%s'", log_dir)
            self.summary_writer = tensorboard.FileWriter(log_dir)
        self.cp_decoder = cp_decoder
        self.ctx = mp.get_context('spawn')  # type: ignore
        self.decoder_metric_queue = self.ctx.Queue()
        self.decoder_process = None  # type: Optional[mp.Process]
        utils.check_condition(optimized_metric in C.METRICS, "Unsupported metric: %s" % optimized_metric)
        if optimized_metric == C.BLEU:
            utils.check_condition(self.cp_decoder is not None, "%s requires CheckpointDecoder" % C.BLEU)
        self.optimized_metric = optimized_metric
        self.validation_best = C.METRIC_WORST[self.optimized_metric]
        logger.info("Early stopping by optimizing '%s'", self.optimized_metric)
        self.tic = 0
Project: pysimgrid    Author: alexmnazarenko    | project source | file source
def get_schedule(self, simulation):
    """
    Overridden.
    """
    nxgraph = simulation.get_task_graph()
    platform_model = cscheduling.PlatformModel(simulation)
    state = cscheduling.SchedulerState(simulation)

    ordered_tasks = cscheduling.heft_order(nxgraph, platform_model)

    subgraph = networkx.DiGraph()

    # fork context is incompatible with SimGrid static variables
    ctx = multiprocessing.get_context("spawn")
    for task in ordered_tasks:
      _update_subgraph(nxgraph, subgraph, task)
      if cscheduling.try_schedule_boundary_task(task, platform_model, state):
        continue
      current_min = cscheduling.MinSelector()
      for host, timesheet in state.timetable.items():
        if cscheduling.is_master_host(host):
          continue
        current_state = state.copy()
        est = platform_model.est(host, nxgraph.pred[task], current_state)
        eet = platform_model.eet(task, host)
        # 'correct' way
        pos, start, finish = cscheduling.timesheet_insertion(timesheet, est, eet)
        # TODO: try aggressive inserts
        current_state.update(task, host, pos, start, finish)
        with tempfile.NamedTemporaryFile("w", suffix=".dot") as temp_file:
          _serialize_graph(subgraph, temp_file)
          subschedule = _serialize_schedule(current_state.timetable)
          with ctx.Pool(1) as process:
            serialized_state = process.apply(_run_simulation, (simulation.platform_path, temp_file.name, subschedule))
          current_state = _restore_state(simulation, serialized_state)
          current_min.update((current_state.max_time, host.speed, host.name), current_state)
      state = current_min.value
    expected_makespan = max([state["ect"] for state in state.task_states.values()])
    return state.schedule, expected_makespan
Project: ouroboros    Author: pybee    | project source | file source
def test_context(self):
        for method in ('fork', 'spawn', 'forkserver'):
            try:
                ctx = multiprocessing.get_context(method)
            except ValueError:
                continue
            self.assertEqual(ctx.get_start_method(), method)
            self.assertIs(ctx.get_context(), ctx)
            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
            self.assertRaises(ValueError, ctx.set_start_method, None)
            self.check_context(ctx)
Project: stanford-osrl    Author: ctmakro    | project source | file source
def __init__(self, f):
        ctx = mp.get_context('spawn') # eliminate problems with fork().
        pq,cq = ctx.Queue(1), ctx.Queue(1)
        self.pc, self.cc = conn_master(pq, cq), conn_slave(cq,pq)

        self.p = ctx.Process(target=f, args=(self.cc,), daemon=True)
        self.pretty('starting process')
        self.p.start()
Project: CheckPy    Author: Jelleas    | project source | file source
def _runTests(moduleName, fileName, debugMode = False):
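    # multiprocessing.get_context() was added in Python 3.4; on older
    # interpreters the bare module (platform default start method) is used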
    if sys.version_info[:2] >= (3,4):
        ctx = multiprocessing.get_context("spawn")
    else:
        ctx = multiprocessing

    signalQueue = ctx.Queue()
    resultQueue = ctx.Queue()
    tester = _Tester(moduleName, fileName, debugMode, signalQueue, resultQueue)
    p = ctx.Process(target=tester.run, name="Tester")
    p.start()

    start = time.time()
    isTiming = False

    while p.is_alive():
        while not signalQueue.empty():
            signal = signalQueue.get()
            isTiming = signal.isTiming
            description = signal.description
            timeout = signal.timeout
            if signal.resetTimer:
                start = time.time()

        if isTiming and time.time() - start > timeout:
            result = TesterResult()
            result.addOutput(printer.displayError("Timeout ({} seconds) reached during: {}".format(timeout, description)))
            p.terminate()
            p.join()
            return result

        if not resultQueue.empty():
            p.terminate()
            p.join()
            break

        time.sleep(0.1)

    if not resultQueue.empty():
        return resultQueue.get()
Project: sunvox-dll-python    Author: metrasynth    | project source | file source
def __init__(self, *args, **kwargs):
        self._ctx = mp.get_context('spawn')
        self._conn, child_conn = mp.Pipe()
        self._lock = Lock()
        args = (child_conn,) + args
        self._process = self._ctx.Process(
            target=self.processor_class, args=args, kwargs=kwargs)
        self._process.start()
Project: snovault    Author: ENCODE-DCC    | project source | file source
def pool(self):
        return Pool(
            processes=self.processes,
            initializer=initializer,
            initargs=self.initargs,
            maxtasksperchild=self.maxtasks,
            context=get_context('forkserver'),
        )
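A note on the pattern above: passing context= to multiprocessing.pool.Pool is equivalent to calling Pool() on the context itself. A minimal sketch, using 'spawn' for portability ('forkserver' is only available on POSIX systems):

import multiprocessing
import multiprocessing.pool

if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')
    # These two constructions yield equivalent pools:
    pool_a = multiprocessing.pool.Pool(processes=2, context=ctx)
    pool_b = ctx.Pool(processes=2)
    for pool in (pool_a, pool_b):
        pool.close()
        pool.join()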
Project: tasker    Author: wavenator    | project source | file source
def main():
    process_pool_context = multiprocessing.get_context('spawn')
    pool = multiprocessing.pool.Pool(
        processes=4,
        context=process_pool_context,
    )
    pool.apply_async(
        func=zmq_streamer,
    )

    multiprocessing_manager = multiprocessing.Manager()
    multiprocessing_queue = multiprocessing_manager.Queue(
        maxsize=test_queue_size,
    )
    for i in range(test_queue_size):
        multiprocessing_queue.put(b'1')

    res = pool.apply_async(
        func=consume_queue,
        args=(multiprocessing_queue,),
    )
    res.get()


    context = zmq.Context()
    socket = context.socket(zmq.PAIR)
    res = pool.apply_async(
        func=consume_zmq_pair,
    )
    time.sleep(1)
    socket.connect("tcp://localhost:%s" % zmq_port)
    for i in range(test_queue_size):
        socket.send(b'1')
    res.get()
    socket.close()


    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    res = pool.apply_async(
        func=consume_zmq_streamer,
    )
    time.sleep(1)
    socket.connect("tcp://localhost:%s" % zmq_queue_port_pull)
    for i in range(test_queue_size):
        socket.send(b'1')
    res.wait()
    socket.close()
Project: snovault    Author: ENCODE-DCC    | project source | file source
def run(config_uri, app_name=None, username=None, types=(), batch_size=500, processes=None):
    # multiprocessing.get_context is Python 3 only.
    from multiprocessing import get_context
    from multiprocessing.pool import Pool

    # Loading app will have configured from config file. Reconfigure here:
    logging.getLogger('snovault').setLevel(logging.DEBUG)

    testapp = internal_app(config_uri, app_name, username)
    connection = testapp.app.registry[CONNECTION]
    uuids = [str(uuid) for uuid in connection.__iter__(*types)]
    transaction.abort()
    logger.info('Total items: %d' % len(uuids))

    pool = Pool(
        processes=processes,
        initializer=initializer,
        initargs=(config_uri, app_name, username),
        context=get_context('forkserver'),
    )

    all_results = []
    try:
        for result in pool.imap_unordered(worker, batched(uuids, batch_size), chunksize=1):
            results = result['results']
            errors = sum(error for item_type, path, update, error in results)
            updated = sum(update for item_type, path, update, error in results)
            logger.info('Batch: Updated %d of %d (errors %d)' %
                        (updated, len(results), errors))
            all_results.extend(results)
    finally:
        pool.terminate()
        pool.join()

    def result_item_type(result):
        # Ensure we always return a string
        return result[0] or ''

    for item_type, results in itertools.groupby(
            sorted(all_results, key=result_item_type), key=result_item_type):
        results = list(results)
        errors = sum(error for item_type, path, update, error in results)
        updated = sum(update for item_type, path, update, error in results)
        logger.info('Collection %s: Updated %d of %d (errors %d)' %
                    (item_type, updated, len(results), errors))