Python twisted.python.failure 模块,value() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.python.failure.value()

项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def makeTodo(value):
    """
    Return a L{Todo} object built from C{value}.

    If C{value} is a string, return a Todo that expects any exception with
    C{value} as a reason. If C{value} is a tuple, the second element is used
    as the reason and the first element as the excepted error(s).

    @param value: A string or a tuple of C{(errors, reason)}, where C{errors}
    is either a single exception class or an iterable of exception classes.

    @return: A L{Todo} object.
    """
    if isinstance(value, str):
        return Todo(reason=value)
    if isinstance(value, tuple):
        errors, reason = value
        try:
            errors = list(errors)
        except TypeError:
            errors = [errors]
        return Todo(reason=reason, errors=errors)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def failUnlessFailure(self, deferred, *expectedFailures):
        """Assert that C{deferred} will errback with one of
        C{expectedFailures}.  Returns the original Deferred with callbacks
        added. You will need to return this Deferred from your test case.
        """
        def _cb(ignore):
            raise self.failureException(
                "did not catch an error, instead got %r" % (ignore,))

        def _eb(failure):
            if failure.check(*expectedFailures):
                return failure.value
            else:
                output = ('\nExpected: %r\nGot:\n%s'
                          % (expectedFailures, str(failure)))
                raise self.failureException(output)
        return deferred.addCallbacks(_cb, _eb)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def search(self, *queries, **kwarg):
        """Search messages in the currently selected mailbox

        This command is allowed in the Selected state.

        Any non-zero number of queries are accepted by this method, as
        returned by the C{Query}, C{Or}, and C{Not} functions.

        One keyword argument is accepted: if uid is passed in with a non-zero
        value, the server is asked to return message UIDs instead of message
        sequence numbers.

        @rtype: C{Deferred}
        @return: A deferred whose callback will be invoked with a list of all
        the message sequence numbers return by the search, or whose errback
        will be invoked if there is an error.
        """
        if kwarg.get('uid'):
            cmd = 'UID SEARCH'
        else:
            cmd = 'SEARCH'
        args = ' '.join(queries)
        d = self.sendCommand(Command(cmd, args, wantResponse=(cmd,)))
        d.addCallback(self.__cbSearch)
        return d
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def create(pathspec):
        """Create a new mailbox from the given hierarchical name.

        @type pathspec: C{str}
        @param pathspec: The full hierarchical name of a new mailbox to create.
        If any of the inferior hierarchical names to this one do not exist,
        they are created as well.

        @rtype: C{Deferred} or C{bool}
        @return: A true value if the creation succeeds, or a deferred whose
        callback will be invoked when the creation succeeds.

        @raise MailboxException: Raised if this mailbox cannot be added.
        This may also be raised asynchronously, if a C{Deferred} is
        returned.
        """
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def rename(oldname, newname):
        """Rename a mailbox

        @type oldname: C{str}
        @param oldname: The current name of the mailbox to rename.

        @type newname: C{str}
        @param newname: The new name to associate with the mailbox.

        @rtype: C{Deferred} or C{bool}
        @return: A true value if the mailbox is successfully renamed, or a
        C{Deferred} whose callback will be invoked when the rename operation
        is completed.

        @raise MailboxException: Raised if this mailbox cannot be
        renamed.  This may also be raised asynchronously, if a C{Deferred}
        is returned.
        """
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def requestStatus(names):
        """Return status information about this mailbox.

        Mailboxes which do not intend to do any special processing to
        generate the return value, C{statusRequestHelper} can be used
        to build the dictionary by calling the other interface methods
        which return the data for each name.

        @type names: Any iterable
        @param names: The status names to return information regarding.
        The possible values for each name are: MESSAGES, RECENT, UIDNEXT,
        UIDVALIDITY, UNSEEN.

        @rtype: C{dict} or C{Deferred}
        @return: A dictionary containing status information about the
        requested names is returned.  If the process of looking this
        information up would be costly, a deferred whose callback will
        eventually be passed this dictionary is returned instead.
        """
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testNonExistentDelete(self):
        def login():
            return self.client.login('testuser', 'password-test')
        def delete():
            return self.client.delete('delete/me')
        def deleteFailed(failure):
            self.failure = failure

        self.failure = None
        d1 = self.connected.addCallback(strip(login))
        d1.addCallback(strip(delete)).addErrback(deleteFailed)
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
        d2 = self.loopback()
        d = defer.gatherResults([d1, d2])
        d.addCallback(lambda _: self.assertEquals(str(self.failure.value),
                                                  'No such mailbox'))
        return d
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testPathelogicalScatteringOfLiterals(self):
        self.server.checker.addUser('testuser', 'password-test')
        transport = StringTransport()
        self.server.makeConnection(transport)

        transport.clear()
        self.server.dataReceived("01 LOGIN {8}\r\n")
        self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n")

        transport.clear()
        self.server.dataReceived("testuser {13}\r\n")
        self.assertEquals(transport.value(), "+ Ready for 13 octets of text\r\n")

        transport.clear()
        self.server.dataReceived("password-test\r\n")
        self.assertEquals(transport.value(), "01 OK LOGIN succeeded\r\n")
        self.assertEquals(self.server.state, 'auth')

        self.server.connectionLost(error.ConnectionDone("Connection done."))
