Python zmq 模块,RCVTIMEO 实例源码

我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用zmq.RCVTIMEO

项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def setUp(self):
        """ Build a mocked Supvisors and an internal publisher / subscriber pair. """
        from supvisors.supvisorszmq import (InternalEventSubscriber,
                                            InternalEventPublisher)
        # addresses and ports are read from the mocked Supvisors
        supvisors = MockedSupvisors()
        self.supvisors = supvisors
        # publisher first, then subscriber
        self.publisher = InternalEventPublisher(
            supvisors.address_mapper.local_address,
            supvisors.options.internal_port,
            supvisors.logger)
        self.subscriber = InternalEventSubscriber(
            supvisors.address_mapper.addresses,
            supvisors.options.internal_port)
        # the subscriber socket is meant to block on reception;
        # a 1000 ms receive timeout keeps a failing test from hanging
        receive_timeout_ms = 1000
        self.subscriber.socket.setsockopt(zmq.RCVTIMEO, receive_timeout_ms)
        # the publisher does not wait for its subscribers,
        # so leave one second for the connections to settle
        time.sleep(1)
项目:mflow    作者:datastreaming    | 项目源码 | 文件源码
def start(self, socket):
        """
        Launch the background thread that listens to the monitor events of a socket.
        :param socket: Socket to monitor.
        """
        # A set flag means a monitoring thread is already active: nothing to do.
        if self.monitor_listening.is_set():
            return

        # Create the monitor socket; the receive timeout bounds every wait for an event.
        monitor_socket = socket.get_monitor_socket(events=self.events)
        monitor_socket.setsockopt(zmq.RCVTIMEO, self.receive_timeout)
        self.monitor_listening.set()

        def poll_events(keep_listening):
            # Loop until the flag is cleared, either from outside or by a close event.
            while keep_listening.is_set():
                try:
                    message = recv_monitor_message(monitor_socket)
                    if message["event"] == zmq.EVENT_CLOSED:
                        # The monitored socket was closed: this is the last iteration.
                        keep_listening.clear()

                    self._notify_listeners(message)
                except zmq.Again:
                    # Receive timed out: notify with None so that time based listeners
                    # get a heartbeat without needing a thread of their own.
                    self._notify_listeners(None)

            # Tear down the monitor once listening has stopped.
            socket.disable_monitor()
            monitor_socket.close()

        worker = threading.Thread(target=poll_events, args=(self.monitor_listening,))
        # Daemon thread: it cannot keep the process alive if disconnect is never called.
        worker.daemon = True
        self.monitor_thread = worker
        worker.start()
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def setUp(self):
        """ Create a dummy supvisors, and the request pusher and puller. """
        from supvisors.supvisorszmq import RequestPusher, RequestPuller
        # the dummy Supvisors is used for addresses and ports
        self.supvisors = MockedSupvisors()
        # create pusher and puller
        self.pusher = RequestPusher(self.supvisors.logger)
        self.puller = RequestPuller()
        # socket configuration is meant to be blocking
        # however, a failure would block the unit test,
        # so a timeout is set for emission and reception:
        # the emission timeout goes on the sending side (pusher) and
        # the reception timeout on the receiving side (puller)
        self.pusher.socket.setsockopt(zmq.SNDTIMEO, 1000)
        self.puller.socket.setsockopt(zmq.RCVTIMEO, 1000)
项目:supvisors    作者:julien6387    | 项目源码 | 文件源码
def setUp(self):
        """ Build a mocked Supvisors, an event publisher / subscriber pair
        and one sample payload per kind of event. """
        from supvisors.supvisorszmq import EventSubscriber, EventPublisher
        # ports and logger come from the mocked Supvisors
        supvisors = MockedSupvisors()
        self.supvisors = supvisors
        # publisher first, then a subscriber using the shared ZeroMQ context
        self.publisher = EventPublisher(supvisors.options.event_port,
                                        supvisors.logger)
        self.subscriber = EventSubscriber(zmq.Context.instance(),
                                          supvisors.options.event_port,
                                          supvisors.logger)
        # WARN: no subscription is made here; when a test adds one,
        # it has to sleep a little so that PyZMQ can handle it
        # WARN: the socket is meant to block on reception;
        # a 1000 ms receive timeout keeps a failing test from hanging
        self.subscriber.socket.setsockopt(zmq.RCVTIMEO, 1000)
        # sample payloads used by the tests
        self.supvisors_payload = Payload(dict(state='running',
                                              version='1.0'))
        self.address_payload = Payload(dict(state='silent',
                                            name='cliche01',
                                            date=1234))
        self.application_payload = Payload(dict(state='starting',
                                                name='supvisors'))
        self.process_payload = Payload(dict(state='running',
                                            process_name='plugin',
                                            application_name='supvisors',
                                            date=1230))
        self.event_payload = Payload(dict(state=20,
                                          name='plugin',
                                          group='supvisors',
                                          now=1230))
