Python gevent.pool module: Group() example source code

We extracted the following 13 code examples from open-source Python projects to illustrate how to use gevent.pool.Group().
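Before the project examples, here is a minimal, self-contained sketch of the basic Group() workflow; the worker function double is hypothetical:

import gevent
from gevent.pool import Group

def double(n):
    """Hypothetical worker: yield to the hub, then return its argument doubled."""
    gevent.sleep(0)
    return n * 2

group = Group()
results = group.map(double, range(5))  # spawns one greenlet per item and waits
print(results)                         # [0, 2, 4, 6, 8]

group.add(gevent.spawn(double, 10))    # existing greenlets can be added too
group.join()                           # block until every member has finished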

Project: core-python    Author: yidao620c    | Project Source | File Source
def test_group(self):
        def talk(msg):
            for _ in range(3):
                print(msg)

        g1 = gevent.spawn(talk, 'bar')
        g2 = gevent.spawn(talk, 'foo')
        g3 = gevent.spawn(talk, 'fizz')

        group = Group()
        group.add(g1)
        group.add(g2)
        group.join()

        group.add(g3)
        group.join()
Project: multi-cloud-control    Author: robertpeteuil    | Project Source | File Source
def get_conns(cred, providers):
    """Collect node data asynchronously using gevent lib."""
    cld_svc_map = {"aws": conn_aws,
                   "azure": conn_az,
                   "gcp": conn_gcp}
    sys.stdout.write("\rEstablishing Connections:  ")
    sys.stdout.flush()
    busy_obj = busy_disp_on()
    conn_fn = [[cld_svc_map[x.rstrip('1234567890')], cred[x], x]
               for x in providers]
    cgroup = Group()
    conn_res = cgroup.map(get_conn, conn_fn)
    cgroup.join()
    conn_objs = {}
    for item in conn_res:
        conn_objs.update(item)
    busy_disp_off(dobj=busy_obj)
    sys.stdout.write("\r                                                 \r")
    sys.stdout.write("\033[?25h")  # cursor back on
    sys.stdout.flush()
    return conn_objs
Project: multi-cloud-control    Author: robertpeteuil    | Project Source | File Source
def get_data(conn_objs, providers):
    """Refresh node data using previous connection-objects."""
    cld_svc_map = {"aws": nodes_aws,
                   "azure": nodes_az,
                   "gcp": nodes_gcp}
    sys.stdout.write("\rCollecting Info:  ")
    sys.stdout.flush()
    busy_obj = busy_disp_on()
    collec_fn = [[cld_svc_map[x.rstrip('1234567890')], conn_objs[x]]
                 for x in providers]
    ngroup = Group()
    node_list = ngroup.map(get_nodes, collec_fn)
    ngroup.join()
    busy_disp_off(dobj=busy_obj)
    sys.stdout.write("\r                                                 \r")
    sys.stdout.write("\033[?25h")  # cursor back on
    sys.stdout.flush()
    return node_list
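Both functions above share a pattern: each work item is a list that bundles a callable with its arguments, and Group.map() fans those items out to a small dispatcher (get_conn / get_nodes, not shown in this excerpt). A plausible sketch of such a dispatcher, with the unpacking order assumed from the lists built above:

def get_conn(item):
    """Hypothetical dispatcher: unpack [connect-function, credentials, provider]
    and return a one-entry dict keyed by the provider name."""
    conn_fn, cred, provider = item
    return {provider: conn_fn(cred, provider)}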
Project: aiolocust    Author: kpidata    | Project Source | File Source
def __init__(self, locust_classes, options):
        self.locust_classes = locust_classes
        self.hatch_rate = options.hatch_rate
        self.num_clients = options.num_clients
        self.num_requests = options.num_requests
        self.host = options.host
        self.locusts = Group()
        self.state = STATE_INIT
        self.hatching_greenlet = None
        self.exceptions = {}
        self.stats = global_stats

        # register listener that resets stats when hatching is complete
        def on_hatch_complete(user_count):
            self.state = STATE_RUNNING
            logger.info("Resetting stats\n")
            self.stats.reset_all()
        events.hatch_complete += on_hatch_complete
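The events.hatch_complete += on_hatch_complete idiom used here (and in the runners below) relies on an event-hook object that overloads += to register handlers and exposes fire() to invoke them. A minimal sketch of such a hook, assuming it matches locust's classic EventHook:

class EventHook(object):
    """Minimal event hook: `hook += handler` registers, hook.fire(...) invokes."""
    def __init__(self):
        self._handlers = []

    def __iadd__(self, handler):
        self._handlers.append(handler)
        return self

    def fire(self, **kwargs):
        for handler in self._handlers:
            handler(**kwargs)

hatch_complete = EventHook()
hatch_complete += lambda user_count: print("hatched:", user_count)
hatch_complete.fire(user_count=42)  # prints "hatched: 42"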
Project: aiolocust    Author: kpidata    | Project Source | File Source
def kill_locusts(self, kill_count):
        """
        Kill kill_count weighted locusts from the Group() object in self.locusts.
        """
        bucket = self.weight_locusts(kill_count)
        kill_count = len(bucket)
        self.num_clients -= kill_count
        logger.info("Killing %i locusts" % kill_count)
        dying = []
        for g in self.locusts:
            for l in bucket:
                if l == g.args[0]:
                    dying.append(g)
                    bucket.remove(l)
                    break
        for g in dying:
            self.locusts.killone(g)
        events.hatch_complete.fire(user_count=self.num_clients)
Project: pipeline    Author: alexlemann    | Project Source | File Source
def pipeline(stages, initial_data):
    monitors = Group()
    # Make sure initial_data is iterable.
    if not isinstance(initial_data, types.GeneratorType):
        try:
            iter(initial_data)
        except TypeError:
            raise TypeError('initial_data must be iterable')
    # The StopIteration will bubble through the queues as it is reached.
    #   Once a stage monitor sees it, it indicates that the stage will read
    #   no more data and the monitor can wait for the current work to complete
    #   and clean up.
    if hasattr(initial_data, 'append'):
        initial_data.append(StopIteration)
    if not stages:
        return PipelineResult(monitors, [])
    # chain stage queue io
    #  Each stage shares an output queue with the next stage's input.
    qs = [initial_data] + [Queue() for _ in range(len(stages))]
    for stage, in_q, out_q in zip(stages, qs[:-1], qs[1:]):
        stage.in_q = in_q
        stage.out_q = out_q
        monitors.spawn(stage_monitor, stage)
    gevent.sleep(0)
    return PipelineResult(monitors, stages[-1].out_q)
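The queue chaining above (each stage's output queue doubles as the next stage's input queue) can be demonstrated in isolation with plain gevent primitives; this sketch is independent of the project's Stage and PipelineResult classes:

import gevent
from gevent.queue import Queue

def stage(func, in_q, out_q):
    """Transform items until the StopIteration sentinel arrives, then forward it."""
    for item in in_q:          # gevent queue iteration ends at StopIteration
        out_q.put(func(item))
    out_q.put(StopIteration)   # bubble the sentinel to the next stage

q1, q2, q3 = Queue(), Queue(), Queue()
workers = [gevent.spawn(stage, lambda x: x + 1, q1, q2),
           gevent.spawn(stage, lambda x: x * 10, q2, q3)]
for n in (1, 2, 3):
    q1.put(n)
q1.put(StopIteration)
gevent.joinall(workers)
print(list(q3))  # [20, 30, 40]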
Project: core-python    Author: yidao620c    | Project Source | File Source
def gevent_click_page():
    global TRY_COUNT
    TRY_COUNT = int(sys.argv[1])

    _log.info('Launching the browser...')
    # Launch a headless PhantomJS browser
    driver = webdriver.PhantomJS()
    driver.get('https://www.xncoding.com/archives/')
    # driver.maximize_window()
    posts_count = len(driver.find_elements_by_xpath(
        '//article/header/h1[@class="post-title"]/a[@class="post-title-link"]'))
    driver.close()
    # Partition the posts across gevent greenlets
    psize = posts_count // THREAD_COUNT
    _log.info('Total posts: {}, posts per greenlet: {}'.format(posts_count, psize))
    group = Group()
    for i in range(0, THREAD_COUNT + 1):
        group.add(gevent.spawn(_click_page, posts_count, psize, i))
    group.join()

    _log.info('Done.')
Project: onedrop    Author: seraphln    | Project Source | File Source
def __init__(self, callback=None, ttype=None, source=None):
        """ ??????? """
        self.group = Group()
        self.task_queue = Queue()
        self.task_type = ttype
        self.cb = callback
        self.source = source
        self.task_name = "%s-%s-%s" % (socket.gethostname(),
                                       self.source,
                                       self.task_type)
Project: aiolocust    Author: kpidata    | Project Source | File Source
def __init__(self, *args, **kwargs):
        super(MasterLocustRunner, self).__init__(*args, **kwargs)

        class SlaveNodesDict(dict):
            def get_by_state(self, state):
                return [c for c in self.values() if c.state == state]

            @property
            def ready(self):
                return self.get_by_state(STATE_INIT)

            @property
            def hatching(self):
                return self.get_by_state(STATE_HATCHING)

            @property
            def running(self):
                return self.get_by_state(STATE_RUNNING)

        self.clients = SlaveNodesDict()
        self.server = rpc.Server(self.master_bind_host, self.master_bind_port)
        self.greenlet = Group()
        self.greenlet.spawn(self.client_listener).link_exception(callback=self.noop)

        # listener that gathers info on how many locust users the slaves have spawned
        def on_slave_report(client_id, data):
            if client_id not in self.clients:
                logger.info("Discarded report from unrecognized slave %s", client_id)
                return

            self.clients[client_id].user_count = data["user_count"]
        events.slave_report += on_slave_report

        # register listener that sends quit message to slave nodes
        def on_quitting():
            self.quit()
        events.quitting += on_quitting
Project: aiolocust    Author: kpidata    | Project Source | File Source
def __init__(self, *args, **kwargs):
        super(SlaveLocustRunner, self).__init__(*args, **kwargs)
        self.client_id = socket.gethostname() + "_" + md5(str(time() + random.randint(0, 10000)).encode('utf-8')).hexdigest()

        self.client = rpc.Client(self.master_host, self.master_port)
        self.greenlet = Group()

        self.greenlet.spawn(self.worker).link_exception(callback=self.noop)
        self.client.send(Message("client_ready", None, self.client_id))
        self.greenlet.spawn(self.stats_reporter).link_exception(callback=self.noop)

        # register listener that reports to the master node when all locust users have hatched
        def on_hatch_complete(user_count):
            self.client.send(Message("hatch_complete", {"count":user_count}, self.client_id))
        events.hatch_complete += on_hatch_complete

        # register listener that adds the current number of spawned locusts to the report that is sent to the master node 
        def on_report_to_master(client_id, data):
            data["user_count"] = self.user_count
        events.report_to_master += on_report_to_master

        # register listener that sends quit message to master
        def on_quitting():
            self.client.send(Message("quit", None, self.client_id))
        events.quitting += on_quitting

        # register listener that sends locust exceptions to the master
        def on_locust_error(locust_instance, exception, tb):
            formatted_tb = "".join(traceback.format_tb(tb))
            self.client.send(Message("exception", {"msg" : str(exception), "traceback" : formatted_tb}, self.client_id))
        events.locust_error += on_locust_error
Project: pymqant    Author: liangdas    | Project Source | File Source
def __init__(self):
        self.mods = []
        self.runMods = []
        self.group = Group()
        self.app = None
        self.ProcessID = None
Project: server    Author: happypandax    | Project Source | File Source
def __init__(self, name, group=pool.Group()):
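        # NOTE: the default pool.Group() above is evaluated once, when the
        # method is defined, so every instance constructed without an explicit
        # `group` argument shares the same Group object.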
        self.name = name
        self._commands = {}  # cmd_id : command
        self._greenlets = {}  # cmd_id : greenlet
        self._decorators = {}  # cmd_id : callable
        self._group = group
        self._queue = queue.Queue()
        self.services.append(weakref.ref(self))
Project: pipeline    Author: alexlemann    | Project Source | File Source
def stage_monitor(stage):
    """
    Stage monitor is a worker that monitors a stage while it is being executed.
      The stage monitor coordinates running stage workers, saving results, and
      determining the end of any particular stage.
    """
    # Pool of stage function worker greenlets.
    work_pool = Pool(size=stage.n_workers)
    # Group of greenlets which save results from workers via callbacks.
    save_group = Group()

    def save_result(x):
        """
        Save a worker's result onto the output queue. If the stage returns
          many values, enqueue each one separately; otherwise enqueue the
          single returned value as-is.
        """
        if isinstance(stage, Reduce):
            # XXX: This would not work for stream inputs afaict
            #   But, reduction should not work anyway
            if len(work_pool) + len(save_group) + len(stage.in_q) == 1:
                stage.out_q.put(x)
        else:
            if not stage.returns_many:
                stage.out_q.put(x)
            else:
                try:
                    for i in x:
                        stage.out_q.put(i)
                except TypeError:
                    stage.out_q.put([x])

    for x in stage.in_q:
        """
        Iterate the input queue until StopIteration is received.
          Spawn new workers for work items on the input queue.
          Keep track of storing results via a group of result saving greenlets.

        Ignore all DROP items in the input queue.

        Once we receive a StopIteration, wait for all open workers to finish;
          once they are done, bubble the StopIteration to the next stage.
        """
        gevent.sleep(0)
        if x is DROP:
            continue
        if x is StopIteration:
            break
        func_args = [x]
        cb_worker = work_pool.apply_async(stage.func,
                                          func_args,
                                          callback=save_result)
        save_group.add(cb_worker)
    logger.debug('Worker Pool: << {} >>'.format(work_pool))
    work_pool.join()
    save_group.join()
    stage.out_q.put(StopIteration)
    return stage
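The apply_async-with-callback pattern that stage_monitor builds on can be reduced to a few lines; a minimal sketch with a bounded Pool and a hypothetical worker:

import gevent
from gevent.pool import Pool

def work(n):
    """Hypothetical worker: yield to the hub, then return a computed value."""
    gevent.sleep(0)
    return n * n

results = []
pool = Pool(size=3)  # at most three workers run concurrently
for n in range(6):
    pool.apply_async(work, (n,), callback=results.append)
pool.join()          # wait for the worker greenlets themselves
gevent.sleep(0)      # give any still-pending result callbacks a turn
print(sorted(results))  # [0, 1, 4, 9, 16, 25]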