Python sanic.response 模块,json() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sanic.response.json()

项目:sanic-auth    作者:pyx    | 项目源码 | 文件源码
def handle_no_auth(request):
    """Answer an unauthenticated request with a 401 JSON message."""
    payload = {'message': 'unauthorized'}
    return response.json(payload, status=401)
项目:sanic-auth    作者:pyx    | 项目源码 | 文件源码
def api_profile(request, user):
    """Return the id and name of the given user as a JSON response."""
    profile = {'id': user.id, 'name': user.name}
    return response.json(profile)
项目:sanic_crud    作者:Typhon66    | 项目源码 | 文件源码
def _validate_field_types(self, request):
    """Check that values sent for 'int' and 'bool' fields pass int().

    Returns True when every such value converts; otherwise returns the
    400 response built by self.response_json for the first value that
    does not.
    """
    shortcuts = self.model.shortcuts
    messages = self.config.response_messages
    editable = shortcuts.editable_fields
    payload = request.json

    for name, raw in payload.items():
        column_type = editable.get(name).db_field
        if column_type not in ('int', 'bool'):
            continue

        try:
            int(raw)
        except (ValueError, TypeError):
            # Pick the template that matches the declared column type.
            if column_type == 'int':
                template = messages.ErrorTypeInteger
            else:
                template = messages.ErrorTypeBoolean
            return self.response_json(status_code=400, message=template.format(raw))

    return True
项目:sanic_crud    作者:Typhon66    | 项目源码 | 文件源码
def _validate_field_size(self, request):
    """Reject integer values that fall outside their column's signed range.

    Only fields whose db_field is 'int' or 'bigint' are checked; every
    other field type is skipped.

    Returns True when all checked values fit, otherwise the 400 response
    built by self.response_json for the first out-of-range field.
    """
    shortcuts = self.model.shortcuts
    response_messages = self.config.response_messages
    request_data = request.json

    # Inclusive signed bounds. The 32-bit lower bound is -2**31
    # (-2147483648), in line with the -2**63 bound used for bigint.
    size_limits = {
        'int': (-2147483648, 2147483647),
        'bigint': (-9223372036854775808, 9223372036854775807),
    }

    for key, value in request_data.items():
        field_type = shortcuts.editable_fields.get(key).db_field
        limits = size_limits.get(field_type)
        if limits is None:
            continue

        min_size, max_size = limits
        if not min_size <= value <= max_size:
            return self.response_json(
                status_code=400,
                message=response_messages.ErrorFieldOutOfRange.format(key, min_size, max_size))

    return True
项目:toshi-admin-service    作者:toshiapp    | 项目源码 | 文件源码
async def post_login(request):
    """Verify an auth token with the ID service and open an admin session.

    Declared ``async`` because the body awaits the HTTP client and the
    database pool.

    Raises SanicException (401) when the ID service does not answer 200
    or when the verified user has no row in the admins table.
    """
    token = request.json['auth_token']
    url = '{}/v1/login/verify/{}'.format(ID_SERVICE_LOGIN_URL, token)
    resp = await app.http.get(url)
    if resp.status != 200:
        raise SanicException("Login Failed", status_code=401)

    user = await resp.json()
    toshi_id = user['toshi_id']
    session_id = generate_session_id()
    async with app.pool.acquire() as con:
        admin = await con.fetchrow("SELECT * FROM admins WHERE toshi_id = $1", toshi_id)
        if admin:
            await con.execute("INSERT INTO sessions (session_id, toshi_id) VALUES ($1, $2)",
                              session_id, toshi_id)

    if not admin:
        toshi_log.info("Invalid login from: {}".format(toshi_id))
        raise SanicException("Login Failed", status_code=401)

    response = json_response(user)
    response.cookies['session'] = session_id
    # NOTE(review): the session cookie is not flagged 'secure' here; the
    # original carried a disabled line for it - confirm what is intended.
    return response
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def test_storage():
    """Values stored on the request by middleware reach the handler; deleted ones do not."""
    app = Sanic('test_text')

    @app.middleware('request')
    def store(request):
        request['user'] = 'sanic'
        request['sidekick'] = 'tails'
        del request['sidekick']

    @app.route('/')
    def handler(request):
        stored = {key: request.get(key) for key in ('user', 'sidekick')}
        return json(stored)

    _, response = sanic_endpoint_test(app)

    body = loads(response.text)
    assert body['user'] == 'sanic'
    assert body.get('sidekick') is None
