Python six.moves.urllib_parse 模块,urlencode() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用six.moves.urllib_parse.urlencode()

项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def format_yahoo_index_url(symbol, start_date, end_date):
    """
    Build the Yahoo Finance CSV URL used to query Index data.

    The symbol and the month/day/year parts of both dates are sent as
    single-letter query parameters; the frequency is always daily.
    """
    base_url = 'http://ichart.finance.yahoo.com/table.csv?'
    # Months are sent zero indexed, hence the ``- 1`` on both dates.
    start_month = start_date.month - 1
    end_month = end_date.month - 1
    query = {
        's': symbol,
        'a': start_month,
        'b': start_date.day,
        'c': start_date.year,
        'd': end_month,
        'e': end_date.day,
        'f': end_date.year,
        'g': 'd',  # daily frequency
    }
    return base_url + urlencode(query)
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def unauthorized_token_request(self):
        """Return the signed request-token URL (first stage of the flow).

        The extra request-token arguments and the scope argument are
        URL-encoded onto ``REQUEST_TOKEN_URL``; the resulting URL is then
        signed by the OAuth1 client and the signed URL is returned.
        """
        query_args = self.request_token_extra_arguments()
        query_args.update(self.get_scope_argument())
        consumer_key, consumer_secret = self.get_key_and_secret()
        # decoding='utf-8' produces errors with python-requests on Python3
        # since the final URL will be of type bytes, so it is only used on
        # Python 2.
        if six.PY3:
            decoding = None
        else:
            decoding = 'utf-8'
        state = self.get_or_create_state()
        signer = OAuth1(
            consumer_key,
            consumer_secret,
            callback_uri=self.get_redirect_uri(state),
            decoding=decoding,
            signature_method=SIGNATURE_HMAC,
            signature_type=SIGNATURE_TYPE_QUERY
        )
        unsigned_url = self.REQUEST_TOKEN_URL + '?' + urlencode(query_args)
        # sign() returns a 3-tuple; only its first item (the URL) is needed.
        signed_url, _, _ = signer.client.sign(unsigned_url)
        return signed_url
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def auth_url(self):
        """Return the redirect url: AUTHORIZATION_URL plus encoded params."""
        if self.STATE_PARAMETER or self.REDIRECT_STATE:
            # Store state in session for further request validation. The state
            # value is passed as state parameter (as specified in OAuth2 spec),
            # but also added to redirect_uri, that way we can still verify the
            # request if the provider doesn't implement the state parameter.
            # A token already present in the session is reused.
            session_key = self.name + '_state'
            state = (self.strategy.session_get(session_key) or
                     self.state_token())
            self.strategy.session_set(session_key, state)
        else:
            state = None

        query = self.auth_params(state)
        for extra in (self.get_scope_argument, self.auth_extra_arguments):
            query.update(extra())
        return self.AUTHORIZATION_URL + '?' + urlencode(query)
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def modify_start_url(self, start_url):
        """
        Rewrite a SAML redirect URL so the ID inside its SAMLRequest XML is
        always "TEST_ID", making the request identical from run to run.

        Asserts that exactly one ID attribute was replaced.
        """
        parsed = urlparse(start_url)
        # parse_qs maps each key to a list; keep only the first value.
        query = {
            key: values[0]
            for key, values in parse_qs(parsed.query).items()
        }

        # Unpack the XML carried in the SAMLRequest parameter.
        raw_xml = OneLogin_Saml2_Utils.decode_base64_and_inflate(
            query['SAMLRequest']
        )
        saml_xml, replaced = re.subn(
            r'ID="[^"]+"', 'ID="TEST_ID"', raw_xml.decode()
        )
        self.assertEqual(replaced, 1)

        # Pack the modified XML again and rebuild the URL around it.
        query['SAMLRequest'] = OneLogin_Saml2_Utils.deflate_and_base64_encode(
            saml_xml
        )
        pieces = list(parsed)
        pieces[4] = urlencode(query)  # index 4 is the query component
        return urlunparse(pieces)
项目:dataplicity-lomond    作者:wildfoundry    | 项目源码 | 文件源码
def run_test(test_no):
    """Run the single test whose index is ``test_no``."""
    query_string = urlencode({'case': test_no, 'agent': USER_AGENT})
    run_ws(server + '/runCase?{}'.format(query_string))
项目:dataplicity-lomond    作者:wildfoundry    | 项目源码 | 文件源码
def run_test_cases(case_tuples):
    """Run a number of test cases from their 'case tuple'.

    Each case tuple is printed and then requested from the server on its
    own ``/runCase`` URL; the reports are updated once after the last case.
    """
    for case_tuple in case_tuples:
        print("\n[{}]".format(case_tuple))
        # The case tuple has to travel in the query string; without it every
        # iteration would request the very same URL, naming no case at all.
        # NOTE(review): 'casetuple' is the parameter name the Autobahn
        # fuzzing server is understood to accept -- confirm against the
        # server version in use.
        qs = urlencode({'agent': USER_AGENT, 'casetuple': case_tuple})
        url = server + '/runCase?{}'.format(qs)
        run_ws(url)
    update_report()
