Python requests 模块,patch() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.patch()

项目:py_fake_server    作者:Telichkin    | 项目源码 | 文件源码
def test_chain_with_different_for_the_nth_time_error(server: FakeServer):
    """Check the error text when every link of a chained expectation mismatches.

    Three PATCH requests are sent with levels 5, 6 and 7, while the chained
    expectation asks for levels 6, 7 and 8 on the first, second and third
    request. The resulting AssertionError must list each expected body next
    to the body that was recorded.
    """
    for level in (5, 6, 7):
        requests.patch(server.base_uri + "/users", json={"name": "new_name", "level": level})

    with pytest.raises(AssertionError) as error:
        expectation = (
            server.was_requested("patch", "/users")
            .for_the_first_time()
            .with_content_type("application/json")
            .with_body('{"name": "new_name", "level": 6}')
            .for_the_second_time()
            .with_content_type("application/json")
            .with_body('{"name": "new_name", "level": 7}')
            .for_the_3_time()
            .with_content_type("application/json")
            .with_body('{"name": "new_name", "level": 8}')
        )
        expect_that(expectation)

    expected_message = (
        "Expect that server was requested with [PATCH] http://localhost:8081/users.\n"
        "For the 1 time: with body '{\"name\": \"new_name\", \"level\": 6}'.\n"
        "But for the 1 time: body was '{\"name\": \"new_name\", \"level\": 5}'.\n"
        "For the 2 time: with body '{\"name\": \"new_name\", \"level\": 7}'.\n"
        "But for the 2 time: body was '{\"name\": \"new_name\", \"level\": 6}'.\n"
        "For the 3 time: with body '{\"name\": \"new_name\", \"level\": 8}'.\n"
        "But for the 3 time: body was '{\"name\": \"new_name\", \"level\": 7}'."
    )
    assert str(error.value) == expected_message
项目:jumpserver-python-sdk    作者:jumpserver    | 项目源码 | 文件源码
def finish_proxy_log(self, data):
        """Report a finished proxy log through the ``finish-proxy-log`` endpoint.

        ``data`` is modified in place before it is sent: ``date_finished`` is
        converted with ``timestamp_to_datetime_str``, ``is_failed`` is
        normalised to 0 or 1, and ``is_finished`` is set to 1.

        :param data: dict describing the log, for example
        data = {
            "proxy_log_id": 123123,
            "date_finished": timestamp,
        }
        :return: True when the response status is 200, otherwise False
        """
        assert isinstance(data.get('date_finished'), (int, float))
        data['date_finished'] = timestamp_to_datetime_str(data['date_finished'])
        data['is_failed'] = int(bool(data.get('is_failed')))
        data['is_finished'] = 1

        # A missing or falsy id is sent as primary key 0.
        log_id = data.get('proxy_log_id') or 0
        response, _content = self.patch('finish-proxy-log', pk=log_id, data=data)

        succeeded = response.status_code == 200
        if not succeeded:
            logging.warning('Finish proxy log failed: %s' % log_id)
        return succeeded
项目:tintri-python-sdk    作者:Tintri    | 项目源码 | 文件源码
def _send_raw_http_request(self, method, url, data=None):
        """Send one HTTP request with JSON headers and return the response.

        :param method: one of 'GET', 'POST', 'PUT', 'PATCH', 'DELETE'
        :param url: full URL to call
        :param data: request body; only passed on for POST, PUT and PATCH
        :return: the ``requests`` response, also stored on ``self._httpresp``
        :raises TintriError: if ``method`` is not one of the five listed verbs
        """
        self.__logger.debug('%s %s' % (method, url))
        body_methods = ('POST', 'PUT', 'PATCH')
        if method in body_methods:
            self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data)

        headers = {'content-type': 'application/json'}
        if self.__session_id:
            headers['cookie'] = 'JSESSIONID=%s' % self.__session_id

        if method not in ('GET', 'POST', 'PUT', 'PATCH', 'DELETE'):
            raise TintriError(None, message='Invalid HTTP method: ' + method) # This should never happen

        # requests exposes one function per verb, named after the lower-cased verb.
        send = getattr(requests, method.lower())
        if method in body_methods:
            httpresp = send(url, data, headers=headers, verify=False)
        else:
            httpresp = send(url, headers=headers, verify=False)
        self._httpresp = httpresp # self._httpresp is for debugging only, not thread-safe
        return httpresp
项目:ATX    作者:NetEaseGame    | 项目源码 | 文件源码
def patch(self, udid):
        data = tornado.escape.json_decode(self.request.body)
        id = data['id']
        timeout = float(data.get('timeout', 20.0))
        print 'Timeout:', timeout
        result = self.results.get(id)
        if result is None:
            self.results[id] = self
            yield gen.sleep(timeout)
            if self.results.get(id) == self:
                del(self.results[id])
                self.write('null')
                self.finish()
        else:
            self.write(json.dumps(result))
            self.results.pop(id, None)
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def update_ports(self, service, ports):
        """Update ports in service or remove service if ports empty
        :param service: service to update; its metadata supplies name and namespace
        :param ports: new ports for service
        :return: updated service, or None when the service was deleted
        """
        metadata = service['metadata']
        name, namespace = metadata['name'], metadata['namespace']

        if not ports:
            # No ports left: the service itself is removed.
            rv = self.delete(name, namespace)
            raise_if_failure(rv, "Couldn't delete service")
            return None

        rv = self.patch(name, namespace, {'spec': {'ports': ports}})
        raise_if_failure(rv, "Couldn't patch service ports")
        return rv
项目:github-snooze-button    作者:tdsmith    | 项目源码 | 文件源码
def clear_snooze_label_if_set(github_auth, issue, snooze_label):
    """Remove ``snooze_label`` from an issue if the issue currently carries it.

    :param github_auth: sequence unpacked into ``requests.auth.HTTPBasicAuth``
    :param issue: issue dict; reads ``labels`` (optional), ``html_url`` and ``url``
    :param snooze_label: name of the label to remove
    :return: False if the label was not set, True after it has been removed
    """
    remaining = {entry["name"] for entry in issue.get("labels", [])}
    if snooze_label not in remaining:
        message = "clear_snooze_label_if_set: Label {} not set on {}".format(
            snooze_label, issue["html_url"])
        logging.debug(message)
        return False

    remaining.remove(snooze_label)
    # Send the full remaining label list back to the issue URL.
    response = requests.patch(
        issue["url"],
        auth=requests.auth.HTTPBasicAuth(*github_auth),
        json={"labels": list(remaining)},
        headers=constants.GITHUB_HEADERS,
    )
    response.raise_for_status()
    message = "clear_snooze_label_if_set: Removed snooze label from {}".format(
        issue["html_url"])
    logging.debug(message)
    return True
