Python requests 模块,post() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.post()

项目:meetup-facebook-bot    作者:Stark-Mountain    | 项目源码 | 文件源码
def send_message_to_facebook(access_token, user_id, message_data):
    """ Makes use of Send API:
        https://developers.facebook.com/docs/messenger-platform/send-api-reference
    """
    headers = {
        'Content-Type': 'application/json',
    }
    params = {
        'access_token': access_token,
    }
    payload = {
        'recipient': {
            'id': user_id,
        },
        'message': message_data,
    }
    url = 'https://graph.facebook.com/v2.6/me/messages'
    response = requests.post(url, headers=headers, params=params,
                             data=json.dumps(payload))
    response.raise_for_status()
    return response.json()
项目:upstox-python    作者:upstox    | 项目源码 | 文件源码
def api_call(self, url, http_method, data):

        headers = {"Content-Type" : "application/json", "x-api-key" : self.api_key,
                   "authorization" : "Bearer " + self.access_token}

        r = None

        if http_method is PyCurlVerbs.POST:
            r = requests.post(url, data=json.dumps(data), headers=headers)
        elif http_method is PyCurlVerbs.DELETE:
            r = requests.delete(url, headers=headers)
        elif http_method is PyCurlVerbs.PUT:
            r = requests.put(url, data=json.dumps(data), headers=headers)
        elif http_method is PyCurlVerbs.GET:
            r = requests.get(url, headers=headers)

        return r
项目:okta-awscli    作者:jmhale    | 项目源码 | 文件源码
def primary_auth(self):
        """ Performs primary auth against Okta """

        auth_data = {
            "username": self.username,
            "password": self.password
        }
        resp = requests.post(self.base_url+'/api/v1/authn', json=auth_data)
        resp_json = resp.json()
        if 'status' in resp_json:
            if resp_json['status'] == 'MFA_REQUIRED':
                factors_list = resp_json['_embedded']['factors']
                state_token = resp_json['stateToken']
                session_token = self.verify_mfa(factors_list, state_token)
            elif resp_json['status'] == 'SUCCESS':
                session_token = resp_json['sessionToken']
        elif resp.status_code != 200:
            print(resp_json['errorSummary'])
            exit(1)
        else:
            print(resp_json)
            exit(1)

        return session_token
项目:numerai    作者:gansanay    | 项目源码 | 文件源码
def upload_prediction(self, file_path):
        filename, signedRequest, headers, status_code = self.authorize(file_path)
        if status_code!=200:
            return status_code

        dataset_id, comp_id, status_code = self.get_current_competition()
        if status_code!=200:
            return status_code

        with open(file_path, 'rb') as fp:
            r = requests.Request('PUT', signedRequest, data=fp.read())
            prepped = r.prepare()
            s = requests.Session()
            resp = s.send(prepped)
            if resp.status_code!=200:
                return resp.status_code

        r = requests.post(self._submissions_url,
                    data={'competition_id':comp_id, 'dataset_id':dataset_id, 'filename':filename},
                    headers=headers)

        return r.status_code
项目:ISB-CGC-pipelines    作者:isb-cgc    | 项目源码 | 文件源码
def sendRequest(ip, port, route, data=None, protocol="http"):
        url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route)

        if data is not None:
            try:
                resp = requests.post(url, data=data)

            except requests.HTTPError as e:
                raise PipelineServiceError("{reason}".format(reason=e))

        else:
            try:
                resp = requests.get(url)

            except requests.HTTPError as e:
                raise PipelineServiceError("{reason}".format(reason=e))

        return resp
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def put_comments(self, resource, comment):
        """ Post a comment on a file or URL.

        The initial idea of VirusTotal Community was that users should be able to make comments on files and URLs,
        the comments may be malware analyses, false positive flags, disinfection instructions, etc.

        Imagine you have some automatic setup that can produce interesting results related to a given sample or URL
        that you submit to VirusTotal for antivirus characterization, you might want to give visibility to your setup
        by automatically reviewing samples and URLs with the output of your automation.

        :param resource: either a md5/sha1/sha256 hash of the file you want to review or the URL itself that you want
                         to comment on.
        :param comment: the actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot)
                        and reference users using the "@" syntax (e.g. @VirusTotalTeam).
        :return: If the comment was successfully posted the response code will be 1, 0 otherwise.
        """
        params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}

        try:
            response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