项目:sanic-openapi    作者:channelcat    | 项目源码 | 文件源码
def test_list_default():
    """A body parameter declared as doc.List(int) is listed as an array of integers."""
    app = Sanic('test_get')
    app.blueprint(openapi_blueprint)

    numbers = doc.List(int, description="All the numbers")

    @app.put('/test')
    @doc.consumes(numbers, location="body")
    def test(request):
        return json({"test": True})

    _, response = app.test_client.get('/openapi/spec.json')

    spec = json_loads(response.body.decode())
    put_operation = spec['paths']['/test']['put']
    parameter = put_operation['parameters'][0]

    assert response.status == 200
    assert parameter['type'] == 'array'
    assert parameter['items']['type'] == 'integer'
项目:jawaf    作者:danpozmanter    | 项目源码 | 文件源码
def __call__(self, view_func, *_args, **_kwargs):
    """Wrap view_func so it only runs when the request's session has a user.

    The request is taken from the 'request' keyword argument, or from the
    first positional argument that is a Request (which wins when both are
    present). Without a session user the wrapper either calls
    login_redirect (when self.redirect is truthy) or returns a 403 JSON
    response.

    Raises ServerError from the wrapper when no request can be located.
    """
    async def wrapped_view(*args, **kwargs):
        request = kwargs.get('request')
        for arg in args:
            if isinstance(arg, Request):
                request = arg
                break
        if not request:
            raise ServerError('No request found!')
        if request['session'].get('user', None):
            return await view_func(*args, **kwargs)
        if self.redirect:
            return login_redirect(request)
        return json({'message': 'access denied'}, status=403)
    return wrapped_view
项目:jawaf    作者:danpozmanter    | 项目源码 | 文件源码
async def delete(self, request, table_name=None):
    """Delete endpoint. Remove the row whose id is given in the JSON body.

    Declared ``async`` because the body awaits the database connection
    and the audit call.

    :param request: Sanic Request.
    :param table_name: Name of the table to access.
    """
    if not check_csrf(request):
        return json({'message': 'access denied'}, status=403)
    get_jawaf()  # result unused; call retained in case it has side effects
    table = registry.get(table_name)
    target_id = request.json.get('id', None)
    if not target_id:
        return json({'message': 'no id'}, status=400)
    if not table:
        return json({'message': 'access denied'}, status=403)
    async with Connection(table['database']) as con:
        stmt = table['table'].delete().where(table['table'].c.id == target_id)
        await con.execute(stmt)
    await add_audit_action('delete', 'admin', table_name, request['session']['user'])
    return json({'message': 'success'}, status=200)
项目:jawaf    作者:danpozmanter    | 项目源码 | 文件源码
async def get(self, request, table_name=None):
    """Get endpoint. Retrieve one object by id (url param `id=`)

    Declared ``async`` because the body awaits the database connection.

    :param request: Sanic Request.
    :param table_name: Name of the table to access.
    """
    get_jawaf()  # result unused; call retained in case it has side effects
    table = registry.get(table_name)
    try:
        target_id = int(request.raw_args.get('id', None))
    except (TypeError, ValueError):
        # TypeError: id missing (int(None)); ValueError: id is not numeric.
        return json({'message': 'no id'}, status=400)
    if not table:
        return json({'message': 'access denied'}, status=403)
    async with Connection(table['database']) as con:
        query = sa.select('*').select_from(table['table']).where(table['table'].c.id == target_id)
        result = await con.fetchrow(query)
        if not result:
            return json({'message': 'not found', 'data': None}, status=404)
        return json({'message': 'success', 'data': result}, status=200)
