Python zope.interface 模块,provider() 实例源码

我们从Python开源项目中,提取了以下26个代码示例,用于说明如何使用zope.interface.provider()

项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, logPath=None, timeout=_REQUEST_TIMEOUT,
                 logFormatter=None, reactor=None):
        """
        Record the access-log settings, the request timeout and the reactor.

        @param logPath: Optional path of the access log.  When given, it is
            stored in its absolute form.

        @param timeout: Value stored as C{self.timeOut}.

        @param logFormatter: An object to format requests into log lines for
            the access log.  C{combinedLogFormatter} is used when omitted.
        @type logFormatter: L{IAccessLogFormatter} provider

        @param reactor: A L{IReactorTime} provider used to compute logging
            timestamps.  When a falsy value is passed, the global reactor is
            imported and used instead.
        """
        if not reactor:
            # Imported lazily so the global reactor is only touched on demand.
            from twisted.internet import reactor
        self._reactor = reactor

        self.logPath = None if logPath is None else os.path.abspath(logPath)
        self.timeOut = timeout
        self._logFormatter = (
            combinedLogFormatter if logFormatter is None else logFormatter)

        # Cached log datetime and the call that refreshes it; both start unset.
        self._logDateTime = None
        self._logDateTimeCall = None
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, connectedDeferred, wrappedProtocol):
        """
        Wrap C{wrappedProtocol} and mirror the optional interfaces it provides.

        @param connectedDeferred: The L{Deferred} that will callback
            with the C{wrappedProtocol} when it is connected.

        @param wrappedProtocol: An L{IProtocol} provider that will be
            connected.
        """
        self._connectedDeferred = connectedDeferred
        self._wrappedProtocol = wrappedProtocol

        # C{directlyProvides} replaces any earlier direct declaration, so
        # declaring the interfaces one at a time would keep only the last
        # match.  Gather every match first and declare them in a single call.
        mirrored = [
            iface for iface in (interfaces.IHalfCloseableProtocol,
                                interfaces.IFileDescriptorReceiver,
                                interfaces.IHandshakeListener)
            if iface.providedBy(wrappedProtocol)
        ]
        if mirrored:
            # Skipped when nothing matches, so existing declarations on
            # C{self} are left untouched in that case.
            directlyProvides(self, *mirrored)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def connect(self, protocolFactory):
        """
        Implement L{IStreamClientEndpoint.connect}: spawn the child process
        and attach it to a protocol built by C{protocolFactory}.

        @param protocolFactory: A factory for an L{IProtocol} provider which
            will be notified of all events related to the created process.

        @return: C{defer.succeed(proto)} when spawning did not raise,
            otherwise C{defer.fail()} for the exception that was raised.
        """
        proto = protocolFactory.buildProtocol(_ProcessAddress())
        try:
            wrapper = _WrapIProtocol(proto, self._executable, self._errFlag)
            self._spawnProcess(
                wrapper, self._executable, self._args, self._env, self._path,
                self._uid, self._gid, self._usePTY, self._childFDs)
        except BaseException:
            # Every exception raised while spawning is reported through the
            # returned Deferred instead of propagating to the caller.
            return defer.fail()
        return defer.succeed(proto)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, reactor, port, backlog, interface):
        """
        Store the listening parameters on the instance.

        @param reactor: An L{IReactorTCP} provider.

        @param port: The port number used for listening
        @type port: int

        @param backlog: Size of the listen queue
        @type backlog: int

        @param interface: The hostname to bind to
        @type interface: str
        """
        self._reactor = reactor
        self._port, self._backlog = port, backlog
        self._interface = interface
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, reactor, host, port, timeout=30, bindAddress=None):
        """
        Store the connection parameters on the instance.

        @param reactor: An L{IReactorTCP} provider

        @param host: A hostname, used when connecting
        @type host: str

        @param port: The port number, used when connecting
        @type port: int

        @param timeout: The number of seconds to wait before assuming the
            connection has failed.
        @type timeout: int

        @param bindAddress: A (host, port) tuple of local address to bind to,
            or None.
        @type bindAddress: tuple
        """
        self._reactor = reactor
        self._host, self._port = host, port
        self._timeout, self._bindAddress = timeout, bindAddress
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, reactor, port, sslContextFactory,
                 backlog=50, interface=''):
        """
        Store the SSL listening parameters on the instance.

        @param reactor: An L{IReactorSSL} provider.

        @param port: The port number used for listening
        @type port: int

        @param sslContextFactory: An instance of
            L{interfaces.IOpenSSLContextFactory}.

        @param backlog: Size of the listen queue
        @type backlog: int

        @param interface: The hostname to bind to, defaults to '' (all)
        @type interface: str
        """
        self._reactor = reactor
        self._port = port
        self._sslContextFactory = sslContextFactory
        self._backlog, self._interface = backlog, interface
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, reactor, path, timeout=30, checkPID=0):
        """
        Store the Unix-socket connection parameters on the instance.

        @param reactor: An L{IReactorUNIX} provider.

        @param path: The path to the Unix socket file, used when connecting
        @type path: str

        @param timeout: Number of seconds to wait before assuming the
            connection has failed.
        @type timeout: int

        @param checkPID: If True, check for a pid file to verify that a server
            is listening.
        @type checkPID: bool
        """
        self._reactor, self._path = reactor, path
        self._timeout, self._checkPID = timeout, checkPID
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _parseServer(self, reactor, port, backlog=50, interface='::'):
        """
        Convert string endpoint arguments into a L{TCP6ServerEndpoint}.

        C{port} and C{backlog} are passed through C{int} before use.

        @param reactor: An L{IReactorTCP} provider.

        @param port: The port number used for listening
        @type port: int

        @param backlog: Size of the listen queue
        @type backlog: int

        @param interface: The hostname to bind to
        @type interface: str

        @return: The L{TCP6ServerEndpoint} built from the converted arguments.
        """
        return TCP6ServerEndpoint(reactor, int(port), int(backlog), interface)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def spawnProcess(self, processProtocol, executable, args=(), env={},
                     path=None, uid=None, gid=None, usePTY=0, childFDs=None):
        """
        Record every argument on C{self} and connect the protocol to a fresh
        L{MemoryProcessTransport}.

        @param processProtocol: Stored as C{self.processProtocol}; its
            C{makeConnection} is called with the new transport.

        @return: An L{IProcessTransport} provider.
        """
        # NOTE: the default C{env} dict is one shared object.  It is only
        # stored here, never mutated.
        self.processProtocol = processProtocol
        self.executable, self.args, self.env = executable, args, env
        self.path, self.uid, self.gid = path, uid, gid
        self.usePTY, self.childFDs = usePTY, childFDs

        transport = MemoryProcessTransport()
        self.processTransport = transport
        processProtocol.makeConnection(transport)
        return transport
