Python requests 模块,get() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.get()

项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def fetch_data():
    """Download the MTGJSON zip archive and return its parsed JSON content.

    ``MTG_JSON_URL`` is requested first; ``FALLBACK_MTG_JSON_URL`` is used
    only when that request raises ``requests.ConnectionError``.

    Returns:
        The object produced by ``json.loads`` on the UTF-8 decoded content of
        the single file inside the archive.

    Raises:
        RuntimeError: If the archive does not contain exactly one file.
    """
    try:
        response = requests.get(MTG_JSON_URL)
    except requests.ConnectionError:
        response = requests.get(FALLBACK_MTG_JSON_URL)

    with closing(response), zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        members = archive.infolist()
        if len(members) != 1:
            raise RuntimeError("Found an unexpected number of files in the MTGJSON archive.")
        raw = archive.read(members[0])

    return json.loads(raw.decode('utf-8'))
项目:X-ray-classification    作者:bendidi    | 项目源码 | 文件源码
def main():
    """Scrape every URL in ``url_list`` and pass each discovered link to ``extract``.

    For each page, the text of the first ``<script language="javascript">``
    element is searched with the module-level ``regex``; the first match is
    parsed as JSON and every entry's ``nodeRef`` is appended to ``domain`` to
    form a link.  URLs whose download fails are skipped.
    """
    for url in url_list:
        try:
            r = requests.get(url)
        except requests.RequestException:
            # Skip only request failures; a bare ``except`` would also hide
            # KeyboardInterrupt and genuine programming errors.
            continue
        tree = html.fromstring(r.text)

        script = tree.xpath('//script[@language="javascript"]/text()')[0]

        json_string = regex.findall(script)[0]
        json_data = json.loads(json_string)

        # Build the full list first so a malformed entry fails before any
        # link has been extracted.
        links = [domain + x['nodeRef'] for x in json_data]
        for link in links:
            extract(link)
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def sendRequest(ip, port, route, data=None, protocol="http"):
        """Send an HTTP request to ``{protocol}://{ip}:{port}{route}``.

        A POST carrying ``data`` is issued when ``data`` is not ``None``;
        otherwise a GET is issued.

        Returns:
            The response object returned by ``requests``.

        Raises:
            PipelineServiceError: When the request raises ``requests.HTTPError``.
        """
        url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route)

        try:
            if data is None:
                resp = requests.get(url)
            else:
                resp = requests.post(url, data=data)
        except requests.HTTPError as e:
            # NOTE(review): connection failures raise other requests
            # exceptions and are not translated here - confirm this is intended.
            raise PipelineServiceError("{reason}".format(reason=e))

        return resp
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def run(self):
        """Report the FortiGuard web-filter category for a domain or URL.

        For ``domain`` and ``url`` data types the FortiGuard lookup page is
        fetched and the text following ``Category: `` is reported under the
        ``category`` key.  Any other data type is passed to
        ``self.notSupported()``.

        A page without a recognisable category is reported through
        ``self.unexpectedError`` instead of crashing with ``AttributeError``.
        """
        Analyzer.run(self)

        if self.data_type in ('domain', 'url'):
            try:
                # Raw string: same pattern, without invalid-escape warnings.
                pattern = re.compile(r"(?:Category: )([\w\s]+)")
                baseurl = 'http://www.fortiguard.com/webfilter?q='
                url = baseurl + self.getData()
                req = requests.get(url)
                category_match = pattern.search(req.content)
                if category_match is None:
                    # Route "no category found" through the existing error path.
                    raise ValueError('Category not found in FortiGuard response')
                self.report({
                    'category': category_match.group(1)
                })
            except ValueError as e:
                self.unexpectedError(e)
        else:
            self.notSupported()
项目:ArticleSpider    作者:mtianyan    | 项目源码 | 文件源码
def judge_ip(self, ip, port):
        """Check whether the HTTP proxy at ``ip:port`` can fetch a test page.

        The proxy is used to request ``http://www.baidu.com``.  It counts as
        usable when the request succeeds with a 2xx status code; otherwise the
        address is removed via ``self.delete_ip(ip)``.

        Returns:
            ``True`` if the proxy is usable, ``False`` otherwise.
        """
        proxy_url = "http://{0}:{1}".format(ip, port)
        try:
            response = requests.get("http://www.baidu.com", proxies={"http": proxy_url})
        except Exception:
            usable = False
        else:
            usable = 200 <= response.status_code < 300

        if usable:
            print ("effective ip")
            return True

        print ("invalid ip and port")
        self.delete_ip(ip)
        return False
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def watchJob(jobId, exchangeName):
        """Block until a status message for ``jobId`` arrives on its queue.

        A queue named ``PIPELINE_JOB_<jobId>`` is bound to ``exchangeName`` and
        polled in a loop; polls that return a falsy ``method`` are ignored.

        Returns:
            ``jobId`` when the first message reports ``SUCCEEDED``.

        Raises:
            PipelineServiceError: When the first message reports any other status.
        """
        queue = PipelineQueue('PIPELINE_JOB_{j}'.format(j=jobId))
        queue.bindToExchange(exchangeName, jobId)

        while True:
            body, method = queue.get()
            if not method:
                # Nothing delivered on this poll; ask again.
                continue

            body = json.loads(body)
            if body["current_status"] == "SUCCEEDED":
                return jobId
            raise PipelineServiceError("Job {j} has current status {s}!".format(j=jobId, s=body["current_status"]))
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def _verify(self):
        """Read the configuration at ``self.path`` and return its known parameters.

        Every entry of ``self._configParams`` maps an option name to a dict
        with at least ``section`` and ``required`` keys.  Required options must
        exist in the configuration; optional ones are included only when present.

        Returns:
            dict mapping option name to its configured value.

        Raises:
            LookupError: If a required section or option is missing.
        """
        try:
            self.read(self.path)
        except IOError as e:
            # Single-argument print() behaves the same on Python 2 and 3.
            print("Couldn't open {path}: {reason}".format(path=self.path, reason=e))
            exit(-1)
        else:
            d = {}
            # .items() works on both Python 2 and 3 (iteritems() is 2-only).
            for name, attrs in self._configParams.items():
                if attrs["required"]:
                    if not self.has_section(attrs["section"]):
                        raise LookupError("missing required section {s} in the configuration!\nRUN `isb-cgc-pipelines config` to correct the configuration".format(s=attrs["section"]))

                    if not self.has_option(attrs["section"], name):
                        raise LookupError("missing required option {o} in section {s}!\nRun `isb-cgc-pipelines config` to correct the configuration".format(s=attrs["section"], o=name))

                try:
                    d[name] = self.get(attrs["section"], name)
                except (NoOptionError, NoSectionError):
                    # Optional parameter that is not configured: leave it out.
                    pass

            return d