项目:jawaf    作者:danpozmanter    | 项目源码 | 文件源码
async def patch(self, request, table_name=None):
    """Patch endpoint. Partially edit a row.

    Declared ``async`` because the body awaits the database connection
    and the audit call. The 'id' key is removed from the JSON body and
    the remaining keys are applied as column values.

    :param request: Sanic Request.
    :param table_name: Name of the table to access.
    """
    if not check_csrf(request):
        return json({'message': 'access denied'}, status=403)
    get_jawaf()  # result unused; call retained in case it has side effects
    table = registry.get(table_name)
    target_id = request.json.get('id', None)
    if not target_id:
        return json({'message': 'no id'}, status=400)
    if not table:
        return json({'message': 'access denied'}, status=403)
    request.json.pop('id')
    async with Connection(table['database']) as con:
        stmt = table['table'].update().where(table['table'].c.id == target_id).values(**request.json)
        await con.execute(stmt)
    # NOTE(review): the audit action is recorded as 'put' although this is
    # the patch endpoint - confirm this is intentional.
    await add_audit_action('put', 'admin', table_name, request['session']['user'])
    return json({'message': 'success'}, status=200)
项目:jawaf    作者:danpozmanter    | 项目源码 | 文件源码
async def post(self, request, table_name=None):
    """Post endpoint. Create a new row.

    Declared ``async`` because the body awaits the database connection
    and the audit call. The JSON body's keys are used as column values.

    :param request: Sanic Request.
    :param table_name: Name of the table to access.
    """
    if not check_csrf(request):
        return json({'message': 'access denied'}, status=403)
    get_jawaf()  # result unused; call retained in case it has side effects
    table = registry.get(table_name)
    if not table:
        return json({'message': 'access denied'}, status=403)
    async with Connection(table['database']) as con:
        stmt = table['table'].insert().values(**request.json)
        await con.execute(stmt)
    await add_audit_action('post', 'admin', table_name, request['session']['user'])
    return json({'message': 'success'}, status=201)
项目:sanic-jwt    作者:ahopkins    | 项目源码 | 文件源码
def scoped(scopes, require_all=True, require_all_actions=True):
    """Build a decorator that runs a handler only when the scopes validate.

    The wrapped handler is awaited when validate_scopes accepts the scopes
    retrieved for the request; otherwise (including when no scopes are
    retrieved at all) a 403 JSON response is returned.
    """
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # Retrieve the scopes from the payload
            user_scopes = request.app.auth.retrieve_scopes(request)

            # No scopes in the payload means access is denied outright.
            is_authorized = (
                user_scopes is not None
                and validate_scopes(request, scopes, user_scopes, require_all, require_all_actions)
            )

            if not is_authorized:
                return json({
                    'status': 'not_authorized',
                }, 403)

            # The user is authorized: run the handler and hand back its response.
            return await f(request, *args, **kwargs)
        return decorated_function
    return decorator
项目:sanic-jwt    作者:ahopkins    | 项目源码 | 文件源码
def get_token_reponse(request, access_token, output, refresh_token=None):
    """Build the JSON response for `output`, adding token cookies when configured.

    Cookies are only set when SANIC_JWT_COOKIE_SET is truthy; the refresh
    cookie additionally needs a refresh token and
    SANIC_JWT_REFRESH_TOKEN_ENABLED.
    """
    response = json(output)
    config = request.app.config

    if not config.SANIC_JWT_COOKIE_SET:
        return response

    def set_cookie(name, value):
        # Every token cookie shares the configured domain / httponly flags.
        response.cookies[name] = value
        response.cookies[name]['domain'] = config.SANIC_JWT_COOKIE_DOMAIN
        response.cookies[name]['httponly'] = config.SANIC_JWT_COOKIE_HTTPONLY

    set_cookie(config.SANIC_JWT_COOKIE_TOKEN_NAME, str(access_token, 'utf-8'))

    if refresh_token and config.SANIC_JWT_REFRESH_TOKEN_ENABLED:
        set_cookie(config.SANIC_JWT_COOKIE_REFRESH_TOKEN_NAME, refresh_token)

    return response
项目:homingbot    作者:homingbot    | 项目源码 | 文件源码
def get_messages(id):
    """Return a JSON response with the count and list of messages for an address.

    Raises NotFound when the lookup yields no messages or raises one of
    the handled lookup errors.
    """
    try:
        messages = mail.Message.filter(address=id)
        total = len(messages)
        if total == 0:
            raise NotFound('could not find messages for email %s' % ( id))

        result = {
            'count': total,
            'messages': [msg.toJson() for msg in messages],
        }
    except (mail.Message.DoesNotExist, KeyError):
        raise NotFound('could not find messages for email %s' % ( id))
    except (mail.Email.DoesNotExist, KeyError):
        raise NotFound('could not find email with address %s' % (id))
    return json(result)