项目:dataplicity-lomond    作者:wildfoundry    | 项目源码 | 文件源码
def update_report():
    """Ask wstest to refresh its reports."""
    print("Updating reports...")
    agent_query = urlencode({'agent': USER_AGENT})
    report_url = server + "/updateReports?{}".format(agent_query)
    # Iterate the connection until it is exhausted; whatever it yields is
    # deliberately ignored.
    for _ignored in WebSocket(report_url):
        continue
项目:script.module.python.twitch    作者:MrSprigster    | 项目源码 | 文件源码
def prepare_request_uri(self, redirect_uri='http://localhost:3000/', scope=None, force_verify=False, state=''):
        """Return the URI for the ``authorize`` query.

        :param redirect_uri: value sent as ``redirect_uri``.
        :param scope: iterable of scope strings, sent joined by single
            spaces; ``None`` (the default) sends an empty scope.
        :param force_verify: sent as its lowercased string form
            (``'true'`` / ``'false'`` for booleans).
        :param state: value sent as ``state``.
        :return: the query URL and its URL-encoded parameters joined by '?'.
        """
        # ``None`` replaces the former ``list()`` default so that no mutable
        # object is shared between calls.
        if scope is None:
            scope = []
        q = Qry('authorize')
        q.add_param('response_type', 'token')
        q.add_param('client_id', self.client_id)
        q.add_param('redirect_uri', redirect_uri)
        q.add_param('scope', ' '.join(scope))
        q.add_param('force_verify', str(force_verify).lower())
        q.add_param('state', state)
        return '?'.join([q.url, urlencode(q.params)])
项目:script.module.python.twitch    作者:MrSprigster    | 项目源码 | 文件源码
def prepare_token_uri(self, scope=None):
        """Return the URI for the ``token`` query (client credentials grant).

        :param scope: iterable of scope strings, sent joined by single
            spaces; ``None`` (the default) sends an empty scope.
        :return: the query URL and its URL-encoded parameters joined by '?'.
        """
        # ``None`` replaces the former ``list()`` default so that no mutable
        # object is shared between calls.
        if scope is None:
            scope = []
        q = Qry('token')
        q.add_param('client_id', self.client_id)
        q.add_param('client_secret', self.client_secret)
        q.add_param('grant_type', 'client_credentials')
        q.add_param('scope', ' '.join(scope))
        return '?'.join([q.url, urlencode(q.params)])
项目:script.module.python.twitch    作者:MrSprigster    | 项目源码 | 文件源码
def prepare_revoke_uri(self, token):
        """Return the URI for the ``revoke`` query of the given token.

        The result is the query URL and its URL-encoded parameters
        (``client_id`` and ``token``) joined by '?'.
        """
        query = Qry('revoke')
        for key, value in (('client_id', self.client_id), ('token', token)):
            query.add_param(key, value)
        encoded = urlencode(query.params)
        return '?'.join((query.url, encoded))
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def auth_url(self):
        """Return authorization redirect url."""
        # Only the key ends up in the URL; the secret is not used here.
        key, _secret = self.get_key_and_secret()
        absolute_callback = self.strategy.absolute_uri(self.redirect_uri)
        # A leading "https" in the callback is replaced by "http" before the
        # callback is sent as the ``cb`` query parameter.
        http_callback = sub(r'^https', 'http', absolute_callback)
        return 'https://www.twilio.com/authorize/{0}?{1}'.format(
            key, urlencode({'cb': http_callback})
        )
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def revoke_token(self, token, uid):
        """Send the token revocation request and return its processed result.

        Nothing is sent, and ``None`` is returned, when ``REVOKE_TOKEN_URL``
        is not set.
        """
        if not self.REVOKE_TOKEN_URL:
            return None

        url = self.revoke_token_url(token, uid)
        params = self.revoke_token_params(token, uid)
        headers = self.revoke_token_headers(token, uid)
        # The encoded params are sent as request data for every method except
        # GET; they are passed as ``params`` in either case.
        if self.REVOKE_TOKEN_METHOD != 'GET':
            data = urlencode(params)
        else:
            data = None
        response = self.request(
            url,
            params=params,
            headers=headers,
            data=data,
            method=self.REVOKE_TOKEN_METHOD,
        )
        return self.process_revoke_token_response(response)
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def oauth_authorization_request(self, token):
        """Generate the OAuth request URL used to authorize ``token``.

        ``token`` may be a dict or an unparsed query string; anything that
        is not a dict is passed through ``parse_qs`` first.
        """
        token_data = token if isinstance(token, dict) else parse_qs(token)

        query = self.auth_extra_arguments() or {}
        query.update(self.get_scope_argument())
        token_key = self.OAUTH_TOKEN_PARAMETER_NAME
        query[token_key] = token_data.get(self.OAUTH_TOKEN_PARAMETER_NAME)

        state = self.get_or_create_state()
        redirect_uri = self.get_redirect_uri(state)
        query[self.REDIRECT_URI_PARAMETER_NAME] = redirect_uri
        return '{0}?{1}'.format(self.authorization_url(), urlencode(query))
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def auth_url(self):
        """Return the redirect url built from authorization_url() and params."""
        state = self.get_or_create_state()
        query = self.auth_params(state)
        query.update(self.get_scope_argument())
        query.update(self.auth_extra_arguments())
        encoded = urlencode(query)
        # redirect_uri matching is strictly enforced, so unless
        # REDIRECT_STATE is set the query string is sent unquoted to match
        # the providers value exactly.
        query_string = encoded if self.REDIRECT_STATE else unquote(encoded)
        return '{0}?{1}'.format(self.authorization_url(), query_string)
