Python werkzeug.wrappers 模块,Request() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用werkzeug.wrappers.Request()

项目:talisker    作者:canonical-ols    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        """WSGI entry point: serve paths under ``self.prefix``, delegate the rest.

        The part of the request path after ``self.prefix`` is looked up in
        ``self.urlmap`` to get the name of a handler method on this object.
        The handler is called with the request and must return a WSGI
        callable (it is invoked with ``environ`` and ``start_response``).

        * Path equal to the bare prefix: redirect to ``prefix + '/'``.
        * Unknown path, or mapped name missing on ``self``: 404 response.
        * Path not under the prefix: handled by the wrapped ``self.app``.
        """
        request = Request(environ)
        if not request.path.startswith(self.prefix):
            return self.app(environ, start_response)

        method = request.path[len(self.prefix):]
        if method == '':
            # No trailing slash. PEP 3333 requires the status to be
            # "<code> <reason>"; a bare '302' is rejected by strict servers
            # (wsgiref asserts on the space after the code).
            start_response('302 Found', [('location', self.prefix + '/')])
            return ''
        try:
            funcname = self.urlmap[method]
            func = getattr(self, funcname)
        except (KeyError, AttributeError):
            response = Response(status=404)
        else:
            response = func(request)

        return response(environ, start_response)
项目:Leics    作者:LeicsFrameWork    | 项目源码 | 文件源码
def wsgi_app(self, environ, start_response):
        """Handle one WSGI request inside a pushed request context.

        A ``_RequestContext`` built from this app and ``environ`` is pushed
        onto ``_request_ctx_stack`` before the request is dispatched, and is
        always popped again, even when dispatching or sending the response
        raises.

        Returns whatever the dispatched response returns when called with
        ``environ`` and ``start_response``.
        """
        context = _RequestContext(self, environ)
        _request_ctx_stack.push(context)
        try:
            reply = self.dispatch_request(Request(environ))
            return reply(environ, start_response)
        finally:
            _request_ctx_stack.pop()
