Python bottle.request 模块,query() 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用bottle.request.query()

项目:pddb    作者:omtinez    | 项目源码 | 文件源码
def _request(request, request_fallback=None):
        ''' Extract request fields wherever they may come from: GET, POST, forms, fallback '''
        # Use lambdas to avoid evaluating bottle.request.* which may throw an Error
        all_dicts = [
            lambda: request.json,
            lambda: request.forms,
            lambda: request.query,
            lambda: request.files,
            #lambda: request.POST,
            lambda: request_fallback
        ]
        request_dict = dict()
        for req_dict_ in all_dicts:
            try:
                req_dict = req_dict_()
            except KeyError:
                continue
            if req_dict is not None and hasattr(req_dict, 'items'):
                for req_key, req_val in req_dict.items():
                    request_dict[req_key] = req_val
        return request_dict
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def dtos_get_films_with_actors():
    try:
        queryObject = QueryObject(
            filter = "ReleaseYear='{0}'".format(request.query['releaseYear']),
            expand = ['FilmActors.Actor', 'FilmCategories']
        )
        resultSerialData = dataService.dataViewDto.getItems("Film", queryObject)

        return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)

    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# POST: POST: api/datasource/crud/operations/dtos/TestAction
# with: Content-Type: application/json and body - {"param1":1}
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def entities_get_films_with_actors():
    try:
        queryObject = QueryObject(
            filter = "ReleaseYear='{0}'".format(request.query['releaseYear']),
            expand = ['FilmActors.Actor', 'FilmCategories']
        )
        resultSerialData = dataService.from_.remote.dtoView.Films.getItems(queryObject)

        return json.dumps(resultSerialData, cls=CustomEncoder, indent=2)

    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# POST: api/datasource/crud/operations/entities/TestAction
# with: Content-Type: application/json and body - {"param1":1}
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_cert(self, domain):
        rawcert = request.environ['ssl_certificate']

        certconfig = self._get_cert_config_if_allowed(domain, rawcert)

        logger.debug('Fetching certificate for domain %s', domain)

        (key, crt, chain) = self.acmeproxy.get_cert(
            domain=domain,
            altname=certconfig.altname,
            rekey=certconfig.rekey,
            renew_margin=certconfig.renew_margin,
            force_renew=('force_renew' in request.query and request.query['force_renew'] == 'true'),  # pylint: disable=unsupported-membership-test,unsubscriptable-object
            auto_renew=certconfig.renew_on_fetch
        )

        return {
            'crt': crt.decode(),
            'key': key.decode(),
            'chain': chain.decode()
        }
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def assertRedirect(self, target, result, query=None, status=303, **args):
        env = {'SERVER_PROTOCOL':'HTTP/1.1'}
        for key in list(args):
            if key.startswith('wsgi'):
                args[key.replace('_', '.', 1)] = args[key]
                del args[key]
        env.update(args)
        request.bind(env)
        bottle.response.bind()
        try:
            bottle.redirect(target, **(query or {}))
        except bottle.HTTPResponse:
            r = _e()
            self.assertEqual(status, r.status_code)
            self.assertTrue(r.headers)
            self.assertEqual(result, r.headers['Location'])
项目:ATLeS    作者:liffiton    | 项目源码 | 文件源码
def _filter_row(row, filters):
    def filterfunc(row, filt):
        val = filters[filt]['val']
        if filt == "tracks":
            tracks_set = set(val.split('|'))
            return row['trackrel'] in tracks_set
        elif filt.endswith(" (min)"):
            filt = filt.replace(" (min)", "")
            val = float(val)
            return row[filt] >= val
        elif filt.endswith(" (max)"):
            filt = filt.replace(" (max)", "")
            val = float(val)
            return row[filt] <= val
        elif isinstance(row[filt], bool):
            # need to convert our string query values to bool for the comparison
            return row[filt] == bool(val)
        else:
            return row[filt] == val

    return all(filterfunc(row, filt) for filt in filters)