项目:p2pool-bch    作者:amarian12    | 项目源码 | 文件源码
def _on_no_port_mapping_received(self, failure, mappings):
        """
        Called when we have no more port mappings to retreive, or an
        error occured while retreiving them.

        Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok,
        it just means we have finished. If it returns some other error, then we
        fail with an UPnPError.

        @param mappings: the already retreived mappings
        @param failure: the failure
        @return: The existing mappings as defined in L{get_port_mappings}
        @raise UPnPError: When we got any other error
            than "SpecifiedArrayIndexInvalid"
        """
        logging.debug("_on_no_port_mapping_received: %s", failure)
        err = failure.value
        message = err.args[0]["UPnPError"]["errorDescription"]
        if "SpecifiedArrayIndexInvalid" == message:
            return mappings
        else:
            return failure
项目:bitmask-dev    作者:leapcode    | 项目源码 | 文件源码
def _format_error(failure):
    """
    Logs the failure backtrace, and returns a json containing the error
    message.

    If a exception declares the 'expected' attribute as True,
    we will not print a full traceback. instead, we will dispatch
    the ``exception`` message attribute as the ``error`` field in the response
    json.
    """

    expected = getattr(failure.value, 'expected', False)
    if not expected:
        log.error('[DISPATCHER] Unexpected error!')
        log.error('{0!r}'.format(failure.value))
        log.error(failure.getTraceback())

    # if needed, we could add here the exception type as an extra field
    return json.dumps({'error': failure.value.message, 'result': None})
项目:p2pool-unitus    作者:amarian12    | 项目源码 | 文件源码
def _on_no_port_mapping_received(self, failure, mappings):
        """
        Called when we have no more port mappings to retreive, or an
        error occured while retreiving them.

        Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok,
        it just means we have finished. If it returns some other error, then we
        fail with an UPnPError.

        @param mappings: the already retreived mappings
        @param failure: the failure
        @return: The existing mappings as defined in L{get_port_mappings}
        @raise UPnPError: When we got any other error
            than "SpecifiedArrayIndexInvalid"
        """
        logging.debug("_on_no_port_mapping_received: %s", failure)
        err = failure.value
        message = err.args[0]["UPnPError"]["errorDescription"]
        if "SpecifiedArrayIndexInvalid" == message:
            return mappings
        else:
            return failure
