Python requests.compat 模块,urljoin() 实例源码

我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用requests.compat.urljoin()

项目:BJ_Sickrage    作者:gabrielbdsantos    | 项目源码 | 文件源码
def login(self):
        cookie_dict = requests.utils.dict_from_cookiejar(self.session.cookies)
        if cookie_dict.get('session'):
            return True

        login_params = {
            'submit': 'Login',
            'username': self.username,
            'password': self.password,
            'keeplogged': 0,
        }

        if not self.get_url(self.urls['login'], post_data=login_params, returns='text'):
            logger.log(u"Unable to connect to provider", logger.WARNING)
            return False

        response = self.get_url(urljoin(self.urls['base_url'],'index.php'), returns='text')

        if re.search('<title>Login :: BJ-Share</title>', response):
            logger.log(u"Invalid username or password. Check your settings", logger.WARNING)
            return False

        return True
项目:python-twitch-client    作者:tsifrer    | 项目源码 | 文件源码
def _request_get(self, path, params=None):
        url = urljoin(BASE_URL, path)

        headers = self._get_request_headers()

        response = requests.get(url, params=params, headers=headers)
        if response.status_code >= 500:

            backoff = self._initial_backoff
            for _ in range(self._max_retries):
                time.sleep(backoff)
                backoff_response = requests.get(url, params=params, headers=headers)
                if backoff_response.status_code < 500:
                    response = backoff_response
                    break
                backoff *= 2

        response.raise_for_status()
        return response.json()
