我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用requests.compat.urlparse()。
def _negotiate_socks(self, addr, proxy_addr):
    """Connect to ``addr`` through the SOCKS proxy described by ``proxy_addr``.

    :param addr: destination arguments, unpacked into ``stream.connect``.
    :param proxy_addr: sequence whose first item is the proxy URL and whose
        second item is the proxy port.
    :returns: the result of ``SockIOStream.connect(*addr)``.
    :raises ValueError: if the proxy URL scheme is not one of ``socks5``,
        ``socks5h``, ``socks4`` or ``socks4a``.
    """
    proxy_url = proxy_addr[0]
    proxy_port = proxy_addr[1]

    # scheme -> (socks_version, rdns).
    # NOTE(review): 1/2 look like PySocks-style SOCKS4/SOCKS5 constants --
    # confirm against SockIOStream.
    socks_flavours = {
        'socks5': (2, False),
        'socks5h': (2, True),
        'socks4': (1, False),
        'socks4a': (1, True),
    }

    parsed = urlparse(proxy_url)
    flavour = socks_flavours.get(parsed.scheme)
    if flavour is None:
        # Report the proxy URL (the value whose scheme was inspected), not
        # the destination host.
        raise ValueError(
            'Unable to determine SOCKS version from %s' % proxy_url)
    socks_version, rdns = flavour

    # Proxy credentials are embedded in the proxy URL; the destination host
    # in ``addr`` carries none.
    username, password = get_auth_from_url(proxy_url)

    stream = SockIOStream((
        socks_version, rdns, parsed.hostname, proxy_port,
        username, password))
    return stream.connect(*addr)
def get_connection(self, url, proxies=None):
    """Return the cached connection pool for ``url``, creating it if needed.

    :param url: URL used both as the cache key and to build a new pool.
    :param proxies: optional mapping of scheme to proxy; any proxy configured
        for the URL's scheme is rejected.
    :raises ValueError: if a proxy is configured for the URL's scheme.
    """
    proxy_map = proxies or {}
    configured_proxy = proxy_map.get(urlparse(url.lower()).scheme)
    if configured_proxy:
        raise ValueError('%s does not support specifying proxies'
                         % self.__class__.__name__)

    # Look up and (if missing) create the pool under the cache lock.
    with self.pools.lock:
        cached = self.pools.get(url)
        if not cached:
            cached = UnixHTTPConnectionPool(url, self.timeout)
            self.pools[url] = cached
        return cached
def authenticate_user(self, response, **kwargs):
    """Retry the request behind ``response`` with a Kerberos Authorization header.

    :param response: the response whose request should be re-sent.
    :param kwargs: forwarded to ``response.connection.send``.
    :returns: the new response, with ``response`` appended to its history,
        or ``response`` itself when header generation raises
        ``KerberosExchangeError``.
    """
    target_host = urlparse(response.url).hostname

    try:
        negotiate_header = self.generate_request_header(response, target_host)
    except KerberosExchangeError:
        # GSS failure: hand the untouched response back to the caller.
        return response

    log.debug("authenticate_user(): Authorization header: {0}".format(
        negotiate_header))
    response.request.headers['Authorization'] = negotiate_header

    # Read the body and release the connection so it can be reused for the
    # follow-up request.
    response.content
    response.raw.release_conn()

    retried = response.connection.send(response.request, **kwargs)
    retried.history.append(response)

    log.debug("authenticate_user(): returning {0}".format(retried))
    return retried
def __call__(self, request):
    """Prepare ``request`` for Kerberos authentication and return it.

    When ``self.force_preemptive`` is set, an Authorization header is
    attached up front. In every case ``self.handle_response`` is registered
    as a ``'response'`` hook and the body's current position is remembered
    in ``self.pos``.
    """
    if self.force_preemptive:
        # Send the Authorization header before any 401 is received, rather
        # than leaving it to the 401 handler.
        target_host = urlparse(request.url).hostname
        preemptive_header = self.generate_request_header(
            None, target_host, is_preemptive=True)
        log.debug(
            "HTTPKerberosAuth: Preemptive Authorization header: {0}".format(
                preemptive_header))
        request.headers['Authorization'] = preemptive_header

    request.register_hook('response', self.handle_response)

    try:
        self.pos = request.body.tell()
    except AttributeError:
        # The body has no tell(). If this auth object was reused after a
        # request with a file-like body, pos would still hold that stale
        # position, so reset it.
        self.pos = None

    return request
