我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用tornado.escape.utf8()。
def test_multipart_form(self):
    # Encodings here are tricky: header lines are latin1, while bodies may
    # use any encoding (utf8 is what this test sends).
    request_lines = [
        b"POST /multipart HTTP/1.0",
        b"Content-Type: multipart/form-data; boundary=1234567890",
        b"X-Header-encoding-test: \xe9",
    ]
    body_parts = [
        b"Content-Disposition: form-data; name=argument",
        b"",
        u("\u00e1").encode("utf-8"),
        b"--1234567890",
        u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
        b"",
        u("\u00fa").encode("utf-8"),
        b"--1234567890--",
        b"",
    ]
    response = self.raw_fetch(request_lines, b"\r\n".join(body_parts))
    data = json_decode(response)

    # Each decoded field must round-trip to the expected unicode character.
    expectations = [
        (u("\u00e9"), "header"),
        (u("\u00e1"), "argument"),
        (u("\u00f3"), "filename"),
        (u("\u00fa"), "filebody"),
    ]
    for expected, field in expectations:
        self.assertEqual(expected, data[field])
def get_app(self):
    """Return a legacy-style request callback that answers "Hello world".

    The old request_callback interface does not implement the delegate
    interface; it writes its response through ``request.write`` rather
    than ``request.connection.write_headers``.
    """
    def handle_request(request):
        self.http1 = request.version.startswith("HTTP/1.")
        if self.http1:
            payload = b"Hello world"
            status_and_headers = (
                "HTTP/1.1 200 OK\r\nContent-Length: %d\r\n\r\n" % len(payload))
            request.write(utf8(status_and_headers))
            request.write(payload)
            request.finish()
        else:
            # This test will be skipped if we're using HTTP/2,
            # so just close it out cleanly using the modern interface.
            request.connection.write_headers(
                ResponseStartLine('', 200, 'OK'), HTTPHeaders())
            request.connection.finish()

    return handle_request
def test_cookie_special_char(self):
    # Part 1: special characters in outgoing Set-Cookie headers.
    set_cookie_response = self.fetch("/special_char")
    headers = sorted(set_cookie_response.headers.get_list("Set-Cookie"))
    self.assertEqual(len(headers), 3)
    equals_header, quote_header, semicolon_header = headers[0], headers[1], headers[2]
    self.assertEqual(equals_header, 'equals="a=b"; Path=/')
    self.assertEqual(quote_header, 'quote="a\\"b"; Path=/')
    # python 2.7 octal-escapes the semicolon; older versions leave it alone
    accepted_semicolon_forms = ('semicolon="a;b"; Path=/',
                                'semicolon="a\\073b"; Path=/')
    self.assertTrue(semicolon_header in accepted_semicolon_forms,
                    semicolon_header)

    # Part 2: special characters in incoming Cookie headers.
    # Note: 'foo=a\\073b' is deliberately not listed -- even encoded,
    # ";" is a delimiter.
    data = [
        ('foo=a=b', 'a=b'),
        ('foo="a=b"', 'a=b'),
        ('foo="a;b"', 'a;b'),
        ('foo="a\\073b"', 'a;b'),
        ('foo="a\\"b"', 'a"b'),
    ]
    for header, expected in data:
        logging.debug("trying %r", header)
        response = self.fetch("/get", headers={"Cookie": header})
        self.assertEqual(response.body, utf8(expected))
