Python requests 模块,put() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.put()

项目:upstox-python    作者:upstox    | 项目源码 | 文件源码
def api_call(self, url, http_method, data):

        headers = {"Content-Type" : "application/json", "x-api-key" : self.api_key,
                   "authorization" : "Bearer " + self.access_token}

        r = None

        if http_method is PyCurlVerbs.POST:
            r = requests.post(url, data=json.dumps(data), headers=headers)
        elif http_method is PyCurlVerbs.DELETE:
            r = requests.delete(url, headers=headers)
        elif http_method is PyCurlVerbs.PUT:
            r = requests.put(url, data=json.dumps(data), headers=headers)
        elif http_method is PyCurlVerbs.GET:
            r = requests.get(url, headers=headers)

        return r
项目:aws-waf-security-automation    作者:cerbo    | 项目源码 | 文件源码
def send_response(event, context, responseStatus, responseData):
    responseBody = {'Status': responseStatus,
                    'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
                    'PhysicalResourceId': context.log_stream_name,
                    'StackId': event['StackId'],
                    'RequestId': event['RequestId'],
                    'LogicalResourceId': event['LogicalResourceId'],
                    'Data': responseData}

    req = None
    try:
        req = requests.put(event['ResponseURL'], data=json.dumps(responseBody))

        if req.status_code != 200:
            print(req.text)
            raise Exception('Recieved non 200 response while sending response to CFN.')
        return

    except requests.exceptions.RequestException as e:
        if req != None:
            print(req.text)
        print(e)
        raise
项目:Home-Assistant    作者:jmart518    | 项目源码 | 文件源码
def set_state(self, device_id, state):
        """Set device state."""
        payload = {
            'attributeName': 'desireddoorstate',
            'myQDeviceId': device_id,
            'AttributeValue': state,
        }
        device_action = requests.put(
            'https://{host_uri}/{device_set_endpoint}'.format(
                host_uri=HOST_URI,
                device_set_endpoint=self.DEVICE_SET_ENDPOINT),
                data=payload,
                headers={
                    'MyQApplicationId': self.brand[APP_ID],
                    'SecurityToken': self.security_token,
                    'User-Agent': self.USERAGENT
                }
        )

        return device_action.status_code == 200
项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def call_api(self, method, endpoint, payload):
        url = urlparse.urljoin(self.api_base_url, endpoint)

        if method == 'POST':
            response = requests.post(url, data=payload)
        elif method == 'delete':
            response = requests.delete(url)
        elif method == 'put':
            response = requests.put(url, data=payload)
        else:
            if self.api_key:
                payload.update({'api_token': self.api_key})
            response = requests.get(url, params=payload)

        content = json.loads(response.content)

        return content
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def replace_log(log_id, params):
    """
    Replace the given log with the details provided
    """
    headers = api_utils.generate_headers('rw')

    try:
        response = requests.put(_url(('logs', log_id))[1], json=params, headers=headers)
        if response_utils.response_error(response):
            sys.stderr.write('Update log failed.\n')
            sys.exit(1)
        elif response.status_code == 200:
            sys.stdout.write('Log: %s updated to:\n' % log_id)
            api_utils.pretty_print_string_as_json(response.text)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
项目:aws-tailor    作者:alanwill    | 项目源码 | 文件源码
def cfn_response(response_status, response_data, reason, physical_resource_id, event, context):
    # Put together the response to be sent to the S3 pre-signed URL

    if reason:
        reason = reason
    else:
        reason = 'See the details in CloudWatch Log Stream: ' + context.log_stream_name

    responseBody = {'Status': response_status,
                    'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
                    'PhysicalResourceId': physical_resource_id,
                    'StackId': event['StackId'],
                    'RequestId': event['RequestId'],
                    'LogicalResourceId': event['LogicalResourceId'],
                    'Data': response_data
                    }
    print('Response Body:', responseBody)
    response = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
    if response.status_code != 200:
        print(response.text)
        raise Exception('Response error received.')
    return
