我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 sanic.response.json()。
def handle_no_auth(request):
    """Return a 401 JSON response carrying an ``unauthorized`` message."""
    payload = {'message': 'unauthorized'}
    return response.json(payload, status=401)
def api_profile(request, user):
    """Return the ``id`` and ``name`` attributes of ``user`` as a JSON response."""
    profile = {'id': user.id, 'name': user.name}
    return response.json(profile)
def _validate_field_types(self, request):
    """Check that values sent for ``int``/``bool`` fields convert with ``int()``.

    Returns ``True`` when every such value converts. Otherwise returns the
    400 response built by ``self.response_json`` for the first value that
    fails, using the integer or boolean error template as appropriate.
    """
    editable = self.model.shortcuts.editable_fields
    messages = self.config.response_messages

    for name, raw in request.json.items():
        db_type = editable.get(name).db_field
        if db_type not in ('int', 'bool'):
            continue
        try:
            int(raw)
        except (ValueError, TypeError):
            if db_type == 'int':
                template = messages.ErrorTypeInteger
            else:
                template = messages.ErrorTypeBoolean
            return self.response_json(status_code=400,
                                      message=template.format(raw))
    return True
def _validate_field_size(self, request):
    """Check that values sent for ``int``/``bigint`` fields fit the column range.

    Fields of any other type are skipped. Values that are not already
    numbers (for example the string ``"5"``) are converted with ``int()``
    before the range check, because comparing a ``str`` with an ``int``
    raises ``TypeError`` on Python 3.

    :param request: object whose ``json`` attribute maps field names to values.
    :return: ``True`` when every checked value is in range; otherwise the
        400 response built by ``self.response_json``.
    """
    response_messages = self.config.response_messages
    editable_fields = self.model.shortcuts.editable_fields
    # NOTE(review): the ``int`` lower bound is one above the usual 32-bit
    # minimum (-2147483648) while ``bigint`` uses the exact 64-bit minimum;
    # left unchanged here - confirm whether that is intentional.
    bounds = {
        'int': (-2147483647, 2147483647),
        'bigint': (-9223372036854775808, 9223372036854775807),
    }

    for key, value in request.json.items():
        limits = bounds.get(editable_fields.get(key).db_field)
        if limits is None:
            continue
        min_size, max_size = limits

        if isinstance(value, (int, float)):
            number = value
        else:
            try:
                number = int(value)
            except (TypeError, ValueError):
                # Not numeric at all: report it instead of crashing below.
                return self.response_json(
                    status_code=400,
                    message=response_messages.ErrorTypeInteger.format(value))

        if not min_size <= number <= max_size:
            return self.response_json(
                status_code=400,
                message=response_messages.ErrorFieldOutOfRange.format(
                    key, min_size, max_size))
    return True
async def post_login(request):
    """Verify a login token and start an admin session.

    Reads ``auth_token`` from the JSON body, asks the ID service to verify
    it, and looks the returned ``toshi_id`` up in the ``admins`` table. For
    an admin, a session row is inserted and the verified user is returned
    as JSON with a ``session`` cookie.

    Declared ``async`` because the body awaits the HTTP client and the
    database pool; ``await`` in a plain ``def`` is a syntax error.

    :raises SanicException: with ``status_code=401`` when verification does
        not return status 200 or the user is not an admin.
    """
    token = request.json['auth_token']
    url = '{}/v1/login/verify/{}'.format(ID_SERVICE_LOGIN_URL, token)
    resp = await app.http.get(url)
    if resp.status != 200:
        raise SanicException("Login Failed", status_code=401)

    user = await resp.json()
    toshi_id = user['toshi_id']
    session_id = generate_session_id()

    async with app.pool.acquire() as con:
        admin = await con.fetchrow(
            "SELECT * FROM admins WHERE toshi_id = $1", toshi_id)
        if admin:
            await con.execute(
                "INSERT INTO sessions (session_id, toshi_id) VALUES ($1, $2)",
                session_id, toshi_id)

    if not admin:
        toshi_log.info("Invalid login from: {}".format(toshi_id))
        raise SanicException("Login Failed", status_code=401)

    response = json_response(user)
    response.cookies['session'] = session_id
    # TODO(review): setting response.cookies['session']['secure'] = True was
    # left disabled in the original; confirm whether it should be enabled.
    return response
