Python requests 模块,request() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.request()

项目:telegram-autoposter    作者:vaniakosmos    | 项目源码 | 文件源码
def fetch(self):
        url = "https://www.reddit.com/top/.json"
        headers = {
            "User-Agent": f"ChangeMeClient/0.1 by YOUR_USERNAME_ON_REDDIT"
        }
        response = requests.request(method='GET', url=url, headers=headers)
        json = response.json()

        self.logger.debug(f'Fetched. Code: {response.status_code}')
        data = []
        if response.status_code == 200:
            data = json['data']['children']
        return data


# get necessary data
项目:cisco-meraki-fw-ap-rules-api    作者:robertcsapo    | 项目源码 | 文件源码
def meraki_requests(url,meraki_api_key,error_handle):
    url = "https://dashboard.meraki.com/api/v0%s" % url
    if meraki_debug == "1":
        print("GET: %s" % url)
    querystring = {}
    headers = {
        'x-cisco-meraki-api-key': meraki_api_key,
        'content-type': "application/json",
        'cache-control': "no-cache",
        }
    response = requests.request("GET", url, headers=headers, params=querystring)
    if response.status_code == 200:
        json_data = json.loads(response.text)
        return json_data
    else:
        if meraki_debug == "1":
            print(response.text)
        if error_handle == "enable":
            sys.exit("Failed: Code %s" % response.status_code)

# Meraki REST API Call - PUT
项目:safetyculture-sdk-python    作者:SafetyCulture    | 项目源码 | 文件源码
def get_user_api_token(logger):
    """
    Generate iAuditor API Token
    :param logger:  the logger
    :return:        API Token if authenticated else None
    """
    username = input("iAuditor username: ")
    password = getpass()
    generate_token_url = "https://api.safetyculture.io/auth"
    payload = "username=" + username + "&password=" + password + "&grant_type=password"
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'cache-control': "no-cache",
    }
    response = requests.request("POST", generate_token_url, data=payload, headers=headers)
    if response.status_code == requests.codes.ok:
        return response.json()['access_token']
    else:
        logger.error('An error occurred calling ' + generate_token_url + ': ' + str(response.json()))
        return None
项目:goodreads-api-client-python    作者:mdzhang    | 项目源码 | 文件源码
def _req(self, method: str='GET', endpoint: str=None, params: dict=None,
             data: dict=None, uses_oauth: bool=False):
        if params is None:
            params = {}

        fetch = self.session.request if uses_oauth else requests.request

        res = fetch(
            method=method,
            url='{}/{}'.format(self.base_url, endpoint),
            params={
                'format': 'xml',
                'key': self._developer_key,
                **params
            },
            data=data
        )

        res.raise_for_status()
        return res