项目:CommunityCellularManager    作者:facebookincubator    | 项目源码 | 文件源码
def setup_socket(self):
    """Sets up the ZMQ socket.

    Creates a new ZMQ context and a REQ socket stored in self.socket.
    The socket is not connected here.
    """
    context = zmq.Context()
    # The component inheriting from BaseComponent should self.socket.connect
    # with the appropriate address.
    self.socket = context.socket(zmq.REQ)
    # LINGER controls how long unsent messages are kept once the socket is
    # closed; 0 discards them immediately so that closing never blocks.
    self.socket.setsockopt(zmq.LINGER, 0)
    # RCVTIMEO sets a timeout for socket.recv.
    self.socket.setsockopt(zmq.RCVTIMEO, 500)  # milliseconds
项目:bqueryd    作者:visualfabriq    | 项目源码 | 文件源码
def connect_socket(self):
        """Connect to the first controller that answers a ping.

        Every address in self.controllers is tried in order with a 2 second
        receive timeout. The first controller that replies is kept:
        self.address is set to its address, self.controller to the connected
        socket, and the receive timeout is raised to self.timeout seconds.
        Sockets of controllers that fail to reply are closed.

        :raises Exception: if no controller gave a (truthy) reply.
        """
        reply = None
        for c in self.controllers:
            self.logger.debug('Establishing socket connection to %s', c)
            tmp_sock = self.context.socket(zmq.REQ)
            tmp_sock.setsockopt(zmq.RCVTIMEO, 2000)
            tmp_sock.setsockopt(zmq.LINGER, 0)
            tmp_sock.identity = self.identity
            tmp_sock.connect(c)
            # first ping the controller to see if it responds at all
            msg = RPCMessage({'payload': 'ping'})
            tmp_sock.send_json(msg)
            try:
                reply = msg_factory(tmp_sock.recv_json())
                self.address = c
                break
            except Exception:
                # Best effort: report the failure and move on to the next
                # controller, but do not leak the socket of the failed one.
                # KeyboardInterrupt / SystemExit are deliberately not caught.
                traceback.print_exc()
                tmp_sock.close()
                continue
        if reply:
            # Now set the timeout to the actual requested
            self.logger.debug("Connection OK, setting network timeout to %s milliseconds", self.timeout*1000)
            self.controller = tmp_sock
            self.controller.setsockopt(zmq.RCVTIMEO, self.timeout*1000)
        else:
            raise Exception('No controller connection')
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def connect(self, server = None, port = None):
        """(Re)connect the REQ socket to the server and check it with a ping.

        A truthy server / port overrides the stored value. Returns an RC_ERR
        when the ZMQ connect fails, the ping result when the ping fails,
        and RC_OK otherwise.
        """
        # drop any existing connection first
        if self.connected:
            self.disconnect()

        self.context = zmq.Context()

        # keep the stored values unless a truthy override was given
        self.server = server or self.server
        self.port = port or self.port

        # endpoint of the server to talk to
        self.transport = "tcp://{0}:{1}".format(self.server, self.port)

        self.socket = self.context.socket(zmq.REQ)
        try:
            self.socket.connect(self.transport)
        except zmq.error.ZMQError as err:
            return RC_ERR("ZMQ Error: Bad server or port name: " + str(err))

        # 10000 ms timeout for both sending and receiving
        for option in (zmq.SNDTIMEO, zmq.RCVTIMEO):
            self.socket.setsockopt(option, 10000)

        # the ping is sent with the connected flag already raised
        self.connected = True

        rc = self.invoke_rpc_method('ping', api_class = None)
        if rc:
            return RC_OK()

        self.connected = False
        return rc