项目:maas    作者:maas    | 项目源码 | 文件源码
def retries(timeout=30, intervals=1, clock=reactor):
    """Helper for retrying something, sleeping between attempts.

    Returns the generator built by `gen_retries`, which yields
    ``(elapsed, remaining, wait)`` tuples giving times in seconds. The last
    item, `wait`, is the suggested amount of time to sleep before trying
    again. The start time is captured here, when `retries` is called.

    :param timeout: From now, how long to keep iterating, in seconds. It is
        added to the clock's current time to obtain the end time.
    :param intervals: The sleep between each iteration, in seconds, or an
        iterable from which to obtain intervals. In the latter case, the
        iterator is advanced each time an interval is needed. This allows
        for back-off strategies.
    :param clock: An optional `IReactorTime` provider. Defaults to the
        installed reactor.
    """
    started = clock.seconds()
    deadline = started + timeout
    # A single value is repeated forever; an iterable is consumed as given.
    pauses = (
        iter(intervals) if isinstance(intervals, Iterable)
        else repeat(intervals))
    return gen_retries(started, deadline, pauses, clock)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def getPeer(self):
        """
        Report the remote address of this connection, as given by the
        underlying transport.

        @return: An L{IAddress} provider.
        """
        transport = self.transport
        return transport.getPeer()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def getHost(self):
        """
        Report the local address of this connection, as given by the
        underlying transport.

        @return: An L{IAddress} provider.
        """
        transport = self.transport
        return transport.getHost()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def registerProducer(self, producer, streaming):
        """
        Register to receive data from a producer.

        This makes self the consumer for C{producer}.  When this object runs
        out of data (as when a send(2) call on a socket succeeds in moving the
        last data from a userspace buffer into a kernelspace buffer), it asks
        the producer to resumeProducing().

        For L{IPullProducer} providers, C{resumeProducing} will be called once
        each time data is required.

        For L{IPushProducer} providers, C{pauseProducing} will be called
        whenever the write buffer fills up and C{resumeProducing} will only be
        called when it empties.

        @type producer: L{IProducer} provider
        @param producer: The L{IProducer} that will be producing data.

        @type streaming: L{bool}
        @param streaming: C{True} if C{producer} provides L{IPushProducer},
        C{False} if C{producer} provides L{IPullProducer}.

        @raise RuntimeError: If a producer is already registered.

        @return: L{None}
        """
        current = self._requestProducer
        if current is not None:
            raise RuntimeError(
                "Cannot register producer %s, because producer %s was never "
                "unregistered." % (producer, current))

        needsAdapter = not streaming
        if needsAdapter:
            # Non-streaming producers are wrapped in L{_PullToPush}.
            producer = _PullToPush(producer, self)

        self._requestProducer = producer
        self._requestProducerStreaming = streaming

        if needsAdapter:
            # Started only after the wrapper has been recorded on C{self}.
            producer.startStreaming()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, proto, executable, errFlag):
        """
        Store the wrapped protocol, the stderr flag and the executable name.

        @param proto: An L{IProtocol} provider.
        @param executable: The file name (full path) to spawn.
        @param errFlag: A constant belonging to L{StandardErrorBehavior}
            that determines if stderr is logged or dropped.
        """
        self.protocol = proto
        self.errFlag, self.executable = errFlag, executable
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def makeConnection(self, process):
        """
        Wrap C{process} in a L{_ProcessEndpointTransport}, keep it as
        C{self.transport}, and hand it to the L{IProtocol} provider's
        makeConnection method.

        @param process: An L{IProcessTransport} provider.

        @return: Whatever the wrapped protocol's C{makeConnection} returns.
        """
        wrapped = _ProcessEndpointTransport(process)
        self.transport = wrapped
        return self.protocol.makeConnection(wrapped)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def processEnded(self, reason):
        """
        Forward the end of the process to the L{IProtocol} provider's
        L{connectionLost}.

        If the process ended with L{error.ProcessDone} and a status of 0, the
        protocol receives a L{error.ConnectionDone} failure; otherwise it
        receives C{reason} unchanged.

        @see: L{ProcessProtocol.processEnded}
        """
        endedCleanly = (
            reason.check(error.ProcessDone) == error.ProcessDone
            and reason.value.status == 0)
        if endedCleanly:
            reason = Failure(error.ConnectionDone())
        return self.protocol.connectionLost(reason)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, reactor, port, backlog=50, interface='::'):
        """
        Initialise through L{_TCPServerEndpoint.__init__}, passing every
        argument along unchanged.

        @param reactor: An L{IReactorTCP} provider.

        @param port: The port number used for listening
        @type port: int

        @param backlog: Size of the listen queue
        @type backlog: int

        @param interface: The hostname to bind to, defaults to C{::} (all)
        @type interface: str
        """
        _TCPServerEndpoint.__init__(
            self, reactor, port, backlog, interface)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, reactor, host, port, timeout=30, bindAddress=None,
                 attemptDelay=None):
        """
        Create a L{HostnameEndpoint}.

        @param reactor: The reactor to use for connections and delayed calls.
        @type reactor: provider of L{IReactorTCP}, L{IReactorTime} and either
            L{IReactorPluggableNameResolver} or L{IReactorPluggableResolver}.

        @param host: A hostname to connect to.
        @type host: L{bytes} or L{unicode}

        @param port: The port number to connect to.
        @type port: L{int}

        @param timeout: For each individual connection attempt, the number of
            seconds to wait before assuming the connection has failed.
        @type timeout: L{int}

        @param bindAddress: the local address of the network interface to make
            the connections from.
        @type bindAddress: L{bytes}

        @param attemptDelay: The number of seconds to delay between connection
            attempts.  C{self._DEFAULT_ATTEMPT_DELAY} is used when omitted.
        @type attemptDelay: L{float}

        @see: L{twisted.internet.interfaces.IReactorTCP.connectTCP}
        """
        self._reactor = reactor

        badHostname, hostBytes, hostText = self._hostAsBytesAndText(host)
        self._badHostname = badHostname
        self._hostBytes = hostBytes
        self._hostText = hostText
        # Use the form of the host name that matches the native C{str} type.
        if bytes is str:
            self._hostStr = hostBytes
        else:
            self._hostStr = hostText

        self._port = port
        self._timeout, self._bindAddress = timeout, bindAddress
        self._attemptDelay = (
            self._DEFAULT_ATTEMPT_DELAY if attemptDelay is None
            else attemptDelay)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, reactor, host, port, sslContextFactory,
                 timeout=30, bindAddress=None):
        """
        Store the SSL connection parameters on the instance.

        @param reactor: An L{IReactorSSL} provider.

        @param host: A hostname, used when connecting
        @type host: str

        @param port: The port number, used when connecting
        @type port: int

        @param sslContextFactory: SSL Configuration information as an instance
            of L{interfaces.IOpenSSLContextFactory}.

        @param timeout: Number of seconds to wait before assuming the
            connection has failed.
        @type timeout: int

        @param bindAddress: A (host, port) tuple of local address to bind to,
            or None.
        @type bindAddress: tuple
        """
        self._reactor = reactor
        self._host, self._port = host, port
        self._sslContextFactory = sslContextFactory
        self._timeout, self._bindAddress = timeout, bindAddress
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self, reactor, fileno, addressFamily):
        """
        Store the socket details and mark the endpoint as not yet used.

        @param reactor: An L{IReactorSocket} provider.

        @param fileno: An integer file descriptor corresponding to a listening
            I{SOCK_STREAM} socket.

        @param addressFamily: The address family of the socket given by
            C{fileno}.
        """
        self.reactor = reactor
        self.fileno, self.addressFamily = fileno, addressFamily
        self._used = False
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _parseServer(self, reactor, domain, index):
        """
        Convert the string arguments for a systemd server endpoint into an
        L{AdoptedStreamServerEndpoint}.

        @param reactor: An L{IReactorSocket} provider.

        @param domain: The domain (or address family) of the socket inherited
            from systemd.  This is a string like C{"INET"} or C{"UNIX"}, ie the
            name of an address family from the L{socket} module, without the
            C{"AF_"} prefix.
        @type domain: C{str}

        @param index: An offset into the list of file descriptors inherited from
            systemd.
        @type index: C{str}

        @return: An L{AdoptedStreamServerEndpoint} for the selected inherited
            descriptor and the address family named by C{domain}.
        """
        descriptors = self._sddaemon.inheritedDescriptors()
        fileno = descriptors[int(index)]
        family = getattr(socket, 'AF_' + domain)
        return AdoptedStreamServerEndpoint(reactor, fileno, family)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _loadCAsFromDir(directoryPath):
    """
    Load certificate-authority certificate objects in a given directory.

    Only children whose name ends in C{.pem} (compared case-insensitively)
    are considered.  Files that cannot be read or parsed are skipped, and
    certificates are de-duplicated by their digest.

    @param directoryPath: the directory to load .pem files from.  It must
        offer a C{children()} method (presumably a L{FilePath}; confirm
        against the caller).

    @return: an L{IOpenSSLTrustRoot} provider.
    """
    certsByDigest = {}
    for entry in directoryPath.children():
        extension = entry.asTextMode().basename().split(u'.')[-1].lower()
        if extension != u'pem':
            continue
        try:
            pemData = entry.getContent()
        except IOError:
            # Permission denied, corrupt disk, we don't care.
            continue
        try:
            cert = Certificate.loadPEM(pemData)
        except SSLError:
            # Duplicate certificate, invalid certificate, etc.  We don't care.
            continue
        certsByDigest[cert.digest()] = cert
    return trustRootFromCertificates(certsByDigest.values())
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_constructor(self):
        """
        After connecting, the process protocol held by the reactor stores an
        L{IProtocol} provider and the flag to log/drop stderr.
        """
        self.successResultOf(self.ep.connect(self.factory))
        wrapper = self.reactor.processProtocol
        self.assertIsInstance(wrapper.protocol, StubApplicationProtocol)
        self.assertEqual(wrapper.errFlag, self.ep._errFlag)