项目:ATLeS    作者:liffiton    | 项目源码 | 文件源码
def download():
    trackrels = request.query.tracks.split('|')

    # write the archive into a temporary in-memory file-like object
    temp = tempfile.SpooledTemporaryFile()
    with zipfile.ZipFile(temp, 'w', zipfile.ZIP_DEFLATED) as archive:
        for trackrel in trackrels:
            base_wildcard = trackrel.replace("-track.csv", "*")
            paths = config.TRACKDIR.glob(base_wildcard)
            for path in paths:
                archive.write(str(path),
                              str(path.relative_to(config.TRACKDIR))
                              )

    temp.seek(0)

    # force a download; give it a filename and mime type
    response.set_header('Content-Disposition', 'attachment; filename="data.zip"')
    response.set_header('Content-Type', 'application/zip')

    # relying on garbage collector to delete tempfile object
    # (and hence the file itself) when done sending
    return temp
项目:Orator-Google-App-Engine    作者:MakarenaLabs    | 项目源码 | 文件源码
def create(controller="", method=""):
    return exec_command(controller=controller, method=method, input=request.query)
项目:pddb    作者:omtinez    | 项目源码 | 文件源码
def _extract_params(request_dict, param_list, param_fallback=False):
        ''' Extract pddb parameters from request '''

        if not param_list or not request_dict:
            return dict()

        query = dict()
        for param in param_list:
            # Retrieve all items in the form of {param: value} and
            # convert {param__key: value} into {param: {key: value}}
            for query_key, query_value in request_dict.items():
                if param == query_key:
                    query[param] = query_value
                else:
                    query_key_parts = query_key.split('__', 1)
                    if param == query_key_parts[0]:
                        query[param] = {query_key_parts[1]: query_value}

        # Convert special string "__null__" into Python None
        nullifier = lambda d: {k:(nullifier(v) if isinstance(v, dict) else # pylint: disable=used-before-assignment
                                  (None if v == '__null__' else v)) for k, v in d.items()}

        # When fallback is enabled and no parameter matched, assume query refers to first parameter
        if param_fallback and all([param_key not in query.keys() for param_key in param_list]):
            query = {param_list[0]: dict(request_dict)}

        # Return a dictionary with only the requested parameters
        return {k:v for k, v in nullifier(query).items() if k in param_list}
