Python requests.compat 模块,urlparse() 实例源码

我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用requests.compat.urlparse()

项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def _negotiate_socks(self, addr, proxy_addr):
        parsed = urlparse(proxy_addr[0])

        if parsed.scheme == 'socks5':
            socks_version, rdns = 2, False
        elif parsed.scheme == 'socks5h':
            socks_version, rdns = 2, True
        elif parsed.scheme == 'socks4':
            socks_version, rdns = 1, False
        elif parsed.scheme == 'socks4a':
            socks_version, rdns = 1, True
        else:
            raise ValueError(
                'Unable to determine SOCKS version from %s' % addr[0])
        username, password = get_auth_from_url(addr[0])
        stream = SockIOStream((
            socks_version, rdns, parsed.hostname, proxy_addr[1], username, password))
        return stream.connect(*addr)
项目:deb-python-requests-unixsocket    作者:openstack    | 项目源码 | 文件源码
def get_connection(self, url, proxies=None):
        proxies = proxies or {}
        proxy = proxies.get(urlparse(url.lower()).scheme)

        if proxy:
            raise ValueError('%s does not support specifying proxies'
                             % self.__class__.__name__)

        with self.pools.lock:
            pool = self.pools.get(url)
            if pool:
                return pool

            pool = UnixHTTPConnectionPool(url, self.timeout)
            self.pools[url] = pool

        return pool
项目:commissaire    作者:projectatomic    | 项目源码 | 文件源码
def get_connection(self, url, proxies=None):
        proxies = proxies or {}
        proxy = proxies.get(urlparse(url.lower()).scheme)

        if proxy:
            raise ValueError('%s does not support specifying proxies'
                             % self.__class__.__name__)

        with self.pools.lock:
            pool = self.pools.get(url)
            if pool:
                return pool

            pool = UnixHTTPConnectionPool(url, self.timeout)
            self.pools[url] = pool

        return pool
项目:deb-python-requests-kerberos    作者:openstack    | 项目源码 | 文件源码
def authenticate_user(self, response, **kwargs):
        """Handles user authentication with gssapi/kerberos"""

        host = urlparse(response.url).hostname

        try:
            auth_header = self.generate_request_header(response, host)
        except KerberosExchangeError:
            # GSS Failure, return existing response
            return response

        log.debug("authenticate_user(): Authorization header: {0}".format(
            auth_header))
        response.request.headers['Authorization'] = auth_header

        # Consume the content so we can reuse the connection for the next
        # request.
        response.content
        response.raw.release_conn()

        _r = response.connection.send(response.request, **kwargs)
        _r.history.append(response)

        log.debug("authenticate_user(): returning {0}".format(_r))
        return _r
项目:deb-python-requests-kerberos    作者:openstack    | 项目源码 | 文件源码
def __call__(self, request):
        if self.force_preemptive:
            # add Authorization header before we receive a 401
            # by the 401 handler
            host = urlparse(request.url).hostname

            auth_header = self.generate_request_header(None, host, is_preemptive=True)

            log.debug("HTTPKerberosAuth: Preemptive Authorization header: {0}".format(auth_header))

            request.headers['Authorization'] = auth_header

        request.register_hook('response', self.handle_response)
        try:
            self.pos = request.body.tell()
        except AttributeError:
            # In the case of HTTPKerberosAuth being reused and the body
            # of the previous request was a file-like object, pos has
            # the file position of the previous body. Ensure it's set to
            # None.
            self.pos = None
        return request
项目:edd    作者:JBEI    | 项目源码 | 文件源码
def _build_message(self, request):
        """
        Builds a string representation of the message contained in the request so it can be
        digested for HMAC generation
        """
        url = urlparse(request.url)

        # THe version 1 spec of the HmacSignature class calls for the message to be signed
        #   formatted as the following elements, each separated by a newline character:
        #   * UserId (same value as used in Authorization header)
        #   * HTTP Method (e.g. GET, POST)
        #   * HTTP Host (e.g. server.example.org)
        #   * Request path (e.g. /path/to/resource/)
        #   * SORTED query string, keyed by natural UTF8 byte-ordering of names
        #   * Request Body
        delimiter = '\n'
        msg = delimiter.join((
            self._USERNAME or '',
            request.method,
            url.netloc,
            url.path,
            self._sort_parameters(url.query),
            request.body or '',
        ))
        return msg
