Python tornado.escape 模块,utf8() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.escape.utf8()

项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multipart_form(self):
        # Encodings here are tricky:  Headers are latin1, bodies can be
        # anything (we use utf8 by default).
        response = self.raw_fetch([
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ],
            b"\r\n".join([
                b"Content-Disposition: form-data; name=argument",
                b"",
                u("\u00e1").encode("utf-8"),
                b"--1234567890",
                u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
                b"",
                u("\u00fa").encode("utf-8"),
                b"--1234567890--",
                b"",
            ]))
        data = json_decode(response)
        self.assertEqual(u("\u00e9"), data["header"])
        self.assertEqual(u("\u00e1"), data["argument"])
        self.assertEqual(u("\u00f3"), data["filename"])
        self.assertEqual(u("\u00fa"), data["filebody"])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def get_app(self):
        # The old request_callback interface does not implement the
        # delegate interface, and writes its response via request.write
        # instead of request.connection.write_headers.
        def handle_request(request):
            self.http1 = request.version.startswith("HTTP/1.")
            if not self.http1:
                # This test will be skipped if we're using HTTP/2,
                # so just close it out cleanly using the modern interface.
                request.connection.write_headers(
                    ResponseStartLine('', 200, 'OK'),
                    HTTPHeaders())
                request.connection.finish()
                return
            message = b"Hello world"
            request.write(utf8("HTTP/1.1 200 OK\r\n"
                               "Content-Length: %d\r\n\r\n" % len(message)))
            request.write(message)
            request.finish()
        return handle_request
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_cookie_special_char(self):
        response = self.fetch("/special_char")
        headers = sorted(response.headers.get_list("Set-Cookie"))
        self.assertEqual(len(headers), 3)
        self.assertEqual(headers[0], 'equals="a=b"; Path=/')
        self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
        # python 2.7 octal-escapes the semicolon; older versions leave it alone
        self.assertTrue(headers[2] in ('semicolon="a;b"; Path=/',
                                       'semicolon="a\\073b"; Path=/'),
                        headers[2])

        data = [('foo=a=b', 'a=b'),
                ('foo="a=b"', 'a=b'),
                ('foo="a;b"', 'a;b'),
                # ('foo=a\\073b', 'a;b'),  # even encoded, ";" is a delimiter
                ('foo="a\\073b"', 'a;b'),
                ('foo="a\\"b"', 'a"b'),
                ]
        for header, expected in data:
            logging.debug("trying %r", header)
            response = self.fetch("/get", headers={"Cookie": header})
            self.assertEqual(response.body, utf8(expected))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _on_request_token(self, authorize_url, callback_uri, callback,
                          response):
        if response.error:
            raise Exception("Could not get request token: %s" % response.error)
        request_token = _oauth_parse_response(response.body)
        data = (base64.b64encode(escape.utf8(request_token["key"])) + b"|" +
                base64.b64encode(escape.utf8(request_token["secret"])))
        self.set_cookie("_oauth_request_token", data)
        args = dict(oauth_token=request_token["key"])
        if callback_uri == "oob":
            self.finish(authorize_url + "?" + urllib_parse.urlencode(args))
            callback()
            return
        elif callback_uri:
            args["oauth_callback"] = urlparse.urljoin(
                self.request.full_url(), callback_uri)
        self.redirect(authorize_url + "?" + urllib_parse.urlencode(args))
        callback()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_signature(consumer_token, method, url, parameters={}, token=None):
    """Calculates the HMAC-SHA1 OAuth signature for the given request.

    See http://oauth.net/core/1.0/#signing_process
    """
    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = []
    base_elems.append(method.upper())
    base_elems.append(normalized_url)
    base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
                               for k, v in sorted(parameters.items())))
    base_string = "&".join(_oauth_escape(e) for e in base_elems)

    key_elems = [escape.utf8(consumer_token["secret"])]
    key_elems.append(escape.utf8(token["secret"] if token else ""))
    key = b"&".join(key_elems)

    hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    return binascii.b2a_base64(hash.digest())[:-1]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None):
    """Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.

    See http://oauth.net/core/1.0a/#signing_process
    """
    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = []
    base_elems.append(method.upper())
    base_elems.append(normalized_url)
    base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
                               for k, v in sorted(parameters.items())))

    base_string = "&".join(_oauth_escape(e) for e in base_elems)
    key_elems = [escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))]
    key_elems.append(escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else ""))
    key = b"&".join(key_elems)

    hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    return binascii.b2a_base64(hash.digest())[:-1]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def redirect(self, url, permanent=False, status=None):
        """Sends a redirect to the given (optionally relative) URL.

        If the ``status`` argument is specified, that value is used as the
        HTTP status code; otherwise either 301 (permanent) or 302
        (temporary) is chosen based on the ``permanent`` argument.
        The default is 302 (temporary).
        """
        if self._headers_written:
            raise Exception("Cannot redirect after headers have been written")
        if status is None:
            status = 301 if permanent else 302
        else:
            assert isinstance(status, int) and 300 <= status <= 399
        self.set_status(status)
        self.set_header("Location", utf8(url))
        self.finish()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def render_string(self, template_name, **kwargs):
        """Generate the given template with the given arguments.

        We return the generated byte string (in utf8). To generate and
        write a template as a response, use render() above.
        """
        # If no template_path is specified, use the path of the calling file
        template_path = self.get_template_path()
        if not template_path:
            frame = sys._getframe(0)
            web_file = frame.f_code.co_filename
            while frame.f_code.co_filename == web_file:
                frame = frame.f_back
            template_path = os.path.dirname(frame.f_code.co_filename)
        with RequestHandler._template_loader_lock:
            if template_path not in RequestHandler._template_loaders:
                loader = self.create_template_loader(template_path)
                RequestHandler._template_loaders[template_path] = loader
            else:
                loader = RequestHandler._template_loaders[template_path]
        t = loader.load(template_name)
        namespace = self.get_template_namespace()
        namespace.update(kwargs)
        return t.generate(**namespace)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def decode_signed_value(secret, name, value, max_age_days=31,
                        clock=None, min_version=None):
    if clock is None:
        clock = time.time
    if min_version is None:
        min_version = DEFAULT_SIGNED_VALUE_MIN_VERSION
    if min_version > 2:
        raise ValueError("Unsupported min_version %d" % min_version)
    if not value:
        return None

    value = utf8(value)
    version = _get_version(value)

    if version < min_version:
        return None
    if version == 1:
        return _decode_signed_value_v1(secret, name, value,
                                       max_age_days, clock)
    elif version == 2:
        return _decode_signed_value_v2(secret, name, value,
                                       max_age_days, clock)
    else:
        return None
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def __init__(self, handler, mask_outgoing=False,
                 compression_options=None):
        WebSocketProtocol.__init__(self, handler)
        self.mask_outgoing = mask_outgoing
        self._final_frame = False
        self._frame_opcode = None
        self._masked_frame = None
        self._frame_mask = None
        self._frame_length = None
        self._fragmented_message_buffer = None
        self._fragmented_message_opcode = None
        self._waiting = None
        self._compression_options = compression_options
        self._decompressor = None
        self._compressor = None
        self._frame_compressed = None
        # The total uncompressed size of all messages received or sent.
        # Unicode messages are encoded to utf8.
        # Only for testing; subject to change.
        self._message_bytes_in = 0
        self._message_bytes_out = 0
        # The total size of all packets received or sent.  Includes
        # the effect of compression, frame overhead, and control frames.
        self._wire_bytes_in = 0
        self._wire_bytes_out = 0
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def close(self, code=None, reason=None):
        """Closes the WebSocket connection."""
        if not self.server_terminated:
            if not self.stream.closed():
                if code is None and reason is not None:
                    code = 1000  # "normal closure" status code
                if code is None:
                    close_data = b''
                else:
                    close_data = struct.pack('>H', code)
                if reason is not None:
                    close_data += utf8(reason)
                self._write_frame(True, 0x8, close_data)
            self.server_terminated = True
        if self.client_terminated:
            if self._waiting is not None:
                self.stream.io_loop.remove_timeout(self._waiting)
                self._waiting = None
            self.stream.close()
        elif self._waiting is None:
            # Give the client a few seconds to complete a clean shutdown,
            # otherwise just close the connection.
            self._waiting = self.stream.io_loop.add_timeout(
                self.stream.io_loop.time() + 5, self._abort)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_special_filenames(self):
        filenames = ['a;b.txt',
                     'a"b.txt',
                     'a";b.txt',
                     'a;"b.txt',
                     'a";";.txt',
                     'a\\"b.txt',
                     'a\\b.txt',
                     ]
        for filename in filenames:
            logging.debug("trying filename %r", filename)
            data = """\
--1234
Content-Disposition: form-data; name="files"; filename="%s"

Foo
--1234--""" % filename.replace('\\', '\\\\').replace('"', '\\"')
            data = utf8(data.replace("\n", "\r\n"))
            args = {}
            files = {}
            parse_multipart_form_data(b"1234", data, args, files)
            file = files["files"][0]
            self.assertEqual(file["filename"], filename)
            self.assertEqual(file["body"], b"Foo")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multipart_form(self):
        # Encodings here are tricky:  Headers are latin1, bodies can be
        # anything (we use utf8 by default).
        response = self.raw_fetch([
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ],
            b"\r\n".join([
                b"Content-Disposition: form-data; name=argument",
                b"",
                u("\u00e1").encode("utf-8"),
                b"--1234567890",
                u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
                b"",
                u("\u00fa").encode("utf-8"),
                b"--1234567890--",
                b"",
            ]))
        data = json_decode(response)
        self.assertEqual(u("\u00e9"), data["header"])
        self.assertEqual(u("\u00e1"), data["argument"])
        self.assertEqual(u("\u00f3"), data["filename"])
        self.assertEqual(u("\u00fa"), data["filebody"])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_cookie_special_char(self):
        response = self.fetch("/special_char")
        headers = sorted(response.headers.get_list("Set-Cookie"))
        self.assertEqual(len(headers), 3)
        self.assertEqual(headers[0], 'equals="a=b"; Path=/')
        self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
        # python 2.7 octal-escapes the semicolon; older versions leave it alone
        self.assertTrue(headers[2] in ('semicolon="a;b"; Path=/',
                                       'semicolon="a\\073b"; Path=/'),
                        headers[2])

        data = [('foo=a=b', 'a=b'),
                ('foo="a=b"', 'a=b'),
                ('foo="a;b"', 'a;b'),
                # ('foo=a\\073b', 'a;b'),  # even encoded, ";" is a delimiter
                ('foo="a\\073b"', 'a;b'),
                ('foo="a\\"b"', 'a"b'),
                ]
        for header, expected in data:
            logging.debug("trying %r", header)
            response = self.fetch("/get", headers={"Cookie": header})
            self.assertEqual(response.body, utf8(expected))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_body_encoding(self):
        unicode_body = u("\xe9")
        byte_body = binascii.a2b_hex(b"e9")

        # unicode string in body gets converted to utf8
        response = self.fetch("/echopost", method="POST", body=unicode_body,
                              headers={"Content-Type": "application/blah"})
        self.assertEqual(response.headers["Content-Length"], "2")
        self.assertEqual(response.body, utf8(unicode_body))

        # byte strings pass through directly
        response = self.fetch("/echopost", method="POST",
                              body=byte_body,
                              headers={"Content-Type": "application/blah"})
        self.assertEqual(response.headers["Content-Length"], "1")
        self.assertEqual(response.body, byte_body)

        # Mixing unicode in headers and byte string bodies shouldn't
        # break anything
        response = self.fetch("/echopost", method="POST", body=byte_body,
                              headers={"Content-Type": "application/blah"},
                              user_agent=u("foo"))
        self.assertEqual(response.headers["Content-Length"], "1")
        self.assertEqual(response.body, byte_body)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def twisted_coroutine_fetch(self, url, runner):
        body = [None]

        @gen.coroutine
        def f():
            # This is simpler than the non-coroutine version, but it cheats
            # by reading the body in one blob instead of streaming it with
            # a Protocol.
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
            self.stop_loop()
        self.io_loop.add_callback(f)
        runner()
        return body[0]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _on_request_token(self, authorize_url, callback_uri, callback,
                          response):
        if response.error:
            raise Exception("Could not get request token: %s" % response.error)
        request_token = _oauth_parse_response(response.body)
        data = (base64.b64encode(escape.utf8(request_token["key"])) + b"|" +
                base64.b64encode(escape.utf8(request_token["secret"])))
        self.set_cookie("_oauth_request_token", data)
        args = dict(oauth_token=request_token["key"])
        if callback_uri == "oob":
            self.finish(authorize_url + "?" + urllib_parse.urlencode(args))
            callback()
            return
        elif callback_uri:
            args["oauth_callback"] = urlparse.urljoin(
                self.request.full_url(), callback_uri)
        self.redirect(authorize_url + "?" + urllib_parse.urlencode(args))
        callback()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth_signature(consumer_token, method, url, parameters={}, token=None):
    """Calculates the HMAC-SHA1 OAuth signature for the given request.

    See http://oauth.net/core/1.0/#signing_process
    """
    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = []
    base_elems.append(method.upper())
    base_elems.append(normalized_url)
    base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
                               for k, v in sorted(parameters.items())))
    base_string = "&".join(_oauth_escape(e) for e in base_elems)

    key_elems = [escape.utf8(consumer_token["secret"])]
    key_elems.append(escape.utf8(token["secret"] if token else ""))
    key = b"&".join(key_elems)

    hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    return binascii.b2a_base64(hash.digest())[:-1]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None):
    """Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.

    See http://oauth.net/core/1.0a/#signing_process
    """
    parts = urlparse.urlparse(url)
    scheme, netloc, path = parts[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    base_elems = []
    base_elems.append(method.upper())
    base_elems.append(normalized_url)
    base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
                               for k, v in sorted(parameters.items())))

    base_string = "&".join(_oauth_escape(e) for e in base_elems)
    key_elems = [escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~'))]
    key_elems.append(escape.utf8(urllib_parse.quote(token["secret"], safe='~') if token else ""))
    key = b"&".join(key_elems)

    hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    return binascii.b2a_base64(hash.digest())[:-1]
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def render_string(self, template_name, **kwargs):
        """Generate the given template with the given arguments.

        We return the generated byte string (in utf8). To generate and
        write a template as a response, use render() above.
        """
        # If no template_path is specified, use the path of the calling file
        template_path = self.get_template_path()
        if not template_path:
            frame = sys._getframe(0)
            web_file = frame.f_code.co_filename
            while frame.f_code.co_filename == web_file:
                frame = frame.f_back
            template_path = os.path.dirname(frame.f_code.co_filename)
        with RequestHandler._template_loader_lock:
            if template_path not in RequestHandler._template_loaders:
                loader = self.create_template_loader(template_path)
                RequestHandler._template_loaders[template_path] = loader
            else:
                loader = RequestHandler._template_loaders[template_path]
        t = loader.load(template_name)
        namespace = self.get_template_namespace()
        namespace.update(kwargs)
        return t.generate(**namespace)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def decode_signed_value(secret, name, value, max_age_days=31,
                        clock=None, min_version=None):
    if clock is None:
        clock = time.time
    if min_version is None:
        min_version = DEFAULT_SIGNED_VALUE_MIN_VERSION
    if min_version > 2:
        raise ValueError("Unsupported min_version %d" % min_version)
    if not value:
        return None

    value = utf8(value)
    version = _get_version(value)

    if version < min_version:
        return None
    if version == 1:
        return _decode_signed_value_v1(secret, name, value,
                                       max_age_days, clock)
    elif version == 2:
        return _decode_signed_value_v2(secret, name, value,
                                       max_age_days, clock)
    else:
        return None
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def write_message(self, message, binary=False):
        """Sends the given message to the client of this Web Socket.

        The message may be either a string or a dict (which will be
        encoded as json).  If the ``binary`` argument is false, the
        message will be sent as utf8; in binary mode any byte string
        is allowed.

        If the connection is already closed, raises `WebSocketClosedError`.

        .. versionchanged:: 3.2
           `WebSocketClosedError` was added (previously a closed connection
           would raise an `AttributeError`)

        .. versionchanged:: 4.3
           Returns a `.Future` which can be used for flow control.
        """
        if self.ws_connection is None:
            raise WebSocketClosedError()
        if isinstance(message, dict):
            message = tornado.escape.json_encode(message)
        return self.ws_connection.write_message(message, binary=binary)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def close(self, code=None, reason=None):
        """Closes the WebSocket connection."""
        if not self.server_terminated:
            if not self.stream.closed():
                if code is None and reason is not None:
                    code = 1000  # "normal closure" status code
                if code is None:
                    close_data = b''
                else:
                    close_data = struct.pack('>H', code)
                if reason is not None:
                    close_data += utf8(reason)
                self._write_frame(True, 0x8, close_data)
            self.server_terminated = True
        if self.client_terminated:
            if self._waiting is not None:
                self.stream.io_loop.remove_timeout(self._waiting)
                self._waiting = None
            self.stream.close()
        elif self._waiting is None:
            # Give the client a few seconds to complete a clean shutdown,
            # otherwise just close the connection.
            self._waiting = self.stream.io_loop.add_timeout(
                self.stream.io_loop.time() + 5, self._abort)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_special_filenames(self):
        filenames = ['a;b.txt',
                     'a"b.txt',
                     'a";b.txt',
                     'a;"b.txt',
                     'a";";.txt',
                     'a\\"b.txt',
                     'a\\b.txt',
                     ]
        for filename in filenames:
            logging.debug("trying filename %r", filename)
            data = """\
--1234
Content-Disposition: form-data; name="files"; filename="%s"

Foo
--1234--""" % filename.replace('\\', '\\\\').replace('"', '\\"')
            data = utf8(data.replace("\n", "\r\n"))
            args = {}
            files = {}
            parse_multipart_form_data(b"1234", data, args, files)
            file = files["files"][0]
            self.assertEqual(file["filename"], filename)
            self.assertEqual(file["body"], b"Foo")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multipart_form(self):
        # Encodings here are tricky:  Headers are latin1, bodies can be
        # anything (we use utf8 by default).
        response = self.raw_fetch([
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ],
            b"\r\n".join([
                b"Content-Disposition: form-data; name=argument",
                b"",
                u("\u00e1").encode("utf-8"),
                b"--1234567890",
                u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
                b"",
                u("\u00fa").encode("utf-8"),
                b"--1234567890--",
                b"",
            ]))
        data = json_decode(response)
        self.assertEqual(u("\u00e9"), data["header"])
        self.assertEqual(u("\u00e1"), data["argument"])
        self.assertEqual(u("\u00f3"), data["filename"])
        self.assertEqual(u("\u00fa"), data["filebody"])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def get_app(self):
        # The old request_callback interface does not implement the
        # delegate interface, and writes its response via request.write
        # instead of request.connection.write_headers.
        def handle_request(request):
            self.http1 = request.version.startswith("HTTP/1.")
            if not self.http1:
                # This test will be skipped if we're using HTTP/2,
                # so just close it out cleanly using the modern interface.
                request.connection.write_headers(
                    ResponseStartLine('', 200, 'OK'),
                    HTTPHeaders())
                request.connection.finish()
                return
            message = b"Hello world"
            request.write(utf8("HTTP/1.1 200 OK\r\n"
                               "Content-Length: %d\r\n\r\n" % len(message)))
            request.write(message)
            request.finish()
        return handle_request
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_cookie_special_char(self):
        response = self.fetch("/special_char")
        headers = sorted(response.headers.get_list("Set-Cookie"))
        self.assertEqual(len(headers), 3)
        self.assertEqual(headers[0], 'equals="a=b"; Path=/')
        self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
        # python 2.7 octal-escapes the semicolon; older versions leave it alone
        self.assertTrue(headers[2] in ('semicolon="a;b"; Path=/',
                                       'semicolon="a\\073b"; Path=/'),
                        headers[2])

        data = [('foo=a=b', 'a=b'),
                ('foo="a=b"', 'a=b'),
                ('foo="a;b"', 'a;b'),
                # ('foo=a\\073b', 'a;b'),  # even encoded, ";" is a delimiter
                ('foo="a\\073b"', 'a;b'),
                ('foo="a\\"b"', 'a"b'),
                ]
        for header, expected in data:
            logging.debug("trying %r", header)
            response = self.fetch("/get", headers={"Cookie": header})
            self.assertEqual(response.body, utf8(expected))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def body(self, value):
        self._body = utf8(value)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def generate(self, **kwargs):
        """Generate this template with the given arguments."""
        namespace = {
            "escape": escape.xhtml_escape,
            "xhtml_escape": escape.xhtml_escape,
            "url_escape": escape.url_escape,
            "json_encode": escape.json_encode,
            "squeeze": escape.squeeze,
            "linkify": escape.linkify,
            "datetime": datetime,
            "_tt_utf8": escape.utf8,  # for internal use
            "_tt_string_types": (unicode_type, bytes),
            # __name__ and __loader__ allow the traceback mechanism to find
            # the generated source code.
            "__name__": self.name.replace('.', '_'),
            "__loader__": ObjectDict(get_source=lambda name: self.code),
        }
        namespace.update(self.namespace)
        namespace.update(kwargs)
        exec_in(self.compiled, namespace)
        execute = namespace["_tt_execute"]
        # Clear the traceback module's cache of source data now that
        # we've generated a new template (mainly for this module's
        # unittests, where different tests reuse the same name).
        linecache.clearcache()
        return execute()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def generate(self, writer):
        writer.write_line("_tt_tmp = %s" % self.expression, self.line)
        writer.write_line("if isinstance(_tt_tmp, _tt_string_types):"
                          " _tt_tmp = _tt_utf8(_tt_tmp)", self.line)
        writer.write_line("else: _tt_tmp = _tt_utf8(str(_tt_tmp))", self.line)
        if not self.raw and writer.current_template.autoescape is not None:
            # In python3 functions like xhtml_escape return unicode,
            # so we have to convert to utf8 again.
            writer.write_line("_tt_tmp = _tt_utf8(%s(_tt_tmp))" %
                              writer.current_template.autoescape, self.line)
        writer.write_line("_tt_append(_tt_tmp)", self.line)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def generate(self, writer):
        value = self.value

        # Compress whitespace if requested, with a crude heuristic to avoid
        # altering preformatted whitespace.
        if "<pre>" not in value:
            value = filter_whitespace(self.whitespace, value)

        if value:
            writer.write_line('_tt_append(%r)' % escape.utf8(value), self.line)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _format_chunk(self, chunk):
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputError(
                    "Tried to write more data than Content-Length")
        if self._chunking_output and chunk:
            # Don't write out empty chunks because that means END-OF-STREAM
            # with chunked encoding
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_comment(self):
        template = Template("Hello{# TODO i18n #} {{ name }}!")
        self.assertEqual(template.generate(name=utf8("Ben")),
                         b"Hello Ben!")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_unicode_template(self):
        template = Template(utf8(u("\u00e9")))
        self.assertEqual(template.generate(), utf8(u("\u00e9")))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_unicode_literal_expression(self):
        # Unicode literals should be usable in templates.  Note that this
        # test simulates unicode characters appearing directly in the
        # template file (with utf8 encoding), i.e. \u escapes would not
        # be used in the template file itself.
        if str is unicode_type:
            # python 3 needs a different version of this test since
            # 2to3 doesn't run on template internals
            template = Template(utf8(u('{{ "\u00e9" }}')))
        else:
            template = Template(utf8(u('{{ u"\u00e9" }}')))
        self.assertEqual(template.generate(), utf8(u("\u00e9")))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_apply(self):
        def upper(s):
            return s.upper()
        template = Template(utf8("{% apply upper %}foo{% end %}"))
        self.assertEqual(template.generate(upper=upper), b"FOO")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_unicode_apply(self):
        def upper(s):
            return to_unicode(s).upper()
        template = Template(utf8(u("{% apply upper %}foo \u00e9{% end %}")))
        self.assertEqual(template.generate(upper=upper), utf8(u("FOO \u00c9")))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_if(self):
        template = Template(utf8("{% if x > 4 %}yes{% else %}no{% end %}"))
        self.assertEqual(template.generate(x=5), b"yes")
        self.assertEqual(template.generate(x=3), b"no")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_if_empty_body(self):
        template = Template(utf8("{% if True %}{% else %}{% end %}"))
        self.assertEqual(template.generate(), b"")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_try(self):
        template = Template(utf8("""{% try %}
try{% set y = 1/x %}
{% except %}-except
{% else %}-else
{% finally %}-finally
{% end %}"""))
        self.assertEqual(template.generate(x=1), b"\ntry\n-else\n-finally\n")
        self.assertEqual(template.generate(x=0), b"\ntry-except\n-finally\n")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_comment_directive(self):
        template = Template(utf8("{% comment blah blah %}foo"))
        self.assertEqual(template.generate(), b"foo")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_break_continue(self):
        template = Template(utf8("""\
{% for i in range(10) %}
    {% if i == 2 %}
        {% continue %}
    {% end %}
    {{ i }}
    {% if i == 6 %}
        {% break %}
    {% end %}
{% end %}"""))
        result = template.generate()
        # remove extraneous whitespace
        result = b''.join(result.split())
        self.assertEqual(result, b"013456")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_break_in_apply(self):
        # This test verifies current behavior, although of course it would
        # be nice if apply didn't cause seemingly unrelated breakage
        try:
            Template(utf8("{% for i in [] %}{% apply foo %}{% break %}{% end %}{% end %}"))
            raise Exception("Did not get expected exception")
        except ParseError:
            pass
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_utf8_in_file(self):
        tmpl = self.loader.load("utf8.html")
        result = tmpl.generate()
        self.assertEqual(to_unicode(result).strip(), u("H\u00e9llo"))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_unicode_escapes(self):
        self.assertEqual(utf8(u('\u00e9')), b'\xc3\xa9')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_import_member(self):
        self.assertIs(import_object('tornado.escape.utf8'), utf8)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_import_member_unicode(self):
        self.assertIs(import_object(u('tornado.escape.utf8')), utf8)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_unicode_newlines(self):
        # Ensure that only \r\n is recognized as a header separator, and not
        # the other newline-like unicode characters.
        # Characters that are likely to be problematic can be found in
        # http://unicode.org/standard/reports/tr13/tr13-5.html
        # and cpython's unicodeobject.c (which defines the implementation
        # of unicode_type.splitlines(), and uses a different list than TR13).
        newlines = [
            u('\u001b'),  # VERTICAL TAB
            u('\u001c'),  # FILE SEPARATOR
            u('\u001d'),  # GROUP SEPARATOR
            u('\u001e'),  # RECORD SEPARATOR
            u('\u0085'),  # NEXT LINE
            u('\u2028'),  # LINE SEPARATOR
            u('\u2029'),  # PARAGRAPH SEPARATOR
        ]
        for newline in newlines:
            # Try the utf8 and latin1 representations of each newline
            for encoding in ['utf8', 'latin1']:
                try:
                    try:
                        encoded = newline.encode(encoding)
                    except UnicodeEncodeError:
                        # Some chars cannot be represented in latin1
                        continue
                    data = b'Cookie: foo=' + encoded + b'bar'
                    # parse() wants a native_str, so decode through latin1
                    # in the same way the real parser does.
                    headers = HTTPHeaders.parse(
                        native_str(data.decode('latin1')))
                    expected = [('Cookie', 'foo=' +
                                 native_str(encoded.decode('latin1')) + 'bar')]
                    self.assertEqual(
                        expected, list(headers.get_all()))
                except Exception:
                    gen_log.warning("failed while trying %r in %s",
                                    newline, encoding)
                    raise
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def raw_fetch(self, headers, body, newline=b"\r\n"):
        with closing(IOStream(socket.socket())) as stream:
            stream.connect(('127.0.0.1', self.get_http_port()), self.stop)
            self.wait()
            stream.write(
                newline.join(headers +
                             [utf8("Content-Length: %d" % len(body))]) +
                newline + newline + body)
            read_stream_body(stream, self.stop)
            headers, body = self.wait()
            return body
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_malformed_body(self):
        # parse_qs is pretty forgiving, but it will fail on python 3
        # if the data is not utf8.  On python 2 parse_qs will work,
        # but then the recursive_unicode call in EchoHandler will
        # fail.
        if str is bytes:
            return
        with ExpectLog(gen_log, 'Invalid x-www-form-urlencoded body'):
            response = self.fetch(
                '/echo', method="POST",
                headers={'Content-Type': 'application/x-www-form-urlencoded'},
                body=b'\xe9')
        self.assertEqual(200, response.code)
        self.assertEqual(b'{}', response.body)