def test_storage():
    """Values stored on the request by middleware reach the handler.

    The middleware sets two keys and deletes one; the handler echoes both
    lookups, and the deleted key must come back as ``None``.
    """
    app = Sanic('test_text')

    @app.middleware('request')
    def store(request):
        request['user'] = 'sanic'
        request['sidekick'] = 'tails'
        del request['sidekick']

    @app.route('/')
    def handler(request):
        stored = {
            'user': request.get('user'),
            'sidekick': request.get('sidekick'),
        }
        return json(stored)

    _, response = sanic_endpoint_test(app)
    payload = loads(response.text)

    assert payload['user'] == 'sanic'
    assert payload.get('sidekick') is None
def test_list_default():
    """A ``doc.List(int)`` body parameter appears in the spec as an integer array."""
    app = Sanic('test_get')
    app.blueprint(openapi_blueprint)

    @app.put('/test')
    @doc.consumes(doc.List(int, description="All the numbers"), location="body")
    def test(request):
        return json({"test": True})

    _, response = app.test_client.get('/openapi/spec.json')
    spec = json_loads(response.body.decode())
    first_param = spec['paths']['/test']['put']['parameters'][0]

    assert response.status == 200
    assert first_param['type'] == 'array'
    assert first_param['items']['type'] == 'integer'
def __call__(self, view_func, *_args, **_kwargs):
    """Wrap ``view_func`` so it only runs when the session has a user.

    The wrapper finds the request (keyword ``request`` first, then the first
    positional ``Request`` instance, which takes precedence). If
    ``request['session']`` holds a truthy ``user`` the view is awaited.
    Otherwise the caller is redirected via ``login_redirect`` when
    ``self.redirect`` is truthy, or receives a 403 JSON response.

    :param view_func: coroutine function to protect.
    :return: the wrapping coroutine function.
    :raises ServerError: (from the wrapper) when no request can be found.
    """
    async def wrapped_view(*args, **kwargs):
        # The original read ``kwags['request']`` (typo), which raised
        # NameError whenever the request was passed by keyword.
        request = kwargs.get('request')
        for arg in args:
            # isinstance also accepts Request subclasses.
            if isinstance(arg, Request):
                request = arg
                break
        if not request:
            raise ServerError('No request found!')

        if request['session'].get('user', None):
            return await view_func(*args, **kwargs)
        if self.redirect:
            return login_redirect(request)
        return json({'message': 'access denied'}, status=403)

    return wrapped_view
async def delete(self, request, table_name=None):
    """Delete endpoint. Remove the row whose ``id`` is given in the JSON body.

    Declared ``async`` because the body uses ``async with`` and ``await``.

    :param request: Sanic Request.
    :param table_name: Name of the table to access.
    :return: JSON response: 403 on CSRF failure or unknown table, 400 when
        no ``id`` is supplied, 200 on success.
    """
    if not check_csrf(request):
        return json({'message': 'access denied'}, status=403)
    # The result was never used; the call is kept in case it has side effects.
    get_jawaf()
    table = registry.get(table_name)
    # request.json may be None (e.g. an empty body); treat that as "no id"
    # instead of failing on ``None.get``.
    target_id = (request.json or {}).get('id', None)
    if not target_id:
        return json({'message': 'no id'}, status=400)
    if not table:
        return json({'message': 'access denied'}, status=403)
    async with Connection(table['database']) as con:
        stmt = table['table'].delete().where(table['table'].c.id == target_id)
        await con.execute(stmt)
    await add_audit_action('delete', 'admin', table_name, request['session']['user'])
    return json({'message': 'success'}, status=200)
async def get(self, request, table_name=None):
    """Get endpoint. Retrieve one object by id (url param `id=`).

    Declared ``async`` because the body uses ``async with`` and ``await``.

    :param request: Sanic Request.
    :param table_name: Name of the table to access.
    :return: JSON response: 400 when ``id`` is missing or not an integer,
        403 for an unknown table, 404 when no row matches, 200 with the row
        under ``data`` otherwise.
    """
    # The result was never used; the call is kept in case it has side effects.
    get_jawaf()
    table = registry.get(table_name)
    try:
        target_id = int(request.raw_args.get('id', None))
    except (TypeError, ValueError):
        # TypeError: id absent (int(None)); ValueError: id is not numeric.
        return json({'message': 'no id'}, status=400)
    if not table:
        return json({'message': 'access denied'}, status=403)
    async with Connection(table['database']) as con:
        query = sa.select('*').select_from(table['table']).where(
            table['table'].c.id == target_id)
        result = await con.fetchrow(query)
    if not result:
        return json({'message': 'not found', 'data': None}, status=404)
    return json({'message': 'success', 'data': result}, status=200)
