Python aiohttp.web 模块,HTTPForbidden() 实例源码

我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用aiohttp.web.HTTPForbidden()

项目:aioviber    作者:nonamenix    | 项目源码 | 文件源码
def signature_middleware(app, handler):
    """Check request signature"""

    async def middleware_handler(request: Request):
        if request.method == 'POST' and app.bot.check_signature:
            sig = request.query.get('sig')
            body = await request.read()
            if verify_signature(body, sig, app.bot.auth_token):
                response = await handler(request)
            else:
                logger.warning('Post requests with bad signature {sig} {body}'.format(
                    sig=sig, body=body
                ))
                response = web.HTTPForbidden()
        else:
            response = await handler(request)

        return response

    return middleware_handler
项目:hangoutsbot    作者:das7pad    | 项目源码 | 文件源码
def adapter_do_OPTIONS(self, request):
        origin = request.headers["Origin"]

        allowed_origins = self._bot.config.get_option("api_origins")
        if allowed_origins is None:
            raise web.HTTPForbidden()

        if "*" == allowed_origins or "*" in allowed_origins:
            return web.Response(headers={
                "Access-Control-Allow-Origin": origin,
                "Access-Control-Allow-Headers": "content-type",
            })

        if not origin in allowed_origins:
            raise web.HTTPForbidden()

        return web.Response(headers={
            "Access-Control-Allow-Origin": origin,
            "Access-Control-Allow-Headers": "content-type",
            "Vary": "Origin",
        })
项目:aiohttp-three-template    作者:RobertoPrevato    | 项目源码 | 文件源码
def __call__(self, f):
        """
        Applies area initialization logic to a request handling function, for example by loading user session.

        :param f: the request handler to be decorated.
        :return: a wrapped request handler that loads user information.
        """
        @wraps(f)
        async def wrapped(request):
            # set the area property inside the request object
            request.area = self.name
            try:
                await self.before_request(request)
            except InvalidCultureException:
                # redirect to a proper url
                return HTTPFound(self.get_fallback_url(request))
            except InvalidAntiforgeryTokenException:
                raise HTTPForbidden()

            return await f(request)
        return wrapped
项目:aioweb    作者:kreopt    | 项目源码 | 文件源码
def pre_dispatch(request, controller, actionName):
    reason = None

    check_ok = True
    if request.method not in ('GET', 'HEAD', 'OPTIONS', 'TRACE'):

        action = getattr(controller, actionName)

        if not getattr(action, 'csrf_disabled', False):
            check_ok = False
            token = request.headers.get(CSRF_HEADER_NAME)
            if not token:
                data = await request.post()
                token = data.get(CSRF_FIELD_NAME)

            if token:
                if validate_token(token, await get_secret(request)):
                    check_ok = True
                else:
                    reason = REASON_BAD_TOKEN
            else:
                reason = REASON_NO_CSRF_COOKIE

    if not check_ok:
        raise web.HTTPForbidden(reason=reason)
项目:morpheus    作者:tutorcruncher    | 项目源码 | 文件源码
def call(self, request):
        try:
            event_data = (await request.post())['mandrill_events']
        except KeyError:
            raise HTTPBadRequest(text='"mandrill_events" not found in post data')

        sig_generated = base64.b64encode(
            hmac.new(
                self.app['webhook_auth_key'],
                msg=(self.app['mandrill_webhook_url'] + 'mandrill_events' + event_data).encode(),
                digestmod=hashlib.sha1
            ).digest()
        )
        sig_given = request.headers.get('X-Mandrill-Signature', '<missing>').encode()
        if not hmac.compare_digest(sig_generated, sig_given):
            raise HTTPForbidden(text='invalid signature')
        try:
            events = ujson.loads(event_data)
        except ValueError as e:
            raise HTTPBadRequest(text=f'invalid json data: {e}')

        await self.sender.update_mandrill_webhooks(events)
        return Response(text='message status updated\n')