项目:apart-gtk    作者:alexheretic    | 项目源码 | 文件源码
def __init__(self, listeners: List[MessageListener] = None,
                 on_finish: Callable[[int], None] = lambda return_code: None):
        """Starts an apart-core command and starts listening for zmq messages on this new thread

        A PAIR socket with a 100 ms receive timeout is bound to a fresh ipc
        address, which is passed to the apart-core process as its argument.
        apart-core is launched directly when running as root or when
        APART_PARTCLONE_CMD is set in the environment, through pkexec otherwise.
        If the launched command is not found, an error is printed to stderr,
        the zmq context is destroyed and the process exits with status 1.

        :param listeners: initial message listeners, defaults to none
        :param on_finish: callback taking a return code, stored as self.on_finish
        """
        Thread.__init__(self, name='apart-core-runner')
        self.ipc_address = 'ipc:///tmp/apart-gtk-{}.ipc'.format(uuid.uuid4())
        self.zmq_context = zmq.Context()
        self.socket = self.zmq_context.socket(zmq.PAIR)
        self.socket.setsockopt(zmq.RCVTIMEO, 100)
        self.socket.bind(self.ipc_address)
        self.on_finish = on_finish
        self.listeners = listeners or []  # List[MessageListener]

        if LOG_MESSAGES:
            self.register(MessageListener(lambda msg: print('apart-core ->\n {}'.format(str(msg)))))

        # Current default is apart-core binary stored in the directory above these sources
        apart_core_cmd = os.environ.get('APART_GTK_CORE_CMD') or \
            os.path.abspath(os.path.dirname(os.path.realpath(__file__)) + '/../apart-core')

        # Remember which executable is actually launched so that a missing
        # command is reported correctly below.
        run_directly = os.geteuid() == 0 or bool(os.environ.get('APART_PARTCLONE_CMD'))
        command = [apart_core_cmd, self.ipc_address]
        if not run_directly:
            command.insert(0, 'pkexec')

        try:
            self.process = subprocess.Popen(command)
        except FileNotFoundError:
            if run_directly:
                print('apart-core command not found at \'' + apart_core_cmd + '\'', file=sys.stderr)
            else:
                print('pkexec command not found, install polkit or run as root', file=sys.stderr)
            self.zmq_context.destroy()
            sys.exit(1)
        self.start()
项目:napalm-logs    作者:napalm-automation    | 项目源码 | 文件源码
def start(self):
        '''
        Create the zmq consumer socket, connect it and apply the configured options.
        '''
        # The port is optional: leave it out of the URI when it is not set.
        if self.port:
            zmq_uri = '{protocol}://{address}:{port}'.format(
                protocol=self.protocol,
                address=self.address,
                port=self.port
            )
        else:
            zmq_uri = '{protocol}://{address}'.format(
                protocol=self.protocol,
                address=self.address
            )
        log.debug('ZMQ URI: %s', zmq_uri)
        self.ctx = zmq.Context()
        # Fall back to a PULL socket when zmq has no attribute named after the type.
        skt_type = getattr(zmq, self.type, zmq.PULL)
        self.sub = self.ctx.socket(skt_type)
        self.sub.connect(zmq_uri)
        # Every option below is applied only when it has been configured.
        if self.hwm is not None:
            try:
                self.sub.setsockopt(zmq.HWM, self.hwm)
            except AttributeError:
                # zmq.HWM is not available: use the receive high water mark.
                self.sub.setsockopt(zmq.RCVHWM, self.hwm)
        if self.recvtimeout is not None:
            log.debug('Setting RCVTIMEO to %d', self.recvtimeout)
            self.sub.setsockopt(zmq.RCVTIMEO, self.recvtimeout)
        if self.keepalive is not None:
            log.debug('Setting TCP_KEEPALIVE to %d', self.keepalive)
            self.sub.setsockopt(zmq.TCP_KEEPALIVE, self.keepalive)
        if self.keepalive_idle is not None:
            log.debug('Setting TCP_KEEPALIVE_IDLE to %d', self.keepalive_idle)
            self.sub.setsockopt(zmq.TCP_KEEPALIVE_IDLE, self.keepalive_idle)
        if self.keepalive_interval is not None:
            log.debug('Setting TCP_KEEPALIVE_INTVL to %d', self.keepalive_interval)
            self.sub.setsockopt(zmq.TCP_KEEPALIVE_INTVL, self.keepalive_interval)