项目:p2pool-dgb-sha256    作者:ilsawa    | 项目源码 | 文件源码
def _on_no_port_mapping_received(self, failure, mappings):
        """
        Called when we have no more port mappings to retreive, or an
        error occured while retreiving them.

        Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok,
        it just means we have finished. If it returns some other error, then we
        fail with an UPnPError.

        @param mappings: the already retreived mappings
        @param failure: the failure
        @return: The existing mappings as defined in L{get_port_mappings}
        @raise UPnPError: When we got any other error
            than "SpecifiedArrayIndexInvalid"
        """
        logging.debug("_on_no_port_mapping_received: %s", failure)
        err = failure.value
        message = err.args[0]["UPnPError"]["errorDescription"]
        if "SpecifiedArrayIndexInvalid" == message:
            return mappings
        else:
            return failure
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def makeTodo(value):
    """
    Return a L{Todo} object built from C{value}.

    If C{value} is a string, return a Todo that expects any exception with
    C{value} as a reason. If C{value} is a tuple, the second element is used
    as the reason and the first element as the excepted error(s).

    @param value: A string or a tuple of C{(errors, reason)}, where C{errors}
    is either a single exception class or an iterable of exception classes.

    @return: A L{Todo} object.
    """
    if isinstance(value, str):
        return Todo(reason=value)
    if isinstance(value, tuple):
        errors, reason = value
        try:
            errors = list(errors)
        except TypeError:
            errors = [errors]
        return Todo(reason=reason, errors=errors)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def failUnlessFailure(self, deferred, *expectedFailures):
        """Assert that C{deferred} will errback with one of
        C{expectedFailures}.  Returns the original Deferred with callbacks
        added. You will need to return this Deferred from your test case.
        """
        def _cb(ignore):
            raise self.failureException(
                "did not catch an error, instead got %r" % (ignore,))

        def _eb(failure):
            if failure.check(*expectedFailures):
                return failure.value
            else:
                output = ('\nExpected: %r\nGot:\n%s'
                          % (expectedFailures, str(failure)))
                raise self.failureException(output)
        return deferred.addCallbacks(_cb, _eb)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def search(self, *queries, **kwarg):
        """Search messages in the currently selected mailbox

        This command is allowed in the Selected state.

        Any non-zero number of queries are accepted by this method, as
        returned by the C{Query}, C{Or}, and C{Not} functions.

        One keyword argument is accepted: if uid is passed in with a non-zero
        value, the server is asked to return message UIDs instead of message
        sequence numbers.

        @rtype: C{Deferred}
        @return: A deferred whose callback will be invoked with a list of all
        the message sequence numbers return by the search, or whose errback
        will be invoked if there is an error.
        """
        if kwarg.get('uid'):
            cmd = 'UID SEARCH'
        else:
            cmd = 'SEARCH'
        args = ' '.join(queries)
        d = self.sendCommand(Command(cmd, args, wantResponse=(cmd,)))
        d.addCallback(self.__cbSearch)
        return d
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def create(pathspec):
        """Create a new mailbox from the given hierarchical name.

        @type pathspec: C{str}
        @param pathspec: The full hierarchical name of a new mailbox to create.
        If any of the inferior hierarchical names to this one do not exist,
        they are created as well.

        @rtype: C{Deferred} or C{bool}
        @return: A true value if the creation succeeds, or a deferred whose
        callback will be invoked when the creation succeeds.

        @raise MailboxException: Raised if this mailbox cannot be added.
        This may also be raised asynchronously, if a C{Deferred} is
        returned.
        """
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def rename(oldname, newname):
        """Rename a mailbox

        @type oldname: C{str}
        @param oldname: The current name of the mailbox to rename.

        @type newname: C{str}
        @param newname: The new name to associate with the mailbox.

        @rtype: C{Deferred} or C{bool}
        @return: A true value if the mailbox is successfully renamed, or a
        C{Deferred} whose callback will be invoked when the rename operation
        is completed.

        @raise MailboxException: Raised if this mailbox cannot be
        renamed.  This may also be raised asynchronously, if a C{Deferred}
        is returned.
        """
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def requestStatus(names):
        """Return status information about this mailbox.

        Mailboxes which do not intend to do any special processing to
        generate the return value, C{statusRequestHelper} can be used
        to build the dictionary by calling the other interface methods
        which return the data for each name.

        @type names: Any iterable
        @param names: The status names to return information regarding.
        The possible values for each name are: MESSAGES, RECENT, UIDNEXT,
        UIDVALIDITY, UNSEEN.

        @rtype: C{dict} or C{Deferred}
        @return: A dictionary containing status information about the
        requested names is returned.  If the process of looking this
        information up would be costly, a deferred whose callback will
        eventually be passed this dictionary is returned instead.
        """
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testNonExistentDelete(self):
        def login():
            return self.client.login('testuser', 'password-test')
        def delete():
            return self.client.delete('delete/me')
        def deleteFailed(failure):
            self.failure = failure

        self.failure = None
        d1 = self.connected.addCallback(strip(login))
        d1.addCallback(strip(delete)).addErrback(deleteFailed)
        d1.addCallbacks(self._cbStopClient, self._ebGeneral)
        d2 = self.loopback()
        d = defer.gatherResults([d1, d2])
        d.addCallback(lambda _: self.assertEquals(str(self.failure.value),
                                                  'No such mailbox'))
        return d
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testPathelogicalScatteringOfLiterals(self):
        self.server.checker.addUser('testuser', 'password-test')
        transport = StringTransport()
        self.server.makeConnection(transport)

        transport.clear()
        self.server.dataReceived("01 LOGIN {8}\r\n")
        self.assertEquals(transport.value(), "+ Ready for 8 octets of text\r\n")

        transport.clear()
        self.server.dataReceived("testuser {13}\r\n")
        self.assertEquals(transport.value(), "+ Ready for 13 octets of text\r\n")

        transport.clear()
        self.server.dataReceived("password-test\r\n")
        self.assertEquals(transport.value(), "01 OK LOGIN succeeded\r\n")
        self.assertEquals(self.server.state, 'auth')

        self.server.connectionLost(error.ConnectionDone("Connection done."))