def _build_message(self, request):
    """
    Build the newline-separated string for ``request`` that is digested for
    HMAC generation.

    The message consists of these fields, in order, joined by ``'\\n'``:

    * user id (``self._USERNAME``, or an empty string when unset)
    * HTTP method (e.g. GET, POST)
    * HTTP host (e.g. server.example.org)
    * request path (e.g. /path/to/resource/)
    * query string as returned by ``self._sort_parameters``
    * request body (empty string when there is none)
    """
    parsed = urlparse(request.url)
    fields = [
        self._USERNAME or '',
        request.method,
        parsed.netloc,
        parsed.path,
        self._sort_parameters(parsed.query),
        request.body or '',
    ]
    return '\n'.join(fields)
def insert_spoofed_https_csrf_headers(headers, base_url):
    """
    Add 'Host' and 'Referer' headers for HTTPS base URLs, to work around
    Django's CSRF protection, which shouldn't apply outside of a browser.

    Django checks these headers on secure connections. Spoofing them is
    considered acceptable for a standalone REST API client because the
    attacks they guard against originate from code running in a browser.

    References:
    https://docs.djangoproject.com/en/dev/ref/csrf/#how-it-works
    http://security.stackexchange.com/questions/96114/why-is-referer-checking-needed-for-django

    :param headers: a dictionary into which headers will be inserted, if needed
    :param base_url: the base URL of the Django application being contacted
    """
    if urlparse(base_url).scheme != 'https':
        # Nothing to spoof for non-HTTPS connections.
        return

    headers['Host'] = urlsplit(base_url).netloc
    # 'Referer' is the (misspelled) standard header name.
    headers['Referer'] = base_url
def _process_query_dict(self, search_terms, entry_types, blast_program, blast_sequence,
                        search_web, sort_field, sort_ascending, page_number):
    """Build the dictionary of search parameters for a query.

    :param search_terms: stored under ``'queryString'`` when truthy.
    :param entry_types: stored under ``'entryTypes'`` when truthy; every
        item must be a member of ``ICE_ENTRY_TYPES``.
    :param blast_program: forwarded to ``self._process_query_blast``.
    :param blast_sequence: forwarded to ``self._process_query_blast``.
    :param search_web: stored under ``'webSearch'`` (always set, even if false).
    :param sort_field: forwarded to ``self._process_query_parameters``.
    :param sort_ascending: forwarded to ``self._process_query_parameters``.
    :param page_number: forwarded to ``self._process_query_parameters``.
    :returns: the assembled query dictionary.
    :raises KeyError: if ``entry_types`` contains a value that is not in
        ``ICE_ENTRY_TYPES``.
    """
    # TODO: re-instate this as a parameter if we can get ICE to support the
    # same queries in GET as in POST... should simplify client use.
    query_url = None

    if query_url:
        # Un-parse the query URL so both inputs follow the same code path.
        # The search arguments live in the URL's query component; ``.params``
        # is only the ';parameters' suffix of the last path segment.
        return parse_qs(urlparse(query_url).query)

    query_dict = {}
    if search_terms:
        query_dict['queryString'] = search_terms

    if entry_types:
        unsupported = set(entry_types) - set(ICE_ENTRY_TYPES)
        if unsupported:
            raise KeyError('Unsupported entry type(s): %s'
                           % ', '.join(sorted(str(item) for item in unsupported)))
        query_dict['entryTypes'] = entry_types

    self._process_query_blast(query_dict, blast_program, blast_sequence)

    query_dict['webSearch'] = search_web  # Note: affects results even if false?

    self._process_query_parameters(query_dict, sort_field, sort_ascending, page_number)
    return query_dict