项目:mesos-autoscaler    作者:prudhvitella    | 项目源码 | 文件源码
def __requests_put(self, uri, data, headers):
        if self.mesos_user:
            # print "User " + self.mesos_user + "Pass " + self.mesos_pass
            # print "URI " + self.mesos_host + uri
            response = requests.put(self.mesos_host + uri, data,
                                    headers=headers,
                                    verify=False,
                                    auth=(self.mesos_user,
                                          self.mesos_pass))
        else:
            response = requests.put(self.mesos_host + uri,
                                    data,
                                    headers=headers,
                                    verify=False)
            # print "Response:" + str(response)
        return response.json()
项目:mesos-autoscaler    作者:prudhvitella    | 项目源码 | 文件源码
def __requests_put(self, uri, data, headers):
        if self.marathon_user:
            # print "User " + self.marathon_user + "Pass " + self.marathon_pass
            # print "URI " + self.marathon_host + uri
            response = requests.put(self.marathon_host + uri, data,
                                    headers=headers,
                                    verify=False,
                                    auth=(self.marathon_user,
                                          self.marathon_pass))
        else:
            response = requests.put(self.marathon_host + uri,
                                    data,
                                    headers=headers,
                                    verify=False)
            # print "Response:" + str(response)
        return response
项目:aws-database-migration-tools    作者:awslabs    | 项目源码 | 文件源码
def sendResponse(event, context, responseStatus, responseData):
    responseBody = {'Status': responseStatus,
                    'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
                    'PhysicalResourceId': context.log_stream_name,
                    'StackId': event['StackId'],
                    'RequestId': event['RequestId'],
                    'LogicalResourceId': event['LogicalResourceId'],
                    'Data': responseData}
    print 'RESPONSE BODY:n' + json.dumps(responseBody)
    try:
        req = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
        if req.status_code != 200:
            print req.text
            raise Exception('Recieved non 200 response while sending response to CFN.')
        return
    except requests.exceptions.RequestException as e:
        print e
        raise
项目:craton    作者:openstack    | 项目源码 | 文件源码
def create_cloud(self, cloud, data=None):
        cloud_url = self.url + "/clouds"
        payload = {"name": cloud}

        print("Creating cloud entry for %s with data %s" % (payload, data))
        resp = requests.post(cloud_url, headers=self.headers,
                             data=json.dumps(payload), verify=False)
        if resp.status_code != 201:
            raise Exception(resp.text)

        self.cloud = resp.json()
        if data:
            reg_id = self.cloud["id"]
            cloud_data_url = self.url + "/clouds/%s/variables" % reg_id
            resp = requests.put(cloud_data_url, headers=self.headers,
                                data=json.dumps(data), verify=False)
            if resp.status_code != 200:
                print(resp.text)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def create_region(self, region, data=None):
        region_url = self.url + "/regions"
        payload = {"name": region, "cloud_id": self.cloud.get("id")}

        print("Creating region entry for %s with data %s" % (payload, data))
        resp = requests.post(region_url, headers=self.headers,
                             data=json.dumps(payload), verify=False)
        if resp.status_code != 201:
            raise Exception(resp.text)

        self.region = resp.json()
        if data:
            reg_id = self.region["id"]
            region_data_url = self.url + "/regions/%s/variables" % reg_id
            resp = requests.put(region_data_url, headers=self.headers,
                                data=json.dumps(data), verify=False)
            if resp.status_code != 200:
                print(resp.text)
项目:craton    作者:openstack    | 项目源码 | 文件源码
def create_cell(self, cell, data=None):
        region_url = self.url + "/cells"
        payload = {"region_id": self.region.get("id"),
                   "cloud_id": self.cloud.get("id"), "name": cell}

        print("Creating cell entry %s with data %s" % (payload, data))
        resp = requests.post(region_url, headers=self.headers,
                             data=json.dumps(payload), verify=False)
        if resp.status_code != 201:
            raise Exception(resp.text)

        self.cell = resp.json()
        if data:
            c_id = resp.json()["id"]
            region_data_url = self.url + "/cells/%s/variables" % c_id
            resp = requests.put(region_data_url, headers=self.headers,
                                data=json.dumps(data), verify=False)
            if resp.status_code != 200:
                print(resp.text)