项目:p2pool-ltc    作者:ilsawa    | 项目源码 | 文件源码
def _on_no_port_mapping_received(self, failure, mappings):
        """
        Called when we have no more port mappings to retreive, or an
        error occured while retreiving them.

        Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok,
        it just means we have finished. If it returns some other error, then we
        fail with an UPnPError.

        @param mappings: the already retreived mappings
        @param failure: the failure
        @return: The existing mappings as defined in L{get_port_mappings}
        @raise UPnPError: When we got any other error
            than "SpecifiedArrayIndexInvalid"
        """
        logging.debug("_on_no_port_mapping_received: %s", failure)
        err = failure.value
        message = err.args[0]["UPnPError"]["errorDescription"]
        if "SpecifiedArrayIndexInvalid" == message:
            return mappings
        else:
            return failure
项目:p2pool-bsty    作者:amarian12    | 项目源码 | 文件源码
def _on_no_port_mapping_received(self, failure, mappings):
        """
        Called when we have no more port mappings to retreive, or an
        error occured while retreiving them.

        Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok,
        it just means we have finished. If it returns some other error, then we
        fail with an UPnPError.

        @param mappings: the already retreived mappings
        @param failure: the failure
        @return: The existing mappings as defined in L{get_port_mappings}
        @raise UPnPError: When we got any other error
            than "SpecifiedArrayIndexInvalid"
        """
        logging.debug("_on_no_port_mapping_received: %s", failure)
        err = failure.value
        message = err.args[0]["UPnPError"]["errorDescription"]
        if "SpecifiedArrayIndexInvalid" == message:
            return mappings
        else:
            return failure
