Python bottle.request 模块,environ 实例源码

我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用bottle.request.environ(它是保存 WSGI 环境的字典属性,而不是可调用的方法)。

项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def logten():
    """Log the first hit of each (client IP, path, hour) combination.

    A per-process ``cache['url']`` dict remembers which combinations were
    already seen; it is wiped (and the wipe is logged) once it grows past
    200 entries.  First-time hits are reported through ``logit`` unless the
    User-Agent contains ``Mozilla/4.0``.

    Returns:
        Always ``True``.
    """
    client_ip = request['REMOTE_ADDR']
    hour_tag = datetime.datetime.now().strftime("_%Y_%m_%d_%H")
    path_key = request.environ['PATH_INFO'] + hour_tag

    if 'url' not in cache:
        cache['url'] = {}

    visit_key = client_ip + path_key
    if visit_key in cache['url']:
        # Already reported during this hour: nothing more to do.
        return True

    cache['url'][visit_key] = 0
    if len(cache['url']) > 200:
        cache['url'] = {}
        logit('clear cache url')

    if 'Mozilla/4.0' not in request.environ.get('HTTP_USER_AGENT', 'no agent'):
        # The template spans two lines; the second line keeps its original
        # leading whitespace so the emitted text is unchanged.
        template = (
            'url @ %s [ <a href="http://www.baidu.com/s?wd=%s&_=%.0f" target="_blank">%s</a> ] %.1f\n'
            '                <span style="color:gray">%s</span>'
        )
        logit(template % (
            path_key,
            client_ip,
            time.time() / 10,
            client_ip,
            cache['pass'] - time.time(),
            request.environ.get('HTTP_USER_AGENT', 'no agent'),
        ))
    return True
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_cert(self, domain):
        """Fetch the certificate bundle for ``domain`` and return it as text.

        The client certificate found in the WSGI environ is handed to
        ``_get_cert_config_if_allowed`` together with the domain; the
        configuration it returns drives the ``acmeproxy.get_cert`` call.
        A ``force_renew=true`` query parameter forces a renewal.

        Returns:
            dict with the decoded ``crt``, ``key`` and ``chain`` values.
        """
        client_cert = request.environ['ssl_certificate']
        settings = self._get_cert_config_if_allowed(domain, client_cert)

        logger.debug('Fetching certificate for domain %s', domain)

        bundle = self.acmeproxy.get_cert(
            domain=domain,
            altname=settings.altname,
            rekey=settings.rekey,
            renew_margin=settings.renew_margin,
            force_renew=('force_renew' in request.query and request.query['force_renew'] == 'true'),  # pylint: disable=unsupported-membership-test,unsubscriptable-object
            auto_renew=settings.renew_on_fetch
        )
        private_key, certificate, chain = bundle

        response = {}
        response['crt'] = certificate.decode()
        response['key'] = private_key.decode()
        response['chain'] = chain.decode()
        return response
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def local_check(function):
    """Decorator that only lets requests from the local machine through.

    The wrapped view runs when ``REMOTE_ADDR`` is ``127.0.0.1``/``localhost``
    or when the ``Host`` header equals ``127.0.0.1:9666``; any other request
    gets an ``HTTPError(403, "Forbidden")`` returned instead.

    Args:
        function: the view callable to protect.

    Returns:
        A wrapper with the same call signature that keeps the wrapped
        function's ``__name__`` and ``__doc__``.
    """
    # Imported locally so the block does not depend on the file's import list.
    from functools import wraps

    @wraps(function)
    def _view(*args, **kwargs):
        remote_addr = request.environ.get('REMOTE_ADDR', "0")
        # NOTE(review): HTTP_HOST comes from the client-supplied Host header,
        # so this second condition can be satisfied by a remote caller.
        # Behaviour is kept as-is; confirm this is intended.
        if remote_addr in ('127.0.0.1', 'localhost') \
                or request.environ.get('HTTP_HOST', '0') == '127.0.0.1:9666':
            return function(*args, **kwargs)
        return HTTPError(403, "Forbidden")

    return _view
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def flashgot():
    """Accept a FlashGot submission and queue its links in pyLoad.

    The request is only honoured when its Referer is the local FlashGot
    page; otherwise an ``HTTPError`` is returned.  Form fields read:
    ``urls`` (newline-separated links, required), ``package`` (optional
    package name) and ``autostart`` (integer flag, default 0).

    Returns:
        An empty string on success, or an ``HTTPError`` instance when the
        Referer check fails.
    """
    allowed_referers = (
        "http://localhost:9666/flashgot",
        "http://127.0.0.1:9666/flashgot",
    )
    # .get(): a request without a Referer header must be rejected like any
    # other foreign request instead of crashing with a KeyError.
    if request.environ.get('HTTP_REFERER') not in allowed_referers:
        return HTTPError()

    autostart = int(request.forms.get('autostart', 0))
    package = request.forms.get('package', None)
    # A real list (not a lazy filter object) so the links can be iterated
    # more than once by the receiver on Python 3 as well.
    urls = [link for link in request.forms['urls'].split("\n") if link != ""]

    if package:
        PYLOAD.addPackage(package, urls, autostart)
    else:
        PYLOAD.generateAndAddPackages(urls, autostart)

    return ""
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_list_certs(self):
        """Return every certificate reported by ``acmeproxy``.

        The client certificate from the WSGI environ is passed to
        ``_assert_admin`` before the listing is produced.

        Returns:
            dict with a single ``certificates`` key.
        """
        self._assert_admin(request.environ['ssl_certificate'])
        return {'certificates': self.acmeproxy.list_certificates()}
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_renew_all(self):
        """Request a certificate for every domain ``acmeproxy`` lists.

        The client certificate from the WSGI environ is passed to
        ``_assert_admin`` first.  Each listed certificate's ``cn`` is matched
        against ``certificates_config``; domains without a matching
        configuration, or whose ``get_cert`` call raises, are reported as
        errors instead of aborting the whole run.

        Returns:
            dict with two lists of domain names: ``ok`` and ``error``.
        """
        client_cert = request.environ['ssl_certificate']
        self._assert_admin(client_cert)

        succeeded = []
        failed = []

        for entry in self.acmeproxy.list_certificates():
            domain = entry['cn']
            settings = self.certificates_config.match(domain)

            if not settings:
                logger.error('No configuration found for domain %s', domain)
                failed.append(domain)
                continue

            logger.debug('Getting certificate for domain %s', domain)
            try:
                self.acmeproxy.get_cert(
                    domain=domain,
                    altname=settings.altname,
                    rekey=settings.rekey,
                    renew_margin=settings.renew_margin,
                    force_renew=('force_renew' in request.query and request.query['force_renew'] == 'true'),  # pylint: disable=unsupported-membership-test,unsubscriptable-object
                )
            except Exception as err:
                # One failing domain must not stop the remaining ones.
                logger.error('Encountered exception while getting certificate for domain %s (%s)', domain, err)
                failed.append(domain)
            else:
                succeeded.append(domain)

        return {'ok': succeeded, 'error': failed}
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_revoke_cert(self, domain):
        """Revoke the certificate of ``domain`` through ``acmeproxy``.

        The client certificate from the WSGI environ is passed to
        ``_assert_admin`` before the revocation is requested.

        Returns:
            ``{'status': 'revoked'}``.
        """
        self._assert_admin(request.environ['ssl_certificate'])
        self.acmeproxy.revoke_certificate(domain)
        return dict(status='revoked')