def _on_request_token(self, authorize_url, callback_uri, callback, response):
    """Handle the request-token response and send the user to authorize.

    Stores the token key and secret (each base64-encoded, joined by ``|``)
    in the ``_oauth_request_token`` cookie. When ``callback_uri`` is
    ``"oob"`` the authorization URL is written as the response body;
    otherwise the client is redirected to it. ``callback`` is invoked
    afterwards in both cases.

    Raises:
        Exception: if ``response.error`` is set.
    """
    if response.error:
        raise Exception("Could not get request token: %s" % response.error)

    request_token = _oauth_parse_response(response.body)
    encoded_key = base64.b64encode(escape.utf8(request_token["key"]))
    encoded_secret = base64.b64encode(escape.utf8(request_token["secret"]))
    self.set_cookie("_oauth_request_token",
                    encoded_key + b"|" + encoded_secret)

    args = {"oauth_token": request_token["key"]}
    if callback_uri == "oob":
        self.finish(authorize_url + "?" + urllib_parse.urlencode(args))
    else:
        if callback_uri:
            # Resolve a possibly relative callback against the current URL.
            args["oauth_callback"] = urlparse.urljoin(
                self.request.full_url(), callback_uri)
        self.redirect(authorize_url + "?" + urllib_parse.urlencode(args))
    callback()