项目:lewis    作者:DMSC-Instrument-Data    | 项目源码 | 文件源码
def test_zmq_socket_uses_timeout(self, mock_zmq_context):
        # Creating a client must apply its timeout to both sending and receiving.
        timeout = 100
        ControlClient(host='127.0.0.1', port='10002', timeout=timeout)

        expected_calls = [
            call().setsockopt(zmq.SNDTIMEO, timeout),
            call().setsockopt(zmq.RCVTIMEO, timeout),
        ]
        mock_zmq_context.assert_has_calls(expected_calls)
项目:lewis    作者:DMSC-Instrument-Data    | 项目源码 | 文件源码
def test_connection(self, mock_context):
        # Starting the server must create a REP socket, set its receive
        # timeout and bind it to the configured address.
        server = ControlServer(None, connection_string='127.0.0.1:10001')
        server.start_server()

        socket_call = call().socket()
        expected_calls = [
            call(),
            call().socket(zmq.REP),
            socket_call.setsockopt(zmq.RCVTIMEO, 100),
            socket_call.bind('tcp://127.0.0.1:10001'),
        ]
        mock_context.assert_has_calls(expected_calls)
项目:lewis    作者:DMSC-Instrument-Data    | 项目源码 | 文件源码
def test_server_can_only_be_started_once(self, mock_context):
        # Start the same server twice, then check the expected socket setup calls.
        server = ControlServer(None, connection_string='127.0.0.1:10000')
        for _ in range(2):
            server.start_server()

        socket_call = call().socket()
        expected_calls = [
            call(),
            call().socket(zmq.REP),
            socket_call.setsockopt(zmq.RCVTIMEO, 100),
            socket_call.bind('tcp://127.0.0.1:10000'),
        ]
        mock_context.assert_has_calls(expected_calls)
项目:lewis    作者:DMSC-Instrument-Data    | 项目源码 | 文件源码
def start_server(self):
        """
        Creates the REP socket, binds it to the configured host and port and
        logs the listening address. Does nothing if a socket already exists.
        """
        # A socket is already in place: the server has been started before.
        if self._socket is not None:
            return

        endpoint = 'tcp://{0}:{1}'.format(self.host, self.port)
        self._socket = zmq.Context().socket(zmq.REP)
        # 100 ms receive timeout
        self._socket.setsockopt(zmq.RCVTIMEO, 100)
        self._socket.bind(endpoint)

        self.log.info('Listening on %s:%s', self.host, self.port)
项目:lewis    作者:DMSC-Instrument-Data    | 项目源码 | 文件源码
def _get_zmq_req_socket(self):
        """
        Returns a REQ socket created from a new context on which the request
        options and the send / receive timeouts (self.timeout) have been set.
        """
        context = zmq.Context()
        # Options are set on the context, in this order, before the socket is created.
        context_options = (
            (zmq.REQ_CORRELATE, 1),
            (zmq.REQ_RELAXED, 1),
            (zmq.SNDTIMEO, self.timeout),
            (zmq.RCVTIMEO, self.timeout),
            (zmq.LINGER, 0),
        )
        for option, value in context_options:
            context.setsockopt(option, value)
        return context.socket(zmq.REQ)
项目:pymoku    作者:liquidinstruments    | 项目源码 | 文件源码
def _set_timeout(self, short=True, seconds=None):
        """Set the send and receive timeouts of the connection.

        :param short: when *seconds* is not given, use a 5 second base
            timeout if True and a 10 second one if False.
        :param seconds: explicit base timeout in seconds (may be fractional);
            takes precedence over *short*.

        The send timeout is the base timeout, the receive timeout twice that.
        """
        if seconds is not None:
            # Socket timeouts are integer milliseconds: truncate fractional
            # values rather than handing a float to setsockopt.
            base = int(seconds * 1000)
        else:
            base = 5000
            if not short:
                base *= 2

        self._conn.setsockopt(zmq.SNDTIMEO, base) # A send should always be quick
        self._conn.setsockopt(zmq.RCVTIMEO, 2 * base) # A receive might need to wait on processing
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_recv_timeout():
    # Regression test for https://github.com/eventlet/eventlet/issues/282
    # A recv on a socket with RCVTIMEO set must raise a timeout error
    # instead of returning normally.
    with clean_pair(zmq.PUB, zmq.SUB) as (_, sub, _):
        sub.setsockopt(zmq.RCVTIMEO, 100)
        try:
            with eventlet.Timeout(1, False):
                sub.recv()
        except zmq.ZMQError as exc:
            assert eventlet.is_timeout(exc)
        else:
            # no ZMQ error was raised at all
            assert False