项目:apm-agent-python    作者:elastic    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        """Record the incoming request and answer with the configured response.

        The request is appended to ``self.requests``. Its body is inflated
        when the content encoding is ``deflate``, decoded with the request
        charset, parsed as JSON when the content type is exactly
        ``application/json``, and appended to ``self.payloads``.

        If ``VALIDATORS`` has an entry for the request path and
        ``self.skip_validate`` is falsy, the payload is validated: success
        gives status 202, a ``jsonschema.ValidationError`` gives status 400
        with a JSON error body. Otherwise ``self.code`` / ``self.content``
        are used. Response headers are replaced by ``self.headers``.
        """
        status = self.code
        body = self.content

        request = Request(environ)
        self.requests.append(request)

        payload = request.data
        if request.content_encoding == 'deflate':
            payload = zlib.decompress(payload)
        payload = payload.decode(request.charset)
        if request.content_type == 'application/json':
            payload = json.loads(payload)
        self.payloads.append(payload)

        validator = VALIDATORS.get(request.path, None)
        if validator and not self.skip_validate:
            try:
                validator.validate(payload)
            except jsonschema.ValidationError as e:
                status = 400
                body = json.dumps({'status': 'error', 'message': str(e)})
            else:
                status = 202

        response = Response(status=status)
        # Drop the default headers so only the configured ones are sent.
        response.headers.clear()
        response.headers.extend(self.headers)
        response.data = body
        return response(environ, start_response)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_accept_mixin(self):
        """The Accept* headers parse into the matching accept objects."""
        environ = {
            'HTTP_ACCEPT':  'text/xml,application/xml,application/xhtml+xml,'
                            'text/html;q=0.9,text/plain;q=0.8,image/png,*/*;q=0.5',
            'HTTP_ACCEPT_CHARSET': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7',
            'HTTP_ACCEPT_ENCODING': 'gzip,deflate',
            'HTTP_ACCEPT_LANGUAGE': 'en-us,en;q=0.5'
        }
        req = wrappers.Request(environ)

        self.assert_equal(
            req.accept_mimetypes,
            MIMEAccept([
                ('text/xml', 1), ('image/png', 1), ('application/xml', 1),
                ('application/xhtml+xml', 1), ('text/html', 0.9),
                ('text/plain', 0.8), ('*/*', 0.5)
            ])
        )
        self.assert_strict_equal(
            req.accept_charsets,
            CharsetAccept([('ISO-8859-1', 1), ('utf-8', 0.7), ('*', 0.7)])
        )
        self.assert_strict_equal(
            req.accept_encodings,
            Accept([('gzip', 1), ('deflate', 1)])
        )
        self.assert_strict_equal(
            req.accept_languages,
            LanguageAccept([('en-us', 1), ('en', 0.5)])
        )

        # An empty Accept header must give an empty MIMEAccept.
        empty_req = wrappers.Request({'HTTP_ACCEPT': ''})
        self.assert_strict_equal(empty_req.accept_mimetypes, MIMEAccept())
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_etag_request_mixin(self):
        """Cache-Control, If-(None-)Match and If-(Un)Modified-Since are parsed."""
        environ = {
            'HTTP_CACHE_CONTROL':       'no-store, no-cache',
            'HTTP_IF_MATCH':            'w/"foo", bar, "baz"',
            'HTTP_IF_NONE_MATCH':       'w/"foo", bar, "baz"',
            'HTTP_IF_MODIFIED_SINCE':   'Tue, 22 Jan 2008 11:18:44 GMT',
            'HTTP_IF_UNMODIFIED_SINCE': 'Tue, 22 Jan 2008 11:18:44 GMT'
        }
        req = wrappers.Request(environ)

        cache_control = req.cache_control
        assert cache_control.no_store
        assert req.cache_control.no_cache

        # Both headers carry the same value, so both must behave the same.
        for etag_set in (req.if_match, req.if_none_match):
            assert etag_set('bar')
            assert etag_set.contains_raw('w/"foo"')
            assert etag_set.contains_weak('foo')
            assert not etag_set.contains('foo')

        self.assert_equal(req.if_modified_since,
                          datetime(2008, 1, 22, 11, 18, 44))
        self.assert_equal(req.if_unmodified_since,
                          datetime(2008, 1, 22, 11, 18, 44))
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_user_agent_mixin(self):
        """User-Agent strings are split into browser, platform, version, language."""
        # (header value, browser, platform, version, language)
        cases = [
            ('Mozilla/5.0 (Macintosh; U; Intel Mac OS X; en-US; rv:1.8.1.11) '
             'Gecko/20071127 Firefox/2.0.0.11', 'firefox', 'macos', '2.0.0.11',
             'en-US'),
            ('Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; de-DE) Opera 8.54',
             'opera', 'windows', '8.54', 'de-DE'),
            ('Mozilla/5.0 (iPhone; U; CPU like Mac OS X; en) AppleWebKit/420 '
             '(KHTML, like Gecko) Version/3.0 Mobile/1A543a Safari/419.3',
             'safari', 'iphone', '419.3', 'en'),
            ('Bot Googlebot/2.1 ( http://www.googlebot.com/bot.html)',
             'google', None, '2.1', None)
        ]
        for header, browser, platform, version, language in cases:
            req = wrappers.Request({'HTTP_USER_AGENT': header})
            self.assert_strict_equal(req.user_agent.browser, browser)
            self.assert_strict_equal(req.user_agent.platform, platform)
            self.assert_strict_equal(req.user_agent.version, version)
            self.assert_strict_equal(req.user_agent.language, language)
            assert bool(req.user_agent)
            # The original header must round-trip unchanged.
            self.assert_strict_equal(req.user_agent.to_header(), header)
            self.assert_strict_equal(str(req.user_agent), header)

        # An unrecognised agent string yields a falsy user agent.
        unknown = wrappers.Request({'HTTP_USER_AGENT': 'foo'})
        assert not unknown.user_agent
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_common_request_descriptors_mixin(self):
        """Common header descriptors expose parsed, typed header values."""
        extra_headers = {
            'Referer':      'http://www.example.com/',
            'Date':         'Sat, 28 Feb 2009 19:04:35 GMT',
            'Max-Forwards': '10',
            'Pragma':       'no-cache',
            'Content-Encoding': 'gzip',
            'Content-MD5':      '9a3bc6dbc47a70db25b84c6e5867a072'
        }
        req = wrappers.Request.from_values(
            content_type='text/html; charset=utf-8',
            content_length='23',
            headers=extra_headers
        )

        # Content-Type and its derived pieces.
        self.assert_equal(req.content_type, 'text/html; charset=utf-8')
        self.assert_equal(req.mimetype, 'text/html')
        self.assert_equal(req.mimetype_params, {'charset': 'utf-8'})
        # Numeric and date headers are converted to Python values.
        self.assert_equal(req.content_length, 23)
        self.assert_equal(req.referrer, 'http://www.example.com/')
        self.assert_equal(req.date, datetime(2009, 2, 28, 19, 4, 35))
        self.assert_equal(req.max_forwards, 10)
        self.assert_true('no-cache' in req.pragma)
        self.assert_equal(req.content_encoding, 'gzip')
        self.assert_equal(req.content_md5, '9a3bc6dbc47a70db25b84c6e5867a072')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_file_closing(self):
        """Closing the request closes the uploaded files parsed from it."""
        body = (b'--foo\r\n'
                b'Content-Disposition: form-data; name="foo"; filename="foo.txt"\r\n'
                b'Content-Type: text/plain; charset=utf-8\r\n\r\n'
                b'file contents, just the contents\r\n'
                b'--foo--')
        request = wrappers.Request.from_values(
            input_stream=BytesIO(body),
            content_length=len(body),
            content_type='multipart/form-data; boundary=foo',
            method='POST'
        )
        upload = request.files['foo']
        self.assert_equal(upload.mimetype, 'text/plain')
        self.assert_equal(upload.filename, 'foo.txt')

        # Open until the request itself is closed.
        self.assert_equal(upload.closed, False)
        request.close()
        self.assert_equal(upload.closed, True)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_ranges(self):
        """Range request parsing and the Content-Range response descriptor."""
        def check_content_range(response):
            # Expected view of "bytes 0-499/1000"; stop is exclusive.
            self.assert_equal(response.content_range.units, 'bytes')
            self.assert_equal(response.content_range.start, 0)
            self.assert_equal(response.content_range.stop, 500)
            self.assert_equal(response.content_range.length, 1000)

        # basic range stuff
        plain = wrappers.Request.from_values()
        assert plain.range is None
        ranged = wrappers.Request.from_values(headers={'Range': 'bytes=0-499'})
        self.assert_equal(ranged.range.ranges, [(0, 500)])

        # Setting the descriptor from a request range writes the header.
        response = wrappers.Response()
        response.content_range = ranged.range.make_content_range(1000)
        check_content_range(response)
        self.assert_equal(response.headers['Content-Range'], 'bytes 0-499/1000')

        # unset() removes the header again.
        response.content_range.unset()
        assert 'Content-Range' not in response.headers

        # Setting the raw header is reflected by the descriptor.
        response.headers['Content-Range'] = 'bytes 0-499/1000'
        check_content_range(response)