项目:edd    作者:JBEI    | 项目源码 | 文件源码
def insert_spoofed_https_csrf_headers(headers, base_url):
    """
    Creates HTTP headers that help to work around Django's CSRF protection, which shouldn't apply
    outside of the browser context.
    :param headers: a dictionary into which headers will be inserted, if needed
    :param base_url: the base URL of the Django application being contacted
    """
    # if connecting to Django/DRF via HTTPS, spoof the 'Host' and 'Referer' headers that Django
    # uses to help prevent cross-site scripting attacks for secure browser connections. This
    # should be OK for a standalone Python REST API client, since the origin of a
    # cross-site scripting attack is malicious website code that executes in a browser,
    # but accesses another site's credentials via the browser or via user prompts within the
    # browser. Not applicable in this case for a standalone REST API client.
    # References:
    # https://docs.djangoproject.com/en/dev/ref/csrf/#how-it-works
    # http://security.stackexchange.com/questions/96114/why-is-referer-checking-needed-for-django
    # http://mathieu.fenniak.net/is-your-web-api-susceptible-to-a-csrf-exploit/
    # -to-prevent-csrf
    if urlparse(base_url).scheme == 'https':
        headers['Host'] = urlsplit(base_url).netloc
        headers['Referer'] = base_url  # LOL! Bad spelling is now standard :-)
项目:edd    作者:JBEI    | 项目源码 | 文件源码
def _process_query_dict(self, search_terms, entry_types, blast_program, blast_sequence,
                            search_web, sort_field, sort_ascending, page_number):
        query_dict = {}
        query_url = None  # TODO: re-instate this parameter if we can get ICE to support the same
        # queries in GET as in POST...should simplify client use
        if not query_url:
            if search_terms:
                query_dict['queryString'] = search_terms
            if entry_types:
                if not set(entry_types).issubset(set(ICE_ENTRY_TYPES)):
                    raise KeyError('')
                query_dict['entryTypes'] = entry_types
            self._process_query_blast(query_dict, blast_program, blast_sequence)
            query_dict['webSearch'] = search_web  # Note: affects results even if false?
            self._process_query_parameters(query_dict, sort_field, sort_ascending, page_number)
        else:
            # un-parse the query URL so we're using consistently following the same code path
            query_dict = parse_qs(urlparse(query_url).params)
        return query_dict
项目:plex-for-kodi-mod    作者:mrclemds    | 项目源码 | 文件源码
def get_connection(self, url, proxies=None):
        """Returns a urllib3 connection for the given URL. This should not be
        called from user code, and is only exposed for use when subclassing the
        :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

        :param url: The URL to connect to.
        :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
        """
        proxies = proxies or {}
        proxy = proxies.get(urlparse(url.lower()).scheme)

        if proxy:
            proxy_headers = self.proxy_headers(proxy)

            if proxy not in self.proxy_manager:
                self.proxy_manager[proxy] = proxy_from_url(
                    proxy,
                    proxy_headers=proxy_headers,
                    num_pools=self._pool_connections,
                    maxsize=self._pool_maxsize,
                    block=self._pool_block
                )

            conn = self.proxy_manager[proxy].connection_from_url(url)
        else:
            # Only scheme should be lower case
            parsed = urlparse(url)
            url = parsed.geturl()
            conn = self.poolmanager.connection_from_url(url)

        self.connections.append(conn)
        return conn