项目:movies-python-py2neo-3.0    作者:neo4j-examples    | 项目源码 | 文件源码
def get_search():
    try:
        q = request.query["q"]
    except KeyError:
        return []
    else:
        results = graph.run(
            "MATCH (movie:Movie) "
            "WHERE movie.title =~ {title} "
            "RETURN movie", {"title": "(?i).*" + q + ".*"})
        response.content_type = "application/json"
        return json.dumps([{"movie": dict(row["movie"])} for row in results])
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def get_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleGet(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except:
        abort(400, 'Bad Request')


# GET: api/datasource/crud/single/:entitySetName?keys=key1:{key1}
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def get_single_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleGetSingle(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except:
        abort(400, 'Bad Request')


# GET: api/datasource/crud/many/:entitySetName?keys=key1:1,2,3,4;key2:4,5,6,7
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def get_many_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleGetMany(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder, indent=2)
    except:
        abort(400, 'Bad Request')


# PUT: api/datasource/crud/:entitySetName?keys=key1:{key1}
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def put_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleUpdateEntity(entitySetName, request.query, request.json, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# PATCH: api/datasource/crud/:entitySetName?keys=key1:{key1}
#@patch('/api/datasource/crud/<entitySetName>')
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def delete_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleDeleteEntity(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')


# PUT: api/datasource/crud/batch/:entitySetName
项目:python-server-typescript-client    作者:marianc    | 项目源码 | 文件源码
def delete_batch_entityset(entitySetName):
    try:
        result = dataProviderDto.apiProvider.handleDeleteEntityBatch(entitySetName, request.query, dataService)
        response.content_type = "application/json; charset=utf-8"
        return json.dumps(result, cls=CustomEncoder)
    except dalUtils.StatusCodeError as err:
        response.status = err.value
    except:
        abort(400, 'Bad Request')
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_renew_all(self):
        rawcert = request.environ['ssl_certificate']

        self._assert_admin(rawcert)

        result = {
            'ok': [],
            'error': []
        }

        for cert in self.acmeproxy.list_certificates():
            domain = cert['cn']
            certconfig = self.certificates_config.match(domain)
            if certconfig:
                logger.debug('Getting certificate for domain %s', domain)
                try:
                    self.acmeproxy.get_cert(
                        domain=domain,
                        altname=certconfig.altname,
                        rekey=certconfig.rekey,
                        renew_margin=certconfig.renew_margin,
                        force_renew=('force_renew' in request.query and request.query['force_renew'] == 'true'),  # pylint: disable=unsupported-membership-test,unsubscriptable-object
                    )
                    result['ok'].append(domain)
                except Exception as e:
                    logger.error('Encountered exception while getting certificate for domain %s (%s)', domain, e)
                    result['error'].append(domain)
            else:
                logger.error('No configuration found for domain %s', domain)
                result['error'].append(domain)

        return result
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def info(account_id, resource_id):
    request_data = request.query
    if resource_id.startswith('sg-') and 'parent_id' not in request_data:
        abort(400, "Missing required parameter parent_id")
    result = controller.info(
        account_id, resource_id, request_data.get('parent_id', resource_id))
    response.content_type = "application/json"
    return json.dumps(result, indent=2, cls=Encoder)


# this set to post to restrict permissions, perhaps another url space.
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_get(self):
        """ Environ: GET data """
        qs = tonat(tob('a=a&a=1&b=b&c=c&cn=%e7%93%b6'), 'latin1')
        request = BaseRequest({'QUERY_STRING':qs})
        self.assertTrue('a' in request.query)
        self.assertTrue('b' in request.query)
        self.assertEqual(['a','1'], request.query.getall('a'))
        self.assertEqual(['b'], request.query.getall('b'))
        self.assertEqual('1', request.query['a'])
        self.assertEqual('b', request.query['b'])
        self.assertEqual(tonat(tob('?'), 'latin1'), request.query['cn'])
        self.assertEqual(touni('?'), request.query.cn)
项目:ATLeS    作者:liffiton    | 项目源码 | 文件源码
def tracks(db):
    selected_filters = {}
    for filt in request.query.keys():
        val = request.query.get(filt)
        if val == '':
            # unspecified min/max numeric values, e.g.
            continue

        # figure out query string without this selection (for backing out in web interface)
        querystring_without = '&'.join("{}={}".format(key, val) for key, val in request.query.items() if key != filt)
        # store for showing in page
        selected_filters[filt] = {
            'val': val,
            'querystring_without': querystring_without
        }

    track_data = _get_track_data(db, selected_filters)

    # possible filter params/values for selected rows
    possible_filters = _get_filters(track_data, selected_filters) if track_data else []

    return template('tracks',
                    tracks=track_data,
                    filters=possible_filters,
                    selected=selected_filters,
                    query_string=request.query_string,
                    query=request.query
                    )
项目:openrefine-wikidata    作者:wetneb    | 项目源码 | 文件源码
def jsonp(view):
    def wrapped(*posargs, **kwargs):
        args = {}
        # if we access the args via get(),
        # we can get encoding errors...
        for k in request.forms:
            args[k] = getattr(request.forms, k)
        for k in request.query:
            args[k] = getattr(request.query, k)
        callback = args.get('callback')
        status_code = 200
        try:
            result = view(args, *posargs, **kwargs)
        except (KeyError) as e:#ValueError, AttributeError, KeyError) as e:
            import traceback, sys
            traceback.print_exc(file=sys.stdout)
            result = {'status':'error',
                    'message':'invalid query',
                    'details': str(e)}
            status_code = 403
        if callback:
            result = '%s(%s);' % (callback, json.dumps(result))

        if status_code == 200:
            return result
        else:
            abort(status_code, result)

    return wrapped
项目:geo-recommender    作者:harkous    | 项目源码 | 文件源码
def run_profiling(self, num_loops, num_neighbors, age_proximity):
        """Executes the k_nearest_neighbors algorithm for num_loops times and returns the average running time

        Args:
            num_loops: number of loops for which we query the server
            num_neighbors: number of neighbors to query for
            age_proximity: maximum difference between a candidate neighbor's age and the user


        Returns:

        """
        print('profiling over ', num_loops, ' times')
        random_latitudes = random.uniform(-90, 90, num_loops)
        random_longitudes = random.uniform(-180, 180, num_loops)
        time_list = []

        for i in tqdm(range(len(random_latitudes))):
            start_time = time.clock()
            kd_store.k_nearest_neighbors({'name': 'bla bla', 'age': 23, 'latitude': random_latitudes[i] / 2,
                                          'longitude': random_longitudes[i]}, num_neighbors, age_proximity)
            end_time = time.clock()
            time_list.append(end_time - start_time)

        # get the timing statistics
        stats_desc = stats.describe(time_list)
        frac_times_exceeded = len(np.where(np.array(time_list) >= 1)[0]) / len(time_list)
        print('\nfraction of times with delay > 1 is: ', frac_times_exceeded, '\n')
        print('\nStats:\n', stats_desc)
        return stats_desc
项目:census    作者:ioddly    | 项目源码 | 文件源码
def login_xenforo(user=None):
    if user: return redirect('/')

    code = request.query['code']

    data =  {'grant_type': 'authorization_code',
            'code': code,
            'client_id': config['XF_CLIENT_ID'],
            'client_secret': config['XF_SECRET_KEY'],
            'redirect_uri': '%s/login/xenforo' % config['SITE_URL']}

    res = requests.post('%s/api/index.php?oauth/token' % config['XF_URL'], data = data) 
    res_data = res.json()
    res.raise_for_status()

    pprint(res_data)
    user = {
        'identity': res_data['user_id'],
        'identity_type': 'xenforo',
        'access_token': res_data['access_token'],
        'refresh_token': res_data['refresh_token'],
        'user_id': res_data['user_id'],
        'authlink': str(uuid.uuid4()), # this is still used to unsubscribe from emails
        'subscribed': True,
    }

    # get extended information
    res = requests.get('%s/api/?users/%s&oauth_token=%s' % (config['XF_URL'], res_data['user_id'], user['access_token']))
    xf_user = res.json()['user']

    user['xf_username'] = xf_user['username']
    user['email'] = xf_user['user_email']

    r.table('users').get(user['identity']).replace(user).run(conn())

    return users.login(user)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def test_list_environments(self):
        parsed = json.loads(api.environments_list(self.session))
        for env in parsed['environments']:
            self.assertTrue(env['deploy_authorized'])
        request.account = self.session.query(m.User).filter_by(username="username").one()
        parsed = json.loads(api.environments_list(self.session))
        for env in parsed['environments']:
            if env['id'] == 1:
                self.assertTrue(env['deploy_authorized'])
            else:
                self.assertFalse(env['deploy_authorized'])
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def test_start_deployment_non_admin(self, mocked):
        request.account = self.session.query(m.User).filter(m.User.username == 'username').one()
        self._set_body_json({
            'target': {
                'cluster': None,
                'server': None
            },
            'branch': 'master',
            'commit': 'abcde'
        })
        api.environments_start_deployment(1, self.session)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def test_start_deployment_with_impersonating(self, mocked):
        request.account = self.session.query(m.User).filter(m.User.username == 'impersonator').one()
        request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'
        self._set_body_json({
            'target': {
                'cluster': None,
                'server': None
            },
            'branch': 'master',
            'commit': 'abcde'
        })
        api.environments_start_deployment(1, self.session)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def test_start_deployment_with_impersonating_unprivilegied_user(self, mocked):
        request.account = self.session.query(m.User).filter(m.User.username == 'impersonator').one()
        request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'
        self._set_body_json({
            'target': {
                'cluster': None,
                'server': None
            },
            'branch': 'master',
            'commit': 'abcde'
        })
        with self.assertRaises(HTTPError) as cm:
            api.environments_start_deployment(2, self.session)
            self.assertEquals(403, cm.exception.code)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def test_expunge_default_user(self):
        user = self.session.query(m.User).filter_by(username="default").one()
        api._expunge_user(user, self.session)
        self.session.close()
        # Make sure we can access attributes after the session is closed
        self.assertEqual(1, len(user.roles))
        self.assertEqual(0, len(user.default_roles))
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def test_diff_requires_login(self):
        # The default user should not be able to read diff
        user = self.session.query(m.User).filter_by(username="default").one()
        request.account = user
        request.query['from'] = 'ab'
        request.query['to'] = 'de'
        with self.assertRaises(HTTPError) as e:
            api.repositories_diff(1, self.session)
        self.assertEqual(403, e.exception.status_code)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def test_list_deployments_requires_read(self):
        user = self.session.query(m.User).filter_by(username="default").one()
        request.account = user
        deploys = json.loads(api.deployments_list("my repo", self.session))['deployments']
        self.assertEqual(0, len(deploys))
        user = self.session.query(m.User).filter_by(username="root").one()
        request.account = user
        deploys = json.loads(api.deployments_list("my repo", self.session))['deployments']
        self.assertGreater(len(deploys), 0)