项目:mail.ru-uploader    作者:instefa    | 项目源码 | 文件源码
def cloud_auth(session, login=LOGIN, password=PASSWORD):
    try:
        r = session.post('https://auth.mail.ru/cgi-bin/auth?lang=ru_RU&from=authpopup',
                         data = {'Login': login, 'Password': password, 'page': urljoin(CLOUD_URL, '?from=promo'),
                                 'new_auth_form': 1, 'Domain': get_email_domain(login)}, verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Cloud auth HTTP request error: {}'.format(e))
        return None

    if r.status_code == requests.codes.ok:
        if LOGIN_CHECK_STRING in r.text:
            return True
        elif LOGGER:
            LOGGER.error('Cloud authorization request error. Check your credentials settings in {}. \
Do not forget to accept cloud LA by entering it in browser. \
HTTP code: {}, msg: {}'.format(CONFIG_FILE, r.status_code, r.text))
    elif LOGGER:
        LOGGER.error('Cloud authorization request error. Check your connection. \
HTTP code: {}, msg: {}'.format(r.status_code, r.text))
    return None
项目:mail.ru-uploader    作者:instefa    | 项目源码 | 文件源码
def get_csrf(session):
    try:
        r = session.get(urljoin(CLOUD_URL, 'tokens/csrf'), verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get csrf HTTP request error: {}'.format(e))
        return None

    if r.status_code == requests.codes.ok:
        r_json = r.json()
        token = r_json['body']['token']
        assert len(token) == 32, 'invalid CSRF token <{}> lentgh'.format(token)
        return token
    elif LOGGER:
        LOGGER.error('CSRF token request error. Check your connection and credentials settings in {}. \
HTTP code: {}, msg: {}'.format(CONFIG_FILE, r.status_code, r.text))
    return None
项目:mail.ru-uploader    作者:instefa    | 项目源码 | 文件源码
def get_upload_domain(session, csrf=''):
    """ return current cloud's upload domain url
    it seems that csrf isn't necessary in session,
    but forcing assert anyway to avoid possible future damage
    """
    assert csrf is not None, 'no CSRF' 
    url = urljoin(CLOUD_URL, 'dispatcher?token=' + csrf)

    try:
        r = session.get(url, verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get upload domain HTTP request error: {}'.format(e))
        return None

    if r.status_code == requests.codes.ok:
        r_json = r.json()
        return r_json['body']['upload'][0]['url']
    elif LOGGER:
        LOGGER.error('Upload domain request error. Check your connection. \
HTTP code: {}, msg: {}'.format(r.status_code, r.text))
    return None
项目:upnpclient    作者:flyte    | 项目源码 | 文件源码
def _read_actions(self):
        action_url = urljoin(self._url_base, self._control_url)

        for action_node in self._findall('actionList/action'):
            name = action_node.findtext('name', namespaces=action_node.nsmap)
            argsdef_in = []
            argsdef_out = []
            for arg_node in action_node.findall(
                    'argumentList/argument', namespaces=action_node.nsmap):
                findtext = partial(arg_node.findtext, namespaces=arg_node.nsmap)
                arg_name = findtext('name')
                arg_statevar = self.statevars[findtext('relatedStateVariable')]
                if findtext('direction').lower() == 'in':
                    argsdef_in.append((arg_name, arg_statevar))
                else:
                    argsdef_out.append((arg_name, arg_statevar))
            action = Action(action_url, self.service_type, name, argsdef_in, argsdef_out)
            self.action_map[name] = action
            self.actions.append(action)
项目:brobeat-OLD    作者:blacktop    | 项目源码 | 文件源码
def get_log_types():
    url = "https://www.bro.org/sphinx/script-reference/"
    resp = requests.get(url=url + "log-files.html")
    soup = BeautifulSoup(resp.content, "html.parser")
    bro_logs = dict(logs=[])
    for table in soup.find_all("table", {"class": "docutils"}):
        for row in table.find('tbody').find_all('tr'):
            log = {}
            cols = row.find_all('td')
            cols = [ele.text.strip() for ele in cols]
            tds = [ele for ele in cols if ele]
            log['file'] = tds[0]
            log['log_type'] = os.path.splitext(log['file'])[0]
            log['description'] = tds[1]
            log['fields'] = []
            link = row.find('a', href=True)
            # do not add a URL for notice_alarm.log
            if link is not None and 'notice_alarm' not in log['log_type']:
                log['url'] = urljoin(url, link['href'])
                logger.info('adding log type: {}'.format(log['log_type']))
            bro_logs['logs'].append(log)
    return bro_logs
项目:ProjectQ    作者:ProjectQ-Framework    | 项目源码 | 文件源码
def _authenticate(email=None, password=None):
    """
    :param email:
    :param password:
    :return:
    """
    if email is None:
        try:
            input_fun = raw_input
        except NameError:
            input_fun = input
        email = input_fun('IBM QE user (e-mail) > ')
    if password is None:
        password = getpass.getpass(prompt='IBM QE password > ')

    r = requests.post(urljoin(_api_url, 'users/login'),
                      data={"email": email, "password": password})
    r.raise_for_status()

    json_data = r.json()
    user_id = json_data['userId']
    access_token = json_data['id']

    return user_id, access_token
项目:ProjectQ    作者:ProjectQ-Framework    | 项目源码 | 文件源码
def test_send_real_device_offline(monkeypatch):
    def mocked_requests_get(*args, **kwargs):
        class MockResponse:
            def __init__(self, json_data, status_code):
                self.json_data = json_data
                self.status_code = status_code

            def json(self):
                return self.json_data

        # Accessing status of device. Return online.
        status_url = 'Backends/ibmqx2/queue/status'
        if args[0] == urljoin(_api_url_status, status_url):
            return MockResponse({"state": False}, 200)
    monkeypatch.setattr("requests.get", mocked_requests_get)
    shots = 1
    json_qasm = "my_json_qasm"
    name = 'projectq_test'
    with pytest.raises(_ibm_http_client.DeviceOfflineError):
        _ibm_http_client.send(json_qasm,
                              device="ibmqx2",
                              user=None, password=None,
                              shots=shots, verbose=True)
项目:apimas    作者:grnet    | 项目源码 | 文件源码
def construct_collection(self, instance, spec, loc, context):
        """
        Constructor for `.collection` predicate.

        This constructor aims to aggregate the cerberus validation schemas
        for every single field defined by the collection.
        """
        instance = super(self.__class__, self).construct_collection(
            instance, spec, loc, context)
        self.init_adapter_conf(instance)
        schema = {field_name: schema.get(self.ADAPTER_CONF, {})
                  for field_name, schema in doc.doc_get(
                      instance, ('*',)).iteritems()}
        collection = context.get('parent_name')
        endpoint = urljoin(
            self.root_url, TRAILING_SLASH.join([loc[0], collection]))
        endpoint += TRAILING_SLASH
        instance[self.ADAPTER_CONF] = schema
        client = ApimasClient(endpoint, schema)
        self.clients[loc[0] + '/' + collection] = client
        return instance
项目:python-twitch-client    作者:tsifrer    | 项目源码 | 文件源码
def _request_post(self, path, data=None, params=None):
        url = urljoin(BASE_URL, path)

        headers = self._get_request_headers()

        response = requests.post(url, json=data, params=params, headers=headers)
        response.raise_for_status()
        if response.status_code == 200:
            return response.json()
项目:python-twitch-client    作者:tsifrer    | 项目源码 | 文件源码
def _request_put(self, path, data=None, params=None):
        url = urljoin(BASE_URL, path)

        headers = self._get_request_headers()

        response = requests.put(url, json=data, params=params, headers=headers)
        response.raise_for_status()
        if response.status_code == 200:
            return response.json()
项目:python-twitch-client    作者:tsifrer    | 项目源码 | 文件源码
def _request_delete(self, path, params=None):
        url = urljoin(BASE_URL, path)

        headers = self._get_request_headers()

        response = requests.delete(url, params=params, headers=headers)
        response.raise_for_status()
        if response.status_code == 200:
            return response.json()
项目:GitHub-Repo-Recommender    作者:frankyjuang    | 项目源码 | 文件源码
def req(url, hdr):
    try:
        res = requests.get(urljoin(BASE_URL, url), headers=hdr, timeout=10.0)
    except requests.Timeout:
        raise RequestTimeoutError(url)
    except requests.ConnectionError:
        raise RequestTimeoutError(url)
    if res.status_code != 200:
        raise StatusCodeError(url, res.status_code)
    return res
项目:mail.ru-uploader    作者:instefa    | 项目源码 | 文件源码
def get_cloud_space(session, csrf='', login=LOGIN):
    """ returns available free space in bytes """
    assert csrf is not None, 'no CSRF'

    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple())* 1000))
    quoted_login = quote_plus(login)
    command = ('user/space?api=' + str(API_VER) + '&email=' + quoted_login +
               '&x-email=' + quoted_login + '&token=' + csrf + '&_=' + timestamp)
    url = urljoin(CLOUD_URL, command)

    try:
        r = session.get(url, verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Get cloud space HTTP request error: {}'.format(e))
        return 0

    if r.status_code == requests.codes.ok:
        r_json = r.json()
        total_bytes = r_json['body']['total'] * 1024 * 1024
        used_bytes = r_json['body']['used'] * 1024 * 1024
        return total_bytes - used_bytes
    elif LOGGER:
        LOGGER.error('Cloud free space request error. Check your connection. \
HTTP code: {}, msg: {}'.format(r.status_code, r.text))
    return 0
项目:mail.ru-uploader    作者:instefa    | 项目源码 | 文件源码
def post_file(session, domain='', file='', login=LOGIN):
    """ posts file to the cloud's upload server
    param: file - string filename with path
    """
    assert domain is not None, 'no domain'
    assert file is not None, 'no file'

    filetype = guess_type(file)[0]
    if not filetype:
        filetype = DEFAULT_FILETYPE
        if LOGGER:
            LOGGER.warning('File {} type is unknown, using default: {}'.format(file, DEFAULT_FILETYPE))

    filename = os.path.basename(file)
    quoted_login = quote_plus(login)
    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()))) + TIME_AMEND
    url = urljoin(domain, '?cloud_domain=' + str(CLOUD_DOMAIN_ORD) + '&x-email=' + quoted_login + '&fileapi' + timestamp)
    m = MultipartEncoder(fields={'file': (quote_plus(filename), open(file, 'rb'), filetype)})

    try:
        r = session.post(url, data=m, headers={'Content-Type': m.content_type}, verify = VERIFY_SSL)
    except Exception as e:
        if LOGGER:
            LOGGER.error('Post file HTTP request error: {}'.format(e))
        return (None, None)

    if r.status_code == requests.codes.ok:
        if len(r.content):
            hash = r.content[:40].decode()
            size = int(r.content[41:-2])
            return (hash, size)
        elif LOGGER:
            LOGGER.error('File {} post error, no hash and size received'.format(file))
    elif LOGGER:
        LOGGER.error('File {} post error, http code: {}, msg: {}'.format(file, r.status_code, r.text))
    return (None, None)