项目:cerebros_bot    作者:jslim18    | 项目源码 | 文件源码
def getESCXBalance(address): 
   try:
      payload = {
         "method": "get_balances",
         "params": {
            "filters":[{"field": "address", "op": "==", "value": address},
                       {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
            },
          "jsonrpc":"2.0",
          "id":0
         }
      response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
      json_data = response.json()
      #quantity = json_data.quantity 
      return (json_data['result'].pop()['quantity']) / 100000000
   except: 
      return 0;
项目:PyWallet    作者:AndreMiras    | 项目源码 | 文件源码
def add_transaction(tx):
        """
        POST transaction to etherscan.io.
        """
        tx_hex = rlp.encode(tx).encode("hex")
        # use https://etherscan.io/pushTx to debug
        print("tx_hex:", tx_hex)
        url = 'https://api.etherscan.io/api'
        url += '?module=proxy&action=eth_sendRawTransaction'
        if ETHERSCAN_API_KEY:
            '&apikey=%' % ETHERSCAN_API_KEY
        # TODO: handle 504 timeout, 403 and other errors from etherscan
        response = requests.post(url, data={'hex': tx_hex})
        # response is like:
        # {'jsonrpc': '2.0', 'result': '0x24a8...14ea', 'id': 1}
        # or on error like this:
        # {'jsonrpc': '2.0', 'id': 1, 'error': {
        #   'message': 'Insufficient funds...', 'code': -32010, 'data': None}}
        response_json = response.json()
        print("response_json:", response_json)
        PyWalib.handle_etherscan_tx_error(response_json)
        tx_hash = response_json['result']
        # the response differs from the other responses
        return tx_hash
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def register(name):
    # hit api to see if name is already registered
    if check_name(name)['status'] == 'error':
        print('{} already registered.'.format(name))
    else:
        # generate new keypair
        (pub, priv) = rsa.newkeys(512)

        if os.path.exists(KEY_LOCATION) == False:
            os.mkdir(KEY_LOCATION)

        # save to disk
        with open('{}/.key'.format(KEY_LOCATION), 'wb') as f:
            pickle.dump((pub, priv), f, pickle.HIGHEST_PROTOCOL)

        r = requests.post('{}/names'.format(API_LOCATION), data = {'name' : name, 'n' : pub.n, 'e' : pub.e})
        if r.json()['status'] == 'success':
            print('Successfully registered new name: {}'.format(name))
        else:
            print('Error registering name: {}'.format(name))
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def _make_request(self, path, cni_envs, expected_status=None):
        method = 'POST'

        address = config.CONF.cni_daemon.bind_address
        url = 'http://%s/%s' % (address, path)
        try:
            LOG.debug('Making request to CNI Daemon. %(method)s %(path)s\n'
                      '%(body)s',
                      {'method': method, 'path': url, 'body': cni_envs})
            resp = requests.post(url, json=cni_envs,
                                 headers={'Connection': 'close'})
        except requests.ConnectionError:
            LOG.exception('Looks like %s cannot be reached. Is kuryr-daemon '
                          'running?', address)
            raise
        LOG.debug('CNI Daemon returned "%(status)d %(reason)s".',
                  {'status': resp.status_code, 'reason': resp.reason})
        if expected_status and resp.status_code != expected_status:
            LOG.error('CNI daemon returned error "%(status)d %(reason)s".',
                      {'status': resp.status_code, 'reason': resp.reason})
            raise k_exc.CNIError('Got invalid status code from CNI daemon.')
        return resp
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def scan_file(self, this_file):
        """ Submit a file to be scanned by VirusTotal

        :param this_file: File to be scanned (32MB file size limit)
        :return: JSON response that contains scan_id and permalink.
        """
        params = {'apikey': self.api_key}
        try:
            if type(this_file) == str and os.path.isfile(this_file):
                files = {'file': (this_file, open(this_file, 'rb'))}
            elif isinstance(this_file, StringIO.StringIO):
                files = {'file': this_file.read()}
            else:
                files = {'file': this_file}
        except TypeError as e:
            return dict(error=e.message)

        try:
            response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def rescan_file(self, this_hash):
        """ Rescan a previously submitted filed or schedule an scan to be performed in the future.

        :param this_hash: a md5/sha1/sha256 hash. You can also specify a CSV list made up of a combination of any of
                          the three allowed hashes (up to 25 items), this allows you to perform a batch request with
                          one single call. Note that the file must already be present in our file store.
        :return: JSON response that contains scan_id and permalink.
        """
        params = {'apikey': self.api_key, 'resource': this_hash}

        try:
            response = requests.post(self.base + 'file/rescan', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def scan_url(self, this_url):
        """ Submit a URL to be scanned by VirusTotal.

        :param this_url: The URL that should be scanned. This parameter accepts a list of URLs (up to 4 with the
                         standard request rate) so as to perform a batch scanning request with one single call. The
                         URLs must be separated by a new line character.
        :return: JSON response that contains scan_id and permalink.
        """
        params = {'apikey': self.api_key, 'url': this_url}

        try:
            response = requests.post(self.base + 'url/scan', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def scan_file(self, this_file, notify_url=None, notify_changes_only=None):
        """ Submit a file to be scanned by VirusTotal.

        Allows you to send a file for scanning with VirusTotal. Before performing your submissions we encourage you to
        retrieve the latest report on the files, if it is recent enough you might want to save time and bandwidth by
        making use of it. File size limit is 32MB, in order to submmit files up to 200MB in size you must request a
        special upload URL.

        :param this_file: The file to be uploaded.
        :param notify_url: A URL to which a POST notification should be sent when the scan finishes.
        :param notify_changes_only: Used in conjunction with notify_url. Indicates if POST notifications should be
                                    sent only if the scan results differ from the previous analysis.
        :return: JSON response that contains scan_id and permalink.
        """
        params = {'apikey': self.api_key}
        files = {'file': (this_file, open(this_file, 'rb'))}

        try:
            response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def cancel_rescan_file(self, resource):
        """ Delete a previously scheduled scan.

        Deletes a scheduled file rescan task. The file rescan api allows you to schedule periodic scans of a file,
        this API call tells VirusTotal to stop rescanning a file that you have previously enqueued for recurrent
        scanning.

        :param resource: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve.
        :return: JSON acknowledgement. In the event that the scheduled scan deletion fails for whatever reason, the
        response code will be -1.
        """
        params = {'apikey': self.api_key, 'resource': resource}

        try:
            response = requests.post(self.base + 'rescan/delete', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def put_comments(self, resource, comment):
        """ Post a comment on a file or URL.

        Allows you to place comments on URLs and files, these comments will be publicly visible in VirusTotal
        Community, under the corresponding tab in the reports for each particular item.

        Comments can range from URLs and locations where a given file was found in the wild to full reverse
        engineering reports on a given malware specimen, anything that may help other analysts in extending their
        knowledge about a particular file or URL.

        :param resource: Either an md5/sha1/sha256 hash of the file you want to review or the URL itself that you want
        to comment on.
        :param comment: The actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot)
        and reference users using the "@" syntax (e.g. @VirusTotalTeam).
        :return: JSON response
        """
        params = {'apikey': self.api_key, 'resource': resource, 'comment': comment}

        try:
            response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
项目:Zoom2Youtube    作者:Welltory    | 项目源码 | 文件源码
def get_auth_code(self):
        """ Get access token for connect to youtube api """
        oauth_url = 'https://accounts.google.com/o/oauth2/token'
        data = dict(
            refresh_token=self.refresh_token,
            client_id=self.client_id,
            client_secret=self.client_secret,
            grant_type='refresh_token',
        )

        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json'
        }
        response = requests.post(oauth_url, data=data, headers=headers)
        response = response.json()
        return response.get('access_token')
项目:cerebros_bot    作者:jslim18    | 项目源码 | 文件源码
def getESCXBalance(address): 
   try:
      payload = {
         "method": "get_balances",
         "params": {
            "filters":[{"field": "address", "op": "==", "value": address},
                       {"field": "asset", "op": "==", "value": "ESCX"}],
            "filterop": "and"
            },
          "jsonrpc":"2.0",
          "id":0
         }
      response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
      json_data = response.json()
      #quantity = json_data.quantity 
      return (json_data['result'].pop()['quantity']) / 100000000
   except: 
      return 0;
项目:kcdu    作者:learhy    | 项目源码 | 文件源码
def create_cd(col_name, type, display_name):
    url = 'https://api.kentik.com/api/v5/customdimension'
    json_template = '''
    {
        "name": "{{ column }}",
        "type": "{{ data_type }}",
        "display_name": "{{ pretty_name }}"
    }
    '''
    t = Template(json_template)
    data = json.loads(t.render(column = col_name, data_type = type, pretty_name = display_name))    
    response = requests.post(url, headers=headers, data=data)
    if response.status_code != 201:
        print("Unable to create custom dimension column. Exiting.")
        print("Status code: {}").format(response.status_code)
        print("Error message: {}").format(response.json()['error'])
        exit()
    else:
        print("Custom dimension \"{}\" created as id: {}").format(display_name, \
            response.json()['customDimension']['id'])
        return(response.json()['customDimension']['id'])
项目:docklet    作者:unias    | 项目源码 | 文件源码
def post_to_all(self, url = '/', data={}):
        if (url == '/'):
            res = []
            for masterip in masterips:
                try:
                    requests.post("http://"+getip(masterip)+":"+master_port+"/isalive/",data=data)
                except Exception as e:
                    logger.debug(e)
                    continue
                res.append(masterip)
            return res
        data = dict(data)
        data['token'] = session['token']
        logger.info("Docklet Request: user = %s data = %s, url = %s"%(session['username'], data, url))
        result = {}
        for masterip in masterips:
            try:
                res = requests.post("http://"+getip(masterip)+":"+master_port+url,data=data).json()
            except Exception as e:
                logger.debug(e)
                continue
            result[masterip] = res
            logger.debug("get result from " + getip(masterip))

        return result
项目:wait4disney    作者:gtt116    | 项目源码 | 文件源码
def get_token():
    headers = {
        'Host': 'authorization.shanghaidisneyresort.com',
        'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8',
        'X-NewRelic-ID': 'Uw4BWVZSGwICU1VRAgkH',
        'X-Conversation-Id': 'shdrA5320488-E03F-4795-A818-286C658EEBB6',
        'Accept': 'application/json',
        'User-Agent': 'SHDR/4.1.1 (iPhone; iOS 9.3.5; Scale/2.00)',
        'Accept-Language': 'zh-Hans-CN;q=1',
    }

    data = 'assertion_type=public&client_id=DPRD-SHDR.MOBILE.IOS-PROD&client_secret=&grant_type=assertion'

    resp = requests.post('https://authorization.shanghaidisneyresort.com/curoauth/v1/token',
                         headers=headers, data=data)
    resp.raise_for_status()
    token = resp.json()['access_token']
    return token
项目:Home-Assistant    作者:jmart518    | 项目源码 | 文件源码
def login(self):
        """Log in to the MyQ service."""

        params = {
            'username': self.username,
            'password': self.password
        }

        login = requests.post(
            'https://{host_uri}/{login_endpoint}'.format(
                host_uri=HOST_URI,
                login_endpoint=self.LOGIN_ENDPOINT),
                json=params,
                headers={
                    'MyQApplicationId': self.brand[APP_ID],
                    'User-Agent': self.USERAGENT
                }
        )

        auth = login.json()
        self.security_token = auth['SecurityToken']
        self._logger.debug('Logged in to MyQ API')
        return True
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def test_last_request(self, mock):
        response = requests.post(GetMock.url,
                                 headers={'custom-header': 'huseyin'},
                                 data={'name': 'huseyin'})
        self.assertEqual(response.status_code, 200)
        last_request = mock.last_request
        # Test if last request has expected values.
        self.assertEqual(last_request.url, GetMock.url)

        body = last_request.body
        # In python 3 httpretty backend returns binary string for body.
        # So we are decoding it back to unicode to test.
        if isinstance(body, six.binary_type):
            body = body.decode('utf-8')
        self.assertEqual(body, 'name=huseyin')
        self.assertEqual(last_request.headers.get('custom-header'),
                         'huseyin')
        # Make sure that class's last_request is same as instances.
        self.assertIs(GetMock.last_request, mock.last_request)
项目:BioDownloader    作者:biomadeira    | 项目源码 | 文件源码
def test_fetch_from_url_or_retry_post_json(self):
        # mocked requests
        identifier = "1csb, 2pah"
        base_url = c.http_pdbe
        endpoint_url = "api/pdb/entry/summary/"
        response = response_mocker(kwargs={}, base_url=base_url,
                                   endpoint_url=endpoint_url,
                                   content_type='application/octet-stream',
                                   post=True, data=identifier)
        self.fetch_from_url_or_retry = MagicMock(return_value=response)
        url = base_url + endpoint_url + identifier
        r = self.fetch_from_url_or_retry(url, json=True, post=True, data=identifier,
                                         header={'application/octet-stream'},
                                         retry_in=None, wait=0,
                                         n_retries=10, stream=False).json()
        self.assertEqual(r, json.loads('{"data": "some json formatted output"}'))
项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def call_api(self, method, endpoint, payload):
        url = urlparse.urljoin(self.api_base_url, endpoint)

        if method == 'POST':
            response = requests.post(url, data=payload)
        elif method == 'delete':
            response = requests.delete(url)
        elif method == 'put':
            response = requests.put(url, data=payload)
        else:
            if self.api_key:
                payload.update({'api_token': self.api_key})
            response = requests.get(url, params=payload)

        content = json.loads(response.content)

        return content
项目:okta-awscli    作者:jmhale    | 项目源码 | 文件源码
def verify_single_factor(self, factor_id, state_token):
        """ Verifies a single MFA factor """
        factor_answer = raw_input('Enter MFA token: ')
        req_data = {
            "stateToken": state_token,
            "answer": factor_answer
        }
        post_url = "%s/api/v1/authn/factors/%s/verify" % (self.base_url, factor_id)
        resp = requests.post(post_url, json=req_data)
        resp_json = resp.json()
        if 'status' in resp_json:
            if resp_json['status'] == "SUCCESS":
                return resp_json['sessionToken']
        elif resp.status_code != 200:
            print(resp_json['errorSummary'])
            exit(1)
        else:
            print(resp_json)
            exit(1)
项目:thermostat-sensor    作者:ankurp    | 项目源码 | 文件源码
def read_and_report(force_alert=False):
  h, t = dht.read_retry(dht.DHT22, TEMP_SENSOR_PIN)
  data = {
    "reading": {
      "humidity": h,
      "temperature": t,
      "sensor_id": mac
    }
  }

  if force_alert:  
    data['reading']['force_alert'] = True

  print("Sending reading: {}".format(data))
  try:
    response = requests.post(
      "http://{DOMAIN}/readings".format(DOMAIN=SERVER_DOMAIN),
      data=json.dumps(data), headers=headers)
    print("Reading Sent")
  except requests.exceptions.RequestException as e:
    print("Error sending reading: {}".format(e))
项目:bitrader    作者:jr-minnaar    | 项目源码 | 文件源码
def api_request(self, call, params, kind='auth', http_call='get'):
        """
        General API request. Generally, use the convenience functions below
        :param kind: the type of request to make. 'auth' makes an authenticated call; 'basic' is unauthenticated
        :param call: the API call to make
        :param params: a dict of query parameters
        :return: a json response, a BitXAPIError is thrown if the api returns with an error
        """
        url = self.construct_url(call)
        auth = self.auth if kind == 'auth' else None
        if http_call == 'get':
            response = requests.get(url, params, headers=self.headers, auth=auth)
        elif http_call == 'post':
            response = requests.post(url, data = params, headers=self.headers, auth=auth)
        else:
            raise ValueError('Invalid http_call parameter')
        try:
            result = response.json()
        except ValueError:
            result = {'error': 'No JSON content returned'}
        if response.status_code != 200 or 'error' in result:
            raise BitXAPIError(response)
        else:
            return result
项目:bitrader    作者:jr-minnaar    | 项目源码 | 文件源码
def create_limit_order(self, order_type, volume, price):
        """
        Create a new limit order
        :param order_type: 'buy' or 'sell'
        :param volume: the volume, in BTC
        :param price: the ZAR price per bitcoin
        :return: the order id
        """
        data = {
            'pair': self.pair,
            'type': 'BID' if order_type == 'buy' else 'ASK',
            'volume': str(volume),
            'price': str(price)

        }
        result = self.api_request('postorder', params=data, http_call='post')
        return result
项目:Plamber    作者:OlegKlimenko    | 项目源码 | 文件源码
def validate_captcha(captcha_post):
    """
    Validates the Google re-captcha data.

    :param str captcha_post: The re-captcha post data
    :return bool:
    """
    validated = False

    data = {
        'secret': settings.GOOGLE_RECAPTCHA_SECRET_KEY,
        'response': captcha_post
    }

    response = requests.post('https://www.google.com/recaptcha/api/siteverify', data=data, verify=False).json()
    if response['success']:
        validated = True

    return validated
项目:BackendAllStars    作者:belatrix    | 项目源码 | 文件源码
def send_message_android(destination, message):
    headers = {
        'Authorization': 'key=' + settings.FIREBASE_SERVER_KEY,
        'Content - Type': 'application/json'
    }
    payload = {
        "to": destination,
        "data": {
            "title": config.TITLE_PUSH_NOTIFICATION,
            "detail": message
        }
    }
    request = requests.post(
        settings.FIREBASE_API_URL,
        json=payload,
        headers=headers
    )
    print(request.text)
项目:BackendAllStars    作者:belatrix    | 项目源码 | 文件源码
def send_message_ios(destination, message):
    headers = {
        'Authorization': 'key=' + settings.FIREBASE_SERVER_KEY,
        'Content - Type': 'application/json'
    }
    payload = {
        "to": destination,
        "priority": "high",
        "badge": 0,
        "notification": {
            "title": config.TITLE_PUSH_NOTIFICATION,
            "text": message,
            "sound": "default",
        }
    }
    request = requests.post(
        settings.FIREBASE_API_URL,
        json=payload,
        headers=headers
    )
    print(request.text)
项目:cbas    作者:ImmobilienScout24    | 项目源码 | 文件源码
def obtain_access_token(config, password):
    info("Will now attempt to obtain an JWT...")
    auth_request_data = {'client_id': 'jumpauth',
                         'username': config.username,
                         'password': password,
                         'grant_type': 'password'}
    auth_url = get_auth_url(config)
    auth_response = requests.post(auth_url, auth_request_data)

    if auth_response.status_code not in [200, 201]:
        log.info("Authentication failed: {0}".
                 format(auth_response.json().get("error")))
        auth_response.raise_for_status()
    else:
        log.info("Authentication OK!")

    # TODO bail out if there was no access token in the answer
    access_token = auth_response.json()['access_token']
    info("Access token was received.")
    verbose("Access token is:\n'{0}'".format(access_token))
    return access_token
项目:Netkeeper    作者:1941474711    | 项目源码 | 文件源码
def authenticate(username, password):
    global Cookies, Host
    try:
        postData = "password=" + getPasswordEnc(password) + "&clientType=android&username=" + \
                   username + "&key=" + getSessionEnc(AccessToken) + "&code=8&clientip=" + LocalIpAddress
        header['Content-Length'] = '219'
        url = 'http://' + Host + '/wf.do'
        resp = requests.post(url=url, headers=header, cookies=Cookies, data=postData)
        resp.encoding = "GBK"
        print resp.request.headers
        text = resp.text
        print text
        if 'auth00' in text > 0:
            print "????."
            return True
        else:
            print "????[" + getErrorMsg(resp) + "]"
            return False
    except Exception as e:
        print e
        print "(6/10) ????????"
        return False
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def __call__(self):
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Cache-Control": "no-cache",
        }
        body = urlparse.urlencode({
            "client_id": self.api_key,
            "client_secret": self.client_secret,
            "jwt_token": self.jwt_token
        })

        r = requests.post(self.endpoint, headers=headers, data=body)
        if r.status_code != 200:
            raise RuntimeError("Unable to authorize against {}:\n"
                               "Response Code: {:d}, Response Text: {}\n"
                               "Response Headers: {}]".format(self.endpoint, r.status_code, r.text, r.headers))

        self.set_expiry(r.json()['expires_in'])

        return r.json()['access_token']
项目:django-zerodowntime    作者:rentlytics    | 项目源码 | 文件源码
def nightly_build(ctx, project_name, branch_name, circle_token):
    url = 'https://circleci.com/api/v1/project/{}/tree/{}?circle-token={}'.format(
        project_name,
        branch_name,
        circle_token
    )

    headers = {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
    }

    body = {
        'build_parameters': {
            'RUN_NIGHTLY_BUILD': "true",
        }
    }

    response = requests.post(url, json=body, headers=headers)

    print("Nightly Build queued on CircleCI: http_status={}, response={}".format(
        response.status_code,
        response.json()
    ))
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def start_job(self, job_id):
        """
        Request to start a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already started'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"start": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def stop_job(self, job_id):
        """
        Request to stop a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already stopped'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"stop": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def abort_job(self, job_id):
        """
        Request to abort a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already stopped'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"abort": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def end_session(self, session_id, job_id, session_tag, result):
        """
        Informs the freezer service that the job has ended.
        Provides information about the job's result and the session tag

        :param session_id:
        :param job_id:
        :param session_tag:
        :param result:
        :return:
        """
        # endpoint /v1/sessions/{sessions_id}/action
        endpoint = '{0}{1}/action'.format(self.endpoint, session_id)
        doc = {"end": {
            "job_id": job_id,
            "current_tag": session_tag,
            "result": result
        }}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def stop_job(self, job_id):
        """
        Request to stop a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already stopped'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"stop": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def abort_job(self, job_id):
        """
        Request to abort a job

        :param job_id: the id of the job to start
        :return: the response obj:
                 {
                    result: string 'success' or 'already stopped'
                 }
        """
        # endpoint /v1/jobs/{job_id}/event
        endpoint = '{0}{1}/event'.format(self.endpoint, job_id)
        doc = {"abort": None}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def end_session(self, session_id, job_id, session_tag, result):
        """
        Informs the freezer service that the job has ended.
        Provides information about the job's result and the session tag

        :param session_id:
        :param job_id:
        :param session_tag:
        :param result:
        :return:
        """
        # endpoint /v1/sessions/{sessions_id}/action
        endpoint = '{0}{1}/action'.format(self.endpoint, session_id)
        doc = {"end": {
            "job_id": job_id,
            "current_tag": session_tag,
            "result": result
        }}
        r = requests.post(endpoint,
                          headers=self.headers,
                          data=json.dumps(doc),
                          verify=self.verify)
        if r.status_code != 202:
            raise exceptions.ApiClientException(r)
        return r.json()
项目:boss    作者:kabirbaidhya    | 项目源码 | 文件源码
def send(notif_type, **params):
    ''' Send slack notifications. '''
    url = config()['base_url'] + config()['endpoint']

    (text, color) = notification.get(
        notif_type,
        config=get_config(),
        notif_config=config(),
        create_link=create_link,
        **params
    )

    payload = {
        'attachments': [
            {
                'color': color,
                'text': text
            }
        ]
    }

    requests.post(url, json=payload)
项目:boss    作者:kabirbaidhya    | 项目源码 | 文件源码
def send(notif_type, **params):
    ''' Send hipchat notifications. '''

    url = API_BASE_URL.format(
        company_name=config()['company_name'],
        room_id=config()['room_id'],
        auth_token=config()['auth_token']
    )

    (text, color) = notification.get(
        notif_type,
        config=get_config(),
        notif_config=config(),
        create_link=create_link,
        **params
    )

    payload = {
        'color': color,
        'message': text,
        'notify': config()['notify'],
        'message_format': 'html'
    }

    requests.post(url, json=payload)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def create_logset(logset_name=None, params=None):
    """
    Add a new logset to the current account.
    If a filename is given, load the contents of the file
    as json parameters for the request.
    If a name is given, create a new logset with the given name
    """
    if params is not None:
        request_params = params
    else:
        request_params = {
            'logset': {
                'name': logset_name
            }
        }

    headers = api_utils.generate_headers('rw')

    try:
        response = requests.post(_url()[1], json=request_params, headers=headers)
        handle_response(response, 'Creating logset failed.\n', 201)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def create(payload):
    """
    Create an api key with the provided ID
    """
    action, url = _url()

    headers = api_utils.generate_headers('owner', method='POST', body=json.dumps(payload),
                                         action=action)

    try:
        response = requests.post(url, headers=headers, json=payload)
        if response_utils.response_error(response):
            sys.stderr.write('Create api key failed.')
            sys.exit(1)
        elif response.status_code == 201:
            handle_api_key_response(response)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
项目:gql    作者:graphql-python    | 项目源码 | 文件源码
def execute(self, document, variable_values=None, timeout=None):
        query_str = print_ast(document)
        payload = {
            'query': query_str,
            'variables': variable_values or {}
        }

        data_key = 'json' if self.use_json else 'data'
        post_args = {
            'headers': self.headers,
            'auth': self.auth,
            'cookies': self.cookies,
            'timeout': timeout or self.default_timeout,
            data_key: payload
        }
        request = requests.post(self.url, **post_args)
        request.raise_for_status()

        result = request.json()
        assert 'errors' in result or 'data' in result, 'Received non-compatible response "{}"'.format(result)
        return ExecutionResult(
            errors=result.get('errors'),
            data=result.get('data')
        )
项目:spotify-connect-scrobbler    作者:jeschkies    | 项目源码 | 文件源码
def refresh_access_token(self, refresh_token):
        """ Returns the access token for Spotify Web API using a refresh token.

        Args:
            refresh_token (string): The token has been returned by the access
            token request.

        Returns:
            SpotifyCredentials: The parsed response from Spotify.
        """
        # Get new auth token
        payload = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token
        }
        headers = self._make_authorization_headers()
        return requests.post(
            'https://accounts.spotify.com/api/token',
            data=payload,
            headers=headers
        ).json()
项目:spotify-connect-scrobbler    作者:jeschkies    | 项目源码 | 文件源码
def request_access_token(self, token):
        """ Request access token from Last.fm.

        Args:
            token (string): Token from redirect.

        Return:
            dict: Response from get session call.
        """
        payload = {
            'api_key': self.__key,
            'method': 'auth.getSession',
            'token': token
        }
        payload['api_sig'] = self.sign(payload)
        payload['format'] = 'json'
        response = requests.post(
            'https://ws.audioscrobbler.com/2.0/', params=payload).json()

        return LastfmCredentials(response['session']['key'])