我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 tornado.escape.native_str()。
def main():
    """Fetch every URL given on the command line and print the result.

    Four boolean command-line options are defined: ``print_headers``
    (default off), ``print_body``, ``follow_redirects`` and
    ``validate_cert`` (all default on).  An ``HTTPError`` that carries a
    response is printed like a normal response; one without a response is
    re-raised.
    """
    from tornado.options import define, options, parse_command_line
    define("print_headers", type=bool, default=False)
    define("print_body", type=bool, default=True)
    define("follow_redirects", type=bool, default=True)
    define("validate_cert", type=bool, default=True)
    args = parse_command_line()
    client = HTTPClient()
    # try/finally so the client is closed even when a fetch error
    # propagates out of the loop.
    try:
        for arg in args:
            try:
                response = client.fetch(arg,
                                        follow_redirects=options.follow_redirects,
                                        validate_cert=options.validate_cert,
                                        )
            except HTTPError as e:
                if e.response is not None:
                    response = e.response
                else:
                    raise
            if options.print_headers:
                print(response.headers)
            if options.print_body:
                print(native_str(response.body))
    finally:
        client.close()
def _curl_header_callback(self, headers, header_callback, header_line):
    """Record one header line reported by curl into ``headers``.

    The line is converted with ``native_str`` and, when ``header_callback``
    is given, scheduled on the IOLoop with the untrimmed text.  A status
    line (one starting with ``HTTP/``) clears ``headers`` and is stored as
    an ``X-Http-Reason`` entry; a status line that fails to parse and
    blank lines are dropped.
    """
    text = native_str(header_line)
    if header_callback is not None:
        self.io_loop.add_callback(header_callback, text)
    # curl hands over the line with its end-of-line characters attached.
    # Only the right side is trimmed: leading whitespace has to survive so
    # that multi-line headers keep working.
    text = text.rstrip()
    if text.startswith("HTTP/"):
        headers.clear()
        try:
            _version, _code, reason = httputil.parse_response_start_line(text)
        except httputil.HTTPInputError:
            return
        text = "X-Http-Reason: %s" % reason
    if text:
        headers.parse_line(text)
def _parse_headers(self, data):
    """Split a raw header block into its start line and parsed headers.

    :arg bytes data: the start line followed by the header lines; it is
        decoded as latin1.
    :returns: a ``(start_line, headers)`` tuple, where ``headers`` is the
        result of ``httputil.HTTPHeaders.parse``.
    :raises httputil.HTTPInputError: if header parsing raises ``ValueError``.
    """
    # The lstrip removes newlines that some implementations sometimes
    # insert between messages of a reused connection.  Per RFC 7230,
    # we SHOULD ignore at least one empty line before the request.
    # http://tools.ietf.org/html/rfc7230#section-3.5
    data = native_str(data.decode('latin1')).lstrip("\r\n")
    # RFC 7230 section allows for both CRLF and bare LF.
    eol = data.find("\n")
    start_line = data[:eol].rstrip("\r")
    try:
        headers = httputil.HTTPHeaders.parse(data[eol:])
    except ValueError:
        # probably from split() if there was no ':' in the line
        # Quote up to 100 characters of the header block itself.  The slice
        # end is relative to ``eol`` so the excerpt is not empty when the
        # start line alone is longer than 100 characters.
        raise httputil.HTTPInputError("Malformed HTTP headers: %r" %
                                      data[eol:eol + 100])
    return start_line, headers
def test_custom_escape(self):
    # The template selects ``py_escape`` as its autoescape function; every
    # rendered expression must pass through it as a byte string.
    templates = {"foo.py": "{% autoescape py_escape %}s = {{ name }}\n"}
    loader = DictLoader(templates)

    def py_escape(s):
        self.assertEqual(type(s), bytes)
        return repr(native_str(s))

    def render(template, name):
        compiled = loader.load(template)
        return compiled.generate(py_escape=py_escape, name=name)

    expectations = [
        ("<html>", b"s = '<html>'\n"),
        ("';sys.exit()", b"""s = "';sys.exit()"\n"""),
        (["not a string"], b"""s = "['not a string']"\n"""),
    ]
    for value, rendered in expectations:
        self.assertEqual(render("foo.py", value), rendered)