项目:morpheus    作者:tutorcruncher    | 项目源码 | 文件源码
def messagebird_pricing(request):
    if not request.query.get('username') == 'mb-username':
        raise HTTPForbidden(text='bad username')
    if not request.query.get('password') == 'mb-password':
        raise HTTPForbidden(text='bad password')
    return json_response([
        {
            'mcc': '0',
            'country_name': 'Default rate',
            'rate': '0.0400',
        },
        {
            'mcc': '234',
            'country_name': 'United Kingdom',
            'rate': '0.0200',
        },
    ])
项目:aiohttp-transmute    作者:toumorokoshi    | 项目源码 | 文件源码
def error(request):
    raise HTTPForbidden(reason="unauthorized")
项目:django-redis-pubsub    作者:andrewyoung1991    | 项目源码 | 文件源码
def handle_auth(token):
    """ handles retrieving a user from a token
    """
    if token is None:
        raise HTTPForbidden(body=b"no token in request")

    user = authentication_method(token)
    if user is None:
        raise HTTPForbidden(body=b"invalid token")
    return user
项目:PollBot    作者:mozilla    | 项目源码 | 文件源码
def cli(loop, test_client):
    async def error403(request):
        raise web.HTTPForbidden()

    async def error404(request):
        return web.HTTPNotFound()

    async def error(request):
        raise ValueError()

    app = get_app(loop=loop)
    app.router.add_get('/error', error)
    app.router.add_get('/error-403', error403)
    app.router.add_get('/error-404', error404)
    return loop.run_until_complete(test_client(app))
项目:aiohttp_auth    作者:gnarlychicken    | 项目源码 | 文件源码
def auth_required(func):
    """Utility decorator that checks if a user has been authenticated for this
    request.

    Allows views to be decorated like:

        @auth_required
        def view_func(request):
            pass

    providing a simple means to ensure that whoever is calling the function has
    the correct authentication details.

    Args:
        func: Function object being decorated and raises HTTPForbidden if not

    Returns:
        A function object that will raise web.HTTPForbidden() if the passed
        request does not have the correct permissions to access the view.
    """
    @wraps(func)
    async def wrapper(*args):
        if (await get_auth(args[-1])) is None:
            raise web.HTTPForbidden()

        return await func(*args)

    return wrapper
项目:aiohttp_auth    作者:gnarlychicken    | 项目源码 | 文件源码
def acl_required(permission, context):
    """Returns a decorator that checks if a user has the requested permission
    from the passed acl context.

    This function constructs a decorator that can be used to check a aiohttp's
    view for authorization before calling it. It uses the get_permission()
    function to check the request against the passed permission and context. If
    the user does not have the correct permission to run this function, it
    raises HTTPForbidden.

    Args:
        permission: The specific permission requested.
        context: Either a sequence of ACL tuples, or a callable that returns a
            sequence of ACL tuples. For more information on ACL tuples, see
            get_permission()

    Returns:
        A decorator which will check the request passed has the permission for
        the given context. The decorator will raise HTTPForbidden if the user
        does not have the correct permissions to access the view.
    """

    def decorator(func):

        @wraps(func)
        async def wrapper(*args):
            request = args[-1]

            if callable(context):
                context = context()

            if await get_permitted(request, permission, context):
                return await func(*args)

            raise web.HTTPForbidden()

        return wrapper

    return decorator
项目:aiohttp-login    作者:imbolc    | 项目源码 | 文件源码
def admin_required(handler):
    @wraps(handler)
    async def decorator(*args):
        request = _get_request(args)
        response = await login_required(handler)(request)
        if request['user']['email'] not in cfg.ADMIN_EMAILS:
            raise HTTPForbidden(reason='You are not admin')
        return response
    return decorator
项目:PyGNS3    作者:mvdwoord    | 项目源码 | 文件源码
def shutdown(request, response):

        config = Config.instance()
        if config.get_section_config("Server").getboolean("local", False) is False:
            raise HTTPForbidden(text="You can only stop a local server")

        log.info("Start shutting down the server")

        # close all the projects first
        controller = Controller.instance()
        projects = controller.projects.values()

        tasks = []
        for project in projects:
            tasks.append(asyncio.async(project.close()))

        if tasks:
            done, _ = yield from asyncio.wait(tasks)
            for future in done:
                try:
                    future.result()
                except Exception as e:
                    log.error("Could not close project {}".format(e), exc_info=1)
                    continue

        # then shutdown the server itself
        from gns3server.web.web_server import WebServer
        server = WebServer.instance()
        asyncio.async(server.shutdown_server())
        response.set_status(201)