项目:homingbot    作者:homingbot    | 项目源码 | 文件源码
def get(request):
    '''
    Generate x number of emails

    Reads 'count' from the request, generates that many emails, saves
    them in one batch and returns their addresses with a 201 status.

    Raises InvalidUsage when 'count' is absent (the lookup raises
    KeyError) or is not greater than 0.
    '''
    # The lookup has to sit inside the try for the KeyError branch to be
    # reachable.
    try:
        count = int(get_post_param(request, 'count'))
    except KeyError:
        raise InvalidUsage('count must be present')
    if count <= 0:
        raise InvalidUsage('count must be greater than 0')

    live_emails = mail.Email.all()
    logger.debug("Generating %s emails.", count)
    email_gen = utils.generate_emails(count)
    emails = []
    addresses = []
    for _ in range(count):
        email = next(email_gen)
        emails.append(email)
        addresses.append(email.address)
    db_service.batch_save(emails)
    return json({'accounts': addresses, "total_active": len(live_emails) + count}, status=201)
项目:python-tarantool-benchmark-and-bootstrap    作者:valentinmk    | 项目源码 | 文件源码
async def stickers_api_packs_stickers(self, request, packs, stickers):
    """Return a JSON description of the sticker named by `stickers`.

    Declared ``async`` because the body awaits the database lookup. The
    first row returned by self.db.get_stickers_by_name supplies id
    (index 0), rating (1), url (2) and pack_url (3).
    """
    resp = {
        "packs": packs,
        "stickers": stickers,
        "comment": "Get sticker info",
        "name": stickers,
    }
    print(stickers)
    sticker = await self.db.get_stickers_by_name(stickers)
    print(sticker)
    row = sticker[0]
    resp.update({"url": row[2]})
    resp.update({"id": row[0]})
    resp.update({"rating": row[1]})
    resp.update({"pack_url": row[3]})
    return json(resp)
项目:covador    作者:baverman    | 项目源码 | 文件源码
def test_form(test_cli):
    """Post a=2 as form data, as multipart and to the class-based view; each must echo {'a': 2}."""
    expected = {'a': 2}

    # Plain form post.
    reply = yield from test_cli.post('/form/', data={'a': 2})
    assert reply.status == 200
    decoded = yield from reply.json()
    assert decoded == expected

    # Multipart-encoded post with an explicit content type.
    body, mct = helpers.encode_multipart([('a', '2')])
    multipart_headers = {'Content-Type': mct}
    reply = yield from test_cli.post('/form/', data=body,
                                     headers=multipart_headers)
    assert reply.status == 200
    decoded = yield from reply.json()
    assert decoded == expected

    # Same form post against the /cbv/ route.
    reply = yield from test_cli.post('/cbv/', data={'a': 2})
    assert reply.status == 200
    decoded = yield from reply.json()
    assert decoded == expected
项目:covador    作者:baverman    | 项目源码 | 文件源码
def test_form(test_cli):
    """Post a=2 as form data, as multipart and to the class-based view; each must echo {'a': 2}."""
    expected = {'a': 2}

    # Plain form post.
    reply = yield from test_cli.post('/form/', data={'a': 2})
    assert reply.status == 200
    decoded = yield from reply.json()
    assert decoded == expected

    # Multipart-encoded post with an explicit content type.
    body, mct = helpers.encode_multipart([('a', '2')])
    multipart_headers = {'Content-Type': mct}
    reply = yield from test_cli.post('/form/', data=body,
                                     headers=multipart_headers)
    assert reply.status == 200
    decoded = yield from reply.json()
    assert decoded == expected

    # Same form post against the /cbv/ route.
    reply = yield from test_cli.post('/cbv/', data={'a': 2})
    assert reply.status == 200
    decoded = yield from reply.json()
    assert decoded == expected