async def patch(self, request, table_name=None):
    """Patch endpoint. Partially edit a row.

    Declared ``async`` because the body uses ``async with`` and ``await``.

    :param request: Sanic Request; its JSON body carries ``id`` plus the
        column values to update.
    :param table_name: Name of the table to access.
    :return: JSON response: 403 on CSRF failure or unknown table, 400 when
        no ``id`` is supplied, 200 on success.
    """
    if not check_csrf(request):
        return json({'message': 'access denied'}, status=403)
    # The result was never used; the call is kept in case it has side effects.
    get_jawaf()
    table = registry.get(table_name)
    # request.json may be None (e.g. an empty body); treat that as "no id".
    payload = request.json or {}
    target_id = payload.get('id', None)
    if not target_id:
        return json({'message': 'no id'}, status=400)
    if not table:
        return json({'message': 'access denied'}, status=403)
    # Build the update values without mutating the request's parsed body.
    changes = {column: value for column, value in payload.items() if column != 'id'}
    async with Connection(table['database']) as con:
        stmt = table['table'].update().where(
            table['table'].c.id == target_id).values(**changes)
        await con.execute(stmt)
    # NOTE(review): the audit action is recorded as 'put' although this is
    # the patch handler - confirm whether that is intended.
    await add_audit_action('put', 'admin', table_name, request['session']['user'])
    return json({'message': 'success'}, status=200)
async def post(self, request, table_name=None):
    """Post endpoint. Create a new row from the JSON body.

    Declared ``async`` because the body uses ``async with`` and ``await``.

    :param request: Sanic Request; its JSON body supplies the column values.
    :param table_name: Name of the table to access.
    :return: JSON response: 403 on CSRF failure or unknown table, 201 on
        success.
    """
    if not check_csrf(request):
        return json({'message': 'access denied'}, status=403)
    # The result was never used; the call is kept in case it has side effects.
    get_jawaf()
    table = registry.get(table_name)
    if not table:
        return json({'message': 'access denied'}, status=403)
    async with Connection(table['database']) as con:
        stmt = table['table'].insert().values(**request.json)
        await con.execute(stmt)
    await add_audit_action('post', 'admin', table_name, request['session']['user'])
    return json({'message': 'success'}, status=201)
def scoped(scopes, require_all=True, require_all_actions=True):
    """Build a decorator that lets a handler run only for authorized scopes.

    The wrapped handler reads the user's scopes through
    ``request.app.auth.retrieve_scopes``. With no scopes, access is denied;
    otherwise ``validate_scopes`` decides. Authorized requests are passed to
    the handler; all others get a 403 JSON ``not_authorized`` response.

    :param scopes: scopes required by the handler (passed to ``validate_scopes``).
    :param require_all: forwarded to ``validate_scopes``.
    :param require_all_actions: forwarded to ``validate_scopes``.
    :return: a decorator for coroutine handlers taking ``request`` first.
    """
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # Retrieve the scopes from the payload
            user_scopes = request.app.auth.retrieve_scopes(request)

            if user_scopes is None:
                # If there are no defined scopes in the payload, deny access
                is_authorized = False
            else:
                is_authorized = validate_scopes(
                    request, scopes, user_scopes, require_all, require_all_actions)

            if not is_authorized:
                # the user is not authorized.
                return json({
                    'status': 'not_authorized',
                }, 403)

            # the user is authorized: run the handler and return its response.
            # (An unreachable trailing ``return response`` was removed.)
            return await f(request, *args, **kwargs)
        return decorated_function
    return decorator