项目:invenio-github    作者:inveniosoftware    | 项目源码 | 文件源码
def register_oauth_flow():
    """Register the httpretty URIs used for the GitHub OAuth flow."""
    # https://developer.github.com/v3/oauth/
    authorize_url = 'https://github.com/login/oauth/authorize'
    access_token_url = 'https://github.com/login/oauth/access_token'

    httpretty.register_uri(
        httpretty.GET,
        authorize_url,
        status=200,
        body='User required to accept/reject scopes on this page'
    )

    def access_token_callback(request, uri, headers):
        """Answer an access-token POST with a form-encoded token or error."""
        assert request.method == 'POST'
        form = request.parse_request_body(request.body)
        for required in ('client_id', 'client_secret', 'code',
                         'redirect_uri'):
            assert required in form

        code = form['code'][0]
        if code != 'bad_verification_code':
            # Any other code is accepted; the token is derived from it.
            body = dict(
                access_token='%s_token' % code,
                scope='admin:repo_hook,user:email',
                token_type='bearer',
            )
        else:
            body = dict(
                error_uri='http://developer.github.com/v3/oauth/'
                          '#bad-verification-code',
                error_description='The code passed is '
                                  'incorrect or expired.',
                error='bad_verification_code',
            )

        headers['content-type'] = 'application/x-www-form-urlencoded'
        return 200, headers, urllib_parse.urlencode(body)

    httpretty.register_uri(
        httpretty.POST,
        access_token_url,
        body=access_token_callback,
    )
项目:registry    作者:boundlessgeo    | 项目源码 | 文件源码
def record_to_dict(record):
    """Build the dictionary representation of ``record``.

    Side effect: a non-empty ``record.title`` is overwritten in place with
    a copy that has every non-ASCII character dropped.
    """
    if record.title:
        # encode(..., 'ignore') silently discards non-ASCII characters.
        ascii_bytes = record.title.encode('ascii', 'ignore')
        record.title = ascii_bytes.decode('utf-8')

    bbox = wkt2geom(record.wkt_geometry)
    min_x = bbox[0]
    min_y = bbox[1]
    max_x = bbox[2]
    max_y = bbox[3]

    links = {
        ext: '/'.join(['layer', record.identifier, ext])
        for ext in ('xml', 'yml', 'png')
    }
    # The envelope is given by its (min_x, max_y) and (max_x, min_y) corners.
    geoshape = {
        'type': 'envelope',
        'coordinates': [[min_x, max_y], [max_x, min_y]],
    }
    tile_url = '/layer/%s/wmts/%s/default_grid/{z}/{x}/{y}.png' % (
        record.identifier, record.title_alternate
    )

    record_dict = {
        'title': record.title,
        'abstract': record.abstract,
        'title_alternate': record.title_alternate,
        'checks_list': [],
        'bbox': bbox,
        'min_x': min_x,
        'min_y': min_y,
        'max_x': max_x,
        'max_y': max_y,
        'source': record.source,
        'source_type': record.type,
        'tile_url': tile_url,
        'layer_date': record.date_modified,
        'layer_originator': record.creator,
        'layer_identifier': record.identifier,
        'links': links,
        'layer_geoshape': geoshape,
    }

    # Only WMS records get a legend URL.
    if record.format == 'OGC:WMS':
        legend_query = urlencode({
            'SERVICE': 'WMS',
            'VERSION': '1.1.1',
            'REQUEST': 'GetLegendGraphic',
            'FORMAT': 'image/png',
            'LAYER': record.title_alternate
        })
        legend_base = '/layer/%s/service?' % record.identifier
        record_dict['legend_url'] = legend_base + legend_query

    record_dict = include_registry_tags(record_dict, record.xml)

    if record.links:
        record_dict['references'] = parse_references(record.links)

    if record.source:
        record_dict['source_host'] = urlparse(record.source).netloc

    return record_dict