Python zmq 模块,REP 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用zmq.REP

项目:py-enarksh    作者:SetBased    | 项目源码 | 文件源码
def receive_message(self, event, event_data, listener_data):
        """
        Blocks until at least one incoming socket is readable and dispatches every readable end point.

        A fresh poller is built on each call from the registered PULL and REP end points. Each end point that the
        poll reports as ready is handed to ``_receive_message`` together with its name.

        :param * event: Not used.
        :param * event_data: Not used.
        :param * listener_data: Not used.
        """
        del event, event_data, listener_data

        # Only sockets that receive messages take part in the poll.
        incoming_types = (zmq.PULL, zmq.REP)
        poller = zmq.Poller()
        for end_point in self.__end_points.values():
            if end_point.type in incoming_types:
                poller.register(end_point, zmq.POLLIN)

        # poll() is called without a timeout argument.
        ready = dict(poller.poll())

        for name, end_point in self.__end_points.items():
            if end_point in ready:
                self._receive_message(name, end_point)

    # ------------------------------------------------------------------------------------------------------------------
项目:py-enarksh    作者:SetBased    | 项目源码 | 文件源码
def __register_sockets(self):
        """
        Registers the four ZMQ end points this process uses to talk to the other Enarksh processes.

        Two end points receive messages (PULL and REP) and two send messages (PUSH to the spawner and to the logger).
        The addresses come from the configuration object.
        """
        settings = Config.get()
        controller = self.message_controller

        # Incoming, asynchronous messages.
        controller.register_end_point('pull', zmq.PULL, settings.get_controller_pull_end_point())

        # Incoming, lockstep (request/reply) messages.
        controller.register_end_point('lockstep', zmq.REP, settings.get_controller_lockstep_end_point())

        # Outgoing, asynchronous messages to the spawner.
        controller.register_end_point('spawner', zmq.PUSH, settings.get_spawner_pull_end_point())

        # Outgoing, asynchronous messages to the logger.
        controller.register_end_point('logger', zmq.PUSH, settings.get_logger_pull_end_point())

    # ------------------------------------------------------------------------------------------------------------------
项目:py-enarksh    作者:SetBased    | 项目源码 | 文件源码
def register_end_point(self, name, socket_type, end_point):
        """
        Registers an end point.

        Incoming socket types (PULL, REP) are bound to the end point; the outgoing type (PUSH) connects to it.

        :param str name: The name of the end point.
        :param int socket_type: The socket type, one of
                                - zmq.PULL for asynchronous incoming messages
                                - zmq.REP for lockstep incoming messages
                                - zmq.PUSH for asynchronous outgoing messages
        :param str end_point: The end point.

        :raises ValueError: When the socket type is not one of the three supported types.
        """
        # Validate before creating anything: rejecting an unsupported type after the socket has been created and
        # stored would leave an open, registered socket that is neither bound nor connected.
        if socket_type not in (zmq.PULL, zmq.REP, zmq.PUSH):
            raise ValueError("Unknown socket type {0}".format(socket_type))

        socket = self.__zmq_context.socket(socket_type)
        self.__end_points[name] = socket
        if socket_type == zmq.PUSH:
            socket.connect(end_point)
        else:
            socket.bind(end_point)

    # ------------------------------------------------------------------------------------------------------------------
项目:py-enarksh    作者:SetBased    | 项目源码 | 文件源码
def no_barking(self, seconds):
        """
        During start up of ZMQ the incoming file descriptors become 'ready for reading' while there is no message
        on the socket. This method prevents incoming sockets from barking that they are ready for reading.

        After sleeping, the incoming (PULL and REP) end points are polled once per end point minus one, each time
        with a timeout argument of 1, and the poll results are discarded.

        :param int seconds: The number of seconds to give the other ZMQ thread to start up.
        """
        sleep(seconds)

        incoming_types = (zmq.PULL, zmq.REP)
        rounds = len(self.end_points) - 1
        for _ in range(rounds):
            poller = zmq.Poller()
            for end_point in self.end_points.values():
                if end_point.type in incoming_types:
                    poller.register(end_point, zmq.POLLIN)

            poller.poll(1)