def get_connection(self, url, proxies=None):
    """Returns a urllib3 connection for the given URL. This should not be
    called from user code, and is only exposed for use when subclassing the
    :class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.

    The returned connection is also appended to ``self.connections``.

    :param url: The URL to connect to.
    :param proxies: (optional) A Requests-style dictionary of proxies used on this request.
    """
    proxy_map = proxies or {}
    proxy = proxy_map.get(urlparse(url.lower()).scheme)

    if not proxy:
        # Round-trip through urlparse so that only the scheme is lower-cased.
        url = urlparse(url).geturl()
        conn = self.poolmanager.connection_from_url(url)
    else:
        proxy_headers = self.proxy_headers(proxy)

        # One proxy manager per proxy URL, created on first use.
        if proxy not in self.proxy_manager:
            self.proxy_manager[proxy] = proxy_from_url(
                proxy,
                proxy_headers=proxy_headers,
                num_pools=self._pool_connections,
                maxsize=self._pool_maxsize,
                block=self._pool_block
            )

        conn = self.proxy_manager[proxy].connection_from_url(url)

    self.connections.append(conn)
    return conn
def _create_stream(self, max_buffer_size, af, addr, source_ip=None, source_port=None):
    """Create an ``IOStream`` on a new socket and start connecting it.

    When neither ``source_ip`` nor an integer ``source_port`` is given, the
    stream connects straight to ``addr``. Otherwise ``source_ip`` is parsed
    as a proxy URL and ``source_port`` is used as the proxy port, and the
    connection is made through that proxy by an inner coroutine.

    :param max_buffer_size: passed to ``IOStream``.
    :param af: address family passed to ``socket.socket``.
    :param addr: destination address.
    :param source_ip: here parsed as a URL whose scheme selects the proxy type.
    :param source_port: proxy port; non-integer values are treated as 0.
    :returns: the result of ``stream.connect``, the inner coroutine's result,
        or a ``Future`` carrying the ``socket.error`` that was caught.
    """
    # Always connect in plaintext; we'll convert to ssl if necessary
    # after one connection has completed.
    source_port_bind = source_port if isinstance(source_port, int) else 0
    source_ip_bind = source_ip
    socket_obj = socket.socket(af)
    set_close_exec(socket_obj.fileno())
    try:
        stream = IOStream(socket_obj, io_loop=self.io_loop, max_buffer_size=max_buffer_size)
        # connect proxy
        if source_port_bind or source_ip_bind:
            @gen.coroutine
            def _(addr):
                proxy_headers = get_proxy_headers(source_ip_bind)
                parsed = urlparse(source_ip_bind)
                scheme, host, port = parsed.scheme, parsed.hostname, source_port_bind
                if 'socks' in scheme:
                    # SOCKS proxies are handled entirely by _negotiate_socks.
                    r = yield self._negotiate_socks(addr, (source_ip_bind, source_port_bind))
                    raise gen.Return(r)
                elif scheme in ('http', 'https'):
                    # Connect to the proxy itself first.
                    r = yield stream.connect((host, port))
                    if scheme == 'https':
                        # NOTE(review): the tunnel is keyed on the proxy
                        # URL's scheme, not the destination's -- confirm
                        # this is intended.
                        yield self._connect_tunnel(stream, addr, proxy_headers)
                    raise gen.Return(r)
                else:
                    raise AttributeError('Unknown scheme: %s' % scheme)
            return _(addr)
        else:
            return stream.connect(addr)
    except socket.error as e:
        # Report socket errors through a Future rather than raising.
        fu = Future()
        fu.set_exception(e)
        return fu
def send(self, stream=False, timeout=None, verify=True, cert=None, proxies=None):
    """Start connecting for ``self.request``, optionally through a proxy.

    :param stream: stored as ``self.stream_body``.
    :param timeout: split by ``parse_timeout`` into a connect timeout and
        ``self.read_timeout``.
    :param verify: forwarded to ``self._get_ssl_options``.
    :param cert: forwarded to ``self._get_ssl_options``.
    :param proxies: proxy mapping consulted via ``select_proxy``.
    """
    request = self.request
    connect_timeout, self.read_timeout = parse_timeout(timeout)
    self.stream_body = stream

    # set connect timeout
    with stack_context.ExceptionStackContext(self._handle_exception):
        if connect_timeout:
            # Schedule _on_timeout('while connecting') after connect_timeout.
            self._timeout = self.io_loop.call_later(connect_timeout,
                stack_context.wrap(functools.partial(
                    self._on_timeout, 'while connecting')))

        # set proxy related info
        proxy = select_proxy(request.url, proxies)
        self.headers = request.headers.copy()
        if proxy:
            proxy = prepend_scheme_if_needed(proxy, 'http')
            parsed = urlparse(proxy)
            # ``host`` is the whole proxy URL, not just its hostname.
            scheme, host, port = parsed.scheme, proxy, parsed.port
            # Fall back to the scheme's conventional port.
            port = port or (443 if scheme == 'https' else 80)
            # The start line carries the full request URL when proxied.
            self.start_line = RequestStartLine(request.method, request.url, '')
            self.headers.update(get_proxy_headers(proxy))
        else:
            host, port = None, None
            self.start_line = request.start_line

        # NOTE(review): source_ip/source_port appear to be repurposed to
        # carry the proxy URL and port -- confirm against the TCP client.
        self.tcp_client.connect(
            request.host, request.port,
            af=request.af,
            ssl_options=self._get_ssl_options(request, verify, cert),
            max_buffer_size=self.max_buffer_size,
            source_ip=host, source_port=port,
            callback=self._on_connect)
