Python bottle 模块,abort() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bottle.abort()

项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def login_post_handler():
    """Handle a submitted login form.

    Aborts with 400 when the ``username`` or ``password`` form field is
    missing. Redirects to ``/login?msg=fail`` when ``model.get_user`` raises
    ``KeyError`` or when the credential check fails; otherwise records the
    user name via ``handler_util.set_current_username`` and redirects to ``/``.
    """
    try:
        forms = bottle.request.forms
        username = forms['username']
        password = forms['password']
    except KeyError:
        bottle.abort(400, 'Invalid form.')
    try:
        user = model.get_user(username)
    except KeyError:
        bottle.redirect('/login?msg=fail')
    stored_hash = user['password_hash']
    if stored_hash == model.PASSWORDLESS_HASH:
        # Accounts carrying the passwordless marker are accepted only when
        # handler_util.is_admin() holds; the submitted password is ignored.
        accepted = handler_util.is_admin()
    else:
        accepted = sha256_crypt.verify(password, stored_hash)
    if not accepted:
        bottle.redirect('/login?msg=fail')
    handler_util.set_current_username(username)
    bottle.redirect('/')
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def problem_view_handler(problem_id):
    """Render ``problem_view.html`` for one public problem.

    Aborts with 404 when ``problem_id`` is not an integer or when
    ``model.get_public_problem`` raises ``KeyError``.
    """
    try:
        numeric_id = int(problem_id)
    except ValueError:
        bottle.abort(404, 'Problem not found.')
    try:
        problem = model.get_public_problem(problem_id=numeric_id)
    except KeyError:
        bottle.abort(404, 'Problem not found.')
    problem_owner = model.get_user(problem['owner'])
    spec_blob = model.load_blob(problem['problem_spec_hash'])
    snapshot_time, ranking = model.get_last_problem_ranking_snapshot(
        problem_id=numeric_id, public_only=True)
    handler_util.inject_ranks_to_ranked_solutions(ranking)
    return handler_util.render('problem_view.html', dict(
        problem=problem,
        problem_spec=spec_blob,
        owner=problem_owner,
        ranked_solutions=ranking,
        snapshot_time=snapshot_time,
    ))
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def admin_problem_view_handler(problem_id):
    """Render ``admin/problem_view.html`` for one problem.

    Aborts with 404 when ``problem_id`` is not an integer or when
    ``model.get_problem_for_admin`` raises ``KeyError``.
    """
    try:
        problem_id = int(problem_id)
    except ValueError:
        # A non-numeric id used to escape as an unhandled ValueError; treat
        # it the same as an unknown problem.
        bottle.abort(404, 'Problem not found.')
    try:
        problem = model.get_problem_for_admin(problem_id=problem_id)
    except KeyError:
        bottle.abort(404, 'Problem not found.')
    owner = model.get_user(problem['owner'])
    problem_spec = model.load_blob(problem['problem_spec_hash'])
    snapshot_time, ranked_solutions = model.get_last_problem_ranking_snapshot(
        problem_id=problem_id, public_only=False)
    handler_util.inject_ranks_to_ranked_solutions(ranked_solutions)
    team_display_name_map = handler_util.compute_team_display_name_map(
        solution['owner'] for solution in ranked_solutions)
    template_dict = {
        'problem': problem,
        'problem_spec': problem_spec,
        'owner': owner,
        'ranked_solutions': ranked_solutions,
        'team_display_name_map': team_display_name_map,
        'snapshot_time': snapshot_time,
    }
    return handler_util.render('admin/problem_view.html', template_dict)
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def admin_solution_view_handler(solution_id):
    """Render ``admin/solution_view.html`` for one solution.

    Aborts with 404 when ``solution_id`` is not an integer or when looking up
    the solution or its problem raises ``KeyError``.
    """
    try:
        solution_id = int(solution_id)
    except ValueError:
        # A non-numeric id used to escape as an unhandled ValueError; treat
        # it the same as an unknown solution.
        bottle.abort(404, 'Solution not found.')
    try:
        solution = model.get_solution_for_admin(solution_id=solution_id)
        problem = model.get_problem_for_admin(problem_id=solution['problem_id'])
    except KeyError:
        bottle.abort(404, 'Solution not found.')
    problem_owner = model.get_user(problem['owner'])
    solution_owner = model.get_user(solution['owner'])
    solution_spec = model.load_blob(solution['solution_spec_hash'])
    template_dict = {
        'solution': solution,
        'solution_spec': solution_spec,
        'solution_owner': solution_owner,
        'problem_owner': problem_owner,
    }
    return handler_util.render('admin/solution_view.html', template_dict)
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def enforce_api_rate_limit(action, limit_in_window):
    """Reject the current API request when the caller is over its rate limit.

    Requests carrying ``X-Load-Test: yes`` (only while
    ``FLAGS.enable_load_test_hacks`` is set) and requests from users flagged
    ``organizer`` are exempt.

    Args:
        action: Action name used as the counter key.
        limit_in_window: Maximum number of requests of this action in a window.

    Raises:
        bottle.HTTPError: 429 if either rate limit is exceeded.
    """
    load_test_request = (
        FLAGS.enable_load_test_hacks and
        bottle.request.headers.get('X-Load-Test', '') == 'yes')
    if load_test_request or get_current_user()['organizer']:
        return
    username = get_current_username()
    # Value returned when recording this access; compared against the
    # configured minimum interval between requests.
    access_interval = model.record_last_api_access_time(username)
    if access_interval < FLAGS.api_rate_limit_request_interval:
        bottle.abort(429, 'Rate limit exceeded (per-second limit).')
    request_count = model.increment_api_rate_limit_counter(username, action)
    if request_count > limit_in_window:
        bottle.abort(429, 'Rate limit exceeded (per-hour limit).')