项目:telegram-autoposter    作者:vaniakosmos    | 项目源码 | 文件源码
def fetch(self):
        section = 'hot'  # hot | top | user
        sort = 'viral'  # viral | top | time | rising (only available with user section)
        show_viral = 'true'
        show_mature = 'true'
        album_previews = 'false'

        url = f'https://api.imgur.com/3/gallery/{section}/{sort}'
        querystring = {"showViral": f"{show_viral}", "mature": f"{show_mature}", "album_previews": f"{album_previews}"}
        headers = {'authorization': f'Client-ID {client_id}'}
        response = requests.request("GET", url, headers=headers, params=querystring)
        json = response.json()

        self.logger.debug(f'Fetched. Code: {response.status_code}')

        return json['data'][:FETCH_LIMIT]
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def create_logset(logset_name=None, params=None):
    """
    Add a new logset to the current account.
    If a filename is given, load the contents of the file
    as json parameters for the request.
    If a name is given, create a new logset with the given name
    """
    if params is not None:
        request_params = params
    else:
        request_params = {
            'logset': {
                'name': logset_name
            }
        }

    headers = api_utils.generate_headers('rw')

    try:
        response = requests.post(_url()[1], json=request_params, headers=headers)
        handle_response(response, 'Creating logset failed.\n', 201)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def add_new_user(first_name, last_name, email):
    """
    Add a new user to the current account.
    """
    action, url = _url(('users',))
    json_content = {
        "user":
            {
                "email": str(email),
                "first_name": str(first_name),
                "last_name": str(last_name)
            }
    }
    body = json.dumps(json_content)
    headers = api_utils.generate_headers('owner', method='POST', action=action, body=body)

    try:
        response = requests.request('POST', url, json=json_content, headers=headers)
        handle_create_user_response(response)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def delete_user(user_key):
    """
    Delete a user from the current account.
    """
    action, url = _url(('users', user_key))
    headers = api_utils.generate_headers('owner', method='DELETE', action=action, body='')

    try:
        response = requests.request('DELETE', url, data='', headers=headers)
        if response_utils.response_error(response) is True:  # Check response has no errors
            sys.stderr.write('Delete user failed, status code: %s' % response.status_code)
            sys.exit(1)
        elif response.status_code == 204:
            print 'Deleted user'
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def read(self, buf_len):
        """
        Implementation note: due to a constraint of the requests library, the
        buf_len that is used the first time this method is called will cause
        all future requests to ``read`` to have the same ``buf_len`` even if
        a different ``buf_len`` is passed in on subsequent requests.
        """
        if self._iter is None:  # lazy load response body iterator
            method = self.input_spec.get('method', 'GET').upper()
            headers = self.input_spec.get('headers', {})
            params = self.input_spec.get('params', {})
            req = requests.request(
                method, self.input_spec['url'], headers=headers, params=params,
                stream=True, allow_redirects=True)
            req.raise_for_status()  # we have the response headers already
            self._iter = req.iter_content(buf_len, decode_unicode=False)

        try:
            return six.next(self._iter)
        except StopIteration:
            return b''
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def push(data, spec, **kwargs):
    task_output = kwargs.get('task_output', {})
    target = task_output.get('target', 'memory')

    url = spec['url']
    method = spec.get('method', 'POST').upper()

    if target == 'filepath':
        with open(data, 'rb') as fd:
            request = requests.request(
                method, url, headers=spec.get('headers', {}), data=fd,
                params=spec.get('params', {}), allow_redirects=True)
    elif target == 'memory':
        request = requests.request(
            method, url, headers=spec.get('headers', {}), data=data,
            params=spec.get('params', {}), allow_redirects=True)
    else:
        raise Exception('Invalid HTTP fetch target: ' + target)

    try:
        request.raise_for_status()
    except Exception:
        print 'HTTP push failed (%s). Response: %s' % (url, request.text)
        raise
项目:otest    作者:rohe    | 项目源码 | 文件源码
def _func(self, conv):
        try:
            request = registration_request(conv)
        except NoSuchEvent:
            self._status = ERROR
        else:
            # 'offline_access only allow if response_type == 'code'
            try:
                req_scope = request['scope']
            except KeyError:
                pass
            else:
                if 'offline_access' in req_scope:
                    if request['response_type'] != 'code':
                        self._status = ERROR
                        self._message = 'Offline access not allowed for ' \
                                        'anything but code flow'

        return {}
项目:otest    作者:rohe    | 项目源码 | 文件源码
def _func(self, conv):
        try:
            request = registration_request(conv)
        except NoSuchEvent:
            self._status = ERROR
        else:
            try:
                _ss = request['software_statement']
            except KeyError:
                pass
            else:
                missing = []
                for claim in ['redirect_uris', 'grant_types', 'client_name',
                              'client_uri']:
                    if claim not in _ss:
                        missing.append(claim)
                if 'jwks_uri' not in _ss and 'jwks' not in _ss:
                    missing.append('jwks_uri/jwks')

                if missing:
                    self._status = WARNING
                    self._message = 'Missing "{}" claims from Software ' \
                                    'Statement'.format(missing)

        return {}
项目:otest    作者:rohe    | 项目源码 | 文件源码
def _func(self, conv):
        try:
            request = registration_request(conv)
        except NoSuchEvent:
            self._status = ERROR
        else:
            try:
                req_scopes = request['scope']
            except KeyError:
                pass
            else:
                if 'offline_access' in req_scopes:
                    if request['response_type'] != ['code']:
                        self._status = ERROR
                        self._message = 'Offline access only when using ' \
                                        '"code" flow'

        return {}
