Python urllib.request module: add_header() code examples

The following code examples, extracted from open-source Python projects, show how the add_header() method of urllib.request.Request is used in practice.
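
For orientation, every snippet below follows the same basic pattern: build a urllib.request.Request, attach headers with add_header(), then open it. A minimal, self-contained sketch (the URL and client name are placeholders):

import urllib.request

request = urllib.request.Request('http://example.com/')
request.add_header('User-Agent', 'my-client/1.0')  # add_header(name, value) sets a single header
with urllib.request.urlopen(request) as response:
    body = response.read()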

Project: domoticz-plugins | Author: milanvo
def _try_send(self, ip, port, body, header, data):
        try:
            request = urllib.request.Request('http://%s:%s/upnp/control/basicevent1' % (ip, port))
            request.add_header('Content-type', 'text/xml; charset="utf-8"')
            request.add_header('SOAPACTION', header)
            request_body = '<?xml version="1.0" encoding="utf-8"?>'
            request_body += '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">'
            request_body += '<s:Body>%s</s:Body></s:Envelope>' % body
            request.data = request_body.encode()
            result = urllib.request.urlopen(request, timeout=3)
            return self._extract(result.read().decode(), data)
        except Exception as e:
            print(str(e))
            return None
Project: AutoSteamGifts | Author: joaopsys
def getWebPage(url, headers, cookies, postData=None):
    try:
        if (postData):
            params = urllib.parse.urlencode(postData)
            params = params.encode('utf-8')
            request = urllib.request.Request(url, data=params, headers=headers)
        else:
            print('Fetching '+url)
            request = urllib.request.Request(url, None, headers)
        request.add_header('Cookie', cookies)
        if (postData):
            response = urllib.request.build_opener(urllib.request.HTTPCookieProcessor).open(request)
        else:
            response = urllib.request.urlopen(request)
        if response.info().get('Content-Encoding') == 'gzip':
            buf = BytesIO(response.read())
            f = gzip.GzipFile(fileobj=buf)
            r = f.read()
        else:
            r = response.read()

        return r
    except Exception as e:
        print("Error processing webpage: "+str(e))
        return None
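
For reference, the gzip branch in getWebPage above can be written more compactly with gzip.decompress(), available since Python 3.2; a sketch reusing the response object from the function:

raw = response.read()
if response.info().get('Content-Encoding') == 'gzip':
    raw = gzip.decompress(raw)  # one call replaces the BytesIO/GzipFile pair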
Project: googleSpeech_with_NaverTTS | Author: chandong83
def play(self, txt):
        encText = urllib.parse.quote(txt)
        data = "speaker=" + self.speaker + "&speed=" + self.speed + "&text=" + encText

        request = urllib.request.Request(url)
        request.add_header("X-Naver-Client-Id",client_id)
        request.add_header("X-Naver-Client-Secret",client_secret)
        response = urllib.request.urlopen(request, data=data.encode('utf-8'))
        rescode = response.getcode()
        if rescode == 200:
            response_body = response.read()
            with open(tmpPlayPath, 'wb') as f:
                f.write(response_body)

            # play the saved file with vlc
            os.system('cvlc ' + tmpPlayPath + ' --play-and-exit')
            # on a Raspberry Pi, omxplayer could be used instead
            #os.system('omxplayer ' + tmpPlayPath)
        else:
            print("Error Code:" + str(rescode))
Project: prms-door | Author: prmakerspace
def authenticate_with_apikey(self, api_key, scope=None):
        """perform authentication by api key and store result for execute_request method
        api_key -- secret api key from account settings
        scope -- optional scope of authentication request. If None full list of API scopes will be used.
        """
        scope = "auto" if scope is None else scope
        data = {
            "grant_type": "client_credentials",
            "scope": scope
        }
        encoded_data = urllib.parse.urlencode(data).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        request.add_header("Content-Type", "application/x-www-form-urlencoded")
        request.add_header("Authorization", 'Basic ' + base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode())
        response = urllib.request.urlopen(request)
        self._token = WaApiClient._parse_response(response)
        self._token.retrieved_at = datetime.datetime.now()
Project: prms-door | Author: prmakerspace
def authenticate_with_contact_credentials(self, username, password, scope=None):
        """perform authentication by contact credentials and store result for execute_request method
        username -- typically a contact email
        password -- contact password
        scope -- optional scope of authentication request. If None full list of API scopes will be used.
        """
        scope = "auto" if scope is None else scope
        data = {
            "grant_type": "password",
            "username": username,
            "password": password,
            "scope": scope
        }
        encoded_data = urllib.parse.urlencode(data).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        request.add_header("Content-Type", "application/x-www-form-urlencoded")
        auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
        request.add_header("Authorization", 'Basic ' + auth_header)
        response = urllib.request.urlopen(request)
        self._token = WaApiClient._parse_response(response)
        self._token.retrieved_at = datetime.datetime.now()
Project: pygameweb | Author: pygame
def downloadUrls(self, urls):
        url_data = {}
        for u in urls:
            url = self.base_url + u

            request = urllib.request.Request(url)


            # the .htaccess file checks for the header, and if it exists returns unprocessed data.
            request.add_header('User-agent', 'our-web-crawler')
            try:
                response = urllib.request.urlopen(request)
                data = response.read()
            except urllib.error.HTTPError:
                log(url)
                raise
            except urllib.error.URLError:
                log(url)
                raise
            yield (u,data)
Project: wswp | Author: kjam
def download(url, num_retries=2, user_agent='wswp', charset='utf-8'):
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries - 1, user_agent, charset)
    return html
Project: Python-Web-Scraping-Second-Edition | Author: PacktPublishing
def download(url, user_agent='wswp', num_retries=2, charset='utf-8'):
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e.reason)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, user_agent, num_retries - 1, charset)
    return html
Project: maas | Author: maas
def put(self, location, params=None):
        """Dispatch a PUT request to a SeaMicro chassis.

        The SeaMicro box has order-dependent HTTP parameters, so we build
        the query URL ourselves and pass the data as a list rather than a
        dict, since the order matters.
        """
        opener = urllib.request.build_opener(urllib.request.HTTPHandler)
        url = self.build_url(location, params)
        request = urllib.request.Request(url)
        request.get_method = lambda: 'PUT'
        request.add_header('content-type', 'text/json')
        response = opener.open(request)
        json_data = self.parse_response(url, response)

        return json_data['result']
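
The docstring's point about order-dependent parameters works because urllib.parse.urlencode() accepts a sequence of (key, value) pairs and emits them in that order, something plain dicts did not guarantee before Python 3.7. A sketch of what a build_url along those lines might look like; the host and parameters are illustrative, not taken from the project:

import urllib.parse

def build_url(location, params=None):
    query = urllib.parse.urlencode(params or [])  # a list of pairs preserves order
    return 'http://chassis.example/%s?%s' % (location, query)

build_url('servers', [('action', 'reset'), ('server', '0/0')])
# -> 'http://chassis.example/servers?action=reset&server=0%2F0'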
Project: mando.me | Author: z0noxz
def send(url, headers, timeout):
        request = urllib.request.Request(url)
        data = ""

        # Add password key
        request.add_header(_gs["smplshll_main_password_var"], _gs["smplshll_main_password"])

        for header in headers.keys():
            request.add_header(header, Utility.crypt(_gs["smplshll_input_password"], headers[header]))

        if timeout > 0:
            data = urllib.request.urlopen(request, timeout=timeout).read()
        else:
            data = urllib.request.urlopen(request).read()

        return Utility.crypt(_gs["smplshll_input_password"], data, False) if _gs["smplshll_response_encryption"] else data
Project: gemstone | Author: vladcalin
def build_http_request_obj(self, request_body):
        request = urllib.request.Request(self.url)
        request.add_header("Content-Type", "application/json")
        request.add_header("User-Agent", "gemstone-client")
        request.data = json.dumps(request_body).encode()
        request.method = "POST"
        return request
Project: python-fritzbox | Author: pamapa
def post(self, path, headers, body):
    uri = "%s/%s" % (self.url.geturl(), path)
    if self.debug: print("post: uri=%s, headers=%s" % (uri, headers))
    request = urllib.request.Request(uri)
    for header in headers:
      request.add_header(header, headers[header])
    request.data = body  # add_data() existed only in Python 2's urllib2; set .data in Python 3
    resp = urllib.request.urlopen(request, cafile=self.cafile)
    if self.debug: print("resp: %s" % resp.info())
    return resp
Project: SublimeRSS | Author: JaredMHall
def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if ACCEPT_HEADER:
        request.add_header('Accept', ACCEPT_HEADER)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in list(request_headers.items()):
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
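
The hand-rolled weekday and month tables above exist because time.strftime() is locale-dependent while HTTP dates must use English names. For comparison, the standard library can produce the same locale-independent RFC 1123 timestamp; a sketch, not part of the original helper:

import calendar
import email.utils

# modified is a UTC struct_time, as in the helper above
stamp = email.utils.formatdate(calendar.timegm(modified), usegmt=True)
request.add_header('If-Modified-Since', stamp)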
Project: bing | Author: thatnight
def open_url(url):
    request = urllib.request.Request(url)
    request.add_header('User-Agent',
                       'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.3357.400 QQBrowser/9.6.11858.400')
    response = urllib.request.urlopen(request)
    return response.read()
Project: google-crawler | Author: xibowang
def fetch_page(query):
    url = REQUEST_URL.format(BASE_URL, query)
    request = urllib.request.Request(url)
    request.add_header('User-agent', _random_user_agent())
    request.add_header('connection', 'keep-alive')
    request.add_header('Accept-Encoding', 'gzip, deflate, sdch, br')
    request.add_header('referer', REQUEST_URL.format(BASE_URL, ""))
    print(url)
    response = urllib.request.urlopen(request)

    data = response.read()
    print(type(data))
    return gzip.decompress(data)
Project: hangoutsbot | Author: das7pad
def upload_image(self, image_uri, sync, username, userid, channel_name):
        token = self.apikey
        logger.info('downloading %s', image_uri)
        filename = os.path.basename(image_uri)
        request = urllib.request.Request(image_uri)
        request.add_header("Authorization", "Bearer %s" % token)
        image_response = urllib.request.urlopen(request)
        content_type = image_response.info().get_content_type()

        filename_extension = mimetypes.guess_extension(content_type).lower() # returns with "."
        physical_extension = "." + filename.rsplit(".", 1).pop().lower()

        if physical_extension == filename_extension:
            pass
        elif filename_extension == ".jpe" and physical_extension in [ ".jpg", ".jpeg", ".jpe", ".jif", ".jfif" ]:
            # account for a mimetypes idiosyncrasy: it returns .jpe for valid jpeg
            pass
        else:
            logger.warning("unable to determine extension: {} {}".format(filename_extension, physical_extension))
            filename += filename_extension

        logger.info('uploading as %s', filename)
        image_id = yield from self.bot._client.upload_image(image_response, filename=filename)

        logger.info('sending HO message, image_id: %s', image_id)
        yield from sync._bridgeinstance._send_to_internal_chat(
            sync.hangoutid,
            "shared media from slack",
            {   "sync": sync,
                "source_user": username,
                "source_uid": userid,
                "source_title": channel_name },
            image_id=image_id )
Project: zippy | Author: securesystemslab
def test_custom_headers(self):
        url = "http://www.example.com"
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
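
The mixed casing in this test is deliberate: Request.add_header() normalizes header names with str.capitalize(), so 'User-Agent' and 'User-agent' address the same entry. A small standalone sketch of that behavior:

import urllib.request

r = urllib.request.Request('http://example.com/')
r.add_header('User-Agent', 'Test-Agent')
print(r.get_header('User-agent'))  # 'Test-Agent': lookups are normalized the same way
print(r.header_items())            # [('User-agent', 'Test-Agent')]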
Project: WeixinBot | Author: Urinx
    def _get(self, url, api=None, timeout=None):
        request = urllib.request.Request(url=url)
        request.add_header('Referer', 'https://wx.qq.com/')
        if api in ('webwxgetvoice', 'webwxgetvideo'):
            request.add_header('Range', 'bytes=0-')
        try:
            response = urllib.request.urlopen(request, timeout=timeout) if timeout else urllib.request.urlopen(request)
            if api in ('webwxgetvoice', 'webwxgetvideo'):
                data = response.read()
            else:
                data = response.read().decode('utf-8')
            logging.debug(url)
            return data
        except urllib.error.HTTPError as e:
            logging.error('HTTPError = ' + str(e.code))
        except urllib.error.URLError as e:
            logging.error('URLError = ' + str(e.reason))
        except http.client.HTTPException as e:
            logging.error('HTTPException')
        except timeout_error as e:
            pass
        except ssl.CertificateError as e:
            pass
        except Exception:
            import traceback
            logging.error('generic exception: ' + traceback.format_exc())
        return ''
Project: WeixinBot | Author: Urinx
    def _post(self, url, params, jsonfmt=True):
        if jsonfmt:
            data = (json.dumps(params)).encode()

            request = urllib.request.Request(url=url, data=data)
            request.add_header(
                'Content-Type', 'application/json; charset=UTF-8')
        else:
            request = urllib.request.Request(url=url, data=urllib.parse.urlencode(params).encode(encoding='utf-8'))
        try:
            response = urllib.request.urlopen(request)
            data = response.read()
            if jsonfmt:
                return json.loads(data.decode('utf-8'))
            return data
        except urllib.error.HTTPError as e:
            logging.error('HTTPError = ' + str(e.code))
        except urllib.error.URLError as e:
            logging.error('URLError = ' + str(e.reason))
        except http.client.HTTPException as e:
            logging.error('HTTPException')
        except Exception:
            import traceback
            logging.error('generic exception: ' + traceback.format_exc())

        return ''
Project: web_ctp | Author: molebot
def test_custom_headers(self):
        url = "http://www.example.com"
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
Project: domoticz | Author: ericstaal
def volgendeZonondergang(self):
    # next sunset time, retrieved from the domoticz getSunRiseSet JSON API below

    try:
      domoticzurl = 'https://127.0.0.1:8443/json.htm?type=command&param=getSunRiseSet'
      encoding = 'utf-8'

      inlog = '%s:%s' % (self.domoticzusername, self.domoticzpassword) 
      base64string = base64.b64encode(inlog.encode(encoding)).decode(encoding)
      request = urllib.request.Request(domoticzurl)
      request.add_header("Authorization", "Basic %s" % base64string)
      response = urllib.request.urlopen(request)
      data = response.read()

      JSON_object = json.loads(data.decode(encoding))
      time = JSON_object['Sunset'].split(':')
      now = datetime.now()
      ret = datetime(now.year, now.month, now.day, int(time[0]), int(time[1]), 0)
      # when started after sunset use 'now'
      now = now + timedelta(minutes = int(Parameters["Mode4"])) 
      if (now > ret):
        ret = ret + timedelta(days = 1) 
      return ret
    except Exception as e:
      self.LogError("Error retrieving Sunset: "+ str(e))
      now = datetime.now()
      return datetime(now.year, now.month, now.day, 22, 0, 0)
Project: domoticz | Author: ericstaal
def domoticzrequest(url):
  request = urllib.request.Request(url)
  request.add_header("Authorization", "Basic %s" % base64string)
  response = urllib.request.urlopen(request)
  return response.read().decode('utf-8')
Project: textnews | Author: qznc
def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if ACCEPT_HEADER:
        request.add_header('Accept', ACCEPT_HEADER)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in list(request_headers.items()):
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
Project: prms-door | Author: prmakerspace
def execute_request(self, api_url, api_request_object=None, method=None):
        """
        perform api request and return result as an instance of ApiObject or list of ApiObjects
        api_url -- absolute or relative api resource url
        api_request_object -- any json serializable object to send to API
        method -- HTTP method of api request. Default: GET if api_request_object is None else POST
        """
        if self._token is None:
            raise ApiException("Access token has not been obtained. "
                               "Call authenticate_with_apikey or authenticate_with_contact_credentials first.")

        if not api_url.startswith("http"):
            api_url = self.api_endpoint + api_url

        if method is None:
            if api_request_object is None:
                method = "GET"
            else:
                method = "POST"

        request = urllib.request.Request(api_url, method=method)
        if api_request_object is not None:
            request.data = json.dumps(api_request_object, cls=_ApiObjectEncoder).encode()

        request.add_header("Content-Type", "application/json")
        request.add_header("Accept", "application/json")
        request.add_header("Authorization", "Bearer " + self._get_access_token())

        try:
            response = urllib.request.urlopen(request)
            return WaApiClient._parse_response(response)
        except urllib.error.HTTPError as httpErr:
            if httpErr.code == 400:
                raise ApiException(httpErr.read())
            else:
                raise
Project: prms-door | Author: prmakerspace
def _refresh_auth_token(self):
        data = {
            "grant_type": "refresh_token",
            "refresh_token": self._token.refresh_token
        }
        encoded_data = urllib.parse.urlencode(data).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        request.add_header("Content-Type", "application/x-www-form-urlencoded")
        auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
        request.add_header("Authorization", 'Basic ' + auth_header)
        response = urllib.request.urlopen(request)
        self._token = WaApiClient._parse_response(response)
        self._token.retrieved_at = datetime.datetime.now()
Project: MIT-6.0001-Problem-sets-solution | Author: cantell
def _build_urllib2_request(url, agent, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if ACCEPT_HEADER:
        request.add_header('Accept', ACCEPT_HEADER)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in list(request_headers.items()):
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
Project: ouroboros | Author: pybee
def test_custom_headers(self):
        url = "http://www.example.com"
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
Project: blender-addon-updater | Author: CGCookie
def get_raw(self, url):
        # print("Raw request:", url)
        request = urllib.request.Request(url)

        # setup private request headers if appropriate
        if self._engine.token is not None:
            if self._engine.name == "gitlab":
                request.add_header('PRIVATE-TOKEN', self._engine.token)
            else:
                if self._verbose: print("Tokens not setup for engine yet")

        # run the request
        try:
            result = urllib.request.urlopen(request)
        except urllib.error.HTTPError as e:
            self._error = "HTTP error"
            self._error_msg = str(e.code)
            self._update_ready = None
        except urllib.error.URLError as e:
            self._error = "URL error, check internet connection"
            self._error_msg = str(e.reason)
            self._update_ready = None
            return None
        else:
            result_string = result.read()
            result.close()
            return result_string.decode()
Project: wswp | Author: kjam
def download(url, num_retries=2, user_agent='wswp', charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, num_retries - 1, user_agent, charset, proxy)
    return html
Project: Web-Scraping | Author: Martian2Lee
def get_html(url):
    request = urllib.request.Request(url)
    request.add_header("User-Agent", "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.99 Safari/537.36")
    html = urllib.request.urlopen(request)
    return html.read()
Project: kbe_server | Author: xiaohaoppy
def test_custom_headers(self):
        url = "http://www.example.com"
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
Project: machine-learning-python | Author: pspxiaochen
def _build_urllib2_request(url, agent, accept_header, etag, modified, referrer, auth, request_headers):
    request = urllib.request.Request(url)
    request.add_header('User-Agent', agent)
    if etag:
        request.add_header('If-None-Match', etag)
    if isinstance(modified, str):
        modified = _parse_date(modified)
    elif isinstance(modified, datetime.datetime):
        modified = modified.utctimetuple()
    if modified:
        # format into an RFC 1123-compliant timestamp. We can't use
        # time.strftime() since the %a and %b directives can be affected
        # by the current locale, but RFC 2616 states that dates must be
        # in English.
        short_weekdays = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
        request.add_header('If-Modified-Since', '%s, %02d %s %04d %02d:%02d:%02d GMT' % (short_weekdays[modified[6]], modified[2], months[modified[1] - 1], modified[0], modified[3], modified[4], modified[5]))
    if referrer:
        request.add_header('Referer', referrer)
    if gzip and zlib:
        request.add_header('Accept-encoding', 'gzip, deflate')
    elif gzip:
        request.add_header('Accept-encoding', 'gzip')
    elif zlib:
        request.add_header('Accept-encoding', 'deflate')
    else:
        request.add_header('Accept-encoding', '')
    if auth:
        request.add_header('Authorization', 'Basic %s' % auth)
    if accept_header:
        request.add_header('Accept', accept_header)
    # use this for whatever -- cookies, special headers, etc
    # [('Cookie','Something'),('x-special-header','Another Value')]
    for header_name, header_value in request_headers.items():
        request.add_header(header_name, header_value)
    request.add_header('A-IM', 'feed') # RFC 3229 support
    return request
Project: Python-Web-Scraping-Second-Edition | Author: PacktPublishing
def download(url, user_agent='wswp', num_retries=2, charset='utf-8', proxy=None):
    """ Download a given URL and return the page content
        args:
            url (str): URL
        kwargs:
            user_agent (str): user agent (default: wswp)
            charset (str): charset if website does not include one in headers
            proxy (str): proxy url, ex 'http://IP' (default: None)
            num_retries (int): number of retries if a 5xx error is seen (default: 2)
    """
    print('Downloading:', url)
    request = urllib.request.Request(url)
    request.add_header('User-agent', user_agent)
    try:
        if proxy:
            proxy_support = urllib.request.ProxyHandler({'http': proxy})
            opener = urllib.request.build_opener(proxy_support)
            urllib.request.install_opener(opener)
        resp = urllib.request.urlopen(request)
        cs = resp.headers.get_content_charset()
        if not cs:
            cs = charset
        html = resp.read().decode(cs)
    except (URLError, HTTPError, ContentTooShortError) as e:
        print('Download error:', e)
        html = None
        if num_retries > 0:
            if hasattr(e, 'code') and 500 <= e.code < 600:
                # recursively retry 5xx HTTP errors
                return download(url, user_agent, num_retries - 1, charset, proxy)
    return html
Project: robotframework-testrail | Author: ATEME
def __send_request(self, method, uri, data):
        url = self.__url + uri
        request = urllib.request.Request(url)
        if (method == 'POST'):
            request.data = bytes(json.dumps(data), 'utf-8')
        auth = str(
            base64.b64encode(
                bytes('%s:%s' % (self.user, self.password), 'utf-8')
            ),
            'ascii'
        ).strip()
        request.add_header('Authorization', 'Basic %s' % auth)
        request.add_header('Content-Type', 'application/json')

        e = None
        try:
            response = urllib.request.urlopen(request).read()
        except urllib.error.HTTPError as ex:
            response = ex.read()
            e = ex

        if response:
            result = json.loads(response.decode())
        else:
            result = {}

        if e is not None:
            if result and 'error' in result:
                error = '"' + result['error'] + '"'
            else:
                error = 'No additional error message received'
            raise APIError('TestRail API returned HTTP %s (%s)' % 
                (e.code, error))

        return result
Project: telegram-urlprobot | Author: GabrielRF
def __call(self, url=API_URL, params={}, data=None, headers={}):
        """Common method for API call.

        url: API URL
        params: query string parameters
        data: POST data
        headers: additional request headers

        Return: parsed JSON structure or raise GooglError.
        """
        params.update(key=self.key)
        if self.userip is not None:
            params.update(userip=self.userip)

        full_url = "%s?%s" % (url % self.api, urllib.parse.urlencode(params))

        request = urllib.request.Request(full_url, data=bytes(data, encoding="UTF-8") if data is not None else None, headers=headers)

        if self.referer is not None:
            request.add_header("Referer", self.referer)
        if self.client_login is not None:
            request.add_header("Authorization", "GoogleLogin auth=%s" % self.client_login)

        try:
            response = urllib.request.urlopen(request)
            return json.loads(str(response.read(), encoding="UTF-8"))
        except urllib.error.HTTPError as e:
            error = json.loads(e.fp.read())
            raise GooglError(error["error"]["code"], error["error"]["message"])
Project: mensahd | Author: cvzi
def _getMealsURL():
    """Download meals information from XML feed"""
    request = urllib.request.Request(mealsURL)
    request.add_header("Authorization", "Basic %s" % mealsURL_authorization)
    result = urllib.request.urlopen(request, timeout=__timeoutSeconds)
    return result, 0
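
mealsURL_authorization is evidently the pre-encoded user:password pair of an HTTP Basic auth header; a sketch of how such a value is typically produced (the credentials are placeholders):

import base64

mealsURL_authorization = base64.b64encode(b'user:secret').decode('ascii')
# -> 'dXNlcjpzZWNyZXQ='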
Project: PassiveScanner | Author: jjf012
def post_api(self, api_path):
        options = json.dumps(self.options)
        options = options if isinstance(options, bytes) else options.encode('utf8')
        request = urllib.request.Request(self.arachni_url + api_path, options)
        request.add_header('Content-Type', 'application/json')
        return urllib.request.urlopen(request).read().decode('utf8')
Project: blender-addon-updater | Author: CGCookie
def stage_repository(self, url):

        local = os.path.join(self._updater_path,"update_staging")
        error = None

        # make/clear the staging folder
        # ensure the folder is always "clean"
        if self._verbose: print("Preparing staging folder for download:\n",local)
        if os.path.isdir(local):
            try:
                shutil.rmtree(local)
                os.makedirs(local)
            except OSError:
                error = "failed to remove existing staging directory"
        else:
            try:
                os.makedirs(local)
            except OSError:
                error = "failed to create staging directory"

        if error is not None:
            if self._verbose: print("Error: Aborting update, "+error)
            self._error = "Update aborted, staging path error"
            self._error_msg = "Error: {}".format(error)
            return False

        if self._backup_current:
            self.create_backup()
        if self._verbose: print("Now retrieving the new source zip")

        self._source_zip = os.path.join(local,"source.zip")

        if self._verbose: print("Starting download update zip")
        try:
            request = urllib.request.Request(url)

            # setup private token if appropriate
            if self._engine.token is not None:
                if self._engine.name == "gitlab":
                    request.add_header('PRIVATE-TOKEN', self._engine.token)
                else:
                    if self._verbose: print("Tokens not setup for selected engine yet")
            self.urlretrieve(urllib.request.urlopen(request), self._source_zip)
            # add additional checks on file size being non-zero
            if self._verbose: print("Successfully downloaded update zip")
            return True
        except Exception as e:
            self._error = "Error retrieving download, bad link?"
            self._error_msg = "Error: {}".format(e)
            if self._verbose:
                print("Error retrieving download, bad link?")
                print("Error: {}".format(e))
            return False