项目:certproxy    作者:geneanet    | 项目源码 | 文件源码
def _handle_delete_cert(self, domain):
        """Delete the certificate of ``domain`` through ``acmeproxy``.

        The client certificate from the WSGI environ is passed to
        ``_assert_admin`` before the deletion is requested.

        Returns:
            ``{'status': 'deleted'}``.
        """
        self._assert_admin(request.environ['ssl_certificate'])
        self.acmeproxy.delete_certificate(domain)
        return dict(status='deleted')
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_ims(self):
        """ SendFile: If-Modified-Since

        With an If-Modified-Since of "now" the file must come back as 304
        with Last-Modified and Date headers; with a timestamp far in the past
        the full file content must be served.
        """
        http_date_format = "%a, %d %b %Y %H:%M:%S GMT"
        filename = os.path.basename(__file__)

        request.environ['HTTP_IF_MODIFIED_SINCE'] = time.strftime(http_date_format, time.gmtime())
        res = static_file(filename, root='./')
        self.assertEqual(304, res.status_code)
        self.assertEqual(int(os.stat(__file__).st_mtime), parse_date(res.headers['Last-Modified']))
        self.assertAlmostEqual(int(time.time()), parse_date(res.headers['Date']))

        request.environ['HTTP_IF_MODIFIED_SINCE'] = time.strftime(http_date_format, time.gmtime(100))
        # Context manager so the reference file handle is closed again
        # (the original left it open until garbage collection).
        with open(__file__, 'rb') as expected:
            self.assertEqual(expected.read(), static_file(filename, root='./').body.read())
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_download(self):
        """ SendFile: Download as attachment

        ``download=True`` must add a Content-Disposition header naming the
        file, and a plain request must return the file content.
        """
        basename = os.path.basename(__file__)
        f = static_file(basename, root='./', download=True)
        self.assertEqual('attachment; filename="%s"' % basename, f.headers['Content-Disposition'])

        request.environ['HTTP_IF_MODIFIED_SINCE'] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(100))
        f = static_file(basename, root='./')
        # Context manager so the reference file handle is closed again
        # (the original left it open until garbage collection).
        with open(__file__, 'rb') as expected:
            self.assertEqual(expected.read(), f.body.read())
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_range(self):
        """ SendFile: Range request

        For ``bytes=10-25,-80`` the response body must be bytes 10..25 of the
        file, with matching Content-Range and Accept-Ranges headers.
        """
        basename = os.path.basename(__file__)
        request.environ['HTTP_RANGE'] = 'bytes=10-25,-80'
        f = static_file(basename, root='./')

        # Read the reference file once and close it; the original opened it
        # twice and never closed either handle.
        with open(basename, 'rb') as reference:
            content = reference.read()

        # Bytes 10..25 inclusive, i.e. seek(10) followed by read(16).
        self.assertEqual(content[10:26], tob('').join(f.body))
        self.assertEqual('bytes 10-25/%d' % len(content),
                         f.headers['Content-Range'])
        self.assertEqual('bytes', f.headers['Accept-Ranges'])