项目:p2pool-cann    作者:ilsawa    | 项目源码 | 文件源码
def _on_no_port_mapping_received(self, failure, mappings):
        """
        Called when we have no more port mappings to retreive, or an
        error occured while retreiving them.

        Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok,
        it just means we have finished. If it returns some other error, then we
        fail with an UPnPError.

        @param mappings: the already retreived mappings
        @param failure: the failure
        @return: The existing mappings as defined in L{get_port_mappings}
        @raise UPnPError: When we got any other error
            than "SpecifiedArrayIndexInvalid"
        """
        logging.debug("_on_no_port_mapping_received: %s", failure)
        err = failure.value
        message = err.args[0]["UPnPError"]["errorDescription"]
        if "SpecifiedArrayIndexInvalid" == message:
            return mappings
        else:
            return failure
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_cancelDeferredListWithFireOnOneErrback(self):
        """
        When cancelling an unfired L{defer.DeferredList} with the flag
        C{fireOnOneErrback} set, cancel every L{defer.Deferred} in the list.
        """
        deferredOne = defer.Deferred()
        deferredTwo = defer.Deferred()
        deferredList = defer.DeferredList([deferredOne, deferredTwo],
                                          fireOnOneErrback=True)
        deferredList.cancel()
        self.failureResultOf(deferredOne, defer.CancelledError)
        self.failureResultOf(deferredTwo, defer.CancelledError)
        deferredListFailure = self.failureResultOf(deferredList,
                                                   defer.FirstError)
        firstError = deferredListFailure.value
        self.assertTrue(firstError.subFailure.check(defer.CancelledError))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_errbackWithNoArgsNoDebug(self):
        """
        C{Deferred.errback()} creates a failure from the current Python
        exception.  When Deferred.debug is not set no globals or locals are
        captured in that failure.
        """
        defer.setDebugging(False)
        d = defer.Deferred()
        l = []
        exc = GenericError("Bang")
        try:
            raise exc
        except:
            d.errback()
        d.addErrback(l.append)
        fail = l[0]
        self.assertEqual(fail.value, exc)
        localz, globalz = fail.frames[0][-2:]
        self.assertEqual([], localz)
        self.assertEqual([], globalz)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_errbackWithNoArgs(self):
        """
        C{Deferred.errback()} creates a failure from the current Python
        exception.  When Deferred.debug is set globals and locals are captured
        in that failure.
        """
        defer.setDebugging(True)
        d = defer.Deferred()
        l = []
        exc = GenericError("Bang")
        try:
            raise exc
        except:
            d.errback()
        d.addErrback(l.append)
        fail = l[0]
        self.assertEqual(fail.value, exc)
        localz, globalz = fail.frames[0][-2:]
        self.assertNotEqual([], localz)
        self.assertNotEqual([], globalz)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_timedOutCustom(self):
        """
        If a custom C{onTimeoutCancel] function is provided, the
        L{defer.Deferred} returns the custom function's return value if the
        L{defer.Deferred} times out before callbacking or errbacking.
        The custom C{onTimeoutCancel} function can return a result instead of
        a failure.
        """
        clock = Clock()
        d = defer.Deferred()
        d.addTimeout(10, clock, onTimeoutCancel=_overrideFunc)
        self.assertNoResult(d)

        clock.advance(15)

        self.assertEqual("OVERRIDDEN", self.successResultOf(d))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_timedOutProvidedCancelFailure(self):
        """
        If a cancellation function is provided when the L{defer.Deferred} is
        initialized, the L{defer.Deferred} returns the cancellation value's
        non-L{CanceledError} failure when the L{defer.Deferred} times out.
        """
        clock = Clock()
        error = ValueError('what!')
        d = defer.Deferred(lambda c: c.errback(error))
        d.addTimeout(10, clock)
        self.assertNoResult(d)

        clock.advance(15)

        f = self.failureResultOf(d, ValueError)
        self.assertIs(f.value, error)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_errbackAddedBeforeTimeoutSuppressesCancellation(self):
        """
        An errback added before a timeout is added errbacks with a
        L{defer.CancelledError} when the timeout fires.  If the
        errback suppresses the L{defer.CancelledError}, the deferred
        successfully completes.
        """
        clock = Clock()
        d = defer.Deferred()

        dErrbacked = [None]

        def errback(f):
            dErrbacked[0] = f
            f.trap(defer.CancelledError)

        d.addErrback(errback)
        d.addTimeout(10, clock)

        clock.advance(15)

        self.assertIsInstance(dErrbacked[0], failure.Failure)
        self.assertIsInstance(dErrbacked[0].value, defer.CancelledError)

        self.successResultOf(d)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_errbackAddedBeforeTimeoutCustom(self):
        """
        An errback added before a timeout is added with a custom
        timeout function errbacks with a L{defer.CancelledError} when
        the timeout fires.  The timeout function runs if the errback
        returns the L{defer.CancelledError}.
        """
        clock = Clock()
        d = defer.Deferred()

        dErrbacked = [None]

        def errback(f):
            dErrbacked[0] = f
            return f

        d.addErrback(errback)
        d.addTimeout(10, clock, _overrideFunc)

        clock.advance(15)

        self.assertIsInstance(dErrbacked[0], failure.Failure)
        self.assertIsInstance(dErrbacked[0].value, defer.CancelledError)

        self.assertEqual("OVERRIDDEN", self.successResultOf(d))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_errbackAddedBeforeTimeoutSuppressesCancellationCustom(self):
        """
        An errback added before a timeout is added with a custom
        timeout function errbacks with a L{defer.CancelledError} when
        the timeout fires.  The timeout function runs if the errback
        suppresses the L{defer.CancelledError}.
        """
        clock = Clock()
        d = defer.Deferred()

        dErrbacked = [None]

        def errback(f):
            dErrbacked[0] = f

        d.addErrback(errback)
        d.addTimeout(10, clock, _overrideFunc)

        clock.advance(15)

        self.assertIsInstance(dErrbacked[0], failure.Failure)
        self.assertIsInstance(dErrbacked[0].value, defer.CancelledError)

        self.assertEqual("OVERRIDDEN", self.successResultOf(d))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def getTimeout(self):
        """
        Returns the timeout value set on this test. Checks on the instance
        first, then the class, then the module, then packages. As soon as it
        finds something with a C{timeout} attribute, returns that. Returns
        L{util.DEFAULT_TIMEOUT_DURATION} if it cannot find anything. See
        L{TestCase} docstring for more details.
        """
        timeout =  util.acquireAttribute(self._parents, 'timeout',
                                         util.DEFAULT_TIMEOUT_DURATION)
        try:
            return float(timeout)
        except (ValueError, TypeError):
            # XXX -- this is here because sometimes people will have methods
            # called 'timeout', or set timeout to 'orange', or something
            # Particularly, test_news.NewsTestCase and ReactorCoreTestCase
            # both do this.
            warnings.warn("'timeout' attribute needs to be a number.",
                          category=DeprecationWarning)
            return util.DEFAULT_TIMEOUT_DURATION
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def lookup(self, serverAddress, clientAddress):
        """
        Lookup user information about the specified address pair.

        Return value should be a two-tuple of system name and username.
        Acceptable values for the system name may be found online at::

            U{http://www.iana.org/assignments/operating-system-names}

        This method may also raise any IdentError subclass (or IdentError
        itself) to indicate user information will not be provided for the
        given query.

        A Deferred may also be returned.

        @param serverAddress: A two-tuple representing the server endpoint
        of the address being queried.  The first element is a string holding
        a dotted-quad IP address.  The second element is an integer
        representing the port.

        @param clientAddress: Like I{serverAddress}, but represents the
        client endpoint of the address being queried.
        """
        raise IdentError()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def generate(self, request, node):
        if self.data:
            try:
                child = microdom.parseString(self.data)
            except Exception, e:
                log.msg("Error parsing return value, probably invalid xml:", e)
                child = request.d.createTextNode(self.data)
        else:
            child = request.d.createTextNode(self.data)
        nodeMutator = NodeNodeMutator(child)
        return nodeMutator.generate(request, node)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def renderFailure(self, failure, request):
        try:
            xml = request.d.toxml()
        except:
            xml = ""