项目:covador    作者:baverman    | 项目源码 | 文件源码
def test_form(test_cli):
    """Post a=2 as form data, as multipart and to the class-based view; each must echo {'a': 2}."""
    expected = {'a': 2}

    # Plain form post.
    reply = yield from test_cli.post('/form/', data={'a': 2})
    assert reply.status == 200
    decoded = yield from reply.json()
    assert decoded == expected

    # Multipart-encoded post with an explicit content type.
    body, mct = helpers.encode_multipart([('a', '2')])
    multipart_headers = {'Content-Type': mct}
    reply = yield from test_cli.post('/form/', data=body,
                                     headers=multipart_headers)
    assert reply.status == 200
    decoded = yield from reply.json()
    assert decoded == expected

    # Same form post against the /cbv/ route.
    reply = yield from test_cli.post('/cbv/', data={'a': 2})
    assert reply.status == 200
    decoded = yield from reply.json()
    assert decoded == expected
项目:owllook    作者:howie6879    | 项目源码 | 文件源码
def owllook_logout(request):
    """
    Log out by deleting the 'user' and 'owl_sid' cookies on the response.
    :param request:
    :return: JSON response with a 'status' field
        :   0   no user found in the session
        :   1   user found, cookies deleted
    """
    if not request['session'].get('user', None):
        return json({'status': 0})

    response = json({'status': 1})
    for cookie_name in ('user', 'owl_sid'):
        del response.cookies[cookie_name]
    return response
项目:owllook    作者:howie6879    | 项目源码 | 文件源码
async def owllook_delete_bookmark(request):
    """
    Pull a bookmark out of the session user's bookmark list.

    Declared ``async`` because the body awaits the database update.
    :param request:
    :return: JSON response with a 'status' field
        :   -1  no user in the session, or no 'bookmarkurl' in the body
        :   0   the update raised an exception (logged)
        :   1   the update was issued
    """
    user = request['session'].get('user', None)
    data = parse_qs(str(request.body, encoding='utf-8'))
    bookmarkurl = data.get('bookmarkurl', '')
    if not (user and bookmarkurl):
        return json({'status': -1})

    bookmark = unquote(bookmarkurl[0])
    try:
        motor_db = motor_base.get_db()
        await motor_db.user_message.update_one({'user': user},
                                               {'$pull': {'bookmarks': {"bookmark": bookmark}}})
        LOGGER.info('??????')
        return json({'status': 1})
    except Exception as e:
        LOGGER.exception(e)
        return json({'status': 0})
项目:owllook    作者:howie6879    | 项目源码 | 文件源码
async def change_email(request):
    """
    Store the first 'email' value from the request body on the session user.

    Declared ``async`` because the body awaits the database update.
    :param request:
    :return: JSON response with a 'status' field
        :   -1  no user in the session
        :   0   reading the email or updating raised an exception (logged)
        :   1   the update was issued
    """
    user = request['session'].get('user', None)
    data = parse_qs(str(request.body, encoding='utf-8'))
    if not user:
        return json({'status': -1})

    try:
        # A missing 'email' key makes the [0] lookup raise; that falls
        # through to the status 0 branch below.
        email = data.get('email', None)[0]
        motor_db = motor_base.get_db()
        await motor_db.user.update_one({'user': user},
                                       {'$set': {'email': email}})
        LOGGER.info('??????')
        return json({'status': 1})
    except Exception as e:
        LOGGER.exception(e)
        return json({'status': 0})
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def authorized():
    """Build a decorator that only runs the handler for authorized requests."""
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # Ask the authorization check about this request first.
            if not check_request_for_authorization_status(request):
                # Not authorized: answer with a 403 JSON body.
                return json({'status': 'not_authorized'}, 403)

            # Authorized: run the handler and pass its response through.
            return await f(request, *args, **kwargs)
        return decorated_function
    return decorator
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_utf8_post_json():
    """A JSON body sent with the request is readable through request.json."""
    app = Sanic('test_utf8_post_json')

    @app.route('/')
    async def handler(request):
        return text('OK')

    body = json_dumps({'test': '?'})
    request, response = app.test_client.get(
        '/', data=body, headers={'content-type': 'application/json'})

    assert request.json.get('test') == '?'
    assert response.text == 'OK'
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_match_info():
    """The URL parameter is exposed on request.match_info and echoed back as JSON."""
    app = Sanic('test_match_info')

    @app.route('/api/v1/user/<user_id>/')
    async def handler(request, user_id):
        return json(request.match_info)

    request, response = app.test_client.get('/api/v1/user/sanic_user/')

    expected = {"user_id": "sanic_user"}
    assert request.match_info == expected
    assert json_loads(response.text) == expected