项目:bearded-avenger-sdk-py    作者:csirtgadgets    | 项目源码 | 文件源码
def zthread_fork(ctx, func, *args, **kwargs):
    """
    Create an attached thread. An attached thread gets a ctx and a PAIR
    pipe back to its parent. It must monitor its pipe, and exit if the
    pipe becomes unreadable.

    :param ctx: ZMQ context used to create both ends of the pipe.
    :param func: thread target, called as func(ctx, pipe, *args, **kwargs).
    :return: the parent's end of the pipe.
    """
    # Both ends of the pipe get the same configuration: no linger, high
    # water marks of 100 messages and 5 second send / receive timeouts.
    ends = []
    for _ in range(2):
        end = ctx.socket(zmq.PAIR)
        end.setsockopt(zmq.LINGER, 0)
        end.setsockopt(zmq.RCVHWM, 100)
        end.setsockopt(zmq.SNDHWM, 100)
        end.setsockopt(zmq.SNDTIMEO, 5000)
        end.setsockopt(zmq.RCVTIMEO, 5000)
        ends.append(end)
    a, b = ends

    # Random inproc endpoint; decode so that the name is plain hex text
    # rather than the repr of a bytes object on Python 3.
    iface = "inproc://%s" % binascii.hexlify(os.urandom(8)).decode('ascii')
    a.bind(iface)
    b.connect(iface)

    thread = threading.Thread(target=func, args=((ctx, b) + args), kwargs=kwargs)
    thread.daemon = False
    thread.start()

    return a
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def _run (self):
        """Receive loop of the subscriber thread.

        Subscribes to everything, connects to self.tr and, while self.active
        is set, receives JSON messages, stores their data in
        self.raw_snapshot and dispatches them. The event handler is told
        once when data starts flowing and once when a receive times out
        afterwards. The socket is closed when the loop ends.
        """

        # socket must be created on the same thread
        self.socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.socket.setsockopt(zmq.RCVTIMEO, 5000)
        self.socket.connect(self.tr)

        # True while data is flowing, used to signal alive / dead only once
        receiving = False

        self.monitor.reset()

        while self.active:
            try:

                with self.monitor:
                    raw_line = self.socket.recv_string()

                self.monitor.on_recv_msg(raw_line)

                self.last_data_recv_ts = time.time()

                # first message after a silence: signal alive once
                if not receiving:
                    self.event_handler.on_async_alive()
                    receiving = True

            except zmq.Again:
                # receive timed out: signal dead once, then retry
                if receiving:
                    self.event_handler.on_async_dead()
                    receiving = False

                continue

            except zmq.ContextTerminated:
                # outside thread signaled us to exit
                assert not self.active
                break

            message = json.loads(raw_line)

            name = message['name']
            data = message['data']
            msg_type = message['type']
            baseline = message.get('baseline', False)

            self.raw_snapshot[name] = data

            self.__dispatch(name, msg_type, data, baseline)

        # closing of socket must be from the same thread
        self.socket.close(linger = 0)