项目:AutomatorX    作者:xiaoyaojjian    | 项目源码 | 文件源码
def patch(self, udid):
        data = tornado.escape.json_decode(self.request.body)
        id = data['id']
        timeout = float(data.get('timeout', 20.0))
        print 'Timeout:', timeout
        result = self.results.get(id)
        if result is None:
            self.results[id] = self
            yield gen.sleep(timeout)
            if self.results.get(id) == self:
                del(self.results[id])
                self.write('null')
                self.finish()
        else:
            self.write(json.dumps(result))
            self.results.pop(id, None)
项目:restconf-examples    作者:CiscoDevNet    | 项目源码 | 文件源码
def create_static(host, port, user, password, route, nexthop, insecure):
    """Function to create a static route on CSR1000V.

    Sends a PATCH to ``/api/running/native/ip/route`` on the device.

    :param host: device host used to build the URL
    :param port: device port used to build the URL
    :param user: user name for HTTP basic authentication
    :param password: password for HTTP basic authentication
    :param route: not used in this function
    :param nexthop: not used in this function
    :param insecure: when true, TLS certificate verification is disabled
    :return: the response body text, or -1 if sending the request raised
    """
    # Use the connection parameters that were passed in rather than
    # module-level names, so the function talks to the device the caller chose.
    url = "https://{h}:{p}/api/running/native/ip/route".format(h=host, p=port)
    headers = {'content-type': 'application/vnd.yang.data+json',
               'accept': 'application/vnd.yang.data+json'}
    try:
        # NOTE(review): ``data`` is not defined in this function; it is
        # presumably a module-level payload built from route/nexthop - confirm.
        result = requests.patch(url, auth=(user, password), data=data,
                                headers=headers, verify=not insecure)
    except Exception:
        print(str(sys.exc_info()[0]))
        return -1

    # NOTE(review): a status-code check (201 -> 0, otherwise print and -1)
    # used to follow this return but could never run. The return value is
    # kept as-is so existing callers still receive the response text.
    return result.text
项目:zalando-deploy-cli    作者:robin-wittler    | 项目源码 | 文件源码
def promote_deployment(config, application, version, release, stage, execute):
    '''Promote deployment to new stage

    Sets the ``stage`` label on the deployment named
    ``<application>-<version>-<release>`` by sending a resources update as a
    PATCH, which yields a change request id.

    :param config: mapping read for ``kubernetes_cluster`` and ``kubernetes_namespace``
    :param application: first part of the deployment name
    :param version: second part of the deployment name
    :param release: third part of the deployment name
    :param stage: value for the ``stage`` label
    :param execute: if true, approve and execute the change request;
        otherwise only print its id
    '''
    deployment_name = '{}-{}-{}'.format(application, version, release)

    info('Promoting deployment {} to {} stage..'.format(deployment_name, stage))
    cluster_id = config.get('kubernetes_cluster')
    # The namespace is looked up once, right where the path is built.
    namespace = config.get('kubernetes_namespace')
    path = '/kubernetes-clusters/{}/namespaces/{}/resources'.format(cluster_id, namespace)

    resources_update = ResourcesUpdate()
    resources_update.set_label(deployment_name, 'stage', stage)
    response = request(config, requests.patch, path, json=resources_update.to_dict())
    change_request_id = response.json()['id']

    if execute:
        approve_and_execute(config, change_request_id)
    else:
        print(change_request_id)
项目:zalando-deploy-cli    作者:robin-wittler    | 项目源码 | 文件源码
def scale_deployment(config, application, version, release, replicas, execute):
    '''Scale a single deployment

    Sets the replica count of the deployment named
    ``<application>-<version>-<release>`` by sending a resources update as a
    PATCH, which yields a change request id.

    :param config: mapping read for ``kubernetes_cluster`` and ``kubernetes_namespace``
    :param application: first part of the deployment name
    :param version: second part of the deployment name
    :param release: third part of the deployment name
    :param replicas: requested number of replicas
    :param execute: if true, approve and execute the change request;
        otherwise only print its id
    '''
    kubectl_login(config)

    deployment_name = '{}-{}-{}'.format(application, version, release)

    info('Scaling deployment {} to {} replicas..'.format(deployment_name, replicas))
    resources_update = ResourcesUpdate()
    resources_update.set_number_of_replicas(deployment_name, replicas)

    cluster_id = config.get('kubernetes_cluster')
    # The namespace is looked up once, right where the path is built.
    namespace = config.get('kubernetes_namespace')
    path = '/kubernetes-clusters/{}/namespaces/{}/resources'.format(cluster_id, namespace)
    response = request(config, requests.patch, path, json=resources_update.to_dict())
    change_request_id = response.json()['id']

    if execute:
        approve_and_execute(config, change_request_id)
    else:
        print(change_request_id)
项目:juicer    作者:eubr-bigsea    | 项目源码 | 文件源码
def save_job_source_code(base_url, token, job_id, source):
    """PATCH a job's source code to ``<base_url>/jobs/<job_id>/source-code``.

    The token is sent both as the ``X-Auth-Token`` header and as ``secret``
    in the JSON body.

    :return: the decoded JSON response body when the status is 200
    :raises RuntimeError: for any other status code
    """
    url = '{}/jobs/{}/source-code'.format(base_url, job_id)
    request_headers = {
        'X-Auth-Token': str(token),
        'Content-Type': 'application/json'
    }
    body = json.dumps({'secret': token, 'source': source})

    r = requests.patch(url, data=body, headers=request_headers)
    if r.status_code != 200:
        raise RuntimeError(
            u"Error loading data from stand: HTTP {} - {}  ({})".format(
                r.status_code, r.text, url))
    return json.loads(r.text)