def get_token_reponse(request, access_token, output, refresh_token=None):
    """Build the JSON token response, adding token cookies when configured.

    ``output`` becomes the JSON body. When ``SANIC_JWT_COOKIE_SET`` is truthy
    the access token (decoded as UTF-8) is stored in a cookie, and so is the
    refresh token when one is given and refresh tokens are enabled. Each
    cookie gets the configured ``domain`` and ``httponly`` values.

    NOTE(review): the source had lost its indentation; the refresh-token
    branch is assumed to sit inside the cookie-enabled branch - confirm.
    """
    response = json(output)
    config = request.app.config

    def _apply_cookie_options(name):
        # Same two options are applied to every cookie set here.
        response.cookies[name]['domain'] = config.SANIC_JWT_COOKIE_DOMAIN
        response.cookies[name]['httponly'] = config.SANIC_JWT_COOKIE_HTTPONLY

    if not config.SANIC_JWT_COOKIE_SET:
        return response

    access_name = config.SANIC_JWT_COOKIE_TOKEN_NAME
    response.cookies[access_name] = str(access_token, 'utf-8')
    _apply_cookie_options(access_name)

    if refresh_token and config.SANIC_JWT_REFRESH_TOKEN_ENABLED:
        refresh_name = config.SANIC_JWT_COOKIE_REFRESH_TOKEN_NAME
        response.cookies[refresh_name] = refresh_token
        _apply_cookie_options(refresh_name)

    return response
def get_messages(id):
    """Return the messages stored for the address ``id`` as a JSON response.

    The body holds ``count`` and ``messages`` (each message's ``toJson()``).

    :raises NotFound: when no messages exist for the address, or when the
        lookup raises ``Message.DoesNotExist``, ``Email.DoesNotExist`` or
        ``KeyError``.
    """
    try:
        messages = mail.Message.filter(address=id)
        if len(messages) == 0:
            raise NotFound('could not find messages for email %s' % (id))
        result = {
            'count': len(messages),
            'messages': [msg.toJson() for msg in messages],
        }
    except (mail.Message.DoesNotExist, KeyError):
        raise NotFound('could not find messages for email %s' % (id))
    except mail.Email.DoesNotExist:
        # KeyError is already handled by the clause above.
        raise NotFound('could not find email with address %s' % (id))
    return json(result)
def get(request):
    '''Generate x number of emails.

    Reads ``count`` from the request, generates that many emails, saves them
    in one batch and returns their addresses with a 201 status.

    :raises InvalidUsage: when ``count`` is missing, not an integer, or not
        greater than 0.
    '''
    # The lookup and conversion now sit inside the try: previously only the
    # ``count <= 0`` comparison was guarded, so ``except KeyError`` could
    # never catch a missing parameter.
    try:
        count = int(get_post_param(request, 'count'))
    except (KeyError, TypeError):
        # KeyError: parameter absent; TypeError: lookup produced a
        # non-convertible value such as None.
        raise InvalidUsage('count must be present')
    except ValueError:
        raise InvalidUsage('count must be an integer')
    if count <= 0:
        raise InvalidUsage('count must be greater than 0')

    live_emails = mail.Email.all()
    logger.debug("Generating %s emails.", count)
    email_gen = utils.generate_emails(count)
    emails = [next(email_gen) for _ in range(count)]
    addresses = [email.address for email in emails]
    db_service.batch_save(emails)
    return json({'accounts': addresses,
                 "total_active": len(live_emails) + count}, status=201)
async def stickers_api_packs_stickers(self, request, packs, stickers):
    """Return info about the sticker named ``stickers`` as a JSON response.

    Declared ``async`` because the database lookup is awaited. The first
    row returned by ``self.db.get_stickers_by_name`` supplies ``id``
    (index 0), ``rating`` (1), ``url`` (2) and ``pack_url`` (3).

    NOTE(review): an empty lookup result raises IndexError here, as before.
    NOTE(review): the original carried disabled ``parse.unquote`` calls for
    both path parameters; they are still not applied.
    """
    print(stickers)
    sticker = await self.db.get_stickers_by_name(stickers)
    print(sticker)
    row = sticker[0]
    resp = {
        "packs": packs,
        "stickers": stickers,
        "comment": "Get sticker info",
        "name": stickers,
        "url": row[2],
        "id": row[0],
        "rating": row[1],
        "pack_url": row[3],
    }
    return json(resp)