项目:optimum-arena    作者:ovidiugiorgi    | 项目源码 | 文件源码
def get_best(url):
    """Record the slowest test time of one infoarena submission page.

    ``url`` is a path appended to ``http://www.infoarena.ro``.  The user name
    is taken from the profile link's ``href`` (characters from index 35 on),
    and the largest ``...ms`` value among the ``td.number`` cells is kept.
    The module-level dict ``d`` is updated when the user is new or this
    maximum is lower than the stored one.  The result is also printed.
    """
    url = 'http://www.infoarena.ro' + url
    soup = BeautifulSoup(requests.get(url).text, "html.parser")
    name = soup.find('span', {'class': 'username'}).find('a')['href'][35:]

    max_ms = -1
    for cell in soup.find_all('td', {'class': 'number'}):
        text = cell.string
        if text.endswith('ms'):
            max_ms = max(max_ms, int(text.strip('ms')))

    if name not in d or max_ms < d[name][0]:
        d[name] = (max_ms, url)
    print(max_ms, name, url)
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def list(package_name):
    """Validate a ``<username>/<package>`` argument for the list command.

    ``package_name`` is parsed with ``split_package_name``.  No listing is
    performed in this block; it only reports invalid requests:

    * no username: an argument-parsing error is printed;
    * username, package and implementation all given: the user is told that
      a single implementation cannot be listed.
    """
    p = split_package_name(package_name)

    if p['username'] is None:
        print('Error parsing arguments. Got {}. Specify in format such that: <username>/<package> with <package> being optional.'.format(p))
        return

    # A user alone would list their packages, a user plus package its
    # implementations; one specific implementation cannot be listed.
    if p['package'] is not None and p['implementation'] is not None:
        print('Cannot list one specific implementation. Use "print".')

# @cli.command()
# @click.argument('payload')
# def print(payload):

#   pass
项目:PhonePerformanceMeasure    作者:KyleCe    | 项目源码 | 文件源码
def __get_api_conf(self, sfile, conf_name):
        """Load ``url``, ``access_token`` and ``api_token`` from a config file.

        ``sfile`` is resolved with ``Fun.get_file_in_directory_full_path`` and
        the values are read from section ``conf_name`` into attributes of the
        same names on ``self``.  The resolved path and the parsed section names
        are printed.  The process exits with status -1 when the file does not
        exist or any value cannot be read.
        """
        full_path = Fun.get_file_in_directory_full_path(sfile)
        # Single-argument print() behaves the same on Python 2 and 3.
        print(full_path)
        if not os.path.exists(full_path):
            print("Error: Cannot get config file")
            sys.exit(-1)

        conf = ConfigParser.ConfigParser()
        conf.read(full_path)
        print(conf.sections())
        try:
            self.url = conf.get(conf_name, "url")
            self.access_token = conf.get(conf_name, "access_token")
            self.api_token = conf.get(conf_name, "api_token")
        except Exception as e:  # "as" form is valid on Python 2.6+ and 3
            print("Error: " + str(e))
            sys.exit(-1)

    # ????
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def pull_user_data(session):
    """Copy every user from the production export into ``session``.

    The JSON list served at ``<config.prod_url>export/users`` is loaded, one
    ``User`` is added to ``session`` per entry, and the session is committed
    once at the end.
    """
    # Single-argument print() behaves the same on Python 2 and 3.
    print('pulling users')
    user_data = requests.get(u"{}{}".format(config.prod_url, 'export/users'))
    loaded_data = json.loads(user_data.text)

    # Every field is copied verbatim; a missing key raises KeyError.
    fields = ('id', 'name', 'email', 'admin', 'avatar', 'active',
              'created_at', 'elo', 'wins', 'losses')
    for user_dict in loaded_data:
        user = User(**{field: user_dict[field] for field in fields})
        session.add(user)
    session.commit()
    print('done pulling users')