项目:ruffruffs    作者:di    | 项目源码 | 文件源码
def prepare_url(value):
    # Issue #1483: Make sure the URL always has a trailing slash
    httpbin_url = value.url.rstrip('/') + '/'

    def inner(*suffix):
        return urljoin(httpbin_url, '/'.join(suffix))

    return inner
项目:valence    作者:openstack    | 项目源码 | 文件源码
def test_get_rfs_url(self):
        CONF.podm.url = "https://127.0.0.1:8443"
        expected = urljoin(CONF.podm.url, "redfish/v1/Systems/1")

        # test without service_ext
        result = redfish.get_rfs_url("/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("/Systems/1")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("Systems/1")
        self.assertEqual(expected, result)

        # test with service_ext
        result = redfish.get_rfs_url("/redfish/v1/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("/redfish/v1/Systems/1")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("redfish/v1/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("redfish/v1/Systems/1")
        self.assertEqual(expected, result)
项目:valence    作者:openstack    | 项目源码 | 文件源码
def test_get_rfs_url_with_tailing_slash(self):
        CONF.podm.url = "https://127.0.0.1:8443/"
        expected = urljoin(CONF.podm.url, "redfish/v1/Systems/1")

        # test without service_ext
        result = redfish.get_rfs_url("/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("/Systems/1")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("Systems/1")
        self.assertEqual(expected, result)

        # test with service_ext
        result = redfish.get_rfs_url("/redfish/v1/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("/redfish/v1/Systems/1")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("redfish/v1/Systems/1/")
        self.assertEqual(expected, result)

        result = redfish.get_rfs_url("redfish/v1/Systems/1")
        self.assertEqual(expected, result)
项目:upnpclient    作者:flyte    | 项目源码 | 文件源码
def __init__(self, url_base, service_type, service_id, control_url, scpd_url, event_sub_url):
        self._url_base = url_base
        self.service_type = service_type
        self.service_id = service_id
        self._control_url = control_url
        self.scpd_url = scpd_url
        self._event_sub_url = event_sub_url

        self.actions = []
        self.action_map = {}
        self.statevars = {}
        self._log = _getLogger('Service')

        self._log.debug('%s url_base: %s', self.service_id, self._url_base)
        self._log.debug('%s SCPDURL: %s', self.service_id, self.scpd_url)
        self._log.debug('%s controlURL: %s', self.service_id, self._control_url)
        self._log.debug('%s eventSubURL: %s', self.service_id, self._event_sub_url)

        url = urljoin(self._url_base, self.scpd_url)
        self._log.info('Reading %s', url)
        resp = requests.get(url, timeout=HTTP_TIMEOUT)
        resp.raise_for_status()
        self.scpd_xml = etree.fromstring(resp.content)
        self._find = partial(self.scpd_xml.find, namespaces=self.scpd_xml.nsmap)
        self._findtext = partial(self.scpd_xml.findtext, namespaces=self.scpd_xml.nsmap)
        self._findall = partial(self.scpd_xml.findall, namespaces=self.scpd_xml.nsmap)

        self._read_state_vars()
        self._read_actions()
项目:pymarketcap    作者:mondeja    | 项目源码 | 文件源码
def stats(self):
        """ Get global cryptocurrencies statistics.

        Returns:
            dict: Global markets statistics
        """
        url = urljoin(self.urls["api"], 'global/')
        response = get(url).json(parse_int=self.parse_int,
                                 parse_float=self.parse_float)
        return response


    #######    WEB PARSER METHODS    #######
项目:pymarketcap    作者:mondeja    | 项目源码 | 文件源码
def _get_ranks(self, query, temp):
        """Internal function for get gainers and losers

        Args:
            query: Query to obtain ranks, gainers or losers
            temp: Temporal period obtaining gainers or losers,
                1h, 24h or 7d
        """
        url = urljoin(self.urls["web"], 'gainers-losers/')
        html = self._html(url)

        call = str(query) + '-' + str(temp)

        response = []
        html_rank = html.find('div', {'id': call}).find_all('tr')

        for curr in html_rank[1:]:
            _childs, childs = (curr.contents, [])
            for c in _childs:
                if c != '\n':
                    childs.append(c)
            for n, g in enumerate(childs):
                if n == 1:
                    name = str(g.a.getText())
                elif n == 2:
                    symbol = str(g.string)
                elif n == 3:
                    _volume_24h = sub(r'\$|,', '', g.a.getText())
                    volume_24h = self.parse_int(_volume_24h)
                elif n == 4:
                    _price = sub(r'\$|,', '', g.a.getText())
                    price = self.parse_float(_price)
                elif n == 5:
                    percent = self.parse_float(sub(r'%', '', g.string))
            currency = {'symbol': symbol, 'name': name,
                        '24h_volume_usd': volume_24h,
                        'price_usd': price, 'percent_change': percent}
            response.append(currency)
        return response
项目:pymarketcap    作者:mondeja    | 项目源码 | 文件源码
def global_cap(self, bitcoin=True, start=None, end=None):
        """Get global market capitalization graphs, including
        or excluding Bitcoin

        Args:
            bitcoin (bool, optional): Indicates if Bitcoin will
                be includedin global market capitalization graph.
                As default True
            start (optional, datetime): Time to start retrieving
                graphs data. If not provided get As default None
            end (optional, datetime): Time to end retrieving
                graphs data.

        Returns (dict):
            List of lists with timestamp and values
        """
        base_url = self.urls["graphs_api"]
        if bitcoin:
            endpoint = "global/marketcap-total/"
        else:
            endpoint = "global/marketcap-altcoin/"
        url = urljoin(base_url, endpoint)

        if start and end:
            url += self._add_start_end(url, start, end)

        return get(url).json()
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def prepare_url(value):
    # Issue #1483: Make sure the URL always has a trailing slash
    httpbin_url = value.url.rstrip('/') + '/'

    def inner(*suffix):
        return urljoin(httpbin_url, '/'.join(suffix))

    return inner
项目:filegardener    作者:smorin    | 项目源码 | 文件源码
def prepare_url(value):
    # Issue #1483: Make sure the URL always has a trailing slash
    httpbin_url = value.url.rstrip('/') + '/'

    def inner(*suffix):
        return urljoin(httpbin_url, '/'.join(suffix))

    return inner
项目:brobeat-OLD    作者:blacktop    | 项目源码 | 文件源码
def build_url(current_url, next_url):
    if is_url(next_url):
        return next_url
    else:
        return urljoin(current_url, next_url)
项目:ProjectQ    作者:ProjectQ-Framework    | 项目源码 | 文件源码
def _run(qasm, device, user_id, access_token, shots):
    suffix = 'codes/execute'

    r = requests.post(urljoin(_api_url, suffix),
                      data=qasm,
                      params={"access_token": access_token,
                              "deviceRunType": device,
                              "fromCache": "false",
                              "shots": shots},
                      headers={"Content-Type": "application/json"})
    r.raise_for_status()

    r_json = r.json()
    execution_id = r_json["id"]
    return execution_id
项目:ProjectQ    作者:ProjectQ-Framework    | 项目源码 | 文件源码
def _get_result(execution_id, access_token, num_retries=300, interval=1):
    suffix = 'Executions/{execution_id}'.format(execution_id=execution_id)

    for _ in range(num_retries):
        r = requests.get(urljoin(_api_url, suffix),
                         params={"access_token": access_token})
        r.raise_for_status()

        r_json = r.json()
        status = r_json["status"]["id"]
        if status == "DONE":
            return r_json["result"]
        time.sleep(interval)
项目:apimas    作者:grnet    | 项目源码 | 文件源码
def __call__(self, value):
        if value is None:
            return value
        return urljoin(self.ref_endpoint, value).rstrip('/') + '/'
项目:apimas    作者:grnet    | 项目源码 | 文件源码
def format_endpoint(self, resource_id):
        """
        This method concatenates the resource's endpoint with a specified
        identifier.

        Example: endpoint/<pk>/
        """
        if isinstance(resource_id, unicode):
            resource_id = resource_id.encode("utf-8")
        return urljoin(self.endpoint, quote(
            str(resource_id))) + TRAILING_SLASH
项目:pymarketcap    作者:mondeja    | 项目源码 | 文件源码
def markets(self, currency):
        """Get available coinmarketcap markets data.
        It needs a currency as argument.

        Args:
            currency (str): Currency to get market data

        Returns:
            list: markets on wich provided currency is currently tradeable
        """
        if self.is_symbol(currency):
            currency = self.correspondences[currency]

        url = urljoin(self.urls["web"], "currencies/%s/" % currency)
        html = self._html(url)

        response = []
        marks = html.find('tbody').find_all('tr')

        for m in marks:
            _volume_24h = m.find('span', {'class': 'volume'}).getText()
            volume_24h = self.parse_int(sub(r'\D', '', _volume_24h))
            _price = m.find('span', {'class': 'price'}).getText()
            _price = sub(r'\$| |\*|,', '', _price)
            price = self.parse_float(_price)

            _childs, childs = (m.contents, [])
            for c in _childs:
                if c != '\n':
                    childs.append(c)
            for n, c in enumerate(childs):
                nav = c.string
                if n == 1:
                    exchange = str(nav)
                elif n == 2:
                    pair = str(c.getText()).replace('/', self.pair_separator)
                    if pair[-1] == '*':
                        pair = pair.replace(' *', '')
                elif n == 5:
                    percent_volume = self.parse_float(nav.replace('%', ''))
                elif n == 6:
                    updated = nav == "Recently"
            market = {'exchange': exchange, 'pair': pair,
                      '24h_volume_usd': volume_24h,
                      'price_usd': price,
                      'percent_volume': percent_volume,
                      "updated": updated}
            response.append(market)

        return response
项目:pymarketcap    作者:mondeja    | 项目源码 | 文件源码
def exchange(self, name):
        """Obtain data from a exchange passed as argument

        Example:
            exchange('poloniex')

        Args:
            name (str): Exchange to retrieve data

        Returns:
            list: Data from all markets in a exchange
        """
        url = urljoin(self.urls["web"], 'exchanges/%s/' % name)
        html = self._html(url)

        marks = html.find('table').find_all('tr')
        response = []
        for m in marks[1:]:
            _childs, childs = (m.contents, [])
            for c in _childs:
                if c != '\n':
                    childs.append(c)
            for n, c in enumerate(childs):
                if n == 0:
                    rank = self.parse_int(c.getText())
                elif n == 1:
                    name = str(c.getText())
                elif n == 2:
                    market = str(c.getText().replace(
                        '/', self.pair_separator
                    ))
                elif n == 3:
                    _volume_24h_usd = sub(r'\$|,', '', c.getText())
                    volume_24h_usd = self.parse_int(_volume_24h_usd)
                elif n == 4:
                    _price_usd = sub(r'\$| |\*', '', c.getText())
                    price_usd = self.parse_float(_price_usd)
                elif n == 5:
                    _perc_volume = c.getText().replace('%', '')
                    perc_volume = self.parse_float(_perc_volume)
            indicators = {'rank': rank,
                          'name': name,
                          'market': market,
                          'volume_24h_usd': volume_24h_usd,
                          'price_usd': price_usd,
                          'perc_volume': perc_volume}
            response.append(indicators)

        return response
项目:kite    作者:pyrocko    | 项目源码 | 文件源码
def get_test_data(fn):

    def _dir_content(url):
        r = requests.get(url)
        r.raise_for_status()

        entries = re.findall(r'href="([a-zA-Z0-9_.-]+/?)"', r.text)

        files = sorted(set(fn for fn in entries
                           if not fn.endswith('/')))
        return files

    def _file_size(url):
        r = requests.head(url, headers={'Accept-Encoding': 'identity'})
        r.raise_for_status()
        return int(r.headers['content-length'])

    def _download_file(url, fn_local):
        if op.exists(fn_local):
            if os.stat(fn_local).st_size == _file_size(url):
                logger.info('Using cached file %s' % fn_local)
                return fn_local

        logger.info('Downloading %s...' % url)
        fsize = _file_size(url)

        r = requests.get(url, stream=True)
        r.raise_for_status()

        dl_bytes = 0
        with open(fn_local, 'wb') as f:
            for d in r.iter_content(chunk_size=1024):
                dl_bytes += len(d)
                f.write(d)

        if dl_bytes != fsize:
            raise DownloadError('Download incomplete!')
        logger.info('Download completed.')
        return fn_local

    url = urljoin(data_uri, fn)

    dl_dir = data_dir
    _makedir(dl_dir)

    if fn.endswith('/'):
        dl_dir = op.join(data_dir, fn)
        _makedir(dl_dir)
        dl_files = _dir_content(url)

        dl_files = zip([urljoin(url, u) for u in dl_files],
                       [op.join(dl_dir, f) for f in dl_files])
    else:
        dl_files = (url, op.join(data_dir, fn))
        return _download_file(*dl_files)

    return [_download_file(*f) for f in dl_files]
项目:ProjectQ    作者:ProjectQ-Framework    | 项目源码 | 文件源码
def send(qasm, device='sim_trivial_2', user=None, password=None,
         shots=1, verbose=False):
    """
    Sends QASM through the IBM API and runs the quantum circuit.

    Args:
        qasm: QASM representation of the circuit to run.
        device (str): 'sim_trivial_2' or 'real' to run on simulator or on the
            real chip, respectively.
        user (str): IBM quantum experience user.
        password (str): IBM quantum experience user password.
        shots (int): Number of runs of the same circuit to collect statistics.
        verbose (bool): If True, additional information is printed, such as
            measurement statistics. Otherwise, the backend simply registers
            one measurement result (same behavior as the projectq Simulator).
    """
    try:
        # check if the device is online
        if device in ['ibmqx2', 'ibmqx4']:
            url = 'Backends/{}/queue/status'.format(device)
            r = requests.get(urljoin(_api_url_status, url))
            online = r.json()['state']

            if not online:
                print("The device is offline (for maintenance?). Use the "
                      "simulator instead or try again later.")
                raise DeviceOfflineError("Device is offline.")
            if device == 'ibmqx2':
                device = 'real'

        if verbose:
            print("Authenticating...")
        user_id, access_token = _authenticate(user, password)
        if verbose:
            print("Running code...")
        execution_id = _run(qasm, device, user_id, access_token, shots)
        if verbose:
            print("Waiting for results...")
        res = _get_result(execution_id, access_token)
        if verbose:
            print("Done.")
        return res
    except requests.exceptions.HTTPError as err:
        print("There was an error running your code:")
        print(err)
    except requests.exceptions.RequestException as err:
        print("Looks like something is wrong with server:")
        print(err)
    except KeyError as err:
        print("Failed to parse response:")
        print(err)