项目:harmony    作者:nickolas360    | 项目源码 | 文件源码
def set_settings(
            self, explicit_filter, allow_dms, friend_all, friend_mutual,
            friend_mutual_guild):
        """Update the current user's settings via ``PATCH users/@me/settings``.

        Any argument may be ``None`` to leave that setting out of the request.

        :param explicit_filter: integer from 0 to 2, sent as
            ``explicit_content_filter``
        :param allow_dms: its negation is sent as ``default_guilds_restricted``
        :param friend_all: with the two arguments below, sent as
            ``friend_source_flags``; the ``all`` flag is only true when all
            three flags are truthy
        :param friend_mutual: sent as ``mutual_friends``
        :param friend_mutual_guild: sent as ``mutual_guilds``
        :return: tuple ``(r.ok, r.json())`` for the PATCH response
        :raises ValueError: if ``explicit_filter`` is outside 0..2
        """
        settings = {}
        if explicit_filter is not None:
            if not (0 <= explicit_filter <= 2):
                raise ValueError("Explicit filter must be from 0 to 2.")
            settings["explicit_content_filter"] = explicit_filter
        if allow_dms is not None:
            settings["default_guilds_restricted"] = not allow_dms

        # Skip the friend flags only when none of them was supplied. An
        # identity chain (a is b is c) would also skip them when all three are
        # True or all three are False, silently dropping a valid request.
        friend_flags = (friend_all, friend_mutual, friend_mutual_guild)
        if any(flag is not None for flag in friend_flags):
            friend_all = friend_all and friend_mutual and friend_mutual_guild
            settings["friend_source_flags"] = {
                "all": bool(friend_all),
                "mutual_friends": bool(friend_mutual),
                "mutual_guilds": bool(friend_mutual_guild),
            }

        r = self.patch("users/@me/settings", auth=True, json=settings)
        # Returns form errors if invalid; new settings otherwise
        return (r.ok, r.json())
项目:TheHive4py    作者:CERT-BDF    | 项目源码 | 文件源码
def update_case(self, case):
        """
        Update a case.

        Only a fixed whitelist of the case's instance attributes is sent.

        :param case: The case to update. The case's `id` determines which case to update.
        :return: the ``requests`` response of the PATCH call. If the request
            raises a ``RequestException`` the process exits with status 1.
        """
        req = self.url + "/api/case/{}".format(case.id)

        # Attributes that are allowed to be sent
        update_keys = [
            'title', 'description', 'severity', 'startDate', 'owner', 'flag', 'tlp', 'tags', 'resolutionStatus',
            'impactStatus', 'summary', 'endDate', 'metrics'
        ]
        data = {}
        for attribute, value in case.__dict__.items():
            if attribute in update_keys:
                data[attribute] = value

        try:
            return requests.patch(
                req,
                headers={'Content-Type': 'application/json'},
                json=data,
                proxies=self.proxies,
                auth=self.auth,
                verify=self.cert,
            )
        except requests.exceptions.RequestException:
            sys.exit(1)
项目:shrub    作者:Backgammon-    | 项目源码 | 文件源码
def edit_issue(auth_token, username, repo, issue_number, issue_title, issue_body, insecure=False):
    """
        Edit an issue.

        Sends a PATCH with the new title and body to the GitHub issues API.

        :param auth_token: passed to ``auth_header`` to build the request headers
        :param username: not used in this function
        :param repo: inserted into the URL after ``/repos/``
        :param issue_number: number of the issue to edit
        :param issue_title: new title
        :param issue_body: new body
        :param insecure: only controls whether the extra SSL message is printed
        :return: the decoded JSON response, or None if an SSL error occurred
    """
    # https://developer.github.com/v3/issues/#edit-an-issue
    # Parameters (not required):
    # title - String containing the title of the issue.
    # body - String containing the body of the issue.
    parameters = {"title": issue_title, "body": issue_body}

    try:
        response = requests.patch("https://api.github.com/repos/{}/issues/{}".format(repo, issue_number),
                            json=parameters,
                            headers=auth_header(auth_token))
    # SSLError lives in requests.exceptions; it is not exported at package level.
    except requests.exceptions.SSLError:
        print("A network error occurred.")

        if insecure:
            print("An SSL certificate error occurred.")

        # There is no response on this path, so do not fall through to
        # response.json(), which would raise UnboundLocalError.
        return None

    return response.json()
项目:shrub    作者:Backgammon-    | 项目源码 | 文件源码
def edit_issue_comment(auth_token, username, repo, comment_id, comment_body, insecure=False):
    """
        Edits an existing comment on a specified issue.

        :param auth_token: passed to ``auth_header`` to build the request headers
        :param username: not used in this function
        :param repo: inserted into the URL after ``/repos/``
        :param comment_id: id of the comment to edit
        :param comment_body: new comment text
        :param insecure: only controls whether the extra SSL message is printed
        :return: the decoded JSON response, or None if the request failed
    """
    # https://developer.github.com/v3/issues/comments/#edit-a-comment
    # Note: Issue Comments are ordered by ascending ID.
    # Parameters:
    # body - Required string. The contents of the comment.
    parameters = {"body": comment_body}

    try:
        response = requests.patch("https://api.github.com/repos/{}/issues/comments/{}".format(repo, comment_id),
                            json=parameters,
                            headers=auth_header(auth_token))
    # Catch request failures only; a bare except would also swallow
    # KeyboardInterrupt and programming errors.
    except requests.exceptions.RequestException:
        print("A network error occurred.")

        if insecure:
            print("An SSL certificate error occurred.")

        # There is no response on this path, so do not fall through to
        # response.json(), which would raise UnboundLocalError.
        return None

    return response.json()
项目:bucket-list    作者:arpitbbhayani    | 项目源码 | 文件源码
def patch(*args, **kwargs):
        """PATCH with the provider's access headers and return the decoded JSON.

        The request is retried up to ``WRequests.MAX_RETRIES`` times, but only
        when the response body cannot be decoded as JSON. The final result
        (None if every attempt failed to decode) is passed to
        ``raise_if_error`` before being returned.
        """
        headers = {
            'X-Access-Token': appconfig.get('provider_config', 'access-token'),
            'X-Client-ID': appconfig.get('provider_config', 'client-id')
        }

        resp = None
        for _attempt in range(WRequests.MAX_RETRIES):
            try:
                resp = requests.patch(*args, headers=headers, **kwargs).json()
            except json.decoder.JSONDecodeError:
                # Undecodable body: forget it and try again.
                resp = None
                continue
            break

        raise_if_error(resp)
        return resp