def parse_response_start_line(line):
    """Returns a (version, code, reason) tuple for an HTTP 1.x response line.

    The response is a `collections.namedtuple`.

    >>> parse_response_start_line("HTTP/1.1 200 OK")
    ResponseStartLine(version='HTTP/1.1', code=200, reason='OK')

    :arg line: the response start line; converted with ``native_str``.
    :raises HTTPInputError: if ``line`` does not start with
        ``HTTP/1.<digit> <digits> <reason>``.
    """
    line = native_str(line)
    # The dot is escaped so that only a literal "HTTP/1.<digit>" version is
    # accepted; unescaped it would match any character (e.g. "HTTP/1x1").
    match = re.match(r"(HTTP/1\.[0-9]) ([0-9]+) ([^\r]*)", line)
    if not match:
        raise HTTPInputError("Error parsing response start line")
    return ResponseStartLine(match.group(1),
                             int(match.group(2)),
                             match.group(3))

# _parseparam and _parse_header are copied and modified from python2.7's cgi.py
# The original 2.7 version of this code did not correctly support some
# combinations of semicolons and double quotes.
# It has also been modified to support valueless parameters as seen in
# websocket extension negotiations.
def _on_access_token(self, redirect_uri, client_id, client_secret,
                     future, fields, response):
    """Handle the access-token response and request the user's profile.

    On an error response the ``future`` is failed with ``AuthError``.
    Otherwise the response body is parsed as a query string, a session
    dict is built from it, and ``/me`` is requested with
    ``_on_get_user_info`` as the callback.  ``redirect_uri``,
    ``client_id`` and ``client_secret`` are not used in this method body.
    """
    if response.error:
        message = 'Facebook auth error: %s' % str(response)
        future.set_exception(AuthError(message))
        return

    token_params = urlparse.parse_qs(escape.native_str(response.body))
    session = {}
    # When the parameter is repeated, the last value wins.
    session["access_token"] = token_params["access_token"][-1]
    session["expires"] = token_params.get("expires")

    on_user_info = functools.partial(
        self._on_get_user_info, future, session, fields)
    self.facebook_request(
        path="/me",
        callback=on_user_info,
        access_token=session["access_token"],
        fields=",".join(fields)
    )
def parse_config_file(self, path, final=True):
    """Load option values from the Python config file at ``path``.

    Pass ``final=False`` to skip the parse callbacks; this helps
    applications that merge configuration from several sources.

    .. versionchanged:: 4.1
       Config files are now always interpreted as utf-8 instead of
       the system default encoding.
    """
    settings = {}
    with open(path, 'rb') as config_file:
        # The file is read as bytes and handed to ``exec_in`` with
        # ``settings`` as both namespace arguments.
        exec_in(native_str(config_file.read()), settings, settings)
    for key in settings:
        option_name = self._normalize_name(key)
        if option_name not in self._options:
            # Names that are not defined options are ignored.
            continue
        self._options[option_name].set(settings[key])

    if final:
        self.run_parse_callbacks()
def set_status(self, status_code, reason=None):
    """Sets the status code for our response.

    :arg int status_code: Response status code. If ``reason`` is ``None``,
        it must be present in `httplib.responses <http.client.responses>`.
    :arg string reason: Human-readable reason phrase describing the status
        code. If ``None``, it will be filled in from
        `httplib.responses <http.client.responses>`.
    :raises ValueError: if ``reason`` is ``None`` and ``status_code`` is
        not a known status code.
    """
    self._status_code = status_code
    if reason is not None:
        self._reason = escape.native_str(reason)
    else:
        try:
            self._reason = httputil.responses[status_code]
        except KeyError:
            # Interpolate the code into the message (it used to be passed
            # as a second exception argument and never formatted).  %r
            # keeps this a ValueError even for non-integer codes.
            raise ValueError("unknown status code %r" % (status_code,))