def get_string_to_sign(self, request, headers):
    """Build the byte string to sign for ``request``.

    One ``name: value`` line is produced per entry of ``headers``, in order,
    with the name lower-cased. The pseudo-header ``(request-target)`` yields
    the lower-cased method followed by the request's path URL. A missing
    ``host`` header falls back to the hostname of ``request.url``.

    :returns: the lines joined with ``'\\n'`` and encoded to bytes.
    """
    lines = []
    for name in headers:
        if name == "(request-target)":
            path_url = requests.models.RequestEncodingMixin.path_url.fget(request)
            lines.append("(request-target): {} {}".format(request.method.lower(), path_url))
            continue

        lowered = name.lower()
        if lowered == "host":
            value = request.headers.get("host", urlparse(request.url).hostname)
        else:
            value = request.headers[name]
        lines.append("{k}: {v}".format(k=lowered, v=value))

    return "\n".join(lines).encode()
def get_uri_name(url):
    """Return the file name found at the end of ``url``.

    URLs that start with ``FRAUNHOFER_RESOURCES`` carry the name after the
    last ``=`` of the query string; for every other URL it is the last
    ``/``-separated segment of the path.
    """
    parsed = urlparse(url)

    if not url.startswith(FRAUNHOFER_RESOURCES):
        return parsed.path.split('/')[-1]

    return parsed.query.split('=')[-1]
def is_url(s):
    """Report whether a string parses with a non-empty URL scheme.

    Only the scheme is inspected; the rest of the string is not validated.

    :param str s: An input string
    :return: True if ``urlparse`` finds a scheme other than ``""``
    :rtype: bool
    """
    scheme = urlparse(s).scheme
    return not (scheme == "")
def connect(self):
    """Open a UNIX stream socket and connect it to the configured path.

    The socket path is the percent-decoded netloc of
    ``self.unix_socket_url``; the connected socket is stored on ``self.sock``.
    """
    unix_sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    unix_sock.settimeout(self.timeout)

    encoded_path = urlparse(self.unix_socket_url).netloc
    unix_sock.connect(unquote(encoded_path))

    self.sock = unix_sock
def get_path_to_file_from_url(cls, url):
    """Return the path inside ``PYUNIPROT_DATA_DIR`` for a download URL.

    The file name is the last ``/``-separated segment of the URL's path.

    :param str url: download URL
    """
    url_path = urlparse(url).path
    file_name = url_path.rsplit('/', 1)[-1]
    return os.path.join(PYUNIPROT_DATA_DIR, file_name)
def _key_from_url(url):
    """Return ``url`` reduced to its lower-cased scheme and netloc.

    Path, params, query and fragment are all dropped.
    """
    parts = urlparse(url)
    scheme = parts.scheme.lower()
    netloc = parts.netloc.lower()
    return urlunparse((scheme, netloc, '', '', '', ''))
def _build_request_path(url, proxy_info):
    """Return ``(request_path, parsed_url)`` for ``url``.

    If ``proxy_info`` carries a ``'request_path'`` entry that is not None,
    it is returned unchanged as the request path. Otherwise the path is the
    URL's path coerced to bytes, followed by ``?`` and the query when the
    URL has one.
    """
    uri = compat.urlparse(url)

    override = proxy_info.get('request_path')
    if override is not None:
        return override, uri

    path_bytes = _coerce_to_bytes(uri.path)
    if not uri.query:
        return path_bytes, uri

    return path_bytes + b'?' + _coerce_to_bytes(uri.query), uri
