Python bottle.request 模块,headers 属性实例源码

我们从 Python 开源项目中提取了以下 21 个代码示例,用于说明如何使用 bottle.request.headers 属性。

项目:schul_cloud_resources_server_tests    作者:schul-cloud    | 项目源码 | 文件源码
def _error(error, code):
    """Render *error* as a JSON API style error response.

    The error entry carries the status code as a string, the title looked
    up in ``errors`` for *code*, and the body of *error* as detail. The
    traceback of *error* is printed, the response content type is set to
    ``application/vnd.api+json`` and the value built by
    ``response_object`` for the single entry is returned.
    """
    description = dict(
        status=str(code),
        title=errors[code],
        detail=error.body,
    )
    traceback.print_exception(type(error), error, error.traceback)
    response.headers["Content-Type"] = "application/vnd.api+json"
    return response_object(errors=[description])
项目:schul_cloud_resources_server_tests    作者:schul-cloud    | 项目源码 | 文件源码
def get_resources():
    """Look up the resources that belong to the requesting user.

    Basic authentication is checked first; without it the api key from
    ``get_api_key()`` is mapped to a user name. A request that carries
    neither is served the resources stored under the ``None`` key.

    A wrong password or an unknown api key aborts the request with
    401 Unauthorized.
    """
    authorization = request.environ.get('HTTP_AUTHORIZATION', '')
    if authorization:
        print("Authorization:", authorization)

    credentials = request.auth
    if credentials:
        username, password = credentials
        if passwords.get(username) != password:
            abort(401, BASIC_ERROR)
        return _resources[username]

    api_key = get_api_key()
    username = None if api_key is None else api_keys.get(api_key)
    if api_key is not None and username is None:
        abort(401, API_KEY_ERROR)
    return _resources[username]
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def assertRedirect(self, target, result, query=None, status=303, **args):
        """Assert that redirecting to *target* produces a redirect to *result*.

        The global request is bound to a WSGI environ built from ``args``.
        Keyword names starting with ``wsgi`` have their first underscore
        replaced by a dot (``wsgi_url_scheme`` becomes ``wsgi.url_scheme``).
        ``query`` is an optional dict expanded into keyword arguments of
        ``bottle.redirect``.

        The ``bottle.HTTPResponse`` raised by the redirect must have the
        status code *status* and a ``Location`` header equal to *result*.
        The assertion fails if no ``HTTPResponse`` is raised at all.
        """
        env = {'SERVER_PROTOCOL':'HTTP/1.1'}
        for key in list(args):
            if key.startswith('wsgi'):
                args[key.replace('_', '.', 1)] = args[key]
                del args[key]
        env.update(args)
        request.bind(env)
        bottle.response.bind()
        try:
            bottle.redirect(target, **(query or {}))
        except bottle.HTTPResponse:
            # _e() presumably returns the exception currently being handled.
            r = _e()
            self.assertEqual(status, r.status_code)
            self.assertTrue(r.headers)
            self.assertEqual(result, r.headers['Location'])
        else:
            # Without this branch the helper would pass vacuously when the
            # redirect does not raise.
            self.fail('bottle.redirect() did not raise bottle.HTTPResponse')
项目:behave-web-api    作者:jefersondaniel    | 项目源码 | 文件源码
def echo():
    """Echo the incoming request back as a dictionary.

    Returns a dict with the request method, the request headers as a plain
    dict, the request body decoded as UTF-8 (``None`` when it cannot be
    read or decoded) and, for every uploaded file, its form key and raw
    filename.
    """
    try:
        body = request.body.read().decode('utf-8')
    except Exception:
        # Best effort: an unreadable or non-UTF-8 body is reported as None.
        # Catching Exception instead of using a bare except lets
        # KeyboardInterrupt and SystemExit propagate.
        body = None

    result = {
        'method': request.method,
        'headers': dict(request.headers),
        'body': body,
        'files': [
            {'key': key, 'name': request.files[key].raw_filename}
            for key in request.files
        ]
    }

    return result