项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def _create_stream(self, max_buffer_size, af, addr, source_ip=None,
            source_port=None):
        # Always connect in plaintext; we'll convert to ssl if necessary
        # after one connection has completed.
        source_port_bind = source_port if isinstance(source_port, int) else 0
        source_ip_bind = source_ip

        socket_obj = socket.socket(af)
        set_close_exec(socket_obj.fileno())
        try:
            stream = IOStream(socket_obj,
                              io_loop=self.io_loop,
                              max_buffer_size=max_buffer_size)

            # connect proxy
            if source_port_bind or source_ip_bind:
                @gen.coroutine
                def _(addr):
                    proxy_headers = get_proxy_headers(source_ip_bind)
                    parsed = urlparse(source_ip_bind)
                    scheme, host, port = parsed.scheme, parsed.hostname, source_port_bind
                    if 'socks' in scheme:
                        r = yield self._negotiate_socks(addr, (source_ip_bind, source_port_bind))
                        raise gen.Return(r)
                    elif scheme in ('http', 'https'):
                        r = yield stream.connect((host, port))
                        if scheme == 'https':
                            yield self._connect_tunnel(stream, addr, proxy_headers)
                        raise gen.Return(r)
                    else:
                        raise AttributeError('Unknown scheme: %s' % scheme)
                return _(addr)
            else:
                return stream.connect(addr)
        except socket.error as e:
            fu = Future()
            fu.set_exception(e)
            return fu
项目:trip    作者:littlecodersh    | 项目源码 | 文件源码
def send(self, stream=False, timeout=None, verify=True,
             cert=None, proxies=None):
        request = self.request
        connect_timeout, self.read_timeout = parse_timeout(timeout)
        self.stream_body = stream

        # set connect timeout
        with stack_context.ExceptionStackContext(self._handle_exception):
            if connect_timeout:
                self._timeout = self.io_loop.call_later(connect_timeout,
                    stack_context.wrap(functools.partial(
                        self._on_timeout, 'while connecting')))

            # set proxy related info
            proxy = select_proxy(request.url, proxies)
            self.headers = request.headers.copy()
            if proxy:
                proxy = prepend_scheme_if_needed(proxy, 'http')
                parsed = urlparse(proxy)
                scheme, host, port = parsed.scheme, proxy, parsed.port
                port = port or (443 if scheme == 'https' else 80)
                self.start_line = RequestStartLine(request.method, request.url, '')
                self.headers.update(get_proxy_headers(proxy))
            else:
                host, port = None, None
                self.start_line = request.start_line

            self.tcp_client.connect(
                request.host, request.port,
                af=request.af,
                ssl_options=self._get_ssl_options(request, verify, cert),
                max_buffer_size=self.max_buffer_size,
                source_ip=host, source_port=port,
                callback=self._on_connect)
项目:requests-http-signature    作者:kislyuk    | 项目源码 | 文件源码
def get_string_to_sign(self, request, headers):
        sts = []
        for header in headers:
            if header == "(request-target)":
                path_url = requests.models.RequestEncodingMixin.path_url.fget(request)
                sts.append("(request-target): {} {}".format(request.method.lower(), path_url))
            else:
                if header.lower() == "host":
                    value = request.headers.get("host", urlparse(request.url).hostname)
                else:
                    value = request.headers[header]
                sts.append("{k}: {v}".format(k=header.lower(), v=value))
        return "\n".join(sts).encode()
项目:pybel    作者:pybel    | 项目源码 | 文件源码
def get_uri_name(url):
    """Gets the file name from the end of the URL. Only useful for PyBEL's testing though since it looks specifically
    if the file is from the weird owncloud resources distributed by Fraunhofer"""
    url_parsed = urlparse(url)

    if url.startswith(FRAUNHOFER_RESOURCES):
        return url_parsed.query.split('=')[-1]
    else:
        url_parts = url_parsed.path.split('/')
        return url_parts[-1]
项目:pybel    作者:pybel    | 项目源码 | 文件源码
def is_url(s):
    """Checks if a string is a valid URL

    :param str s: An input string
    :return: Is the string a valid URL?
    :rtype: bool
    """
    return urlparse(s).scheme != ""
项目:deb-python-requests-unixsocket    作者:openstack    | 项目源码 | 文件源码
def connect(self):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        socket_path = unquote(urlparse(self.unix_socket_url).netloc)
        sock.connect(socket_path)
        self.sock = sock
