Python requests module: head() code examples

We extracted the following 50 code examples from open-source Python projects to illustrate how to use requests.head().

Project: faceless    Author: LiuRoy
def qiniu_upload_img(img_url):
    """?????????

    Args:
        img_url (string): ????
    """
    response = requests.get(img_url)
    image = response.content
    md5 = calc_md5(image)

    qiniu_url = 'http://{}/{}'.format(QINIU_HOSTNAME, md5)
    if requests.head(qiniu_url).ok:
        return qiniu_url

    q = Auth(QINIU_ACCESS_KEY, QINIU_SECRET_KEY)
    token = q.upload_token(QINIU_BUCKET, md5, 10)
    put_data(token, md5, image, mime_type='image/jpeg')
    return qiniu_url
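
The HEAD probe above is a cheap existence check: Response.ok is True for any
status code below 400, so the upload is skipped when the MD5-named object is
already on Qiniu. A minimal sketch of the same idiom (URL and timeout are
illustrative):

import requests

def resource_exists(url):
    # HEAD transfers headers only; .ok covers 2xx and 3xx responses
    try:
        return requests.head(url, timeout=5).ok
    except requests.RequestException:
        return False

print(resource_exists('https://example.com/some-object'))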
Project: scripts-dadosgovbr    Author: dadosgovbr
def register(self):
        u"Register resource into CKAN"
        import ckanapi
        ckansite = ckanapi.RemoteCKAN(self.ckan_url, apikey=self.apikey)

        # resource url responds?
        resource = requests.head(self.url)
        self.size = int(resource.headers["content-length"])

        # resource exists?
        resources = ckansite.action.resource_search(query=u"url:%s" % self.url)
        if resources[u"count"] == 0:
            ckansite.action.resource_create(
                package_id = self.package_id,
                url = self.url,
                name = self.name,
                description = self.description,
                format = self.format,
                mimetype = self.mimetype,
                size = self.size,
            )
Project: opendxl-maxmind-service-python    Author: opendxl
def _is_update_needed(self):
        """
        Determines if an update for the database is necessary

        :return: Whether or not the database should be updated
        """

        # Retrieve the headers of the response to compare the MD5 checksums of the file
        headers_response = requests.head(self.MAXMIND_FREE_DB_URL)
        logger.debug(headers_response.headers)
        headers_response.raise_for_status()
        response_checksum = headers_response.headers.get('X-Database-MD5')
        logger.info("Database MD5 received in response headers: " + str(response_checksum))
        # Compare the current file checksum to the one received from MaxMind
        if response_checksum is not None and response_checksum == self._database_checksum:
            return False
        self._response_checksum = response_checksum
        return True
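
The update check above compares a vendor checksum header against the locally
stored value. A sketch of that header-compare pattern; the value of
MAXMIND_FREE_DB_URL isn't shown in the snippet, so the URL and the local
checksum here are hypothetical:

import requests

resp = requests.head('https://example.com/GeoLite2-City.mmdb.gz')  # hypothetical URL
resp.raise_for_status()
remote_md5 = resp.headers.get('X-Database-MD5')
local_md5 = 'd41d8cd98f00b204e9800998ecf8427e'  # previously stored checksum (illustrative)
print(remote_md5 is not None and remote_md5 != local_md5)  # True -> update needed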
Project: ucloud-storage-python-wrapper    Author: wishket
def head_account_metadata(self):
        # get account's metadata
        url = self.url

        # request api
        response = requests.head(url, headers=self.base_headers)

        # formatting result
        result = dict()
        for key in response.headers:
            if key == 'X-Account-Container-Count':
                result['container_count'] = \
                    response.headers['X-Account-Container-Count']
            elif key == 'X-Account-Object-Count':
                result['object_count'] = \
                    response.headers['X-Account-Object-Count']
            elif key == 'X-Account-Bytes-Used':
                result['used_bytes'] = replace_bytes_to_readable(
                    response.headers['X-Account-Bytes-Used']
                )
            else:
                result[key] = response.headers[key]
        return result
Project: ucloud-storage-python-wrapper    Author: wishket
def head_container_metadata(self, container_name):
        """
        get container's metadata
        :param container_name: target container name
        :return: container's metadata dict
        """
        # check container metadata
        url = self.url + '/' + container_name

        # request api
        response = requests.head(url, headers=self.base_headers)

        # formatting result
        result = dict()
        for key in response.headers:
            if key == 'X-Container-Object-Count':
                result['object_count'] = \
                    response.headers['X-Container-Object-Count']
            elif key == 'X-Container-Bytes-Used':
                result['used_bytes'] = replace_bytes_to_readable(
                    response.headers['X-Container-Bytes-Used']
                )
            else:
                result[key] = response.headers[key]
        return result
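
One detail worth knowing for both wrapper methods above: requests stores
response headers in a case-insensitive mapping, so indexed lookups such as
response.headers['X-Account-Bytes-Used'] match any capitalization the server
sends; iterating, however, yields the keys exactly as sent, which is what the
== comparisons rely on. A quick demonstration (illustrative URL):

import requests

response = requests.head('https://example.com/')
# Both lookups hit the same entry in the CaseInsensitiveDict
print(response.headers.get('Content-Type'))
print(response.headers.get('content-type'))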
Project: inspect-docker-image    Author: giantswarm
def get_layer_size(self, layer_hash):
        """
        Attempt to return the size of the given layer
        """
        url = "{base_url}/v2/{name}/blobs/{layer_hash}".format(
              base_url=self.base_url,
              name=self.repository_name,
              layer_hash=layer_hash)
        headers = {}
        if self.token is not None:
            headers["Authorization"] = "Bearer %s" % self.token
        r = requests.head(url, headers=headers, allow_redirects=True, timeout=(3.05,5))
        r.raise_for_status()
        if "content-length" in r.headers:
            self.layer_sizes[layer_hash] = int(r.headers["content-length"])
        else:
            self.layer_sizes[layer_hash] = None
        return self.layer_sizes[layer_hash]
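
The timeout=(3.05, 5) argument above uses requests' tuple form: the first
number caps connection establishment, the second caps the wait for the
server's response. A hedged sketch (illustrative URL):

import requests

# (connect timeout, read timeout): fail fast on unreachable hosts,
# but allow the registry 5 seconds to answer once connected
r = requests.head('https://example.com/v2/blob', allow_redirects=True, timeout=(3.05, 5))
print(r.headers.get('content-length'))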
Project: specton    Author: somesortoferror
def getDLsize(self):
        debug_log("getDLsize called")
        it = QTreeWidgetItemIterator(self.tw)
        while it.value():
            item = it.value()
            url_test = item.data(0, dataURL)
            if url_test is not None:
                try:
                    r = requests.head(url_test)
                    r.raise_for_status()
                    try:
                        size = (int(r.headers['Content-Length']) / 1024) / 1024
                    except ValueError:
                        size = 0
                    if size > 0:
                        item.setText(2, "{} MiB".format(round(size, 2)))
                except requests.exceptions.HTTPError:
                    debug_log("Error {} getting DL size: {}".format(r.status_code, r.headers))
                    item.setText(2, str(r.status_code))  # setText expects a string
                except requests.exceptions.RequestException as e:
                    item.setText(2, self.tr("Error"))
                    debug_log(e, logging.ERROR)
            it += 1
Project: noc-orchestrator    Author: DirceuSilvaLabs
def head(self):
        try:
            response = requests.head(self.url, timeout=self.request_timeout)
            self.res = response # assign the response object from requests to a property on the instance of HTTP class
            return response.headers
        except requests.exceptions.ConnectionError as ce:
            return False
        except Exception as e:
            if DEBUG_FLAG:
                sys.stderr.write("Naked Framework Error: Unable to perform a HEAD request with the head() method (Naked.toolshed.network.py).")
            raise e


    #------------------------------------------------------------------------------
    # [ post method ] (string)
    #  HTTP POST request for text
    #  returns text from the URL as a string
    #------------------------------------------------------------------------------
Project: noc-orchestrator    Author: DirceuSilvaLabs
def head(self):
        try:
            response = requests.head(self.url, timeout=self.request_timeout)
            self.res = response # assign the response object from requests to a property on the instance of HTTP class
            return response.headers
        except requests.exceptions.ConnectionError as ce:
            return False
        except Exception as e:
            if DEBUG_FLAG:
                sys.stderr.write("Naked Framework Error: Unable to perform a HEAD request with the head() method (Naked.toolshed.network.py).")
            raise e


    #------------------------------------------------------------------------------
    # [ post method ] (string)
    #  HTTP POST request for text
    #  returns text from the URL as a string
    #------------------------------------------------------------------------------
Project: python-wp    Author: myles
def _get_wp_api_url(self, url):
        """
        Private function for finding the WP-API URL.

        Arguments
        ---------

        url : str
            WordPress instance URL.
        """
        resp = requests.head(url)

        # Search the Links for rel="https://api.w.org/".
        wp_api_rel = resp.links.get('https://api.w.org/')

        if wp_api_rel:
            return wp_api_rel['url']
        else:
            # TODO: Raise a better exception if the rel doesn't exist.
            raise Exception
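
resp.links is requests' parsed view of the HTTP Link response header, keyed by
each link's rel value, which is how the snippet locates the https://api.w.org/
relation. A sketch against a hypothetical WordPress host:

import requests

resp = requests.head('https://blog.example.com/')  # hypothetical WordPress site
# A header such as: Link: <https://blog.example.com/wp-json/>; rel="https://api.w.org/"
# is parsed into: {'https://api.w.org/': {'url': 'https://blog.example.com/wp-json/', ...}}
wp_api = resp.links.get('https://api.w.org/')
if wp_api:
    print(wp_api['url'])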
Project: precache_dev    Author: carlashley
def already_cached(self, asset_url):
        '''Checks if an item is already cached. This is indicated in the
        headers of the file being checked.'''
        try:
            req = requests.head(asset_url, headers={'user-agent': self.user_agent})  # NOQA
            if req.headers.get('Content-Type') is not None:
                # Item is not in cache
                self.log.debug('Not in cache: %s' % asset_url)
                return False
            else:
                # Item is already cached
                self.log.info('Already in cache: %s' % asset_url)
                return True
        except:
            # In case there is an error, we should return false anyway as there
            # is no harm in re-downloading the item if it's already downloaded.
            return False
Project: arisu    Author: Appleman1234
def get_normal_title(self,url):
        return_message = ""
        head = requests.head(url)
        max_size = 5e6
        if 'content-length' in head.headers and  int(head.headers['content-length']) > max_size:
            return_message = "File too big for link preview\r\n"
        else:
            with eventlet.Timeout(60, False):
                response = requests.get(url,timeout=30)
                if response.status_code == 200:
                    if 'text/html' in response.headers['content-type']:
                            soup = BeautifulSoup(response.content,"lxml")
                            if soup.title is not None:
                                return_message += soup.title.string + "\r\n"
                    else:
                        return_message += response.headers['content-type'] + " Size: " + response.headers['content-length'] + "\r\n"
        return return_message
Project: metatab    Author: Metatab
def find_decl_doc(self, name):

    import requests
    from requests.exceptions import InvalidSchema

    url = METATAB_ASSETS_URL + name + '.csv'
    try:
        # See if it exists online in the official repo
        r = requests.head(url, allow_redirects=False)
        if r.status_code == requests.codes.ok:
            return url

    except InvalidSchema:
        pass  # It's probably FTP

    raise IncludeError(name)
Project: councillor-party    Author: carsonyl
def get_minutes_url(config, metadata):
    start_date = parse_timestamp_naively(metadata['start']).astimezone(get_tz(config))
    meeting_type = None
    for key, value in config.get('minutes_abbrs', {}).items():
        if key in metadata[config['minutes_abbrs_for']]:
            meeting_type = value
            break

    minutes_url = metadata.get('minutes_url')
    if minutes_url:
        requests.head(minutes_url).raise_for_status()
        return minutes_url
    elif config['id'] == 'vancouver':
        if not meeting_type:
            return 'N/A'
        if metadata['title'] == 'Inaugural Council Meeting':
            meeting_type = 'inau'
        mins = 'http://council.vancouver.ca/{dt:%Y%m%d}/{type}{dt:%Y%m%d}ag.htm'.format(
            type=meeting_type, dt=start_date)
        requests.head(mins).raise_for_status()
        return mins

    return 'N/A'
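
raise_for_status() turns any 4xx/5xx response into requests.exceptions.HTTPError,
so chaining it onto requests.head(...) as above validates a URL in a single
expression. A minimal sketch (illustrative URL):

import requests

try:
    requests.head('https://example.com/agenda.htm').raise_for_status()
except requests.exceptions.HTTPError as err:
    print('URL failed validation:', err)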
Project: pnda-aws-templates    Author: pndaproject
def check_pnda_mirror():

    def raise_error(reason):
        CONSOLE.info('PNDA mirror...... ERROR')
        CONSOLE.error(reason)
        CONSOLE.error(traceback.format_exc())
        sys.exit(1)

    try:
        mirror = PNDA_ENV['mirrors']['PNDA_MIRROR']
        response = requests.head(mirror)
        # expect 200 (open mirror) 403 (no listing allowed)
        # or any redirect (in case of proxy/redirect)
        if response.status_code not in [200, 403, 301, 302, 303, 307, 308]:
            raise_error("PNDA mirror configured and present "
                        "but responded with unexpected status code (%s). " % response.status_code)
        CONSOLE.info('PNDA mirror...... OK')
    except KeyError:
        raise_error('PNDA mirror was not defined in pnda_env.yaml')
    except:
        raise_error("Failed to connect to PNDA mirror. Verify connection "
                    "to %s, check mirror in pnda_env.yaml and try again." % mirror)
Project: 163mooc_dumper    Author: wsdzl
def _len(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36',
        'Range': 'bytes=0-0',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8'
    }
    with requests.get(url, headers=headers) as r:
        length = r.headers['Content-Range']
        if length.find('0-0/') == -1:
            length = None
        else:
            length = length.split('0-0/')[-1]
            length = int(length) if length else 0
    if not length:
        del headers['Range']
        with requests.head(url, headers=headers) as r:
            length = r.headers['Content-Length']
            length = int(length) if length else 0
    return length
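
The Range: bytes=0-0 trick above requests a single byte so the total size can
be read from Content-Range (formatted "bytes 0-0/TOTAL") without downloading
the body; requests.head is only the fallback for servers that ignore Range.
A condensed sketch (illustrative URL):

import requests

resp = requests.get('https://example.com/video.mp4', headers={'Range': 'bytes=0-0'})
content_range = resp.headers.get('Content-Range', '')  # e.g. "bytes 0-0/31337"
if '/' in content_range:
    print(int(content_range.rsplit('/', 1)[-1]))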
Project: swarm    Author: a7vinx
def _scan_target_normal(self,target):
        try:
            r=requests.head(target)
            #it maybe a directory
            if self._check_eponymous_dir(r,target):
                return target+'/;'

            if self._check_exist_code(r.status_code):
                # check content
                r=requests.get(target)
                if self._check_exist_code(r.status_code):
                    for cur in self._not_exist_flag:
                        if r.text.find(cur)!=-1:
                            return ''
                    return target+';'
            return ''
        except Exception as e:
            return ''
Project: edx-video-pipeline    Author: edx
def validate_media_url(self):
        """
        Validates the media URL

        Raises:
            ThreePlayMediaUrlError: on invalid media url or content type
        """
        if not self.media_url:
            raise ThreePlayMediaUrlError('Invalid media URL "{media_url}".'.format(media_url=self.media_url))

        response = requests.head(url=self.media_url)
        if not response.ok:
            raise ThreePlayMediaUrlError('The URL "{media_url}" is not Accessible.'.format(media_url=self.media_url))
        elif response.headers['Content-Type'] != self.allowed_content_type:
            raise ThreePlayMediaUrlError(
                'Media content-type should be "{allowed_type}". URL was "{media_url}", content-type was "{type}"'.format(  # pylint: disable=line-too-long
                    allowed_type=self.allowed_content_type,
                    media_url=self.media_url,
                    type=response.headers['Content-Type'],
                )
            )
Project: website-status-cli    Author: prabhakar267
def _check_website(url, code):
    try:
        response = requests.head(url, headers={'user_agent': DEFAULT_HEADERS})
        website_status_code = response.status_code
        if code:
            return website_status_code
        else:
            if website_status_code == 200:
                return True
            else:
                return False
    except (requests.exceptions.Timeout, requests.exceptions.ConnectionError):
        if code:
            return -1
        else:
            return False
Project: open-ledger    Author: creativecommons
def sync(self, attempt_perceptual_hash=False):
        """Attempt to sync the image with the remote location. Optionally does a number of
        other actions:
        * If the image exists and we have not constructed a fingerprint for it,
          do so at this time and populate the image model's perceptual_hash field
          (if attempt_perceptual_hash is True; defaults to False because it's slow)
        * If the image does not exist, mark it as removed_from_source
        * If the image is removed_from_source, delete it from the search engine
        * In all cases, update the last_synced_with_source timestamp on the image"""
        req = requests.head(self.url)
        if req.status_code == 200:
            if not self.perceptual_hash and attempt_perceptual_hash:
                self.perceptual_hash = self.generate_hash()
        else:
            self.removed_from_source = True

        self.last_synced_with_source = timezone.now()

        self.save()
Project: crossrefapi    Author: fabiobatalha
def do_http_request(self, method, endpoint, data=None, files=None, timeout=100, only_headers=False, custom_header=None):

        if only_headers is True:
            return requests.head(endpoint)

        if method == 'post':
            action = requests.post
        else:
            action = requests.get

        if custom_header:
            headers = {'user-agent': custom_header}
        else:
            headers = {'user-agent': str(Etiquette())}

        if method == 'post':
            result = action(endpoint, data=data, files=files, timeout=timeout, headers=headers)
        else:
            result = action(endpoint, params=data, timeout=timeout, headers=headers)

        if self.throttle is True:
            self._update_rate_limits(result.headers)
            sleep(self.throttling_time)

        return result
Project: utils    Author: jianbing
def get_url_file_size(url, proxies=None):
    """???????????????KB

    :param proxies:
    :param url:
    :return:
    """
    r = requests.head(url=url, proxies=proxies, timeout=3.0)

    while r.is_redirect:  # follow redirects manually
        # print 'got'
        # print r.headers['Location']
        r = requests.head(url=r.headers['Location'], proxies=proxies, timeout=3.0)
    # print r.headers
    # print r.headers['Content-Length']
    return int(r.headers['Content-Length']) / 1024
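
The loop above follows redirects by hand because requests.head defaults to
allow_redirects=False (unlike requests.get). Passing allow_redirects=True
collapses it to one call; a sketch under that assumption (illustrative URL):

import requests

r = requests.head('https://example.com/file.zip', allow_redirects=True, timeout=3.0)
print(int(r.headers['Content-Length']) / 1024)  # size in KB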
Project: webkivy    Author: miohtama
def load(self, url):
        """Load script from URL."""

        # Check for a redirect to get a final base_url where to start

        resp = requests.head(url, allow_redirects=True)
        if url != resp.url:
            # Followed a redirect
            url = resp.url

        fname = get_response_fname(resp)
        _, ext = os.path.splitext(fname)

        if ext == ".py":
            py_file = self.fetch_file(url, url)
            return py_file
        else:
            self.crawl(url, url)
Project: -PunkScan    Author: swordli
def fuzzworth_contentl(self, head, strict = False):
        '''Check the content-length before moving on. If strict is set to True content-length
        must be validated before moving on. If strict is set to False (default) the URL will
        be fuzzed if no content-length is found or no head is returned. The idea behind this
        method is to ensure that huge files are not downloaded, slowing down fuzzing.'''

        #no param no fuzzing
        if head and "content-length" in head:

            content_length = int(head["content-length"])

            if content_length < self.pagesize_limit:
                return True

            else:
                return False

        else:
            if strict:
                return False

            else:
                return True
Project: -PunkScan    Author: swordli
def fuzzworth_contentl_with_type_fallback(self, head, allowed_types_lst, full_req_match = False):
        '''Check the content-length header strictly. If a content-length header is found
        and the content-length is below the limit set in the fuzzer config, return True.
        If there is no content-length, fall back to the content-type: if it is an allowed
        type, return True. If the content-type can't be read either, return False.'''

        #if content-length header is found return True
        if head and "content-length" in head:
            return self.fuzzworth_contentl(head, strict = True)

        #if content-length header not found, check content-type, if it is of allowed type
        #return True, otherwise false
        if head and "content-type" in head:
            return self.fuzzworth_content_type(head, allowed_types_lst, full_req_match = False, strict = True)

        return False
Project: AutoSQLInjecter    Author: WangYihang
def exce(indexOfResult,indexOfChar,queryASCII):
    # content-start
    url = "http://127.0.0.1/Less-9/?id="
    tempurl = url + getPayload(indexOfResult,indexOfChar,queryASCII)
    before_time = time.time()
    requests.head(tempurl)
    after_time = time.time()
    # content-end
    use_time = after_time - before_time
    # judge-start
    # If the response took noticeably longer than the error threshold, the injected
    # sleep() executed, i.e. the condition in the SQL payload evaluated to true;
    # otherwise the sleep() never fired and the condition is false.
    if abs(use_time) > error_time: 
        return True
    else:
        return False
    # judge-end
Project: deb-python-proliantutils    Author: openstack
def validate_href(image_href):
        """Validate HTTP image reference.

        :param image_href: Image reference.
        :raises: exception.ImageRefValidationFailed if HEAD request failed or
            returned response code not equal to 200.
        :returns: Response to HEAD request.
        """
        try:
            response = requests.head(image_href)
            if response.status_code != http_client.OK:
                raise exception.ImageRefValidationFailed(
                    image_href=image_href,
                    reason=("Got HTTP code %s instead of 200 in response to "
                            "HEAD request." % response.status_code))
        except requests.RequestException as e:
            raise exception.ImageRefValidationFailed(image_href=image_href,
                                                     reason=e)
        return response
Project: siphon-cli    Author: getsiphon
def get_remote_source_length():
    url = os.environ.get('SP_REMOTE_SOURCE')
    try:
        response = requests.head(url, allow_redirects=True, timeout=10)
    except requests.exceptions.RequestException as e:
        puts(colored.red('[HEAD] %s' % url))
        puts(colored.red('Failed to get remote installation size: %s' % e))
        sys.exit(1)
    size = response.headers.get('content-length')
    if not size:
        size = response.headers.get('Content-Length')
    if not size or not size.isdigit():
        puts(colored.red('Could not fetch the remote Content-Length.'))
        sys.exit(1)
    try:
        size = int(size)
    except ValueError:
        pass
    return size
Project: SwarmOps    Author: staugur
def _get_imageId(self, ImageName, tag="latest"):
        """ ??????tag?imageId/digest """

        ReqUrl = self._baseUrl + "/repositories/{}/tags/{}".format(ImageName, tag) if self.version == 1 else self._baseUrl + "/{}/manifests/{}".format(ImageName, tag)
        logger.info("_get_imageId for url {}".format(ReqUrl))
        try:
            if self.version == 1:
                r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
            else:
                r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify, allow_redirects=True, headers={"Content-Type": "application/vnd.docker.distribution.manifest.v2+json"})
        except Exception,e:
            logger.error(e, exc_info=True)
            return False
        else:
            if self.version == 1:
                return r.json()
            else:
                return r.headers.get("Docker-Content-Digest", "")
        return ""
Project: centr    Author: ibiBgOR
def check_download_session(url, download_dir, cookies):
    r = requests.head(url, cookies=cookies)
    if r.status_code != 200 or 'Content-Disposition' not in r.headers:
        return False
    m = re.search('filename="(.+)"', r.headers['Content-Disposition'])
    if not m:
        return False
    f = m.group(1)
    filename = os.path.join(download_dir, f)
    if os.path.isfile(filename):
        return True
    # get it
    print "Fetching %s" % f
    r2 = requests.get(url, cookies=cookies)
    if r2.status_code != 200:
        return False
    with open(filename, 'wb') as f:
        f.write(r2.content)
    return True
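
Here the HEAD request serves to read Content-Disposition and recover the
server-suggested filename before committing to the full GET. A minimal sketch
of the same parse (URL and cookie are illustrative):

import os
import re
import requests

r = requests.head('https://example.com/export', cookies={'session': 'abc'})
m = re.search(r'filename="(.+)"', r.headers.get('Content-Disposition', ''))
if m:
    print(os.path.join('/tmp/downloads', m.group(1)))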
Project: moescan    Author: RicterZ
def _scan(self):
        while True:
            if self.queue.empty():
                break
            try:
                sub = self.queue.get_nowait()
                self.queue.task_done()
                domain = self.target + sub
                r = requests.head(domain, headers=header, timeout=5, stream=True)
                code = r.status_code
                if code in self.status_code:
                    logger.info('status code {} -> {}'.format(code, domain))

            except Exception, e:
                pass

        self.thread_count -= 1
Project: Breezes    Author: staugur
def _from_image_tag_getId(self, ImageName, tag, url, version=1):
        """ ??????tag?imageId/digest """

        if url:
            ReqUrl = url.strip("/") + "/v1/repositories/{}/tags/{}".format(ImageName, tag) if version == 1 else url.strip("/") + "/v2/{}/manifests/{}".format(ImageName, tag)
            logger.info("_from_image_tag_getId for url {}".format(ReqUrl))
            try:
                if version == 1:
                    r = requests.get(ReqUrl, timeout=self.timeout, verify=self.verify)
                else:
                    r = requests.head(ReqUrl, timeout=self.timeout, verify=self.verify, allow_redirects=True, headers={"Content-Type": "application/vnd.docker.distribution.manifest.v2+json"})
            except Exception,e:
                logger.error(e, exc_info=True)
            else:
                if version == 1:
                    return r.json()
                else:
                    return r.headers.get("Docker-Content-Digest", "")
        return ""
Project: PyGEOMET    Author: pygeomet
def setURL(self,update=None):
        #if no valid path, do nothing:
        if self.path == None or self.grid == None or self.var == None or\
        self.ext == None or self.year == None:
            self.url = None
        else:
            self.url = self.path + '/' + self.grid + '/' + self.var + "." +\
            self.ext + self.year + ".nc"

        #Check if the file exists
        fexist = requests.head(self.url+'.html')
        if (fexist.status_code >= 400):
            self.yearList = self.yearList[1:]
            self.year = self.yearList[self.currentYearIndex+1]
            self.url = self.path + '/' + self.grid + '/' + self.var + "." +\
            self.ext + self.year + ".nc"
        if update == True:
            self.NCARfile()
Project: Pentest    Author: n3k
def run(self):
        while True:
            piece = self.queue.get()
            words = piece.split("\n")
            for word in words:
                url = self.target + word + self.suffix
                #print "[*] Trying: " + url
                try:
                    r = requests.head(url)
                    if r.status_code == 200:
                        print "[+] 200 - " + url
                except:
                    print "[*] Request Error: " + url
            self.queue.task_done()

# http://stackoverflow.com/questions/519633/lazy-method-for-reading-big-file-in-python
Project: cyouapitestframework    Author: ChrisWang1985
def send_http_request_used_exec(self, url, method, request_body="", header="", cookie=None):
        if method not in ["get", "post", "put", "delete", "head", "options"]:
            raise Exception("Not supported method: %s" % method)

        _cookie_obj = cookie
        _response = None

        if header != "":
            # %r quotes the interpolated values so the generated source is valid
            # Python; requests expects the keyword arguments headers= and cookies=
            _request_api_string = "_response = requests.%s(%r, data=%r, headers=%r, cookies=_cookie_obj)" % (
                method, url, request_body, header)
        else:
            _request_api_string = "_response = requests.%s(%r, data=%r, cookies=_cookie_obj)" % (
                method, url, request_body)

        exec _request_api_string

        return _response
Project: wrfxpy    Author: openwfm
def _is_var_ready(self, cycle, var):
        """
        Checks if the variable var is ready for the given forecast hour by comparing its
        filetime to the timestamp given by the forecast hour.  If the filetime is newer
        (later) then the variable is ready.

        :param cycle: which cycle are we working with (UTC)
        :param var: the variable identifier
        :return: true if the variable is ready
        """
        # find last-modified time of file in UTC timezone
        url = self._remote_var_url(cycle.hour, var)
        r = requests.head(url)
        if r.status_code != 200:
            raise ValueError('Cannot find variable %s for hour %d at url %s' % (var, cycle.hour, url))
        last_modif = self._parse_header_timestamp(r.headers['Last-Modified'])

        return last_modif > cycle
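
The helper _parse_header_timestamp is not shown; for reference, the standard
library can turn an RFC 2822 Last-Modified value into a timezone-aware
datetime (a sketch, not the project's actual helper; illustrative URL):

from email.utils import parsedate_to_datetime
import requests

r = requests.head('https://example.com/forecast.grb2')
last_modified = r.headers.get('Last-Modified')  # e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
if last_modified:
    print(parsedate_to_datetime(last_modified))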
Project: cassandra-backup-and-restore    Author: WenyuChang
def setup_args():
    '''
    setup command line arguments
    '''
    parser = argparse.ArgumentParser(description="Beatle request utility",
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--debug', action='store_true', default=False,
                        help="enable debugging log")
    parser.add_argument('--shard', action='store',
                        help='shard name or list shards by "list" value')
    parser.add_argument('request', action='store',
                        help='request: post/get/head/delete/list')
    parser.add_argument('filename', action='store',
                        help='file name')

    return parser
Project: script.reddit.reader    Author: gedisony
def sitesManager( media_url ):
    #picks which class will handle the media identification and extraction for website_name

    #first resolve url shortener
    shorteners=['bit.ly','goo.gl','tinyurl.com']
    if any(shortener in media_url for shortener in shorteners):
        #v=sitesBase.requests_get('https://unshorten.me/s/'+ urllib.quote_plus( media_url ) )
        v = requests.head( media_url, timeout=REQUEST_TIMEOUT, allow_redirects=True )
        log('  short url(%s)=%s'%(media_url,repr(v.url)))
        media_url=v.url

    for subcls in sitesBase.__subclasses__():
        regex=subcls.regex
        if regex:
            match=re.compile( regex  , re.I).findall( media_url )
            #log("testing:{0}[{1}] {2}".format(media_url,regex, repr(match)) )
            if match :
                return subcls( media_url )
Project: SourceLeakHacker    Author: WangYihang
def check(url, timeout):
    try:
        if timeout <= 0:
            timeout = 20
        response = requests.head(url,timeout = timeout)
        code = response.status_code
        screenLock.acquire()
        if code == 200:
            ColorPrinter.print_green_text("[ " + str(code) + " ]")
            print "Checking : " + url
            if "404" in response.text:
                ColorPrinter.print_blue_text(url + "\tMaybe every page same!")
        elif code == 404 or code == 405:
            pass
            # ColorPrinter.print_red_text("[ " + str(code) + " ]")
            # print "Checking : " + url
        else:
            ColorPrinter.print_blue_text("[ " + str(code) + " ]")
            print "Checking : " + url
    except Exception as e:
        screenLock.acquire()
        print e
    finally:
        screenLock.release()
Project: AngelSword    Author: Lucifer1993
def run(self):
        headers = {
        "User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
        }
        payloads = ["/excel/sso_user_export.php",
                    "/excel/user_export.php",
                    "/excel/server_export.php"]

        try:
            for payload in payloads:
                vulnurl = self.url + payload
                req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
                if r"application/vnd.ms-excel" in req.headers["Content-Type"]:
                    cprint("[+]???????????????...(??)\tpayload: "+vulnurl, "yellow")

        except:
            cprint("[-] "+__file__+"====>????", "cyan")
Project: AngelSword    Author: Lucifer1993
def run(self):
        headers = {
            "User-Agent":"Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_8; en-us) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50"
        }
        payloads = ["/data/dkcm_ssdfhwejkfs.mdb",
                    "/_data/___dkcms_30_free.mdb",
                    "/_data/I^(()UU()H.mdb"]
        for payload in payloads:
            vulnurl = self.url + payload
            try:
                req = requests.head(vulnurl, headers=headers, timeout=10, verify=False)
                if req.headers["Content-Type"] == "application/x-msaccess":
                    cprint("[+]??dkcms???????...(??)\tpayload: "+vulnurl, "red")

            except:
                cprint("[-] "+__file__+"====>????", "cyan")
Project: punkspider    Author: aiwennba
def fuzzworth_contentl(self, head, strict = False):
        '''Check the content-length before moving on. If strict is set to True content-length
        must be validated before moving on. If strict is set to False (default) the URL will
        be fuzzed if no content-length is found or no head is returned. The idea behind this
        method is to ensure that huge files are not downloaded, slowing down fuzzing.'''

        #no param no fuzzing
        if head and "content-length" in head:

            content_length = int(head["content-length"])

            if content_length < self.pagesize_limit:
                return True

            else:
                return False

        else:
            if strict:
                return False

            else:
                return True
Project: punkspider    Author: aiwennba
def fuzzworth_contentl_with_type_fallback(self, head, allowed_types_lst, full_req_match = False):
        '''Check the content-length header strictly. If a content-length header is found
        and the content-length is below the limit set in the fuzzer config, return True.
        If there is no content-length, fall back to the content-type: if it is an allowed
        type, return True. If the content-type can't be read either, return False.'''

        #if content-length header is found return True
        if head and "content-length" in head:
            return self.fuzzworth_contentl(head, strict = True)

        #if content-length header not found, check content-type, if it is of allowed type
        #return True, otherwise false
        if head and "content-type" in head:
            return self.fuzzworth_content_type(head, allowed_types_lst, full_req_match = False, strict = True)

        return False
Project: spdx-github    Author: spdx
def pull_request_to_github(main_repo_user, repo_name, environment):

    #Creating strings, mainly URLs, that will be needed for the
    #pull request process.

    #This is the API URL to make a pull request
    pull_request_url = ('https://api.github.com/repos/{}/{}/pulls'.format(
                        main_repo_user, repo_name))
    #This has the username and password from the environment file.
    #It is used to log in for API calls.
    auth_string = ('{}:{}'.format(environment['github_username'],
                   environment['github_password']))
    #This is the data that will be posted for the pull request.
    #It tells the API what the pull request will be like.
    pull_request_data = ('{{"title": "{}", "head": "{}:master",'
                         ' "base": "master"}}'.format(
                         environment['github_pull_request_title'],
                         environment['github_username']))

    pull_request_command = ['curl', '--user', auth_string, pull_request_url,
                            '-d', pull_request_data]

    #Make the pull request to the main repository
    subprocess.check_output(pull_request_command)
    return pull_request_command
Project: spdx-github    Author: spdx
def check_valid_url(repo_zip_url):
    #Check that it's a URL
    if(repo_zip_url[:7] != 'http://' and repo_zip_url[:8] != 'https://'):
        return False
    #Get just the head.
    request = requests.head(repo_zip_url)
    #It should be either success (200's) or redirect(300's).
    #Otherwise, inform the user of failure.
    if (request.status_code >= 400 or request.status_code < 200):
        print('Could not reach URL provided.\n')
        print('Provided url was {} and resulted in status code {}'.format(
              repo_zip_url,str(request.status_code)))
        return False
    else:
        return True
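
As an alternative to slicing the URL prefix, the standard library's urlparse
gives a more robust scheme check (a variation, not what this project does):

from urllib.parse import urlparse

def has_http_scheme(url):
    # Inspect the parsed scheme instead of comparing string prefixes
    return urlparse(url).scheme in ('http', 'https')

print(has_http_scheme('https://example.com/repo.zip'))  # True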

#This sends a notification email based on information provided
#in the environment file
Project: harbor-py    Author: tobegit3hub
def check_project_exist(self, project_name):
        result = False
        path = '%s://%s/api/projects?project_name=%s' % (
            self.protocol, self.host, project_name)
        response = requests.head(path,
                                 cookies={'beegosessionID': self.session_id})
        if response.status_code == 200:
            result = True
            logging.debug(
                "Successfully check project exist, result: {}".format(result))
        elif response.status_code == 404:
            result = False
            logging.debug(
                "Successfully check project exist, result: {}".format(result))
        else:
            logging.error("Fail to check project exist")
        return result

    # POST /projects
Project: check_modulemd    Author: fedora-modularity
def _is_commit_ref_available(self, package, namespace):
        """
        Check if commit ref is available on git repository
        """
        if not package or not namespace:
            self.error("Missing parameter to check if commit is available")

        repository = "https://src.fedoraproject.org/cgit/%s/%s.git" % (namespace, package.name)

        if package.repository:
            repository = package.repository

        ref = "HEAD"
        if package.ref:
            ref = package.ref
        patch_path = "/patch/?id=%s" % ref

        url = repository + patch_path
        resp = requests.head(url)
        if resp.status_code < 200 or resp.status_code >= 300:
            self.error("Could not find ref '%s' on '%s'. returned exit status %d; output:\n%s" %
                       (ref, package.name, resp.status_code, resp.text))

        self.log.info("Found ref: %s for %s" % (ref, package.name))
Project: devblogs    Author: abdelhai
def url_resp(url):
    """Receives a url returns True in case of 200 or 301 response codes"""
    res = None
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}
        res = requests.head(url, timeout=3, headers=headers)
    except:
        pass

    if res:
        code = res.status_code
        print('*' + str(code) + '*')
        if code == 200 or code == 301:

            print(url)
            print('#ok#')
            return True
        else:
            print('#bad#')
Project: SlackUptimeMonitor    Author: AndreiD
def back_online_notifier(the_url):
    print("found " + the_url + " back online.")
    try:
        old_status_file = pickle.load(open("status.p", "rb"))
    except Exception as ex:
        print("The status.p file was not found. it will be recreated." + str(ex))
        return

    if (the_url in old_status_file) and (old_status_file[the_url]['status'] == "down"):
        it_was_down_time = old_status_file[the_url]['time']
        current_time = datetime.datetime.now().replace(microsecond=0)
        send_message(CHANNEL_ID, ":herb: " + the_url + " is back online. It was down for " + str(current_time - it_was_down_time))
    else:
        print("skipping notifying that the url is online")


# --- getting only the head ---
Project: twentybn-dl    Author: TwentyBN
def total_blocks_and_bytes(self):
        total_blocks, total_bytes = 0, 0
        for u in self.input_urls:
            head_response_headers = requests.head(u).headers
            if 'Content-Length' not in head_response_headers:
                m = "The url: '{}' doesn't support the 'Content-Length' field.".format(u)
                raise ContentLengthNotSupportedException(m)
            else:
                remote_size = int(head_response_headers['Content-Length'])
            total_bytes += remote_size
            num_blocks, last_block_size = divmod(remote_size, self.blocksize)
            total_blocks += num_blocks
            if last_block_size:
                total_blocks += 1
        return total_blocks, total_bytes
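
The block arithmetic above uses divmod to split the remote size into whole
blocks plus a remainder, counting any partial trailing block as one more.
Worked numbers:

# 10 MiB in 4 MiB blocks -> 2 whole blocks + 2 MiB remainder -> 3 blocks
blocksize = 4 * 1024 * 1024
remote_size = 10 * 1024 * 1024
num_blocks, last_block_size = divmod(remote_size, blocksize)
if last_block_size:
    num_blocks += 1
print(num_blocks)  # 3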