#         if not hasattr(request, 'channel'):
#             log.msg("The request got away from me before I could render an error page.")
#             log.err(failure)
#             return failure
        if not self.failed:
            self.failed = 1
            if failure:
                request.write("<html><head><title>%s: %s</title></head><body>\n" % (html.escape(str(failure.type)), html.escape(str(failure.value))))
            else:
                request.write("<html><head><title>Failure!</title></head><body>\n")
            utils.renderFailure(failure, request)
            request.write("<h3>Here is the partially processed DOM:</h3>")
            request.write("\n<pre>\n")
            request.write(html.escape(xml))
            request.write("\n</pre>\n")
            request.write("</body></html>")
            request.finish()
        return failure

##########################################
# Deprecation zone
# Wear a hard hat
##########################################


# DOMView is now deprecated since the functionality was merged into domtemplate
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _ebRender(self, failure):
        if isinstance(failure.value, Fault):
            return failure.value
        log.err(failure)
        return Fault(self.FAILURE, "error")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testDeferredList(self):
        defr1 = defer.Deferred()
        defr2 = defer.Deferred()
        defr3 = defer.Deferred()
        dl = defer.DeferredList([defr1, defr2, defr3])
        result = []
        def cb(resultList, result=result):
            result.extend(resultList)
        def catch(err):
            return None
        dl.addCallbacks(cb, cb)
        defr1.callback("1")
        defr2.addErrback(catch)
        # "catch" is added to eat the GenericError that will be passed on by
        # the DeferredList's callback on defr2. If left unhandled, the
        # Failure object would cause a log.err() warning about "Unhandled
        # error in Deferred". Twisted's pyunit watches for log.err calls and
        # treats them as failures. So "catch" must eat the error to prevent
        # it from flunking the test.
        defr2.errback(GenericError("2"))
        defr3.callback("3")
        self.failUnlessEqual([result[0],
                    #result[1][1] is now a Failure instead of an Exception
                              (result[1][0], str(result[1][1].value)),
                              result[2]],

                             [(defer.SUCCESS, "1"),
                              (defer.FAILURE, "2"),
                              (defer.SUCCESS, "3")])
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testDeferredListDontConsumeErrors(self):
        d1 = defer.Deferred()
        dl = defer.DeferredList([d1])

        errorTrap = []
        d1.addErrback(errorTrap.append)

        result = []
        dl.addCallback(result.append)

        d1.errback(GenericError('Bang'))
        self.failUnlessEqual('Bang', errorTrap[0].value.args[0])
        self.failUnlessEqual(1, len(result))
        self.failUnlessEqual('Bang', result[0][0][1].value.args[0])
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testDeferredListConsumeErrors(self):
        d1 = defer.Deferred()
        dl = defer.DeferredList([d1], consumeErrors=True)

        errorTrap = []
        d1.addErrback(errorTrap.append)

        result = []
        dl.addCallback(result.append)

        d1.errback(GenericError('Bang'))
        self.failUnlessEqual([], errorTrap)
        self.failUnlessEqual(1, len(result))
        self.failUnlessEqual('Bang', result[0][0][1].value.args[0])
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testImmediateFailure(self):
        l = []
        d = defer.fail(GenericError("fail"))
        d.addErrback(l.append)
        self.assertEquals(str(l[0].value), "fail")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testPausedFailure(self):
        l = []
        d = defer.fail(GenericError("fail"))
        d.pause()
        d.addErrback(l.append)
        self.assertEquals(l, [])
        d.unpause()
        self.assertEquals(str(l[0].value), "fail")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testCallbackErrors(self):
        l = []
        d = defer.Deferred().addCallback(lambda _: 1/0).addErrback(l.append)
        d.callback(1)
        self.assert_(isinstance(l[0].value, ZeroDivisionError))
        l = []
        d = defer.Deferred().addCallback(
            lambda _: failure.Failure(ZeroDivisionError())).addErrback(l.append)
        d.callback(1)
        self.assert_(isinstance(l[0].value, ZeroDivisionError))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def failUnlessIn(self, containee, container, msg=None):
        """fail the test if C{containee} is not found in C{container}

        @param containee: the value that should be in C{container}
        @param container: a sequence type, or in the case of a mapping type,
                          will follow semantics of 'if key in dict.keys()'
        @param msg: if msg is None, then the failure message will be
                    '%r not in %r' % (first, second)
        """
        if containee not in container:
            raise self.failureException(msg or "%r not in %r"
                                        % (containee, container))
        return containee
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def failIfIn(self, containee, container, msg=None):
        """fail the test if C{containee} is found in C{container}

        @param containee: the value that should not be in C{container}
        @param container: a sequence type, or in the case of a mapping type,
                          will follow semantics of 'if key in dict.keys()'
        @param msg: if msg is None, then the failure message will be
                    '%r in %r' % (first, second)
        """
        if containee in container:
            raise self.failureException(msg or "%r in %r"
                                        % (containee, container))
        return containee
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _getReason(self, f):
        if len(f.value.args) > 0:
            reason = f.value.args[0]
        else:
            warnings.warn(("Do not raise unittest.SkipTest with no "
                           "arguments! Give a reason for skipping tests!"),
                          stacklevel=2)
            reason = f
        return reason
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def mktemp(self):
        """Returns a unique name that may be used as either a temporary
        directory or filename.

        @note: you must call os.mkdir on the value returned from this
               method if you wish to use it as a directory!
        """
        MAX_FILENAME = 32 # some platforms limit lengths of filenames
        base = os.path.join(self.__class__.__module__[:MAX_FILENAME],
                            self.__class__.__name__[:MAX_FILENAME],
                            self._testMethodName[:MAX_FILENAME])
        if not os.path.exists(base):
            os.makedirs(base)
        dirname = tempfile.mkdtemp('', '', base)
        return os.path.join(dirname, 'temp')
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _exc_info(self, err):
        if isinstance(err, failure.Failure):
            # Unwrap the Failure into a exc_info tuple.
            # XXX: if err.tb is a real traceback and not stringified, we should
            #      use that.
            err = (err.type, err.value, None)
        return err
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_pyunitAddError(self):
        # pyunit passes an exc_info tuple directly to addError
        try:
            raise RuntimeError('foo')
        except RuntimeError, excValue:
            self.result.addError(self, sys.exc_info())
        failure = self.result.errors[0][1]
        self.assertEqual(excValue, failure.value)
        self.assertEqual(RuntimeError, failure.type)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_pyunitAddFailure(self):
        # pyunit passes an exc_info tuple directly to addFailure
        try:
            raise self.failureException('foo')
        except self.failureException, excValue:
            self.result.addFailure(self, sys.exc_info())
        failure = self.result.failures[0][1]
        self.assertEqual(excValue, failure.value)
        self.assertEqual(self.failureException, failure.type)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _ebLookup(self, failure, sport, cport):
        if failure.check(IdentError):
            self.sendLine('%d, %d : ERROR : %s' % (sport, cport, failure.value))
        else:
            log.err(failure)
            self.sendLine('%d, %d : ERROR : %s' % (sport, cport, IdentError(failure.value)))