项目:pyuniprot    作者:cebel    | 项目源码 | 文件源码
def get_path_to_file_from_url(cls, url):
        """standard file path

        :param str url: download URL
        """
        file_name = urlparse(url).path.split('/')[-1]
        return os.path.join(PYUNIPROT_DATA_DIR, file_name)
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def _key_from_url(url):
        parsed = urlparse(url)
        return urlunparse((parsed.scheme.lower(),
                           parsed.netloc.lower(),
                           '', '', '', ''))
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def _build_request_path(url, proxy_info):
    uri = compat.urlparse(url)
    proxy_url = proxy_info.get('request_path')
    if proxy_url is not None:
        return proxy_url, uri

    request_path = _coerce_to_bytes(uri.path)
    if uri.query:
        request_path += b'?' + _coerce_to_bytes(uri.query)

    return request_path, uri
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def _key_from_url(url):
        parsed = urlparse(url)
        return urlunparse((parsed.scheme.lower(),
                           parsed.netloc.lower(),
                           '', '', '', ''))
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def _build_request_path(url, proxy_info):
    uri = compat.urlparse(url)
    proxy_url = proxy_info.get('request_path')
    if proxy_url is not None:
        return proxy_url, uri

    request_path = _coerce_to_bytes(uri.path)
    if uri.query:
        request_path += b'?' + _coerce_to_bytes(uri.query)

    return request_path, uri
项目:html-telegraph-poster    作者:mercuree    | 项目源码 | 文件源码
def preprocess_media_tags(element):
    if isinstance(element, html.HtmlElement):
        if element.tag in ['ol', 'ul']:
            # ignore any spaces between <ul> and <li>
            element.text = ''
        elif element.tag == 'li':
            # ignore spaces after </li>
            element.tail = ''
        elif element.tag == 'iframe':
            iframe_src = element.get('src')

            youtube = re.match(youtube_re, iframe_src)
            vimeo = re.match(vimeo_re, iframe_src)
            if youtube or vimeo:
                element.text = ''  # ignore any legacy text
                if youtube:
                    yt_id = urlparse(iframe_src).path.replace('/embed/', '')
                    element.set('src', '/embed/youtube?url=' + quote_plus('https://www.youtube.com/watch?v=' + yt_id))
                elif vimeo:
                    element.set('src', '/embed/vimeo?url=' + quote_plus('https://vimeo.com/' + vimeo.group(2)))

                if not len(element.xpath('./ancestor::figure')):
                    _wrap_figure(element)
            else:
                element.drop_tag()

        elif element.tag == 'blockquote' and element.get('class') == 'twitter-tweet':
            twitter_links = element.xpath('.//a[@href]')
            for tw_link in twitter_links:
                if twitter_re.match(tw_link.get('href')):
                    twitter_frame = html.HtmlElement()
                    twitter_frame.tag = 'iframe'
                    twitter_frame.set('src', '/embed/twitter?url=' + quote_plus(tw_link.get('href')))
                    element.addprevious(twitter_frame)
                    _wrap_figure(twitter_frame)
                    element.drop_tree()
项目:commissaire    作者:projectatomic    | 项目源码 | 文件源码
def connect(self):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(self.timeout)
        socket_path = unquote(urlparse(self.unix_socket_url).netloc)
        sock.connect(socket_path)
        self.sock = sock
项目:aegea    作者:kislyuk    | 项目源码 | 文件源码
def __init__(self, count, url, cls, session, params=None, etag=None,
                 headers=None):
        GitHubCore.__init__(self, {}, session)
        #: Original number of items requested
        self.original = count
        #: Number of items left in the iterator
        self.count = count
        #: URL the class used to make it's first GET
        self.url = url
        #: Last URL that was requested
        self.last_url = None
        self._api = self.url
        #: Class for constructing an item to return
        self.cls = cls
        #: Parameters of the query string
        self.params = params or {}
        self._remove_none(self.params)
        # We do not set this from the parameter sent. We want this to
        # represent the ETag header returned by GitHub no matter what.
        # If this is not None, then it won't be set from the response and
        # that's not what we want.
        #: The ETag Header value returned by GitHub
        self.etag = None
        #: Headers generated for the GET request
        self.headers = headers or {}
        #: The last response seen
        self.last_response = None
        #: Last status code received
        self.last_status = 0

        if etag:
            self.headers.update({'If-None-Match': etag})

        self.path = urlparse(self.url).path