项目:ConnectPyse    作者:joshuamsmith    | 项目源码 | 文件源码
def patch(self, the_id, user_data, user_params=None, user_headers=None):
        """Send a PATCH for one record of this endpoint.

        :param the_id: record id, passed to ``self._url`` with the endpoint
        :param user_data: payload, serialised with ``ujson.dumps``
        :param user_params: extra query parameters (defaults to an empty dict)
        :param user_headers: extra headers (defaults to an empty dict)
        :return: ``self._formatreturn(resp)`` when the status code is 200
        :raises ApiError: for any other status code
        """
        # None stands in for "no extras" so that calls never share one
        # mutable default dict.
        if user_params is None:
            user_params = {}
        if user_headers is None:
            user_headers = {}

        strjsondata = ujson.dumps(user_data, ensure_ascii=False)

        resp = req.patch(
            self._url(self.endpoint, the_id),
            data=strjsondata,
            headers=self._headers(user_headers),
            params=self._params(user_params)
        )

        if resp.status_code != 200:
            # Report the verb that was actually sent.
            raise ApiError(
                "PATCH",
                self.endpoint,
                resp.status_code,
                resp.text)
        else:
            return self._formatreturn(resp)
项目:acceptable    作者:canonical-ols    | 项目源码 | 文件源码
def test_mock_works_with_multiple_methods(self):
        """A ServiceMock declared for GET, POST and PATCH answers 200 to each."""
        double = ServiceMock(
            service='foo',
            methods=['GET', 'POST', 'PATCH'],
            url='/foo',
            input_schema={'type': 'object'},
            output_schema={'type': 'array'},
            output=[]
        )
        set_service_locations(dict(foo="http://localhost:1234/"))

        self.useFixture(double)

        # Same order as the mock is exercised: POST, GET, then PATCH.
        for verb in ('post', 'get', 'patch'):
            send = getattr(requests, verb)
            response = send("http://localhost:1234/foo", json={})
            self.assertEqual(200, response.status_code)
项目:openshift-event-controller    作者:redhat-cop    | 项目源码 | 文件源码
def update_route(route_fqdn, route_name, config, logger, watcher):
    """Create an IPA host and certificate for a route, then PATCH the route with it.

    The PATCH sets the ``<need_cert_annotation>.state`` annotation to
    ``created`` and stores the certificate (wrapped to 65-character lines
    between PEM markers) and the PEM-encoded key under ``spec.tls``.

    NOTE(review): the private key is written to the debug log, and the PATCH
    response is not checked before "[ROUTE UPDATED]" is logged.
    """
    ipa_client = IPAClient(
        ipa_user=config.get('ipa_user'),
        ipa_password=config.get('ipa_password'),
        ipa_url=config.get('ipa_url'),
        ca_trust=config.get('ipa_ca_cert', False),
    )
    ipa_client.create_host(route_fqdn)
    certificate, key = ipa_client.create_cert(route_fqdn, config.get('ipa_realm'))
    logger.info("[CERT CREATED]: {0}".format(route_fqdn))
    logger.debug("Cert: {0}\nKey: {1}\n".format(certificate, key.exportKey().decode('UTF-8')))

    k8s = watcher.config
    route_url = 'https://{0}/oapi/v1/namespaces/{1}/routes/{2}'.format(
        k8s.k8s_endpoint, k8s.k8s_namespace, route_name)
    request_headers = {
        'Authorization': 'Bearer {0}'.format(k8s.k8s_token),
        'Content-Type': 'application/strategic-merge-patch+json',
    }

    annotation_key = '{0}.state'.format(config.get('need_cert_annotation'))
    # Split the certificate text into 65-character lines.
    wrapped_certificate = '\n'.join(
        certificate[start:start + 65]
        for start in six.moves.range(0, len(certificate), 65))
    route_patch = {
        'metadata': {'annotations': {annotation_key: 'created'}},
        'spec': {'tls': {
            'certificate': '-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----'.format(
                wrapped_certificate),
            'key': '{0}'.format(key.exportKey('PEM').decode('UTF-8')),
        }},
    }

    requests.patch(route_url,
                   headers=request_headers,
                   data=json.dumps(route_patch),
                   params="", verify=k8s.k8s_ca)
    logger.info("[ROUTE UPDATED]: {0}".format(route_fqdn))
项目:stardust-rpg    作者:johnthagen    | 项目源码 | 文件源码
def set_attributes(login: Roll20Login, character_id: str,
                   attributes: Mapping[str, Union[str, int]],
                   attribute_position: AttributePosition = AttributePosition.current) -> None:
    """PATCH each given attribute of a character, one request per attribute.

    Attribute names are resolved to ids with ``get_attributes_ids``; every
    value is sent as a string under the key ``attribute_position.value``.

    :raises Roll20PermissionError: if a response has status UNAUTHORIZED
    """
    attribute_ids = get_attributes_ids(login=login, character_id=character_id)
    position_key = attribute_position.value

    for attribute_name, raw_value in attributes.items():
        # Values are always sent as strings, including ints.
        text_value = str(raw_value)
        attribute_id = attribute_ids[attribute_name]

        url = (f'{login.firebase_root}{login.campaign_path}/char-attribs/char/{character_id}/'
               f'{attribute_id}/.json?auth={login.auth_token}')
        response = requests.patch(url, data=json.dumps({position_key: text_value}))

        if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
            raise Roll20PermissionError('Permission denied trying to set attribute.')

        updated_attribute = json.loads(response.text)[attribute_position.value]
        logger.debug(f'New {attribute_name}: {updated_attribute}')
项目:stardust-rpg    作者:johnthagen    | 项目源码 | 文件源码
def create_ability(login: Roll20Login, character_id: str, ability_name: str, ability_action: str,
                   is_token_action: bool = True) -> None:
    """Create an ability on a character, then write the new id back into it.

    The ability is POSTed first; the ``name`` field of that response is used
    as the new ability's id and is PATCHed into the created record as ``id``.

    :raises Roll20PermissionError: if the POST response has status UNAUTHORIZED
    """
    abilities_root = f'{login.firebase_root}{login.campaign_path}/'
    create_url = (abilities_root +
                  f'char-abils/char/{character_id}.json?auth={login.auth_token}')
    ability = {'action': ability_action,
               'name': ability_name,
               'istokenaction': is_token_action}

    response = requests.post(create_url, data=json.dumps(ability))
    if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
        raise Roll20PermissionError('Permission denied trying to create ability.')

    new_ability_id = json.loads(response.text)['name']
    logger.debug(f'Created {ability_name} {new_ability_id}')

    # Second request: store the generated id inside the new record.
    record_url = (f'{login.firebase_root}{login.campaign_path}/char-abils/char/'
                  f'{character_id}/{new_ability_id}/.json?auth={login.auth_token}')
    requests.patch(record_url, data=json.dumps({'id': new_ability_id}))