项目:PyGNS3    作者:mvdwoord    | 项目源码 | 文件源码
def debug(request, response):

        config = Config.instance()
        if config.get_section_config("Server").getboolean("local", False) is False:
            raise HTTPForbidden(text="You can only debug a local server")

        debug_dir = os.path.join(config.config_dir, "debug")
        try:
            if os.path.exists(debug_dir):
                shutil.rmtree(debug_dir)
            os.makedirs(debug_dir)
            with open(os.path.join(debug_dir, "controller.txt"), "w+") as f:
                f.write(ServerHandler._getDebugData())
        except Exception as e:
            # If something is wrong we log the info to the log and we hope the log will be include correctly to the debug export
            log.error("Could not export debug informations {}".format(e), exc_info=1)

        try:
            if Controller.instance().gns3vm.engine == "vmware":
                vmx_path = Controller.instance().gns3vm.current_engine().vmx_path
                if vmx_path:
                    shutil.copy(vmx_path, os.path.join(debug_dir, os.path.basename(vmx_path)))
        except OSError as e:
            # If something is wrong we log the info to the log and we hope the log will be include correctly to the debug export
            log.error("Could not copy VMware VMX file {}".format(e), exc_info=1)

        for compute in list(Controller.instance().computes.values()):
            try:
                r = yield from compute.get("/debug", raw=True)
                data = r.body.decode("utf-8")
            except Exception as e:
                data = str(e)
            with open(os.path.join(debug_dir, "compute_{}.txt".format(compute.name)), "w+") as f:
                f.write("Compute ID: {}\n".format(compute.id))
                f.write(data)

        response.set_status(201)
项目:tomodachi    作者:kalaspuff    | 项目源码 | 文件源码
def static_request_handler(cls: Any, obj: Any, context: Dict, func: Any, path: str, base_url: str) -> Any:
        if '?P<filename>' not in base_url:
            pattern = r'^{}(?P<filename>.+?)$'.format(re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url)))
        else:
            pattern = r'^{}$'.format(re.sub(r'\$$', '', re.sub(r'^\^?(.*)$', r'\1', base_url)))
        compiled_pattern = re.compile(pattern)

        if path.startswith('/'):
            path = os.path.dirname(path)
        else:
            path = '{}/{}'.format(os.path.dirname(context.get('context', {}).get('_service_file_path')), path)

        if not path.endswith('/'):
            path = '{}/'.format(path)

        async def handler(request: web.Request) -> web.Response:
            result = compiled_pattern.match(request.path)
            filename = result.groupdict()['filename']
            filepath = '{}{}'.format(path, filename)

            try:
                if os.path.isdir(filepath) or not os.path.exists(filepath):
                    raise web.HTTPNotFound()

                pathlib.Path(filepath).open('r')
                return FileResponse(filepath)
            except PermissionError as e:
                raise web.HTTPForbidden()

        context['_http_routes'] = context.get('_http_routes', [])
        context['_http_routes'].append(('GET', pattern, handler))

        start_func = cls.start_server(obj, context)
        return (await start_func) if start_func else None
项目:aiohttp_test_chat    作者:steelkiwi    | 项目源码 | 文件源码
def get(self, **kw):
        session = await get_session(self.request)
        if session.get('user'):
            del session['user']
            redirect(self.request, 'login')
        else:
            raise web.HTTPForbidden(body=b'Forbidden')