项目:dactyl    作者:ripple    | 项目源码 | 文件源码
def upload_es_json(es_json, es_index, es_base, id, doc_type='article'):
    """Uploads a document to the ElasticSearch index."""

    # Using requests
    url = "{es_base}/{index}/{doc_type}/{id}".format(
        es_base=es_base,
        index=es_index.lower(), # ES index names must be lowercase
        doc_type=doc_type,
        id=id,
    )
    headers = {"Content-Type": "application/json"}
    logger.info("Uploading to ES: PUT %s" % url)
    r = requests.put(url, headers=headers, data=es_json)
    if r.status_code >= 400:
        recoverable_error("ES upload failed with error: '%s'" % r.text,
                config.bypass_errors)
项目:berlin-devfest-2016-backend    作者:giansegato    | 项目源码 | 文件源码
def predict(path):
    global n
    global model

    parentNode = os.path.split(path)[0]
    j = requests.get(FIREBASE_URL + parentNode + FIREBASE_ENDING_URL).json()
    sample = np.asarray([j[x] for x in j.keys() if "x_" in x]).reshape(1, -1)

    if sample.shape[1] == n:
        j['y_predicted'] = model.predict_proba(sample)[0][1]
        j['action'] = model.predict(sample)[0]
        r = requests.put(FIREBASE_URL + parentNode + FIREBASE_ENDING_URL, data = json.dumps(j))

        print "Features: ", sample
        print "Prediction: ", model.predict_proba(sample) 
    else:
        print "Couldn't make prediction, wrong dimension: ", sample.shape[1], " vs ", n