def __init__(self, template_string, name="<string>", loader=None,
             compress_whitespace=None, autoescape=_UNSET):
    """Parse ``template_string`` and compile it to executable code.

    :arg template_string: template source; converted with
        ``escape.native_str`` before parsing.
    :arg name: template name, used in the compiled code's filename and in
        the error log.
    :arg loader: optional loader; when given it supplies the default
        ``autoescape`` setting and the template namespace.
    :arg compress_whitespace: when ``None``, enabled for names ending in
        ``.html`` or ``.js``.
    :arg autoescape: explicit autoescape setting; when left unset the
        loader's value, or else ``_DEFAULT_AUTOESCAPE``, is used.

    If compiling the generated code fails, the formatted code is logged
    and the exception is re-raised.
    """
    self.name = name
    if compress_whitespace is None:
        compress_whitespace = name.endswith((".html", ".js"))

    if autoescape is _UNSET:
        autoescape = loader.autoescape if loader else _DEFAULT_AUTOESCAPE
    self.autoescape = autoescape

    if loader:
        self.namespace = loader.namespace
    else:
        self.namespace = {}

    source = escape.native_str(template_string)
    self.file = _File(_parse(_TemplateReader(name, source), self))
    self.code = self._generate_python(loader, compress_whitespace)
    filename = "<template %s>" % self.name
    try:
        self.compiled = compile(escape.to_unicode(self.code), filename, "exec")
    except Exception:
        listing = _format_code(self.code).rstrip()
        logging.error("%s code:\n%s", self.name, listing)
        raise
def test_100_continue(self):
    # Drive a 100-continue exchange by hand.  With "Expect: 100-continue"
    # the server sends a 100 response after the headers, and the real
    # response only after the body.
    stream = IOStream(socket.socket(), io_loop=self.io_loop)
    stream.connect(("localhost", self.get_http_port()), callback=self.stop)
    self.wait()

    # Headers only; the final b("\r\n") element closes the header block.
    request_head = b("\r\n").join([
        b("POST /hello HTTP/1.1"),
        b("Content-Length: 1024"),
        b("Expect: 100-continue"),
        b("\r\n"),
    ])
    stream.write(request_head, callback=self.stop)
    self.wait()

    # Interim response.
    stream.read_until(b("\r\n\r\n"), self.stop)
    interim = self.wait()
    self.assertTrue(interim.startswith(b("HTTP/1.1 100 ")), interim)

    # Send the body, then read the status line of the real response.
    stream.write(b("a") * 1024)
    stream.read_until(b("\r\n"), self.stop)
    status_line = self.wait()
    self.assertTrue(status_line.startswith(b("HTTP/1.1 200")), status_line)

    # Remaining headers give the length of the response body.
    stream.read_until(b("\r\n\r\n"), self.stop)
    raw_headers = self.wait()
    parsed = HTTPHeaders.parse(native_str(raw_headers.decode('latin1')))
    stream.read_bytes(int(parsed["Content-Length"]), self.stop)
    payload = self.wait()
    self.assertEqual(payload, b("Got 1024 bytes in POST"))
def _on_request_body(self, data):
    """Attach the request body, decode form arguments, and run the callback.

    For ``POST`` and ``PUT`` requests, a urlencoded body is parsed and its
    non-empty values are appended to ``self._request.arguments``; a
    multipart body is passed to ``httputil.parse_multipart_form_data``
    using the ``boundary`` parameter of the Content-Type header.
    ``self.request_callback`` is invoked with the request in every case.

    :arg data: the request body; stored as ``self._request.body``.
    """
    self._request.body = data
    content_type = self._request.headers.get("Content-Type", "")
    if self._request.method in ("POST", "PUT"):
        if content_type.startswith("application/x-www-form-urlencoded"):
            arguments = parse_qs_bytes(native_str(self._request.body))
            # items() instead of the Python-2-only iteritems(), so this
            # also runs on Python 3.
            for name, values in arguments.items():
                # Drop empty values; skip names that have nothing left.
                values = [v for v in values if v]
                if values:
                    self._request.arguments.setdefault(name, []).extend(
                        values)
        elif content_type.startswith("multipart/form-data"):
            fields = content_type.split(";")
            for field in fields:
                k, sep, v = field.strip().partition("=")
                if k == "boundary" and v:
                    httputil.parse_multipart_form_data(
                        utf8(v), data,
                        self._request.arguments,
                        self._request.files)
                    break
            else:
                # for/else: reached only when no boundary parameter was found.
                logging.warning("Invalid multipart/form-data")
    self.request_callback(self._request)
