Python bottle 模块,request() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bottle.request()

项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def login_required(perm=None):
    def _dec(func):
        def _view(*args, **kwargs):
            s = request.environ.get('beaker.session')
            if s.get("name", None) and s.get("authenticated", False):
                if perm:
                    perms = parse_permissions(s)
                    if perm not in perms or not perms[perm]:
                        if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                            return HTTPError(403, "Forbidden")
                        else:
                            return redirect("/nopermission")

                return func(*args, **kwargs)
            else:
                if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
                    return HTTPError(403, "Forbidden")
                else:
                    return redirect("/login")

        return _view

    return _dec
项目:nematus    作者:EdinburghNLP    | 项目源码 | 文件源码
def translate(self):
        """
        Processes a translation request.
        """
        translation_request = request_provider(self._style, request)
        logging.debug("REQUEST - " + repr(translation_request))

        translations = self._translator.translate(
            translation_request.segments,
            translation_request.settings
        )
        response_data = {
            'status': TranslationResponse.STATUS_OK,
            'segments': [translation.target_words for translation in translations],
            'word_alignments': [translation.get_alignment_json(as_string=False) for translation in translations] if translation_request.settings.get_alignment else None,
            'word_probabilities': [translation.target_probs for translation in translations] if translation_request.settings.get_word_probs else None,
        }
        translation_response = response_provider(self._style, **response_data)
        logging.debug("RESPONSE - " + repr(translation_response))

        response.content_type = translation_response.get_content_type()
        return repr(translation_response)
项目:verify_code_server    作者:duanyifei    | 项目源码 | 文件源码
def code_verify(request):
    image_str = request.forms.get("code")
    typ = int(request.forms.get('type'))
    filename = os.path.join(
        curpath,
        'temp',
        '%s.png'%(
            md5.md5(
                str(time.time()) + str(random.random())
            ).hexdigest()
        )
    )
    with open(filename, 'wb') as f:
        f.write(image_str)
    img = Image.open(filename)
    exec '''from main import main_%s'''%(typ)
    exec '''code = main_%s.verify(img, code_template_dic[typ])'''%(typ)
    #exec '''code = main_%s.verify(img)'''%(typ)
    if hasattr(img, 'close'):
        img.close()
    try:
        print os.remove(filename)
    except Exception as e:
        print u"?????? %s"%e
    return code