def preprocess_media_tags(element):
    """Normalise list whitespace and rewrite media embeds on ``element`` in place.

    Only ``html.HtmlElement`` instances are touched:

    * ``<ol>``/``<ul>``: leading text is cleared.
    * ``<li>``: tail text is cleared.
    * ``<iframe>``: a YouTube or Vimeo ``src`` is rewritten to the matching
      ``/embed/...`` URL and the iframe is wrapped in a figure unless it
      already has a ``<figure>`` ancestor; any other iframe (including one
      without a ``src``) has its tag dropped.
    * ``<blockquote class="twitter-tweet">``: the first link matching
      ``twitter_re`` is turned into a preceding ``/embed/twitter`` iframe
      wrapped in a figure, and the blockquote is removed.
    """
    if not isinstance(element, html.HtmlElement):
        return

    tag = element.tag

    if tag in ('ol', 'ul'):
        # ignore any spaces between <ul> and <li>
        element.text = ''

    elif tag == 'li':
        # ignore spaces after </li>
        element.tail = ''

    elif tag == 'iframe':
        # A missing src would make re.match raise TypeError; treat it as an
        # unrecognised iframe instead.
        iframe_src = element.get('src') or ''
        youtube = re.match(youtube_re, iframe_src)
        vimeo = re.match(vimeo_re, iframe_src)

        if not (youtube or vimeo):
            element.drop_tag()
            return

        element.text = ''  # ignore any legacy text
        if youtube:
            yt_id = urlparse(iframe_src).path.replace('/embed/', '')
            element.set('src', '/embed/youtube?url='
                        + quote_plus('https://www.youtube.com/watch?v=' + yt_id))
        else:
            element.set('src', '/embed/vimeo?url='
                        + quote_plus('https://vimeo.com/' + vimeo.group(2)))

        if not element.xpath('./ancestor::figure'):
            _wrap_figure(element)

    elif tag == 'blockquote' and element.get('class') == 'twitter-tweet':
        for tw_link in element.xpath('.//a[@href]'):
            href = tw_link.get('href')
            if not twitter_re.match(href):
                continue

            twitter_frame = html.HtmlElement()
            twitter_frame.tag = 'iframe'
            twitter_frame.set('src', '/embed/twitter?url=' + quote_plus(href))
            element.addprevious(twitter_frame)
            _wrap_figure(twitter_frame)
            element.drop_tree()
            # The blockquote is now detached from the tree; processing a
            # further matching link would operate on a parentless element.
            break
def __init__(self, count, url, cls, session, params=None, etag=None,
             headers=None):
    """Set up the iterator state.

    :param count: number of items requested.
    :param url: URL used for the first GET.
    :param cls: class used to construct each returned item.
    :param session: passed to ``GitHubCore.__init__``.
    :param params: (optional) query-string parameters; passed through
        ``self._remove_none``.
    :param etag: (optional) when truthy, sent as ``If-None-Match``.
    :param headers: (optional) headers for the GET request.
    """
    GitHubCore.__init__(self, {}, session)

    #: Number of items originally requested
    self.original = count
    #: Number of items still left in the iterator
    self.count = count

    #: URL the class used to make its first GET
    self.url = url
    #: Most recently requested URL
    self.last_url = None
    self._api = self.url

    #: Class used to construct each returned item
    self.cls = cls

    #: Query-string parameters
    self.params = params if params else {}
    self._remove_none(self.params)

    # Deliberately not initialised from the ``etag`` argument: this
    # attribute should reflect the ETag header GitHub returns, and a
    # non-None value here would stop it being set from the response.
    #: The ETag header value returned by GitHub
    self.etag = None

    #: Headers sent with the GET request
    self.headers = headers if headers else {}

    #: The last response seen
    self.last_response = None
    #: Status code of the last response
    self.last_status = 0

    if etag:
        conditional = {'If-None-Match': etag}
        self.headers.update(conditional)

    self.path = urlparse(self.url).path
