Python twisted.internet 模块,reactor() 实例源码


项目:txmix    作者:applied-mixnetworks    | 项目源码 | 文件源码
def message_received(self, unwrapped_message):
        message is of type UnwrappedMessage

        delay = self._sys_rand.randint(0, self.max_delay)
        action = start_action(
            action_type=u"send delayed message",
        with action.context():
            d = deferLater(self.reactor, delay, self.protocol.packet_proxy, unwrapped_message)

            def _remove(res, d=d):
                return res

项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_cannot_listen(self):
        When the program is run with an argument and a listen address specified
        with a port that we can't listen on (e.g. port 1), a CannotListenError
        is expected to be logged and the program should stop.
        temp_dir = self.useFixture(TempDir())
        yield main(reactor, raw_args=[
            '--listen', ':1',  # A port we can't listen on

        # Expect a 'certs' directory to be created
        self.assertThat(os.path.isdir(temp_dir.join('certs')), Equals(True))

        # Expect a default certificate to be created

        # Expect to be unable to listen
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def get_events(self, callbacks):
        Attach to Marathon's event stream using Server-Sent Events (SSE).

        :param callbacks:
            A dict mapping event types to functions that handle the event data
        d = self.request('GET', path='/v2/events', unbuffered=True, headers={
            'Accept': 'text/event-stream',
            'Cache-Control': 'no-store'

        def handler(event, data):
            callback = callbacks.get(event)
            # Deserialize JSON if a callback is present
            if callback is not None:

        return d.addCallback(
            sse_content, handler, reactor=self._reactor, **self._sse_kwargs)
项目:gloss    作者:openhealthcare    | 项目源码 | 文件源码
def makeService(self, options):
        """Construct a server using MLLPFactory.

        :rtype: :py:class:`twisted.application.internet.StreamServerEndpointService`
        from twisted.internet import reactor
        from txHL7.mllp import IHL7Receiver, MLLPFactory

        receiver_name = options['receiver']
        receiver_class = reflect.namedClass(receiver_name)
        verifyClass(IHL7Receiver, receiver_class)
        factory = MLLPFactory(receiver_class())
        multi_service = MultiService()

        for port_number in PORTS:
            port = "tcp:interface={0}:port={1}".format(HOST, port_number,)
            endpoint = endpoints.serverFromString(reactor, port)
            server = internet.StreamServerEndpointService(endpoint, factory)
            server.setName(u"mllp-{0}-{1}".format(receiver_name, port_number))
        return multi_service
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def listen(description, factory, default=None):
    """Listen on a port corresponding to a description

    @type description: C{str}
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}
    @type default: C{str} or C{None}
    @rtype: C{twisted.internet.interfaces.IListeningPort}
    @return: the port corresponding to a description of a reliable
    virtual circuit server.

    See the documentation of the C{parse} function for description
    of the semantics of the arguments.
    from twisted.internet import reactor
    name, args, kw = parse(description, factory, default)
    return getattr(reactor, 'listen'+name)(*args, **kw)
项目:txmix    作者:applied-mixnetworks    | 项目源码 | 文件源码
def message_received(self, unwrapped_message):
        message is of type UnwrappedMessage

        self._batch.append(unwrapped_message)  # [(destination, sphinx_packet)
        if len(self._batch) >= self.threshold_count:
            delay = self._sys_rand.randint(0, self.max_delay)
            action = start_action(
                action_type=u"send delayed message batch",
            with action.context():
                released = self._batch
                self._batch = []
                d = deferLater(self.reactor, delay, self.batch_send, released)

                def _remove(res, d=d):
                    return res

项目:bitmask-dev    作者:leapcode    | 项目源码 | 文件源码
def _start_onion_service(self, factory):

        def progress(percent, tag, message):
            bar = int(percent / 10)
            log.debug('[%s%s] %s' % ('#' * bar, '.' * (10 - bar), message))

        def setup_complete(port):
            port = txtorcon.IHiddenService(port)
            self.uri = "http://%s" % (port.getHost().onion_uri)
  'I have set up a hidden service, advertised at: %s'
                     % self.uri)
  'locally listening on %s' % port.local_address.getHost())

        def setup_failed(args):
            log.error('onion service setup FAILED: %r' % args)

        endpoint = endpoints.serverFromString(reactor, 'onion:80')
        d = endpoint.listen(factory)
        return d
项目:universe    作者:openai    | 项目源码 | 文件源码
def connectionMade(self):'[%s] Connection received from VNC client',
        factory = protocol.ClientFactory()
        factory.protocol = VNCProxyClient
        factory.vnc_server = self
        factory.deferrable = defer.Deferred()
        endpoint = endpoints.clientFromString(reactor, self.factory.vnc_address)

        def _established_callback(client):
            if self._broken:
            self.vnc_client = client
        def _established_errback(reason):
            logger.error('[VNCProxyServer] Connection succeeded but could not establish session: %s', reason)
        factory.deferrable.addCallbacks(_established_callback, _established_errback)

        def _connect_errback(reason):
            logger.error('[VNCProxyServer] Connection failed: %s', reason)

项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def run(self):
        """setup the site, start listening on port, setup the looping call to
        :py:meth:`~.update_active_node` every ``self.poll_interval`` seconds,
        and start the Twisted reactor"""
        # get the active node before we start anything...
        self.active_node_ip_port = self.get_active_node()
        if self.active_node_ip_port is None:
            logger.critical("ERROR: Could not get active vault node from "
                            "Consul. Exiting.")
            raise SystemExit(3)
        logger.warning("Initial Vault active node: %s",
        site = Site(VaultRedirectorSite(self))
        # setup our HTTP(S) listener
        if self.tls_factory is not None:
        # setup the update_active_node poll every POLL_INTERVAL seconds
        logger.warning('Starting Twisted reactor (event loop)')
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def se_requester(self):
        While the reactor is polling, we can't make any requests. So have the
        reactor itself make the request and store the result.
        logger.debug('requester called; spawning process')
        # since Python is single-threaded and Twisted is just event-based,
        # we can't do a request and run the redirector from the same script.
        # Best choice is to used popen to run an external script to do the
        # redirect.
        url = '' % self.cls.bind_port
        path = os.path.join(os.path.dirname(__file__), '')
        self.poller = subprocess.Popen(
            [sys.executable, path, url, '/bar/baz', '/vault-redirector-health'],
        # run a poller loop to check for process stop and get results
        self.poller_check_task = task.LoopingCall(self.check_request)
        self.poller_check_task.clock = self.cls.reactor
        logger.debug('poller_check_task started')
项目:vault-redirector-twisted    作者:manheim    | 项目源码 | 文件源码
def check_request(self):
        check if the self.poller process has finished; if so, handle results
        and stop the poller_check_task. If update_active has also already been
        called, stop the reactor.
        logger.debug('check_request called')
        if self.poller.poll() is None:
            logger.debug('poller process still running')
        # stop the looping task
        assert self.poller.returncode == 0
        out, err = self.poller.communicate()
        self.response = out.strip()
        logger.debug('check_request done; response: %s', self.response)
        # on python3, this will be binary
        if not isinstance(self.response, str):
            self.response = self.response.decode('utf-8')
        if self.update_active_called:
项目:privacyidea-ldap-proxy    作者:NetKnights-GmbH    | 项目源码 | 文件源码
def makeService(self, options):
        Called by Twisted after having parsed the command-line options.
        :param options: ``usage.Options`` instance
        :return: the server instance
        # Configuration is mandatory
        if options['config'] is None:
            print 'You need to specify a configuration file via `twistd ldap-proxy -c config.ini`.'

        config = load_config(options['config'])
        factory = ProxyServerFactory(config)

        endpoint_string = serverFromString(reactor, config['ldap-proxy']['endpoint'])
        return internet.StreamServerEndpointService(endpoint_string, factory)
项目:privacyidea-ldap-proxy    作者:NetKnights-GmbH    | 项目源码 | 文件源码
def connect_service_account(self):
        Make a new connection to the LDAP backend server using the credentials of the service account
        :return: A Deferred that fires a `LDAPClient` instance
        client = yield connectToLDAPEndpoint(reactor, self.proxied_endpoint_string, LDAPClient)
        if self.use_tls:
            client = yield client.startTLS()
            yield client.bind(self.service_account_dn, self.service_account_password)
        except ldaperrors.LDAPException, e:
            # Call unbind() here if an exception occurs: Otherwise, Twisted will keep the file open
            # and slowly run out of open files.
            yield client.unbind()
            raise e
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def test_twisted(pyi_builder):
        # Twisted is an event-driven networking engine.
        # The 'reactor' is object that starts the eventloop.
        # There are different types of platform specific reactors.
        # Platform specific reactor is wrapped into twisted.internet.reactor module.
        from twisted.internet import reactor
        # Applications importing module twisted.internet.reactor might fail
        # with error like:
        #     AttributeError: 'module' object has no attribute 'listenTCP'
        # Ensure default reactor was loaded - it has method 'listenTCP' to start server.
        if not hasattr(reactor, 'listenTCP'):
            raise SystemExit('Twisted reactor not properly initialized.')
项目:peas    作者:mwrlabs    | 项目源码 | 文件源码
def __init__(self, domain, username, pw, server, use_ssl, policy_key=0, server_version="14.0", device_type="iPhone", device_id=None, verbose=False):
        self.use_ssl = use_ssl
        self.domain = domain
        self.username = username
        self.password = pw
        self.server = server
        self.device_id = device_id
        if not self.device_id:
            self.device_id = str(uuid.uuid4()).replace("-","")[:32]
        self.server_version = server_version
        self.device_type = device_type
        self.policy_key = policy_key
        self.folder_data = {}
        self.verbose = verbose
        self.collection_data = {}
        clientContext = WebClientContextFactory()
        self.agent = Agent(reactor, clientContext)
        self.operation_queue = defer.DeferredQueue()
        self.queue_deferred = self.operation_queue.get()

    # Response processing
项目:python-hpedockerplugin    作者:hpe-storage    | 项目源码 | 文件源码
def test_hpe_create_volume_invalid_provisioning_option(self):
        name = 'test-create-volume-fake'
        path = b"/VolumeDriver.Create"
        body = {u"Name": name,
                u"Opts": {u"provisioning": u"fake"}}

        headers = Headers({b"content-type": [b"application/json"]})
        body_producer = FileBodyProducer(BytesIO(dumps(body)))
        agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
        d.addCallback(self.checkResponse, json.dumps({
            u"Err": "Invalid input received: Must specify a valid " +
            "provisioning type ['thin', 'full', " +
            "'dedup'], value 'fake' is invalid."}))
        d.addCallback(self._remove_volume_callback, name)
        return d
项目:python-hpedockerplugin    作者:hpe-storage    | 项目源码 | 文件源码
def test_hpe_create_volume_invalid_option(self):
        name = 'test-create-volume-fake'
        path = b"/VolumeDriver.Create"
        body = {u"Name": name,
                u"Opts": {u"fake": u"fake"}}

        headers = Headers({b"content-type": [b"application/json"]})
        body_producer = FileBodyProducer(BytesIO(dumps(body)))
        agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
        d.addCallback(self.checkResponse, json.dumps({
            u"Err": "create volume failed, error is: fake is not a valid "
            "option. Valid options are: ['size', 'provisioning', "
        d.addCallback(self._remove_volume_callback, name)
        return d
项目:python-hpedockerplugin    作者:hpe-storage    | 项目源码 | 文件源码
def _get_volume_mount_path(self, body, name):
        # NOTE: body arg is the result from last deferred call.
        # Python complains about parameter mis-match if you don't include it
        # In this test, we need it to compare expected results with Path
        # request

        # Compare path returned by mount (body) with Get Path request
        path = b"/VolumeDriver.Path"
        newbody = {u"Name": name}
        headers = Headers({b"content-type": [b"application/json"]})
        body_producer = FileBodyProducer(BytesIO(dumps(newbody)))
        agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
        d.addCallback(self.checkResponse, body)
        return d
项目:python-hpedockerplugin    作者:hpe-storage    | 项目源码 | 文件源码
def _mount_the_volume(self, body, name):
        # NOTE: body arg is the result from last deferred call.
        # Python complains about parameter mis-match if you don't include it

        # Mount the previously created volume
        path = b"/VolumeDriver.Mount"
        newbody = {u"Name": name}
        headers = Headers({b"content-type": [b"application/json"]})
        body_producer = FileBodyProducer(BytesIO(dumps(newbody)))
        agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        d = agent.request(b'POST', b"UNIX://localhost" + path, headers,


        # If we get a valid response from Path request then we assume
        # the mount passed.
        # TODO: Add additonal logic to verify the mountpath
        d.addCallback(self._get_volume_mount_path, name)
        return d
项目:python-hpedockerplugin    作者:hpe-storage    | 项目源码 | 文件源码
def broken_test_hpe_mount_umount_volume(self):
        name = 'test-mount-volume'
        path = b"/VolumeDriver.Create"
        body = {u"Name": name}

        # Create a volume to be mounted
        headers = Headers({b"content-type": [b"application/json"]})
        body_producer = FileBodyProducer(BytesIO(dumps(body)))
        agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
        d.addCallback(self.checkResponse, json.dumps({u"Err": ''}))

        # Mount the previously created volume
        d.addCallback(self._mount_the_volume, name)

        # UMount the previously created volume
        d.addCallback(self._unmount_the_volume, name)

        # Remove the previously created volume
        d.addCallback(self._remove_volume_callback, name)
        return d
项目:python-hpedockerplugin    作者:hpe-storage    | 项目源码 | 文件源码
def test_hpe_get_volume(self):
        name = 'test-get-volume'
        path = b"/VolumeDriver.Create"
        body = {u"Name": name}

        # Create a volume to be mounted
        headers = Headers({b"content-type": [b"application/json"]})
        body_producer = FileBodyProducer(BytesIO(dumps(body)))
        agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
        d.addCallback(self.checkResponse, json.dumps({u"Err": ''}))

        # Get the previously created volume
        expected = {u"Volume": {u"Status": {},
                                u"Mountpoint": '',
                                u"Name": name},
                    u"Err": ''}
        d.addCallback(self._get_volume, name, expected)

        # Remove the previously created volume
        d.addCallback(self._remove_volume_callback, name)
        return d
项目:python-hpedockerplugin    作者:hpe-storage    | 项目源码 | 文件源码
def broken_test_hpe_list_volume(self):
        name = 'test-list-volume'
        path = b"/VolumeDriver.Create"
        body = {u"Name": name}

        # Create a volume to be mounted
        headers = Headers({b"content-type": [b"application/json"]})
        body_producer = FileBodyProducer(BytesIO(dumps(body)))
        agent = Agent.usingEndpointFactory(reactor, HPEEndpointFactory())
        d = agent.request(b'POST', b"UNIX://localhost" + path, headers,
        d.addCallback(self.checkResponse, json.dumps({u"Err": ''}))

        # List volumes
        expected = {u"Err": '',
                    u"Volumes": [{u"Mountpoint": '',
                                  u"Name": name}]}
        d.addCallback(self._list_volumes, name, expected)

        # Remove the previously created volume
        d.addCallback(self._remove_volume_callback, name)

        return d
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def listen(description, factory, default=None):
    """Listen on a port corresponding to a description

    @type description: C{str}
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}
    @type default: C{str} or C{None}
    @rtype: C{twisted.internet.interfaces.IListeningPort}
    @return: the port corresponding to a description of a reliable
    virtual circuit server.

    See the documentation of the C{parse} function for description
    of the semantics of the arguments.
    from twisted.internet import reactor
    name, args, kw = parse(description, factory, default)
    return getattr(reactor, 'listen'+name)(*args, **kw)
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def testValidOptionsRequest(self):
        Makes sure that a "regular" OPTIONS request doesn't include the CORS
        specific headers in the response.
        agent = Agent(reactor)
        headers = Headers({'origin': ['http://localhost']})
        response = yield agent.request('OPTIONS', self.uri, headers)

        # Check we get the correct status.
        self.assertEqual(http.OK, response.code)

        # Check we get the correct length
        self.assertEqual(0, response.length)

        # Check we get the right headers back
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def testViaAgent(self):
        This is a manual check of a POST to /objects which uses
        L{twisted.web.client.Agent} to make the request. We do not use
        txFluidDB because we need to check that a Location header is
        received and that we receive both a 'URI' and an 'id' in the JSON
        response payload.
        URI = self.txEndpoint.getRootURL() + defaults.httpObjectCategoryName
        basicAuth = 'Basic %s' % b64encode('%s:%s' % ('testuser1', 'secret'))
        headers = Headers({'accept': ['application/json'],
                           'authorization': [basicAuth]})
        agent = Agent(reactor)
        response = yield agent.request('POST', URI, headers)
        self.assertEqual(http.CREATED, response.code)
        d = defer.Deferred()
        bodyGetter = ResponseGetter(d)
        body = yield d
        responseDict = json.loads(body)
        self.assertIn('URI', responseDict)
        self.assertIn('id', responseDict)
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def testQueryUnicodePath(self):
        """A query on a non-existent Unicode tag, should 404. Part of the
        point here is to make sure that no other error occurs due to
        passing in a Unicode tag path.
        path = u'çóñ/???'
        query = '%s = "hi"' % path
        URI = '%s/%s?query=%s' % (

        headers = Headers({'accept': ['application/json']})

        agent = Agent(reactor)
        response = yield agent.request('GET', URI, headers)

        self.assertEqual(http.NOT_FOUND, response.code)
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def testValidCORSRequest(self):
        Sanity check to make sure we get the valid headers back for a CORS
        based request.
        agent = Agent(reactor)
        headers = Headers()
        # The origin to use in the tests
        dummy_origin = ''
        headers.addRawHeader('Origin', dummy_origin)
        response = yield agent.request('GET', self.uri, headers)

        # Check we get the correct status.
        self.assertEqual(http.OK, response.code)
        # Check we get the right headers back
            dummy_origin in
项目:fluiddb    作者:fluidinfo    | 项目源码 | 文件源码
def testVersionGets404(self):
        Version numbers used to be able to be given in API calls, but are
        no longer supported.
        version = 20100808

        URI = '%s/%d/%s/%s' % (

        headers = Headers({'accept': ['application/json']})

        agent = Agent(reactor)
        response = yield agent.request('GET', URI, headers)
        self.assertEqual(http.NOT_FOUND, response.code)

    # TODO: Add a test for a namespace that we don't have LIST perm on.
    # although that might be done in when that finally gets
    # added.
项目:deb-python-txaio    作者:openstack    | 项目源码 | 文件源码
def with_config(loop=None):
    global config
    if loop is not None:
        if config.loop is not None and config.loop is not loop:
            raise RuntimeError(
                "Twisted has only a single, global reactor. You passed in "
                "a reactor different from the one already configured "
                "in txaio.config.loop"
    return _TxApi(config)

# NOTE: beware that twisted.logger._logger.Logger copies itself via an
# overriden __get__ method when used as recommended as a class
# descriptor.  So, we override __get__ to just return ``self`` which
# means ``log_source`` will be wrong, but we don't document that as a
# key that you can depend on anyway :/
项目:mgr.p2p.proxy    作者:tomusdrw    | 项目源码 | 文件源码
def requestWebObject(self):
        parsed = urlparse.urlparse(self.uri)
        protocol = parsed[0]
        host, port = self.extractHostAndPort(parsed, protocol)
        rest = self.extractQuery(parsed)

        class_ = self.protocols[protocol]

        headers = self.getAllHeaders().copy()

        if 'host' not in headers:
            headers['host'] = host'Performing {} request for {}'.format(self.method, self.uri)), 0)
        s =
        clientFactory = class_(self.method, rest, self.clientproto, headers,
                               s, self)
        self.reactor.connectTCP(host, port, clientFactory)
项目:afkak    作者:ciena    | 项目源码 | 文件源码
def clientConnectionLost(self, connector, reason):
        """Handle notification from the lower layers of connection loss.

        If we are shutting down, and twisted sends us the expected type of
        error, eat the error. Otherwise, log it and pass it along.
        Also, schedule notification of our subscribers at the next pass
        through the reactor.
        if self.dDown and reason.check(ConnectionDone):
            # We initiated the close, this is an expected close/lost
            log.debug('%r: Connection Closed:%r:%r', self, connector, reason)
            notifyReason = None  # Not a failure
            log.debug('%r: clientConnectionLost:%r:%r', self, connector,
            notifyReason = reason

        # Reset our proto so we don't try to send to a down connection
        self.proto = None
        # Schedule notification of subscribers
        self._get_clock().callLater(0, self._notify, False, notifyReason)
        # Call our superclass's method to handle reconnecting
            self, connector, reason)
项目:bwscanner    作者:TheTorProject    | 项目源码 | 文件源码
def __init__(self, host, port, state, path):
        @param reactor: An L{IReactorTCP} provider

        @param host: A hostname, used when connecting
        @type host: str

        @param port: The port number, used when connecting
        @type port: int

        @param path: A list of relay identities.
        @type path: list

        This endpoint will be routed through Tor over a circuit
        defined by path.
        """ = host
        self.port = port
        self.path = path
        self.state = state

        self.or_endpoint = get_orport_endpoint(state)
项目:bwscanner    作者:TheTorProject    | 项目源码 | 文件源码
def start_tor(config):
    Launches tor with random TCP ports chosen for SocksPort and ControlPort,
    and other options specified by a txtorcon.torconfig.TorConfig instance.

    Returns a deferred that calls back with a txtorcon.torstate.TorState
    def get_random_tor_ports():
        d2 = available_tcp_port(reactor)
        d2.addCallback(lambda port: config.__setattr__('SocksPort', port))
        d2.addCallback(lambda _: available_tcp_port(reactor))
        d2.addCallback(lambda port: config.__setattr__('ControlPort', port))
        return d2

    def launch_and_get_state(ignore):
        d2 = launch_tor(config, reactor, stdout=sys.stdout)
        d2.addCallback(lambda tpp: TorState(tpp.tor_protocol).post_bootstrap)
        return d2
    return get_random_tor_ports().addCallback(launch_and_get_state)
项目:ccs-twistedextensions    作者:apple    | 项目源码 | 文件源码
def reExec(self):
        Removes pidfile, registers an exec to happen after shutdown, then
        stops the reactor.
        self.log.warn("SIGHUP received - restarting")
  "Removing pidfile: {log_source.pidfilePath}")
        except OSError:
            "after", "shutdown", os.execv,
            sys.executable, [sys.executable] + sys.argv
项目:ccs-twistedextensions    作者:apple    | 项目源码 | 文件源码
def _getPort(self):
        from twisted.internet import reactor

        if self.inherit:
            port = InheritedSSLPort(
                self.args[0], self.args[1], self.args[2], reactor
            port = MaxAcceptSSLPort(
                self.args[0], self.args[1], self.args[2],
                self.backlog, self.interface, self.reactor

        self.myPort = port
        return port
项目:ccs-twistedextensions    作者:apple    | 项目源码 | 文件源码
def __init__(self, reactor, transactionFactory, useWorkerPool=True, disableWorkProcessing=False):
        Initialize a L{ControllerQueue}.

        @param transactionFactory: a 0- or 1-argument callable that produces an

        @param useWorkerPool:  Whether to use a worker pool to manage load
            or instead take on all work ourselves (e.g. in single process mode)
        super(ControllerQueue, self).__init__()
        self.reactor = reactor
        self.transactionFactory = transactionFactory
        self.workerPool = WorkerConnectionPool() if useWorkerPool else None
        self.disableWorkProcessing = disableWorkProcessing
        self._lastMinPriority = WORK_PRIORITY_LOW
        self._timeOfLastWork = time.time()
        self._actualPollInterval = self.queuePollInterval
        self._inWorkCheck = False
        self._inOverdueCheck = False
项目:ccs-twistedextensions    作者:apple    | 项目源码 | 文件源码
def _overdueCheckLoop(self):
        While the service is running, keep checking for any overdue items.
        self._overdueCheckCall = None

        if not self.running:

            yield self._overdueCheck()
        except Exception as e:
            log.error("_overdueCheckLoop: {exc}", exc=e)

        if not self.running:

        self._overdueCheckCall = self.reactor.callLater(
            self.queueOverduePollInterval, self._overdueCheckLoop
项目:deb-python-autobahn    作者:openstack    | 项目源码 | 文件源码
def parseStreamServer(self, reactor, description, **options):

        # The present endpoint plugin is intended to be used as in the
        # following for running a streaming protocol over WebSocket over
        # an underlying stream transport.
        # endpoint = serverFromString(reactor,
        # "autobahn:tcp\:9000\:interface\=\://localhost\:9000:compress=false"
        # This will result in `parseStreamServer` to be called will
        # description == tcp:9000:interface=
        # and
        # options == {'url': 'ws://localhost:9000', 'compress': 'false'}
        # Essentially, we are using the `\:` escape to coerce the endpoint descriptor
        # of the underlying stream transport into one (first) positional argument.
        # Note that the `\:` within "url" is another form of escaping!
        opts = _parseOptions(options)
        endpoint = serverFromString(reactor, description)
        return AutobahnServerEndpoint(reactor, endpoint, opts)
项目:Interactive_estimation    作者:adminq80    | 项目源码 | 文件源码
def start_initial(game):
    round_data, users_plots = get_round(game)
    state = 'initial'

    if round_data is None:
        game.end_time =
        game.broadcast(action='redirect', url=reverse('interactive:exit'))
        cache.set(, {'state': state,
                            'round_data': round_data,
                            'users_plots': users_plots,
    initial(game, round_data, users_plots)
    task.deferLater(reactor, 1, game_state_checker, game, state, round_data, users_plots).addErrback(twisted_error)
项目:nav    作者:UNINETT    | 项目源码 | 文件源码
def throttled(func):
    """Decorator for AgentProxyMixIn.getTable to throttle requests"""
    def _wrapper(*args, **kwargs):
        self = args[0]
        last_request = getattr(self, '_last_request')
        delay = (last_request + self.throttle_delay) - time.time()
        setattr(self, '_last_request', time.time())

        if delay > 0:
            _logger.debug("%sss delay due to throttling: %r", delay, self)
            return deferLater(reactor, delay, func, *args, **kwargs)
            return func(*args, **kwargs)

    return wraps(func)(_wrapper)

# pylint: disable=R0903
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def service(description, factory, reactor=None):
    Return the service corresponding to a description.

    @param description: The description of the listening port, in the syntax
        described by L{twisted.internet.endpoints.serverFromString}.
    @type description: C{str}

    @param factory: The protocol factory which will build protocols for
        connections to this service.
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}

    @rtype: C{twisted.application.service.IService}
    @return: the service corresponding to a description of a reliable stream

    @see: L{twisted.internet.endpoints.serverFromString}
    if reactor is None:
        from twisted.internet import reactor

    svc = StreamServerEndpointService(
        endpoints.serverFromString(reactor, description), factory)
    svc._raiseSynchronously = True
    return svc
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def listen(description, factory):
    Listen on a port corresponding to a description.

    @param description: The description of the connecting port, in the syntax
        described by L{twisted.internet.endpoints.serverFromString}.
    @type description: L{str}

    @param factory: The protocol factory which will build protocols on
    @type factory: L{twisted.internet.interfaces.IProtocolFactory}

    @rtype: L{twisted.internet.interfaces.IListeningPort}
    @return: the port corresponding to a description of a reliable virtual
        circuit server.

    @see: L{twisted.internet.endpoints.serverFromString}
    from twisted.internet import reactor
    name, args, kw = endpoints._parseServer(description, factory)
    return getattr(reactor, 'listen' + name)(*args, **kw)
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_storage_dir_required(self):
        When the program is run with no arguments, it should exit with code 2
        because there is one required argument.
        with ExpectedException(SystemExit, MatchesStructure(code=Equals(2))):
            main(reactor, raw_args=[])
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_storage_dir_provided(self):
        When the program is run with an argument, it should start up and run.
        The program is expected to fail because it is unable to connect to

        This test takes a while because we have to let txacme go through it's
        initial sync (registration + issuing of 0 certificates) before things
        can be halted.
        temp_dir = self.useFixture(TempDir())
        yield main(reactor, raw_args=[
            '--acme', LETSENCRYPT_STAGING_DIRECTORY.asText(),
            '--marathon', 'http://localhost:28080'  # An address we can't reach

        # Expect a 'certs' directory to be created
        self.assertThat(os.path.isdir(temp_dir.join('certs')), Equals(True))

        # Expect a default certificate to be created

        # Expect to be unable to connect to Marathon
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_default_reactor(self):
        When default_reactor is passed a reactor it should return that reactor.
        clock = Clock()

        assert_that(default_reactor(clock), Is(clock))
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_default_reactor_not_provided(self):
        When default_reactor is not passed a reactor, it should return the
        default reactor.
        assert_that(default_reactor(None), Is(reactor))
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_default_client_not_provided(self):
        When default_agent is not passed an agent, it should return a default
        assert_that(default_client(None, reactor), IsInstance(treq_HTTPClient))
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def setUp(self):
        super(TestHTTPClientBase, self).setUp()

        self.requests = DeferredQueue()
        self.fake_server = FakeHttpServer(self.handle_request)

        fake_client = treq_HTTPClient(self.fake_server.get_agent())
        self.client = self.get_client(fake_client)

        # Spin the reactor once at the end of each test to clean up any
        # cancelled deferreds
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def default_reactor(reactor):
    if reactor is None:
        from twisted.internet import reactor
    return reactor
项目:marathon-acme    作者:praekeltfoundation    | 项目源码 | 文件源码
def default_client(client, reactor):
    Set up a default client if one is not provided. Set up the default
    ``twisted.web.client.Agent`` using the provided reactor.
    if client is None:
        from twisted.web.client import Agent
        client = treq_HTTPClient(Agent(reactor))

    return client