项目:coolq-telegram-bot    作者:jqqqqqqqqqq    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """POST the keyword arguments as JSON to ``self._url``.

        An ``Authorization: Token ...`` header is sent when an access token
        is configured. Positional arguments are accepted but not used.

        Returns the ``data`` member of the JSON reply. Raises ``Error`` with
        the HTTP status code when the reply is not ok, and with the status
        code plus ``retcode`` when the reply reports the status ``failed``.
        """
        token = self._access_token
        headers = {'Authorization': 'Token ' + token} if token else {}
        reply = requests.post(self._url, json=kwargs, headers=headers)
        if not reply.ok:
            raise Error(reply.status_code)
        payload = reply.json()
        if payload.get('status') == 'failed':
            raise Error(reply.status_code, payload.get('retcode'))
        return payload.get('data')
项目:dd-trace-py    作者:DataDog    | 项目源码 | 文件源码
def apply(self, callback, route):
        """Wrap *callback* so every call runs inside a ``bottle.request`` span.

        The wrapper calls straight through when no tracer is set or the
        tracer is disabled. *route* is not used by this method.
        """

        def wrapped(*args, **kwargs):
            tracer = self.tracer
            if not (tracer and tracer.enabled):
                return callback(*args, **kwargs)

            resource = "%s %s" % (request.method, request.route.rule)

            if self.distributed_tracing:
                # Pick up propagated headers such as x-datadog-trace-id.
                context = HTTPPropagator().extract(request.headers)
                if context.trace_id:
                    tracer.context_provider.activate(context)

            with tracer.trace("bottle.request", service=self.service, resource=resource) as span:
                failed = False
                try:
                    return callback(*args, **kwargs)
                except Exception:
                    # bottle doesn't always translate unhandled exceptions,
                    # so the span is tagged with 500 in that case.
                    failed = True
                    raise
                finally:
                    span.set_tag(http.STATUS_CODE, 500 if failed else response.status_code)
                    span.set_tag(http.URL, request.path)
                    span.set_tag(http.METHOD, request.method)

        return wrapped
项目:schul_cloud_resources_server_tests    作者:schul-cloud    | 项目源码 | 文件源码
def get_api_key():
    """Extract the api key from the Authorization header.

    Returns None when the header is missing or empty, or when its scheme
    is not ``api-key`` (compared case-insensitively). Otherwise the data
    part is base64-decoded after skipping its first four characters
    (presumably a ``key=`` prefix -- confirm against the clients).

    A header that cannot be split or decoded aborts with 401.
    """
    authorization = request.headers.get('Authorization')
    if not authorization:
        return None
    try:
        scheme, payload = authorization.split(None, 1)
        if scheme.lower() == 'api-key':
            return touni(base64.b64decode(tob(payload[4:])))
    except (ValueError, TypeError):
        abort(401, HEADER_ERROR)
    return None
项目:schul_cloud_resources_server_tests    作者:schul-cloud    | 项目源码 | 文件源码
def get_endpoint_url():
    """Build this server's url from the request's Host header and BASE."""
    host = request.headers["Host"]
    return "".join(["http://", host, BASE])