def _api(self, uri):
    """Parse ``uri`` and store the parsed result on ``self._uri``."""
    parsed = urlparse(uri)
    self._uri = parsed
def get_path_to_file_from_url(cls, url):
    """Return the path inside ``cls.pyctd_data_dir`` for a download URL.

    The file name is the last ``/``-separated segment of the URL's path.

    :param str url: CTD download URL
    """
    url_path = urlparse(url).path
    file_name = url_path.rsplit('/', 1)[-1]
    return os.path.join(cls.pyctd_data_dir, file_name)
def __init__(self, count, url, cls, session, params=None, etag=None,
             headers=None):
    """Set up the iterator state.

    :param count: number of items requested.
    :param url: URL used for the first GET.
    :param cls: class used to construct each returned item.
    :param session: passed to ``models.GitHubCore.__init__``.
    :param params: (optional) query-string parameters; passed through
        ``self._remove_none``.
    :param etag: (optional) when truthy, sent as ``If-None-Match``.
    :param headers: (optional) headers for the GET request.
    """
    models.GitHubCore.__init__(self, {}, session)

    #: Number of items originally requested
    self.original = count
    #: Number of items still left in the iterator
    self.count = count

    #: URL the class used to make its first GET
    self.url = url
    #: Most recently requested URL
    self.last_url = None
    self._api = self.url

    #: Class used to construct each returned item
    self.cls = cls

    #: Query-string parameters
    self.params = params if params else {}
    self._remove_none(self.params)

    # Deliberately not initialised from the ``etag`` argument: this
    # attribute should reflect the ETag header GitHub returns, and a
    # non-None value here would stop it being set from the response.
    #: The ETag header value returned by GitHub
    self.etag = None

    #: Headers sent with the GET request
    self.headers = headers if headers else {}

    #: The last response seen
    self.last_response = None
    #: Status code of the last response
    self.last_status = 0

    if etag:
        conditional = {'If-None-Match': etag}
        self.headers.update(conditional)

    self.path = urlparse(self.url).path
def _api(self, uri):
    """Store ``uri`` on ``self.url`` and its parsed form on ``self._uri``."""
    parsed = urlparse(uri)
    self._uri = parsed
    self.url = uri
def authenticate_server(self, response):
    """
    Run one GSSAPI client step against the negotiate value of ``response``
    to authenticate the server.

    Returns True on success, False when the step raises ``GSSError`` or
    returns a result below 1.
    """
    log.debug("authenticate_server(): Authenticate header: {0}".format(
        _negotiate_value(response)))

    server_host = urlparse(response.url).hostname

    try:
        step_result = kerberos.authGSSClientStep(self.context[server_host],
                                                 _negotiate_value(response))
    except kerberos.GSSError:
        log.exception("authenticate_server(): authGSSClientStep() failed:")
        return False

    if step_result < 1:
        log.error("authenticate_server(): authGSSClientStep() failed: {0}".format(
            step_result))
        return False

    log.debug("authenticate_server(): returning {0}".format(response))
    return True
def _extract_pagination_params(self, query_url):
    """Extract the result-offset parameter from the query string of ``query_url``.

    :param query_url: URL whose query string is inspected.

    NOTE(review): the visible body computes ``offset`` but never returns or
    stores it, so the function returns None -- it looks truncated; confirm
    against the full source before relying on it.
    """
    query_params_string = urlparse(query_url).query
    # Fall back to an empty dict so a URL without a query string does not
    # fail on the lookup below.
    query_dict = parse_qs(query_params_string) if query_params_string else {}

    # ``dict.get`` must be called; subscripting the bound method raised
    # TypeError on every call.
    offset = query_dict.get(RESULT_OFFSET_PARAMETER)
    if offset:
        # parse_qs maps each name to a list of values; keep the first.
        offset = offset[0]
def parse_query_url(query_url):
    """Split ``query_url`` into its parsed URL and its query parameters.

    :param query_url: the URL to parse; may be empty or None.
    :returns: ``(parsed_url, parameters)`` where ``parameters`` maps each
        query name to its list of values, or ``(None, {})`` when
        ``query_url`` is falsy.
    """
    if not query_url:
        return None, {}

    url_elements = urlparse(query_url)
    return url_elements, parse_qs(url_elements.query)