项目:gce_metadata_server    作者:salrashid123    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
    """WSGI middleware that rejects requests lacking the expected headers.

    * The ``Host`` header must be one of ``metadata.google.internal``,
      ``169.254.169.254`` or ``metadata``; otherwise a 403 is returned.
    * Every path other than ``/`` must carry a ``Metadata-Flavor`` header
      (only its presence is checked here); otherwise a 403 is returned and
      the rejected path is logged.

    Requests passing both checks are forwarded to ``self.app``.
    """
    req = Request(environ, shallow=True)  
    host = req.headers.get('Host')
    if host not in ('metadata.google.internal', '169.254.169.254' , 'metadata'):
      status = '403 Forbidden'
      response_headers = [('Content-Type','text/html; charset=UTF-8')]
      start_response(status, response_headers)
      # NOTE(review): "nust" is a typo for "must"; text left unchanged in
      # case clients or tests match on it.
      return ['Host Header nust be one of (metadata.google.internal, metadata, 169.254.169.254)']
    # PATH_INFO may be omitted from the environ when empty, so this can be None.
    req_path = environ.get('PATH_INFO')
    metadata_flavor = req.headers.get('Metadata-Flavor')
    if (metadata_flavor is None and req_path != '/'):
      status = '403 Forbidden'
      response_headers = [('Content-Type','text/html; charset=UTF-8')]
      start_response(status, response_headers)
      # Lazy %-formatting: string concatenation raised TypeError when
      # PATH_INFO was missing (None).
      logging.error("Metadata-Flavor: Google header not sent for: %s", req_path)
      # NOTE(review): the request path is rendered into HTML; confirm that
      # Template escapes it (autoescaping) to avoid reflected markup.
      t = Template('''<p>Your client does not have permission to get URL 
      <code>{{ err_path }}</code> from this server. Missing Metadata-Flavor:Google header. ''')
      return [str(t.render(err_path=req_path))]
    return self.app(environ, start_response)


# Make sure every response has these headers (which is what gce does)
项目:zabbix-exporter    作者:MyBook    | 项目源码 | 文件源码
def zabbix_fake_app(environ, start_response):
    """Fake Zabbix API endpoint (WSGI app) for tests.

    If a truthy ``status`` attribute has been set on this function, the
    response uses that status and ``zabbix_fake_app.content`` as the body.
    Otherwise the request body is searched for a JSON-RPC method marker and
    a canned JSON reply is returned with status 200:

    * ``user.login`` -> fixed auth token result
    * ``host.get``   -> contents of ``tests/fixtures/host.get_success.json``
    * ``item.get``   -> contents of ``tests/fixtures/items.get_success.json``
    * anything else  -> the text ``Unrecognized test request``
    """
    request = Request(environ)
    request_body = request.get_data(as_text=True)

    if getattr(zabbix_fake_app, 'status', False):
        response = Response(status=zabbix_fake_app.status)
        response.data = zabbix_fake_app.content
        return response(environ, start_response)

    response = Response(status=200, headers=[('Content-type', 'application/json')])
    if '"method": "user.login"' in request_body:
        json_string = '{"jsonrpc":"2.0","result":"9287f336ffb611e586aa5e5517507c66","id":0}'
    elif '"method": "host.get"' in request_body:
        # Context managers close the fixture files deterministically
        # instead of leaving the handles to the garbage collector.
        with open('tests/fixtures/host.get_success.json') as fixture:
            json_string = fixture.read()
    elif '"method": "item.get"' in request_body:
        with open('tests/fixtures/items.get_success.json') as fixture:
            json_string = fixture.read()
    else:
        json_string = 'Unrecognized test request'
    response.data = json_string
    return response(environ, start_response)