# ------------------------------------------------------------ #
#  POST
# ------------------------------------------------------------ #
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_method_not_allowed():
    """Check the Allow header reported for HEAD requests against '/'.

    First with only a GET handler registered, then again after a POST
    handler has been added to the same path.
    """
    app = Sanic('method_not_allowed')

    # NOTE(review): `response` inside the handlers resolves to the local
    # variable assigned by the test_client calls below (it is a closure
    # variable of this function), not to a module - confirm intended.
    @app.get('/')
    async def test(request):
        return response.json({'hello': 'world'})

    request, response = app.test_client.head('/')
    assert response.headers['Allow']== 'GET'

    # Register a second handler (same function name) for POST on the same path.
    @app.post('/')
    async def test(request):
        return response.json({'hello': 'world'})

    request, response = app.test_client.head('/')
    assert response.status == 405
    # Order of the methods in the header is not asserted, only the set.
    assert set(response.headers['Allow'].split(', ')) == set(['GET', 'POST'])
    assert response.headers['Content-Length'] == '0'
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_app_injection():
    """A value attached to the app by a listener is reachable through request.app."""
    app = Sanic('test_app_injection')
    expected = random.choice(range(0, 100))

    @app.listener('after_server_start')
    async def inject_data(app, loop):
        app.injected = expected

    @app.get('/')
    async def handler(request):
        return json({'injected': request.app.injected})

    _, response = app.test_client.get('/')

    body = loads(response.text)
    assert body['injected'] == expected
项目:WebGames    作者:Julien00859    | 项目源码 | 文件源码
async def signin(req):
    """Check a login/password pair and return a freshly registered token.

    Declared ``async`` because the body awaits the user lookup and the
    accounts helpers.

    Raises InvalidUsage when an argument is missing, the account is
    frozen or the password does not match; NotFound when no user has the
    given login.
    """
    # NOTE(review): the debug messages below interpolate req.json, which
    # contains the submitted password - consider redacting.
    if any(key not in req.json for key in ("login", "password")):
        logger.debug(f"Request is {req.json} but some arguments are missing.")
        raise InvalidUsage("Missing argument")

    user = await User.get_by_login(req.json["login"])
    if user is None:
        logger.debug(f"Request is {req.json} but user coundn't be found.")
        raise NotFound("User not found")

    if await accounts.is_frozen(user.id, req.ip):
        logger.debug(f"Request is {req.json} but the account is frozen.")
        raise InvalidUsage("Account frozen")

    if not compare_digest(user.password, User.hashpwd(req.json["password"])):
        logger.debug(f"Request is {req.json} but the password is invalid.")
        # A wrong password freezes the account; the message reports until when.
        unfreeze = await accounts.freeze(user.id, req.ip)
        raise InvalidUsage("Invalid password. Account frozen until " + unfreeze.isoformat(sep=" ", timespec="seconds"))

    await accounts.unfreeze(user.id, req.ip)
    token = await accounts.register(user.id)
    logger.info(f"User {user.name} connected. Token generated: {token}")
    return json({"token": token, "id": user.id, "name": user.name})
项目:WebGames    作者:Julien00859    | 项目源码 | 文件源码
async def refresh(req):
    """Validate an id/token pair and echo it back with the user's name.

    Declared ``async`` because the body awaits the accounts helpers.

    Raises InvalidUsage when an argument is missing or the account is
    frozen; NotFound when the user or the token cannot be validated.
    """
    if "id" not in req.json or "token" not in req.json:
        logger.debug(f"Request is {req.json} but some arguments are missing.")
        raise InvalidUsage("Missing argument")

    # NOTE(review): User.get_by_id is not awaited here, although this
    # file's signin awaits User.get_by_login - confirm it is synchronous.
    user = User.get_by_id(req.json["id"])
    if user is None:
        logger.debug(f"Request is {req.json} but user coundn't be found.")
        raise NotFound("User not found")

    if await accounts.is_frozen(user.id, req.ip):
        logger.debug(f"Request is {req.json} but the account is frozen.")
        raise InvalidUsage("Account frozen")

    if not await accounts.is_valid(req.json["id"], req.json["token"]):
        logger.debug(f"Request is {req.json} but the token is invalid or expirated.")
        await accounts.freeze(user.id, req.ip)
        raise NotFound("Token not found or expirated")

    await accounts.unfreeze(user.id, req.ip)
    logger.info(f"User {user.name} refreshed it's token: {req.json['token']}")
    return json({"token": req.json["token"], "id": user.id, "name": user.name})