项目:otest    作者:rohe    | 项目源码 | 文件源码
def _func(self, conv):
        request = access_token_request(conv)

        ca = request['parsed_client_assertion']
        missing = []
        for claim in ["iss", "sub", "aud", "iat", "exp", "jti"]:
            if claim not in ca:
                missing.append(claim)

        if missing:
            self._status = ERROR
            self._message = 'Redirect_uri not registered'

        # verify jti entropy
        bits = calculate(ca['jti'])
        if bits < 128:
            self._status = WARNING
            self._message = 'Not enough entropy in string: {} < 128'.format(
                bits)

        return {}
项目:yelp-fusion    作者:Yelp    | 项目源码 | 文件源码
def search(api_key, term, location):
    """Query the Search API by a search term and location.

    Args:
        term (str): The search term passed to the API.
        location (str): The search location passed to the API.

    Returns:
        dict: The JSON response from the request.
    """

    url_params = {
        'term': term.replace(' ', '+'),
        'location': location.replace(' ', '+'),
        'limit': SEARCH_LIMIT
    }
    return request(API_HOST, SEARCH_PATH, api_key, url_params=url_params)
项目:fauxmo-plugins    作者:n8henrie    | 项目源码 | 文件源码
def get_state(self) -> str:
        """Get device state.

        Returns:
            "on", "off", or "unknown"

        """
        if self.state_cmd is None:
            return "unknown"

        resp = requests.request(self.state_method, self.state_cmd,
                                data=self.state_data, json=self.state_json,
                                headers=self.headers, auth=self.auth)

        if self.state_response_off in resp.text:
            return "off"
        elif self.state_response_on in resp.text:
            return "on"
        return "unknown"
项目:mycroft-light    作者:MatthewScholefield    | 项目源码 | 文件源码
def stt(self, audio, language, limit):
        """ Web API wrapper for performing Speech to Text (STT)

        Args:
            audio (bytes): The recorded audio, as in a FLAC file
            language (str): A BCP-47 language code, e.g. 'en-US'
            limit (int): Maximum minutes to transcribe(?)

        Returns:
            str: JSON structure with transcription results
        """
        return self.request({
            'method': 'POST',
            'headers': {'Content-Type': 'audio/x-flac'},
            'query': {'lang': language, 'limit': limit},
            'data': audio
        })
项目:it-jira-bamboohr    作者:saucelabs    | 项目源码 | 文件源码
def request_jira(client, url, method='GET', **kwargs):
    jwt_authorization = 'JWT %s' % encode_token(
        method, url, app.config.get('ADDON_KEY'),
        client.sharedSecret)
    result = requests.request(
        method,
        client.baseUrl.rstrip('/') + url,
        headers={
            "Authorization": jwt_authorization,
            "Content-Type": "application/json"
        },
        **kwargs)
    try:
        result.raise_for_status()
    except requests.HTTPError as e:
        raise requests.HTTPError(e.response.text, response=e.response)
    return result
项目:pyfc4    作者:ghukill    | 项目源码 | 文件源码
def copy(self, destination):

        '''
        Method to copy resource to another location

        Args:
            destination (rdflib.term.URIRef, str): URI location to move resource

        Returns:
            (Resource) new, moved instance of resource
        '''

        # set move headers
        destination_uri = self.repo.parse_uri(destination)

        # http request
        response = self.repo.api.http_request('COPY', self.uri, data=None, headers={'Destination':destination_uri.toPython()})

        # handle response
        if response.status_code == 201:
            return destination_uri
        else:
            raise Exception('HTTP %s, could not move resource %s to %s' % (response.status_code, self.uri, destination_uri))
项目:pyfc4    作者:ghukill    | 项目源码 | 文件源码
def _build_rdf(self, data=None):

        '''
        Parse incoming rdf as self.rdf.orig_graph, create copy at self.rdf.graph

        Args:
            data (): payload from GET request, expected RDF content in various serialization formats

        Returns:
            None
        '''

        # recreate rdf data
        self.rdf = SimpleNamespace()
        self.rdf.data = data
        self.rdf.prefixes = SimpleNamespace()
        self.rdf.uris = SimpleNamespace()
        # populate prefixes
        for prefix,uri in self.repo.context.items():
            setattr(self.rdf.prefixes, prefix, rdflib.Namespace(uri))
        # graph
        self._parse_graph()