项目:dhcp-stats-prometheus    作者:atonkyra    | 项目源码 | 文件源码
def prometheus_metrics():
    if not test_restricted(request['REMOTE_ADDR']):
        return ''
    dhcpstat = {'shared-networks': []}
    dhcp6stat = {'shared-networks': []}
    try:
        dhcpstat = json.loads(exec_command([args.binary, '-c', args.dhcp4_config, '-l', args.dhcp4_leases, '-f', 'j']))
    except:
        pass
    try:
        dhcp6stat = json.loads(exec_command([args.binary, '-c', args.dhcp6_config, '-l', args.dhcp6_leases, '-f', 'j']))
    except:
        pass
    data = []
    for shared_network in dhcpstat['shared-networks']:
        data.append('dhcp_pool_used{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],shared_network['used']))
        data.append('dhcp_pool_free{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],shared_network['free']))
        defined_leases = float(shared_network['defined'])
        leases_used_percentage = 0
        if defined_leases > 0:
            leases_used_percentage = float(shared_network['used'])/defined_leases
        data.append('dhcp_pool_usage{ip_version="%s",network="%s"} %s' % (4,shared_network['location'],leases_used_percentage))
    for shared_network in dhcp6stat['shared-networks']:
        data.append('dhcp_pool_used{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],shared_network['used']))
        data.append('dhcp_pool_free{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],shared_network['free']))
        defined_leases = float(shared_network['defined'])
        leases_used_percentage = 0
        if defined_leases > 0:
            leases_used_percentage = float(shared_network['used'])/defined_leases
        data.append('dhcp_pool_usage{ip_version="%s",network="%s"} %s' % (6,shared_network['location'],leases_used_percentage))
    response.content_type = 'text/plain'
    return '%s\n' % ('\n'.join(data))
项目:cbas    作者:ImmobilienScout24    | 项目源码 | 文件源码
def auth_server():
    username = request.forms.get('username')
    if username == 'user_ok':
        return {'access_token': 'my-nifty-access-token'}
    elif username == 'auth_fail':
        response.status = 400
        return {'error': 'errored with HTTP 400 on request'}
    elif username == 'create_fail':
        return {'access_token': 'the-token-with-which-create-will-fail'}
    elif username == 'delete_fail':
        return {'access_token': 'the-token-with-which-delete-will-fail'}
    elif username == 'empty_page':
        return {'access_token': 'the-token-which-causes-an-empty-page'}
    else:
        return {}
项目:cbas    作者:ImmobilienScout24    | 项目源码 | 文件源码
def create():
    auth_token = request.headers.get('Authorization').split()[1]
    if auth_token == 'my-nifty-access-token':
        return {'response': 'Successful creation of user.'}
    elif auth_token == 'the-token-with-which-create-will-fail':
        response.status = 403
        return {'error': 'Permission denied'}
    elif auth_token == 'the-token-which-causes-an-empty-page':
        response.status = 403
        return "empty"
项目:cbas    作者:ImmobilienScout24    | 项目源码 | 文件源码
def delete():
    auth_token = request.headers.get('Authorization').split()[1]
    if auth_token == 'my-nifty-access-token':
        return {'response': 'Successful deletion of user.'}
    elif auth_token == 'the-token-with-which-delete-will-fail':
        response.status = 403
        return {'error': 'Permission denied'}
项目:maltego_tds_transforms    作者:passivetotal    | 项目源码 | 文件源码
def load_client(context):
    """Get an instance of a loaded client."""
    username = context.getTransformSetting('username')
    api_key = context.getTransformSetting('aKey')
    test_status = context.getTransformSetting('test_local')
    if test_status and test_status == 'True':
        server = context.getTransformSetting('server')
        version = context.getTransformSetting('version')
        return AttributeRequest(username, api_key, server, version)
    else:
        return AttributeRequest(username, api_key, headers=gen_debug(request))
项目:maltego_tds_transforms    作者:passivetotal    | 项目源码 | 文件源码
def load_client(context):
    """Get an instance of a loaded client."""
    username = context.getTransformSetting('username')
    api_key = context.getTransformSetting('aKey')
    test_status = context.getTransformSetting('test_local')
    if test_status and test_status == 'True':
        server = context.getTransformSetting('server')
        version = context.getTransformSetting('version')
        return DnsRequest(username, api_key, server, version)
    else:
        return DnsRequest(username, api_key, headers=gen_debug(request))
项目:maltego_tds_transforms    作者:passivetotal    | 项目源码 | 文件源码
def load_enrichment(context):
    """Get an instance of a loaded client."""
    username = context.getTransformSetting('username')
    api_key = context.getTransformSetting('aKey')
    test_status = context.getTransformSetting('test_local')
    if test_status and test_status == 'True':
        server = context.getTransformSetting('server')
        version = context.getTransformSetting('version')
        return EnrichmentRequest(username, api_key, server, version, debug=True)
    else:
        return EnrichmentRequest(username, api_key, headers=gen_debug(request))
项目:maltego_tds_transforms    作者:passivetotal    | 项目源码 | 文件源码
def load_client(context):
    """Get an instance of a loaded client."""
    username = context.getTransformSetting('username')
    api_key = context.getTransformSetting('aKey')
    test_status = context.getTransformSetting('test_local')
    if test_status and test_status == 'True':
        server = context.getTransformSetting('server')
        version = context.getTransformSetting('version')
        return SslRequest(username, api_key, server, version)
    else:
        return SslRequest(username, api_key, headers=gen_debug(request))
项目:maltego_tds_transforms    作者:passivetotal    | 项目源码 | 文件源码
def load_client(context):
    """Get an instance of a loaded client."""
    username = context.getTransformSetting('username')
    api_key = context.getTransformSetting('aKey')
    test_status = context.getTransformSetting('test_local')
    if test_status and test_status == 'True':
        server = context.getTransformSetting('server')
        version = context.getTransformSetting('version')
        return ActionsClient(username, api_key, server, version)
    else:
        return ActionsClient(username, api_key, headers=gen_debug(request))
项目:maltego_tds_transforms    作者:passivetotal    | 项目源码 | 文件源码
def load_client(context):
    """Get an instance of a loaded client."""
    username = context.getTransformSetting('username')
    api_key = context.getTransformSetting('aKey')
    test_status = context.getTransformSetting('test_local')
    if test_status and test_status == 'True':
        server = context.getTransformSetting('server')
        version = context.getTransformSetting('version')
        return EnrichmentRequest(username, api_key, server, version)
    else:
        return EnrichmentRequest(username, api_key, headers=gen_debug(request))
项目:first_timer_scraper    作者:niccokunzmann    | 项目源码 | 文件源码
def add_organization_html(ending):
    """Submit an organization for scraping.
    This shows an html page with a link to the status of the organization.
    """
    organization = request.forms.get('organization')
    scraper.scrape_organization(organization)
    if ending == "json":
        return {"status": "ok", "urls": api.get_organization_urls(organization)}
    return template("added-organization.html", organization=organization)
项目:first_timer_scraper    作者:niccokunzmann    | 项目源码 | 文件源码
def get_all_organizations(ending):
    """List all organizations with links to their statuses."""
    start = int(request.query.get("offset", 0))
    count = min(int(request.query.get("limit", DEFAULT_PAGINATION_COUNT)), MAX_PAGINATION_COUNT)
    if ending == "json":
        return api.get_organizations(start, count)
    return template("organizations.html", start=start, count=count)
项目:first_timer_scraper    作者:niccokunzmann    | 项目源码 | 文件源码
def add_repository(ending):
    """Sumbit a repository for scraping
    This shows an html page with a link to the status of the repository.
    """
    repository = request.forms.get('repository')
    organization_name, repository_name = repository.split("/")
    scraper.scrape_repository(repository)
    if ending == "json":
        return {"status": "ok", "urls": api.get_repository_urls(organization_name, repository_name)}
    return template("added-repository.html", repository=repository)
项目:first_timer_scraper    作者:niccokunzmann    | 项目源码 | 文件源码
def add_authentication():
    """Add `username` and `password` to those usable to scrape github.

    They will be tried and removed if invalid.
    """
    # http://bottlepy.org/docs/dev/tutorial.html#http-request-methods
    username = request.forms.get('username')
    password = request.forms.get('password')
    if check_login((username, password)):
        credentials.add((username, password))
        return static("add-github-authentication-success.html")
    return static("add-github-authentication-failure.html")
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def __enter__(self):
    self.orig = bottle.request.environ
    bottle.request.environ = self.environ
    for k,v in self.extras.items():
      if hasattr(bottle.request, k):
        self.extra_orig[k] = getattr(bottle.request, k)
      setattr(bottle.request, k, v)
    setattr(bottle.BaseRequest, 'app', True)
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def __exit__(self,a,b,c):
    bottle.request.environ = self.orig
    for k,v in self.extras.items():
      if k in self.extra_orig:
        setattr(bottle.request, k, self.extra_orig[k])
      else:
        try:
          delattr(bottle.request, k)
        except AttributeError:
          pass
    setattr(bottle.BaseRequest, 'app', self.orig_app_reader)
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testParams(self):
    with boddle(params={'name':'derek'}):
      self.assertEqual(bottle.request.params['name'], 'derek')
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def test_no_params_no_throw(self):
    with boddle():
      self.assertEqual(list(bottle.request.params.items()), [])
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testGetParams(self):
    with boddle(method='get', params={'name':'derek'}):
      self.assertEqual(bottle.request.params['name'], 'derek')
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testPath(self):
    with boddle(path='/derek'):
      self.assertEqual(bottle.request.path, '/derek')
      with boddle(path='/anderson'):
        self.assertEqual(bottle.request.path, '/anderson')
      self.assertEqual(bottle.request.path, '/derek')
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testMethod(self):
    with boddle(method='post'):
      self.assertEqual(bottle.request.method, 'POST')
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testHeaders(self):
    with boddle(headers={'x_header':'value'}):
      self.assertEqual(bottle.request.headers['X_HEADER'], 'value')
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testHyphenatedHeaders(self):
    with boddle(headers={'x-header':'value'}):
      self.assertEqual(bottle.request.headers['X-HEADER'], 'value')
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testExtraStuff(self):
    with boddle(extra='woot'):
      self.assertEqual(bottle.request.extra, 'woot')
      with boddle(extra='woot2'):
        self.assertEqual(bottle.request.extra, 'woot2')
    self.assertFalse(hasattr(bottle.request,'extra'))
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testURL(self):
    with boddle(url='https://github.com/keredson/boddle'):
      self.assertEqual(bottle.request.url, 'https://github.com/keredson/boddle')
      self.assertEqual(bottle.request.fullpath, '/keredson/boddle')
      self.assertEqual(bottle.request.path, '/keredson/boddle')
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testRevert(self):
    with boddle(params={'name':'derek'}):
      self.assertEqual(bottle.request.params['name'], 'derek')
      with boddle(params={'name':'anderson'}):
        self.assertEqual(bottle.request.params['name'], 'anderson')
      self.assertEqual(bottle.request.params['name'], 'derek')
项目:boddle    作者:keredson    | 项目源码 | 文件源码
def testBody(self):
    with boddle(body='body'):
      self.assertEqual(bottle.request.body.read(), b'body')
      self.assertEqual(bottle.request.body.readline(), b'body')
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def set_session(request, info):
    s = request.environ.get('beaker.session')
    s["authenticated"] = True
    s["user_id"] = info["id"]
    s["name"] = info["name"]
    s["role"] = info["role"]
    s["perms"] = info["permission"]
    s["template"] = info["template"]
    s.save()

    return s
项目:brother_ql_web    作者:pklaus    | 项目源码 | 文件源码
def get_preview_image():
    context = get_label_context(request)
    im = create_label_im(**context)
    return_format = request.query.get('return_format', 'png')
    if return_format == 'base64':
        import base64
        response.set_header('Content-type', 'text/plain')
        return base64.b64encode(image_to_png_bytes(im))
    else:
        response.set_header('Content-type', 'image/png')
        return image_to_png_bytes(im)
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def web_index(token=None):
    """
    provides an index of buttons

    This function is called from far far away, over the Internet
    """

    logging.info('Serving index page')

    try:
        if 'key' not in settings['server']:
            pass

        elif decode_token(settings, token) != 'index':
            raise ValueError('Invalid label in token')

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to serve the index page")
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'

    items = []
    global buttons
    for button in buttons:
        items.append({
            'label': button,
            'delete-url': '/delete/'+settings['tokens'].get(button+'-delete'),
            'initialise-url': '/initialise/'+settings['tokens'].get(button+'-initialise'),
            'push-url': '/'+settings['tokens'].get(button),
            })
    logging.debug('Buttons: {}'.format(items))

    return template('views/list_items', prefix=settings['server']['url'], items=items)

#
# invoked from bt.tn
#
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def web_press(button=None):
    """
    Processes the press of a bt.tn device

    This function is called from far far away, over the Internet
    """

    if button is None:
        button = settings['server']['default']

    try:

        button = decode_token(settings, button)

        context = load_button(settings, button)

        return handle_button(context)

    except socket.error as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to push '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 500
            return 'Internal error'

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to push '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def web_initialise(button=None):
    """
    Initialises a room

    This function is called from far far away, over the Internet
    """

    if button is None:
        button = settings['server']['default']

    logging.info("Initialising button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='initialise')

        context = load_button(settings, button)
        delete_room(context)

        global buttons
        buttons.pop(button, None)

        context = load_button(settings, button)
        context['spark']['id'] = get_room(context)

        return 'OK'

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to initialise '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def web_delete(button=None):
    """
    Deletes a room

    This function is called from far far away, over the Internet
    """

    if button is None:
        button = settings['server']['default']

    logging.info("Deleting button '{}'".format(button))

    try:
        button = decode_token(settings, button, action='delete')

        context = load_button(settings, button)
        delete_room(context)

        global buttons
        buttons.pop(button, None)

        return 'OK'

    except Exception as feedback:
        if logging.getLogger().getEffectiveLevel() == logging.DEBUG:
            logging.error("Unable to delete '{}'".format(button))
            raise
        else:
            logging.error(str(feedback))
            response.status = 400
            return 'Invalid request'
项目:ocspresponder    作者:threema-ch    | 项目源码 | 文件源码
def _handle_get(self, request_data):
        """
        An OCSP GET request contains the DER-in-base64 encoded OCSP request in the
        HTTP request URL.
        """
        der = base64.b64decode(request_data)
        ocsp_request = self._parse_ocsp_request(der)
        return self._build_http_response(ocsp_request)
项目:ocspresponder    作者:threema-ch    | 项目源码 | 文件源码
def _handle_post(self):
        """
        An OCSP POST request contains the DER encoded OCSP request in the HTTP
        request body.
        """
        der = request.body.read()
        ocsp_request = self._parse_ocsp_request(der)
        return self._build_http_response(ocsp_request)
项目:ocspresponder    作者:threema-ch    | 项目源码 | 文件源码
def _parse_ocsp_request(self, request_der: bytes) -> OCSPRequest:
        """
        Parse the request bytes, return an ``OCSPRequest`` instance.
        """
        return OCSPRequest.load(request_der)
项目:switchboard    作者:josefschneider    | 项目源码 | 文件源码
def _device_set(self):
        response.headers['Content-Type'] = 'application/json'
        retval = { }

        def verify_and_set(client):
            try:
                data = json.loads(request.body.read().decode('ascii'))
            except:
                raise ValueError('Unable to decode json data from PUT request')

            if data is None:
                raise ValueError('No data set in body of PUT request')

            if not 'name' in data:
                raise KeyError('No "name" field in body of PUT request')

            if not 'value' in data:
                raise KeyError('No "value" field in body of PUT request')

            client.set_device_value(data['name'], data['value'])

        if self._debug:
            verify_and_set(self)
        else:
            try:
                verify_and_set(self)
            except Exception as e:
                retval['error'] = str(e)

        return json.dumps(retval)
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def tasks_create_file():
    response = {}

    data = request.files.file
    package = request.forms.get("package", "")
    timeout = request.forms.get("timeout", "")
    priority = request.forms.get("priority", 1)
    options = request.forms.get("options", "")
    machine = request.forms.get("machine", "")
    platform = request.forms.get("platform", "")
    tags = request.forms.get("tags", None)
    custom = request.forms.get("custom", "")
    memory = request.forms.get("memory", 'False')
    clock = request.forms.get("clock", None)
    shrike_url = request.forms.get("shrike_url", None)
    shrike_msg = request.forms.get("shrike_msg", None)
    shrike_sid = request.forms.get("shrike_sid", None)
    shrike_refer = request.forms.get("shrike_refer", None)

    if memory.upper() == 'FALSE' or memory == '0':
        memory = False
    else:
        memory = True

    enforce_timeout = request.forms.get("enforce_timeout", 'False')
    if enforce_timeout.upper() == 'FALSE' or enforce_timeout == '0':
        enforce_timeout = False
    else:
        enforce_timeout = True

    temp_file_path = store_temp_file(data.file.read(), data.filename)
    try:
        task_ids = db.demux_sample_and_add_to_db(file_path=temp_file_path, package=package, timeout=timeout, options=options, priority=priority,
                machine=machine, platform=platform, custom=custom, memory=memory, enforce_timeout=enforce_timeout, tags=tags, clock=clock,
                shrike_url=shrike_url, shrike_msg=shrike_msg, shrike_sid=shrike_sid, shrike_refer=shrike_refer)
    except CuckooDemuxError as e:
        return HTTPError(500, e)

    response["task_ids"] = task_ids
    return jsonize(response)
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def tasks_list(limit=None, offset=None):
    response = {}

    response["tasks"] = []

    completed_after = request.GET.get("completed_after")
    if completed_after:
        completed_after = datetime.fromtimestamp(int(completed_after))

    status = request.GET.get("status")

    # optimisation required for dist speedup
    ids = request.GET.get("ids")

    for row in db.list_tasks(limit=limit, details=True, offset=offset,
                             completed_after=completed_after,
                             status=status, order_by=Task.completed_on.asc()):
        task = row.to_dict()
        if ids:
            task = {"id":task["id"], "completed_on":task["completed_on"]}

        else:
            task["guest"] = {}
            if row.guest:
                task["guest"] = row.guest.to_dict()

            task["errors"] = []
            for error in row.errors:
                task["errors"].append(error.message)

            task["sample"] = {}
            if row.sample_id:
                sample = db.view_sample(row.sample_id)
                task["sample"] = sample.to_dict()

        response["tasks"].append(task)

    return jsonize(response)
项目:cnschema    作者:cnschema    | 项目源码 | 文件源码
def enable_cors(fn):
    def _enable_cors(*args, **kwargs):
        # set CORS headers
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
        response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'

        if bottle.request.method != 'OPTIONS':
            # actual request; reply with the actual response
            return fn(*args, **kwargs)

    return _enable_cors
项目:cnschema    作者:cnschema    | 项目源码 | 文件源码
def autocomplete():
    #logging.info(request)
    try:
        q = request.query.get("q")
        return cns.es_autocomplete(q)
    except:
        traceback.print_exc()
        logging.info("error")
        return ""
项目:cnschema    作者:cnschema    | 项目源码 | 文件源码
def search():
    try:
        q = request.query.get("q")
        offset = request.query.get("offset", 0)
        return cns.es_search(q, offset)
    except:
        traceback.print_exc()
        logging.info("error")
        return ""
项目:memex-dossier-open    作者:dossier    | 项目源码 | 文件源码
def enable_cors(self):
        '''Enables Cross Origin Resource Sharing.

        This makes sure the necessary headers are set so that this
        web application's routes can be accessed from other origins.

        :rtype: :class:`WebBuilder`
        '''
        def access_control_headers():
            bottle.response.headers['Access-Control-Allow-Origin'] = '*'
            bottle.response.headers['Access-Control-Allow-Methods'] = \
                'GET, POST, PUT, DELETE, OPTIONS'
            bottle.response.headers['Access-Control-Allow-Headers'] = \
                'Origin, X-Requested-With, Content-Type, Accept, Authorization'

        def options_response(res):
            if bottle.request.method == 'OPTIONS':
                new_res = bottle.HTTPResponse()
                new_res.headers['Access-Control-Allow-Origin'] = '*'
                new_res.headers['Access-Control-Allow-Methods'] = \
                    bottle.request.headers.get(
                        'Access-Control-Request-Method', '')
                new_res.headers['Access-Control-Allow-Headers'] = \
                    bottle.request.headers.get(
                        'Access-Control-Request-Headers', '')
                return new_res
            res.headers['Allow'] += ', OPTIONS'
            return bottle.request.app.default_error_handler(res)

        self.app.add_hook('after_request', access_control_headers)
        self.app.error_handler[int(405)] = options_response
        return self
项目:census    作者:ioddly    | 项目源码 | 文件源码
def invalidate():
    "Invalidates all results; sends request for updates. Recommended once per year."
    pass
项目:verify_code_server    作者:duanyifei    | 项目源码 | 文件源码
def code_verify_out():
    try:
        code = code_verify(request)
    except Exception as e:
        print traceback.format_exc()
        code = 'ERROR'
    return code
项目:c-bastion    作者:ImmobilienScout24    | 项目源码 | 文件源码
def create_user_with_key():
    """
    Create a user directory with a keyfile on the shared volume, data
    arriving in the payload of the request with a JSON payload.
    """
    username = username_from_request(request)
    if not username:
        response.status = 422
        return {'error': "Parameter 'username' not specified"}
    elif not username_valid(username):
        response.status = 400
        return {'error':
                "Invalid parameter 'username': '{0}' not allowed.".
                format(username)
                }

    pubkey = request.json.get('pubkey')
    if not pubkey:
        response.status = 422
        return {'error': "Parameter 'pubkey' not specified"}

    abs_home_path = normpath(os.path.join(HOME_PATH_PREFIX, username))

    username_was_added = check_and_add(username)

    # Do the actual creation
    store_pubkey(username, abs_home_path, pubkey)

    response.status = 201
    return {'response':
            'Successful creation of user {0} and/or upload of key.'
            .format(username)}
项目:anxiety    作者:hectron    | 项目源码 | 文件源码
def insert_countdown():
    """Inserts a countdown into the database."""
    return json_api.insert_countdown(request)


# Run this only if we are not in production