项目:sirene    作者:etalab    | 项目源码 | 文件源码
def info_view(request, name, value):
    """Look up the items matching name/value and return them in the requested format.

    Query arguments: 'limit' (default 3), 'offset' (default 0), 'format'
    (default 'json') and an optional comma-separated 'columns' list that
    falls back to app.config.columns.

    Raises ServerError when the lookup returns nothing.
    """
    args = request.args
    limit = int(args.get('limit', 3))
    offset = int(args.get('offset', 0))
    fmt = args.get('format', 'json')
    logger.info('?? Requesting {0} (offset={1}) items in {2}'.format(
                limit, offset, fmt))

    columns = args.get('columns')
    if columns:
        column_names = columns.split(',')
    else:
        column_names = app.config.columns

    sirets = retrieve_sirets(name, value, offset=offset, limit=limit)
    if not sirets:
        raise ServerError('?? No entry found for {0}/{1}'.format(name, value))

    found = len(sirets)
    logger.info('?? Retrieving {0} results for the {1}/{2} couple'.format(
                found, name, value))
    logger.info('?? Returning {0} results in {1}'.format(found, fmt))

    rows = []
    for siret in sirets:
        rows.append(decode_siret(retrieve_siret(siret), column_names))
    return format_response(rows, fmt, column_names)
项目:sanic_crud    作者:Typhon66    | 项目源码 | 文件源码
def response_json(data=None, status_code=None, message=None, page=None, total_pages=None):
    """Wrap data, status code and message in a JSON response.

    'page' and 'total_pages' are only added to the body when truthy; the
    HTTP status of the response is status_code.
    """
    body = {'data': data, 'status_code': status_code, 'message': message}

    optional = (('page', page), ('total_pages', total_pages))
    body.update((label, number) for label, number in optional if number)

    return json(body, status=status_code)
项目:sanic_crud    作者:Typhon66    | 项目源码 | 文件源码
def _validate_json(self, request):
    """Return True when reading request.json succeeds, else a 400 response."""
    try:
        _ = request.json
    except Exception:
        invalid_json = self.config.response_messages.ErrorInvalidJSON
        return self.response_json(status_code=400, message=invalid_json)
    return True
项目:sanic_crud    作者:Typhon66    | 项目源码 | 文件源码
def _validate_primary_key_immutable(self, request):
    """Return True unless the request body contains the model's primary key.

    When the primary key is present, the 400 response built by
    self.response_json is returned instead.
    """
    if self.model.shortcuts.primary_key not in request.json:
        return True

    pk_error = self.config.response_messages.ErrorPrimaryKeyUpdateInsert
    return self.response_json(status_code=400, message=pk_error)
项目:sanic_crud    作者:Typhon66    | 项目源码 | 文件源码
def _validate_fields(self, request):
    """Return True when every key of the request body is an editable field.

    The first key that is not editable produces the 400 response built
    by self.response_json.
    """
    editable = self.model.shortcuts.editable_fields
    payload = request.json

    for name in payload:
        if name in editable:
            continue
        template = self.config.response_messages.ErrorInvalidField
        return self.response_json(status_code=400,
                                  message=template.format(name, editable.keys()))

    return True