项目:pyfc4    作者:ghukill    | 项目源码 | 文件源码
def revert_to(self):

        '''
        method to revert resource to this version by issuing PATCH

        Args:
            None

        Returns:
            None: sends PATCH request, and refreshes parent resource
        '''

        # send patch
        response = self.resource.repo.api.http_request('PATCH', self.uri)

        # if response 204
        if response.status_code == 204:
            logger.debug('reverting to previous version of resource, %s' % self.uri)

            # refresh current resource handle
            self._current_resource.refresh()

        else:
            raise Exception('HTTP %s, could not revert to resource version, %s' % (response.status_code, self.uri))
项目:py_fake_server    作者:Telichkin    | 项目源码 | 文件源码
def test_add_two_simple_handlers_with_same_route(server: FakeServer,
                                                 methods: List[str],
                                                 routes: List[str],
                                                 statuses: List[int]):
    server. \
        on_(methods[0], routes[0]). \
        response(status=statuses[0])

    server. \
        on_(methods[1], routes[1]). \
        response(status=statuses[1])

    response_0 = requests.request(methods[0], server.base_uri + routes[0])
    response_1 = requests.request(methods[1], server.base_uri + routes[1])
    assert response_0.status_code == statuses[0]
    assert response_1.status_code == statuses[1]
项目:farmer    作者:vmfarms    | 项目源码 | 文件源码
def _request(self, method, path, **kwargs):
        url = self.url_for('api', 'v{self.version}'.format(self=self), *path.split('/'))

        try:
            headers = self.headers.update(kwargs['headers'])
        except KeyError:
            headers = self.headers

        try:
            response = requests.request(method, url, headers=headers, **kwargs)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.ConnectionError:
            raise VMFarmsAPIError('Cannot connect to VM Farms API at {}.'.format(self.url))
        except requests.exceptions.HTTPError as error:
            self._raise_http_error_as_api_error(error)
        except ValueError:
            raise VMFarmsAPIError('Unexpected response from server.', response)
项目:orionsdk-python    作者:solarwinds    | 项目源码 | 文件源码
def _req(self, method, frag, data=None):
        resp = requests.request(method, self.url + frag,
                                data=json.dumps(data, default=_json_serial),
                                verify=self.verify,
                                auth=self.credentials,
                                headers={'Content-Type': 'application/json'})

        # try to extract reason from response when request returns error
        if 400 <= resp.status_code < 600:
            try:
                resp.reason = json.loads(resp.text)['Message'];
            except:
                pass;

        resp.raise_for_status()
        return resp
项目:gluon    作者:openstack    | 项目源码 | 文件源码
def sendjson(self, method, urlpath, obj=None):
        """Send json to the OpenDaylight controller."""
        headers = {'Content-Type': 'application/json'}
        data = jsonutils.dumps(obj, indent=2) if obj else None
        url = '/'.join([self.url, urlpath])
        LOG.debug("Sending METHOD (%(method)s) URL (%(url)s) JSON (%(obj)s)" %
                  {'method': method, 'url': url, 'obj': obj})
        r = requests.request(method, url=url,
                             headers=headers, data=data,
                             auth=self.auth, timeout=self.timeout)
        try:
            r.raise_for_status()
        except Exception as ex:
            LOG.error("Error Sending METHOD (%(method)s) URL (%(url)s)"
                      "JSON (%(obj)s) return: %(r)s ex: %(ex)s rtext: "
                      "%(rtext)s" %
                      {'method': method, 'url': url, 'obj': obj, 'r': r,
                       'ex': ex, 'rtext': r.text})
            return r
        try:
            return json.loads(r.content)
        except Exception:
            LOG.debug("%s" % r)
            return
项目:pepipost-sdk-python    作者:pepipost    | 项目源码 | 文件源码
def execute_as_string(self, request):
        """Execute a given HttpRequest to get a string response back

        Args:
            request (HttpRequest): The given HttpRequest to execute.

        Returns:
            HttpResponse: The response of the HttpRequest.

        """ 
        response = requests.request(HttpMethodEnum.to_string(request.http_method), 
                                   request.query_url, 
                                   headers=request.headers,
                                   params=request.query_parameters, 
                                   data=request.parameters,
                                   files=request.files,
                                   auth=(request.username, request.password))

        return self.convert_response(response, False)
