Python zmq 模块,UNSUBSCRIBE 实例源码

我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用zmq.UNSUBSCRIBE

项目:osbrain    作者:opensistemas-hub    | 项目源码 | 文件源码
def unsubscribe(self, alias: str, topic: Union[bytes, str]) -> None:
        '''
        Unsubscribe a SUB/SYNC_SUB socket given by its alias from a given
        specific topic, and delete its entry from the handlers dictionary.

        If instead of a single topic, a tuple or a list of topics is passed,
        the agent will unsubscribe from all the supplied topics.
        '''
        if isinstance(topic, (tuple, list)):
            for t in topic:
                self.unsubscribe(alias, t)
            return

        topic = topic_to_bytes(topic)

        if isinstance(self.address[alias], AgentAddress):
            self.socket[alias].setsockopt(zmq.UNSUBSCRIBE, topic)
            del self.handler[self.socket[alias]][topic]
        elif isinstance(self.address[alias], AgentChannel):
            channel = self.address[alias]
            sub_address = channel.receiver
            treated_topic = channel.twin_uuid + topic
            self.socket[sub_address].setsockopt(zmq.UNSUBSCRIBE, treated_topic)
            del self.handler[self.socket[sub_address]][treated_topic]
        else:
            raise NotImplementedError('Unsupported address type %s!' %
                                      self.address[alias])
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def unsubscribe_all(self):
        """ Subscription to all events. """
        self.socket.setsockopt(zmq.UNSUBSCRIBE, '')
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def unsubscribe(self, code):
        """ Remove subscription to the event named code. """
        self.socket.setsockopt(zmq.UNSUBSCRIBE, code.encode('utf-8'))

    # reception part
项目:baselayer    作者:cesium-ml    | 项目源码 | 文件源码
def unsubscribe(cls, username):
        cls._zmq_stream.socket.setsockopt(zmq.UNSUBSCRIBE,
                                          username.encode('utf-8'))
项目:agentzero    作者:gabrielfalcao    | 项目源码 | 文件源码
def set_socket_option(self, name, option, value):
        """calls ``zmq.setsockopt`` on the given socket.

        :param name: the name of the socket where data will pad through
        :param option: the option from the ``zmq`` module
        :param value:

        Here are some examples of options:

        * ``zmq.HWM``: Set high water mark
        * ``zmq.AFFINITY``: Set I/O thread affinity
        * ``zmq.IDENTITY``: Set socket identity
        * ``zmq.SUBSCRIBE``: Establish message filter
        * ``zmq.UNSUBSCRIBE``: Remove message filter
        * ``zmq.SNDBUF``: Set kernel transmit buffer size
        * ``zmq.RCVBUF``: Set kernel receive buffer size
        * ``zmq.LINGER``: Set linger period for socket shutdown
        * ``zmq.BACKLOG``: Set maximum length of the queue of outstanding connections
        * for the full list go to ``http://api.zeromq.org/4-0:zmq-setsockopt``

        **Example:**

        ::

          >>> import zmq
          >>> from agentzero.core import SocketManager
          >>>
          >>> sockets = SocketManager()
          >>> sockets.create('pipe-in', zmq.PULL)
          >>>
          >>> # block after 10 messages are queued
          >>> sockets.set_socket_option('pipe-in', zmq.HWM, 10)
        """

        socket = self.get_by_name(name)
        socket.setsockopt(option, value)
项目:dauber    作者:OpenDataAnalytics    | 项目源码 | 文件源码
def _connect(self):
        # Subscribe to the hello topic - once we recieve a hello we'll send a request
        # for real data.  The callback plugin will effectively block execution until we
        # Send this request
        self.socket.setsockopt(zmq.SUBSCRIBE, 'hello')

        # Define the control socket for responding to the 'hello' topic
        control_socket = self.context.socket(zmq.REQ)
        control_socket.connect(self._env['DAUBER_CONTROL_SOCKET_URI'])
        timeout = 500
        t_last = time.time()
        while (time.time() - t_last) < timeout:
            ready = dict(self.poller.poll())
            if ready.get(self.socket):
                topic, _ = self.socket.recv_multipart()
                if topic == 'hello':
                    # Signal that we've connected and we're ready to recieve data
                    control_socket.send(b'')
                    control_socket.recv()
                    break

        assert (time.time() - t_last) < timeout, \
            "Timed out before recieving a hello topic message from the publisher."

        del control_socket
        self.socket.setsockopt(zmq.UNSUBSCRIBE, 'hello')
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_change_subscription(self):
        # FIXME: Extensive testing showed this particular test is the root cause
        # of sporadic failures on Travis.
        pub, sub, port = self.create_bound_pair(zmq.PUB, zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b'test')
        eventlet.sleep(0)
        sub_ready = eventlet.Event()
        sub_last = eventlet.Event()
        sub_done = eventlet.Event()

        def rx():
            while sub.recv() != b'test BEGIN':
                eventlet.sleep(0)
            sub_ready.send()
            count = 0
            while True:
                msg = sub.recv()
                if msg == b'test BEGIN':
                    # BEGIN may come many times
                    continue
                if msg == b'test LAST':
                    sub.setsockopt(zmq.SUBSCRIBE, b'done')
                    sub.setsockopt(zmq.UNSUBSCRIBE, b'test')
                    eventlet.sleep(0)
                    # In real application you should either sync
                    # or tolerate loss of messages.
                    sub_last.send()
                if msg == b'done DONE':
                    break
                count += 1
            sub_done.send(count)

        def tx():
            # Sync receiver ready to avoid loss of first packets
            while not sub_ready.ready():
                pub.send(b'test BEGIN')
                eventlet.sleep(0.005)
            for i in range(1, 101):
                msg = 'test {0}'.format(i).encode()
                if i != 50:
                    pub.send(msg)
                else:
                    pub.send(b'test LAST')
                    sub_last.wait()
                # XXX: putting a real delay of 1ms here fixes sporadic failures on Travis
                # just yield eventlet.sleep(0) doesn't cut it
                eventlet.sleep(0.001)
            pub.send(b'done DONE')

        eventlet.spawn(rx)
        eventlet.spawn(tx)
        rx_count = sub_done.wait()
        self.assertEqual(rx_count, 50)