项目:lepidopter-update    作者:OpenObservatory    | 项目源码 | 文件源码
def update_latest_version(tag_name):
    """Point the ``latest`` release at master and replace its ``version`` asset.

    Looks up the release tagged ``latest`` (creating it on a 404), PATCHes its
    ``target_commitish`` to ``master``, deletes all of its assets and uploads
    ``tag_name`` as a plain-text asset named ``version``.

    :param tag_name: content uploaded as the ``version`` asset
    :raises RuntimeError: if the release lookup returns a status other than
        200 or 404
    """
    params = {
        "access_token": GITHUB_TOKEN
    }
    r = requests.get(GH_BASE_URL + "/releases/tags/latest",
                     params=params)
    if r.status_code == 404:
        release_id = _create_latest_version()
    elif r.status_code == 200:
        release_id = r.json()["id"]
    else:
        # Without this branch release_id would be unbound below and the
        # real cause (the unexpected status) would be hidden.
        raise RuntimeError(
            "Unexpected status {0} while looking up the latest release".format(
                r.status_code))

    data = {
        "target_commitish": "master"
    }
    r = requests.patch(GH_BASE_URL + "/releases/{0}".format(release_id),
                       params=params, json=data)
    assert r.status_code == 200

    # Strip the URI-template suffix so the URL can be used directly.
    upload_url = r.json()['upload_url'].replace("{?name,label}", "")
    _delete_all_assets(release_id)
    _upload_asset(upload_url, "version", "text/plain", tag_name)
项目:floyd-cli    作者:floydhub    | 项目源码 | 文件源码
def _upload_chunk(self, data, offset, file_endpoint, headers=None, auth=None):
        """PATCH one chunk to ``file_endpoint`` and return the server's new offset.

        :param data: chunk bytes to send as the request body
        :param offset: value sent in the ``Upload-Offset`` header
        :param file_endpoint: URL to PATCH
        :param headers: optional extra headers; they override the defaults
        :param auth: passed through to ``requests.patch``
        :return: the response's ``Upload-Offset`` header as an int
        """
        floyd_logger.debug("Uploading %s bytes chunk from offset: %s", len(data), offset)

        request_headers = {
            'Content-Type': 'application/offset+octet-stream',
            'Upload-Offset': str(offset),
            'Tus-Resumable': self.TUS_VERSION,
        }
        request_headers.update(headers or {})

        response = requests.patch(file_endpoint, headers=request_headers, data=data, auth=auth)
        self.check_response_status(response)

        new_offset = response.headers["Upload-Offset"]
        return int(new_offset)
项目:ansible-kong-module    作者:toast38coza    | 项目源码 | 文件源码
def add_or_update(self, name, config=None):
        """Create the named plugin, or PATCH it if it is already listed.

        :param name: plugin name, sent as ``name`` and used for the lookup
        :param config: optional mapping merged into the request data
        :return: the ``requests`` response of the POST (new plugin) or PATCH
            (existing plugin)
        """
        # Look through the current plugin list to see whether it exists.
        existing_plugins = self.list().json().get('data', [])

        payload = {"name": name}
        if config is not None:
            payload.update(config)

        plugin_id = self._get_plugin_id(name, existing_plugins)
        if plugin_id is not None:
            plugin_url = "{}/{}".format(self.base_url, plugin_id)
            return requests.patch(plugin_url, payload, auth=self.auth)
        return requests.post(self.base_url, payload, auth=self.auth)
项目:python-wiremock    作者:platinummonkey    | 项目源码 | 文件源码
def _partial_update(cls, entity, parameters=None, ids=None):  # pragma: no cover
        """PATCH an entity to the single-resource endpoint.

        :param entity: a ``BaseAbstractEntity`` (sent via ``get_json_data()``)
            or any other object (sent via ``json.dumps``)
        :param parameters: query parameters for the request
        :param ids: optional mapping of URI placeholders; the entity's ``id``
            attribute, when not None, is added under ``'id'``
        :return: the handled response, or an instance built with
            ``entity_class().from_dict`` when the entity class derives from
            ``BaseAbstractEntity``
        """
        # Build a private mapping per call. A shared default dict (or the
        # caller's dict) would keep the 'id' of an earlier entity and leak it
        # into later calls.
        ids = dict(ids) if ids else {}
        entity_id = getattr(entity, 'id', None)
        if entity_id is not None:
            ids['id'] = entity_id
        if isinstance(entity, BaseAbstractEntity):
            response = cls.REST_CLIENT.patch(
                cls.get_base_uri(cls.endpoint_single(), **ids), json=entity.get_json_data(), headers=make_headers(),
                params=parameters
            )
        else:
            response = cls.REST_CLIENT.patch(
                cls.get_base_uri(cls.endpoint_single(), **ids), data=json.dumps(entity), headers=make_headers(),
                params=parameters
            )

        response = cls.REST_CLIENT.handle_response(response)

        if cls.entity_class() is None or not issubclass(cls.entity_class(), BaseAbstractEntity):
            return response  # pragma: no cover
        else:
            return cls.entity_class().from_dict(response.json())
项目:ansible-demo-operations    作者:ansible    | 项目源码 | 文件源码
def enable_survey(tower_config, job_template_id):
    """Set ``survey_enabled`` to true on a job template via PATCH.

    :param tower_config: mapping with ``host``, ``username`` and ``password``
    :param job_template_id: id inserted into the job template URL

    Calls ``exit_failure`` when the response status is not 200.
    NOTE(review): certificate verification is disabled for this request.
    """
    url = 'https://{0}/api/v1/job_templates/{1}/'.format(
        tower_config['host'], job_template_id)
    credentials = (tower_config['username'], tower_config['password'])
    body = json.dumps({'survey_enabled': True})

    response = requests.patch(
        url,
        verify=False,
        auth=credentials,
        headers={'Content-type': 'application/json'},
        data=body)

    if response.status_code == 200:
        return
    exit_failure('error enabling job template survey')
