Python urllib.request module: HTTPHandler() example source code

The following 20 code examples, extracted from open-source Python projects, illustrate how to use urllib.request.HTTPHandler(). Several of the projects are Python 2 code and use the equivalent urllib2.HTTPHandler.
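For reference, HTTPHandler is the handler that build_opener() installs by default for plain-HTTP URLs; passing it (or a subclass) explicitly lets you combine it with other handlers or turn on wire-level logging. A minimal sketch:

from urllib.request import HTTPHandler, build_opener

# debuglevel=1 makes the handler print the raw HTTP request and response
# headers to stdout, which helps when debugging an opener chain.
opener = build_opener(HTTPHandler(debuglevel=1))
response = opener.open('http://example.com/')
print(response.status, response.reason)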

Project: lambda_utils    Author: CloudHeads
def send_signal(event, response_status, reason, response_data=None):
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': str(reason or 'ReasonCanNotBeNone'),
            'PhysicalResourceId': event.get('PhysicalResourceId', event['LogicalResourceId']),
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data or {}
        },
        sort_keys=True,
    )
    logging.debug(response_body)
    opener = build_opener(HTTPHandler)
    # urllib.request expects a bytes body and string header values in Python 3.
    request = Request(event['ResponseURL'], data=response_body.encode('utf-8'))
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', str(len(response_body)))
    request.get_method = lambda: 'PUT'  # force an HTTP PUT
    opener.open(request)
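
A hypothetical invocation from a CloudFormation custom-resource Lambda; the status strings 'SUCCESS' and 'FAILED' are the values CloudFormation expects:

# `event` is the request object CloudFormation delivers to the function.
send_signal(event, 'SUCCESS', 'resource created', response_data={'Endpoint': 'example'})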
Project: zeronet-debian    Author: bashrc
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
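
This fragment is the constructor of an HTTPHandler subclass: it stores the caller's arguments so they can be forwarded later. A sketch of how such a subclass is typically completed, assuming the stored arguments are meant for the underlying connection (the forwarding below is an assumption, not zeronet's actual code), written against urllib.request:

import http.client
import urllib.request

class ArgForwardingHTTPHandler(urllib.request.HTTPHandler):
    def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib.request.HTTPHandler.__init__(self)

    def http_open(self, req):
        # Forward the stored arguments into every HTTPConnection we build.
        def factory(host, **conn_kw):
            conn_kw.update(self.kw)
            return http.client.HTTPConnection(host, *self.args, **conn_kw)
        return self.do_open(factory, req)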
Project: weibo    Author: windskyer
def login(self, username, pwd, cookie_file):
        """"
            Login with use name, password and cookies.
            (1) If cookie file exists then try to load cookies;
            (2) If no cookies found then do login
        """
        # If cookie file exists then try to load cookies
        if os.path.exists(cookie_file):
            try:
                cookie_jar = cookielib.LWPCookieJar(cookie_file)
                cookie_jar.load(ignore_discard=True, ignore_expires=True)
                loaded = 1
            except cookielib.LoadError:
                loaded = 0
                LOG.info('Loading cookies error')

            # install loaded cookies for urllib2
            if loaded:
                cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
                opener = urllib2.build_opener(cookie_support,
                                              urllib2.HTTPHandler)
                urllib2.install_opener(opener)
                LOG.info('Loading cookies success')
                return 1
            else:
                return self.do_login(username, pwd, cookie_file)

        else:  # If no cookies found
            return self.do_login(username, pwd, cookie_file)
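
The cookie persistence this relies on is LWPCookieJar's save/load pair; a minimal sketch (the filename is illustrative; http.cookiejar is the Python 3 name for cookielib):

import http.cookiejar  # `cookielib` under Python 2

jar = http.cookiejar.LWPCookieJar('weibo_cookies.txt')
jar.save(ignore_discard=True, ignore_expires=True)   # also persist session cookies
jar.load(ignore_discard=True, ignore_expires=True)   # restore them on a later run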
Project: weibo    Author: windskyer
def save_cookie(self, text, cookie_file=CONF.cookie_file):
        cookie_jar2 = cookielib.LWPCookieJar()
        cookie_support2 = urllib2.HTTPCookieProcessor(cookie_jar2)
        opener2 = urllib2.build_opener(cookie_support2, urllib2.HTTPHandler)
        urllib2.install_opener(opener2)
        if six.PY3:
            text = text.decode('gbk')
        # As HTTPFox shows, the login response contains a line like
        # location.replace('http://weibo.com/...') carrying the login_url;
        # the raw-string pattern below extracts it (the quotes inside the
        # pattern would otherwise need \'-style escaping).
        p = re.compile(r"location\.replace\('(.*?)'\)")
        try:
            # Search login redirection URL
            login_url = p.search(text).group(1)
            data = urllib2.urlopen(login_url).read()
            # Verify login feedback, check whether result is TRUE
            patt_feedback = r'feedBackUrlCallBack\((.*)\)'
            p = re.compile(patt_feedback, re.MULTILINE)

            feedback = p.search(data).group(1)
            feedback_json = json.loads(feedback)
            if feedback_json['result']:
                cookie_jar2.save(cookie_file,
                                 ignore_discard=True,
                                 ignore_expires=True)
                return 1
            else:
                return 0
        except Exception:  # any parse failure counts as a failed login
            return 0
Project: weibo    Author: windskyer
def login(self, username, pwd, cookie_file):
        """"
            Login with use name, password and cookies.
            (1) If cookie file exists then try to load cookies;
            (2) If no cookies found then do login
        """
        # If cookie file exists then try to load cookies
        if os.path.exists(cookie_file):
            try:
                cookie_jar = cookielib.LWPCookieJar(cookie_file)
                cookie_jar.load(ignore_discard=True, ignore_expires=True)
                loaded = 1
            except cookielib.LoadError:
                loaded = 0
                print('Loading cookies error')

            # install loaded cookies for urllib2
            if loaded:
                cookie_support = urllib2.HTTPCookieProcessor(cookie_jar)
                opener = urllib2.build_opener(cookie_support, urllib2.HTTPHandler)
                urllib2.install_opener(opener)
                print('Loading cookies success')
                return 1
            else:
                return self.do_login(username, pwd, cookie_file)

        else:  # If no cookies found
            return self.do_login(username, pwd, cookie_file)
Project: habrahabr-api-python    Author: dotzero
def _request(self, url, method='GET', data=None):
        url = self._auth.endpoint + url
        headers = self._auth.headers

        if data is not None:
            data = urlencode(data)
            if method in ['GET', 'DELETE']:
                url = url + '?' + data
                data = None
            else:
                headers.update({'Content-Type': POST_CONTENT_TYPE})
                if sys.version_info > (3,):  # python3
                    data = data.encode('utf-8')

        log.debug(method + ' ' + url)
        log.debug(data)

        try:
            opener = build_opener(HTTPHandler)
            request = Request(url, data=data, headers=headers)
            request.get_method = lambda: method
            response = opener.open(request).read()
            data = self._parse_response(response)
        except HTTPError as e:
            log.error(e)
            data = self._parse_response(e.read())
            raise ApiHandlerError('Invalid server response', data)
        except ValueError as e:
            log.error(e)
            raise ApiHandlerError('Invalid server response')

        return data
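
The request.get_method override is the classic urllib trick for sending verbs other than GET and POST; since Python 3.3 the same effect is available through Request's method argument. A sketch of the modern equivalent (the URL is illustrative):

from urllib.request import HTTPHandler, Request, build_opener

# Passing method= replaces the get_method monkey-patch.
request = Request('http://example.com/resource/1', method='DELETE')
response = build_opener(HTTPHandler).open(request)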
Project: MailFail    Author: m0rtem
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
Project: TCP-IP    Author: JackZ0
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path."""
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert
    # authenticity has only privacy (not arbitrary code execution)
    # implications, since we're checking hashes.
    def opener():
        opener = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof:
        for handler in opener.handlers:
            if isinstance(handler, HTTPHandler):
                opener.handlers.remove(handler)
        return opener

    def read_chunks(response, chunk_size):
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = opener().open(url)
    path = join(temp, urlparse(url).path.split('/')[-1])
    actual_hash = sha256()
    with open(path, 'wb') as file:
        for chunk in read_chunks(response, 4096):
            file.write(chunk)
            actual_hash.update(chunk)

    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
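
A hypothetical call, with a placeholder URL and the SHA-256 of the empty string standing in for a real digest:

import tempfile

temp = tempfile.mkdtemp()
path = hashed_download(
    'https://example.com/pkg-1.0.tar.gz',                                # placeholder URL
    temp,
    'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',  # placeholder digest
)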
Project: aws-cfn-plex    Author: lordmuffin
def send(event, context, response_status, reason=None, response_data=None, physical_resource_id=None):
    response_data = response_data or {}
    response_body = json.dumps(
        {
            'Status': response_status,
            'Reason': reason or "See the details in CloudWatch Log Stream: " + context.log_stream_name,
            'PhysicalResourceId': physical_resource_id or context.log_stream_name,
            'StackId': event['StackId'],
            'RequestId': event['RequestId'],
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': response_data
        }
    )

    opener = build_opener(HTTPHandler)
    # The body must be bytes and header values strings under Python 3.
    request = Request(event['ResponseURL'], data=response_body.encode('utf-8'))
    request.add_header('Content-Type', '')
    request.add_header('Content-Length', str(len(response_body)))
    request.get_method = lambda: 'PUT'
    try:
        response = opener.open(request)
        print("Status code: {}".format(response.getcode()))
        print("Status message: {}".format(response.msg))
        return True
    except HTTPError as exc:
        print("Failed executing HTTP request: {}".format(exc.code))
        return False
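
Because send() reports the outcome as a boolean instead of raising, callers usually report FAILED themselves when the actual work fails; a hypothetical handler around it:

def handler(event, context):
    try:
        # ... provision the custom resource ...
        send(event, context, 'SUCCESS', response_data={'Result': 'ok'})
    except Exception as exc:
        send(event, context, 'FAILED', reason=str(exc))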
Project: VIA4CVE    Author: cve-search
def getFile(cls, getfile, unpack=True):
    if cls.getProxy():
      proxy = req.ProxyHandler({'http': cls.getProxy(), 'https': cls.getProxy()})
      auth = req.HTTPBasicAuthHandler()
      opener = req.build_opener(proxy, auth, req.HTTPHandler)
      req.install_opener(opener)
    try:
      response = req.urlopen(getfile)
    except Exception:
      msg = "[!] Could not fetch file %s" % getfile
      if cls.exitWhenNoSource(): sys.exit(msg)
      else:                      print(msg)
      return (None, None)  # `response` never existed, so bail out here
    data = response.read()
    # TODO: if data == text/plain; charset=utf-8, read and decode
    if unpack:
      if   'gzip' in response.info().get('Content-Type'):
        data = gzip.GzipFile(fileobj = BytesIO(data))
      elif 'bzip2' in response.info().get('Content-Type'):
        data = BytesIO(bz2.decompress(data))
      elif 'zip' in response.info().get('Content-Type'):
        fzip = zipfile.ZipFile(BytesIO(data), 'r')
        if len(fzip.namelist())>0:
          data=BytesIO(fzip.read(fzip.namelist()[0]))
      # In case the webserver is being generic
      elif 'application/octet-stream' in response.info().get('Content-Type'):
        if data[:4] == b'PK\x03\x04': # Zip
          fzip = zipfile.ZipFile(BytesIO(data), 'r')
          if len(fzip.namelist())>0:
            data=BytesIO(fzip.read(fzip.namelist()[0]))
    return (data, response)
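
A sketch of consuming the (data, response) pair; Source is a hypothetical class exposing getProxy() and exitWhenNoSource():

data, response = Source.getFile('https://example.com/feed.json.gz')
if data is not None:
    payload = data.read() if hasattr(data, 'read') else data  # BytesIO when an archive was unpacked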
Project: certbot    Author: nikoloskii
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path."""
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert
    # authenticity has only privacy (not arbitrary code execution)
    # implications, since we're checking hashes.
    def opener():
        opener = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof:
        for handler in opener.handlers:
            if isinstance(handler, HTTPHandler):
                opener.handlers.remove(handler)
        return opener

    def read_chunks(response, chunk_size):
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = opener().open(url)
    path = join(temp, urlparse(url).path.split('/')[-1])
    actual_hash = sha256()
    with open(path, 'wb') as file:
        for chunk in read_chunks(response, 4096):
            file.write(chunk)
            actual_hash.update(chunk)

    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path
Project: splunk_ta_ps4_f1_2016    Author: jonathanvarley
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
Project: PocHunter    Author: DavexPro
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
Project: Tor    Author: r0oth3x49
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
Project: TA-SyncKVStore    Author: georgestarcher
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
Project: cb-defense-splunk-app    Author: carbonblack
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
Project: Google-Alfred3-Workflow    Author: ethan-funny
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
Project: Deploy_XXNET_Server    Author: jzp820927
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
Project: htcap_mysql    Author: 0xa-saline
def __init__(self, *args, **kwargs):
        self.args = args
        self.kw = kwargs
        urllib2.HTTPHandler.__init__(self)
Project: antenna    Author: mozilla-services
def hashed_download(url, temp, digest):
    """Download ``url`` to ``temp``, make sure it has the SHA-256 ``digest``,
    and return its path."""
    # Based on pip 1.4.1's URLOpener but with cert verification removed. Python
    # >=2.7.9 verifies HTTPS certs itself, and, in any case, the cert
    # authenticity has only privacy (not arbitrary code execution)
    # implications, since we're checking hashes.
    def opener():
        opener = build_opener(HTTPSHandler())
        # Strip out HTTPHandler to prevent MITM spoof:
        for handler in opener.handlers:
            if isinstance(handler, HTTPHandler):
                opener.handlers.remove(handler)
        return opener

    def read_chunks(response, chunk_size):
        while True:
            chunk = response.read(chunk_size)
            if not chunk:
                break
            yield chunk

    response = opener().open(url)
    path = join(temp, urlparse(url).path.split('/')[-1])
    actual_hash = sha256()
    with open(path, 'wb') as file:
        for chunk in read_chunks(response, 4096):
            file.write(chunk)
            actual_hash.update(chunk)

    actual_digest = actual_hash.hexdigest()
    if actual_digest != digest:
        raise HashError(url, path, actual_digest, digest)
    return path