项目:gallery-cms    作者:crccheck    | 项目源码 | 文件源码
def login(request):
    session = await get_session(request)

    client = GoogleClient(
        client_id=os.getenv('OAUTH_CLIENT_ID'),
        client_secret=os.getenv('OAUTH_CLIENT_SECRET'),
        scope='email profile',
    )
    # FIXME not picking up https
    client.params['redirect_uri'] = '{}://{}{}'.format(request.scheme, request.host, request.path)

    if client.shared_key not in request.GET:  # 'code' not in request.GET
        return web.HTTPFound(client.get_authorize_url())

    access_token, __ = await client.get_access_token(request.GET)
    user, info = await client.user_info()
    # TODO store in session storage

    if user.email in args.admins:
        session['is_authed'] = True
        session['user'] = {
            'name': info['displayName'],
            'email': user.email,
            'avatar': user.picture,
        }
    else:
        return web.HTTPForbidden()

    return web.HTTPFound('/')
项目:Windless    作者:chiaki64    | 项目源码 | 文件源码
def require(func):
    @functools.wraps(func)
    async def wrapper(*args):
        if isinstance(args[-1], AbstractView):
            request = args[-1].request
        else:
            request = args[-1]
        has_perm = await permits(request, 'Administrator')
        if not has_perm:
            raise web.HTTPForbidden()

        if isinstance(args, AbstractView):
            return await func(*args)
        return await func(*args)
    return wrapper
项目:aiohttp-tokio    作者:fafhrd91    | 项目源码 | 文件源码
def test_100_continue_custom_response(loop, test_client):

    @asyncio.coroutine
    def handler(request):
        data = yield from request.post()
        assert b'123', data['name']
        return web.Response()

    @asyncio.coroutine
    def expect_handler(request):
        if request.version == HttpVersion11:
            if auth_err:
                return web.HTTPForbidden()

            request.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n")

    form = FormData()
    form.add_field('name', b'123',
                   content_transfer_encoding='base64')

    app = web.Application()
    app.router.add_post('/', handler, expect_handler=expect_handler)
    client = yield from test_client(app)

    auth_err = False
    resp = yield from client.post('/', data=form, expect100=True)
    assert 200 == resp.status

    auth_err = True
    resp = yield from client.post('/', data=form, expect100=True)
    assert 403 == resp.status
项目:evernote-telegram-bot    作者:djudman    | 项目源码 | 文件源码
def oauth_callback(request):
    logger = request.app.logger
    bot = request.app.bot
    config_data = config['evernote']['basic_access']
    params = parse_qs(request.query_string)
    callback_key = params.get('key', [''])[0]
    session_key = params.get('session_key')[0]
    try:
        session = StartSession.get({'oauth_data.callback_key': callback_key})
    except ModelNotFound as e:
        logger.error(e, exc_info=1)
        return web.HTTPForbidden()

    if not params.get('oauth_verifier'):
        logger.info('User declined access. No access token =(')
        bot.send_message(session.data['chat_id'],
                         'We are sorry, but you declined authorization ??')
        return web.HTTPFound(bot.url)
    if session.key != session_key:
        text = 'Session is expired. \
Please, send /start command to create new session'
        bot.send_message(session.data['chat_id'], text)
        return web.HTTPFound(bot.url)

    user_data = session.data['user']
    name = '{0} {1}'.format(user_data['first_name'], user_data['last_name'])
    user = User(id=session.id,
                name=name,
                username=user_data['username'],
                telegram_chat_id=session.data['chat_id'],
                mode='multiple_notes',
                places={},
                settings={'evernote_access': 'basic'})
    try:
        future = asyncio.ensure_future(
            bot.evernote.get_access_token(config_data, session.oauth_data,
                                          params['oauth_verifier'][0])
        )
        future.add_done_callback(
            functools.partial(set_access_token, bot, user)
        )
    except TokenRequestDenied as e:
        logger.error(e, exc_info=1)
        text = 'We are sorry, but we have some problems with Evernote \
connection. Please try again later'
        bot.send_message(user.telegram_chat_id, text)
    except Exception as e:
        logger.fatal(e, exc_info=1)
        bot.send_message(user.telegram_chat_id, 'Oops. Unknown error')

    text = 'Evernote account is connected.\n\
From now you can just send message and note be created.'
    bot.send_message(user.telegram_chat_id, text)
    user.save()
    return web.HTTPFound(bot.url)