项目:piku    作者:rcarmo    | 项目源码 | 文件源码
def default():
    """Render the process environment and the request environ as HTML tables.

    Returns:
        dict with two HTML strings: ``sys_data`` (built from ``os.environ``)
        and ``req_data`` (built from ``request.environ``), each a table with
        one row per key, sorted by key.
    """
    # html.escape on Python 3, cgi.escape on Python 2.
    try:
        from html import escape
    except ImportError:
        from cgi import escape

    def _as_table(mapping):
        # One <tr> per key/value pair, sorted by key.
        rows = ['<table class="u-full-width"><tbody>']
        # .items() works on Python 2 and 3 (.iteritems() is Python 2 only).
        for name, value in sorted(mapping.items()):
            # Request headers are client-controlled, so both cells are
            # escaped before being placed into markup.  '%s' % (x,) keeps
            # the original string conversion of non-string values.
            rows.append('<tr><th>%s</th><td>%s</td></tr>' % (
                escape('%s' % (name,)),
                escape('%s' % (value,)),
            ))
        rows.append('</tbody></table>')
        return '\n'.join(rows)

    result = {}
    result['sys_data'] = _as_table(os.environ)
    result['req_data'] = _as_table(dict(request.environ))
    return result
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def tearDown(self):
        """Roll back the session, drop the database and reset bottle's memo."""
        self.session.rollback()
        database.drop_all()
        database.stop_engine()
        # Bottle memoizes 'request.json' (and the body it is parsed from) in
        # the environ.  Remove both entries so one unit test cannot see the
        # payload of the previous one.
        for memo_key in ('bottle.request.json', 'bottle.request.body'):
            request.environ.pop(memo_key, None)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def _set_body_json(self, data):
        """Install ``data`` as the JSON body of the current bottle request.

        Serialises ``data`` with ``json.dumps`` and writes the matching
        CONTENT_LENGTH, CONTENT_TYPE and ``wsgi.input`` entries into
        ``request.environ``.
        """
        payload = tob(json.dumps(data))
        environ = request.environ
        environ['CONTENT_LENGTH'] = str(len(payload))
        environ['CONTENT_TYPE'] = 'application/json'
        # A BytesIO created from the payload starts at offset 0, which is the
        # same state as an empty buffer after write() + seek(0).
        environ['wsgi.input'] = BytesIO(payload)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def test_start_deployment_with_impersonating(self, mocked):
        """Start a deployment on environment 1 while impersonating a user.

        The request is issued by the 'impersonator' account with the
        X-Impersonate-Username header set; the call is expected to complete
        without raising.
        """
        impersonator = (
            self.session.query(m.User)
            .filter(m.User.username == 'impersonator')
            .one()
        )
        request.account = impersonator
        request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'

        deployment_request = {
            'target': {'cluster': None, 'server': None},
            'branch': 'master',
            'commit': 'abcde',
        }
        self._set_body_json(deployment_request)

        api.environments_start_deployment(1, self.session)
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def test_start_deployment_with_impersonating_unprivilegied_user(self, mocked):
        """Starting a deployment on environment 2 while impersonating must fail.

        The request is issued by the 'impersonator' account with the
        X-Impersonate-Username header set; the call must raise an HTTPError
        carrying status 403.
        """
        request.account = self.session.query(m.User).filter(m.User.username == 'impersonator').one()
        request.environ['HTTP_X_IMPERSONATE_USERNAME'] = 'username'
        self._set_body_json({
            'target': {
                'cluster': None,
                'server': None
            },
            'branch': 'master',
            'commit': 'abcde'
        })
        with self.assertRaises(HTTPError) as cm:
            api.environments_start_deployment(2, self.session)
        # The status check has to live outside the ``with`` block: placed
        # after the raising call inside it, the assertion was never executed.
        # NOTE(review): confirm the HTTPError used here exposes ``.code``;
        # this line never ran before, so the attribute name is unverified.
        self.assertEqual(403, cm.exception.code)