项目:pepipost-sdk-python    作者:pepipost    | 项目源码 | 文件源码
def execute_as_binary(self, request):
        """Execute a given HttpRequest to get a binary response back

        Args:
            request (HttpRequest): The given HttpRequest to execute.

        Returns:
            HttpResponse: The response of the HttpRequest.

        """

        response = requests.request(HttpMethodEnum.to_string(request.http_method), 
                                   request.query_url, 
                                   headers=request.headers,
                                   params=request.query_parameters, 
                                   data=request.parameters, 
                                   files=request.files,
                                   auth=(request.username, request.password))

        return self.convert_response(response, True)
项目:Python-Scripts-Repo-on-Data-Science    作者:qalhata    | 项目源码 | 文件源码
def scrapeSource(url, magicFrag='2017',
                 scraperFunction=getNYTText, token='None'):
    urlBodies = {}
    requests = urllib3.PoolManager()
    response = requests.request('GET', url)
    soup = BeautifulSoup(response.data)
    # the above lines of code sets up the beautifulSoup page
    # now we find links
    # links are always of the form <a href='url'> link-text </a>
    for a in soup.findAll('a'):
        try:
            # the line above refers to indiv. scrapperFunction
            # for NYT & washPost
            if body and len(body) > 0:
                urlBodies[url] = body
                print(url)
        except:
            numErrors = 0
            numErrors += 1
项目:PythonStudyCode    作者:TongTongX    | 项目源码 | 文件源码
def get_version(self):
        """Fetches the current version number of the Graph API being used."""
        args = {"access_token": self.access_token}
        try:
            response = requests.request("GET",
                                        "https://graph.facebook.com/" +
                                        self.version + "/me",
                                        params=args,
                                        timeout=self.timeout,
                                        proxies=self.proxies)
        except requests.HTTPError as e:
            response = json.loads(e.read())
            raise GraphAPIError(response)

        try:
            headers = response.headers
            version = headers["facebook-api-version"].replace("v", "")
            return float(version)
        except Exception:
            raise GraphAPIError("API version number not available")
项目:PythonStudyCode    作者:TongTongX    | 项目源码 | 文件源码
def debug_access_token(self, token, app_id, app_secret):
        """
        Gets information about a user access token issued by an app. See
        <https://developers.facebook.com/docs/facebook-login/access-tokens
        #debug>

        We can generate the app access token by concatenating the app
        id and secret: <https://developers.facebook.com/docs/
        facebook-login/access-tokens#apptokens>

        """
        args = {
            "input_token": token,
            "access_token": "%s|%s" % (app_id, app_secret)
        }
        return self.request("/debug_token", args=args)
项目:jcsclient    作者:jiocloudservices    | 项目源码 | 文件源码
def __init__(self):
        self.http_method = None
        # mandatory headers for each request
        self.http_headers = {
                              'Authorization': None,
                              'Date': None,
                            }
        self.dss_url = config.get_service_url('dss')
        if(self.dss_url.endswith('/')):
            self.dss_url = self.dss_url[:-1]
        self.access_key = config.get_access_key()
        self.secret_key = config.get_secret_key()
        self.is_secure_request = config.check_secure()
        self.dss_op_path = None
        self.dss_query_str = None
        self.dss_query_str_for_signature = None
项目:pyrundeck    作者:pschmitt    | 项目源码 | 文件源码
def __request(self, method, url, params=None):
        logger.info('{} {} Params: {}'.format(method, url, params))
        cookies = {'JSESSIONID': self.auth_cookie}
        h = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'X-Rundeck-Auth-Token': self.token
        }
        r = requests.request(
            method, url, cookies=cookies, headers=h, json=params,
            verify=self.verify
        )
        logger.debug(r.content)
        r.raise_for_status()
        try:
            return r.json()
        except ValueError as e:
            logger.error(e.message)
            return r.content