项目:maas    作者:maas    | 项目源码 | 文件源码
def gen_retries(start, end, intervals, clock=reactor):
    """Helper for retrying something, sleeping between attempts.

    Yields ``(elapsed, remaining, wait)`` tuples, giving times in seconds. The
    last item, `wait`, is the suggested amount of time to sleep before trying
    again. Once the clock reaches `end`, one final tuple with a `wait` of 0
    is yielded and iteration stops.

    This function works in concert with `retries`. It's split out so that
    `retries` can capture the correct start time rather than the time at which
    it is first iterated.

    :param start: The start time, in seconds, of this generator. This must be
        congruent with the `IReactorTime` argument passed to this generator.
    :param end: The desired end time, in seconds, of this generator. This must
        be congruent with the `IReactorTime` argument passed to this
        generator.
    :param intervals: An iterable of intervals, each in seconds, which should
        be used as hints for the `wait` value that's generated.
    :param clock: An optional `IReactorTime` provider. Defaults to the
        installed reactor.

    """
    for interval in intervals:
        now = clock.seconds()
        elapsed, remaining = now - start, end - now
        if now < end:
            # Never suggest sleeping past the end time.
            yield elapsed, remaining, min(interval, remaining)
            continue
        yield elapsed, remaining, 0
        return
项目:plone.server    作者:plone    | 项目源码 | 文件源码
def test_register_behavior(self):
        """
        Registering a behavior adds one 'behavior' configuration, and a
        content type using that behavior exposes the behavior's fields
        through the C{@types} endpoint.
        """
        def behavior_count():
            # Number of 'behavior' configurations registered for the tests.
            return len(configure.get_configurations(
                'plone.server.tests', 'behavior'))

        before = behavior_count()

        from plone.server.interfaces import IFormFieldProvider
        from zope.interface import provider
        from zope import schema

        @provider(IFormFieldProvider)
        class IMyBehavior(Interface):
            foobar = schema.Text()

        register = configure.behavior(
            title="MyBehavior",
            provides=IMyBehavior,
            factory="plone.behavior.AnnotationStorage",
            for_="plone.server.interfaces.IResource"
        )
        register()

        self.assertEqual(behavior_count(), before + 1)

        class IMyType(Interface):
            pass

        class MyType(Item):
            pass

        type_config = dict(
            context=ISite,
            schema=IMyType,
            portal_type="MyType2",
            behaviors=[IMyBehavior]
        )
        configure.register_configuration(MyType, type_config, 'contenttype')

        # Load the content type and check what the API reports for it.
        app_config = self.layer.app.app.config
        configure.load_configuration(
            app_config, 'plone.server.tests', 'contenttype')
        self.layer.app.app.config.execute_actions()

        resp = self.layer.requester('GET', '/plone/plone/@types')
        payload = json.loads(resp.text)
        matching = [t for t in payload if t['title'] == 'MyType2']
        properties = matching[0]['definitions']['IMyBehavior']['properties']
        self.assertTrue('foobar' in properties)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def deterministicResolvingReactor(reactor, expectedAddresses=(),
                                  hostMap=None):
    """
    Create a reactor that will deterministically resolve all hostnames it is
    passed to the list of addresses given.

    @param reactor: An object that we wish to add an
        L{IReactorPluggableNameResolver} to.
    @type reactor: Any object with some formally-declared interfaces (i.e. one
        where C{list(providedBy(reactor))} is not empty); usually C{IReactor*}
        interfaces.

    @param expectedAddresses: (optional); the addresses expected to be returned
        for every address.  If these are strings, they should be IPv4 or IPv6
        literals, and they will be wrapped in L{IPv4Address} and L{IPv6Address}
        objects in the resolution result.
    @type expectedAddresses: iterable of C{object} or C{str}

    @param hostMap: (optional); the names (unicode) mapped to lists of
        addresses (str or L{IAddress}); in the same format as expectedAddress,
        which map the results for I{specific} hostnames to addresses.  The
        mapping is copied, so later changes by the caller are not seen.

    @return: A new reactor which provides all the interfaces previously
        provided by C{reactor} as well as L{IReactorPluggableNameResolver}.
        All name resolutions performed with its C{nameResolver} attribute will
        resolve reentrantly and synchronously with the given
        C{expectedAddresses}.  However, it is not a complete implementation as
        it does not have an C{installNameResolver} method.
    """
    hostMap = {} if hostMap is None else hostMap.copy()

    @provider(IHostnameResolver)
    class SimpleNameResolver(object):
        @staticmethod
        def resolveHostName(resolutionReceiver, hostName, portNumber=0,
                            addressTypes=None, transportSemantics='TCP'):
            resolutionReceiver.resolutionBegan(None)
            # Per-host entries win; otherwise fall back to the shared list.
            addresses = hostMap.get(hostName, expectedAddresses)
            for address in addresses:
                if isinstance(address, str):
                    # String literals are wrapped in the matching address type.
                    addressType = (IPv6Address if isIPv6Address(address)
                                   else IPv4Address)
                    address = addressType('TCP', address, portNumber)
                resolutionReceiver.addressResolved(address)
            resolutionReceiver.resolutionComplete()

    # Proxy every interface the original reactor already provides.
    reactorInterfaces = InterfaceClass('*', tuple(providedBy(reactor)))

    @provider(IReactorPluggableNameResolver)
    class WithResolver(proxyForInterface(reactorInterfaces)):
        nameResolver = SimpleNameResolver()

    return WithResolver(reactor)