Python tornado.testing 模块,ExpectLog() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.testing.ExpectLog()

项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_body_size_override_reset(self):
        # The max_body_size override only covers one request; the next
        # request on the same connection is held to the default again.
        conn = IOStream(socket.socket())
        try:
            server_address = ('127.0.0.1', self.get_http_port())
            yield conn.connect(server_address)
            # A raw stream guarantees both requests share one connection.
            overridden_request = (
                b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
                b'Content-Length: 10240\r\n\r\n')
            conn.write(overridden_request)
            conn.write(b'a' * 10240)
            _headers, body = yield gen.Task(read_stream_body, conn)
            self.assertEqual(body, b'10240')
            # No ?expected_size parameter this time, so the old default
            # value is in force.
            default_request = (
                b'PUT /streaming HTTP/1.1\r\n'
                b'Content-Length: 10240\r\n\r\n')
            conn.write(default_request)
            with ExpectLog(gen_log, '.*Content-Length too long'):
                remainder = yield conn.read_until_close()
            self.assertEqual(remainder, b'')
        finally:
            conn.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multi_exceptions(self):
        # Both children fail: the first error is the one raised, and the
        # "Multiple exceptions" message is expected on app_log.
        with ExpectLog(app_log, "Multiple exceptions in yield list"), \
                self.assertRaises(RuntimeError) as raised:
            both_fail = [self.async_exception(RuntimeError("error 1")),
                         self.async_exception(RuntimeError("error 2"))]
            yield gen.Multi(both_fail)
        self.assertEqual(str(raised.exception), "error 1")

        # With only one exception, no error is logged.
        with self.assertRaises(RuntimeError):
            one_fails = [self.async_exception(RuntimeError("error 1")),
                         self.async_future(2)]
            yield gen.Multi(one_fails)

        # Exception logging may be explicitly quieted.
        with self.assertRaises(RuntimeError):
            quieted = [self.async_exception(RuntimeError("error 1")),
                       self.async_exception(RuntimeError("error 2"))]
            yield gen.Multi(quieted, quiet_exceptions=RuntimeError)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multi_future_exceptions(self):
        # Yielding a list in which both futures fail: the first error is
        # raised and the "Multiple exceptions" message is expected on app_log.
        with ExpectLog(app_log, "Multiple exceptions in yield list"), \
                self.assertRaises(RuntimeError) as raised:
            both_fail = [self.async_exception(RuntimeError("error 1")),
                         self.async_exception(RuntimeError("error 2"))]
            yield both_fail
        self.assertEqual(str(raised.exception), "error 1")

        # With only one exception, no error is logged.
        with self.assertRaises(RuntimeError):
            one_fails = [self.async_exception(RuntimeError("error 1")),
                         self.async_future(2)]
            yield one_fails

        # Exception logging may be explicitly quieted.
        with self.assertRaises(RuntimeError):
            quieted = [self.async_exception(RuntimeError("error 1")),
                       self.async_exception(RuntimeError("error 2"))]
            yield gen.multi_future(quieted, quiet_exceptions=RuntimeError)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_cross_user(self):
        other_token = self.get_token()
        # A token sent in both the cookie and the body authenticates
        # its own request.
        for own_token in (self.xsrf_token, other_token):
            form_body = urllib_parse.urlencode({"_xsrf": own_token})
            response = self.fetch("/", method="POST", body=form_body,
                                  headers=self.cookie_headers(own_token))
            self.assertEqual(response.code, 200)
        # Mixing tokens (one in the cookie, the other in the body) must be
        # rejected, in either direction.
        mismatched_pairs = ((self.xsrf_token, other_token),
                            (other_token, self.xsrf_token))
        for cookie_token, body_token in mismatched_pairs:
            with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
                form_body = urllib_parse.urlencode({"_xsrf": body_token})
                response = self.fetch(
                    "/", method="POST", body=form_body,
                    headers=self.cookie_headers(cookie_token))
            self.assertEqual(response.code, 403)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_connection_refused(self):
        # When a connection is refused, the connect callback should not
        # be run.  (The kqueue IOLoop used to behave differently from the
        # epoll IOLoop in this respect)
        cleanup_func, port = refusing_port()
        self.addCleanup(cleanup_func)
        stream = IOStream(socket.socket(), self.io_loop)
        self.connect_called = False

        def connect_callback():
            # Records that the callback ran; the test asserts it never does.
            self.connect_called = True
            self.stop()
        stream.set_close_callback(self.stop)
        # log messages vary by platform and ioloop implementation
        with ExpectLog(gen_log, ".*", required=False):
            stream.connect(("127.0.0.1", port), connect_callback)
            self.wait()
        self.assertFalse(self.connect_called)
        # assertIsInstance / assertIn include the offending value in the
        # failure message, unlike assertTrue on a pre-computed boolean.
        self.assertIsInstance(stream.error, socket.error)
        # cygwin's errnos don't match those used on native windows python,
        # so the errno check is skipped on that platform.
        if sys.platform != 'cygwin':
            refused_errnos = (errno.ECONNREFUSED,)
            if hasattr(errno, "WSAECONNREFUSED"):
                refused_errnos += (errno.WSAECONNREFUSED,)
            self.assertIn(stream.error.args[0], refused_errnos)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_read_callback_error(self):
        # Test that IOStream sets its exc_info when a read callback throws
        server, client = self.make_iostream_pair()
        try:
            server.set_close_callback(self.stop)
            with ExpectLog(
                app_log, "(Uncaught exception|Exception in callback)"
            ):
                # Clear ExceptionStackContext so IOStream catches error
                with NullContext():
                    # The callback divides by zero on purpose.
                    server.read_bytes(1, callback=lambda data: 1 / 0)
                client.write(b"1")
                self.wait()
            # assertIsInstance reports the actual value of server.error on
            # failure instead of a bare "False is not true".
            self.assertIsInstance(server.error, ZeroDivisionError)
        finally:
            server.close()
            client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_async_read_error_logging(self):
        # Socket errors on asynchronous reads should be logged (but only
        # once).
        server, client = self.make_iostream_pair()
        server.set_close_callback(self.stop)
        try:
            def ignore_data(data):
                return None

            # Begin a read that will be fulfilled asynchronously.
            server.read_bytes(1, ignore_data)
            client.write(b'a')

            # Swap in a read_from_fd that closes the descriptor before
            # delegating to the class implementation, to make it fail.
            def failing_read_from_fd():
                os.close(server.socket.fileno())
                server.__class__.read_from_fd(server)
            server.read_from_fd = failing_read_from_fd
            # This log message is from _handle_read (not read_from_fd).
            with ExpectLog(gen_log, "error on read"):
                self.wait()
        finally:
            server.close()
            client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_read_until_regex_max_bytes_inline(self):
        server, client = self.make_iostream_pair()

        def report_closed():
            # Hand the marker value "closed" to wait() when the client closes.
            return self.stop("closed")
        client.set_close_callback(report_closed)
        try:
            # Similar to the error case in the previous test, but the
            # server writes first so client reads are satisfied
            # inline.  For consistency with the out-of-line case, we
            # do not raise the error synchronously.
            server.write(b"123456")
            with ExpectLog(gen_log, "Unsatisfiable read"):
                client.read_until_regex(b"def", self.stop, max_bytes=5)
                outcome = self.wait()
            self.assertEqual(outcome, "closed")
        finally:
            server.close()
            client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_body_size_override_reset(self):
        # The max_body_size override only covers one request; the next
        # request on the same connection is held to the default again.
        conn = IOStream(socket.socket())
        try:
            server_address = ('127.0.0.1', self.get_http_port())
            yield conn.connect(server_address)
            # A raw stream guarantees both requests share one connection.
            overridden_request = (
                b'PUT /streaming?expected_size=10240 HTTP/1.1\r\n'
                b'Content-Length: 10240\r\n\r\n')
            conn.write(overridden_request)
            conn.write(b'a' * 10240)
            _headers, body = yield gen.Task(read_stream_body, conn)
            self.assertEqual(body, b'10240')
            # No ?expected_size parameter this time, so the old default
            # value is in force.
            default_request = (
                b'PUT /streaming HTTP/1.1\r\n'
                b'Content-Length: 10240\r\n\r\n')
            conn.write(default_request)
            with ExpectLog(gen_log, '.*Content-Length too long'):
                remainder = yield conn.read_until_close()
            self.assertEqual(remainder, b'')
        finally:
            conn.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multi_exceptions(self):
        # Both children fail: the first error is the one raised, and the
        # "Multiple exceptions" message is expected on app_log.
        with ExpectLog(app_log, "Multiple exceptions in yield list"), \
                self.assertRaises(RuntimeError) as raised:
            both_fail = [self.async_exception(RuntimeError("error 1")),
                         self.async_exception(RuntimeError("error 2"))]
            yield gen.Multi(both_fail)
        self.assertEqual(str(raised.exception), "error 1")

        # With only one exception, no error is logged.
        with self.assertRaises(RuntimeError):
            one_fails = [self.async_exception(RuntimeError("error 1")),
                         self.async_future(2)]
            yield gen.Multi(one_fails)

        # Exception logging may be explicitly quieted.
        with self.assertRaises(RuntimeError):
            quieted = [self.async_exception(RuntimeError("error 1")),
                       self.async_exception(RuntimeError("error 2"))]
            yield gen.Multi(quieted, quiet_exceptions=RuntimeError)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multi_future_exceptions(self):
        # Yielding a list in which both futures fail: the first error is
        # raised and the "Multiple exceptions" message is expected on app_log.
        with ExpectLog(app_log, "Multiple exceptions in yield list"), \
                self.assertRaises(RuntimeError) as raised:
            both_fail = [self.async_exception(RuntimeError("error 1")),
                         self.async_exception(RuntimeError("error 2"))]
            yield both_fail
        self.assertEqual(str(raised.exception), "error 1")

        # With only one exception, no error is logged.
        with self.assertRaises(RuntimeError):
            one_fails = [self.async_exception(RuntimeError("error 1")),
                         self.async_future(2)]
            yield one_fails

        # Exception logging may be explicitly quieted.
        with self.assertRaises(RuntimeError):
            quieted = [self.async_exception(RuntimeError("error 1")),
                       self.async_exception(RuntimeError("error 2"))]
            yield gen.multi_future(quieted, quiet_exceptions=RuntimeError)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_cross_user(self):
        other_token = self.get_token()
        # A token sent in both the cookie and the body authenticates
        # its own request.
        for own_token in (self.xsrf_token, other_token):
            form_body = urllib_parse.urlencode({"_xsrf": own_token})
            response = self.fetch("/", method="POST", body=form_body,
                                  headers=self.cookie_headers(own_token))
            self.assertEqual(response.code, 200)
        # Mixing tokens (one in the cookie, the other in the body) must be
        # rejected, in either direction.
        mismatched_pairs = ((self.xsrf_token, other_token),
                            (other_token, self.xsrf_token))
        for cookie_token, body_token in mismatched_pairs:
            with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
                form_body = urllib_parse.urlencode({"_xsrf": body_token})
                response = self.fetch(
                    "/", method="POST", body=form_body,
                    headers=self.cookie_headers(cookie_token))
            self.assertEqual(response.code, 403)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_connection_refused(self):
        # When a connection is refused, the connect callback should not
        # be run.  (The kqueue IOLoop used to behave differently from the
        # epoll IOLoop in this respect)
        cleanup_func, port = refusing_port()
        self.addCleanup(cleanup_func)
        stream = IOStream(socket.socket(), self.io_loop)
        self.connect_called = False

        def connect_callback():
            # Records that the callback ran; the test asserts it never does.
            self.connect_called = True
            self.stop()
        stream.set_close_callback(self.stop)
        # log messages vary by platform and ioloop implementation
        with ExpectLog(gen_log, ".*", required=False):
            stream.connect(("127.0.0.1", port), connect_callback)
            self.wait()
        self.assertFalse(self.connect_called)
        # assertIsInstance / assertIn include the offending value in the
        # failure message, unlike assertTrue on a pre-computed boolean.
        self.assertIsInstance(stream.error, socket.error)
        # cygwin's errnos don't match those used on native windows python,
        # so the errno check is skipped on that platform.
        if sys.platform != 'cygwin':
            refused_errnos = (errno.ECONNREFUSED,)
            if hasattr(errno, "WSAECONNREFUSED"):
                refused_errnos += (errno.WSAECONNREFUSED,)
            self.assertIn(stream.error.args[0], refused_errnos)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_read_callback_error(self):
        # Test that IOStream sets its exc_info when a read callback throws
        server, client = self.make_iostream_pair()
        try:
            server.set_close_callback(self.stop)
            with ExpectLog(
                app_log, "(Uncaught exception|Exception in callback)"
            ):
                # Clear ExceptionStackContext so IOStream catches error
                with NullContext():
                    # The callback divides by zero on purpose.
                    server.read_bytes(1, callback=lambda data: 1 / 0)
                client.write(b"1")
                self.wait()
            # assertIsInstance reports the actual value of server.error on
            # failure instead of a bare "False is not true".
            self.assertIsInstance(server.error, ZeroDivisionError)
        finally:
            server.close()
            client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_async_read_error_logging(self):
        # Socket errors on asynchronous reads should be logged (but only
        # once).
        server, client = self.make_iostream_pair()
        server.set_close_callback(self.stop)
        try:
            def ignore_data(data):
                return None

            # Begin a read that will be fulfilled asynchronously.
            server.read_bytes(1, ignore_data)
            client.write(b'a')

            # Swap in a read_from_fd that closes the descriptor before
            # delegating to the class implementation, to make it fail.
            def failing_read_from_fd():
                os.close(server.socket.fileno())
                server.__class__.read_from_fd(server)
            server.read_from_fd = failing_read_from_fd
            # This log message is from _handle_read (not read_from_fd).
            with ExpectLog(gen_log, "error on read"):
                self.wait()
        finally:
            server.close()
            client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_read_until_regex_max_bytes_inline(self):
        server, client = self.make_iostream_pair()

        def report_closed():
            # Hand the marker value "closed" to wait() when the client closes.
            return self.stop("closed")
        client.set_close_callback(report_closed)
        try:
            # Similar to the error case in the previous test, but the
            # server writes first so client reads are satisfied
            # inline.  For consistency with the out-of-line case, we
            # do not raise the error synchronously.
            server.write(b"123456")
            with ExpectLog(gen_log, "Unsatisfiable read"):
                client.read_until_regex(b"def", self.stop, max_bytes=5)
                outcome = self.wait()
            self.assertEqual(outcome, "closed")
        finally:
            server.close()
            client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multi_exceptions(self):
        # Both children fail: the first error is the one raised, and the
        # "Multiple exceptions" message is expected on app_log.
        with ExpectLog(app_log, "Multiple exceptions in yield list"), \
                self.assertRaises(RuntimeError) as raised:
            both_fail = [self.async_exception(RuntimeError("error 1")),
                         self.async_exception(RuntimeError("error 2"))]
            yield gen.Multi(both_fail)
        self.assertEqual(str(raised.exception), "error 1")

        # With only one exception, no error is logged.
        with self.assertRaises(RuntimeError):
            one_fails = [self.async_exception(RuntimeError("error 1")),
                         self.async_future(2)]
            yield gen.Multi(one_fails)

        # Exception logging may be explicitly quieted.
        with self.assertRaises(RuntimeError):
            quieted = [self.async_exception(RuntimeError("error 1")),
                       self.async_exception(RuntimeError("error 2"))]
            yield gen.Multi(quieted, quiet_exceptions=RuntimeError)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multi_future_exceptions(self):
        # Yielding a list in which both futures fail: the first error is
        # raised and the "Multiple exceptions" message is expected on app_log.
        with ExpectLog(app_log, "Multiple exceptions in yield list"), \
                self.assertRaises(RuntimeError) as raised:
            both_fail = [self.async_exception(RuntimeError("error 1")),
                         self.async_exception(RuntimeError("error 2"))]
            yield both_fail
        self.assertEqual(str(raised.exception), "error 1")

        # With only one exception, no error is logged.
        with self.assertRaises(RuntimeError):
            one_fails = [self.async_exception(RuntimeError("error 1")),
                         self.async_future(2)]
            yield one_fails

        # Exception logging may be explicitly quieted.
        with self.assertRaises(RuntimeError):
            quieted = [self.async_exception(RuntimeError("error 1")),
                       self.async_exception(RuntimeError("error 2"))]
            yield gen.multi_future(quieted, quiet_exceptions=RuntimeError)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_cross_user(self):
        other_token = self.get_token()
        # A token sent in both the cookie and the body authenticates
        # its own request.
        for own_token in (self.xsrf_token, other_token):
            form_body = urllib_parse.urlencode({"_xsrf": own_token})
            response = self.fetch("/", method="POST", body=form_body,
                                  headers=self.cookie_headers(own_token))
            self.assertEqual(response.code, 200)
        # Mixing tokens (one in the cookie, the other in the body) must be
        # rejected, in either direction.
        mismatched_pairs = ((self.xsrf_token, other_token),
                            (other_token, self.xsrf_token))
        for cookie_token, body_token in mismatched_pairs:
            with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
                form_body = urllib_parse.urlencode({"_xsrf": body_token})
                response = self.fetch(
                    "/", method="POST", body=form_body,
                    headers=self.cookie_headers(cookie_token))
            self.assertEqual(response.code, 403)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_connection_refused(self):
        # When a connection is refused, the connect callback should not
        # be run.  (The kqueue IOLoop used to behave differently from the
        # epoll IOLoop in this respect)
        cleanup_func, port = refusing_port()
        self.addCleanup(cleanup_func)
        stream = IOStream(socket.socket(), self.io_loop)
        self.connect_called = False

        def connect_callback():
            # Records that the callback ran; the test asserts it never does.
            self.connect_called = True
            self.stop()
        stream.set_close_callback(self.stop)
        # log messages vary by platform and ioloop implementation
        with ExpectLog(gen_log, ".*", required=False):
            stream.connect(("127.0.0.1", port), connect_callback)
            self.wait()
        self.assertFalse(self.connect_called)
        # assertIsInstance / assertIn include the offending value in the
        # failure message, unlike assertTrue on a pre-computed boolean.
        self.assertIsInstance(stream.error, socket.error)
        # cygwin's errnos don't match those used on native windows python,
        # so the errno check is skipped on that platform.
        if sys.platform != 'cygwin':
            refused_errnos = (errno.ECONNREFUSED,)
            if hasattr(errno, "WSAECONNREFUSED"):
                refused_errnos += (errno.WSAECONNREFUSED,)
            self.assertIn(stream.error.args[0], refused_errnos)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_read_callback_error(self):
        # Test that IOStream sets its exc_info when a read callback throws
        server, client = self.make_iostream_pair()
        try:
            server.set_close_callback(self.stop)
            with ExpectLog(
                app_log, "(Uncaught exception|Exception in callback)"
            ):
                # Clear ExceptionStackContext so IOStream catches error
                with NullContext():
                    # The callback divides by zero on purpose.
                    server.read_bytes(1, callback=lambda data: 1 / 0)
                client.write(b"1")
                self.wait()
            # assertIsInstance reports the actual value of server.error on
            # failure instead of a bare "False is not true".
            self.assertIsInstance(server.error, ZeroDivisionError)
        finally:
            server.close()
            client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_async_read_error_logging(self):
        # Socket errors on asynchronous reads should be logged (but only
        # once).
        server, client = self.make_iostream_pair()
        server.set_close_callback(self.stop)
        try:
            def ignore_data(data):
                return None

            # Begin a read that will be fulfilled asynchronously.
            server.read_bytes(1, ignore_data)
            client.write(b'a')

            # Swap in a read_from_fd that closes the descriptor before
            # delegating to the class implementation, to make it fail.
            def failing_read_from_fd():
                os.close(server.socket.fileno())
                server.__class__.read_from_fd(server)
            server.read_from_fd = failing_read_from_fd
            # This log message is from _handle_read (not read_from_fd).
            with ExpectLog(gen_log, "error on read"):
                self.wait()
        finally:
            server.close()
            client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_read_until_regex_max_bytes_inline(self):
        server, client = self.make_iostream_pair()

        def report_closed():
            # Hand the marker value "closed" to wait() when the client closes.
            return self.stop("closed")
        client.set_close_callback(report_closed)
        try:
            # Similar to the error case in the previous test, but the
            # server writes first so client reads are satisfied
            # inline.  For consistency with the out-of-line case, we
            # do not raise the error synchronously.
            server.write(b"123456")
            with ExpectLog(gen_log, "Unsatisfiable read"):
                client.read_until_regex(b"def", self.stop, max_bytes=5)
                outcome = self.wait()
            self.assertEqual(outcome, "closed")
        finally:
            server.close()
            client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_handle_stream_coroutine_logging(self):
        # handle_stream may be a coroutine and any exception in its
        # Future will be logged.
        class TestServer(TCPServer):
            # Server whose coroutine handler yields once, closes the
            # stream, and then fails with a deliberate ZeroDivisionError.
            @gen.coroutine
            def handle_stream(self, stream, address):
                yield gen.moment
                stream.close()
                1 / 0

        # Start as None so the finally clause only tears down what was
        # actually created.
        server = client = None
        try:
            sock, port = bind_unused_port()
            # NOTE(review): the server is created under NullContext,
            # presumably so the handler's error is logged rather than
            # caught by the test's own context; confirm.
            with NullContext():
                server = TestServer()
                server.add_socket(sock)
            client = IOStream(socket.socket())
            # The connect, the read to close and one extra gen.moment all
            # happen while app_log is being watched for the message.
            with ExpectLog(app_log, "Exception in callback"):
                yield client.connect(('localhost', port))
                yield client.read_until_close()
                yield gen.moment
        finally:
            if server is not None:
                server.stop()
            if client is not None:
                client.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multiple_errors(self):
        def fail(message):
            raise Exception(message)

        # Queue two failing callbacks, in order; the default argument pins
        # each message to its own lambda.
        for text in ("error one", "error two"):
            self.io_loop.add_callback(lambda text=text: fail(text))
        # The first error gets raised; the second gets logged.
        with ExpectLog(app_log, "multiple unhandled exceptions"), \
                self.assertRaises(Exception) as raised:
            self.wait()
        self.assertEqual(str(raised.exception), "error one")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_error_in_on_message(self):
        # After a message is sent to /error_in_on_message, an "Uncaught
        # exception" entry is expected on app_log and the next read is
        # asserted to yield None.
        conn = yield self.ws_connect('/error_in_on_message')
        conn.write_message('hello')
        with ExpectLog(app_log, "Uncaught exception"):
            reply = yield conn.read_message()
        self.assertIsNone(reply)
        yield self.close(conn)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_websocket_network_fail(self):
        # Bind an unused port and close the socket straight away, then try
        # to connect to that port; the attempt must raise IOError.
        sock, port = bind_unused_port()
        sock.close()
        with self.assertRaises(IOError), ExpectLog(gen_log, ".*"):
            target = 'ws://127.0.0.1:%d/' % port
            yield websocket_connect(target,
                                    io_loop=self.io_loop,
                                    connect_timeout=3600)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_missing_headers(self):
        # A part with an empty header section: a "missing headers" message
        # is expected on gen_log and no file may be recorded.
        payload = b'''\
--1234

Foo
--1234--'''.replace(b"\n", b"\r\n")
        arguments, uploads = {}, {}
        with ExpectLog(gen_log, "multipart/form-data missing headers"):
            parse_multipart_form_data(b"1234", payload, arguments, uploads)
        self.assertEqual(uploads, {})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_line_does_not_end_with_correct_line_break(self):
        # The part body runs straight into the closing boundary with no
        # line break before it: an "Invalid multipart/form-data" message is
        # expected on gen_log and no file may be recorded.
        payload = b'''\
--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"

Foo--1234--'''.replace(b"\n", b"\r\n")
        arguments, uploads = {}, {}
        with ExpectLog(gen_log, "Invalid multipart/form-data"):
            parse_multipart_form_data(b"1234", payload, arguments, uploads)
        self.assertEqual(uploads, {})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_content_disposition_header_without_name_parameter(self):
        # The Content-Disposition header has a filename but no name: a
        # "value missing name" message is expected on gen_log and no file
        # may be recorded.
        payload = b"""\
--1234
Content-Disposition: form-data; filename="ab.txt"

Foo
--1234--""".replace(b"\n", b"\r\n")
        arguments, uploads = {}, {}
        with ExpectLog(gen_log, "multipart/form-data value missing name"):
            parse_multipart_form_data(b"1234", payload, arguments, uploads)
        self.assertEqual(uploads, {})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_stack_context(self):
        # Fetching "/" is expected to log "Uncaught exception GET /" on
        # app_log; the stored response must be a 500 whose body carries
        # the marker text.
        with ExpectLog(app_log, "Uncaught exception GET /"):
            self.http_client.fetch(self.get_url('/'), self.handle_response)
            self.wait()
        self.assertEqual(self.response.code, 500)
        # assertIn shows the actual body when the marker is missing,
        # unlike assertTrue on a pre-computed boolean.
        self.assertIn(b'got expected exception', self.response.body)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_exception_logging(self):
        """Uncaught exceptions get logged by the IOLoop."""
        def divide_by_zero():
            return 1 / 0

        # Running under NullContext stops AsyncTestCase from catching the
        # exception itself.
        with NullContext():
            self.io_loop.add_callback(divide_by_zero)
            self.io_loop.add_callback(self.stop)
            with ExpectLog(app_log, "Exception in callback"):
                self.wait()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_exception_logging_future(self):
        """The IOLoop examines exceptions from Futures and logs them."""
        with NullContext():
            @gen.coroutine
            def stop_then_fail():
                # Schedule the loop stop first, then fail on purpose.
                self.io_loop.add_callback(self.stop)
                1 / 0

            self.io_loop.add_callback(stop_then_fail)
            with ExpectLog(app_log, "Exception in callback"):
                self.wait()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_spawn_callback(self):
        def divide_by_zero():
            return 1 / 0

        # An added callback runs in the test's stack_context, so its
        # exception will be re-raised in wait().
        self.io_loop.add_callback(divide_by_zero)
        with self.assertRaises(ZeroDivisionError):
            self.wait()
        # A spawned callback is run directly on the IOLoop, so its
        # exception will be logged without stopping the test.
        self.io_loop.spawn_callback(divide_by_zero)
        self.io_loop.add_callback(self.stop)
        with ExpectLog(app_log, "Exception in callback"):
            self.wait()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_non_ssl_request(self):
        # Make sure the server closes the connection when it gets a non-ssl
        # connection, rather than waiting for a timeout or otherwise
        # misbehaving.
        with ExpectLog(gen_log, '(SSL Error|uncaught exception)'), \
                ExpectLog(gen_log, 'Uncaught exception', required=False):
            # Same URL as the server's, with the scheme downgraded.
            plain_url = self.get_url("/").replace('https:', 'http:')
            self.http_client.fetch(plain_url,
                                   self.stop,
                                   request_timeout=3600,
                                   connect_timeout=3600)
            response = self.wait()
        self.assertEqual(response.code, 599)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_error_logging(self):
        # No stack traces are logged for SSL errors.
        with ExpectLog(gen_log, 'SSL Error') as ssl_log:
            plain_url = self.get_url("/").replace("https:", "http:")
            self.http_client.fetch(plain_url, self.stop)
            reply = self.wait()
            self.assertEqual(reply.code, 599)
        self.assertFalse(ssl_log.logged_stack)