项目:buffer-api    作者:barisariburnu    | 项目源码 | 文件源码
def create_request(self, method, path, options):
        """Creating a request with the given arguments

        If api_version is set, appends it immediately after host
        """
        version = '/' + options['api_version'] if 'api_version' in options else ''

        # Adds a suffix (ex: ".html", ".json") to url
        suffix = options['response_type'] if 'response_type' in options else 'json'
        path = path + '.' + suffix

        path = urlparse.urljoin(self.base, version + path)

        if 'api_version' in options:
            del options['api_version']

        if 'response_type' in options:
            del options['response_type']

        return requests.request(method, path, **options)
项目:memes-reposter    作者:vaniakosmos    | 项目源码 | 文件源码
def fetch(self):
        section = 'hot'  # hot | top | user
        sort = 'viral'  # viral | top | time | rising (only available with user section)
        show_viral = 'true'
        show_mature = 'true'
        album_previews = 'false'

        url = f'https://api.imgur.com/3/gallery/{section}/{sort}'
        querystring = {"showViral": f"{show_viral}", "mature": f"{show_mature}", "album_previews": f"{album_previews}"}
        headers = {'authorization': f'Client-ID {IMGUR_CLIENT_ID}'}
        response = requests.request("GET", url, headers=headers, params=querystring)
        json = response.json()

        self.logger.debug(f'Fetched. Code: {response.status_code}')

        return json['data'][:FETCH_LIMIT]
项目:python_wpapi    作者:Lobosque    | 项目源码 | 文件源码
def _request(self, endpoint, method='GET', files=None, headers={}, **kwargs):
        params = {}
        if method == 'GET':
            params = kwargs
            data = None
            headers = {'Content-Length': '0'}
        else:
            data = kwargs

        response = requests.request(method,
            endpoint,
            auth=self.auth,
            params=params,
            json=data,
            headers=headers,
            files=files
        )
        if not response.status_code // 100 == 2:
            error = WpApiError.factory(response)
            raise error
        return response.json()