项目:talisker    作者:canonical-ols    | 项目源码 | 文件源码
def get_response(ip, forwarded=None):
    """Call ``protected`` with a request coming from ``ip``.

    ``ip`` becomes ``REMOTE_ADDR``; a truthy ``forwarded`` is additionally
    sent as the ``X-Forwarded-For`` header. Returns whatever
    ``protected(None, request)`` returns.
    """
    environ = {'REMOTE_ADDR': ip}
    if forwarded:
        environ['HTTP_X_FORWARDED_FOR'] = forwarded
    request = Request(environ)
    return protected(None, request)
项目:spichi    作者:ifkite    | 项目源码 | 文件源码
def wsgi_app(self, environ, start_response):
        """Run one request through the pre-handle, dispatch and post-handle steps.

        ``pre_handle`` sees the request before dispatch; ``post_handle`` sees
        both the request and the response before the response is sent.
        """
        req = Request(environ)
        self.pre_handle(req)
        resp = self.dispatch_response(req)
        self.post_handle(req, resp)
        return resp(environ, start_response)
项目:saffron    作者:Lamden    | 项目源码 | 文件源码
def wsgi_app(self, environ, start_response):
        """Wrap ``environ`` in a Request, dispatch it, and send the response."""
        wsgi_response = self.dispatch_request(Request(environ))
        return wsgi_response(environ, start_response)
项目:marabunta    作者:camptocamp    | 项目源码 | 文件源码
def wsgi_app(self, environ, start_response):
        """WSGI callable: build the request, dispatch it, invoke the response."""
        incoming = Request(environ)
        outgoing = self.dispatch_request(incoming)
        return outgoing(environ, start_response)
项目:core-python    作者:yidao620c    | 项目源码 | 文件源码
def wsgi_app(self, environ, start_response):
        """Dispatch the request built from ``environ`` and return its WSGI result."""
        # dispatch_request returns a WSGI callable; calling it sends the response.
        return self.dispatch_request(Request(environ))(environ, start_response)
项目:pixie    作者:algorithm-ninja    | 项目源码 | 文件源码
def wsgi_app(self, environ, start_response):
        """Route the request and build a plain-text response for the endpoint.

        Returns a response object rather than calling it. Presumably a
        wrapper not visible here (e.g. a responder decorator) invokes it;
        verify against the surrounding class.

        * Redirects raised by the router are returned as-is; any other
          routing ``HTTPException`` becomes ``NotFound()``.
        * ``contestant`` needs query parameters mac, row, col.
        * ``worker`` needs query parameters mac, num.
        * ``reboot_timestamp`` returns ``self.reboot_string``.

        Missing parameters produce a 400 with an explanatory body.
        """
        adapter = self.router.bind_to_environ(environ)
        try:
            # Only the endpoint is used; parameters come from the query string.
            endpoint, _ = adapter.match()
        except RequestRedirect as redirect:
            return redirect
        except HTTPException:
            return NotFound()

        query = Request(environ).args
        response = Response()
        response.mimetype = 'text/plain'
        response.status_code = 200

        if endpoint == 'contestant':
            if all(key in query for key in ('mac', 'row', 'col')):
                self.add_contestant(query['mac'], query['row'], query['col'],
                                    response)
            else:
                response.status_code = 400
                response.data = 'Required query parameters: mac, row, col'

        elif endpoint == 'worker':
            if all(key in query for key in ('mac', 'num')):
                self.add_worker(query['mac'], query['num'], response)
            else:
                response.status_code = 400
                response.data = 'Required query parameters: mac, num'

        elif endpoint == 'reboot_timestamp':
            response.data = str(self.reboot_string)

        return response