def _oauth_signature(consumer_token, method, url, parameters=None, token=None):
    """Calculates the HMAC-SHA1 OAuth signature for the given request.

    See http://oauth.net/core/1.0/#signing_process

    Args:
        consumer_token: mapping with a ``"secret"`` entry.
        method: HTTP method name; upper-cased before signing.
        url: request URL; only scheme, netloc and path are signed.
        parameters: mapping of request parameters to include in the
            signature base string. ``None`` (the default) means no
            parameters.
        token: optional mapping with a ``"secret"`` entry; when falsy an
            empty token secret is used.

    Returns:
        The base64-encoded signature as bytes, without the trailing
        newline that ``binascii.b2a_base64`` appends.
    """
    # Avoid a shared mutable default argument.
    if parameters is None:
        parameters = {}

    scheme, netloc, path = urlparse.urlparse(url)[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    encoded_params = "&".join(
        "%s=%s" % (k, _oauth_escape(str(v)))
        for k, v in sorted(parameters.items()))
    base_elems = [method.upper(), normalized_url, encoded_params]
    base_string = "&".join(_oauth_escape(e) for e in base_elems)

    key = b"&".join([
        escape.utf8(consumer_token["secret"]),
        escape.utf8(token["secret"] if token else ""),
    ])

    mac = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    # b2a_base64 appends a newline; strip it.
    return binascii.b2a_base64(mac.digest())[:-1]
def _oauth10a_signature(consumer_token, method, url, parameters=None, token=None):
    """Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.

    See http://oauth.net/core/1.0a/#signing_process

    Unlike the 1.0 variant, the consumer and token secrets are
    percent-quoted (keeping ``~`` unescaped) before being joined into the
    signing key.

    Args:
        consumer_token: mapping with a ``"secret"`` entry.
        method: HTTP method name; upper-cased before signing.
        url: request URL; only scheme, netloc and path are signed.
        parameters: mapping of request parameters to include in the
            signature base string. ``None`` (the default) means no
            parameters.
        token: optional mapping with a ``"secret"`` entry; when falsy an
            empty token secret is used.

    Returns:
        The base64-encoded signature as bytes, without the trailing
        newline that ``binascii.b2a_base64`` appends.
    """
    # Avoid a shared mutable default argument.
    if parameters is None:
        parameters = {}

    scheme, netloc, path = urlparse.urlparse(url)[:3]
    normalized_url = scheme.lower() + "://" + netloc.lower() + path

    encoded_params = "&".join(
        "%s=%s" % (k, _oauth_escape(str(v)))
        for k, v in sorted(parameters.items()))
    base_elems = [method.upper(), normalized_url, encoded_params]
    base_string = "&".join(_oauth_escape(e) for e in base_elems)

    key = b"&".join([
        escape.utf8(urllib_parse.quote(consumer_token["secret"], safe='~')),
        escape.utf8(urllib_parse.quote(token["secret"], safe='~')
                    if token else ""),
    ])

    mac = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
    # b2a_base64 appends a newline; strip it.
    return binascii.b2a_base64(mac.digest())[:-1]
def redirect(self, url, permanent=False, status=None):
    """Redirect the client to ``url`` (which may be relative).

    When ``status`` is given it is used as the HTTP status code and must
    be an integer in the 300-399 range. Otherwise the code is 301 if
    ``permanent`` is true and 302 (the default) if not.

    Raises:
        Exception: if the response headers have already been written.
    """
    if self._headers_written:
        raise Exception("Cannot redirect after headers have been written")

    if status is not None:
        assert isinstance(status, int) and 300 <= status <= 399
    elif permanent:
        status = 301
    else:
        status = 302

    self.set_status(status)
    self.set_header("Location", utf8(url))
    self.finish()
def render_string(self, template_name, **kwargs):
    """Render the named template with the given arguments.

    The result of the template's ``generate`` call is returned (a utf8
    byte string). To render and write a template as the response, use
    render() instead.
    """
    template_path = self.get_template_path()
    if not template_path:
        # No template_path configured: fall back to the directory of the
        # calling file, found by walking up the stack until we leave the
        # file this code lives in.
        frame = sys._getframe(0)
        this_file = frame.f_code.co_filename
        while frame.f_code.co_filename == this_file:
            frame = frame.f_back
        template_path = os.path.dirname(frame.f_code.co_filename)

    # Loaders are cached per template path on the class, guarded by a lock.
    with RequestHandler._template_loader_lock:
        if template_path in RequestHandler._template_loaders:
            loader = RequestHandler._template_loaders[template_path]
        else:
            loader = self.create_template_loader(template_path)
            RequestHandler._template_loaders[template_path] = loader

    template = loader.load(template_name)
    namespace = self.get_template_namespace()
    namespace.update(kwargs)
    return template.generate(**namespace)
def decode_signed_value(secret, name, value, max_age_days=31, clock=None, min_version=None):
    """Decode a signed value, dispatching on its format version.

    Args:
        secret, name, max_age_days: passed through to the version-specific
            decoder.
        value: the signed value; a falsy value yields ``None``.
        clock: callable passed through to the decoder; defaults to
            ``time.time``.
        min_version: lowest accepted format version; defaults to
            ``DEFAULT_SIGNED_VALUE_MIN_VERSION``.

    Returns:
        The version-specific decoder's result, or ``None`` when ``value``
        is empty, its version is below ``min_version``, or the version is
        not 1 or 2.

    Raises:
        ValueError: if ``min_version`` is greater than 2.
    """
    clock = time.time if clock is None else clock
    if min_version is None:
        min_version = DEFAULT_SIGNED_VALUE_MIN_VERSION
    if min_version > 2:
        raise ValueError("Unsupported min_version %d" % min_version)
    if not value:
        return None

    value = utf8(value)
    version = _get_version(value)
    if version < min_version:
        return None

    decoders = {
        1: _decode_signed_value_v1,
        2: _decode_signed_value_v2,
    }
    decoder = decoders.get(version)
    if decoder is None:
        return None
    return decoder(secret, name, value, max_age_days, clock)
def __init__(self, handler, mask_outgoing=False, compression_options=None):
    """Initialize protocol state for one WebSocket connection.

    Stores ``mask_outgoing`` and ``compression_options`` and resets all
    frame, fragmentation, compression and byte-counter state.
    """
    WebSocketProtocol.__init__(self, handler)
    self.mask_outgoing = mask_outgoing
    self._compression_options = compression_options

    # State for the frame currently being processed.
    self._final_frame = False
    self._frame_opcode = self._masked_frame = None
    self._frame_mask = self._frame_length = None
    self._frame_compressed = None

    # State for a fragmented message being reassembled.
    self._fragmented_message_buffer = None
    self._fragmented_message_opcode = None

    self._waiting = None
    self._decompressor = self._compressor = None

    # The total uncompressed size of all messages received or sent.
    # Unicode messages are encoded to utf8.
    # Only for testing; subject to change.
    self._message_bytes_in, self._message_bytes_out = 0, 0
    # The total size of all packets received or sent. Includes
    # the effect of compression, frame overhead, and control frames.
    self._wire_bytes_in, self._wire_bytes_out = 0, 0
def close(self, code=None, reason=None):
    """Close the WebSocket connection.

    Unless the server side is already terminated, a close frame (opcode
    0x8) carrying the optional ``code`` and ``reason`` is written if the
    stream is still open. The stream is closed immediately once the client
    has terminated; otherwise a 5 second timeout is scheduled that calls
    ``self._abort``.
    """
    if not self.server_terminated:
        if not self.stream.closed():
            if code is None and reason is not None:
                code = 1000  # "normal closure" status code
            payload = b'' if code is None else struct.pack('>H', code)
            if reason is not None:
                payload += utf8(reason)
            self._write_frame(True, 0x8, payload)
        self.server_terminated = True

    if not self.client_terminated:
        if self._waiting is None:
            # Give the client a few seconds to complete a clean shutdown,
            # otherwise just close the connection.
            self._waiting = self.stream.io_loop.add_timeout(
                self.stream.io_loop.time() + 5, self._abort)
        return

    if self._waiting is not None:
        self.stream.io_loop.remove_timeout(self._waiting)
        self._waiting = None
    self.stream.close()
def test_special_filenames(self):
    # Filenames containing quotes, semicolons and backslashes must survive
    # a round trip through the multipart parser.
    filenames = [
        'a;b.txt',
        'a"b.txt',
        'a";b.txt',
        'a;"b.txt',
        'a";";.txt',
        'a\\"b.txt',
        'a\\b.txt',
    ]
    body_template = "\n".join([
        '--1234',
        'Content-Disposition: form-data; name="files"; filename="%s"',
        '',
        'Foo',
        '--1234--',
    ])
    for filename in filenames:
        logging.debug("trying filename %r", filename)
        # Backslash-escape backslashes and double quotes in the filename.
        escaped = filename.replace('\\', '\\\\').replace('"', '\\"')
        data = body_template % escaped
        data = utf8(data.replace("\n", "\r\n"))
        args, files = {}, {}
        parse_multipart_form_data(b"1234", data, args, files)
        uploaded = files["files"][0]
        self.assertEqual(uploaded["filename"], filename)
        self.assertEqual(uploaded["body"], b"Foo")
def test_body_encoding(self):
    unicode_body = u("\xe9")
    byte_body = binascii.a2b_hex(b"e9")
    content_type = {"Content-Type": "application/blah"}

    # unicode string in body gets converted to utf8
    response = self.fetch("/echopost", method="POST",
                          body=unicode_body, headers=dict(content_type))
    self.assertEqual(response.headers["Content-Length"], "2")
    self.assertEqual(response.body, utf8(unicode_body))

    # byte strings pass through directly
    response = self.fetch("/echopost", method="POST",
                          body=byte_body, headers=dict(content_type))
    self.assertEqual(response.headers["Content-Length"], "1")
    self.assertEqual(response.body, byte_body)

    # Mixing unicode in headers and byte string bodies shouldn't
    # break anything
    response = self.fetch("/echopost", method="POST",
                          body=byte_body, headers=dict(content_type),
                          user_agent=u("foo"))
    self.assertEqual(response.headers["Content-Length"], "1")
    self.assertEqual(response.body, byte_body)
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` from a coroutine.

    The coroutine is scheduled on ``self.io_loop``, ``runner`` is called
    to drive the loop, and the response body that was read is returned.
    """
    # One-element list so the nested coroutine can hand the body back.
    fetched = [None]

    @gen.coroutine
    def fetch_body():
        # This is simpler than the non-coroutine version, but it cheats
        # by reading the body in one blob instead of streaming it with
        # a Protocol.
        agent = Agent(self.reactor)
        response = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # readBody has a buggy DeprecationWarning in Twisted 15.0:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            fetched[0] = yield readBody(response)
        self.stop_loop()

    self.io_loop.add_callback(fetch_body)
    runner()
    return fetched[0]
def write_message(self, message, binary=False):
    """Send ``message`` to the client of this Web Socket.

    ``message`` may be a string or a dict; a dict is encoded as json
    first. With ``binary`` false the message is sent as utf8; in binary
    mode any byte string is allowed.

    Raises `WebSocketClosedError` if the connection is already closed.

    .. versionchanged:: 3.2
       `WebSocketClosedError` was added (previously a closed connection
       would raise an `AttributeError`)

    .. versionchanged:: 4.3
       Returns a `.Future` which can be used for flow control.
    """
    connection = self.ws_connection
    if connection is None:
        raise WebSocketClosedError()
    payload = message
    if isinstance(payload, dict):
        payload = tornado.escape.json_encode(payload)
    return connection.write_message(payload, binary=binary)
def body(self, value):
    # Store the body after passing it through utf8().
    encoded = utf8(value)
    self._body = encoded
def generate(self, **kwargs):
    """Render this template with the given arguments and return the result.

    The execution namespace is built from the built-in helpers below, then
    ``self.namespace``, then ``kwargs``; later sources override earlier
    ones.
    """
    builtin_helpers = {
        "escape": escape.xhtml_escape,
        "xhtml_escape": escape.xhtml_escape,
        "url_escape": escape.url_escape,
        "json_encode": escape.json_encode,
        "squeeze": escape.squeeze,
        "linkify": escape.linkify,
        "datetime": datetime,
        "_tt_utf8": escape.utf8,  # for internal use
        "_tt_string_types": (unicode_type, bytes),
    }
    # __name__ and __loader__ allow the traceback mechanism to find
    # the generated source code.
    traceback_hooks = {
        "__name__": self.name.replace('.', '_'),
        "__loader__": ObjectDict(get_source=lambda name: self.code),
    }

    namespace = {}
    for layer in (builtin_helpers, traceback_hooks, self.namespace, kwargs):
        namespace.update(layer)

    exec_in(self.compiled, namespace)
    execute = namespace["_tt_execute"]
    # Clear the traceback module's cache of source data now that
    # we've generated a new template (mainly for this module's
    # unittests, where different tests reuse the same name).
    linecache.clearcache()
    return execute()
def generate(self, writer):
    """Emit the generated code that evaluates and appends this expression.

    The emitted code converts the value to utf8 bytes (via ``str`` for
    non-string values) and, unless this node is raw or the current
    template has no autoescape function, passes it through that function
    before appending it to the output.
    """
    def emit(source_line):
        writer.write_line(source_line, self.line)

    emit("_tt_tmp = %s" % self.expression)
    emit("if isinstance(_tt_tmp, _tt_string_types): _tt_tmp = _tt_utf8(_tt_tmp)")
    emit("else: _tt_tmp = _tt_utf8(str(_tt_tmp))")
    if not self.raw and writer.current_template.autoescape is not None:
        # In python3 functions like xhtml_escape return unicode,
        # so we have to convert to utf8 again.
        emit("_tt_tmp = _tt_utf8(%s(_tt_tmp))" %
             writer.current_template.autoescape)
    emit("_tt_append(_tt_tmp)")
def generate(self, writer):
    """Emit the generated code that appends this node's literal text."""
    text = self.value

    # Compress whitespace if requested, with a crude heuristic to avoid
    # altering preformatted whitespace.
    if "<pre>" not in text:
        text = filter_whitespace(self.whitespace, text)

    if not text:
        return
    writer.write_line('_tt_append(%r)' % escape.utf8(text), self.line)
def _format_chunk(self, chunk):
    """Return ``chunk`` framed for output, enforcing Content-Length.

    When an expected content length is being tracked, it is decremented by
    the chunk size; writing past it closes the stream and raises
    ``httputil.HTTPOutputError``. Non-empty chunks are wrapped in chunked
    transfer framing when chunked output is enabled; anything else is
    returned unchanged.
    """
    if self._expected_content_remaining is not None:
        self._expected_content_remaining -= len(chunk)
        if self._expected_content_remaining < 0:
            # Close the stream now to stop further framing errors.
            self.stream.close()
            raise httputil.HTTPOutputError(
                "Tried to write more data than Content-Length")

    # Don't write out empty chunks because that means END-OF-STREAM
    # with chunked encoding
    if not (self._chunking_output and chunk):
        return chunk
    size_line = utf8("%x" % len(chunk))
    return size_line + b"\r\n" + chunk + b"\r\n"
def test_comment(self):
    # A {# ... #} comment must not appear in the output.
    template = Template("Hello{# TODO i18n #} {{ name }}!")
    rendered = template.generate(name=utf8("Ben"))
    self.assertEqual(rendered, b"Hello Ben!")
def test_unicode_template(self):
    # A utf8-encoded template source renders to the same utf8 bytes.
    source = utf8(u("\u00e9"))
    rendered = Template(source).generate()
    self.assertEqual(rendered, utf8(u("\u00e9")))
def test_unicode_literal_expression(self):
    # Unicode literals should be usable in templates. Note that this
    # test simulates unicode characters appearing directly in the
    # template file (with utf8 encoding), i.e. \u escapes would not
    # be used in the template file itself.
    if str is unicode_type:
        # python 3 needs a different version of this test since
        # 2to3 doesn't run on template internals
        source = u('{{ "\u00e9" }}')
    else:
        source = u('{{ u"\u00e9" }}')
    template = Template(utf8(source))
    self.assertEqual(template.generate(), utf8(u("\u00e9")))
def test_apply(self):
    # {% apply f %} passes the enclosed output through f.
    def to_upper(text):
        return text.upper()

    source = utf8("{% apply upper %}foo{% end %}")
    rendered = Template(source).generate(upper=to_upper)
    self.assertEqual(rendered, b"FOO")
def test_unicode_apply(self):
    # The applied function may return unicode containing non-ASCII text.
    def to_upper(text):
        return to_unicode(text).upper()

    source = utf8(u("{% apply upper %}foo \u00e9{% end %}"))
    rendered = Template(source).generate(upper=to_upper)
    self.assertEqual(rendered, utf8(u("FOO \u00c9")))
def test_if(self):
    # Both branches of an {% if %}/{% else %} block.
    source = utf8("{% if x > 4 %}yes{% else %}no{% end %}")
    template = Template(source)
    above_threshold = template.generate(x=5)
    self.assertEqual(above_threshold, b"yes")
    below_threshold = template.generate(x=3)
    self.assertEqual(below_threshold, b"no")
def test_if_empty_body(self):
    # Empty {% if %} and {% else %} bodies must still compile and render.
    source = utf8("{% if True %}{% else %}{% end %}")
    rendered = Template(source).generate()
    self.assertEqual(rendered, b"")
def test_try(self):
    # The line breaks in the template are significant: they show up in the
    # expected output below.
    source = "\n".join([
        "{% try %}",
        "try{% set y = 1/x %}",
        "{% except %}-except",
        "{% else %}-else",
        "{% finally %}-finally",
        "{% end %}",
    ])
    template = Template(utf8(source))
    no_error = template.generate(x=1)
    self.assertEqual(no_error, b"\ntry\n-else\n-finally\n")
    # x=0 makes 1/x raise, so the except branch is rendered instead.
    with_error = template.generate(x=0)
    self.assertEqual(with_error, b"\ntry-except\n-finally\n")
def test_comment_directive(self):
    # A {% comment %} directive produces no output.
    source = utf8("{% comment blah blah %}foo")
    rendered = Template(source).generate()
    self.assertEqual(rendered, b"foo")
def test_break_continue(self):
    # Template layout is cosmetic; all whitespace is stripped from the
    # result before comparing.
    source = "\n".join([
        "{% for i in range(10) %}",
        "    {% if i == 2 %}",
        "        {% continue %}",
        "    {% end %}",
        "    {{ i }}",
        "    {% if i == 6 %}",
        "        {% break %}",
        "    {% end %}",
        "{% end %}",
    ])
    template = Template(utf8(source))
    rendered = template.generate()
    # remove extraneous whitespace
    compact = b''.join(rendered.split())
    self.assertEqual(compact, b"013456")
def test_break_in_apply(self):
    # This test verifies current behavior, although of course it would
    # be nice if apply didn't cause seemingly unrelated breakage
    source = utf8("{% for i in [] %}{% apply foo %}{% break %}{% end %}{% end %}")
    try:
        Template(source)
    except ParseError:
        pass
    else:
        raise Exception("Did not get expected exception")
def test_utf8_in_file(self):
    # A template loaded from a file renders its non-ASCII content intact.
    rendered = self.loader.load("utf8.html").generate()
    text = to_unicode(rendered).strip()
    self.assertEqual(text, u("H\u00e9llo"))
def test_unicode_escapes(self):
    # U+00E9 encodes to the two utf8 bytes C3 A9.
    encoded = utf8(u('\u00e9'))
    self.assertEqual(encoded, b'\xc3\xa9')
def test_import_member(self):
    # A dotted path to a module member resolves to that member object.
    imported = import_object('tornado.escape.utf8')
    self.assertIs(imported, utf8)
def test_import_member_unicode(self):
    # Same lookup as above, but with the dotted path given as unicode.
    dotted_path = u('tornado.escape.utf8')
    imported = import_object(dotted_path)
    self.assertIs(imported, utf8)
def test_unicode_newlines(self):
    # Ensure that only \r\n is recognized as a header separator, and not
    # the other newline-like unicode characters.
    # Characters that are likely to be problematic can be found in
    # http://unicode.org/standard/reports/tr13/tr13-5.html
    # and cpython's unicodeobject.c (which defines the implementation
    # of unicode_type.splitlines(), and uses a different list than TR13).
    newlines = [
        # VERTICAL TAB is U+000B. (U+001B is ESCAPE, which is not a line
        # boundary for splitlines().)
        u('\u000b'),  # VERTICAL TAB
        u('\u001c'),  # FILE SEPARATOR
        u('\u001d'),  # GROUP SEPARATOR
        u('\u001e'),  # RECORD SEPARATOR
        u('\u0085'),  # NEXT LINE
        u('\u2028'),  # LINE SEPARATOR
        u('\u2029'),  # PARAGRAPH SEPARATOR
    ]
    for newline in newlines:
        # Try the utf8 and latin1 representations of each newline
        for encoding in ['utf8', 'latin1']:
            try:
                try:
                    encoded = newline.encode(encoding)
                except UnicodeEncodeError:
                    # Some chars cannot be represented in latin1
                    continue
                data = b'Cookie: foo=' + encoded + b'bar'
                # parse() wants a native_str, so decode through latin1
                # in the same way the real parser does.
                headers = HTTPHeaders.parse(
                    native_str(data.decode('latin1')))
                expected = [('Cookie', 'foo=' +
                             native_str(encoded.decode('latin1')) + 'bar')]
                self.assertEqual(
                    expected, list(headers.get_all()))
            except Exception:
                # Log which character/encoding pair failed, then re-raise.
                gen_log.warning("failed while trying %r in %s",
                                newline, encoding)
                raise
def raw_fetch(self, headers, body, newline=b"\r\n"):
    """Send a hand-built request over a raw socket and return the reply body.

    ``headers`` is a list of byte-string lines (request line first); a
    Content-Length line computed from ``body`` is appended. Lines are
    joined with ``newline`` and the body follows a blank line.
    """
    with closing(IOStream(socket.socket())) as stream:
        stream.connect(('127.0.0.1', self.get_http_port()), self.stop)
        self.wait()

        content_length = utf8("Content-Length: %d" % len(body))
        head = newline.join(headers + [content_length])
        stream.write(head + newline + newline + body)

        read_stream_body(stream, self.stop)
        _, response_body = self.wait()
        return response_body
def test_malformed_body(self):
    # parse_qs is pretty forgiving, but it will fail on python 3
    # if the data is not utf8. On python 2 parse_qs will work,
    # but then the recursive_unicode call in EchoHandler will
    # fail.
    if str is not bytes:
        request_headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
        }
        with ExpectLog(gen_log, 'Invalid x-www-form-urlencoded body'):
            response = self.fetch('/echo', method="POST",
                                  headers=request_headers,
                                  body=b'\xe9')
        self.assertEqual(200, response.code)
        self.assertEqual(b'{}', response.body)