项目:CodeGra.fs    作者:CodeGra-de    | 项目源码 | 文件源码
def test_list_assigned_submissions(mount, mount_dir, teacher_jwt, teacher_id):
    """After assigning the teacher as grader, ``assigned_to_me`` hides other submissions.

    Every submission whose name contains 'Stupid1' is assigned to the teacher
    through the grader API. After remounting with ``assigned_to_me=True`` each
    listed entry must be such a submission or a dot-file.
    """
    course = 'Programmeertalen'
    auth_headers = {
        'Authorization': 'Bearer ' + teacher_jwt,
    }

    for assig in ls(mount_dir, course):
        for sub in ls(mount_dir, course, assig):
            if 'Stupid1' not in sub:
                continue
            # The submission id is read from the marker file in the submission directory.
            id_file = join(mount_dir, course, assig, sub, '.cg-submission-id')
            with open(id_file) as f:
                sub_id = f.read().strip()
            r = requests.patch(
                f'http://localhost:5000/api/v1/submissions/{sub_id}/grader',
                json={'user_id': teacher_id},
                headers=auth_headers,
            )
            assert r.status_code == 204

    mount(assigned_to_me=True)

    for assig in ls(mount_dir, course):
        for sub in ls(mount_dir, course, assig):
            assert 'Stupid1' in sub or sub[0] == '.'
项目:Snakepit    作者:K4lium    | 项目源码 | 文件源码
def saveScore(analysis_id, score):
    """Save calculated score back to pit.

    A copy of ``ANALYSIS_TEMPLATE`` with ``id`` and ``score`` filled in is
    PATCHed to ``<pit_url>/analysis/<analysis_id>``; an HTTP error status
    raises through ``raise_for_status``.
    """
    record = ANALYSIS_TEMPLATE.copy()
    record.update(id=analysis_id, score=score)

    target = '/'.join((pit_url, 'analysis', str(analysis_id)))
    resp = requests.patch(target, data=json.dumps(record), headers=post_headers)
    resp.raise_for_status()


# For an analysis item:
# * Fetch the item.
# * Fetch all relevant rules.
# * Evaluate each rule, keeping a running total of score.
# * Save score to analysis item.
项目:CritsAide    作者:corumir    | 项目源码 | 文件源码
def build_relationship(self, Left_TLO, Right_TLO):
        """Forge a 'Related To' relationship from ``Left_TLO`` to ``Right_TLO``.

        PATCHes ``<self.url>/<collection>/<id>/`` of the left object with a
        ``forge_relationship`` action.

        :return: the decoded JSON response when the status is 200, else None
        NOTE(review): certificate verification is disabled for this request.
        """
        # Upper-case only the first letter of the type; the rest is untouched.
        raw_type = Right_TLO.type
        right_type = raw_type[:1].upper() + raw_type[1:] #handles CAPS issue
        submit_url = '{}/{}/{}/'.format(self.url, Left_TLO.collection, Left_TLO.get_ID())

        credentials = {'api_key': self.api_key,'username': self.username}
        relationship = {
            'action': 'forge_relationship',
            'type_': Left_TLO.type,
            'id_': Left_TLO.get_ID(),
            'right_type': right_type,
            'right_id': Right_TLO.get_ID(),
            'rel_type': 'Related To',
            'rel_date': datetime.datetime.now(),
            'rel_reason': '',
            'rel_confidence': 'low',
            'get_rels': True
        }

        r = requests.patch(submit_url, params=credentials, data=relationship, verify=False)
        return json.loads(r.text) if r.status_code == 200 else None
项目:PyMoe    作者:ccubed    | 项目源码 | 文件源码
def update(self, eid, data, token):
        """
        Update a given Library Entry.

        :param eid str: Entry ID
        :param data dict: Attributes
        :param token str: OAuth token
        :return: True when the response status is 200
        :rtype: Bool
        :raises ConnectionError: with the response text for any other status
        """
        final_dict = {"data": {"id": eid, "type": "libraryEntries", "attributes": data}}
        # Copy the shared header so this call's bearer token is not written
        # into self.header and carried over to later requests.
        final_headers = self.header.copy()
        final_headers['Authorization'] = "Bearer {}".format(token)

        r = requests.patch(self.apiurl + "/library-entries/{}".format(eid), json=final_dict, headers=final_headers)

        if r.status_code != 200:
            raise ConnectionError(r.text)

        return True
项目:PyMoe    作者:ccubed    | 项目源码 | 文件源码
def update(self, uid, data, token):
        """
        Update a user's data. Requires an auth token.

        :param uid str: User ID to update
        :param data dict: The dictionary of data attributes to change. Just the attributes.
        :param token str: The authorization token for this user
        :return: True when the response status is 200
        :rtype: Bool
        :raises ServerError: for any other status
        """
        final_dict = {"data": {"id": uid, "type": "users", "attributes": data}}
        # Copy the shared header so this call's bearer token is not written
        # into self.header and carried over to later requests.
        final_headers = self.header.copy()
        final_headers['Authorization'] = "Bearer {}".format(token)
        r = requests.patch(self.apiurl + "/users/{}".format(uid), json=final_dict, headers=final_headers)

        if r.status_code != 200:
            raise ServerError

        return True
项目:lainctl    作者:laincloud    | 项目源码 | 文件源码
def maintain(self, nodename, remove=False):
        """
        maintain node will disable or enable deployment onto the maintained node.

        Adds the node constraint with a PATCH, or removes it with a DELETE
        when ``remove`` is true, and reports the outcome through
        ``info``/``error``.
        """
        node = NodeInfo(nodename)
        # Both operations target the same constraint URL.
        url = "http://deployd.lain:9003/api/constraints" + "?type=node&value=%s" % node.name

        if remove:
            operator, log_format, send = "Remove", "DELETE %s", requests.delete
        else:
            operator, log_format, send = "Add", "PATCH %s", requests.patch

        info(log_format % url)
        resp = send(url)

        if resp.status_code < 300:
            info("%s constraint on node %s success." % (operator, node.name))
        else:
            error("%s constraint on node %s fail: %s" % (operator, node.name, resp.text))