项目:pixie    作者:algorithm-ninja    | 项目源码 | 文件源码
def wsgi_app(self, environ, start_response):
        """Route the request and render a boot or config script as plain text.

        Returns a response object rather than calling it. Presumably a
        wrapper not visible here invokes it; verify against the surrounding
        class.

        The query arguments are copied into a dict and extended with a
        ``wipe`` entry (``pixie_wipe=force`` for the ``wipe`` endpoint, empty
        otherwise). When an ``ip`` argument is given, the last entry of
        ``self.configs`` whose ``subnet`` contains that address is selected.
        With a selected config, its items are merged into the arguments and
        ``BOOTSCRIPT`` is rendered; without one, ``CONFIGSCRIPT`` is rendered
        from ``self.default_config``.
        """
        adapter = self.router.bind_to_environ(environ)
        try:
            endpoint, _ = adapter.match()
        except RequestRedirect as redirect:
            return redirect
        except HTTPException:
            return NotFound()

        params = dict(Request(environ).args)
        params['wipe'] = 'pixie_wipe=force' if endpoint == 'wipe' else ""

        response = Response()
        response.mimetype = 'text/plain'
        response.status_code = 200

        selected = None
        if 'ip' in params:
            # The value is indexed with [0], i.e. treated as a list of values.
            address = ipaddress.ip_address(params['ip'][0])
            matching = [cfg for cfg in self.configs if address in cfg['subnet']]
            if matching:
                # Last match wins, as with a loop that keeps overwriting.
                selected = matching[-1]

        if selected is None:
            response.data = CONFIGSCRIPT.format(**self.default_config)
            return response

        params.update(selected.items())
        response.data = BOOTSCRIPT.format(**params)
        return response
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_query_string_is_bytes(self):
        """``query_string`` is the raw, still-quoted query as bytes."""
        request = wrappers.Request.from_values(u'/?foo=%2f')
        raw_query = request.query_string
        self.assert_strict_equal(raw_query, b'foo=%2f')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_access_route(self):
        """``access_route`` follows X-Forwarded-For, falling back to REMOTE_ADDR."""
        forwarded_headers = {'X-Forwarded-For': '192.168.1.2, 192.168.1.1'}
        proxied = wrappers.Request.from_values(headers=forwarded_headers)
        proxied.environ['REMOTE_ADDR'] = '192.168.1.3'
        self.assert_equal(proxied.access_route, ['192.168.1.2', '192.168.1.1'])
        # remote_addr still reports the directly connected peer.
        self.assert_strict_equal(proxied.remote_addr, '192.168.1.3')

        # Without the header, the route is just the remote address.
        direct = wrappers.Request.from_values()
        direct.environ['REMOTE_ADDR'] = '192.168.1.3'
        self.assert_strict_equal(list(direct.access_route), ['192.168.1.3'])
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_url_request_descriptors(self):
        """URL-related request attributes for a path mounted under ``/test``."""
        request = wrappers.Request.from_values('/bar?foo=baz',
                                               'http://example.com/test')
        # (attribute name, expected value), checked in this order.
        expectations = [
            ('path', u'/bar'),
            ('full_path', u'/bar?foo=baz'),
            ('script_root', u'/test'),
            ('url', u'http://example.com/test/bar?foo=baz'),
            ('base_url', u'http://example.com/test/bar'),
            ('url_root', u'http://example.com/test/'),
            ('host_url', u'http://example.com/'),
            ('host', 'example.com'),
            ('scheme', 'http'),
        ]
        for attribute, expected in expectations:
            self.assert_strict_equal(getattr(request, attribute), expected)

        # The scheme comes from the base URL.
        secure = wrappers.Request.from_values('/bar?foo=baz',
                                              'https://example.com/test')
        self.assert_strict_equal(secure.scheme, 'https')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_url_request_descriptors_query_quoting(self):
        """Percent-quoting in the query string survives in ``full_path`` and ``url``."""
        quoted_target = 'http%3A%2F%2Fwww.example.com%2F%3Fnext%3D%2F'
        request = wrappers.Request.from_values('/bar?next=' + quoted_target,
                                               'http://example.com/')
        self.assert_equal(request.path, u'/bar')
        self.assert_strict_equal(request.full_path,
                                 u'/bar?next=' + quoted_target)
        self.assert_strict_equal(request.url,
                                 u'http://example.com/bar?next=' + quoted_target)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_authorization_mixin(self):
        """A Basic ``Authorization`` header is decoded into type, username, password."""
        headers = {'Authorization': 'Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ=='}
        req = wrappers.Request.from_values(headers=headers)
        auth = req.authorization
        self.assert_strict_equal(auth.type, 'basic')
        self.assert_strict_equal(auth.username, 'Aladdin')
        self.assert_strict_equal(auth.password, 'open sesame')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_stream_wrapping(self):
        """Form parsing reads through a replacement ``request.stream``."""
        class LowercasingStream(object):
            """Wrapper that lower-cases everything read from the wrapped stream."""

            def __init__(self, stream):
                self._stream = stream

            def read(self, size=-1):
                chunk = self._stream.read(size)
                return chunk.lower()

            def readline(self, size=-1):
                line = self._stream.readline(size)
                return line.lower()

        body = b'foo=Hello+World'
        request = wrappers.Request.from_values(
            '/',
            method='POST',
            data=body,
            content_type='application/x-www-form-urlencoded'
        )
        request.stream = LowercasingStream(request.stream)
        # The parsed value is lower-cased, so the wrapper was used.
        self.assert_equal(request.form['foo'], 'hello world')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_get_data_method_parsing_caching_behavior(self):
        """How ``get_data()`` and form parsing interact over the single input stream."""
        body = b'foo=Hello+World'

        def fresh_request():
            # Each scenario needs an unread input stream.
            return wrappers.Request.from_values(
                '/', method='POST', data=body,
                content_type='application/x-www-form-urlencoded')

        # get_data() caches, so form stays available
        request = fresh_request()
        self.assert_equal(request.get_data(), body)
        self.assert_equal(request.form['foo'], u'Hello World')
        self.assert_equal(request.get_data(), body)

        # here we access the form data first, caching is bypassed
        request = fresh_request()
        self.assert_equal(request.form['foo'], u'Hello World')
        self.assert_equal(request.get_data(), b'')

        # Another case is uncached get data which trashes everything
        request = fresh_request()
        self.assert_equal(request.get_data(cache=False), body)
        self.assert_equal(request.get_data(cache=False), b'')
        self.assert_equal(request.form, {})

        # Or we can implicitly start the form parser which is similar to
        # the old .data behavior
        request = fresh_request()
        self.assert_equal(request.get_data(parse_form_data=True), b'')
        self.assert_equal(request.form['foo'], u'Hello World')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_shallow_mode(self):
        """A shallow request still parses query args but refuses to read the form."""
        req = wrappers.Request({'QUERY_STRING': 'foo=bar'}, shallow=True)
        self.assert_equal(req.args['foo'], 'bar')

        def read_form_field():
            return req.form['foo']

        self.assert_raises(RuntimeError, read_form_field)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_form_parsing_failed(self):
        """A multipart body that does not match its boundary yields no files or form."""
        # The body uses "blah" while the header announces boundary "foo".
        body = b'--blah\r\n'
        request = wrappers.Request.from_values(
            input_stream=BytesIO(body),
            content_length=len(body),
            content_type='multipart/form-data; boundary=foo',
            method='POST'
        )
        assert not request.files
        assert not request.form
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_url_charset_reflection(self):
        """``url_charset`` reflects a charset assigned on the request."""
        request = wrappers.Request.from_values()
        request.charset = 'utf-7'
        reflected = request.url_charset
        self.assert_equal(reflected, 'utf-7')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_response_iter_wrapping(self):
        """A response's encoded iterator can be wrapped before it is served."""
        def upper_chunks(chunks):
            for chunk in chunks:
                yield chunk.upper()

        def body_chunks():
            yield 'foo'
            yield 'bar'

        request = wrappers.Request.from_values()
        response = wrappers.Response(body_chunks())
        del response.headers['Content-Length']
        # Replace the body with an upper-casing view of the encoded chunks.
        response.response = upper_chunks(response.iter_encoded())
        served = wrappers.Response.from_app(response, request.environ,
                                            buffered=True)
        self.assertEqual(served.get_data(), b'FOOBAR')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_other_method_payload(self):
        """A request with a non-standard HTTP method still exposes its body."""
        body = b'Hello World'
        request = wrappers.Request.from_values(
            input_stream=BytesIO(body),
            content_length=len(body),
            content_type='text/plain',
            method='WHAT_THE_FUCK'
        )
        self.assert_equal(request.get_data(), body)
        self.assert_is_instance(request.stream, LimitedStream)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_form_data_ordering(self):
        """A custom ``parameter_storage_class`` keeps query arguments in order."""
        class MyRequest(wrappers.Request):
            parameter_storage_class = ImmutableOrderedMultiDict

        request = MyRequest.from_values('/?foo=1&bar=0&foo=3')
        query = request.args

        # Keys in first-seen order; multi items in original order.
        self.assert_equal(list(query), ['foo', 'bar'])
        expected_pairs = [
            ('foo', '1'),
            ('bar', '0'),
            ('foo', '3')
        ]
        self.assert_equal(list(request.args.items(multi=True)), expected_pairs)
        self.assert_is_instance(request.args, ImmutableOrderedMultiDict)

        combined = request.values
        self.assert_is_instance(combined, CombinedMultiDict)
        self.assert_equal(request.values['foo'], '1')
        self.assert_equal(request.values.getlist('foo'), ['1', '3'])
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_modified_url_encoding(self):
        """Query arguments are decoded with the request class's ``url_charset``.

        The URL is passed as euc-kr encoded bytes, and the decoded argument
        must equal the original text.
        """
        class ModifiedRequest(wrappers.Request):
            url_charset = 'euc-kr'

        # The sample must be non-ASCII: the literal had been mangled into
        # question marks, which encode identically in every ASCII-compatible
        # charset and so exercised nothing. Hangul syllables, written as
        # escapes to keep the source ASCII-only.
        sample = u'\uc815\uc0c1\ucc98\ub9ac'
        req = ModifiedRequest.from_values((u'/?foo=' + sample).encode('euc-kr'))
        self.assert_strict_equal(req.args['foo'], sample)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_reverse_slash_behavior(self):
        """With the reverse-slash mixin, the slash moves from ``path`` to ``script_root``."""
        class MyRequest(wrappers.ReverseSlashBehaviorRequestMixin, Request):
            pass

        request = MyRequest.from_values('/foo/bar', 'http://example.com/test')
        assert request.url == 'http://example.com/test/foo/bar'
        assert request.path == 'foo/bar'
        assert request.script_root == '/test/'

        # make sure the routing system works with the slashes in
        # reverse order as well.
        url_map = routing.Map([routing.Rule('/foo/bar', endpoint='foo')])
        env_adapter = url_map.bind_to_environ(request.environ)
        assert env_adapter.match() == ('foo', {})
        host_adapter = url_map.bind(request.host, request.script_root)
        assert host_adapter.match(request.path) == ('foo', {})
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_dynamic_charset_request_mixin(self):
        """The request charset follows the Content-Type charset parameter."""
        class MyRequest(wrappers.DynamicCharsetRequestMixin, Request):
            pass

        def with_content_type(value):
            return MyRequest({'CONTENT_TYPE': value})

        # No charset parameter: latin1.
        request = with_content_type('text/html')
        assert request.charset == 'latin1'

        # Explicit charset parameter is honoured.
        request = with_content_type('text/html; charset=utf-8')
        assert request.charset == 'utf-8'

        request = with_content_type('application/octet-stream')
        assert request.charset == 'latin1'
        assert request.url_charset == 'latin1'

        # A class-level url_charset overrides only the URL charset.
        MyRequest.url_charset = 'utf-8'
        request = with_content_type('application/octet-stream')
        assert request.charset == 'latin1'
        assert request.url_charset == 'utf-8'

        # unknown_charset is consulted for an unrecognised charset name.
        def fallback_to_ascii(x):
            return "ascii"

        request = with_content_type('text/plain; charset=x-weird-charset')
        request.unknown_charset = fallback_to_ascii
        assert request.charset == 'ascii'
        assert request.url_charset == 'utf-8'
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def application(environ, start_responseonse):
    """WSGI app: POST requests go to ``stats``, all others to ``upload_file``.

    The chosen handler receives the request and returns a WSGI callable,
    which is then invoked to send the response.
    """
    request = Request(environ)
    handler = stats if request.method == 'POST' else upload_file
    return handler(request)(environ, start_responseonse)