项目:evernote-telegram-bot    作者:djudman    | 项目源码 | 文件源码
def oauth_callback_full_access(request):
    logger = request.app.logger
    bot = request.app.bot
    params = parse_qs(request.query_string)
    callback_key = params.get('key', [''])[0]
    session_key = params.get('session_key')[0]
    try:
        session = StartSession.get({'oauth_data.callback_key': callback_key})
        user = User.get({'id': session.id})
    except ModelNotFound as e:
        logger.error(e, exc_info=1)
        return web.HTTPForbidden()

    if not params.get('oauth_verifier'):
        logger.info('User declined full access =(')
        bot.send_message(
            user.telegram_chat_id,
            'We are sorry, but you deny read/update access??',
            {'hide_keyboard': True}
        )
        return web.HTTPFound(bot.url)
    if session.key != session_key:
        text = 'Session is expired. Please, send /start command to create \
new session'
        bot.send_message(user.telegram_chat_id, text)
        return web.HTTPFound(bot.url)
    try:
        oauth_verifier = params['oauth_verifier'][0]
        config_data = config['evernote']['full_access']
        future = asyncio.ensure_future(
            bot.evernote.get_access_token(config_data, session.oauth_data,
                                          oauth_verifier)
        )
        future.add_done_callback(
            functools.partial(switch_to_one_note_mode, bot, user.id)
        )
    except TokenRequestDenied as e:
        logger.error(e, exc_info=1)
        bot.send_message(
            user.telegram_chat_id,
            'We are sorry, but we have some problems with Evernote connection.\
 Please try again later',
            {'hide_keyboard': True}
        )
    except Exception as e:
        logger.fatal(e, exc_info=1)
        bot.send_message(user.telegram_chat_id, 'Oops. Unknown error',
                         {'hide_keyboard': True})
    text = 'From now this bot in "One note" mode'
    bot.send_message(user.telegram_chat_id, text,
                     {'hide_keyboard': True})
    return web.HTTPFound(bot.url)
项目:fireq    作者:superdesk    | 项目源码 | 文件源码
def auth_middleware(app, handler):
    """ Login via Github """
    def gh_client(**kw):
        return GithubClient(conf['github_id'], conf['github_secret'], **kw)

    async def callback(request):
        session = await get_session(request)
        log.debug('callback: session=%s GET=%s', session, request.GET)
        if session.get('github_state') != request.GET.get('state'):
            return web.HTTPBadRequest()
        code = request.GET.get('code')
        if not code:
            return web.HTTPBadRequest()

        gh = gh_client()
        token, _ = await gh.get_access_token(code)
        gh = gh_client(access_token=token)
        req = await gh.request('GET', 'user')
        user = await req.json()
        req.close()
        users = []
        for org in conf['github_orgs']:
            _, resp = await gh_api('orgs/%s/members?per_page=100' % org)
            users.extend(u['login'] for u in resp)
        log.debug('members %s: %s', len(users), users)
        if user.get('login') in users:
            session['login'] = user.get('login')
            session.pop('github_state', None)
            session.pop('github_url', None)
            location = session.pop('location')
            return web.HTTPFound(location)
        return web.HTTPForbidden()

    async def check_auth(request):
        session = await get_session(request)
        login = session.get('login')
        if login:
            request['login'] = login
            return await handler(request)
        elif 'github_state' not in session:
            gh = gh_client()
            state = str(uuid.uuid4())
            url = gh.get_authorize_url(scope='', state=state)
            session['github_state'] = state
            session['github_url'] = url
            session['location'] = request.path
            log.debug('check_auth: %s', session)
        return web.HTTPFound(conf['url_prefix'] + '/login')

    async def inner(request):
        if request.path == (conf['url_prefix'] + conf['github_callback']):
            return await callback(request)
        elif request.path == (conf['url_prefix'] + '/hook'):
            return await handler(request)
        elif request.path == (conf['url_prefix'] + '/login'):
            return await handler(request)
        else:
            return await check_auth(request)

    return inner