项目:ELO-Darts    作者:pwgraham91    | 项目源码 | 文件源码
def pull_game_data(session):
    """Copy every game from the production export into ``session``.

    The JSON list served at ``<config.prod_url>export/games`` is loaded, one
    ``Game`` is added to ``session`` per entry, and the session is committed
    once at the end.
    """
    # Single-argument print() behaves the same on Python 2 and 3.
    print('pulling games')
    game_data = requests.get(u"{}{}".format(config.prod_url, 'export/games'))
    loaded_data = json.loads(game_data.text)

    # Every field is copied verbatim; a missing key raises KeyError.
    fields = ('id', 'created_at', 'deleted_at', 'winner_id', 'winner_elo_score',
              'loser_id', 'loser_elo_score', 'submitted_by_id')
    for game_dict in loaded_data:
        game = Game(**{field: game_dict[field] for field in fields})
        session.add(game)
    session.commit()
    print('done pulling games')
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def format_data(cls, data):
        """Shape the response payload for the front-end.

        Arguments:
          data (:py:class:`dict`): The JSON data from the response.

        Returns:
          :py:class:`dict`: The first four formatted builds, a health
            summary computed over all builds, and the repository name.

        """
        formatted = []
        for raw_build in data.get('builds', []):
            formatted.append(cls.format_build(raw_build))
        estimate_time(formatted)
        return {
            'builds': formatted[:4],
            'health': health_summary(formatted),
            'name': data.get('repository_name'),
        }
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def format_build(cls, build):
        """Shape one build entry for the front-end.

        Arguments:
          build (:py:class:`dict`): The JSON data from the response.

        Returns:
          :py:class:`dict`: The re-formatted data, as produced by the
            parent class's ``format_build``.

        """
        started = build.get('started_at')
        finished = build.get('finished_at')
        start, finish, elapsed = elapsed_time(started, finished)

        # A duration exists only when both timestamps were resolved.
        if start is None or finish is None:
            duration = None
        else:
            duration = finish - start

        details = {
            'author': build.get('github_username'),
            'duration': duration,
            'elapsed': elapsed,
            'message': build.get('message'),
            'outcome': build.get('status'),
            'started_at': start,
        }
        return super().format_build(details)
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def format_data(cls, name, data):
        """Shape the response payload for the front-end.

        Arguments:
          data (:py:class:`list`): The JSON data from the response.
          name (:py:class:`str`): The name of the repository.

        Returns:
          :py:class:`dict`: The repository name and at most the first five
            commits, each passed through ``cls.format_commit``.

        """
        recent = data[:5] or []
        commits = []
        for entry in recent:
            # Entries without a 'commit' key are formatted from an empty dict.
            commits.append(cls.format_commit(entry.get('commit', {})))
        return {'commits': commits, 'name': name}
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def half_life(issues):
        """Return the median-position lifetime of the closed issues.

        Args:
          issues (:py:class:`list`): The service's issue data.

        Returns:
          :py:class:`datetime.timedelta`: The lifetime at the lower-middle
            position of the sorted lifetimes, or ``None`` when no issue has
            both a parseable creation and closing time.

        """
        parsed = (
            (safe_parse(issue.get('created_at')), safe_parse(issue.get('closed_at')))
            for issue in issues
        )
        lives = sorted(end - start for start, end in parsed if start and end)
        if not lives:
            return None
        # Lower-middle index: the same element for odd and even sizes as
        # ((size + size % 2) // 2) - 1.
        return lives[(len(lives) - 1) // 2]
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def details(self, iteration):
        """Fetch extra detail for one project iteration.

        Arguments:
          iteration (:py:class:`int`): The current iteration number.

        Returns:
          :py:class:`dict`: ``stories`` and ``velocity`` for the iteration,
            or an empty dict when the response status is not 200.

        """
        url = self.url_builder(
            '/projects/{id}/iterations/{number}',
            params={'number': iteration, 'id': self.project_id},
            url_params={'fields': ':default,velocity,stories'},
        )
        response = requests.get(url, headers=self.headers)

        if response.status_code != 200:
            logger.error('failed to update project iteration details')
            return {}

        update = response.json()
        return {
            'stories': self.story_summary(update.get('stories', [])),
            'velocity': update.get('velocity', 'unknown'),
        }
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def format_data(self, name, data):
        """Shape the response payload for the front-end.

        Arguments:
          data (:py:class:`dict`): The JSON data from the response.
          name (:py:class:`str`): The name of the repository.

        Returns:
          :py:class:`dict`: The first four formatted builds, the health
            derived from the first build (or from ``None`` when there are no
            builds), and the repository name.

        """
        builds = []
        for raw_build in data.get('builds', []):
            builds.append(self.format_build(raw_build))

        latest = builds[0] if builds else None
        return {
            'builds': builds[:4],
            'health': self.health(latest),
            'name': name,
        }
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def format_build(cls, build):
        """Shape one coverage build entry for the front-end.

        Arguments:
          build (:py:class:`dict`): The JSON data from the response.

        Returns:
          :py:class:`dict`: Author, commit time, formatted and raw coverage,
            and the commit message with tags removed.

        """
        raw_coverage = build.get('covered_percent')
        commit_message = build.get('commit_message')

        result = {}
        result['author'] = build.get('committer_name') or '<no author>'
        result['committed'] = occurred(build.get('created_at'))
        if raw_coverage is None:
            result['coverage'] = None
        else:
            result['coverage'] = '{:.1f}%'.format(raw_coverage)
        # An empty or missing message is reported as None.
        if commit_message:
            result['message_text'] = remove_tags(commit_message)
        else:
            result['message_text'] = None
        result['raw_coverage'] = raw_coverage
        return result
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def format_data(self, data):
        """Shape the response payload for the front-end.

        Arguments:
          data (:py:class:`dict`): The JSON data from the response.

        Returns:
          :py:class:`dict`: The first four formatted builds, a health
            summary computed over all builds, and ``self.repo`` as the name.

        """
        commits_by_id = {}
        for commit in data.get('commits', []):
            commits_by_id[commit['id']] = commit

        builds = []
        for build in data.get('builds', []):
            # Builds whose commit is not in the payload get an empty commit.
            commit = commits_by_id.get(build.get('commit_id'), {})
            builds.append(self.format_build(build, commit))

        estimate_time(builds)
        return {
            'builds': builds[:4],
            'health': health_summary(builds),
            'name': self.repo,
        }
项目:flash_services    作者:textbook    | 项目源码 | 文件源码
def format_build(cls, build, commit):  # pylint: disable=arguments-differ
        """Shape one build and its commit for the front-end.

        Arguments:
          build (:py:class:`dict`): The build data from the response.
          commit (:py:class:`dict`): The commit data from the response.

        Returns:
          :py:class:`dict`: The re-formatted data, as produced by the
            parent class's ``format_build``.

        """
        started = build.get('started_at')
        finished = build.get('finished_at')
        start, finish, elapsed = elapsed_time(started, finished)

        # A duration exists only when both timestamps were resolved.
        if start is None or finish is None:
            duration = None
        else:
            duration = finish - start

        details = {
            'author': commit.get('author_name'),
            'duration': duration,
            'elapsed': elapsed,
            'message': commit.get('message'),
            'outcome': build.get('state'),
            'started_at': start,
        }
        return super().format_build(details)
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def watch(self, path):
        """Yield decoded events from a streaming watch request on ``path``.

        A GET with ``watch=true`` is issued against ``self._base_url + path``
        and each non-empty response line is decoded with ``jsonutils.loads``.
        When a stream ends, a new request is issued, so this generator does
        not finish on its own.

        Raises:
            exc.K8sClientException: If a response is not OK.
        """
        url = self._base_url + path
        params = {'watch': 'true'}
        header = {'Authorization': 'Bearer %s' % self.token} if self.token else {}

        # TODO(ivc): handle connection errors and retry on failure
        while True:
            response = requests.get(url, params=params, stream=True,
                                    cert=self.cert, verify=self.verify_server,
                                    headers=header)
            with contextlib.closing(response):
                if not response.ok:
                    raise exc.K8sClientException(response.text)
                for raw_line in response.iter_lines(delimiter='\n'):
                    event = raw_line.strip()
                    if event:
                        yield jsonutils.loads(event)
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def download_current_dataset(self, dest_path='.', unzip=True):
        """Download the current dataset archive and optionally extract it.

        The archive is saved as ``numerai_dataset_<YYYYMMDD>.zip`` inside
        ``dest_path``, using today's date.

        :param dest_path: directory receiving the archive and extracted files.
        :param unzip: when true, extract the archive into ``dest_path``.
        :return: the HTTP status code of the download request; nothing is
                 written when it is not 200.
        """
        now = datetime.now().strftime('%Y%m%d')
        file_name = 'numerai_dataset_{0}.zip'.format(now)
        dest_file_path = '{0}/{1}'.format(dest_path, file_name)

        r = requests.get(self._dataset_url)
        if r.status_code != 200:
            return r.status_code

        with open(dest_file_path, "wb") as fp:
            # Write the payload in one call: iterating bytes yields ints on
            # Python 3, which fp.write() rejects, and is per-byte on Python 2.
            fp.write(r.content)

        if unzip:
            with zipfile.ZipFile(dest_file_path, "r") as z:
                z.extractall(dest_path)
        return r.status_code
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_file_report(self, this_hash):
        """ Get the scan results for a file.

        You can also specify a CSV list made up of a combination of hashes and scan_ids
        (up to 4 items with the standard request rate), this allows you to perform a batch
        request with one single call.
        i.e. {'resource': '99017f6eebbac24f351415dd410d522d, 88817f6eebbac24f351415dd410d522d'}.

        :param this_hash: The md5/sha1/sha256/scan_ids hash of the file whose dynamic behavioural report you want to
                            retrieve or scan_ids from a previous call to scan_file.
        :return: the result of ``_return_response_and_status_code`` for the response, or ``dict(error=...)`` with
                 the error text when the request raises ``requests.RequestException``.
        """
        params = {'apikey': self.api_key, 'resource': this_hash}

        try:
            response = requests.get(self.base + 'file/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_url_report(self, this_url, scan='0'):
        """ Get the scan results for a URL. (can do batch searches like get_file_report)

        :param this_url: a URL will retrieve the most recent report on the given URL. You may also specify a scan_id
                         (sha256-timestamp as returned by the URL submission API) to access a specific report. At the
                         same time, you can specify a CSV list made up of a combination of hashes and scan_ids so as
                         to perform a batch request with one single call (up to 4 resources per call with the standard
                         request rate). When sending multiples, the scan_ids or URLs must be separated by a new line
                         character.
        :param scan: (optional): this is an optional parameter that when set to "1" will automatically submit the URL
                      for analysis if no report is found for it in VirusTotal's database. In this case the result will
                      contain a scan_id field that can be used to query the analysis report later on.
        :return: JSON response, or ``dict(error=...)`` with the error text when the request raises
                 ``requests.RequestException``.
        """
        params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan}

        try:
            response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_upload_url(self):
        """ Get a special URL for submitted files bigger than 32MB.

        In order to submit files bigger than 32MB you need to obtain a special upload URL to which you
        can POST files up to 200MB in size. This API generates such a URL.

        :return: the ``upload_url`` value of the JSON response when the status is OK; ``dict(response_code=...)``
                 for any other status; ``dict(error=...)`` when the request raises ``requests.RequestException``.
        """
        params = {'apikey': self.api_key}

        try:
            response = requests.get(self.base + 'file/scan/upload_url', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        if response.status_code == requests.codes.ok:
            return response.json()['upload_url']
        return dict(response_code=response.status_code)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_file_behaviour(self, this_hash):
        """ Get a report about the behaviour of the file in sand boxed environment.

        VirusTotal runs a distributed setup of Cuckoo sandbox machines that execute the files we receive. Execution is
        attempted only once, upon first submission to VirusTotal, and only Portable Executables under 10MB in size are
        ran. The execution of files is a best effort process, hence, there are no guarantees about a report being
        generated for a given file in our dataset.

        If a file did indeed produce a behavioural report, a summary of it can be obtained by using the file scan
        lookup call providing the additional HTTP POST parameter allinfo=1. The summary will appear under the
        behaviour-v1 property of the additional_info field in the JSON report.

        :param this_hash: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve.
        :return: full JSON report of the file's execution as returned by the Cuckoo JSON report encoder, or
                 ``dict(error=...)`` with the error text when the request raises ``requests.RequestException``.
        """
        params = {'apikey': self.api_key, 'hash': this_hash}

        try:
            response = requests.get(self.base + 'file/behaviour', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_file_distribution(self, before='', after='', reports='false', limit='1000'):
        """ Get a live feed with the latest files submitted to VirusTotal.

        Allows you to retrieve a live feed of absolutely all uploaded files to VirusTotal, and download them for
        further scrutiny. This API requires you to stay synced with the live submissions as only a backlog of 6
        hours is provided at any given point in time.

        :param before: (optional) Retrieve files received before the given timestamp, in timestamp descending order.
        :param after: (optional) Retrieve files received after the given timestamp, in timestamp ascending order.
        :param reports: (optional) Include the files' antivirus results in the response. Possible values are 'true' or
        'false' (default value is 'false').
        :param limit: (optional) Retrieve limit file items at most (default: 1000).
        :return: JSON response: please see https://www.virustotal.com/en/documentation/private-api/#file-distribution
        or ``dict(error=...)`` with the error text when the request raises ``requests.RequestException``.
        """
        params = {'apikey': self.api_key, 'before': before, 'after': after, 'reports': reports, 'limit': limit}

        try:
            response = requests.get(self.base + 'file/distribution', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_url_report(self, this_url, scan='0', allinfo=1):
        """ Get the scan results for a URL.

        :param this_url: A URL for which you want to retrieve the most recent report. You may also specify a scan_id
        (sha256-timestamp as returned by the URL submission API) to access a specific report. At the same time, you
        can specify a CSV list made up of a combination of urls and scan_ids (up to 25 items) so as to perform a batch
        request with one single call. The CSV list must be separated by new line characters.
        :param scan: (optional) This is an optional parameter that when set to "1" will automatically submit the URL
        for analysis if no report is found for it in VirusTotal's database. In this case the result will contain a
        scan_id field that can be used to query the analysis report later on.
        :param allinfo: (optional) If this parameter is specified and set to "1" additional info regarding the URL
        (other than the URL scanning engine results) will also be returned. This additional info includes VirusTotal
        related metadata (first seen date, last seen date, files downloaded from the given URL, etc.) and the output
        of other tools and datasets when fed with the URL.
        :return: JSON response, or ``dict(error=...)`` with the error text when the request raises
        ``requests.RequestException``.
        """

        params = {'apikey': self.api_key, 'resource': this_url, 'scan': scan, 'allinfo': allinfo}

        try:
            response = requests.get(self.base + 'url/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_url_distribution(self, after=None, reports='true', limit=1000):
        """ Get a live feed with the latest URLs submitted to VirusTotal.

        Allows you to retrieve a live feed of URLs submitted to VirusTotal, along with their scan reports. This
        call enables you to stay synced with VirusTotal URL submissions and replicate our dataset.

        :param after: (optional) Retrieve URLs received after the given timestamp, in timestamp ascending order.
        :param reports:  (optional) When set to "true" each item retrieved will include the results for each particular
        URL scan (in exactly the same format as the URL scan retrieving API). If the parameter is not specified, each
        item returned will only contain the scanned URL and its detection ratio.
        :param limit: (optional) Retrieve limit file items at most (default: 1000).
        :return: JSON response, or ``dict(error=...)`` with the error text when the request raises
        ``requests.RequestException``.
        """

        params = {'apikey': self.api_key, 'after': after, 'reports': reports, 'limit': limit}

        try:
            response = requests.get(self.base + 'url/distribution', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_ip_report(self, this_ip):
        """ Get information about a given IP address.

        Retrieves a report on a given IP address (including the information recorded by VirusTotal's Passive DNS
        infrastructure).

        :param this_ip: A valid IPv4 address in dotted quad notation, for the time being only IPv4 addresses are
        supported.
        :return: JSON response, or ``dict(error=...)`` with the error text when the request raises
        ``requests.RequestException``.
        """
        params = {'apikey': self.api_key, 'ip': this_ip}

        try:
            response = requests.get(self.base + 'ip-address/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_domain_report(self, this_domain):
        """ Get information about a given domain.

        Retrieves a report on a given domain (including the information recorded by VirusTotal's passive DNS
        infrastructure).

        :param this_domain: A domain name.
        :return: JSON response, or ``dict(error=...)`` with the error text when the request raises
        ``requests.RequestException``.
        """
        params = {'apikey': self.api_key, 'domain': this_domain}

        try:
            response = requests.get(self.base + 'domain/report', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def get_comments(self, resource, before=None):
        """ Get comments for a file or URL.

        Retrieve a list of VirusTotal Community comments for a given file or URL. VirusTotal Community comments are
        user submitted reviews on a given item, these comments may contain anything from the in-the-wild locations of
        files up to fully-featured reverse engineering reports on a given sample.

        :param resource: Either an md5/sha1/sha256 hash of the file or the URL itself you want to retrieve.
        :param before: (optional) A datetime token that allows you to iterate over all comments on a specific item
        whenever it has been commented on more than 25 times.
        :return: JSON response - The application answers with the comments sorted in descending order according to
        their date. ``dict(error=...)`` with the error text is returned when the request raises
        ``requests.RequestException``.
        """
        params = dict(apikey=self.api_key, resource=resource, before=before)

        try:
            response = requests.get(self.base + 'comments/get', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            # Exceptions have no ``message`` attribute on Python 3; str(e) works on both 2 and 3.
            return dict(error=str(e))

        return _return_response_and_status_code(response)
项目:pyupdater-wx-demo    作者:wettenhj    | 项目源码 | 文件源码
def WaitForFileServerToStart(port):
    """
    Poll the local file server until it answers an HTTP request.

    ``http://<LOCALHOST>:<port>/fileserver-is-ready`` is requested with a
    one-second timeout.  Any HTTP response, whatever its status code, means
    the server is up and True is returned.  After a ConnectionError the
    function sleeps 0.25 seconds and tries again; once more than 10 attempts
    have failed it logs a warning and returns None.
    """
    url = 'http://%s:%s/fileserver-is-ready' % (LOCALHOST, port)
    attempts = 0
    while True:
        attempts += 1
        try:
            requests.get(url, timeout=1)
        except requests.exceptions.ConnectionError:
            time.sleep(0.25)
            if attempts > 10:
                logger.warning("WaitForFileServerToStart: timeout")
                return
        else:
            return True
项目:CTF    作者:calee0219    | 项目源码 | 文件源码
def checkFactorDB(n):
    """Query factordb.com for ``n`` and count pairs among the ids it links.

    The result page is scanned for ``index.php?id=<digits>`` links.  With
    ``num`` equal to the number of ids found minus two, the function returns
    0 when ``num < 2`` and ``num * (num - 1) / 2`` otherwise.  The ids and
    ``num`` are printed.
    """
    # Factordb gives id's of numbers, which act as links for full number
    # follow the id's and get the actual numbers

    r = requests.get('http://www.factordb.com/index.php?query=%s' % str(n))
    # Raw string: same pattern, without invalid-escape warnings.
    regex = re.compile(r"index\.php\?id\=([0-9]+)", re.IGNORECASE)
    ids = regex.findall(r.text)
    # These give you ID's to the actual number
    num = len(ids) - 2
    print(ids)
    print(num)
    if num < 2:
        return 0
    # NOTE: "/" is true division on Python 3, so the result is a float there.
    return num * (num - 1) / 2


#print solve('1ELuX8Do1NDSMy4eV8H82dfFtTvKaqYyhg')
项目:safetyculture-sdk-python    作者:SafetyCulture    | 项目源码 | 文件源码
def get_audit_actions(self, date_modified, offset=0, page_length=100):
        """
        Search for actions modified from a given date and hand the first result page to the paging helper.

        A search request is POSTed to ``<api_url>actions/search``. When the response is OK and contains
        ``count``, ``offset``, ``total`` and ``actions``, collection is delegated to ``self.get_page_of_actions``.

        :param date_modified:   ISO formatted date/time string, sent as the ``modified_at.from`` filter.
        :param offset:          The index to start retrieving actions from
        :param page_length:     Page size, passed on to ``get_page_of_actions``
        :return:                Result of ``get_page_of_actions``, or None if the search failed or is incomplete
        """
        logger = logging.getLogger('sp_logger')
        actions_url = self.api_url + 'actions/search'
        search_body = json.dumps({
            "modified_at": {"from": str(date_modified)},
            "offset": offset,
            "status": [0, 10, 50, 60]
        })
        response = self.authenticated_request_post(actions_url, data=search_body)

        if response.status_code == requests.codes.ok:
            result = self.parse_json(response.content)
        else:
            result = None
        self.log_http_status(response.status_code, 'GET actions')

        if result is None:
            return None
        required = [result.get('count'), result.get('offset'), result.get('total'), result.get('actions')]
        if None in required:
            return None
        return self.get_page_of_actions(logger, date_modified, result, offset, page_length)
项目:earthview-scraper    作者:tom-james-watson    | 项目源码 | 文件源码
def recursive_scrape(url, count=0):
    """Yield the JSON document of every photo reachable from ``url``.

    Each fetched document is printed and yielded, and its ``nextApi`` value
    is appended to the site root to obtain the next URL. Every visited URL is
    recorded in the module-level ``seen_photos`` mapping.

    The chain is followed with a loop instead of nested generators (the name
    is kept for existing callers), so a long chain neither exhausts the
    interpreter's recursion limit nor pays for passing each item up through
    one generator frame per photo already visited.

    :param url: API URL of the first photo to fetch.
    :param count: Unused; accepted only for backward compatibility.
    :return: Generator of decoded JSON documents, in the order visited.
    """
    # The API seems to link the images in a loop, so we can stop once we see
    # an image we have already seen.
    while url not in seen_photos:
        seen_photos[url] = True

        photo_json = requests.get(url).json()

        # Single-argument call form prints the same on Python 2 and 3.
        print(photo_json)
        yield photo_json

        url = 'https://earthview.withgoogle.com' + photo_json['nextApi']
项目:facebook-message-analysis    作者:szheng17    | 项目源码 | 文件源码
def facebook_id_to_username(self, facebook_id):
        """
        Look up the display name for a Facebook ID, caching the answer.

        Args:
            facebook_id: A string representing a Facebook ID.

        Returns:
            The ``name`` field reported by the Graph API for facebook_id, or
            ``<facebook_id>@facebook.com`` when the response has no ``name``
            key. Results are memoised in ``self.id_to_username``.
        """
        # Already resolved earlier: answer from the cache without a request.
        if facebook_id in self.id_to_username:
            return self.id_to_username[facebook_id]

        lookup_url = (
            'https://graph.facebook.com/' + facebook_id
            + '?fields=name&access_token=' + self.ACCESS_TOKEN
        )
        payload = requests.get(lookup_url).json()
        try:
            resolved = payload['name']
        except KeyError:
            # No name in the response: fall back to a synthetic address.
            resolved = facebook_id + '@facebook.com'
        self.id_to_username[facebook_id] = resolved
        return self.id_to_username[facebook_id]
项目:graph    作者:noxern    | 项目源码 | 文件源码
def slack(text: hug.types.text):
    """Build the Slack response: an in-channel message with one image attachment.

    ``text`` is used as the title, except for the special value ``'top250'``,
    in which case the title is the text of a randomly chosen link scraped
    from the IMDB ``/chart/toptv`` page. The image URL points at the
    ``/graph`` endpoint with the URL-quoted title and a fresh UUID.
    """
    if text == 'top250':
        chart_response = requests.get(IMDB_URL + '/chart/toptv', headers={'Accept-Language': 'en'})
        chart_tree = html.fromstring(chart_response.text)
        show_links = chart_tree.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')
        title = random.choice(show_links).text
    else:
        title = text

    image_url = GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}'
    return {
        'response_type': 'in_channel',
        'attachments': [{'image_url': image_url}],
    }
项目:Python    作者:Guzi219    | 项目源码 | 文件源码
def saveFile(self, url, page, idx):
        """Download ``url`` and write its body to a file under ``self.store_dir``.

        The file name is ``<now_date()>_p_<page>_<idx zero-padded to 2>_<file_extension(url)>``.
        Failures are reported on stdout and swallowed; nothing is returned.

        :param url: Address of the file to fetch; normalised through
            ``self.CheckUrlValidate`` before the request is made.
        :param page: Page number, used only in the file name.
        :param idx: Index of the item on the page, used only in the file name.
        """
        # str(idx).zfill(2) pads to two digits and, unlike string.zfill,
        # exists on both Python 2 and Python 3.
        user_define_name = self.now_date() + '_p_' + str(page) + '_' + str(idx).zfill(2)
        file_ext = self.file_extension(url)
        save_file_name = user_define_name + "_" + file_ext

        url = self.CheckUrlValidate(url)
        try:
            pic = requests.get(url, timeout=30)
            # The context manager closes the handle even if write() raises.
            with open(self.store_dir + os.sep + save_file_name, 'wb') as f:
                f.write(pic.content)
            print('\ndone save file ' + save_file_name)
        except ReadTimeout:
            print('save file %s failed. cause by timeout(30)' % (save_file_name))
        except Exception as e:
            # NOTE(review): this branch catches every other error (I/O, DNS,
            # ...), not only missing HTTPS support; the message is kept as is.
            print('this python version does not support https.')
            print(e)

    #??url????http:??
项目:Python    作者:Guzi219    | 项目源码 | 文件源码
def GetTotalPage(self, html):
        """Read the highest page number from a listing page and persist it.

        Finds the ``<div class="page">`` element, takes the ``href`` of its
        last ``<a>`` (presumably the "last page" link -- confirm against the
        site markup), drops the final five characters (the ``.html`` suffix)
        and hands the rest to ``self.SaveTotalPageToFile``.

        :param html: Markup of the listing page.
        """
        # create the BeautifulSoup
        some_soup = BeautifulSoup(html)
        # get the page div
        ele_a = some_soup.find('div', attrs={'class': 'page'})
        # last link inside the pager
        last_a = ele_a.findAll('a')[-1]
        # substr 0:.html
        pagenum = last_a.get('href')[:-5]
        # %-formatting keeps the output identical on Python 2 and Python 3
        # (a two-item print statement is a SyntaxError on Python 3).
        print('pagenum : %s' % pagenum)

        self.SaveTotalPageToFile(pagenum)

    # store the max page number to totalpage.ini
    #new_page_num: new max page num
项目:PGO-mapscan-opt    作者:seikur0    | 项目源码 | 文件源码
def new_session(account):
    """Give ``account`` a clean HTTP session and reset its per-session state.

    If the account has no session yet, a new ``requests`` session is created
    with certificate verification on, the ``Niantic App`` user agent and, when
    ``account['proxy']`` is set, that proxy configuration. Otherwise the
    existing session is closed and its cookies are cleared. In both cases the
    session timestamp, a fresh 32-byte random session hash, the API URL and
    the (cleared) auth ticket are stored on the account.
    """
    if account.get('session') is None:
        fresh = requests.session()
        fresh.verify = True
        fresh.headers.update({'User-Agent': 'Niantic App'})
        if account['proxy'] is not None:
            fresh.proxies.update(account['proxy'])
        account['session'] = fresh
    else:
        existing = account['session']
        existing.close()
        existing.cookies.clear()

    account['session_time'] = get_time()
    account['session_hash'] = os.urandom(32)

    account['api_url'] = API_URL
    account['auth_ticket'] = None
项目:cbapi-examples    作者:cbcommunity    | 项目源码 | 文件源码
def get_watchlist_id_by_name(watchlistsdict):
    """
    For each watchlist name specified in the config file, find the
    associated watchlist ID.
    NOTE: We trigger on watchlist IDs, and not on watchlist names

    The server's watchlists are fetched from ``/api/v1/watchlist`` using the
    module-level ``cbserver`` and ``cbtoken``. Names are compared
    case-insensitively, and ``watchlistsdict`` is updated in place: the value
    of every matching key is replaced by the watchlist's ``id``. Keys without
    a match keep their current value.

    :param watchlistsdict: dict keyed by watchlist name.
    :return: None
    """
    headers = {'X-AUTH-TOKEN': cbtoken}

    # NOTE(review): verify=False disables TLS certificate validation. It is
    # left unchanged because deployments may rely on self-signed
    # certificates, but it exposes the auth token to interception.
    r = requests.get("https://%s/api/v1/watchlist" % (cbserver),
                     headers=headers,
                     verify=False)

    parsed_json = json.loads(r.text)

    # Snapshot the keys with their lower-cased form once: this avoids the
    # Python 2-only dict.iteritems(), avoids writing to the dict while
    # iterating over it, and hoists key.lower() out of the nested loop.
    wanted = [(key, key.lower()) for key in watchlistsdict]

    for watchlist in parsed_json:
        for key, lowered in wanted:
            if watchlist['name'].lower() == lowered:
                watchlistsdict[key] = watchlist['id']
项目:django-magic-cards    作者:pbaranay    | 项目源码 | 文件源码
def get_or_create(self, model, field, value, **kwargs):
        """
        Fetch the `model` object cached under `value`, creating it on a miss.

        On a miss (no entry, or a falsy one) the object is created through
        `model.objects.create` with `field=value` plus any extra `kwargs`,
        and is stored in the cache under `value`.

        Returns `(object, created)`; `created` is True only when the object
        had to be created by this call.
        """
        cached = self[model].get(value)
        if cached:
            return cached, False

        kwargs[field] = value
        fresh = model.objects.create(**kwargs)
        self[model][value] = fresh
        return fresh, True
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def tags(self):
        """Return the annotation's tags with the RRID tags filtered out.

        For anything that is not a reply, the fixed tags are returned in
        their original order minus those starting with ``RRID:``. For a
        reply the result is sorted, the ``INCOR_TAG`` is additionally dropped
        when ``self.rrid`` is truthy, and ``CORR_TAG`` is appended when
        ``self.corrected`` is truthy.
        """
        tags = self._fixed_tags
        if self._type != 'reply':
            # let self.rrid handle the rrid tags
            return [t for t in tags if not t.startswith('RRID:')]

        # NOTE replies don't ever get put in public directly
        kept = [
            tag for tag in tags
            if not tag.startswith('RRID:')  # the RRID itself is handled in def rrid(self)
            and not (tag == self.INCOR_TAG and self.rrid)
        ]
        if self.corrected:
            kept.append(self.CORR_TAG)
        return sorted(kept)
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def clean_dupes(get_annos, repr_issues=False):
    """Remove all but the most recently updated copy of each duplicated annotation.

    The list returned by ``get_annos()`` is edited in place: for every id
    that occurs more than once, the copies are ordered by ``updated`` and
    all but the last are removed. The function then calls ``embed()``
    (presumably an interactive shell for inspecting the result -- confirm);
    the locals ``preunduped`` and ``deduped`` are kept for that inspection.

    :param get_annos: Callable returning the list of annotations.
    :param repr_issues: When true, skip printing each duplicate's
        ``HypothesisHelper`` representation.
    """
    annos = get_annos()
    seen = set()
    # set.add() returns None, so the `or` arm records a first-seen id and is
    # falsy; only the second and later occurrences of an id are collected.
    dupes = [a.id for a in annos if a.id in seen or seen.add(a.id)]
    preunduped = [a for a in annos if a.id in dupes]
    for id_ in dupes:
        print('=====================')
        anns = sorted((a for a in annos if a.id == id_), key=lambda a: a.updated)
        if not repr_issues:
            for a in anns:
                print(a.updated, HypothesisHelper(a, annos))
        for a in anns[:-1]:  # all but latest
            annos.remove(a)
    deduped = [a for a in annos if a.id in dupes]
    # Guarded: with no duplicates the ratio check would divide by zero.
    if dupes:
        assert len(preunduped) // len(dupes) == 2, 'Somehow you have managed to get more than 1 duplicate!'
    # get_annos.memoize_annos(annos)
    embed()
项目:PyWallet    作者:AndreMiras    | 项目源码 | 文件源码
def get_balance(address):
        """
        Retrieves the balance from etherscan.io.

        The balance reported by the API (in wei) is converted to ETH and
        rounded to ``ROUND_DIGITS`` decimal places.

        :param address: Account address; normalised with ``PyWalib.address_hex``.
        :return: Balance in ETH as a float.
        """
        address = PyWalib.address_hex(address)
        url = 'https://api.etherscan.io/api'
        url += '?module=account&action=balance'
        url += '&address=%s' % address
        url += '&tag=latest'
        if ETHERSCAN_API_KEY:
            # Append the key to the query string. The previous bare
            # expression used an incomplete '%' format spec and discarded
            # the result, so the key never reached the URL.
            url += '&apikey=%s' % ETHERSCAN_API_KEY
        # TODO: handle 504 timeout, 403 and other errors from etherscan
        response = requests.get(url)
        response_json = response.json()
        PyWalib.handle_etherscan_error(response_json)
        balance_wei = int(response_json["result"])
        # 1 ETH == 10**18 wei
        balance_eth = balance_wei / float(pow(10, 18))
        balance_eth = round(balance_eth, ROUND_DIGITS)
        return balance_eth
项目:pubchem-ranker    作者:jacobwindsor    | 项目源码 | 文件源码
def get_cids(self, cas):
        """
        Use the PubChem API to get the CID
        :param cas: string - CAS identifier
        :return: list of CIDs, or None when the response carries no
                 ``IdentifierList``/``CID`` entry or the request failed with a
                 connection or timeout error (which is reported on stderr)
        """
        uri = "http://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/name/%s/cids/json" \
              "?email=%s"

        try:
            response = get(uri % (cas, app.config['ADMIN_EMAIL'])).json()
        except (exceptions.ConnectionError, TimeoutError, exceptions.Timeout,
                exceptions.ConnectTimeout, exceptions.ReadTimeout) as e:
            # Report the error and the CAS number it occurred on. write()
            # takes a single string, so the message is formatted with %
            # first; passing the tuple as a second argument raised TypeError.
            sys.stderr.write("Error: %s. Occurred on CAS: %s" % (e, cas))
            sys.stderr.flush()
            sys.stdout.flush()
            return None

        try:
            return response['IdentifierList']['CID']
        except KeyError:
            return None
项目:Rosi    作者:HaoBingo    | 项目源码 | 文件源码
def getRosiItem():
    """Collect item links from every page of the ``rosi`` category.

    Pages are requested in order starting at 1. For each page the ``href``
    of every ``<a class="inimg">`` is appended to the module-level
    ``RosiItems``. The first page answering 404 ends the crawl: a summary
    (elapsed time, page count, item count) is printed and the function
    returns.
    """
    started_at = time.time()
    page_no = 1
    while True:
        page_url = "http://www.mmxyz.net/category/rosi/page/{}/".format(page_no)
        reply = requests.get(page_url, timeout=10)
        if reply.status_code != 404:
            parsed = BeautifulSoup(reply.content, "html.parser")
            for anchor in parsed.find_all("a", class_="inimg"):
                RosiItems.append(anchor['href'])
            page_no += 1
            continue

        # Ran past the last page: print the summary and stop.
        print("+   Time: {:.2f} S         +".format(time.time()-started_at))
        print("+   Total Pages:     {}   +".format(page_no-1))
        print("+  Total Numbers:   {}  +".format(len(RosiItems)))
        print("+-------------------------+\r\n\r\n")
        return
项目:Rosi    作者:HaoBingo    | 项目源码 | 文件源码
def getRosiItem():
    """Collect item links from every page of the ``disi`` category.

    Pages are requested in order starting at 1. For each page the ``href``
    of every ``<a class="inimg">`` is appended to the module-level
    ``RosiItems``. The first page answering 404 ends the crawl: a summary
    (elapsed time, page count, item count) is printed and the function
    returns.
    """
    started_at = time.time()
    page_no = 1
    while True:
        page_url = "http://www.mmxyz.net/category/disi/page/{}/".format(page_no)
        reply = requests.get(page_url, timeout=10)
        if reply.status_code != 404:
            parsed = BeautifulSoup(reply.content, "html.parser")
            for anchor in parsed.find_all("a", class_="inimg"):
                RosiItems.append(anchor['href'])
            page_no += 1
            continue

        # Ran past the last page: print the summary and stop.
        print("+   Time: {:.2f} S         +".format(time.time()-started_at))
        print("+   Total Pages:     {}   +".format(page_no-1))
        print("+  Total Numbers:   {}  +".format(len(RosiItems)))
        print("+-------------------------+\r\n\r\n")
        return