项目:apimas    作者:grnet    | 项目源码 | 文件源码
def partial_update(self, resource_id, raise_exception=True, headers=None,
                       data=None):
        """
        Partially update a resource, i.e. send only some of its fields.

        The request keyword arguments are ``headers`` and ``auth`` plus
        whatever ``extract_write_data`` returns for ``data``.

        Example: PATCH endpoint/<pk>/

        :return: the ``requests`` response
        """
        request_kwargs = dict(headers=headers, auth=self.auth)
        write_kwargs = self.extract_write_data(
            data, raise_exception, partial=True)
        request_kwargs.update(write_kwargs)

        endpoint = self.format_endpoint(resource_id)
        return requests.patch(endpoint, **request_kwargs)
项目:Kong    作者:TangentMicroServices    | 项目源码 | 文件源码
def add_or_update(self, name, config=None):
        """Create the named plugin, or PATCH it if it is already listed.

        :param name: plugin name, sent as ``name`` and used for the lookup
        :param config: optional mapping merged into the request data
        :return: the ``requests`` response of the POST (new plugin) or PATCH
            (existing plugin)
        """
        # Look through the current plugin list to see whether it exists.
        existing_plugins = self.list().json().get('data', [])

        payload = {"name": name}
        if config is not None:
            payload.update(config)

        plugin_id = self._get_plugin_id(name, existing_plugins)
        if plugin_id is not None:
            plugin_url = "{}/{}".format(self.base_url, plugin_id)
            return requests.patch(plugin_url, payload)
        return requests.post(self.base_url, payload)
项目:socialhome    作者:jaywink    | 项目源码 | 文件源码
def test_requests_mocks(self):
        """Each of the five ``requests`` verb functions must be a Mock."""
        for verb in ('get', 'put', 'post', 'patch', 'delete'):
            self.assertTrue(isinstance(getattr(requests, verb), Mock))
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def _get_url_and_header(self, path):
        """Build the full URL and the merge-patch request headers for ``path``.

        :param path: path appended to ``self._base_url``
        :return: tuple ``(url, header)``; ``header`` carries a Bearer
            ``Authorization`` entry only when ``self.token`` is truthy
        """
        header = {'Content-Type': 'application/merge-patch+json',
                  'Accept': 'application/json'}
        if self.token:
            header['Authorization'] = 'Bearer %s' % self.token

        return self._base_url + path, header
项目:kuryr-kubernetes    作者:openstack    | 项目源码 | 文件源码
def patch_status(self, path, data):
        """PATCH ``{"status": data}`` to the ``/status`` sub-path of ``path``.

        :return: the ``status`` entry of the JSON response when it is OK
        :raises exc.K8sClientException: with the response text otherwise
        """
        LOG.debug("Patch_status %(path)s: %(data)s", {
            'path': path, 'data': data})
        url, header = self._get_url_and_header(path + '/status')

        response = requests.patch(url, json={"status": data},
                                  headers=header, cert=self.cert,
                                  verify=self.verify_server)
        if not response.ok:
            raise exc.K8sClientException(response.text)
        return response.json().get('status')
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def update(self, job_id, update_doc):
        """PATCH ``update_doc`` as JSON to ``self.endpoint + job_id``.

        :return: the ``version`` field of the JSON response
        :raises exceptions.ApiClientException: if the status code is not 200
        """
        response = requests.patch(
            self.endpoint + job_id,
            headers=self.headers,
            data=json.dumps(update_doc),
            verify=self.verify,
        )
        if response.status_code == 200:
            return response.json()['version']
        raise exceptions.ApiClientException(response)
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def update(self, action_id, update_doc):
        """PATCH ``update_doc`` as JSON to ``self.endpoint + action_id``.

        :return: the ``version`` field of the JSON response
        :raises exceptions.ApiClientException: if the status code is not 200
        """
        response = requests.patch(
            self.endpoint + action_id,
            headers=self.headers,
            data=json.dumps(update_doc),
            verify=self.verify,
        )
        if response.status_code == 200:
            return response.json()['version']
        raise exceptions.ApiClientException(response)
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def update(self, session_id, update_doc):
        """PATCH ``update_doc`` as JSON to ``self.endpoint + session_id``.

        :return: the ``version`` field of the JSON response
        :raises exceptions.ApiClientException: if the status code is not 200
        """
        response = requests.patch(
            self.endpoint + session_id,
            headers=self.headers,
            data=json.dumps(update_doc),
            verify=self.verify,
        )
        if response.status_code == 200:
            return response.json()['version']
        raise exceptions.ApiClientException(response)
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def update(self, job_id, update_doc):
        """PATCH the job identified by ``job_id`` with ``update_doc``.

        :param job_id: identifier appended to ``self.endpoint``.
        :param update_doc: document serialized with ``json.dumps`` as the body.
        :returns: the ``version`` field of the JSON response.
        :raises exceptions.ApiClientException: when the status code is not 200.
        """
        resp = requests.patch(
            self.endpoint + job_id,
            headers=self.headers,
            data=json.dumps(update_doc),
            verify=self.verify,
        )
        # Anything other than 200 is reported to the caller with the response.
        if resp.status_code != 200:
            raise exceptions.ApiClientException(resp)
        updated = resp.json()
        return updated['version']