def test_custom_escape(self):
    # A template can name its own autoescape function; here it is
    # ``py_escape``, which must always be handed a byte string.
    source = "{% autoescape py_escape %}s = {{ name }}\n"
    loader = DictLoader({"foo.py": source})

    def py_escape(s):
        self.assertEqual(type(s), bytes_type)
        return repr(native_str(s))

    def render(template, name):
        tmpl = loader.load(template)
        return tmpl.generate(py_escape=py_escape, name=name)

    for value, rendered in (
            ("<html>", b"s = '<html>'\n"),
            ("';sys.exit()", b"""s = "';sys.exit()"\n"""),
            (["not a string"], b"""s = "['not a string']"\n""")):
        self.assertEqual(render("foo.py", value), rendered)
def _curl_header_callback(self, headers, header_callback, header_line):
    """Process one header line reported by curl and store it in ``headers``.

    The line is converted with ``native_str`` and, when ``header_callback``
    is given, scheduled on the IOLoop with the untrimmed text.  A status
    line (one starting with ``HTTP/``) clears ``headers`` and is stored as
    an ``X-Http-Reason`` entry; a status line that fails to parse and
    blank lines are dropped.
    """
    header_line = native_str(header_line)
    if header_callback is not None:
        self.io_loop.add_callback(header_callback, header_line)
    # header_line as returned by curl includes the end-of-line characters.
    # Strip only the right-hand side: leading whitespace marks the
    # continuation of a multi-line header and must reach parse_line intact.
    header_line = header_line.rstrip()
    if header_line.startswith("HTTP/"):
        headers.clear()
        try:
            (__, __, reason) = httputil.parse_response_start_line(header_line)
            header_line = "X-Http-Reason: %s" % reason
        except httputil.HTTPInputError:
            return
    if not header_line:
        return
    headers.parse_line(header_line)
def _on_access_token(self, redirect_uri, client_id, client_secret,
                     future, fields, response):
    """Turn the access-token response into a session and fetch ``/me``.

    An error response fails ``future`` with ``AuthError``.  Otherwise the
    body is parsed with ``escape.parse_qs_bytes``, a session dict is built
    from the result, and ``facebook_request`` is called with
    ``_on_get_user_info`` as its callback.  ``redirect_uri``,
    ``client_id`` and ``client_secret`` are not used in this method body.
    """
    if response.error:
        failure = AuthError('Facebook auth error: %s' % str(response))
        future.set_exception(failure)
        return

    body = escape.native_str(response.body)
    parsed = escape.parse_qs_bytes(body)
    # For a repeated parameter the last value is used.
    token = parsed["access_token"][-1]
    session = {
        "access_token": token,
        "expires": parsed.get("expires")
    }

    user_info_callback = functools.partial(
        self._on_get_user_info, future, session, fields)
    self.facebook_request(
        path="/me",
        callback=user_info_callback,
        access_token=session["access_token"],
        fields=",".join(fields)
    )
def parse_config_file(self, path, final=True):
    """Read the Python config file at ``path`` and apply it to the options.

    With ``final=False`` the parse callbacks are not run, which is handy
    when configuration is combined from multiple sources.

    .. versionchanged:: 4.1
       Config files are now always interpreted as utf-8 instead of
       the system default encoding.
    """
    namespace = {}
    with open(path, 'rb') as handle:
        # The file is read as bytes and handed to ``exec_in`` with
        # ``namespace`` as both namespace arguments.
        exec_in(native_str(handle.read()), namespace, namespace)

    for key in namespace:
        if key not in self._options:
            # Only names matching a defined option are applied.
            continue
        self._options[key].set(namespace[key])

    if final:
        self.run_parse_callbacks()
def define_options(default_conf):
    """Define the options from default.conf dynamically.

    :arg default_conf: path of the config file.  Its contents are passed
        to ``exec_in`` and every name it leaves in the local namespace
        becomes an option.
    """
    default = {}
    # NOTE(review): the file is run through exec_in, so it presumably must
    # be trusted Python code -- confirm it never comes from untrusted input.
    with open(default_conf, 'rb') as f:
        exec_in(native_str(f.read()), {}, default)

    # items() instead of the Python-2-only iteritems(), so this also runs
    # on Python 3.
    for name, value in default.items():
        # if the option is already defined by tornado
        # override the value
        # a list of options set by tornado:
        # log_file_num_backups, logging, help,
        # log_to_stderr, log_file_max_size, log_file_prefix
        if name in options:
            setattr(options, name, value)
        # otherwise define the option
        else:
            define(name, value)