def test_form(test_cli):
    """Form data posted to ``/form/`` and ``/cbv/`` is echoed back as ``{'a': 2}``.

    Covers a plain form post, a multipart post and the class-based view.
    Written as a generator-based coroutine (``yield from``).
    """
    multipart_body, content_type = helpers.encode_multipart([('a', '2')])
    attempts = (
        ('/form/', {'data': {'a': 2}}),
        ('/form/', {'data': multipart_body,
                    'headers': {'Content-Type': content_type}}),
        ('/cbv/', {'data': {'a': 2}}),
    )
    for path, post_kwargs in attempts:
        resp = yield from test_cli.post(path, **post_kwargs)
        assert resp.status == 200
        resp_json = yield from resp.json()
        assert resp_json == {'a': 2}
def owllook_logout(request):
    """Log the session user out.

    :param request: request whose ``session`` may hold a ``user``.
    :return: JSON ``{'status': 1}`` with the ``user`` and ``owl_sid`` cookies
        deleted when a session user exists, else ``{'status': 0}``.
    """
    if not request['session'].get('user', None):
        return json({'status': 0})

    response = json({'status': 1})
    for cookie_name in ('user', 'owl_sid'):
        del response.cookies[cookie_name]
    return response
async def owllook_delete_bookmark(request):
    """Remove a bookmark from the session user's stored bookmarks.

    Declared ``async`` because the database update is awaited.

    :param request: request with a ``session`` and a urlencoded body that
        carries ``bookmarkurl``.
    :return: JSON ``status``: ``1`` when the update call completes, ``0``
        when it raises, ``-1`` when there is no session user or no
        ``bookmarkurl`` value.
    """
    user = request['session'].get('user', None)
    data = parse_qs(str(request.body, encoding='utf-8'))
    bookmarkurl = data.get('bookmarkurl', '')
    if not (user and bookmarkurl):
        return json({'status': -1})

    bookmark = unquote(bookmarkurl[0])
    try:
        motor_db = motor_base.get_db()
        await motor_db.user_message.update_one(
            {'user': user},
            {'$pull': {'bookmarks': {"bookmark": bookmark}}})
        LOGGER.info('??????')
        return json({'status': 1})
    except Exception as e:
        LOGGER.exception(e)
        return json({'status': 0})
async def change_email(request):
    """Store a new email address for the session user.

    Declared ``async`` because the database update is awaited.

    :param request: request with a ``session`` and a urlencoded body that
        carries ``email``.
    :return: JSON ``status``: ``1`` when the update call completes, ``0``
        when reading the email or updating raises, ``-1`` when there is no
        session user.
    """
    user = request['session'].get('user', None)
    data = parse_qs(str(request.body, encoding='utf-8'))
    if not user:
        return json({'status': -1})

    try:
        # A missing ``email`` makes the subscript raise; that is reported
        # as status 0 by the handler below.
        email = data.get('email', None)[0]
        motor_db = motor_base.get_db()
        await motor_db.user.update_one({'user': user}, {'$set': {'email': email}})
        LOGGER.info('??????')
        return json({'status': 1})
    except Exception as e:
        LOGGER.exception(e)
        return json({'status': 0})
def authorized():
    """Build a decorator that runs a handler only for authorized requests.

    The wrapper asks ``check_request_for_authorization_status`` about the
    request; unauthorized requests get a 403 JSON ``not_authorized`` response.
    """
    def decorator(f):
        @wraps(f)
        async def decorated_function(request, *args, **kwargs):
            # Check the client's authorization status before anything else.
            if not check_request_for_authorization_status(request):
                return json({'status': 'not_authorized'}, 403)
            # Authorized: run the handler and hand back its response.
            return await f(request, *args, **kwargs)
        return decorated_function
    return decorator
def test_utf8_post_json():
    """A JSON body sent with the request is parsed and readable via ``request.json``."""
    app = Sanic('test_utf8_post_json')

    @app.route('/')
    async def handler(request):
        return text('OK')

    body = json_dumps({'test': '?'})
    request, response = app.test_client.get(
        '/', data=body, headers={'content-type': 'application/json'})

    assert request.json.get('test') == '?'
    assert response.text == 'OK'