项目:python-freezerclient    作者:openstack    | 项目源码 | 文件源码
def update(self, action_id, update_doc):
        """PATCH the action identified by ``action_id`` with ``update_doc``.

        :param action_id: identifier appended to ``self.endpoint``.
        :param update_doc: document serialized with ``json.dumps`` as the body.
        :returns: the ``version`` field of the JSON response.
        :raises exceptions.ApiClientException: when the status code is not 200.
        """
        action_url = self.endpoint + action_id
        request_options = {
            'headers': self.headers,
            'data': json.dumps(update_doc),
            'verify': self.verify,
        }
        reply = requests.patch(action_url, **request_options)
        if reply.status_code == 200:
            return reply.json()['version']
        raise exceptions.ApiClientException(reply)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def update(api_key_id, active):
    """
    Enable or disable an api key with given ID.

    Sends a PATCH request whose JSON body sets ``apikey.active`` to
    ``active``. On HTTP 200 a confirmation is written to stdout and the
    response is handed to ``handle_api_key_response``. If the response is
    flagged by ``response_utils.response_error`` or the request itself
    raises, a message is written to stderr and the process exits with
    status 1.

    :param api_key_id: ID of the api key to update.
    :param active: truthy to enable the key, falsy to disable it.
    """
    action, url = _url((api_key_id,))
    payload = {
        "apikey":
            {
                "active": active
            }
    }

    headers = api_utils.generate_headers('owner', method='PATCH', body=json.dumps(payload),
                                         action=action)
    try:
        response = requests.patch(url, json=payload, headers=headers)
        if response_utils.response_error(response):
            sys.stderr.write('Failed to %s api key with id: %s \n' %
                             ('enable' if active else 'disable', api_key_id))
            sys.exit(1)
        elif response.status_code == 200:
            sys.stdout.write('%s api key with id: %s\n' %
                             ('Enabled' if active else 'Disabled', api_key_id))
            handle_api_key_response(response)
    except requests.exceptions.RequestException as error:
        # write() only accepts text; passing the exception object itself
        # raises TypeError and hides the real request failure.
        sys.stderr.write('%s\n' % error)
        sys.exit(1)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def rename_team(team_id, team_name):
    """
    Rename the team identified by team_id to team_name.

    Sends a PATCH request and echoes a confirmation on HTTP 200. If the
    response is flagged by ``response_utils.response_error`` or the request
    raises, a message is echoed to stderr and the process exits with
    status 1.
    """
    # Only the team name should change. Per the original author's note, a
    # PATCH request will not modify the users in the team, so a placeholder
    # user entry is sent.
    placeholder_users = [{'id': ''}]
    params = {'team': {'name': team_name, 'users': placeholder_users}}
    headers = api_utils.generate_headers('rw')

    try:
        team_url = _url((team_id,))[1]
        response = requests.patch(team_url, json=params, headers=headers)

        # Bail out first if the response carries an error.
        if response_utils.response_error(response):
            click.echo('Renaming team with id: %s failed.' % team_id, err=True)
            sys.exit(1)

        if response.status_code == 200:
            click.echo("Team: '%s' renamed to: '%s'" % (team_id, team_name))
    except requests.exceptions.RequestException as error:
        click.echo(error, err=True)
        sys.exit(1)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def add_user_to_team(team_id, user_key):
    """
    Add the user identified by user_key to the team identified by team_id.

    The team is fetched with a GET to read its current name, then a PATCH
    carrying that name and the new user's id is sent. Error responses and
    request failures are echoed to stderr and the process exits with
    status 1.
    """
    headers = api_utils.generate_headers('rw')
    lookup_params = {'teamid': team_id}
    try:
        lookup = requests.get(_url((team_id,))[1], params=lookup_params, headers=headers)

        if lookup.status_code != 200:
            if response_utils.response_error(lookup):
                click.echo('Cannot find team. Adding user to team %s failed.' % team_id,
                           err=True)
                sys.exit(1)
            return

        current_name = lookup.json()['team']['name']
        # This is a PATCH request, so it is safe to list only the user_key
        # being added.
        patch_body = {
            'team': {
                'name': current_name,
                'users': [{'id': user_key}],
            }
        }
        patch_headers = api_utils.generate_headers('rw')
        result = requests.patch(_url((team_id,))[1], json=patch_body, headers=patch_headers)

        if response_utils.response_error(result):  # Check response has no errors
            click.echo('Adding user to team with key: %s failed.' % team_id, err=True)
            sys.exit(1)
        if result.status_code == 200:
            click.echo('Added user with key: %s to team.' % user_key)
    except requests.exceptions.RequestException as error:
        # One handler covers both the lookup and the patch request.
        click.echo(error, err=True)
        sys.exit(1)
项目:py_fake_server    作者:Telichkin    | 项目源码 | 文件源码
def test_handler_calls_many_times(server: FakeServer):
    """A PATCH handler registered without a call limit answers repeated requests."""
    server.on_("patch", "/channels").response(status=204)

    replies = [requests.patch(server.base_uri + "/channels") for _ in range(2)]

    for reply in replies:
        assert reply.status_code == 204
项目:py_fake_server    作者:Telichkin    | 项目源码 | 文件源码
def test_handler_called_nth_times(server: FakeServer):
    """A handler limited with ``_3_times()`` answers three requests; the fourth gets a 500."""
    server.on_("patch", "/songs").response(status=204)._3_times()

    replies = [requests.patch(server.base_uri + "/songs") for _ in range(4)]
    *served, exhausted = replies

    for reply in served:
        assert reply.status_code == 204

    # The fourth request has no configured response left.
    assert exhausted.status_code == 500
    assert exhausted.text == "Server has not responses for [PATCH] http://localhost:8081/songs"
    assert exhausted.headers["content-type"] == "text/plain"
项目:py_fake_server    作者:Telichkin    | 项目源码 | 文件源码
def test_explicit_content_type_remove_application_json(server: FakeServer):
    """An explicit ``content_type`` is returned even when a JSON body is configured."""
    handler = server.on_("patch", "/games")
    handler.response(status=200, json={"change": "content-type"}, content_type="text/plain")

    reply = requests.patch(server.base_uri + "/games")

    assert reply.status_code == 200
    assert reply.headers["Content-Type"] == "text/plain"
项目:py_fake_server    作者:Telichkin    | 项目源码 | 文件源码
def test_with_body_not_raise_error(server: FakeServer):
    """``with_body`` passes when the first PATCH carried exactly the expected body."""
    requests.patch(server.base_uri + "/sleep", data="Good night!")

    first_request = server.was_requested("patch", "/sleep").for_the_first_time()
    expect_that(first_request.with_body("Good night!"))
项目:py_fake_server    作者:Telichkin    | 项目源码 | 文件源码
def test_chain_of_different_with_errors(server: FakeServer):
    """Mismatched content type and body are both reported in one AssertionError."""
    requests.patch(server.base_uri + "/users", json={"name": "new_name"})

    with pytest.raises(AssertionError) as error:
        expectation = server.was_requested("patch", "/users").for_the_first_time()
        expectation = expectation.with_content_type("text/plain").with_body("old name")
        expect_that(expectation)

    # One line per expectation, each followed by what was actually received.
    expected_lines = (
        "Expect that server was requested with [PATCH] http://localhost:8081/users.",
        "For the 1 time: with content type 'text/plain'.",
        "But for the 1 time: content type was 'application/json'.",
        "For the 1 time: with body 'old name'.",
        "But for the 1 time: body was '{\"name\": \"new_name\"}'.",
    )
    assert str(error.value) == "\n".join(expected_lines)