项目:jerrybuild    作者:fboender    | 项目源码 | 文件源码
def generic_handler():
    """
    Catch-all handler for requests that no other route picked up.

    The requested path is looked up among the configured job definitions. If
    it is registered as a job's webhook URL, the request is normalized into a
    job environment and a job instance is put on the build queue; otherwise
    the request is aborted with a 404.

    It returns immediately (async) with a JSON structure containing the job
    id.
    """
    deps = request.deps
    jobdef_manager = deps['jobdef_manager']
    build_queue = deps['build_queue']
    config = deps['config']  # looked up like the other dependencies; not used below
    providers = deps['providers']

    jobdef = jobdef_manager.get_jobdef_from_url(request.path)
    if not jobdef:
        abort(404, "Not found")

    logging.info("Received event for job '{}'".format(jobdef.name))

    # Dump everything known about the incoming request at debug level.
    logging.debug("request environ: {}".format(request.environ))
    logging.debug("request path: {}".format(request.path))
    logging.debug("request method: {}".format(request.method))
    for header_name, header_value in request.headers.items():
        logging.debug("request header: {}={}".format(header_name, header_value))
    for param_name, param_value in request.query.items():
        logging.debug("request query: {}={}".format(param_name, param_value))
    logging.debug("request body: {}".format(request.body.read()))
    logging.debug("request auth: {}".format(request.auth))

    env = job.make_env(request, jobdef, providers)

    body_text = request.body.read().decode('utf8')
    job_inst = jobdef.make_job(body_text, env)
    build_queue.put(job_inst)

    return dict(id=job_inst.id)