def test_match_info():
    """URL parameters are exposed on ``request.match_info`` and can be returned as JSON."""
    app = Sanic('test_match_info')

    @app.route('/api/v1/user/<user_id>/')
    async def handler(request, user_id):
        return json(request.match_info)

    expected = {"user_id": "sanic_user"}
    request, response = app.test_client.get('/api/v1/user/sanic_user/')

    assert request.match_info == expected
    assert json_loads(response.text) == expected


# ------------------------------------------------------------ #
#  POST
# ------------------------------------------------------------ #
def test_method_not_allowed():
    """HEAD on a route without a HEAD handler reports the allowed methods.

    After registering GET the ``Allow`` header is ``GET``; after adding POST
    the response is 405 with both methods listed and an empty body.
    """
    app = Sanic('method_not_allowed')

    @app.get('/')
    async def test(request):
        return response.json({'hello': 'world'})

    # The result is bound to ``head_response`` rather than ``response``: the
    # handlers refer to the ``response`` module, and a local of that name in
    # this function would shadow it inside their closures.
    _, head_response = app.test_client.head('/')
    assert head_response.headers['Allow'] == 'GET'

    @app.post('/')
    async def test(request):
        return response.json({'hello': 'world'})

    _, head_response = app.test_client.head('/')
    assert head_response.status == 405
    assert set(head_response.headers['Allow'].split(', ')) == {'GET', 'POST'}
    assert head_response.headers['Content-Length'] == '0'
def test_app_injection():
    """A value set on the app by a server-start listener is visible via ``request.app``."""
    app = Sanic('test_app_injection')
    expected = random.choice(range(0, 100))

    @app.listener('after_server_start')
    async def inject_data(app, loop):
        app.injected = expected

    @app.get('/')
    async def handler(request):
        return json({'injected': request.app.injected})

    _, response = app.test_client.get('/')
    body = loads(response.text)
    assert body['injected'] == expected
async def signin(req):
    """Authenticate a user by login/password and return a fresh token.

    Declared ``async`` because the user lookup and account calls are awaited.

    :param req: request whose JSON body must contain ``login`` and ``password``.
    :return: JSON response with ``token``, ``id`` and ``name``.
    :raises InvalidUsage: when an argument is missing, the account is frozen,
        or the password does not match (the account is then frozen).
    :raises NotFound: when no user has the given login.
    """
    # NOTE(review): the debug messages below log ``req.json`` in full, which
    # includes the submitted password; the info message logs the token.
    # Messages are left unchanged here - consider redacting.
    if any(key not in req.json for key in ("login", "password")):
        logger.debug(f"Request is {req.json} but some arguments are missing.")
        raise InvalidUsage("Missing argument")

    user = await User.get_by_login(req.json["login"])
    if user is None:
        logger.debug(f"Request is {req.json} but user coundn't be found.")
        raise NotFound("User not found")

    if await accounts.is_frozen(user.id, req.ip):
        logger.debug(f"Request is {req.json} but the account is frozen.")
        raise InvalidUsage("Account frozen")

    if not compare_digest(user.password, User.hashpwd(req.json["password"])):
        logger.debug(f"Request is {req.json} but the password is invalid.")
        unfreeze = await accounts.freeze(user.id, req.ip)
        raise InvalidUsage("Invalid password. Account frozen until "
                           + unfreeze.isoformat(sep=" ", timespec="seconds"))

    await accounts.unfreeze(user.id, req.ip)
    token = await accounts.register(user.id)
    logger.info(f"User {user.name} connected. Token generated: {token}")
    return json({"token": token, "id": user.id, "name": user.name})
async def refresh(req):
    """Validate an existing token for a user and echo it back.

    Declared ``async`` because the account calls are awaited.

    :param req: request whose JSON body must contain ``id`` and ``token``.
    :return: JSON response with ``token``, ``id`` and ``name``.
    :raises InvalidUsage: when an argument is missing or the account is frozen.
    :raises NotFound: when the user is unknown or the token is not valid
        (the account is then frozen).
    """
    if "id" not in req.json or "token" not in req.json:
        logger.debug(f"Request is {req.json} but some arguments are missing.")
        raise InvalidUsage("Missing argument")

    # NOTE(review): this lookup is not awaited, unlike the other account
    # calls in this function - confirm ``User.get_by_id`` is synchronous.
    user = User.get_by_id(req.json["id"])
    if user is None:
        logger.debug(f"Request is {req.json} but user coundn't be found.")
        raise NotFound("User not found")

    if await accounts.is_frozen(user.id, req.ip):
        logger.debug(f"Request is {req.json} but the account is frozen.")
        raise InvalidUsage("Account frozen")

    if not await accounts.is_valid(req.json["id"], req.json["token"]):
        logger.debug(f"Request is {req.json} but the token is invalid or expirated.")
        await accounts.freeze(user.id, req.ip)
        raise NotFound("Token not found or expirated")

    await accounts.unfreeze(user.id, req.ip)
    logger.info(f"User {user.name} refreshed it's token: {req.json['token']}")
    return json({"token": req.json["token"], "id": user.id, "name": user.name})