项目:schul_cloud_resources_server_tests    作者:schul-cloud    | 项目源码 | 文件源码
def test_jsonapi_header():
    """Make sure that the content type is set accordingly.

    http://jsonapi.org/format/#content-negotiation-clients

    Aborts with 415 when the Content-Type starts with the JSON API media
    type but is not exactly that media type (e.g. it carries parameters).
    Aborts with 406 when none of the comma separated entries of the Accept
    header is one of the accepted media ranges. A missing Accept header is
    treated as ``*/*``.
    """
    content_type = request.content_type
    content_type_expected = "application/vnd.api+json"
    if content_type != content_type_expected and content_type.startswith(content_type_expected):
        abort(415, "The Content-Type header must be \"{}\", not \"{}\".".format(
                   content_type_expected, content_type))
    accept_header = request.headers.get("Accept", "*/*")
    # List elements may be surrounded by optional whitespace
    # ("text/html, */*"), so strip each entry before comparing it.
    accepts = [accept.strip() for accept in accept_header.split(",")]
    expected_accept = ["*/*", "application/*", "application/vnd.api+json"]
    if not any(accept in expected_accept for accept in accepts):
        abort(406, "The Accept header must one of \"{}\", not \"{}\".".format(
                       expected_accept, accept_header))
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_header_access(self):
        """ Environ: HTTP_* keys are readable through request.headers """
        environ = {}
        wsgiref.util.setup_testing_defaults(environ)
        environ['HTTP_SOME_HEADER'] = 'some value'
        req = BaseRequest(environ)
        # Keys assigned through the request object must show up as well.
        req['HTTP_SOME_OTHER_HEADER'] = 'some other value'
        self.assertTrue('Some-Header' in req.headers)
        expected = (
            ('Some-Header', 'some value'),
            ('Some-Other-Header', 'some other value'),
        )
        for name, value in expected:
            self.assertTrue(req.headers[name] == value)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_header_access_special(self):
        """ Environ: CONTENT_TYPE and CONTENT_LENGTH are exposed as headers """
        environ = {}
        wsgiref.util.setup_testing_defaults(environ)
        req = BaseRequest(environ)
        cases = (
            ('CONTENT_TYPE', 'Content-Type', 'test'),
            ('CONTENT_LENGTH', 'Content-Length', '123'),
        )
        for environ_key, _, value in cases:
            req[environ_key] = value
        for _, header_name, value in cases:
            self.assertEqual(req.headers[header_name], value)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_content_type(self):
        """Setting ``content_type`` is reflected in the Content-Type header."""
        rs = BaseResponse()
        rs.content_type = 'test/some'
        # assertEquals is a deprecated alias that was removed in Python 3.12.
        self.assertEqual('test/some', rs.headers.get('Content-Type'))
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_set_header(self):
        """Item assignment replaces a header regardless of the name's case."""

        def x_test_values(resp):
            # All values currently stored for the X-Test header.
            return [value for name, value in resp.headerlist
                    if name.title() == 'X-Test']

        rs = BaseResponse()

        rs['x-test'] = 'foo'
        self.assertEqual(['foo'], x_test_values(rs))
        self.assertEqual('foo', rs['x-test'])

        rs['X-Test'] = 'bar'
        self.assertEqual(['bar'], x_test_values(rs))
        self.assertEqual('bar', rs['x-test'])
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_append_header(self):
        """add_header appends a value; item access returns the latest one."""

        def x_test_values(resp):
            # All values currently stored for the X-Test header.
            return [value for name, value in resp.headerlist
                    if name.title() == 'X-Test']

        rs = BaseResponse()

        rs.set_header('x-test', 'foo')
        self.assertEqual(['foo'], x_test_values(rs))
        self.assertEqual('foo', rs['x-test'])

        rs.add_header('X-Test', 'bar')
        self.assertEqual(['foo', 'bar'], x_test_values(rs))
        self.assertEqual('bar', rs['x-test'])
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_host_http_proxy(self):
        """The forwarded host is preferred over the Host header."""
        proxied_environ = dict(
            HTTP_X_FORWARDED_HOST='example.com',
            HTTP_HOST='127.0.0.1',
        )
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            **proxied_environ)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def setUp(self):
        """Create an empty environ and a header dict wrapping it."""
        environ = {}
        self.env = environ
        self.headers = bottle.WSGIHeaderDict(environ)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_native(self):
        """A native string stored in the environ is returned by the headers."""
        expected = 'foobar'
        self.env['HTTP_TEST_HEADER'] = expected
        found = self.headers['Test-header']
        self.assertEqual(found, 'foobar')
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_bytes(self):
        """A ``tob()`` value in the environ compares equal to the string."""
        raw_value = tob('foobar')
        self.env['HTTP_TEST_HEADER'] = raw_value
        found = self.headers['Test-Header']
        self.assertEqual(found, 'foobar')
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_dict(self):
        """Lookups work for every spelling of the header name."""
        spellings = 'foo-bar Foo-Bar foo-Bar FOO-BAR'.split()

        # Nothing is set yet: every spelling is reported as missing.
        for name in spellings:
            self.assertTrue(name not in self.headers)
            self.assertEqual(self.headers.get(name), None)
            self.assertEqual(self.headers.get(name, 5), 5)
            self.assertRaises(KeyError, self.headers.__getitem__, name)

        self.env['HTTP_FOO_BAR'] = 'test'

        # Once the environ key exists, every spelling finds the value.
        for name in spellings:
            self.assertTrue(name in self.headers)
            self.assertEqual(self.headers.get(name), 'test')
            self.assertEqual(self.headers.get(name, 5), 'test')
