Python bottle.request 模块,url() 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用bottle.request.url()

项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def init_audit(log_group):

    def audit(f):

        @functools.wraps(f)
        def handle(account_id, *args, **kw):
            envelope = {
                'timestamp': int(time.time() * 1000),
                'message': json.dumps({
                    'user': request.environ.get('REMOTE_USER', ''),
                    'url': request.url,
                    'path': request.path,
                    'method': request.method,
                    'pid': os.getpid(),
                    'account_id': account_id,
                    'ip': request.remote_addr})
            }
            transport.send_group("%s=%s" % (log_group, account_id), [envelope])
            return f(account_id, *args, **kw)

        return handle

    return audit
项目:Pardus-Bulut    作者:ferhatacikalin    | 项目源码 | 文件源码
def test_url(self):
        """ Environ: URL building """
        request = BaseRequest({'HTTP_HOST':'example.com'})
        self.assertEqual('http://example.com/', request.url)
        request = BaseRequest({'SERVER_NAME':'example.com'})
        self.assertEqual('http://example.com/', request.url)
        request = BaseRequest({'SERVER_NAME':'example.com', 'SERVER_PORT':'81'})
        self.assertEqual('http://example.com:81/', request.url)
        request = BaseRequest({'wsgi.url_scheme':'https', 'SERVER_NAME':'example.com'})
        self.assertEqual('https://example.com/', request.url)
        request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/path',
                               'QUERY_STRING':'1=b&c=d', 'SCRIPT_NAME':'/sp'})
        self.assertEqual('http://example.com/sp/path?1=b&c=d', request.url)
        request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/pa th',
                               'SCRIPT_NAME':'/s p'})
        self.assertEqual('http://example.com/s%20p/pa%20th', request.url)
项目:xqz.es    作者:arahayrabedian    | 项目源码 | 文件源码
def callback():
    """ Step 3: Retrieving an access token.
    The user has been redirected back from the provider to your registered
    callback URL. With this redirection comes an authorization code included
    in the redirect URL. We will use that to obtain an access token.

    NOTE: your server name must be correctly configured in order for this to
    work, do this by adding the headers at your http layer, in particular:
    X_FORWARDED_HOST, X_FORWARDED_PROTO so that bottle can render the correct
    url and links for you.
    """
    oauth2session = OAuth2Session(settings.SLACK_OAUTH['client_id'],
                                  state=request.GET['state'])

    #PII - update privacy policy if oauth2 token is stored.
    token = oauth2session.fetch_token(
        settings.SLACK_OAUTH['token_url'],
        client_secret=settings.SLACK_OAUTH['client_secret'],
        authorization_response=request.url
    )

    # we don't need the token, we just need the user to have installed the app
    # in the future, if we need tokens, we'll get them.

    redirect('/?added_to_slack=true')
项目:Mmrz-Sync    作者:zhanglintc    | 项目源码 | 文件源码
def mmrz():
    username = request.get_cookie('username')
    password = request.get_cookie('password')
    password = urllib.unquote(password) if password else None

    if not verify_login(username, password):
        redirect('/')

    # need_https = "localhost" not in request.url
    need_https = False

    return_dict = dict(universal_ROUTE_dict)
    return_dict.update(dict(need_https=need_https))
    return return_dict
项目:Mmrz-Sync    作者:zhanglintc    | 项目源码 | 文件源码
def dict_short():
    query = request.urlparts.query
    url = '/dictionary?{0}'.format(query) if query else '/dictionary'
    redirect(url)
项目:download-manager    作者:thispc    | 项目源码 | 文件源码
def pre_processor():
    s = request.environ.get('beaker.session')
    user = parse_userdata(s)
    perms = parse_permissions(s)
    status = {}
    captcha = False
    update = False
    plugins = False
    if user["is_authenticated"]:
        status = PYLOAD.statusServer()
        info = PYLOAD.getInfoByPlugin("UpdateManager")
        captcha = PYLOAD.isCaptchaWaiting()

        # check if update check is available
        if info:
            if info["pyload"] == "True": update = True
            if info["plugins"] == "True": plugins = True


    return {"user": user,
            'status': status,
            'captcha': captcha,
            'perms': perms,
            'url': request.url,
            'update': update,
            'plugins': plugins}
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def info(account_id, resource_id):
    request_data = request.query
    if resource_id.startswith('sg-') and 'parent_id' not in request_data:
        abort(400, "Missing required parameter parent_id")
    result = controller.info(
        account_id, resource_id, request_data.get('parent_id', resource_id))
    response.content_type = "application/json"
    return json.dumps(result, indent=2, cls=Encoder)