# 2nd: SSEClient that prints stuff
项目:deeppavlov    作者:deepmipt    | 项目源码 | 文件源码
def upload_model_to_nexus(project):
    """
    Use 'pyb -P model_name="<model_name>" upload_model_to_nexus' to upload archived model to Nexus repository
    of the lab. If model_name == 'deeppavlov_docs', then documentation from build/docs will be archived and uploaded.
    archive_model task will be executed before.
    """
    import requests, datetime
    os.chdir('build')
    model_name = project.get_property('model_name')
    file_name = model_name + '_' + datetime.date.today().strftime("%y%m%d") + '.tar.gz'
    url = 'http://share.ipavlov.mipt.ru:8080/repository/'
    url += 'docs/' if model_name == 'deeppavlov_docs' else 'models/'
    headers = {'Content-Type': 'application/binary'}
    with open(file_name, 'rb') as artifact:
        requests.put(url + model_name + '/' + file_name, headers=headers,
                     data=artifact, auth=('jenkins', 'jenkins123'))
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def renameorg(apikey, orgid, neworgname, suppressprint=False):
    __hasorgaccess(apikey, orgid)
    calltype = 'Organization Rename'
    puturl = '{0}/organizations/{1}'.format(str(base_url), str(orgid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    putdata = {
        'name': format(str(neworgname))
    }
    dashboard = requests.put(puturl, data=json.dumps(putdata), headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


# Create a new organization
# https://dashboard.meraki.com/api_docs#create-a-new-organization
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def updatecontact(apikey, networkid, contactid, name, suppressprint=False):
    calltype = 'Phone Contact'
    puturl = '{0}/networks/{1}/phoneContacts/{2}'.format(str(base_url), str(networkid), str(contactid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }

    putdata = {'name': name}

    dashboard = requests.put(puturl, data=json.dumps(putdata), headers=headers)
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


# Delete a phone contact. Google Directory contacts cannot be removed.
# https://dashboard.meraki.com/api_docs#delete-a-phone-contact
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def updatevlan(apikey, networkid, vlanid, vlanname=None, mxip=None, subnetip=None, suppressprint=False):
    calltype = 'VLAN'
    puturl = '{0}/networks/{1}/vlans/{2}'.format(str(base_url), str(networkid), str(vlanid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    putdata = {}
    if vlanname is not None:
        putdata['name'] = format(str(vlanname))
    if mxip is not None:
        putdata['applianceIp'] = format(str(mxip))
    if subnetip is not None:
        putdata['subnet'] = format(str(subnetip))

    putdata = json.dumps(putdata)
    dashboard = requests.put(puturl, data=putdata, headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def renameorg(apikey, orgid, neworgname, suppressprint=False):
    __hasorgaccess(apikey, orgid)
    calltype = 'Organization Rename'
    puturl = '{0}/organizations/{1}'.format(str(base_url), str(orgid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    putdata = {
        'name': format(str(neworgname))
    }
    dashboard = requests.put(puturl, data=json.dumps(putdata), headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def lockTask(self, task):

        """Receives a task dictionary, obtainned with getTasks() method,
        and locks the task so other users can't access this task at same time.
        Usage:
        >>> pe.lockTask(task)
        """

        locked = requests.get(self.client.baseurl
                              +task['stepElement'],
                              auth = self.client.cred)
        eTag = locked.headers['ETag']
        locked = requests.put(self.client.baseurl
                              + task['stepElement'],
                              auth = self.client.cred,
                              params={'action':'lock',
                                      'If-Match':eTag}
                              )
项目:FileNetPEAPI    作者:wandss    | 项目源码 | 文件源码
def endTask(self, task, comment=None):        
        """Receives a task and finishes it, finishing the workflow itself or
        moving to the next step in the task. Is also possible to create a
        comment before ending the task.
        Usage:
        >>> pe.endTask(task) #or
        >>> pe.endTask(task, u'Completed the task!')

        """
        params = {'action':'dispatch'}
        step = self.getStep(task)
        if step.get('systemProperties').get('responses')\
           and not step.get('systemProperties').get('selectedResponse'):
            return "This task needs to be updated. Check the updateTask method."
        else:
            params['selectedResponse'] = 1           

        if comment:
            task = self.saveAndUnlockTask(task, comment)

        lock = self.lockTask(task)
        params['If-Match'] = task['ETag']
        dispatched = requests.put(self.client.baseurl + task['stepElement'],
                                  auth = self.client.cred,
                                  params=params)
项目:warranty_check    作者:device42    | 项目源码 | 文件源码
def upload_lifecycle(self, data):
        path = '/api/1.0/lifecycle_event/'
        url = self.url + path
        payload = data
        headers = {
            'Authorization': 'Basic ' + base64.b64encode(self.username + ':' + self.password),
            'Content-Type': 'application/x-www-form-urlencoded'
        }

        r = requests.put(url, data=payload, headers=headers, verify=False)

        if DEBUG:
            print '\t[+] Posting data: %s' % str(payload)
            print '\t[*] Status code: %d' % r.status_code
            print '\t[*] Response: %s' % str(r.text)
        return r.json()
项目:sqlalchemy-media    作者:pylover    | 项目源码 | 文件源码
def _upload_file(self, url: str, data: str, content_type: str,
                     rrs: bool = False):
        ensure_aws4auth()

        auth = AWS4Auth(self.access_key, self.secret_key, self.region, 's3')
        if rrs:
            storage_class = 'REDUCED_REDUNDANCY'
        else:
            storage_class = 'STANDARD'
        headers = {
            'Cache-Control': 'max-age=' + str(self.max_age),
            'x-amz-acl': self.acl,
            'x-amz-storage-class': storage_class
        }
        if content_type:
            headers['Content-Type'] = content_type
        res = requests.put(url, auth=auth, data=data, headers=headers)
        if not 200 <= res.status_code < 300:
            raise S3Error(res.text)
项目:automation-scripts    作者:meraki    | 项目源码 | 文件源码
def setdevicedata(p_apikey, p_shardhost, p_nwid, p_device, p_field, p_value, p_movemarker):
    #modifies value of device record. Returns the new value
    #on failure returns one device record, with all values 'null'
    #p_movemarker is boolean: True/False

    movevalue = "false"
    if p_movemarker:
        movevalue = "true"

    time.sleep(API_EXEC_DELAY)
    try:
        r = requests.put('https://%s/api/v0/networks/%s/devices/%s' % (p_shardhost, p_nwid, p_device), data=json.dumps({p_field: p_value, 'moveMapMarker': movevalue}), headers={'X-Cisco-Meraki-API-Key': p_apikey, 'Content-Type': 'application/json'})
    except:
        printusertext('ERROR 16: Unable to contact Meraki cloud')
        sys.exit(2)

    if r.status_code != requests.codes.ok:
        return ('null')

    return('ok')
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def saucelabsSetup(self):
        self.log("Setup for saucelabs")
        auth = (self._remoteUsername, os.getenv("REMOTE_ACCESSKEY"))
        baseURL = "{0}/{1}".format(self.SAUCELABS_API_BASE, self._remoteUsername)
        jobsURL = "{0}/jobs".format(baseURL)
        response = requests.get(jobsURL, \
            auth = auth, \
            headers = { 'content-type': 'application/json' }, \
            params = { 'full': 'true' })
        self.log("response={0}".format(json.dumps(response.json())))
        remoteJobid = self.saucelabsFindJobId(response.json(), self._remoteTestID())
        self.saucelabsJobURL = '{0}/jobs/{1}'.format(baseURL, remoteJobid)
        self._resultReference = "{0}/{1}".format(self.SAUCELABS_RESULTS_BASE, remoteJobid)
        response = requests.put(self.saucelabsJobURL,
            auth = auth, 
            headers = { 'content-type': 'application/json' }, 
            data = json.dumps({'name': self._buildID() }))
项目:jcsclient    作者:jiocloudservices    | 项目源码 | 文件源码
def execute(self):
        # get signature
        auth = DSSAuth(self.http_method, self.access_key, self.secret_key, self.dss_op_path, content_type = 'application/octet-stream')
        signature = auth.get_signature()
        self.http_headers['Authorization'] = signature
        self.http_headers['Date'] = formatdate(usegmt=True)
        statinfo = os.stat(self.local_file_name)
        self.http_headers['Content-Length'] = statinfo.st_size
        self.http_headers['Content-Type'] = 'application/octet-stream'

        # construct request
        request_url = self.dss_url + self.dss_op_path
        data = open(self.local_file_name, 'rb')
        # make request
        resp = requests.put(request_url, headers = self.http_headers, data=data, verify = self.is_secure_request)
        return resp
项目:jcsclient    作者:jiocloudservices    | 项目源码 | 文件源码
def execute(self):
        # get signature
        query_str = 'partNumber=' + self.part_number + '&uploadId=' + self.upload_id
        auth = DSSAuth(self.http_method, self.access_key, self.secret_key, self.dss_op_path, query_str = self.dss_query_str_for_signature, content_type = 'application/octet-stream')
        signature = auth.get_signature()
        self.http_headers['Authorization'] = signature
        self.http_headers['Date'] = formatdate(usegmt=True)
        statinfo = os.stat(self.local_file_name)
        self.http_headers['Content-Length'] = statinfo.st_size
        self.http_headers['Content-Type'] = 'application/octet-stream'

        # construct request
        request_url = self.dss_url + self.dss_op_path 
        if(self.dss_query_str is not None):
            request_url += '?' + self.dss_query_str  
        data = open(self.local_file_name, 'rb')
        # make request
        resp = requests.put(request_url, headers = self.http_headers, data=data, verify = self.is_secure_request)

        return resp
项目:son-mano-framework    作者:sonata-nfv    | 项目源码 | 文件源码
def create_vh(self, sm_type, uuid):
        exists = False
        vh_name = '{0}-{1}'.format(sm_type,uuid)
        api = '/api/vhosts/'
        url_list = '{0}{1}'.format(self.host,api)
        url_create = '{0}{1}{2}'.format(self.host,api,vh_name)
        url_permission = '{0}/api/permissions/{1}/specific-management'.format(self.host,vh_name)
        data = '{"configure":".*","write":".*","read":".*"}'
        list = requests.get(url=url_list, auth= ('guest','guest')).json()
        for i in range(len(list)):
            if list[i]['name'] == vh_name:
                exists = True
                break
        if not exists:
            response = requests.put(url=url_create, headers=self.headers, auth=('guest', 'guest'))
            permission = requests.put(url=url_permission, headers=self.headers, data=data, auth=('guest', 'guest'))
            return response.status_code, permission.status_code
        else:
            return 0,0
项目:zendesk-utils    作者:trailbehind    | 项目源码 | 文件源码
def delete_all_zendesk_articles(self, category):
    '''
        delete all articles in the category being filled with UV articles
        for our app, we had multiple uservoice knowledge bases, and each gets put in a zendesk category
    '''

    print "**DELETING ALL SECTIONS/ARTICLES in destination category {}".format(category)
    url = '{}/api/v2/help_center/categories/{}/sections.json'.format(self.zendesk_url, category)
    response = requests.get(url, headers=self.headers, auth=self.credentials)
    if response.status_code != 200:
      print('FAILED to delete articles with error {}'.format(response.status_code))
      exit()
    sections = response.json()['sections']
    section_ids=list(section['id'] for section in sections)

    for section_id in section_ids:
      url = "{}/api/v2/help_center/sections/{}.json".format(self.zendesk_url, section_id)
      response = requests.delete(url, headers=self.headers, auth=self.credentials)
      if response.status_code != 204:
        print('FAILED to delete sections for category {} with error {}'.format(category, response.status_code))
        exit()
项目:Dallinger    作者:Dallinger    | 项目源码 | 文件源码
def _upload_assets_to_OSF(dlgr_id, osf_id, provider="osfstorage"):
    """Upload experimental assets to the OSF."""
    root = "https://files.osf.io/v1"
    snapshot_filename = "{}-code.zip".format(dlgr_id)
    snapshot_path = os.path.join("snapshots", snapshot_filename)
    r = requests.put(
        "{}/resources/{}/providers/{}/".format(
            root,
            osf_id,
            provider,
        ),
        params={
            "kind": "file",
            "name": snapshot_filename,
        },
        headers={
            "Authorization": "Bearer {}".format(
                config.get("osf_access_token")),
            "Content-Type": "text/plain",
        },
        data=open(snapshot_path, 'rb'),
    )
    r.raise_for_status()
项目:tintri-python-sdk    作者:Tintri    | 项目源码 | 文件源码
def _send_raw_http_request(self, method, url, data=None):
        self.__logger.debug('%s %s' % (method, url))
        if method in ['POST', 'PUT', 'PATCH']:
            self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data)

        headers = {'content-type': 'application/json'}
        if self.__session_id:
            headers['cookie'] = 'JSESSIONID=%s' % self.__session_id

        if method in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
            if method == 'GET': httpresp = requests.get(url, headers=headers, verify=False)
            elif method == 'POST': httpresp = requests.post(url, data, headers=headers, verify=False)
            elif method == 'PUT': httpresp = requests.put(url, data, headers=headers, verify=False)
            elif method == 'PATCH': httpresp = requests.patch(url, data, headers=headers, verify=False)
            elif method == 'DELETE': httpresp = requests.delete(url, headers=headers, verify=False)
            self._httpresp = httpresp # self._httpresp is for debugging only, not thread-safe
            return httpresp
        else:
            raise TintriError(None, message='Invalid HTTP method: ' + method) # This should never happen
项目:trackopy    作者:ThaWeatherman    | 项目源码 | 文件源码
def toggle_tracking(self, enabled: bool=True):
        """
        Enable or disable automatic deck tracking

        :param bool enabled: If True, tracking is enabled, else it is disabled
        :return None:
        :raises: requests.exceptions.HTTPError on error
        """
        logger.info('Called toggle_tracking()')
        endpoint = '/profile/settings/decks/toggle'
        url = self._url + endpoint
        val = 'true' if enabled else 'false'
        data = {'user[deck_tracking]': val, '_method': 'put'}
        logger.debug('POST on %s', endpoint)
        r = requests.post(url, auth=self._auth, data=data)
        r.raise_for_status()
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def put(self, path='', **kwargs):
        """

        :param path: str api path
        :param kwargs: additional parameters for request
        :return: dict response

        :raises: HTTPError
        :raises: InvalidJSONError
        """
        r = requests.put(self._path_join(path), headers=self.auth_custom, auth=self.auth, **kwargs)
        r.raise_for_status()
        try:
            return r.json()
        except ValueError:
            raise InvalidJSONError(r.content)
项目:lena    作者:coingraham    | 项目源码 | 文件源码
def send_response(event, context, status, reason, data):
    import requests

    body = json.dumps(
        {
            'Status': status,
            'RequestId': event['RequestId'],
            'StackId': event['StackId'],
            'PhysicalResourceId': context.log_stream_name,
            'Reason': reason,
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': data
        }
    )

    headers = {
        'content-type': '',
        'content-length': len(body)
    }

    r = requests.put(event['ResponseURL'], data=body, headers=headers)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_requests_mocks(self):
        self.assertTrue(isinstance(requests.get, Mock))
        self.assertTrue(isinstance(requests.put, Mock))
        self.assertTrue(isinstance(requests.post, Mock))
        self.assertTrue(isinstance(requests.patch, Mock))
        self.assertTrue(isinstance(requests.delete, Mock))
项目:hesperides-cli    作者:voyages-sncf-technologies    | 项目源码 | 文件源码
def put(self, path, params=None, body=None):
        if body:
            return requests.put(self.endpoint + path, params=params, data=body, headers=self.headers,
                                verify=False)
        else:
            return requests.put(self.endpoint + path, params=params, headers=self.headers, verify=False)
项目:katello-publish-cvs    作者:RedHatSatellite    | 项目源码 | 文件源码
def put_json(location, json_data):
    """
    Performs a PUT and passes the data to the URL location
    """

    result = requests.put(location,
                            data=json_data,
                            auth=(USERNAME, PASSWORD),
                            verify=SSL_VERIFY,
                            headers=POST_HEADERS)

    return result.json()
项目:safetyculture-sdk-python    作者:SafetyCulture    | 项目源码 | 文件源码
def authenticated_request_put(self, url, data):
        self.custom_http_headers['content-type'] = 'application/json'
        response = requests.put(url, data, headers=self.custom_http_headers)
        del self.custom_http_headers['content-type']
        return response
项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def put(self, endpoint, payload):
        return self.call_api('put', endpoint, payload)
项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def delete(self, endpoint):
        return self.call_api('delete', endpoint, payload=None)

    # put methods
项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def update(self, element_id, **kwargs):
        """
        updates an element on pipedrive
        using /endpoint/:id as url
        """

        endpoint = str(self.endpoint) + '/' + str(element_id)

        restpoint = self.restify(endpoint)

        content = self.put(restpoint, kwargs)

        return content
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def update(self, api_obj, data, obj_id=0, url_params=''):
        if not data:
            raise TypeError("Missing object data.")
        if url_params:
            url_params = '?' + url_params.lstrip("?")
        if not obj_id:
            obj_id = data['id']
        url = self.server + '/' + api_obj + '/' + str(obj_id) + url_params
        r = requests.put(url, data=json.dumps(data), headers=self.tokenHeaderJson, verify=self.sslVerify)
        return self.__check_result(r)


    # Delete object using HTTP DELETE request.
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def add_job(self, session_id, job_id):
        # endpoint /v1/sessions/{sessions_id}/jobs/{job_id}
        endpoint = '{0}{1}/jobs/{2}'.format(self.endpoint, session_id, job_id)
        r = requests.put(endpoint,
                         headers=self.headers, verify=self.verify)
        if r.status_code != 204:
            raise exceptions.ApiClientException(r)
        return
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def add_job(self, session_id, job_id):
        # endpoint /v1/sessions/{sessions_id}/jobs/{job_id}
        endpoint = '{0}{1}/jobs/{2}'.format(self.endpoint, session_id, job_id)
        r = requests.put(endpoint,
                         headers=self.headers, verify=self.verify)
        if r.status_code != 204:
            raise exceptions.ApiClientException(r)
        return
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def replace_logset(logset_id, params):
    """
    Replace a given logset with the details provided
    """
    headers = api_utils.generate_headers('rw')

    try:
        response = requests.put(_url((logset_id,))[1], json=params, headers=headers)
        handle_response(response, 'Update logset with details %s failed.\n' % params, 200)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def delete_user_from_team(team_id, user_key):
    """
    Delete a user from a team.
    """
    headers = api_utils.generate_headers('rw')
    params = {'teamid': team_id}
    try:
        response = requests.request('GET', _url((team_id,))[1], params=params,
                                    headers=headers)
        if response.status_code == 200:
            params = {
                'team': {
                    'name': response.json()['team']['name'],
                    'users': [user for user in response.json()['team']['users'] if user['id'] !=
                              user_key]
                }
            }
            headers = api_utils.generate_headers('rw')
            try:
                response = requests.put(_url((team_id,))[1], json=params, headers=headers)
                if response_utils.response_error(response):  # Check response has no errors
                    click.echo('Deleting user from team with key: %s failed.' % team_id, err=True)
                    sys.exit(1)
                elif response.status_code == 200:
                    click.echo("Deleted user with key: '%s' from team: %s" % (user_key, team_id))
            except requests.exceptions.RequestException as error:
                click.echo(error, err=True)
                sys.exit(1)
        elif response_utils.response_error(response):
            click.echo('Cannot find team. Deleting user from team %s failed.' % team_id, err=True)
            sys.exit(1)
    except requests.exceptions.RequestException as error:
        click.echo(error, err=True)
        sys.exit(1)
项目:sat6_scripts    作者:RedHatSatellite    | 项目源码 | 文件源码
def put_json(location, json_data):
    """
    Performs a PUT and passes the data to the URL location
    """
    result = requests.put(
        location,
        data=json_data,
        auth=(USERNAME, PASSWORD),
        verify=True,
        headers=POST_HEADERS)
    return result.json()
项目:aniping    作者:kuruoujou    | 项目源码 | 文件源码
def add_update_show(self, beid, subgroup):
        """Adds or edits a show in sonarr.

        Args:
            beid (int): The TVDB ID of the show we're adding or editing.
            subgroup (str): The subgroup we're using for this show.
        """
        _logger.debug("Entering add_update_show.")
        tag = self._subgroup_tag(subgroup)

        show = None
        _logger.debug("Getting all shows being watched and searching for show with ID {0}.".format(beid))
        shows = self.get_watching_shows()
        for item in shows:
            if int(item['tvdbId']) == int(beid):
                _logger.debug("Found show {0} with ID {1} in watching shows! Updating instead of creating.".format(item['title'], item['tvdbId']))
                show = item
        if not show:
            _logger.debug("Show not found in watching list, creating a new one.")
            show = self.get_show(beid)    
            payload = {
                "tvdbId":           int(beid),
                "title":            show['title'],
                "qualityProfileId": self._quality_profile,
                "titleSlug":        show['titleSlug'],
                "images":           show['images'],
                "seasons":          show['seasons'],
                "rootFolderPath":   self._library_path,
                "seriesType":       "anime",
                "seasonFolder":     True,
                "addOptions":       {"ignoreEpisodesWithFiles":True},
                "tags":             [tag]
            }
            out = requests.post("{0}/api/series?apikey={1}".format(self.url, self.api_key), json=payload)
        else:
            show['tags'] = [tag]
            requests.put("{0}/api/series?apikey={1}".format(self.url, self.api_key), json=show)
项目:ucloud-storage-python-wrapper    作者:wishket    | 项目源码 | 文件源码
def put_container(self, container_name):
        """
        create container for authorized account
        :param container_name: container's name for create
        :return: http response
        """
        # create or update container
        url = self.url + '/' + container_name

        # request api
        response = requests.put(url, headers=self.base_headers)

        return response
项目:ucloud-storage-python-wrapper    作者:wishket    | 项目源码 | 文件源码
def put_object_to_container(self, container_name,
                              file_path=None,
                              file_name=None,
                              file_stream=None):
        """
        upload file to target container
        :param container_name:
            target container_name - string
            required=True
        :param file_path: file's path for upload - string
        :param file_name: file's name for get urls - string
        :param file_stream: file's data string - string
        :return: submit response
        """

        # make file object to comfortable for uploading
        if not file_name:
            file_name = file_path.split('/')[-1]

        url = self.url + '/' + container_name + '/' + file_name

        if not file_stream:
            file_stream = open(file_path, 'rb').read()

        response = requests.put(
            url,
            data=file_stream,
            headers=self.base_headers
        )

        return response