项目:spyking-circus-ort    作者:spyking-circus    | 项目源码 | 文件源码
def __init__(self, host, address, log_address):
        """Set up logging, the JSON encoder and the ZeroMQ sockets.

        A temporary PAIR socket is connected to `address`. The RPC PAIR socket
        is bound to a wildcard TCP port on `host`, and the endpoint it was
        actually bound to is sent back through the temporary socket as a
        JSON message. The RPC socket is then registered with a poller.

        :param host: host part of the TCP endpoint the RPC socket is bound to.
        :param address: endpoint the temporary socket connects to.
        :param log_address: address passed to `get_log`; must not be None.
        :raises NotImplementedError: if `log_address` is None.
        """

        object.__init__(self)

        if log_address is None:
            raise NotImplementedError()
            # TODO remove
        self.logger = get_log(log_address, name=__name__)

        # TODO find proper space to define following class
        class Encoder(json.JSONEncoder):

            # `self_` is the encoder; `self` is the enclosing object, whose
            # `wrap_proxy` turns arbitrary objects into proxies.
            def default(self_, obj):
                if obj is None:
                    # Defer to the base implementation, passing the encoder
                    # instance explicitly (it was previously omitted).
                    obj = json.JSONEncoder.default(self_, obj)
                else:
                    if isinstance(obj, Proxy):
                        obj = obj.encode()
                    else:
                        obj = self.wrap_proxy(obj)
                        obj = obj.encode()
                return obj

        self.encoder = Encoder

        self.context = zmq.Context()
        # TODO connect tmp socket
        self.logger.debug("connect tmp socket at {a}".format(a=address))
        socket = self.context.socket(zmq.PAIR)
        socket.connect(address)
        # TODO bind rpc socket
        transport = 'tcp'
        port = '*'  # wildcard port
        endpoint = '{h}:{p}'.format(h=host, p=port)
        address = '{t}://{e}'.format(t=transport, e=endpoint)
        self.logger.debug("bind rpc socket at {a}".format(a=address))
        self.socket = self.context.socket(zmq.PAIR)
        # self.socket.setsockopt(zmq.RCVTIMEO, 10000)
        self.socket.bind(address)
        # The endpoint actually bound, with the wildcard port resolved.
        self.address = self.socket.getsockopt(zmq.LAST_ENDPOINT)
        self.logger.debug("rpc socket binded at {a}".format(a=self.address))
        # TODO send rpc address
        self.logger.debug("send back rpc address")
        message = {
            'address': self.address,
        }
        socket.send_json(message)

        self.last_obj_id = -1
        self.objs = {}
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)
项目:pymoku    作者:liquidinstruments    | 项目源码 | 文件源码
def __init__(self, ip_addr, load_instruments=None, force=False):
        """Create a connection to the Moku:Lab unit at the given IP address

        An encrypted connection is tried first. If it times out and *force*
        is set, the connection falls back to a non-encrypted one; otherwise
        the timeout error is re-raised.

        :type ip_addr: string
        :param ip_addr: The address to connect to. This should be in IPv4 dotted notation.

        :type load_instruments: bool or None
        :param load_instruments: Leave default (*None*) unless you know what you're doing.

        :type force: bool
        :param force: Ignore firmware and network compatibility checks and force the instrument
        to deploy. This is dangerous on many levels, leave *False* unless you know what you're doing.

        :raises zmq.error.Again: if the connection attempt times out and *force* is False.
        :raises MokuException: if the firmware is reported as incompatible and *force* is False.
        """
        self._ip = ip_addr
        self._seq = 0
        self._instrument = None
        self._known_mokus = []

        self._ctx = zmq.Context.instance()
        self._conn_lock = threading.RLock()

        try:
            self._conn = self._ctx.socket(zmq.REQ)
            self._conn.setsockopt(zmq.LINGER, 5000)
            self._conn.curve_publickey, self._conn.curve_secretkey = zmq.curve_keypair()
            self._conn.curve_serverkey, _ = zmq.auth.load_certificate(os.path.join(data_folder, '000'))
            self._conn.connect("tcp://%s:%d" % (self._ip, Moku.PORT))

            # Getting the serial should be fairly quick; it's a simple operation. More importantly we
            # don't want to block the fall-back operation for too long
            self._conn.setsockopt(zmq.SNDTIMEO, 1000)
            self._conn.setsockopt(zmq.RCVTIMEO, 1000)

            self.serial = self.get_serial()
            self._set_timeout()
        except zmq.error.Again:
            if not force:
                print("Connection failed, either the Moku cannot be reached or the firmware is out of date")
                raise

            # The encrypted socket is being replaced: close it without lingering
            # so that it is not leaked along with its unanswered request.
            self._conn.close(linger=0)

            # If we're force-connecting, try falling back to non-encrypted.
            self._conn = self._ctx.socket(zmq.REQ)
            self._conn.setsockopt(zmq.LINGER, 5000)
            self._conn.connect("tcp://%s:%d" % (self._ip, Moku.PORT))

            self._set_timeout()

            self.serial = self.get_serial()

        self.name = None
        self.led = None
        self.led_colours = None

        # Check that pymoku is compatible with the Moku:Lab's firmware version
        if not force:
            build = self.get_firmware_build()
            if cp.firmware_is_compatible(build) == False: # Might be None = unknown, don't print that.
                raise MokuException("The connected Moku appears to be incompatible with this version of pymoku. Please run 'moku --ip={} firmware check_compat' for more information.".format(self._ip))

        # Unless told explicitly, load instruments only when the unit reports the 'normal' boot mode.
        self.load_instruments = load_instruments if load_instruments is not None else self.get_bootmode() == 'normal'