# this set to post to restrict permissions, perhaps another url space.
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def api_url():
    parsed = request.urlparts
    url = "%s://%s%s" % (parsed.scheme, parsed.netloc, request.script_name)
    return url
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def error(e):
    response.content_type = "application/json"
    return json.dumps({
        "status": e.status,
        "url": repr(request.url),
        "exception": repr(e.exception),
        #  "traceback": e.traceback and e.traceback.split('\n') or '',
        "body": repr(e.body)
    }, indent=2)
项目:ray    作者:felipevolpone    | 项目源码 | 文件源码
def dispatch(url):
    """
        This class is the beginning of all entrypoints in the Ray API. Here, each url
        will be redirect to the right handler
    """

    url = bottle_req.path
    log.info('request: %s', bottle_req.url)

    if url[-1] == '/':
        url = url[:-1]

    response_code = 200

    try:
        processed = process(url, bottle_req, bottle_resp)

        try:
            from_func, http_status = processed[0], processed[1]
            bottle_resp.status = http_status
            return from_func
        except:
            return processed

    except exceptions.RayException as e:
        log.exception('ray exception: ')
        response_code = e.http_code

    except:
        log.exception('exception:')
        raise

    bottle_resp.status = response_code
项目:ray    作者:felipevolpone    | 项目源码 | 文件源码
def __handle_action(url):
    # url e.g: /api/user/123/action

    arg = None
    if len(url.split('/')) >= 5:  # indicate that has an id between endpoint and action_name
        arg = http.param_at(url, -2)

    return Action(url, arg, bottle_req).process_action()
项目:Mmrz-Sync    作者:zhanglintc    | 项目源码 | 文件源码
def query_hujiang(key_word):
    if not key_word:
        return []

    headers = {
        'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1',
        'Accept-Encoding': 'gzip, deflate, sdch',
        }

    url = "https://m.hujiang.com/d/dict_jp_api.ashx?type=jc&w={0}".format(urllib.quote(key_word))

    proxies = {}
    response = requests.get(url, headers=headers, verify=False, proxies=proxies)
    try:
        defines = response.json()
    except:
        return []

    for i in range(len(defines)):
        Comment = defines[i]["Comment"]
        comments = re.findall("<br/>([^a-zA-Z]+)<br/>", Comment)

        tmp = ", ".join(comments)

        # ??tmp??, ??????????
        # tmp = Comment if not tmp else tmp

        # ???????????
        # mch = re.search(u"(?.+??)", tmp)
        # tmp = mch.group(1) if mch else tmp

        # ??????
        tmp = re.sub(u"\?.+?\?",   "", tmp)
        tmp = re.sub(u"\(.+?\)",  "", tmp)
        tmp = re.sub(u"?+?",     "", tmp)

        defines[i]["Comment"] = tmp

        defines[i]["PronounceJp"] = re.sub("\[|\]", "", defines[i]["PronounceJp"])

    return defines

### static files
项目:Mmrz-Sync    作者:zhanglintc    | 项目源码 | 文件源码
def get_hujiang_tts():
    key_word = request.params.get('key_word', None)
    job_id   = request.params.get('job_id', None)

    if not key_word:
        return "key_word is null"

    headers = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.89 Safari/537.36',
        'Accept-Encoding': 'gzip, deflate, sdch',
    }

    url = "http://dict.hjenglish.com/jp/jc/" + urllib.quote(key_word)

    req = urllib2.Request(url, None, headers)
    response = urllib2.urlopen(req)
    compressedData = response.read()

    compressedStream = StringIO.StringIO(compressedData)
    gzipper = gzip.GzipFile(fileobj=compressedStream)
    html = gzipper.read()

    soup = BeautifulSoup(html, "html.parser")

    ret_info = {
        "found": False,
        "message_str": "",
        "tts_url": "",
        "job_id": job_id,
    }

    jpSound_list = soup.select('span[class=jpSound]')
    if len(jpSound_list) < 1:
        ret_info["found"] = False
        ret_info["message_str"] = "jpSound not found"
        return json.dumps(ret_info)

    jpSound = str(jpSound_list[0])
    mc = re.search("GetTTSVoice\(\"(.*?)\"\)", jpSound)
    if not mc:
        ret_info["found"] = False
        ret_info["message_str"] = "tts_url not found"
        return json.dumps(ret_info)

    tts_url = mc.group(1)
    ret_info["found"] = True
    ret_info["message_str"] = "tts_url is found"
    ret_info["tts_url"] = tts_url
    return json.dumps(ret_info)