# http://flask.pocoo.org/snippets/44/
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def post_batch_entityset(entitySetName):
    """Insert a batch of entities and return the provider's result as JSON.

    The request JSON body is passed to ``handleInsertEntityBatch``. When the
    provider raises ``dalUtils.StatusCodeError`` its ``value`` becomes the
    response status and nothing is returned; any other error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleInsertEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 400 response.
        abort(400, 'Bad Request')


## DELETE: api/datasource/crud/batch/:entitySetName
#@delete('/api/datasource/crud/batch/<entitySetName>')
#def delete_batch_entityset(entitySetName):
#    try:
#        result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.json, dataService)
#        response.content_type = "application/json; charset=utf-8"
#        return json.dumps(result, cls=CustomEncoder)
#    except dalUtils.StatusCodeError as err:
#        response.status = err.value
#    except:
#        abort(400, 'Bad Request')


# DELETE: api/datasource/crud/batch/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
项目:memex-dossier-open    作者:dossier    | 项目源码 | 文件源码
def v1_random_fc_get(response, dbid_to_visid, store):
    '''Return one randomly sampled feature collection.

    The route for this endpoint is:
    ``GET /dossier/v1/random/feature-collection``.

    When the store holds at least one feature collection, the result
    is a two-element array: the content id (converted with
    ``dbid_to_visid``) followed by the feature collection, serialized
    the same way as :func:`dossier.web.routes.v1_fc_get` does.

    An empty store yields a 404 error.

    Note that currently, this may not be a uniformly random sample.
    '''
    # Sample over ids only: `store.scan()` would be obscenely slow here.
    picked = streaming_sample(store.scan_ids(), 1, 1000)
    if not len(picked):
        bottle.abort(404, 'The feature collection store is empty.')
    content_id = picked[0]
    visible_id = dbid_to_visid(content_id)
    return [visible_id, util.fc_to_json(store.get(content_id))]
项目:modernpython    作者:rhettinger    | 项目源码 | 文件源码
def verify_user_exists(user):
    """Abort with 404 unless ``user`` is a key of ``pubsub.user_info``."""
    if user not in pubsub.user_info:
        abort(404, f'Unknown user: {user!r}')
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def register_handler():
    """Render the registration form with every profile field blank.

    Aborts with 403 once ``settings.has_contest_finished()`` is true.
    """
    if settings.has_contest_finished():
        bottle.abort(403, 'The contest is over!')
    profile_fields = (
        'display_name',
        'contact_email',
        'member_names',
        'nationalities',
        'languages',
        'source_url',
    )
    blank_profile = dict.fromkeys(profile_fields, '')
    return handler_util.render('register.html', {'user': blank_profile})
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def register_post_handler():
    """Register a new user from the submitted profile form.

    Aborts with 403 once the contest has finished and with 400 when
    ``handler_util.parse_basic_profile_forms`` raises ``ValueError``.
    Renders ``registered.html`` with the username and password returned by
    ``model.register_user``.
    """
    if settings.has_contest_finished():
        bottle.abort(403, 'The contest is over!')
    try:
        profile = handler_util.parse_basic_profile_forms()
    except ValueError:
        bottle.abort(400, 'Invalid form.')
    client_addr = bottle.request.remote_addr
    username, password = model.register_user(remote_host=client_addr, **profile)
    return handler_util.render(
        'registered.html', {'username': username, 'password': password})
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def profile_post_handler():
    """Apply the submitted profile form to the current user.

    Aborts with 400 when ``handler_util.parse_basic_profile_forms`` raises
    ``ValueError``; on success redirects to ``/profile?msg=updated``.
    """
    try:
        profile = handler_util.parse_basic_profile_forms()
    except ValueError:
        bottle.abort(400, 'Invalid form.')
    current_username = handler_util.get_current_username()
    model.update_user(current_username, **profile)
    bottle.redirect('/profile?msg=updated')
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def solution_submit_handler():
    """Render the solution submission form.

    Users flagged ``organizer`` get a 400. The optional ``problem_id`` query
    parameter (default ``''``) is passed through to the template.
    """
    current_user = handler_util.get_current_user()
    if current_user['organizer']:
        bottle.abort(400, 'You are organizer, not allowed to submit solutions.')
    template_dict = {'problem_id': bottle.request.query.get('problem_id', '')}
    return handler_util.render('solution_submit.html', template_dict)
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def admin_visualize_problem_handler(problem_id):
    """Return a PNG rendering of a problem.

    The ``thumbnail`` query parameter, when present and non-empty, is passed
    to ``visualize.visualize_problem`` as a true flag. Aborts with 404 when
    ``problem_id`` is not an integer or when ``model.get_problem_for_admin``
    raises ``KeyError``.
    """
    thumbnail = bool(bottle.request.query.get('thumbnail'))
    try:
        problem_id = int(problem_id)
    except ValueError:
        # A non-numeric id used to escape as an unhandled ValueError; treat
        # it the same as an unknown problem.
        bottle.abort(404, 'Problem not found.')
    try:
        problem = model.get_problem_for_admin(problem_id=problem_id)
    except KeyError:
        bottle.abort(404, 'Problem not found.')
    problem_spec = model.load_blob(problem['problem_spec_hash'])
    image_binary = visualize.visualize_problem(problem_spec, thumbnail)
    bottle.response.content_type = 'image/png'
    return image_binary
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def admin_visualize_solution_handler(solution_id):
    """Return a PNG rendering of a solution drawn against its problem.

    The ``thumbnail`` query parameter, when present and non-empty, is passed
    to ``visualize.visualize_solution`` as a true flag. Aborts with 404 when
    ``solution_id`` is not an integer or when ``model.get_solution_for_admin``
    raises ``KeyError``.
    """
    thumbnail = bool(bottle.request.query.get('thumbnail'))
    try:
        solution_id = int(solution_id)
    except ValueError:
        # A non-numeric id used to escape as an unhandled ValueError; treat
        # it the same as an unknown solution.
        bottle.abort(404, 'Solution not found.')
    try:
        solution = model.get_solution_for_admin(solution_id=solution_id)
    except KeyError:
        bottle.abort(404, 'Solution not found.')
    solution_spec = model.load_blob(solution['solution_spec_hash'])
    problem_spec = model.load_blob(solution['problem_spec_hash'])
    image_binary = visualize.visualize_solution(
        problem_spec, solution_spec, thumbnail)
    bottle.response.content_type = 'image/png'
    return image_binary
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def testing_cron_snapshot_job_handler():
    """Run ``cron_jobs.snapshot_job()`` on demand and answer ``'done'``.

    Aborts with 403 unless ``FLAGS.enable_testing_handlers`` is set.
    """
    if FLAGS.enable_testing_handlers:
        cron_jobs.snapshot_job()
        return 'done'
    bottle.abort(403, 'Disabled')
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def _protect_before_contest_hook():
    """Before-request hook: answer 403 to non-admins until the contest starts.

    Paths under /health, /ping, /api/ and /admin/ are never blocked here.
    """
    exempt_prefixes = ('/health', '/ping', '/api/', '/admin/')
    if bottle.request.path.startswith(exempt_prefixes):
        return
    if is_admin():
        return
    if settings.has_contest_started():
        return
    bottle.abort(403, 'The contest has not started yet.')
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def _enforce_web_rate_limit_hook():
    """Before-request hook: answer 429 when a user's web quota is used up.

    Skipped for load-test requests (``X-Load-Test: yes`` while
    ``FLAGS.enable_load_test_hacks`` is set), for paths under /health, /ping,
    /static/, /api/ and /admin/, when there is no current user, and for users
    flagged ``organizer``.
    """
    request = bottle.request
    if (FLAGS.enable_load_test_hacks and
            request.headers.get('X-Load-Test', '') == 'yes'):
        return
    exempt_prefixes = ('/health', '/ping', '/static/', '/api/', '/admin/')
    if request.path.startswith(exempt_prefixes):
        return
    current_user = get_current_user()
    if not current_user or current_user['organizer']:
        return
    # A falsy result from the decrement means the request must be rejected.
    if not model.decrement_web_rate_limit_counter(current_user['_id']):
        bottle.abort(429, 'Rate limit exceeded.')
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def _protect_xsrf_hook():
    """Before-request hook: require a valid XSRF token on unsafe requests.

    API paths (``/api/``) and GET/HEAD requests are not checked. Any other
    request must carry an ``xsrf_token`` form field equal to
    ``get_xsrf_token()``, otherwise it is aborted with 400.
    """
    request = bottle.request
    # No need to protect API calls.
    if request.path.startswith('/api/'):
        return
    if request.method in ('GET', 'HEAD'):
        return
    submitted_token = request.forms.get('xsrf_token', 'N/A')
    if submitted_token != get_xsrf_token():
        bottle.abort(400, 'XSRF token is incorrect or not set.')
项目:icfpc2016-judge    作者:icfpc2016    | 项目源码 | 文件源码
def _require_gzip_hook():
    """Before-request hook: API requests must advertise gzip support.

    Load-test requests (``X-Load-Test: yes`` while
    ``FLAGS.enable_load_test_hacks`` is set) and non-API paths are not
    checked. ``X-Accept-Encoding`` takes precedence over ``Accept-Encoding``.
    """
    headers = bottle.request.headers
    if FLAGS.enable_load_test_hacks and headers.get('X-Load-Test', '') == 'yes':
        return
    if not bottle.request.path.startswith('/api/'):
        return
    standard_value = headers.get('Accept-Encoding', '')
    accept_encoding = headers.get('X-Accept-Encoding', standard_value)
    if 'gzip' not in accept_encoding:
        bottle.abort(
            400, 'Accept-Encoding: gzip is required for API requests')
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def checkserial(func):
    """Decorator to call function only when machine connected.

    When ``driveboard.connected()`` is false the wrapped call is replaced by
    a 400 "No machine." abort.
    """
    def _decorator(*args, **kwargs):
        if not driveboard.connected():
            bottle.abort(400, "No machine.")
        return func(*args, **kwargs)
    return _decorator




### STATIC FILES
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def offset(x, y, z):
    """Define a custom offset of (x, y, z) and select it.

    Aborts with 400 unless the driveboard status reports ``ready``.
    """
    machine_status = driveboard.status()
    if not machine_status['ready']:
        bottle.abort(400, "Machine not ready.")
    driveboard.def_offset_custom(x, y, z)
    driveboard.sel_offset_custom()
    return '{}'
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def clear_offset():
    """Zero the custom offset and select the table offset.

    Aborts with 400 unless the driveboard status reports ``ready``.
    """
    machine_status = driveboard.status()
    if not machine_status['ready']:
        bottle.abort(400, "Machine not ready.")
    driveboard.def_offset_custom(0, 0, 0)
    driveboard.sel_offset_table()
    return '{}'




### JOBS QUEUE
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def _get(jobname, library=False):
    """Return the contents of a stored job file as a string.

    The job is looked up under ``conf['rootdir']/library`` when ``library``
    is true, otherwise under ``conf['stordir']``. A ``.dba`` file is
    preferred over a ``.dba.starred`` one; when neither exists the request
    is aborted with 400.
    """
    if library:
        base_parts = [conf['rootdir'], 'library']
    else:
        base_parts = [conf['stordir']]
    # Leading/trailing slashes and backslashes are stripped from the name.
    base_parts.append(jobname.strip('/\\'))
    basepath = os.path.join(*base_parts)
    for suffix in ('.dba', '.dba.starred'):
        jobpath = basepath + suffix
        if os.path.exists(jobpath):
            break
    else:
        bottle.abort(400, "No such file.")
    with open(jobpath) as fp:
        return fp.read()
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def _get_path(jobname, library=False):
    """Return the on-disk path of a stored job.

    Looks under ``conf['rootdir']/library`` when ``library`` is true,
    otherwise under ``conf['stordir']``. Returns the ``.dba`` path if it
    exists, else the ``.dba.starred`` path; aborts with 400 when neither
    file exists.
    """
    stripped_name = jobname.strip('/\\')
    if library:
        basepath = os.path.join(conf['rootdir'], 'library', stripped_name)
    else:
        basepath = os.path.join(conf['stordir'], stripped_name)
    for suffix in ('.dba', '.dba.starred'):
        if os.path.exists(basepath + suffix):
            return basepath + suffix
    bottle.abort(400, "No such file.")
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def load():
    """Load a dba, svg, dxf, or gcode job.

    Args:
        (Args come in through the POST request, as a JSON object in the
        ``load_request`` form field.)
        job: Parsed dba or job string (dba, svg, dxf, or ngc).
        name: name of the job (string)
        optimize: flag whether to optimize (bool), defaults to True
        overwrite: flag whether to overwrite file if present (bool),
            defaults to False; when false the name is passed through
            ``_unique_name`` first

    Returns:
        The JSON-encoded name under which the job was added.

    Aborts with 400 when ``load_request`` is missing, is not a JSON object,
    lacks ``job`` or ``name``, or when ``jobimport.convert`` raises
    ``TypeError``.
    """
    raw_request = bottle.request.forms.get('load_request')
    try:
        load_request = json.loads(raw_request)
    except (TypeError, ValueError):
        # TypeError: the form field is absent (None); ValueError: the field
        # is not valid JSON. Both used to surface as an unhandled exception.
        load_request = None
    if not isinstance(load_request, dict):
        bottle.abort(400, "Invalid request data.")
    job = load_request.get('job')  # always a string
    name = load_request.get('name')
    optimize = load_request.get('optimize', True)
    overwrite = load_request.get('overwrite', False)
    # sanity check
    if job is None or name is None:
        bottle.abort(400, "Invalid request data.")
    # convert
    try:
        job = jobimport.convert(job, optimize=optimize)
    except TypeError:
        if DEBUG:
            traceback.print_exc()
        bottle.abort(400, "Invalid file type.")

    if not overwrite:
        name = _unique_name(name)
    _add(json.dumps(job), name)
    return json.dumps(name)
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def listing(kind=None):
    """List all queue jobs by name.

    Args:
        kind: ``None`` for every job, ``'starred'`` or ``'unstarred'`` to
            restrict the listing. Any other value aborts with 400.

    Returns:
        JSON-encoded result of ``_get_sorted`` for the matching glob pattern.
    """
    if kind is None:
        files = _get_sorted('*.dba*', stripext=True)
    elif kind == 'starred':
        # A leftover debug ``print files`` statement was removed here; it
        # wrote to stdout on every request and is a syntax error on Python 3.
        files = _get_sorted('*.dba.starred', stripext=True)
    elif kind == 'unstarred':
        files = _get_sorted('*.dba', stripext=True)
    else:
        bottle.abort(400, "Invalid kind.")
    return json.dumps(files)
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def star(jobname):
    """Star a job by renaming its ``.dba`` file to ``.dba.starred``.

    Aborts with 400 when the resolved path does not end in ``.dba``.
    """
    current_path = _get_path(jobname)
    if not current_path.endswith('.dba'):
        bottle.abort(400, "No such file.")
    os.rename(current_path, current_path + '.starred')
    return '{}'
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def unstar(jobname):
    """Unstar a job by dropping the ``.starred`` suffix from its file name.

    Aborts with 400 when the resolved path does not end in ``.starred``.
    """
    starred_suffix = '.starred'
    current_path = _get_path(jobname)
    if not current_path.endswith(starred_suffix):
        bottle.abort(400, "No such file.")
    os.rename(current_path, current_path[:-len(starred_suffix)])
    return '{}'
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def run(jobname):
    """Send job from queue to the machine.

    The job file is read first; the request is then aborted with 400 unless
    the driveboard status reports ``ready``.
    """
    job_text = _get(jobname)
    machine_status = driveboard.status()
    if not machine_status['ready']:
        bottle.abort(400, "Machine not ready.")
    driveboard.job(json.loads(job_text))
    return '{}'
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def build():
    """Build firmware from firmware/src files (for all config files).

    Aborts with 400 when ``driveboard.build()`` returns a non-zero code.
    """
    if driveboard.build() != 0:
        bottle.abort(400, "Build failed.")
    return '{}'
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def flash(firmware=None):
    """Flash firmware to MCU.

    ``firmware`` is forwarded to ``driveboard.flash`` only when given.
    Aborts with 400 when the flash call returns a non-zero code.
    """
    flash_kwargs = {} if firmware is None else {'firmware': firmware}
    if driveboard.flash(**flash_kwargs) != 0:
        bottle.abort(400, "Flashing failed.")
    return '{}'
项目:driveboardapp    作者:nortd    | 项目源码 | 文件源码
def reset():
    """Reset MCU; abort with 400 when the reset raises ``IOError``."""
    try:
        driveboard.reset()
    except IOError:
        bottle.abort(400, "Reset failed.")
    else:
        return '{}'
项目:experiment-manager    作者:softfire-eu    | 项目源码 | 文件源码
def error_translation(func):
    """Decorate ``func`` so that known exceptions become HTTP error responses.

    The traceback is printed before each abort. Mapping, checked in order:
    ``ValueError`` -> 400 (with the exception's ``args``), not-found errors
    -> 404, validation / already-booked errors -> 400, ``RpcFailedCall`` ->
    500, ``FileNotFoundError`` -> 404. Anything else propagates unchanged.
    """
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        # The clause order is significant if any of these classes inherit
        # from one another, so it is kept exactly as written.
        except ValueError as err:
            traceback.print_exc()
            bottle.abort(400, err.args)
        except exceptions.ExperimentNotFound as err:
            traceback.print_exc()
            bottle.abort(404, err.message)
        except exceptions.ExperimentValidationError as err:
            traceback.print_exc()
            bottle.abort(400, err.message)
        except exceptions.ManagerNotFound as err:
            traceback.print_exc()
            bottle.abort(404, err.message)
        except exceptions.ResourceAlreadyBooked as err:
            traceback.print_exc()
            bottle.abort(400, err.message)
        except exceptions.ResourceNotFound as err:
            traceback.print_exc()
            bottle.abort(404, err.message)
        except exceptions.RpcFailedCall:
            traceback.print_exc()
            bottle.abort(500, "Ups, an internal error occurred, please report to us the procedure and we will fix it")
        except FileNotFoundError:
            traceback.print_exc()
            bottle.abort(404, "File not found in your request")

    return wrapper
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def get_metadata():
    """Return the database metadata client description as indented JSON.

    Any error while fetching or serializing the metadata aborts with 500.
    """
    try:
        response.content_type = "application/json; charset=utf-8"
        metadataClient = databaseInfo.getMetadataClient()
        return json.dumps(metadataClient, cls=MetadataEncoder, indent=2)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 500 response.
        abort(500, 'Internal server error')


# GET: api/datasource/crud/:entitySetName?skip=20&top=10
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def get_entityset(entitySetName):
    """Return the result of ``handleGet`` for an entity set as indented JSON.

    The request query parameters are forwarded to the provider. Any error
    aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleGet(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 400 response.
        abort(400, 'Bad Request')


# GET: api/datasource/crud/single/:entitySetName?keys=key1:{key1}
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def get_single_entityset(entitySetName):
    """Return the result of ``handleGetSingle`` as indented JSON.

    The request query parameters are forwarded to the provider. Any error
    aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleGetSingle(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 400 response.
        abort(400, 'Bad Request')


# GET: api/datasource/crud/many/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def get_many_entityset(entitySetName):
    """Return the result of ``handleGetMany`` as indented JSON.

    The request query parameters are forwarded to the provider. Any error
    aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleGetMany(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 400 response.
        abort(400, 'Bad Request')


# PUT: api/datasource/crud/:entitySetName?keys=key1:{key1}
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def put_entityset(entitySetName):
    """Update one entity and return the provider's result as JSON.

    The request query parameters and JSON body are passed to
    ``handleUpdateEntity``. When the provider raises
    ``dalUtils.StatusCodeError`` its ``value`` becomes the response status
    and nothing is returned; any other error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 400 response.
        abort(400, 'Bad Request')


# PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1}
#@patch('/api/datasource/crud/<entitySetName>')
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def post_entityset(entitySetName):
    """Insert one entity and return the provider's result as JSON.

    The request JSON body is passed to ``handleInsertEntity``. When the
    provider raises ``dalUtils.StatusCodeError`` its ``value`` becomes the
    response status and nothing is returned; any other error aborts with 400.
    """
    # test1 = json.loads(request.body.read())
    try:
        result = dataProviderDto.apiProvider.handleInsertEntity(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 400 response.
        abort(400, 'Bad Request')


# DELETE: api/datasource/crud/:entitySetName?keys=key1:{key1}
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def delete_entityset(entitySetName):
    """Delete one entity and return the provider's result as JSON.

    The request query parameters are passed to ``handleDeleteEntity``. When
    the provider raises ``dalUtils.StatusCodeError`` its ``value`` becomes
    the response status and nothing is returned; any other error aborts
    with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleDeleteEntity(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 400 response.
        abort(400, 'Bad Request')


# PUT: api/datasource/crud/batch/:entitySetName
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def put_batch_entityset(entitySetName):
    """Update a batch of entities and return the provider's result as JSON.

    The request JSON body is passed to ``handleUpdateEntityBatch``. When the
    provider raises ``dalUtils.StatusCodeError`` its ``value`` becomes the
    response status and nothing is returned; any other error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 400 response.
        abort(400, 'Bad Request')

# PATCH: api/datasource/crud/batch/:entitySetName
#@patch('/api/datasource/crud/batch/<entitySetName>')
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def patch_batch_entityset(entitySetName):
    """Patch a batch of entities and return the provider's result as JSON.

    The request JSON body is passed to ``handleUpdateEntityBatch``. When the
    provider raises ``dalUtils.StatusCodeError`` its ``value`` becomes the
    response status and nothing is returned; any other error aborts with 400.
    """
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntityBatch(entitySetName, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit and turned them into a 400 response.
        abort(400, 'Bad Request')


# POST: api/datasource/crud/batch/:entitySetName
项目:find-that-charity    作者:TechforgoodCAST    | 项目源码 | 文件源码
def charity(regno, filetype='html'):
    """Return one charity record, as an HTML page or as the raw document.

    The record is fetched from the configured Elasticsearch index by
    ``regno``. With ``filetype == "html"`` the ``charity`` template is
    rendered; any other value returns the document's ``_source``. A result
    without ``_source`` aborts with 404.
    """
    settings = app.config
    res = settings["es"].get(index=settings["es_index"], doc_type=settings["es_type"], id=regno, ignore=[404])
    if "_source" not in res:
        bottle.abort(404, bottle.template('Charity {{regno}} not found.', regno=regno))
    if filetype == "html":
        return bottle.template('charity', charity=res["_source"], charity_id=res["_id"])
    return res["_source"]
项目:find-that-charity    作者:TechforgoodCAST    | 项目源码 | 文件源码
def charity(regno):
    """Render the ``preview`` template for one charity record.

    The record is fetched from the configured Elasticsearch index by
    ``regno``; a result without ``_source`` aborts with 404.
    """
    settings = app.config
    res = settings["es"].get(index=settings["es_index"], doc_type=settings["es_type"], id=regno, ignore=[404])
    if "_source" not in res:
        bottle.abort(404, bottle.template('Charity {{regno}} not found.', regno=regno))
    return bottle.template('preview', charity=res["_source"], charity_id=res["_id"])
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_httperror_in_generator_callback(self):
        # Register a generator route on '/' that yields once and then calls
        # abort(404), i.e. the HTTP error is raised while the generator is
        # being consumed rather than when the callback is first invoked.
        @self.app.route('/')
        def test():
            yield
            bottle.abort(404, 'teststring')
        # The abort message, the status line text and the 404 status code
        # are all expected to show up in the response.
        self.assertInBody('teststring')
        self.assertInBody('404 Not Found')
        self.assertStatus(404)
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_401(self):
        """ WSGI: abort(401, '') (HTTP 401) """
        # Route that always aborts with 401.
        @bottle.route('/')
        def test(): bottle.abort(401)
        # Before any custom handler is registered the 401 reaches the client.
        self.assertStatus(401, '/')
        # Register a 401 error handler that rewrites the status to 200 and
        # returns the type of the error object it was given.
        @bottle.error(401)
        def err(e):
            bottle.response.status = 200
            return str(type(e))
        # The same request is now answered by the handler: status 200 and a
        # body naming bottle.HTTPError as the error type.
        self.assertStatus(200, '/')
        self.assertBody("<class 'bottle.HTTPError'>",'/')
项目:anom-py    作者:Bogdanp    | 项目源码 | 文件源码
def view_post(user, slug):
    """Return the template context for the post whose slug is ``slug``.

    Aborts with 404 when the query finds no such post.
    """
    matching_post = Post.query().where(Post.slug == slug).get()
    if matching_post is not None:
        return {"post": matching_post, "embedded": False}
    return abort(404)
# [END post]


# [START create]
项目:anom-py    作者:Bogdanp    | 项目源码 | 文件源码
def user_required(*permissions):
    """Build a decorator that requires a logged-in user with ``permissions``.

    The wrapped function receives the resolved user as its first argument.
    Requests without a ``sid`` cookie, without a matching session, or whose
    session has no user are redirected to ``/login``; a user missing any of
    the required permissions gets a 403.
    """
    def decorator(fn):
        def wrapper(*args, **kwargs):
            session_id = request.get_cookie("sid")
            if not session_id:
                return redirect("/login")

            session = Session.get(session_id)
            if not session:
                return redirect("/login")

            user = session.user.get()
            if user is None:
                return redirect("/login")

            # Every listed permission must be held by the user.
            if any(needed not in user.permissions for needed in permissions):
                return abort(403)

            return fn(user, *args, **kwargs)
        return wrapper
    return decorator
# [END user-required]


# [START create-session]
项目:memex-dossier-open    作者:dossier    | 项目源码 | 文件源码
def create_injector(param_name, fun_param_value):
    '''Dependency injection with Bottle.

    This creates a simple dependency injector that will map
    ``param_name`` in routes to the value ``fun_param_value()``
    each time the route is invoked.

    ``fun_param_value`` is a closure so that it is lazily evaluated.
    This is useful for handling thread local services like database
    connections.

    Routes whose callback does not declare ``param_name`` are left
    untouched. If ``fun_param_value()`` returns ``None`` the error is
    logged and the request is aborted with 503.

    :param str param_name: name of function parameter to inject into
    :param fun_param_value: the value to insert
    :type fun_param_value: a closure that can be applied with zero
                           arguments
    :returns: a plugin object exposing ``api = 2`` and ``apply``
    '''
    # ``inspect.getargspec`` was removed in Python 3.11; use
    # ``getfullargspec`` where it exists and fall back on old interpreters.
    # Both return the positional argument names as element 0.
    getargspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec

    class _(object):
        api = 2

        def apply(self, callback, route):
            if param_name not in getargspec(route.callback)[0]:
                return callback

            def _(*args, **kwargs):
                pval = fun_param_value()
                if pval is None:
                    logger.error('service "%s" unavailable', param_name)
                    # abort() raises, so nothing after it runs.
                    bottle.abort(503, 'service "%s" unavailable' % param_name)
                kwargs[param_name] = pval
                return callback(*args, **kwargs)
            return _
    return _()
项目:memex-dossier-open    作者:dossier    | 项目源码 | 文件源码
def v1_search(request, response, visid_to_dbid, config,
              search_engines, filters, cid, engine_name):
    '''Search feature collections.

    The route for this endpoint is:
    ``/dossier/v1/<content_id>/search/<search_engine_name>``.

    ``content_id`` can be any *profile* content identifier. (This
    restriction may be lifted at some point.) Namely, it must start
    with ``p|``.

    ``engine_name`` corresponds to the search strategy to
    use. The list of available search engines can be retrieved with the
    :func:`v1_search_engines` endpoint. An unknown ``engine_name``
    yields a 404 error.

    This endpoint returns a JSON payload which is an object with a
    single key, ``results``. ``results`` is a list of objects, where
    the objects each have ``content_id`` and ``fc`` attributes.
    ``content_id`` is the unique identifier for the result returned,
    and ``fc`` is a JSON serialization of a feature collection.

    There are also two query parameters:

    * **limit** limits the number of results to the number given.
    * **filter** sets the filtering function. The default
      filter function, ``already_labeled``, will filter out any
      feature collections that have already been labeled with the
      query ``content_id``.

    The parameters are read from the query string for ``GET`` requests
    and from the form body otherwise.
    '''
    db_cid = visid_to_dbid(cid)
    try:
        search_engine = search_engines[engine_name]
    except KeyError:
        # Report the requested name directly: ``KeyError.message`` does not
        # exist on Python 3, which turned this 404 into an AttributeError.
        bottle.abort(404, 'Search engine "%s" does not exist.' % engine_name)
    query = request.query if request.method == 'GET' else request.forms
    search_engine = (config.create(search_engine)
                           .set_query_id(db_cid)
                           .set_query_params(query))
    # ``filter_spec`` rather than ``filter`` so the builtin is not shadowed.
    for name, filter_spec in filters.items():
        search_engine.add_filter(name, config.create(filter_spec))
    return search_engine.respond(response)