def info_view(request, name, value):
    """Look up items matching the name/value filter and return them formatted.

    Query arguments: ``limit`` (default 3), ``offset`` (default 0),
    ``format`` (default ``json``) and an optional comma-separated
    ``columns`` list (default ``app.config.columns``).

    :raises ServerError: when the filter matches nothing.
    """
    args = request.args
    limit = int(args.get('limit', 3))
    offset = int(args.get('offset', 0))
    output_format = args.get('format', 'json')
    logger.info('?? Requesting {0} (offset={1}) items in {2}'.format(
        limit, offset, output_format))

    columns = args.get('columns')
    column_names = columns.split(',') if columns else app.config.columns

    sirets = retrieve_sirets(name, value, offset=offset, limit=limit)
    if not sirets:
        raise ServerError('?? No entry found for {0}/{1}'.format(name, value))

    found = len(sirets)
    logger.info('?? Retrieving {0} results for the {1}/{2} couple'.format(
        found, name, value))
    logger.info('?? Returning {0} results in {1}'.format(found, output_format))

    rows = []
    for siret in sirets:
        rows.append(decode_siret(retrieve_siret(siret), column_names))
    return format_response(rows, output_format, column_names)
def response_json(data=None, status_code=None, message=None, page=None, total_pages=None):
    """Build the standard JSON envelope and send it with ``status_code``.

    The body always holds ``data``, ``status_code`` and ``message``;
    ``page`` and ``total_pages`` are added only when truthy.
    """
    body = {'data': data, 'status_code': status_code, 'message': message}
    optional = (('page', page), ('total_pages', total_pages))
    body.update((key, val) for key, val in optional if val)
    return json(body, status=status_code)
def _validate_json(self, request):
    """Return ``True`` if reading ``request.json`` succeeds, else a 400 response."""
    try:
        # Only the attribute access matters: it raises when parsing fails.
        request.json
    except Exception:
        invalid_json = self.config.response_messages.ErrorInvalidJSON
        return self.response_json(status_code=400, message=invalid_json)
    return True
def _validate_primary_key_immutable(self, request):
    """Reject request bodies that try to set the model's primary key.

    Returns ``True`` when the primary-key name is absent from the JSON body,
    otherwise the 400 response built by ``self.response_json``.
    """
    if self.model.shortcuts.primary_key not in request.json:
        return True
    return self.response_json(
        status_code=400,
        message=self.config.response_messages.ErrorPrimaryKeyUpdateInsert)
def _validate_fields(self, request):
    """Check that every key in the JSON body is an editable field.

    Returns ``True`` when all keys are known; otherwise the 400 response for
    the first unknown key, naming it together with the allowed field names.
    """
    allowed = self.model.shortcuts.editable_fields
    for name in request.json:
        if name in allowed:
            continue
        template = self.config.response_messages.ErrorInvalidField
        return self.response_json(status_code=400,
                                  message=template.format(name, allowed.keys()))
    return True
def _generate_base_route(model_array):
    """Build a handler that lists every model's route and field metadata.

    For each model, the table name maps to its ``route_url`` (the model's own
    attribute, or ``/<table_name>`` when it has none) and a list of fields
    with name, database type and whether the field is required. A field is
    required when it is listed in the model's required fields or is the
    primary key.

    :param model_array: iterable of models exposing ``shortcuts``.
    :return: coroutine handler returning the collected routes as JSON (200).
    """
    def _describe(field_object, required_names):
        # One metadata entry per model field.
        name = field_object.name
        db_type = field_object.get_db_field()
        return {
            'field_name': name,
            'field_type': db_type,
            'is_required': name in required_names or db_type == 'primary_key',
        }

    tables = {}
    for model in model_array:
        table_name = model.shortcuts.table_name
        required_names = model.shortcuts.required_fields
        tables[table_name] = {
            'route_url': getattr(model, 'route_url', '/{}'.format(table_name)),
            'fields': [_describe(field_object, required_names)
                       for field_object in model.shortcuts.fields.values()],
        }

    async def base_route(request):
        envelope = {
            'data': {'routes': tables},
            'status_code': 200,
            'message': 'OK',
        }
        return json(envelope, status=200)

    return base_route