# Python's SSL implementation differs significantly between versions.
# For example, SSLv3 and TLSv1 throw an exception if you try to read
# from the socket before the handshake is complete, but the default
# of SSLv23 allows it.
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_malformed_body(self):
        # parse_qs is pretty forgiving, but it will fail on python 3
        # if the data is not utf8.  On python 2 parse_qs will work,
        # but then the recursive_unicode call in EchoHandler will
        # fail.
        if str is bytes:
            return
        form_headers = {'Content-Type': 'application/x-www-form-urlencoded'}
        with ExpectLog(gen_log, 'Invalid x-www-form-urlencoded body'):
            response = self.fetch('/echo', method="POST",
                                  headers=form_headers, body=b'\xe9')
        self.assertEqual(200, response.code)
        self.assertEqual(b'{}', response.body)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_malformed_first_line(self):
        with ExpectLog(gen_log, '.*Malformed HTTP request line'):
            self.stream.write(b'asdf\r\n\r\n')
            # TODO: need an async version of ExpectLog so we don't need
            # hard-coded timeouts here.
            settle_delay = datetime.timedelta(seconds=0.01)
            self.io_loop.add_timeout(settle_delay, self.stop)
            self.wait()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_unix_socket_bad_request(self):
        # Unix sockets don't have remote addresses so they just return an
        # empty string.
        with ExpectLog(gen_log, "Malformed HTTP message from"):
            self.stream.write(b"garbage\r\n\r\n")
            self.stream.read_until_close(self.stop)
            received = self.wait()
        self.assertEqual(received, b"")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_gzip_unsupported(self):
        # Gzip support is opt-in; without it the server fails to parse
        # the body (but parsing form bodies is currently just a log message,
        # not a fatal error).
        with ExpectLog(gen_log, "Unsupported Content-Encoding"):
            response = self.post_gzip('foo=bar')
        # assertEqual replaces the deprecated assertEquals alias, which
        # newer unittest releases no longer provide.
        self.assertEqual(json_decode(response.body), {})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_large_headers(self):
        filler_headers = {'X-Filler': 'a' * 1000}
        with ExpectLog(gen_log, "Unsatisfiable read", required=False):
            response = self.fetch("/", headers=filler_headers)
        # 431 is "Request Header Fields Too Large", defined in RFC
        # 6585. However, many implementations just close the
        # connection in this case, resulting in a 599.
        self.assertIn(response.code, (431, 599))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_large_body_buffered(self):
        """PUT a 10240-byte body to /buffered and expect it to be refused.

        A "Content-Length too long" message is expected on ``gen_log`` and
        the fetch must come back with code 599.
        """
        payload = b'a' * 10240
        with ExpectLog(gen_log, '.*Content-Length too long'):
            response = self.fetch('/buffered', method='PUT', body=payload)
        self.assertEqual(response.code, 599)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_large_body_buffered_chunked(self):
        """PUT a 10240-byte body to /buffered through a body producer.

        A "chunked body too large" message is expected on ``gen_log`` and
        the fetch must come back with code 599.
        """
        def produce_body(write):
            # Hand the whole payload to the client's write callback at once.
            return write(b'a' * 10240)

        with ExpectLog(gen_log, '.*chunked body too large'):
            response = self.fetch('/buffered', method='PUT',
                                  body_producer=produce_body)
        self.assertEqual(response.code, 599)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_large_body_streaming_chunked(self):
        """PUT a 10240-byte body to /streaming through a body producer.

        A "chunked body too large" message is expected on ``gen_log`` and
        the fetch must come back with code 599.
        """
        def produce_body(write):
            # Hand the whole payload to the client's write callback at once.
            return write(b'a' * 10240)

        with ExpectLog(gen_log, '.*chunked body too large'):
            response = self.fetch('/streaming', method='PUT',
                                  body_producer=produce_body)
        self.assertEqual(response.code, 599)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_timeout(self):
        """Announce a 42-byte body, send none, and expect a read timeout.

        The request asks for ``body_timeout=0.1``.  A "Timeout reading body"
        message is expected on ``gen_log`` while the stream is read until
        close, and nothing must be read back.

        This is a generator (it yields); presumably it is driven by a
        coroutine test decorator that is not visible here.
        """
        raw_stream = IOStream(socket.socket())
        try:
            server_address = ('127.0.0.1', self.get_http_port())
            yield raw_stream.connect(server_address)
            # Use a raw stream because AsyncHTTPClient won't let us read a
            # response without finishing a body.
            request_head = (b'PUT /streaming?body_timeout=0.1 HTTP/1.0\r\n'
                            b'Content-Length: 42\r\n\r\n')
            raw_stream.write(request_head)
            with ExpectLog(gen_log, 'Timeout reading body'):
                remainder = yield raw_stream.read_until_close()
            self.assertEqual(remainder, b'')
        finally:
            raw_stream.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_coroutine_exception_handler(self):
        """Fetch /coroutine_exception and expect a logged 500 response."""
        expected_message = "Uncaught exception GET /coroutine_exception"
        # Make sure we get an error and not a timeout
        with ExpectLog(app_log, expected_message):
            result = self.fetch('/coroutine_exception')
        self.assertEqual(500, result.code)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_cookie_tampering_future_timestamp(self):
        """Move payload digits into the timestamp and expect rejection.

        A version-1 secure cookie is set and split into its timestamp and
        signature.  The test confirms that shifting digits from the payload
        to the timestamp leaves the v1 signature unchanged, then stores such
        a tampered cookie and checks that ``get_secure_cookie`` returns
        ``None`` while "Cookie timestamp in future" is logged on
        ``gen_log``.
        """
        handler = CookieTestRequestHandler()
        # this string base64-encodes to '12345678'
        handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
                                  version=1)
        cookie = handler._cookies['foo']
        match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
        # assertIsNotNone reports the actual value on failure, unlike
        # assertTrue on a bare match object.
        self.assertIsNotNone(match)
        timestamp, sig = match.groups()
        # Look the secret up once; both signature checks use the same value.
        secret = handler.application.settings["cookie_secret"]
        self.assertEqual(
            _create_signature_v1(secret, 'foo', '12345678', timestamp),
            sig)
        # shifting digits from payload to timestamp doesn't alter signature
        # (this is not desirable behavior, just confirming that that's how it
        # works)
        self.assertEqual(
            _create_signature_v1(secret, 'foo', '1234', b'5678' + timestamp),
            sig)
        # tamper with the cookie
        handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
            to_basestring(timestamp), to_basestring(sig)))
        # it gets rejected
        with ExpectLog(gen_log, "Cookie timestamp in future"):
            self.assertIsNone(
                handler.get_secure_cookie('foo', min_version=1))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_error(self):
        """Fetch a URL with an encoded percent sign and an invalid byte.

        An "Invalid unicode" message is expected on ``gen_log``.
        """
        # Percent signs (encoded as %25) should not mess up printf-style
        # messages in logs
        tricky_url = "/group/?arg=%25%e9"
        with ExpectLog(gen_log, ".*Invalid unicode"):
            self.fetch(tricky_url)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_decode_argument_invalid_unicode(self):
        """Fetch URLs with an invalid %FF byte and expect a 400 for each.

        The byte appears once in the path and once in a query argument;
        an "Invalid unicode" message is expected on ``gen_log``.
        """
        bad_urls = ("/typecheck/invalid%FF", "/typecheck/invalid?foo=%FF")
        # test that invalid unicode in URLs causes 400, not 500
        with ExpectLog(gen_log, ".*Invalid unicode.*"):
            for url in bad_urls:
                response = self.fetch(url)
                self.assertEqual(response.code, 400)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_default(self):
        """Fetch /default and check the status code and body of the reply.

        Both fetches run inside an ``ExpectLog`` context that expects an
        "Uncaught exception" message on ``app_log``.
        """
        with ExpectLog(app_log, "Uncaught exception"):
            # Without a status argument the reply must be a 500 whose body
            # contains the matching reason text.
            response = self.fetch("/default")
            self.assertEqual(response.code, 500)
            self.assertTrue(b"500: Internal Server Error" in response.body)

            # With ?status=503 the reply must carry that code and text.
            response = self.fetch("/default?status=503")
            self.assertEqual(response.code, 503)
            self.assertTrue(b"503: Service Unavailable" in response.body)