项目:aegea    作者:kislyuk    | 项目源码 | 文件源码
def _api(self, uri):
        self._uri = urlparse(uri)
项目:OctoFusion    作者:tapnair    | 项目源码 | 文件源码
def _key_from_url(url):
        parsed = urlparse(url)
        return urlunparse((parsed.scheme.lower(),
                           parsed.netloc.lower(),
                           '', '', '', ''))
项目:OctoFusion    作者:tapnair    | 项目源码 | 文件源码
def _build_request_path(url, proxy_info):
    uri = compat.urlparse(url)
    proxy_url = proxy_info.get('request_path')
    if proxy_url is not None:
        return proxy_url, uri

    request_path = _coerce_to_bytes(uri.path)
    if uri.query:
        request_path += b'?' + _coerce_to_bytes(uri.query)

    return request_path, uri
项目:pyctd    作者:cebel    | 项目源码 | 文件源码
def get_path_to_file_from_url(cls, url):
        """standard file path

        :param str url: CTD download URL 
        """
        file_name = urlparse(url).path.split('/')[-1]
        return os.path.join(cls.pyctd_data_dir, file_name)
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def __init__(self, count, url, cls, session, params=None, etag=None,
                 headers=None):
        models.GitHubCore.__init__(self, {}, session)
        #: Original number of items requested
        self.original = count
        #: Number of items left in the iterator
        self.count = count
        #: URL the class used to make it's first GET
        self.url = url
        #: Last URL that was requested
        self.last_url = None
        self._api = self.url
        #: Class for constructing an item to return
        self.cls = cls
        #: Parameters of the query string
        self.params = params or {}
        self._remove_none(self.params)
        # We do not set this from the parameter sent. We want this to
        # represent the ETag header returned by GitHub no matter what.
        # If this is not None, then it won't be set from the response and
        # that's not what we want.
        #: The ETag Header value returned by GitHub
        self.etag = None
        #: Headers generated for the GET request
        self.headers = headers or {}
        #: The last response seen
        self.last_response = None
        #: Last status code received
        self.last_status = 0

        if etag:
            self.headers.update({'If-None-Match': etag})

        self.path = urlparse(self.url).path
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def _api(self, uri):
        self._uri = urlparse(uri)
        self.url = uri
项目:deb-python-requests-kerberos    作者:openstack    | 项目源码 | 文件源码
def authenticate_server(self, response):
        """
        Uses GSSAPI to authenticate the server.

        Returns True on success, False on failure.
        """

        log.debug("authenticate_server(): Authenticate header: {0}".format(
            _negotiate_value(response)))

        host = urlparse(response.url).hostname

        try:
            result = kerberos.authGSSClientStep(self.context[host],
                                                _negotiate_value(response))
        except kerberos.GSSError:
            log.exception("authenticate_server(): authGSSClientStep() failed:")
            return False

        if result < 1:
            log.error("authenticate_server(): authGSSClientStep() failed: "
                      "{0}".format(result))
            return False

        log.debug("authenticate_server(): returning {0}".format(response))
        return True
项目:edd    作者:JBEI    | 项目源码 | 文件源码
def _extract_pagination_params(self, query_url):
        query_params_string = urlparse(query_url).query
        query_dict = parse_qs(query_params_string) if query_params_string else None
        offset = query_dict.get[RESULT_OFFSET_PARAMETER]
        if offset:
            offset = offset[0]
项目:edd    作者:JBEI    | 项目源码 | 文件源码
def parse_query_url(query_url):
    if query_url:
        url_elements = urlparse(query_url)
        url_parameters = parse_qs(url_elements.query)
        return url_elements, url_parameters
    return None, {}