# ----------------------------------------------------------------------------------------------------------------------
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_poller_events(self):
        """Tornado poller implementation maps events correctly"""
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()
        read_flag = ioloop.IOLoop.READ
        write_flag = ioloop.IOLoop.WRITE

        # Registered for READ with nothing sent: neither socket reports an event.
        for sock in (requester, replier):
            poller.register(sock, read_flag)
        ready = dict(poller.poll(0))
        self.assertEqual(ready.get(replier), None)
        self.assertEqual(ready.get(requester), None)

        # Registered for WRITE: only the REQ socket is expected to report WRITE.
        for sock in (requester, replier):
            poller.register(sock, write_flag)
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(requester), write_flag)
        self.assertEqual(ready.get(replier), None)

        # After the REQ socket sends, the REP socket (registered for READ again) is expected to report READ.
        poller.register(replier, read_flag)
        requester.send(b'hi')
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(replier), read_flag)
        self.assertEqual(ready.get(requester), None)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_monitor_connected(self):
        """Test connected monitoring socket."""
        address = "tcp://127.0.0.1:6667"
        address_bytes = b"tcp://127.0.0.1:6667"

        replier = self.context.socket(zmq.REP)
        requester = self.context.socket(zmq.REQ)
        self.sockets.extend([replier, requester])
        requester.bind(address)

        # The REP socket is the one being monitored; its monitor socket delivers the events.
        monitor = replier.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting the REP socket should produce the connect event(s).
        replier.connect(address)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], address_bytes)
            # A delayed connect is followed by the actual connected event.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], address_bytes)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_single_socket_forwarder_connect(self):
        """Echo a message through a single-socket REP queue device, once per connect method."""
        if zmq.zmq_version() in ('4.1.1', '4.0.6'):
            raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())

        # First round attaches the device with connect_in, second round with connect_out.
        rounds = (('connect_in', b'hello'), ('connect_out', b'hello again'))
        for attach_name, msg in rounds:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, attach_name)('tcp://127.0.0.1:%i' % port)
            dev.start()
            # Pause briefly after starting the device thread before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()
项目:lustre_task_driven_monitoring_framework    作者:GSI-HPC    | 项目源码 | 文件源码
def connect(self):
        """
        Create the ZMQ context, bind a REP socket to ``self.endpoint`` and register it with a poller.

        Sets ``self.context``, ``self.socket`` and ``self.poller``, and finally marks the object as connected.

        Raises RuntimeError if the created context or socket evaluates as false.
        """
        context = zmq.Context()
        self.context = context

        if not context:
            raise RuntimeError('Failed to create ZMQ context!')

        rep_socket = context.socket(zmq.REP)
        self.socket = rep_socket

        if not rep_socket:
            raise RuntimeError('Failed to create ZMQ socket!')

        rep_socket.bind(self.endpoint)

        # Register the socket for incoming-message events.
        poller = zmq.Poller()
        self.poller = poller
        poller.register(rep_socket, zmq.POLLIN)

        self.is_connected = True
项目:InplusTrader_Linux    作者:zhengwsh    | 项目源码 | 文件源码
def __init__(self, repAddress, pubAddress):
        """Constructor"""
        super(RpcServer, self).__init__()

        # Function table; starts out empty.
        self.__functions = {}

        # One ZMQ context is shared by both sockets.
        context = zmq.Context()
        self.__context = context

        # REP socket, bound to repAddress.
        rep_socket = context.socket(zmq.REP)
        self.__socketREP = rep_socket
        rep_socket.bind(repAddress)

        # PUB socket, bound to pubAddress.
        pub_socket = context.socket(zmq.PUB)
        self.__socketPUB = pub_socket
        pub_socket.bind(pubAddress)

        # Worker state: the thread targets self.run and is only created here, not started.
        self.__active = False
        self.__thread = threading.Thread(target=self.run)

    #----------------------------------------------------------------------
项目:justdb    作者:kootenpv    | 项目源码 | 文件源码
def create_server():
    """
    Bind two REP sockets (ports 5555 and 6666) and answer every request with an empty reply, forever.

    If binding raises a ZMQError the function prints a notice and exits the process via ``sys.exit()``.
    The serve loop handles one request on the main socket, then one on the freeze socket, and repeats;
    each ``recv()`` is called without flags.
    """
    context = zmq.Context()

    def bound_rep(address):
        # Create a REP socket on the shared context and bind it to the given address.
        sock = context.socket(zmq.REP)
        sock.bind(address)
        return sock

    try:
        main_socket = bound_rep("tcp://*:5555")
        freeze_socket = bound_rep("tcp://*:6666")
    except zmq.error.ZMQError:
        print("JustDB already running, this is no error.")
        sys.exit()

    print("Successfully started \033[92mjustdb\033[0m")
    while True:  # pragma: no cover
        # Strict alternation: main socket first, then the freeze socket.
        for sock in (main_socket, freeze_socket):
            sock.recv()
            sock.send(b"")
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def test_tcp_req_socket(event_loop, socket_factory, connect_or_bind):
    """
    An azmq REQ socket exchanges one two-frame request/reply with a zmq REP peer over TCP.

    Declared ``async def`` because the body uses ``await`` and ``async with``, which are a SyntaxError inside a
    plain ``def``.
    NOTE(review): running this presumably needs an asyncio-aware test runner or marker - not visible here.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: receive the question and send back the answer.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REQ)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # Both directions are bounded by a timeout argument of 1 so a hang fails the test.
            await asyncio.wait_for(
                socket.send_multipart([b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'your', b'answer']
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def test_tcp_rep_socket(event_loop, socket_factory, connect_or_bind):
    """
    An azmq REP socket answers one two-frame request from a zmq REQ peer over TCP.

    Declared ``async def`` because the body uses ``await`` and ``async with``, which are a SyntaxError inside a
    plain ``def``.
    NOTE(review): running this presumably needs an asyncio-aware test runner or marker - not visible here.
    """
    req_socket = socket_factory.create(zmq.REQ)
    connect_or_bind(req_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: send the question and check the answer that comes back.
        req_socket.send_multipart([b'my', b'question'])
        frames = req_socket.recv_multipart()
        assert frames == [b'your', b'answer']

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REP)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # Both directions are bounded by a timeout argument of 1 so a hang fails the test.
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'my', b'question']
            await asyncio.wait_for(
                socket.send_multipart([b'your', b'answer']),
                1,
            )
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def test_tcp_dealer_socket(event_loop, socket_factory, connect_or_bind):
    """
    An azmq DEALER socket exchanges one request/reply with a zmq REP peer over TCP.

    The DEALER side sends and expects an extra leading empty frame; the REP peer sees only the payload frames.

    Declared ``async def`` because the body uses ``await`` and ``async with``, which are a SyntaxError inside a
    plain ``def``.
    NOTE(review): running this presumably needs an asyncio-aware test runner or marker - not visible here.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: receive the question and send back the answer.
        frames = rep_socket.recv_multipart()
        assert frames == [b'my', b'question']
        rep_socket.send_multipart([b'your', b'answer'])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.DEALER)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # Both directions are bounded by a timeout argument of 1 so a hang fails the test.
            await asyncio.wait_for(
                socket.send_multipart([b'', b'my', b'question']),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'', b'your', b'answer']