项目:Isa-Framework    作者:c0hb1rd    | 项目源码 | 文件源码
def wsgi_app(app, environ, start_response):
    """Serve one WSGI request through ``app.dispatch_request``.

    ``environ`` is wrapped in a Request, ``app.dispatch_request`` turns it
    into a response, and the response is invoked as a WSGI callable.
    """
    response = app.dispatch_request(Request(environ))
    return response(environ, start_response)
项目:py    作者:stencila    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        """WSGI entry point: pass the request to ``self.handle`` and send its response."""
        reply = self.handle(Request(environ))
        return reply(environ, start_response)
项目:py    作者:stencila    | 项目源码 | 文件源码
def request(**kwargs):
    """Build a Request from an environ created by ``EnvironBuilder(**kwargs)``."""
    builder = EnvironBuilder(**kwargs)
    environ = builder.get_environ()
    return Request(environ)
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_parse_body(self):
        """``parse_body`` exposes the request's form fields by name."""
        req = Request({})
        req.form = {"foo": "bar"}

        parsed = self.handler.parse_body(req)
        self.assertEqual("bar", parsed["foo"])
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_call(self, name_mock, content_mock, ip_mock):
        """``_call`` passes the form parameter to the endpoint and logs the call.

        The three mock arguments are not used in the body; presumably they
        are injected by patch decorators outside this view.
        """
        handler = TestBaseHandler.DummyHandler()
        request = Request(Environ({"wsgi.input": None}))
        request.form = { 'param': 42 }

        result = handler._call(handler.dummy_endpoint, {}, request)
        self.assertEqual(43, result["incremented"])

        # The call must have produced an HTTP log row naming the client
        # address and the endpoint.
        Logger.c.execute("SELECT * FROM logs WHERE category = 'HTTP'")
        log_row = Logger.c.fetchone()
        message = log_row[3]
        self.assertIn("1.2.3.4", message)
        self.assertIn("dummy_endpoint", message)
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_call_default(self, name_mock, content_mock, ip_mock):
        """Without a form parameter the endpoint result is 124.

        The mock arguments are unused here; presumably injected by patch
        decorators outside this view.
        """
        handler = TestBaseHandler.DummyHandler()
        request = Request(Environ({"wsgi.input": None}))

        result = handler._call(handler.dummy_endpoint, {}, request)
        self.assertEqual(124, result["incremented"])
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_call_cast_parameter(self, name_mock, content_mock, ip_mock):
        """A string form parameter ``'42'`` yields the same result as integer 42.

        The mock arguments are unused here; presumably injected by patch
        decorators outside this view.
        """
        handler = TestBaseHandler.DummyHandler()
        request = Request(Environ({"wsgi.input": None}))
        request.form = { 'param': '42' }

        result = handler._call(handler.dummy_endpoint, {}, request)
        self.assertEqual(43, result["incremented"])
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_call_fail_cast_parameter(self, name_mock, content_mock, ip_mock):
        """A form parameter of ``'nope'`` makes ``_call`` raise ``BadRequest``.

        The mock arguments are unused here; presumably injected by patch
        decorators outside this view.
        """
        handler = TestBaseHandler.DummyHandler()
        request = Request(Environ({"wsgi.input": None}))
        request.form = { 'param': 'nope' }

        with self.assertRaises(BadRequest):
            handler._call(handler.dummy_endpoint, {}, request)
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_call_required_args(self, name_mock, content_mock, ip_mock):
        """Calling ``required`` with no parameters raises ``BadRequest`` naming ``param``.

        The mock arguments are unused here; presumably injected by patch
        decorators outside this view.
        """
        handler = TestBaseHandler.DummyHandler()
        request = Request(Environ({"wsgi.input": None}))

        with self.assertRaises(BadRequest) as caught:
            handler._call(handler.required, {}, request)

        # The error body carries the error code and the missing name.
        error_response = caught.exception.response
        self.assertIn("MISSING_PARAMETER", error_response.data.decode())
        self.assertIn("param", error_response.data.decode())
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_call_with_error(self, name_mock, content_mock, ip_mock):
        """With ``param`` supplied, ``required`` raises ``Forbidden`` with its error text.

        The mock arguments are unused here; presumably injected by patch
        decorators outside this view.
        """
        handler = TestBaseHandler.DummyHandler()
        request = Request(Environ({"wsgi.input": None}))

        with self.assertRaises(Forbidden) as caught:
            handler._call(handler.required, {'param': 42}, request)

        error_response = caught.exception.response
        self.assertIn("NOBUONO", error_response.data.decode())
        self.assertIn("nononono", error_response.data.decode())
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_call_general_attrs(self, name_mock, content_mock, ip_mock):
        """The ``myip`` endpoint returns ``1.2.3.4`` when called through ``_call``.

        The mock arguments are unused here; presumably injected by patch
        decorators outside this view.
        """
        handler = TestBaseHandler.DummyHandler()
        request = Request(Environ({"wsgi.input": None}))

        reported_ip = handler._call(handler.myip, {}, request)
        self.assertEqual("1.2.3.4", reported_ip)
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_call_file(self):
        """The ``file`` endpoint returns the uploaded file's name via ``_call``."""
        handler = TestBaseHandler.DummyHandler()
        request = Request(Environ({"wsgi.input": None}))
        upload = FileStorage(filename="foo")
        request.files = {"file": upload}

        returned_name = handler._call(handler.file, {}, request)
        self.assertEqual("foo", returned_name)
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_call_with_decorators(self):
        """``_call`` runs a decorated endpoint given existing input and output ids.

        There is no assertion; the test passes when the call does not raise.
        """
        handler = TestBaseHandler.DummyHandler()
        request = Request(Environ({"wsgi.input": None}))

        # Records referenced by the ids passed below.
        Database.add_user("token", "", "")
        Database.add_task("poldo", "", "", 1, 1)
        Database.add_input("inputid", "token", "poldo", 1, "", 42)
        Database.add_output("outputid", "inputid", "", 42, "")

        route_args = {"input_id": "inputid", "output_id":"outputid"}
        handler._call(handler.with_decorators, route_args, request)
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_get_file_name_no_file(self):
        """``_get_file_name`` returns ``None`` when the request has no files."""
        req = Request(Environ())
        req.files = {}

        file_name = BaseHandler._get_file_name(req)
        self.assertIsNone(file_name)
项目:territoriali-backend    作者:algorithm-ninja    | 项目源码 | 文件源码
def test_get_file_content(self):
        """``_get_file_content`` returns the bytes of the uploaded ``file`` entry."""
        req = Request(Environ())
        payload = "hello world".encode()
        upload = FileStorage(stream=_io.BytesIO(payload), filename="foo")
        req.files = {"file": upload}

        content = BaseHandler._get_file_content(req)
        self.assertEqual("hello world", content.decode())