项目:healthchecks_asgards    作者:andela    | 项目源码 | 文件源码
def request(self, method, url, **kwargs):
        try:
            options = dict(kwargs)
            if "headers" not in options:
                options["headers"] = {}

            options["timeout"] = 5
            options["headers"]["User-Agent"] = "healthchecks.io"

            r = requests.request(method, url, **options)
            if r.status_code not in (200, 201, 204):
                return "Received status code %d" % r.status_code
        except requests.exceptions.Timeout:
            # Well, we tried
            return "Connection timed out"
        except requests.exceptions.ConnectionError:
            return "Connection failed"
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def find_finger():
    ps = request.vars.ps
    import serial
    ser = serial.Serial('/dev/ttyACM0', 9600)
    ser.write('5')
    # ser.write(b'5') #Prefixo b necessario se estiver utilizando Python 3.X
    while True:
        line = ser.readline()
        print line
        if "FINGERFOUND" in line:
            id = line.split(",")[1]
            ser.close()
            r = requests.get("http://174.138.34.125:8081/walletapi/customer/%s/" % id)
            obj = json.loads(r.text)
            if ps == obj['password']:
                return r.text
            else:
                return 'error'
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Authorization': 'Bearer {0}'.format(self.options.get('auth_token'))
        }
        if not url.startswith(self.api_endpoint):
            url = self.api_endpoint + url

        r = requests.request(action, url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        if action == 'DELETE':
            return ''
        else:
            return r.json()
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}

        default_headers = {
            'Accept': 'application/json',
            # 'Content-Type': 'application/json',
            'API-Key': self.options['auth_token']
        }

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=data,
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.

        if action == 'DELETE' or action == 'PUT' or action == 'POST':
            return r.text # vultr handles succss/failure via HTTP Codes, Only GET returns a response.
        return r.json()
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'X-NSONE-Key': self.options['auth_token']
        }
        default_auth = None

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers,
                             auth=default_auth)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        return r.json()
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        query_params['format'] = 'json'
        query_params['_user'] = self.options['auth_username']
        query_params['_key'] = self.options['auth_token']
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        return r.json()
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET', url='/', data=None, query_params=None):
        if not data:
            data = {}
        if not query_params:
            query_params = {}

        result = requests.request(action, self.api_endpoint + url,
                                  params=query_params,
                                  data=json.dumps(data),
                                  headers={
                                      'Content-Type': 'application/json',
                                      'Accept': 'application/json',
                                      # GoDaddy use a key/secret pair to authenticate
                                      'Authorization': 'sso-key {0}:{1}'.format(
                                          self.options.get('auth_key'),
                                          self.options.get('auth_secret'))
                                  })

        result.raise_for_status()
        return result.json()
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET', url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}

        query_params['format'] = 'json'
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

        credentials = (self.options['auth_username'], self.options['auth_token'])
        response = requests.request(action,
                                    self.api_endpoint + url,
                                    params=query_params,
                                    data=json.dumps(data),
                                    headers=default_headers,
                                    auth=credentials)

        # if the request fails for any reason, throw an error.
        response.raise_for_status()
        return response.json()

    # Adds TTL parameter if passed as argument to lexicon.
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET', url='/', data=None, query_params=None):
        # Set default values for missing arguments
        data = data if data else {}
        query_params = query_params if query_params else {}

        # Merge authentication data into request
        if action == 'GET':
            query_params.update(self._build_authentication_data())
        else:
            data.update(self._build_authentication_data())

        # Fire request against ClouDNS API and parse result as JSON
        r = requests.request(action, self.api_endpoint + url, params=query_params, data=data)
        r.raise_for_status()
        payload = r.json()

        # Check ClouDNS specific status code and description
        if 'status' in payload and 'statusDescription' in payload and payload['status'] != 'Success':
            raise Exception('ClouDNS API request has failed: ' + payload['statusDescription'])

        # Return payload
        return payload
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'PddToken': self.options.get('auth_token')
        }

        if not url.startswith(self.api_endpoint):
            url = self.api_endpoint + url

        r = requests.request(action, url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        if action == 'DELETE':
            return ''
        else:
            return r.json()
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }
        default_auth = (self.options['auth_username'], self.options['auth_token'])

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers,
                             auth=default_auth)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        return r.json()
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET',  url='', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}
        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

        query_params['api_key'] = self.options.get('auth_token')
        query_params['resultFormat'] = 'JSON'
        query_params['api_action'] = url

        r = requests.request(action, self.api_endpoint, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        if action == 'DELETE':
            return ''
        else:
            result = r.json()
            if len(result['ERRORARRAY']) > 0:
                raise Exception('Linode api error: {0}'.format(result['ERRORARRAY']))
            return result
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def _request(self, action='GET',  url='/', data=None, query_params=None):
        if data is None:
            data = {}
        if query_params is None:
            query_params = {}

        default_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
        }
        default_auth = requests.auth.HTTPBasicAuth(self.options['auth_username'], self.options['auth_token'])

        r = requests.request(action, self.api_endpoint + url, params=query_params,
                             data=json.dumps(data),
                             headers=default_headers,
                             auth=default_auth)
        r.raise_for_status()  # if the request fails for any reason, throw an error.
        return r.json()
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def create_record(self, type, name, content):
        request = {
                'action': 'SET', 
                'type': type, 
                'name': self.options['domain'], 
                'value': content 
        }

        if name is not None:
            request['name'] = self._full_name(name)

        if self.options.get('ttl'):
            request['ttl'] = self.options.get('ttl')

        if self.options.get('priority'):
            request['prio'] = self.options.get('priority')

        payload = self._get('/dns/dyndns.jsp', request)

        if payload.find('is_ok').text != 'OK:':
            raise Exception('An error occurred: {0}'.format(payload.find('is_ok').text))

        logger.debug('create_record: %s', True)
        return True
项目:lexicon    作者:AnalogJ    | 项目源码 | 文件源码
def delete_record(self, identifier=None, type=None, name=None, content=None):
        if identifier is not None:
            type, name, content = self._parse_identifier(identifier)

        request = {
            'action' : 'DELETE',
            'name': self.options['domain'] 
        }

        if type is not None:
            request['type'] = type
        if name is not None:
            request['name'] = self._full_name(name)
        if content is not None:
            request['value'] = content

        payload = self._get('/dns/dyndns.jsp', request)

        if payload.find('is_ok').text != 'OK:':
            raise Exception('An error occurred: {0}'.format(payload.find('is_ok').text))

        logger.debug('delete_record: %s', True)
        return True