项目:aws-es-auth-proxy    作者:lucaagostini    | 项目源码 | 文件源码
def index():
    """Forward the incoming request to the configured AWS service endpoint.

    The request is signed with ``AWS4Auth`` built from the configured
    credentials, region and service name. Only request headers whose
    lower-cased name is in ``PROXY_REQ_HEADERS_WHITELIST`` are forwarded.
    HEAD, GET, POST and PUT are supported; POST and PUT also forward the
    request body.

    The upstream content becomes the response body, and every upstream
    header whose lower-cased name is not in ``PROXY_RESP_HEADERS_BLACKLIST``
    is added to the response. Any other method is logged and answered with
    405 and an ``Allow`` header.

    Returns the bottle ``response`` object.
    """
    auth = AWS4Auth(config.aws_service_credentials.access_key,
                    config.aws_service_credentials.secret_key,
                    config.aws_service_region,
                    config.aws_service_name)
    endpoint = (config.aws_service_protocol + "://" + config.aws_service_endpoint
                + request.path + "?" + request.query_string)
    proxy_req_header = {
        header: request.headers[header]
        for header in request.headers
        if header.lower() in PROXY_REQ_HEADERS_WHITELIST
    }

    if request.method == "HEAD":
        requests_response = requests.head(endpoint, auth=auth, headers=proxy_req_header)
    elif request.method == "GET":
        requests_response = requests.get(endpoint, auth=auth, headers=proxy_req_header)
    elif request.method in ("POST", "PUT"):
        send = requests.post if request.method == "POST" else requests.put
        proxy_req_body = request.body.getvalue()
        requests_response = send(
            endpoint,
            auth=auth,
            data=proxy_req_body,
            headers=proxy_req_header)
    else:
        # There is no upstream response to relay for an unsupported method.
        # Answer explicitly instead of failing on an unbound variable below.
        logging.error("Method not allowed")
        response.status = 405
        response.set_header("Allow", "HEAD, GET, POST, PUT")
        return response

    response.body = requests_response.content
    for header in requests_response.headers:
        if header.lower() not in PROXY_RESP_HEADERS_BLACKLIST:
            response.add_header(header, requests_response.headers[header])
    return response
项目:coolq-telegram-bot    作者:jqqqqqqqqqq    | 项目源码 | 文件源码
def _handle(self):
        """Validate an incoming post and dispatch it to a registered handler.

        When a secret is configured, the ``X-Signature`` header must be
        present (else 401) and equal ``sha1=`` plus the hex HMAC-SHA1 of
        the request body (else 403).

        The JSON payload must be an object whose ``post_type`` is
        ``message``, ``event`` or ``request`` and which has a non-empty
        value in the matching key field (``message_type``, ``event`` or
        ``request_type``); otherwise the request is aborted with 400.

        Handlers are tried group by group, using the wildcard ``'*'``
        handler when no exact one exists. A result containing ``'pass'``
        moves on to the next group; any other result is returned. Returns
        ``''`` when no handler produced a result.
        """
        if self._secret:
            # check signature
            if 'X-Signature' not in request.headers:
                abort(401)

            sec = self._secret
            if isinstance(sec, str):
                sec = sec.encode('utf-8')
            sig = hmac.new(sec, request.body.read(), 'sha1').hexdigest()
            expected = ('sha1=' + sig).encode('utf-8')
            received = request.headers['X-Signature'].encode('utf-8')
            # Constant-time comparison, so response timing does not leak
            # how much of a forged signature was correct. Both sides are
            # bytes because compare_digest rejects non-ASCII str input.
            if not hmac.compare_digest(received, expected):
                abort(403)

        payload = request.json
        if not isinstance(payload, dict):
            # Missing or non-object JSON body: reject it instead of
            # failing on ``.get`` below.
            abort(400)

        # Maps each supported post type to the payload field that names
        # the handler.
        key_fields = {
            'message': 'message_type',
            'event': 'event',
            'request': 'request_type',
        }
        post_type = payload.get('post_type')
        if not isinstance(post_type, str) or post_type not in key_fields:
            abort(400)

        handler_key = payload.get(key_fields[post_type])
        if not handler_key:
            abort(400)

        for group in self._groups:
            handler = self._handlers[group][post_type].get(handler_key)
            if not handler:
                handler = self._handlers[group][post_type].get('*')  # try wildcard
            if handler:
                assert callable(handler)
                result = handler(payload)
                # NOTE(review): assumes every handler returns a container
                # that supports ``in`` -- confirm.
                if 'pass' in result:
                    continue
                return result

        return ''