async def list_browser_pages(request):
    """Return the renderer's pages as pretty-printed JSON.

    Declared ``async`` because ``renderer.pages()`` is awaited.
    """
    renderer = request.app.prerender
    pages = await renderer.pages()
    return response.json(pages, ensure_ascii=False, indent=2,
                         escape_forward_slashes=False)
async def show_brower_version(request):
    """Return the renderer's version info as pretty-printed JSON.

    Declared ``async`` because ``renderer.version()`` is awaited.
    """
    renderer = request.app.prerender
    version = await renderer.version()
    return response.json(version, ensure_ascii=False, indent=2,
                         escape_forward_slashes=False)
def disable_browser_rendering(request):
    """Set the module-level ``CONCURRENCY`` to 0 and report success."""
    global CONCURRENCY
    CONCURRENCY = 0
    outcome = {'message': 'success'}
    return response.json(outcome)
def enable_browser_rendering(request):
    """Reset the module-level ``CONCURRENCY`` and report success.

    The value comes from the ``CONCURRENCY`` environment variable, falling
    back to twice the CPU count.
    """
    global CONCURRENCY
    fallback = cpu_count() * 2
    CONCURRENCY = int(os.environ.get('CONCURRENCY', fallback))
    outcome = {'message': 'success'}
    return response.json(outcome)
def test_async(request):
    """Return the fixed JSON body ``{"test": true}``."""
    body = {"test": True}
    return json(body)
def test_sync(request):
    """Return the fixed JSON body ``{"test": true}``."""
    body = {"test": True}
    return json(body)
def test(request, exception):
    """Turn an exception into a JSON response that carries its status code.

    The body holds the formatted exception and its ``status_code``; the same
    code is used as the HTTP status.
    """
    description = "{}".format(exception)
    status = exception.status_code
    return json({"exception": description, "status": status}, status=status)


# ----------------------------------------------- #
# Read from request
# ----------------------------------------------- #
def post_json(request):
    """Echo the parsed JSON body back under ``message``."""
    reply = {"received": True, "message": request.json}
    return json(reply)
def post_json(request):
    """Echo the form data back, plus the form's ``test`` value on its own."""
    reply = {
        "received": True,
        "form_data": request.form,
        "test": request.form.get('test'),
    }
    return json(reply)
async def test(request):
    """Log the request and return the result of ``expensive_call()`` as JSON.

    Declared ``async`` because ``expensive_call()`` is awaited.
    """
    log.info("Received GET /")
    return json(await expensive_call())
def foo(request):
    """Return a fixed greeting as JSON."""
    greeting = {'msg': 'hi from blueprint'}
    return json(greeting)
def foo2(request):
    """Return a fixed greeting as JSON."""
    greeting = {'msg': 'hi from blueprint2'}
    return json(greeting)


# NOTE: the comment that followed this function was garbled in extraction
# ("????: - ?? flask"); only the word "flask" survives.
async def handle(request):
    """Return every poll's question and publication date as JSON.

    Declared ``async`` because the body uses ``async with`` and ``async for``.
    """
    async with engine.acquire() as conn:
        result = []
        async for row in conn.execute(polls.select()):
            result.append({"question": row.question, "pub_date": row.pub_date})
        return json({"polls": result})
async def fetch(session, url):
    """Use session object to perform 'get' request on url.

    Declared ``async`` because the body uses ``async with`` and ``await``.

    :param session: object whose ``get(url)`` returns an async context manager.
    :param url: address to request.
    :return: the awaited result of the response's ``json()``.
    """
    async with session.get(url) as response:
        return await response.json()


# NOTE: the comment that followed this function was garbled in extraction
# ("???? HTTP ????: - ???? HTTP ??"); only the word "HTTP" survives.