项目:sanic_crud    作者:Typhon66    | 项目源码 | 文件源码
def _generate_base_route(model_array):
    """Build an async handler that lists every model's route and fields.

    The table description is computed once, when this function is
    called; the returned handler only wraps it in a 200 JSON response.
    """
    routes = {}

    for model in model_array:
        table_name = model.shortcuts.table_name
        required = model.shortcuts.required_fields

        described = []
        for _, column in model.shortcuts.fields.items():
            column_name = column.name
            column_type = column.get_db_field()
            described.append({
                'field_name': column_name,
                'field_type': column_type,
                # Primary keys count as required even when not listed.
                'is_required': column_name in required or column_type == 'primary_key',
            })

        routes[table_name] = {
            'route_url': getattr(model, 'route_url', '/{}'.format(table_name)),
            'fields': described,
        }

    async def base_route(request):
        body = {'data': {'routes': routes}, 'status_code': 200, 'message': 'OK'}
        return json(body, status=200)

    return base_route
项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
async def list_browser_pages(request):
    """Return the renderer's page list as indented JSON.

    Declared ``async`` because the body awaits renderer.pages().
    """
    renderer = request.app.prerender
    pages = await renderer.pages()
    return response.json(pages, ensure_ascii=False, indent=2, escape_forward_slashes=False)
项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
async def show_brower_version(request):
    """Return the renderer's version information as indented JSON.

    Declared ``async`` because the body awaits renderer.version().
    """
    renderer = request.app.prerender
    version = await renderer.version()
    return response.json(version, ensure_ascii=False, indent=2, escape_forward_slashes=False)
项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
def disable_browser_rendering(request):
    """Set the module-level CONCURRENCY to 0 and acknowledge with JSON."""
    global CONCURRENCY
    CONCURRENCY = 0

    acknowledgement = {'message': 'success'}
    return response.json(acknowledgement)
项目:chrome-prerender    作者:bosondata    | 项目源码 | 文件源码
def enable_browser_rendering(request):
    """Reset the module-level CONCURRENCY from the environment and acknowledge.

    The CONCURRENCY environment variable is used when set; otherwise the
    value is twice cpu_count().
    """
    global CONCURRENCY

    fallback = cpu_count() * 2
    configured = os.environ.get('CONCURRENCY', fallback)
    CONCURRENCY = int(configured)

    acknowledgement = {'message': 'success'}
    return response.json(acknowledgement)
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def test_async(request):
    """Return the fixed JSON body {"test": true}."""
    payload = {"test": True}
    return json(payload)
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def test_sync(request):
    """Return the fixed JSON body {"test": true}."""
    payload = {"test": True}
    return json(payload)
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def test(request, exception):
    """Report an exception's text and status code as JSON, using that status."""
    body = {
        "exception": "{}".format(exception),
        "status": exception.status_code,
    }
    return json(body, status=exception.status_code)


# ----------------------------------------------- #
# Read from request
# ----------------------------------------------- #
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def post_json(request):
    """Echo the request's JSON body back under 'message'."""
    reply = {"received": True, "message": request.json}
    return json(reply)
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def post_json(request):
    """Echo the request's form data and its 'test' field back as JSON."""
    reply = {
        "received": True,
        "form_data": request.form,
        "test": request.form.get('test'),
    }
    return json(reply)
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
async def test(request):
    """Log the request and return the awaited result of expensive_call() as JSON.

    Declared ``async`` because the body awaits expensive_call().
    """
    log.info("Received GET /")
    return json(await expensive_call())
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def foo(request):
    """Return a fixed greeting message as JSON."""
    greeting = {'msg': 'hi from blueprint'}
    return json(greeting)
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
def foo2(request):
    """Return a fixed greeting message as JSON."""
    greeting = {'msg': 'hi from blueprint2'}
    return json(greeting)


#
# ????:
#   - ?? flask
#
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
async def handle(request):
    """Return every row selected from `polls` as a JSON list.

    Declared ``async`` because the body uses ``async with`` and
    ``async for``. Each row contributes its question and pub_date.
    """
    async with engine.acquire() as conn:
        result = []
        async for row in conn.execute(polls.select()):
            result.append({"question": row.question, "pub_date": row.pub_date})
        return json({"polls": result})
项目:annotated-py-sanic    作者:hhstore    | 项目源码 | 文件源码
async def fetch(session, url):
    """
    Use session object to perform 'get' request on url

    Declared ``async`` because the body uses ``async with`` and awaits
    the response. Returns the awaited result of the response's json().
    """
    async with session.get(url) as response:
        return await response.json()


#
# ???? HTTP ????:
#   - ???? HTTP ??
#