项目:azmq    作者:ereOn    | 项目源码 | 文件源码
async def test_tcp_big_messages(event_loop, socket_factory, connect_or_bind):
    """
    An azmq REQ socket exchanges large frames (500 and 100000 bytes) with a zmq REP peer over TCP.

    Declared ``async def`` because the body uses ``await`` and ``async with``, which are a SyntaxError inside a
    plain ``def``.
    NOTE(review): running this presumably needs an asyncio-aware test runner or marker - not visible here.
    """
    rep_socket = socket_factory.create(zmq.REP)
    connect_or_bind(rep_socket, 'tcp://127.0.0.1:3333', reverse=True)

    def run():
        # Peer side: receive the large request and send back a large reply.
        frames = rep_socket.recv_multipart()
        assert frames == [b'1' * 500, b'2' * 100000]
        rep_socket.send_multipart([b'3' * 500, b'4' * 100000])

    with run_in_background(run):
        async with azmq.Context(loop=event_loop) as context:
            socket = context.socket(azmq.REQ)
            connect_or_bind(socket, 'tcp://127.0.0.1:3333')
            # Both directions are bounded by a timeout argument of 1 so a hang fails the test.
            await asyncio.wait_for(
                socket.send_multipart([b'1' * 500, b'2' * 100000]),
                1,
            )
            frames = await asyncio.wait_for(socket.recv_multipart(), 1)
            assert frames == [b'3' * 500, b'4' * 100000]
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_poller_events(self):
        """Tornado poller implementation maps events correctly"""
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()
        read_flag = ioloop.IOLoop.READ
        write_flag = ioloop.IOLoop.WRITE

        # Registered for READ with nothing sent: neither socket reports an event.
        for sock in (requester, replier):
            poller.register(sock, read_flag)
        ready = dict(poller.poll(0))
        self.assertEqual(ready.get(replier), None)
        self.assertEqual(ready.get(requester), None)

        # Registered for WRITE: only the REQ socket is expected to report WRITE.
        for sock in (requester, replier):
            poller.register(sock, write_flag)
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(requester), write_flag)
        self.assertEqual(ready.get(replier), None)

        # After the REQ socket sends, the REP socket (registered for READ again) is expected to report READ.
        poller.register(replier, read_flag)
        requester.send(b'hi')
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(replier), read_flag)
        self.assertEqual(ready.get(requester), None)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_monitor_connected(self):
        """Test connected monitoring socket."""
        address = "tcp://127.0.0.1:6667"
        address_bytes = b"tcp://127.0.0.1:6667"

        replier = self.context.socket(zmq.REP)
        requester = self.context.socket(zmq.REQ)
        self.sockets.extend([replier, requester])
        requester.bind(address)

        # The REP socket is the one being monitored; its monitor socket delivers the events.
        monitor = replier.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting the REP socket should produce the connect event(s).
        replier.connect(address)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], address_bytes)
            # A delayed connect is followed by the actual connected event.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], address_bytes)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_single_socket_forwarder_connect(self):
        """Echo a message through a single-socket REP queue device, once per connect method."""
        # First round attaches the device with connect_in, second round with connect_out.
        rounds = (('connect_in', b'hello'), ('connect_out', b'hello again'))
        for attach_name, msg in rounds:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, attach_name)('tcp://127.0.0.1:%i' % port)
            dev.start()
            # Pause briefly after starting the device thread before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_monitor_connected(self):
        """Test connected monitoring socket."""
        address = "tcp://127.0.0.1:6667"
        address_bytes = b"tcp://127.0.0.1:6667"

        replier = self.context.socket(zmq.REP)
        requester = self.context.socket(zmq.REQ)
        self.sockets.extend([replier, requester])
        requester.bind(address)

        # The REP socket is the one being monitored; its monitor socket delivers the events.
        monitor = replier.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting the REP socket should produce the connect event(s).
        replier.connect(address)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], address_bytes)
            # A delayed connect is followed by the actual connected event.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], address_bytes)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_single_socket_forwarder_connect(self):
        """Echo a message through a single-socket REP queue device, once per connect method."""
        # First round attaches the device with connect_in, second round with connect_out.
        rounds = (('connect_in', b'hello'), ('connect_out', b'hello again'))
        for attach_name, msg in rounds:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, attach_name)('tcp://127.0.0.1:%i' % port)
            dev.start()
            # Pause briefly after starting the device thread before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_poller_events(self):
        """Tornado poller implementation maps events correctly"""
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()
        read_flag = ioloop.IOLoop.READ
        write_flag = ioloop.IOLoop.WRITE

        # Registered for READ with nothing sent: neither socket reports an event.
        for sock in (requester, replier):
            poller.register(sock, read_flag)
        ready = dict(poller.poll(0))
        self.assertEqual(ready.get(replier), None)
        self.assertEqual(ready.get(requester), None)

        # Registered for WRITE: only the REQ socket is expected to report WRITE.
        for sock in (requester, replier):
            poller.register(sock, write_flag)
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(requester), write_flag)
        self.assertEqual(ready.get(replier), None)

        # After the REQ socket sends, the REP socket (registered for READ again) is expected to report READ.
        poller.register(replier, read_flag)
        requester.send(b'hi')
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(replier), read_flag)
        self.assertEqual(ready.get(requester), None)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_monitor_connected(self):
        """Test connected monitoring socket."""
        address = "tcp://127.0.0.1:6667"
        address_bytes = b"tcp://127.0.0.1:6667"

        replier = self.context.socket(zmq.REP)
        requester = self.context.socket(zmq.REQ)
        self.sockets.extend([replier, requester])
        requester.bind(address)

        # The REP socket is the one being monitored; its monitor socket delivers the events.
        monitor = replier.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting the REP socket should produce the connect event(s).
        replier.connect(address)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], address_bytes)
            # A delayed connect is followed by the actual connected event.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], address_bytes)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_poller_events(self):
        """Tornado poller implementation maps events correctly"""
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()
        read_flag = ioloop.IOLoop.READ
        write_flag = ioloop.IOLoop.WRITE

        # Registered for READ with nothing sent: neither socket reports an event.
        for sock in (requester, replier):
            poller.register(sock, read_flag)
        ready = dict(poller.poll(0))
        self.assertEqual(ready.get(replier), None)
        self.assertEqual(ready.get(requester), None)

        # Registered for WRITE: only the REQ socket is expected to report WRITE.
        for sock in (requester, replier):
            poller.register(sock, write_flag)
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(requester), write_flag)
        self.assertEqual(ready.get(replier), None)

        # After the REQ socket sends, the REP socket (registered for READ again) is expected to report READ.
        poller.register(replier, read_flag)
        requester.send(b'hi')
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(replier), read_flag)
        self.assertEqual(ready.get(requester), None)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_single_socket_forwarder_connect(self):
        """Echo a message through a single-socket REP queue device, once per connect method."""
        # First round attaches the device with connect_in, second round with connect_out.
        rounds = (('connect_in', b'hello'), ('connect_out', b'hello again'))
        for attach_name, msg in rounds:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, attach_name)('tcp://127.0.0.1:%i' % port)
            dev.start()
            # Pause briefly after starting the device thread before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_poller_events(self):
        """Tornado poller implementation maps events correctly"""
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()
        read_flag = ioloop.IOLoop.READ
        write_flag = ioloop.IOLoop.WRITE

        # Registered for READ with nothing sent: neither socket reports an event.
        for sock in (requester, replier):
            poller.register(sock, read_flag)
        ready = dict(poller.poll(0))
        self.assertEqual(ready.get(replier), None)
        self.assertEqual(ready.get(requester), None)

        # Registered for WRITE: only the REQ socket is expected to report WRITE.
        for sock in (requester, replier):
            poller.register(sock, write_flag)
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(requester), write_flag)
        self.assertEqual(ready.get(replier), None)

        # After the REQ socket sends, the REP socket (registered for READ again) is expected to report READ.
        poller.register(replier, read_flag)
        requester.send(b'hi')
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(replier), read_flag)
        self.assertEqual(ready.get(requester), None)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_monitor_connected(self):
        """Test connected monitoring socket."""
        address = "tcp://127.0.0.1:6667"
        address_bytes = b"tcp://127.0.0.1:6667"

        replier = self.context.socket(zmq.REP)
        requester = self.context.socket(zmq.REQ)
        self.sockets.extend([replier, requester])
        requester.bind(address)

        # The REP socket is the one being monitored; its monitor socket delivers the events.
        monitor = replier.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting the REP socket should produce the connect event(s).
        replier.connect(address)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], address_bytes)
            # A delayed connect is followed by the actual connected event.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], address_bytes)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_single_socket_forwarder_connect(self):
        """Echo a message through a single-socket REP queue device, once per connect method."""
        # First round attaches the device with connect_in, second round with connect_out.
        rounds = (('connect_in', b'hello'), ('connect_out', b'hello again'))
        for attach_name, msg in rounds:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, attach_name)('tcp://127.0.0.1:%i' % port)
            dev.start()
            # Pause briefly after starting the device thread before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_poller_events(self):
        """Tornado poller implementation maps events correctly"""
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)
        poller = ioloop.ZMQPoller()
        read_flag = ioloop.IOLoop.READ
        write_flag = ioloop.IOLoop.WRITE

        # Registered for READ with nothing sent: neither socket reports an event.
        for sock in (requester, replier):
            poller.register(sock, read_flag)
        ready = dict(poller.poll(0))
        self.assertEqual(ready.get(replier), None)
        self.assertEqual(ready.get(requester), None)

        # Registered for WRITE: only the REQ socket is expected to report WRITE.
        for sock in (requester, replier):
            poller.register(sock, write_flag)
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(requester), write_flag)
        self.assertEqual(ready.get(replier), None)

        # After the REQ socket sends, the REP socket (registered for READ again) is expected to report READ.
        poller.register(replier, read_flag)
        requester.send(b'hi')
        ready = dict(poller.poll(1))
        self.assertEqual(ready.get(replier), read_flag)
        self.assertEqual(ready.get(requester), None)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_monitor_connected(self):
        """Test connected monitoring socket."""
        address = "tcp://127.0.0.1:6667"
        address_bytes = b"tcp://127.0.0.1:6667"

        replier = self.context.socket(zmq.REP)
        requester = self.context.socket(zmq.REQ)
        self.sockets.extend([replier, requester])
        requester.bind(address)

        # The REP socket is the one being monitored; its monitor socket delivers the events.
        monitor = replier.get_monitor_socket()
        monitor.linger = 0
        self.sockets.append(monitor)

        # Connecting the REP socket should produce the connect event(s).
        replier.connect(address)
        event = recv_monitor_message(monitor)
        if event['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(event['endpoint'], address_bytes)
            # A delayed connect is followed by the actual connected event.
            event = recv_monitor_message(monitor)
        self.assertEqual(event['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(event['endpoint'], address_bytes)
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def test_single_socket_forwarder_connect(self):
        """Echo a message through a single-socket REP queue device, once per connect method."""
        # First round attaches the device with connect_in, second round with connect_out.
        rounds = (('connect_in', b'hello'), ('connect_out', b'hello again'))
        for attach_name, msg in rounds:
            dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
            req = self.context.socket(zmq.REQ)
            port = req.bind_to_random_port('tcp://127.0.0.1')
            getattr(dev, attach_name)('tcp://127.0.0.1:%i' % port)
            dev.start()
            # Pause briefly after starting the device thread before sending.
            time.sleep(.25)
            req.send(msg)
            self.assertEqual(msg, self.recv(req))
            del dev
            req.close()
项目:flask-sqlalchemy-socketio-demo    作者:lukeyeager    | 项目源码 | 文件源码
def listen_thread():
    """
    Serve a ZeroMQ REP socket and forward each JSON request to SocketIO clients.

    Every request is acknowledged with ``OK``. A request equal to ``KILL`` closes the socket and ends the
    loop; any other request is parsed as JSON and its ``msg`` and ``timestamp`` fields are emitted as a
    ``new_update`` event on the ``/browser`` namespace.
    """
    ctx = zmq.Context()
    # Reply - act as server
    socket = ctx.socket(zmq.REP)
    socket.bind('tcp://*:%s' % config.ZEROMQ_PORT)
    while True:
        request = socket.recv()
        print('ZEROMQ request -', request)
        # send() only accepts bytes on Python 3 (a str raises TypeError); a bytes literal is the same
        # object type as before on Python 2.
        socket.send(b'OK')

        # recv() returns bytes, so the sentinel must be bytes too or it never matches on Python 3.
        if request == b'KILL':
            socket.close()
            print('Socket closed.')
            break

        with webapp.app_context():
            data = json.loads(request)
            print('Sending to SocketIO ...')
            socketio.emit(
                'new_update',
                data['msg'] + ' at ' + data['timestamp'],
                namespace='/browser',
            )
项目:jps    作者:OTL    | 项目源码 | 文件源码
def __init__(self, callback, host=None, res_port=None, use_security=False):
        """
        Create a REP socket and connect it to ``tcp://<host>:<res_port>``.

        ``host`` and ``res_port`` fall back to ``env.get_master_host()`` and ``env.get_res_port()`` when they
        are None. With ``use_security`` set, an Authenticator instance is obtained and the server key is
        installed on the socket before connecting. The callback is stored; no thread is started here.
        """
        if host is None:
            host = env.get_master_host()

        sock = zmq.Context().socket(zmq.REP)
        self._socket = sock

        self._auth = None
        if use_security:
            authenticator = Authenticator.instance(
                env.get_server_public_key_dir())
            self._auth = authenticator
            authenticator.set_server_key(
                sock, env.get_server_secret_key_path())

        if res_port is None:
            res_port = env.get_res_port()
        address = 'tcp://{host}:{port}'.format(host=host, port=res_port)
        sock.connect(address)

        self._callback = callback
        self._thread = None
        self._lock = threading.Lock()
项目:python-zcm    作者:pranav-srinivas-kumar    | 项目源码 | 文件源码
def __init__(self, name, priority, actor_context, endpoints, 
                 operation_function, operation_queue):
        """
        Create a server

        Keyword arguments:
        name - Name of the server
        priority - Priority of the server
        actor_context - ZMQ context of the actor process
        endpoints - A list of endpoint strings; the REP socket is bound to each one
        operation_function - Operation function of the server
        operation_queue - The operation queue object
        """
        # Plain configuration, stored as given.
        self.name = name
        self.priority = priority
        self.endpoints = endpoints
        self.operation_function = operation_function
        self.operation_queue = operation_queue

        # One REP socket on the actor's context, bound to every endpoint.
        self.context = actor_context
        server_socket = actor_context.socket(zmq.REP)
        self.server_socket = server_socket
        for address in endpoints:
            server_socket.bind(address)

        self.ready = True
        self.func_mutex = Lock()
项目:research-dist    作者:DrawML    | 项目源码 | 文件源码
async def run():
    """
    Serve a REP socket bound to ``Url``: reply to every request with the request text plus ``, world``.

    Declared ``async def`` because the body awaits ``socket.recv()``, ``asyncio.sleep()`` and
    ``socket.send()``; ``await`` inside a plain ``def`` is a SyntaxError. The loop never returns.
    """
    print("Getting ready for hello world client.  Ctrl-C to exit.\n")
    socket = Ctx.socket(zmq.REP)
    socket.bind(Url)
    while True:
        #  Wait for next request from client
        message = await socket.recv()
        print("Received request: {}".format(message))
        #  Do some "work"
        await asyncio.sleep(1)
        #  Send reply back to client
        message = message.decode('utf-8')
        message = '{}, world'.format(message)
        message = message.encode('utf-8')
        print("Sending reply: {}".format(message))
        await socket.send(message)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def start(self):
        """Create and bind the ZAP socket"""
        zap = self.context.socket(zmq.REP)
        self.zap_socket = zap
        zap.linger = 1
        # Bound to the inproc ZAP endpoint.
        zap.bind("inproc://zeromq.zap.01")
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_subclass(self):
        """subclasses can assign attributes"""
        class Sub(zmq.Socket):
            # Declared on the class; __init__ overrides it per instance before the base initialiser runs.
            a = None

            def __init__(self, *args, **kwargs):
                self.a = -1
                super(Sub, self).__init__(*args, **kwargs)

        sock = Sub(self.context, zmq.REP)
        self.sockets.append(sock)

        # Value set in __init__ is visible.
        self.assertEqual(sock.a, -1)

        # Reassignment sticks, both read directly and through a local.
        sock.a = 1
        self.assertEqual(sock.a, 1)
        value = sock.a
        self.assertEqual(value, 1)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_close_after_destroy(self):
        """s.close() after ctx.destroy() should be fine"""
        context = self.Context()
        sock = context.socket(zmq.REP)
        context.destroy()
        # reaper is not instantaneous
        time.sleep(1e-2)
        sock.close()
        self.assertTrue(sock.closed)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_many_sockets(self):
        """opening and closing many sockets shouldn't cause problems"""
        context = self.Context()
        for _ in range(16):
            # Open a batch of 65 sockets, then close all of them.
            batch = []
            for _ in range(65):
                batch.append(context.socket(zmq.REP))
            for sock in batch:
                sock.close()
            # give the reaper a chance
            time.sleep(1e-2)
        context.term()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_destroy(self):
        """Context.destroy should close sockets"""
        context = self.Context()
        opened = []
        for _ in range(65):
            opened.append(context.socket(zmq.REP))

        # Close every other socket by hand; destroy() has to deal with the rest.
        for sock in opened[::2]:
            sock.close()

        context.destroy()
        # reaper is not instantaneous
        time.sleep(1e-2)
        for sock in opened:
            self.assertTrue(sock.closed)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_term_thread(self):
        """ctx.term should not crash active threads (#139)"""
        ctx = self.Context()
        started = Event()
        started.clear()

        def blocked_recv():
            # Runs in the worker thread: block in recv() until the context is terminated.
            sock = ctx.socket(zmq.REP)
            sock.bind_to_random_port('tcp://127.0.0.1')
            started.set()
            try:
                sock.recv()
            except zmq.ZMQError as err:
                self.assertEqual(err.errno, zmq.ETERM)
            else:
                self.fail("recv should have been interrupted with ETERM")
            finally:
                sock.close()

        worker = Thread(target=blocked_recv)
        worker.start()

        # Wait until the worker has bound its socket, then give it a moment to enter recv().
        started.wait(1)
        self.assertTrue(started.is_set(), "sync event never fired")
        time.sleep(0.01)
        ctx.term()
        worker.join(timeout=1)
        self.assertFalse(worker.is_alive(), "term should have interrupted s.recv()")
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_basic(self):
        """Pass one message through ping_pong on a REQ/REP pair and expect it back."""
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)

        payload = b'message 1'
        self.assertEqual(payload, self.ping_pong(requester, replier, payload))
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_multiple(self):
        """Ping-pong ten messages of 0..9 spaces over a single REQ/REP pair."""
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)

        for length in range(10):
            payload = length * b' '
            echoed = self.ping_pong(requester, replier, payload)
            self.assertEqual(payload, echoed)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_bad_send_recv(self):
        """Out-of-order recv/send on REQ/REP must raise EFSM.

        The EFSM checks are skipped on libzmq 2.1.8; a regular ping-pong
        exchange is performed afterwards in every case.
        """
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)

        # this doesn't work on 2.1.8, so check no copy modes at all there
        copy_modes = () if zmq.zmq_version() == '2.1.8' else (True, False)
        for copy in copy_modes:
            # REQ must send before it may recv; REP must recv before it may send
            self.assertRaisesErrno(zmq.EFSM, requester.recv, copy=copy)
            self.assertRaisesErrno(zmq.EFSM, replier.send, b'asdf', copy=copy)

        # A valid exchange is still needed here; per the original author,
        # omitting it makes the process die on an Abort trap.
        payload = b'asdf'
        echoed = self.ping_pong(requester, replier, payload)
        self.assertEqual(payload, echoed)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_json(self):
        """Round-trip a JSON-serialisable dict through ping_pong_json on a REQ/REP pair.

        The object returned by ``ping_pong_json`` is compared with the object
        that was sent, so a payload altered in transit fails the test.
        """
        s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
        o = dict(a=10, b=list(range(10)))
        o2 = self.ping_pong_json(s1, s2, o)
        # Without this check the test passes even if the payload comes back wrong.
        self.assertEqual(o, o2)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_large_msg(self):
        """Ping-pong a 10000-byte message ten times over one REQ/REP pair."""
        requester, replier = self.create_bound_pair(zmq.REQ, zmq.REP)
        payload = b'X' * 10000

        for _attempt in range(10):
            echoed = self.ping_pong(requester, replier, payload)
            self.assertEqual(payload, echoed)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def setUp(self):
        """Create the fixture: a context, a REP socket, the IOLoop instance and a ZMQStream."""
        self.context = ctx = zmq.Context()
        self.socket = rep_socket = ctx.socket(zmq.REP)
        self.loop = ioloop.IOLoop.instance()
        # wrap the REP socket in a stream
        self.stream = zmqstream.ZMQStream(rep_socket)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_again(self):
        """A non-blocking recv on a freshly created REP socket raises Again / EAGAIN.

        The socket is never bound or connected, so there is nothing to receive.
        """
        s = self.context.socket(zmq.REP)
        try:
            self.assertRaises(Again, s.recv, zmq.NOBLOCK)
            self.assertRaisesErrno(zmq.EAGAIN, s.recv, zmq.NOBLOCK)
        finally:
            # Close even when an assertion fails, so no open socket is left
            # behind on the shared context.
            s.close()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def atest_ctxterm(self):
        """recv on a socket whose context is being terminated raises ContextTerminated / ETERM.

        ``context.term`` runs in a helper thread while this thread attempts
        non-blocking receives on a socket of the same context.

        NOTE(review): the ``atest_`` prefix presumably keeps the test runner
        from collecting this method -- confirm before renaming.
        """
        s = self.context.socket(zmq.REP)
        t = Thread(target=self.context.term)
        t.start()
        try:
            self.assertRaises(ContextTerminated, s.recv, zmq.NOBLOCK)
            # The errno constant is ETERM (there is no zmq.TERM).
            self.assertRaisesErrno(zmq.ETERM, s.recv, zmq.NOBLOCK)
        finally:
            # Always close the socket and join the helper thread, so a failed
            # assertion does not leave the term() thread running.
            s.close()
            t.join()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def zap_handler(self):
        """Serve exactly one request on the ``inproc://zeromq.zap.01`` endpoint.

        Receives one multipart request, checks its version and identity frames,
        and replies with a 200 status for CURVE, for NULL, and for PLAIN with
        credentials equal to USER/PASS; any other request gets a 400 reply.
        The socket is closed once the reply is sent or an error occurs.
        """
        zap = self.context.socket(zmq.REP)
        zap.bind("inproc://zeromq.zap.01")
        try:
            frames = self.recv_multipart(zap)

            # the first six frames are fixed; anything after is mechanism-specific
            version, sequence, domain, address, identity, mechanism = frames[:6]
            if mechanism == b'PLAIN':
                username, password = frames[6:]
            elif mechanism == b'CURVE':
                # value itself is unused, but the frame must be present
                curve_key = frames[6]

            self.assertEqual(version, b"1.0")
            self.assertEqual(identity, b"IDENT")

            if mechanism == b'PLAIN':
                accepted = username == USER and password == PASS
            else:
                accepted = mechanism in (b'CURVE', b'NULL')

            if accepted:
                verdict = [
                    b"200",
                    b"OK",
                    b"anonymous",
                    b"\5Hello\0\0\0\5World",
                ]
            else:
                verdict = [
                    b"400",
                    b"Invalid username or password",
                    b"",
                    b"",
                ]
            zap.send_multipart([version, sequence] + verdict)
        finally:
            zap.close()
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_monitor(self):
        """Test monitoring interface for sockets.

        A REP socket is monitored over an inproc PAIR socket while it connects
        to a bound REQ socket. The test expects an optional CONNECT_DELAYED
        event, then CONNECTED, and finally MONITOR_STOPPED once monitoring is
        disabled.
        """
        s_rep = self.context.socket(zmq.REP)
        s_req = self.context.socket(zmq.REQ)
        self.sockets.extend([s_rep, s_req])
        # Bind to a randomly chosen free port instead of a fixed one, so the
        # test does not fail when the fixed port is already in use.
        port = s_req.bind_to_random_port("tcp://127.0.0.1")
        url = "tcp://127.0.0.1:%i" % port
        # monitor events are compared against the endpoint as bytes
        endpoint = url.encode("ascii")
        # try monitoring the REP socket

        s_rep.monitor("inproc://monitor.rep", zmq.EVENT_ALL)
        # create listening socket for monitor
        s_event = self.context.socket(zmq.PAIR)
        self.sockets.append(s_event)
        s_event.connect("inproc://monitor.rep")
        s_event.linger = 0
        # test receive event for connect event
        s_rep.connect(url)
        m = recv_monitor_message(s_event)
        if m['event'] == zmq.EVENT_CONNECT_DELAYED:
            self.assertEqual(m['endpoint'], endpoint)
            # test receive event for connected event
            m = recv_monitor_message(s_event)
        self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
        self.assertEqual(m['endpoint'], endpoint)

        # test monitor can be disabled.
        s_rep.disable_monitor()
        m = recv_monitor_message(s_event)
        self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_single_socket_forwarder_bind(self):
        """Echo a message through a ThreadDevice whose input side is bound; done twice.

        Each round builds a fresh ``devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)``,
        picks a port, connects a REQ socket to it, binds the device's input to
        the same port, and asserts that the REQ socket receives the bytes it
        sent. The test is skipped on libzmq 4.1.1 and 4.0.6.

        NOTE(review): the ``-1`` third argument presumably means "no separate
        output socket" -- confirm against the devices API.
        """
        if zmq.zmq_version() in ('4.1.1', '4.0.6'):
            raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())
        # --- first round ---
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
        # select random port: bind a throwaway socket, note its port, then release it
        binder = self.context.socket(zmq.REQ)
        port = binder.bind_to_random_port('tcp://127.0.0.1')
        binder.close()
        # presumably lets the port be released before it is reused -- TODO confirm
        time.sleep(0.1)
        req = self.context.socket(zmq.REQ)
        req.connect('tcp://127.0.0.1:%i'%port)
        dev.bind_in('tcp://127.0.0.1:%i'%port)
        dev.start()
        # presumably gives the device thread time to start and bind -- TODO confirm
        time.sleep(.25)
        msg = b'hello'
        req.send(msg)
        # the same bytes must come back on the REQ socket
        self.assertEqual(msg, self.recv(req))
        # drop the local reference to the first device before building the next one
        del dev
        req.close()
        # --- second round: same steps with a new device, port and payload ---
        dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
        # select random port: bind a throwaway socket, note its port, then release it
        binder = self.context.socket(zmq.REQ)
        port = binder.bind_to_random_port('tcp://127.0.0.1')
        binder.close()
        time.sleep(0.1)
        req = self.context.socket(zmq.REQ)
        req.connect('tcp://127.0.0.1:%i'%port)
        dev.bind_in('tcp://127.0.0.1:%i'%port)
        dev.start()
        time.sleep(.25)
        msg = b'hello again'
        req.send(msg)
        self.assertEqual(msg, self.recv(req))
        del dev
        req.close()