Python requests 模块,delete() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.delete()

项目:upstox-python    作者:upstox    | 项目源码 | 文件源码
def api_call(self, url, http_method, data):

        headers = {"Content-Type" : "application/json", "x-api-key" : self.api_key,
                   "authorization" : "Bearer " + self.access_token}

        r = None

        if http_method is PyCurlVerbs.POST:
            r = requests.post(url, data=json.dumps(data), headers=headers)
        elif http_method is PyCurlVerbs.DELETE:
            r = requests.delete(url, headers=headers)
        elif http_method is PyCurlVerbs.PUT:
            r = requests.put(url, data=json.dumps(data), headers=headers)
        elif http_method is PyCurlVerbs.GET:
            r = requests.get(url, headers=headers)

        return r
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def delete(api_key_id):
    """
    Delete an api key with the provided ID
    """
    action, url = _url((api_key_id,))
    headers = api_utils.generate_headers('owner', method='DELETE', body='', action=action)

    try:
        response = requests.delete(url, headers=headers)
        if response_utils.response_error(response):
            sys.stderr.write('Deleting api key failed.')
            sys.exit(1)
        elif response.status_code == 204:
            sys.stdout.write('Deleted api key with id: %s \n' % api_key_id)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
项目:django_pipedrive    作者:MasAval    | 项目源码 | 文件源码
def call_api(self, method, endpoint, payload):
        url = urlparse.urljoin(self.api_base_url, endpoint)

        if method == 'POST':
            response = requests.post(url, data=payload)
        elif method == 'delete':
            response = requests.delete(url)
        elif method == 'put':
            response = requests.put(url, data=payload)
        else:
            if self.api_key:
                payload.update({'api_token': self.api_key})
            response = requests.get(url, params=payload)

        content = json.loads(response.content)

        return content
项目:foremast    作者:gogoair    | 项目源码 | 文件源码
def delete_pipeline(app='', pipeline_name=''):
    """Delete _pipeline_name_ from _app_."""
    safe_pipeline_name = normalize_pipeline_name(name=pipeline_name)
    url = murl.Url(API_URL)

    LOG.warning('Deleting Pipeline: %s', safe_pipeline_name)

    url.path = 'pipelines/{app}/{pipeline}'.format(app=app, pipeline=safe_pipeline_name)
    response = requests.delete(url.url, verify=GATE_CA_BUNDLE, cert=GATE_CLIENT_CERT)

    if not response.ok:
        LOG.debug('Delete response code: %d', response.status_code)
        if response.status_code == requests.status_codes.codes['method_not_allowed']:
            raise SpinnakerPipelineDeletionFailed('Failed to delete "{0}" from "{1}", '
                                                  'possibly invalid Pipeline name.'.format(safe_pipeline_name, app))
        else:
            LOG.debug('Pipeline missing, no delete required.')

    LOG.debug('Deleted "%s" Pipeline response:\n%s', safe_pipeline_name, response.text)

    return response.text
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def delete_log(log_id):
    """
    Delete a log with the provided log ID
    """
    headers = api_utils.generate_headers('rw')

    try:
        response = requests.delete(_url(('logs', log_id))[1], headers=headers)
        if response_utils.response_error(response):
            sys.stderr.write('Delete log failed.')
            sys.exit(1)
        elif response.status_code == 204:
            sys.stdout.write('Deleted log with id: %s \n' % log_id)
    except requests.exceptions.RequestException as error:
        sys.stderr.write(error)
        sys.exit(1)
项目:lecli    作者:rapid7    | 项目源码 | 文件源码
def delete_team(team_id):
    """
    Delete a team with the provided team ID.
    """
    headers = api_utils.generate_headers('rw')

    try:
        response = requests.delete(_url((team_id,))[1], headers=headers)
        if response_utils.response_error(response):  # Check response has no errors
            click.echo('Delete team failed.', err=True)
            sys.exit(1)
        elif response.status_code == 204:
            click.echo('Deleted team with id: %s.' % team_id)
    except requests.exceptions.RequestException as error:
        click.echo(error, err=True)
        sys.exit(1)
项目:aniping    作者:kuruoujou    | 项目源码 | 文件源码
def remove_show(self, beid):
        """Remove a given show from sonarr.

        It will not delete files. The backend ID we're given is not the ID we need, so the show
        is looked up first. It will only delete shows if the DELETE_SHOWS config value is set.

        Args:
            beid (int): The TVDB ID of the show.
        """
        _logger.debug("Entering remove_show. Getting all shows being watched from sonarr.")
        if self._delete_shows:
            _logger.debug("Config line set to delete shows from sonarr. Continuing.")
            shows = self.get_watching_shows()
            _logger.debug("Got all shows. Attempting to find show with ID {0}".format(beid))
            show = [x for x in shows if x['beid'] == beid][0]
            _logger.debug("Found show {0}. Deleting.".format(show['title']))
            requests.delete("{0}/api/series/{1}?apikey={2}".format(self.url, show['id'], self.api_key))
        _logger.debug("Config line set to leave shows in sonarr, exiting with no changes.")
项目:ucloud-storage-python-wrapper    作者:wishket    | 项目源码 | 文件源码
def post_account_metadata(self, params, action):
        """
        post account's metadata for add, delete
        :param params: params should iterable by key, value
        :param action: means add or delete ['add', 'delete']
        :return: result status code
        """
        # post account's metadata
        url = self.url

        headers = self.base_headers
        for key, value in params.iteritems():
            if action == 'add':
                headers['X-Account-Meta-' + key] = value
            else:
                headers['X-Remove-Account-Meta-' + key] = value
        response = requests.post(url, headers=headers)
        return response.status_code
项目:ucloud-storage-python-wrapper    作者:wishket    | 项目源码 | 文件源码
def post_container_metadata(self, container_name, params, action):
        """
        post account's metadata for add, delete
        :param params: params should iterable by key, value
        :param action: means add or delete ['add', 'delete']
        :return: result status code
        """
        # post account's metadata
        url = self.url + '/' + container_name

        headers = self.base_headers
        for key, value in params.iteritems():
            if action == 'add':
                headers['X-Container-Meta-' + key] = value
            else:
                headers['X-Remove-Container-Meta-' + key] = value
        response = requests.post(url, headers=headers)
        return response.status_code
项目:ucloud-storage-python-wrapper    作者:wishket    | 项目源码 | 文件源码
def post_object_metadata(self, container_name, file_name, params, action):
        """
        post account's metadata for add, delete
        :param params: params should iterable by key, value
        :param action: means add or delete ['add', 'delete']
        :return: result status code
        """
        # post account's metadata
        url = self.url + '/' + container_name + '/' + file_name

        headers = self.base_headers
        for key, value in params.iteritems():
            if action == 'add':
                headers['X-Container-Meta-' + key] = value
            else:
                headers['X-Remove-Container-Meta-' + key] = value
        response = check_response_status(
            requests.post(url, headers=headers)
        )
        return response.status_code
项目:ucloud-storage-python-wrapper    作者:wishket    | 项目源码 | 文件源码
def post_container_metadata(self, container_name, params, action):
        """
        post account's metadata for add, delete
        :param container_name: target container name
        :param params: params should iterable by key, value
        :param action: means add or delete ['add', 'delete']
        :return: result status code
        """
        # post account's metadata
        url = self.url + '/' + container_name

        headers = self.base_headers
        for key, value in params.iteritems():
            if action == 'add':
                headers['X-Container-Meta-' + key] = value
            else:
                headers['X-Remove-Container-Meta-' + key] = value
        response = requests.post(url, headers=headers)
        return response.status_code
项目:ucloud-storage-python-wrapper    作者:wishket    | 项目源码 | 文件源码
def post_object_metadata(self, container_name, file_name, params, action):
        """
        post account's metadata for add, delete
        :param params: params should iterable by key, value
        :param action: means add or delete ['add', 'delete']
        :return: result status code
        """
        # post account's metadata
        url = self.url + '/' + container_name + '/' + file_name

        headers = self.base_headers
        for key, value in params.iteritems():
            if action == 'add':
                headers['X-Container-Meta-' + key] = value
            else:
                headers['X-Remove-Container-Meta-' + key] = value
        response = check_response_status(
            requests.post(url, headers=headers)
        )
        return response.status_code
项目:python-wp    作者:myles    | 项目源码 | 文件源码
def delete_post(self, pk, force=False):
        """
        Delete a Post.

        Arguments
        ---------

        pk : int
            The post id you want to delete.
        force : bool
            Whether to bypass trash and force deletion.
        """
        resp = self._delete('posts/{0}'.format(pk), params=locals())

        if resp.status_code == 200:
            return True
        else:
            raise Exception(resp.json())

    # Post Reivion Methods
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def deladmin(apikey, orgid, adminid, suppressprint=False):
    #
    # Confirm API Key has Admin Access Otherwise Raise Error
    #
    __hasorgaccess(apikey, orgid)
    calltype = 'Administrator'

    delurl = '{0}/organizations/{1}/admins/{2}'.format(str(base_url), str(orgid), str(adminid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    dashboard = requests.delete(delurl, headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


### CLIENTS ###

# List the clients of a device, up to a maximum of a month ago. The usage of each client is returned in kilobytes. If the device is a switch, the switchport is returned; otherwise the switchport field is null.
# https://dashboard.meraki.com/api_docs#list-the-clients-of-a-device-up-to-a-maximum-of-a-month-ago
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def deltemplate(apikey, orgid, templateid, suppressprint=False):
    #
    # Confirm API Key has Admin Access Otherwise Raise Error
    #
    __hasorgaccess(apikey, orgid)
    calltype = 'Template'

    delurl = '{0}/organizations/{1}/configTemplates/{2}'.format(str(base_url), str(orgid), str(templateid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    dashboard = requests.delete(delurl, headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


### DEVICES ###

# List the devices in a network
# https://dashboard.meraki.com/api_docs#list-the-devices-in-a-network
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def delnetwork(apikey, networkid, suppressprint=False):
    calltype = 'Network'
    delurl = '{0}/networks/{1}'.format(str(base_url), str(networkid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    dashboard = requests.delete(delurl, headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


# Bind a network to a template.
# https://dashboard.meraki.com/api_docs#bind-a-network-to-a-template
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def delphone(apikey, networkid, serial, suppressprint=False):
    calltype = 'Phone Assignment'
    delurl = '{0}/networks/{1}/phoneAssignments/{2}'.format(str(base_url), str(networkid), str(serial))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    dashboard = requests.delete(delurl, headers=headers)
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


### PHONE CONTACTS ###

# List the phone contacts in a network
# https://dashboard.meraki.com/api_docs#list-the-phone-contacts-in-a-network
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def delcontact(apikey, networkid, contactid, suppressprint=False):
    calltype = 'Phone Contact'
    delurl = '{0}/networks/{1}/phoneContacts/{2}'.format(str(base_url), str(networkid), str(contactid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }

    dashboard = requests.delete(delurl, headers=headers)
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


### PHONE NUMBERS ###

# List all the phone numbers in a network
# https://dashboard.meraki.com/api_docs#list-all-the-phone-numbers-in-a-network
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def delsamlrole(apikey, orgid, roleid, suppressprint=False):
    #
    # Confirm API Key has Admin Access Otherwise Raise Error
    #
    __hasorgaccess(apikey, orgid)
    calltype = 'SAML Role'

    delurl = '{0}/organizations/{1}/samlRoles/{2}'.format(str(base_url), str(orgid), str(roleid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    dashboard = requests.delete(delurl, headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


### SM (Systems Manager) ###

# List the devices enrolled in an SM network with various specified fields and filters
# https://dashboard.meraki.com/api_docs#list-the-devices-enrolled-in-an-sm-network-with-various-specified-fields-and-filters
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def addstaticroute(apikey, networkid, name, subnet, ip, suppressprint=False):
    calltype = 'Static Route'
    posturl = '{0}/networks/{1}/staticRoutes'.format(str(base_url), str(networkid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    postdata = {
        'name': format(str(name)),
        'subnet': format(str(subnet)),
        'gatewayIp': format(str(ip))
    }

    dashboard = requests.post(posturl, data=json.dumps(postdata), headers=headers)
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


# Delete a static route from a network
# https://dashboard.meraki.com/api_docs#delete-a-static-route-from-a-network
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def delstaticroute(apikey, networkid, routeid, suppressprint=False):
    calltype = 'Static Route'
    delurl = '{0}/networks/{1}/staticRoutes/{2}'.format(str(base_url), str(networkid), str(routeid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }

    dashboard = requests.delete(delurl, headers=headers)
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result


### SWITCH PORTS ###

# List the switch ports for a switch
# https://dashboard.meraki.com/api_docs#list-the-switch-ports-for-a-switch
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def delvlan(apikey, networkid, vlanid, suppressprint=False):
    calltype = 'VLAN'
    delurl = '{0}/networks/{1}/vlans/{2}'.format(str(base_url), str(networkid), str(vlanid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    dashboard = requests.delete(delurl, headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result



# MX performance score
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def deltemplate(apikey, orgid, templateid, suppressprint=False):
    #
    # Confirm API Key has Admin Access Otherwise Raise Error
    #
    __hasorgaccess(apikey, orgid)
    calltype = 'Template'

    delurl = '{0}/organizations/{1}/configTemplates/{2}'.format(str(base_url), str(orgid), str(templateid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    dashboard = requests.delete(delurl, headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def delsamlrole(apikey, orgid, roleid, suppressprint=False):
    #
    # Confirm API Key has Admin Access Otherwise Raise Error
    #
    __hasorgaccess(apikey, orgid)
    calltype = 'SAML Role'

    delurl = '{0}/organizations/{1}/samlRoles/{2}'.format(str(base_url), str(orgid), str(roleid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    dashboard = requests.delete(delurl, headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def deladmin(apikey, orgid, adminid, suppressprint=False):
    #
    # Confirm API Key has Admin Access Otherwise Raise Error
    #
    __hasorgaccess(apikey, orgid)
    calltype = 'Administrator'

    delurl = '{0}/organizations/{1}/admins/{2}'.format(str(base_url), str(orgid), str(adminid))
    headers = {
        'x-cisco-meraki-api-key': format(str(apikey)),
        'Content-Type': 'application/json'
    }
    dashboard = requests.delete(delurl, headers=headers)
    #
    # Call return handler function to parse Dashboard response
    #
    result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return result
项目:provisioning-lib    作者:meraki    | 项目源码 | 文件源码
def del_admin(self, admin_id):
        """ Delete a specified admin account.
            Args:
                admin_id: ID string or email of the admin to be deleted.
            Returns:
                deleted: The request object of the deleted admin, or None if the
                passed admin ID doesn't exist.
        """

        exists = self.__admin_exists(admin_id)
        if not exists:
            return None
        elif not admin_id.isdigit():
            admin_id = exists["id"]

        url = self.url+"/"+admin_id

        return requests.delete(url, headers=self.headers)
项目:vmware-nsx-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def cleanup_os_logical_ports(self):
        """
        Delete all logical ports created by OpenStack
        """
        lports = self.get_logical_ports()
        os_lports = self.get_os_resources(lports)
        LOG.info("Number of OS Logical Ports to be deleted: %s",
                 len(os_lports))
        # logical port vif detachment
        self.update_logical_port_attachment(os_lports)
        for p in os_lports:
            endpoint = '/logical-ports/%s' % p['id']
            response = self.delete(endpoint=endpoint)
            if response.status_code == requests.codes.ok:
                LOG.info("Successfully deleted logical port %s", p['id'])
            else:
                LOG.error("Failed to delete lport %(port_id)s, response "
                          "code %(code)s",
                          {'port_id': p['id'], 'code': response.status_code})
项目:stackstorm-ghost2logger    作者:StackStorm-Exchange    | 项目源码 | 文件源码
def _ghost2loggergpurge(self):
        # Let's deal with the JSON API calls here
        self._logger.info('[Ghost2logger]: Purge Ghost2logger')

        HEADERS = {"Content-Type": 'application/json'}

        _URL = self._ghost_url + "/v1/all"
        try:
            r = requests.delete(_URL, headers=HEADERS, verify=False,
                                auth=(self._username, self._password))

            _data = r.json()
            self._logger.info('[Ghost2logger]: Purged ghost2logger' +
                              str(_data))
        except Exception as e:
            self._logger.info('[Ghost2logger]: Cannot purge Ghost2logger: ' +
                              str(e))
项目:Provision-API-Python-Lib    作者:apierson27    | 项目源码 | 文件源码
def del_admin(self, admin_id):
        """ Delete a specified admin account.
            Args:
                admin_id: ID string or email of the admin to be deleted.
            Returns:
                deleted: The request object of the deleted admin, or None if the
                passed admin ID doesn't exist.
        """

        exists = self.__admin_exists(admin_id)
        if not exists:
            return None
        elif not admin_id.isdigit():
            admin_id = exists["id"]

        url = self.url+"/"+admin_id

        return requests.delete(url, headers=self.headers)
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def _delete_test_cluster(self, cluster_id):
        self.logger.info("Attempting to clean up the test cluster that was spun up for the unit test: %s" % cluster_id)
        config = load_config()
        url, user_id, password = get_rnr_credentials(config)

        response = requests.get('%s/v1/solr_clusters/%s' % (url, cluster_id),
                                auth=(user_id, password),
                                headers={'x-global-transaction-id': 'Rishavs app',
                                         'Content-type': 'application/json'})
        response_text = json.dumps(response.json(), indent=4, sort_keys=True)
        if response.status_code == 200:
            self.logger.info('Found a test cluster that needs cleanup: %s' % response_text)
            response = requests.delete('%s/v1/solr_clusters/%s' % (url, cluster_id),
                                       auth=(user_id, password),
                                       headers={'x-global-transaction-id': 'Rishavs app',
                                                'Content-type': 'application/json'})
            response.raise_for_status()
            self.logger.info("Successfully deleted test cluster: %s" % cluster_id)
        else:
            self.logger.info('No cleanup required for cluster id: %s (got response: %s)' % (cluster_id, response_text))
项目:retrieve-and-rank-tuning    作者:rchaks    | 项目源码 | 文件源码
def _try_deleting_ranker(self, ranker_id):
        config = load_config()
        url, user_id, password = get_rnr_credentials(config)

        response = requests.get(path.join(url, 'v1/rankers', ranker_id),
                                auth=(user_id, password),
                                headers={'x-global-transaction-id': 'Rishavs app',
                                         'Content-type': 'application/json'})
        response_text = json.dumps(response.json(), indent=4, sort_keys=True)
        if response.status_code == 200:
            self.logger.info('Found a test ranker that needs cleanup: %s' % response_text)
            response = requests.delete(path.join(url, 'v1/rankers', ranker_id),
                                       auth=(user_id, password),
                                       headers={'x-global-transaction-id': 'Rishavs app',
                                                'Content-type': 'application/json'})
            response.raise_for_status()
            self.logger.info("Successfully deleted test ranker: %s" % ranker_id)
        else:
            self.logger.info('No cleanup required for ranker id: %s (got response: %s)' % (ranker_id, response_text))
项目:gremlin    作者:amalgam8    | 项目源码 | 文件源码
def delete_recipe(recipe_id):
    # clear fault injection rules
    url = '{0}/v1/rules?tag={1}'.format(a8_controller_url, recipe_id)
    headers = {}
    if a8_controller_token != "" :
        headers['Authorization'] = "Bearer " + token

    try:
        r = requests.delete(url, headers=headers)
    except Exception, e:
        sys.stderr.write("Could not DELETE {0}".format(url))
        sys.stderr.write("\n")
        sys.stderr.write(str(e))
        sys.stderr.write("\n")
        abort(500, "Could not DELETE {0}".format(url))

    if r.status_code != 200 and r.status_code != 204:
        abort(r.status_code)

    return ""
项目:gremlin    作者:amalgam8    | 项目源码 | 文件源码
def clear_rules_from_all_proxies(self):
        """
            Clear fault injection rules from all known service proxies.
        """
        if self.debug:
            print 'Clearing rules'
        try:
            headers = {"Content-Type" : "application/json"}
            if self.a8_controller_token != "" :
                headers['Authorization'] = "Bearer " + self.a8_controller_token
            for rule_id in self._rule_ids:
                resp = requests.delete(self.a8_controller_url + "?id=" + rule_id,
                                       headers = headers)
                resp.raise_for_status()
        except requests.exceptions.ConnectionError, e:
            print "FAILURE: Could not communicate with control plane %s" % self.a8_controller_url
            print e
            sys.exit(3)

    #TODO: Create a plugin model here, to support gremlinproxy and nginx
项目:son-mano-framework    作者:sonata-nfv    | 项目源码 | 文件源码
def roll_back_instantiation(self, serv_id):
        """
        This method tries to roll back the instantiation workflow if an error
        occured. It will send messages to the SMR and the IA to remove deployed
        SSMs, FSMs and stacks. It will instruct the Repositories to delete the
        records.
        """

        # Kill the stack
        corr_id = str(uuid.uuid4())
        payload = json.dumps({'instance_uuid': serv_id})
        self.manoconn.notify(t.IA_REMOVE,
                             payload,
                             reply_to=t.IA_REMOVE,
                             correlation_id=corr_id)

        # Kill the SSMs and FSMs
        self.terminate_ssms(serv_id, require_resp=False)

        self.terminate_fsms(serv_id, require_resp=False)

        LOG.info("Instantiation aborted, cleanup completed")

        # TODO: Delete the records
项目:son-mano-framework    作者:sonata-nfv    | 项目源码 | 文件源码
def tearDown(self):
        #Killing the scaling Executive
        if self.plex_proc is not None:
            self.plex_proc.terminate()
        del self.plex_proc

        #Killing the connection with the broker
        try:
            self.manoconn.stop_connection()
            self.sm_connection.stop_connection()
        except Exception as e:
            LOG.exception("Stop connection exception.")

        #Clearing the threading helpers
        del self.wait_for_event1
        del self.wait_for_event2

        url_user = "{0}/api/users/specific-management".format(self.man_host)
        url_vhost = "{0}/api/vhosts/fsm-1234".format(self.man_host)
        requests.delete(url=url_user, headers=self.headers, auth=('guest', 'guest'))
        requests.delete(url=url_vhost, headers=self.headers, auth=('guest', 'guest'))

    #Method that terminates the timer that waits for an event
项目:SigSciApiPy    作者:signalsciences    | 项目源码 | 文件源码
def delete_configuration(self, EP):
        try:
            url = self.base_url + self.CORPS_EP + self.corp + self.SITES_EP + self.site + EP

            with open(self.file) as data_file:
                data = json.load(data_file)

            for config in data['data']:
                requests.delete(url + "/" + config['id'], cookies=self.authn.cookies, headers=self.get_headers())

            print("Delete complete!")

        except Exception as e:
            print('Error: %s ' % str(e))
            print('Query: %s ' % url)
            quit()
项目:zendesk-utils    作者:trailbehind    | 项目源码 | 文件源码
def delete_all_zendesk_articles(self, category):
    '''
        delete all articles in the category being filled with UV articles
        for our app, we had multiple uservoice knowledge bases, and each gets put in a zendesk category
    '''

    print "**DELETING ALL SECTIONS/ARTICLES in destination category {}".format(category)
    url = '{}/api/v2/help_center/categories/{}/sections.json'.format(self.zendesk_url, category)
    response = requests.get(url, headers=self.headers, auth=self.credentials)
    if response.status_code != 200:
      print('FAILED to delete articles with error {}'.format(response.status_code))
      exit()
    sections = response.json()['sections']
    section_ids=list(section['id'] for section in sections)

    for section_id in section_ids:
      url = "{}/api/v2/help_center/sections/{}.json".format(self.zendesk_url, section_id)
      response = requests.delete(url, headers=self.headers, auth=self.credentials)
      if response.status_code != 204:
        print('FAILED to delete sections for category {} with error {}'.format(category, response.status_code))
        exit()
项目:tintri-python-sdk    作者:Tintri    | 项目源码 | 文件源码
def _send_raw_http_request(self, method, url, data=None):
        self.__logger.debug('%s %s' % (method, url))
        if method in ['POST', 'PUT', 'PATCH']:
            self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data)

        headers = {'content-type': 'application/json'}
        if self.__session_id:
            headers['cookie'] = 'JSESSIONID=%s' % self.__session_id

        if method in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']:
            if method == 'GET': httpresp = requests.get(url, headers=headers, verify=False)
            elif method == 'POST': httpresp = requests.post(url, data, headers=headers, verify=False)
            elif method == 'PUT': httpresp = requests.put(url, data, headers=headers, verify=False)
            elif method == 'PATCH': httpresp = requests.patch(url, data, headers=headers, verify=False)
            elif method == 'DELETE': httpresp = requests.delete(url, headers=headers, verify=False)
            self._httpresp = httpresp # self._httpresp is for debugging only, not thread-safe
            return httpresp
        else:
            raise TintriError(None, message='Invalid HTTP method: ' + method) # This should never happen
项目:phpIPAM    作者:michaelluich    | 项目源码 | 文件源码
def sections_delete(self, section_id,):
        headers = {'token': self.token}
        p = requests.delete(self.base + "sections/%s/" %(section_id), headers=headers)
        print p.text
        sections_delete = json.loads(p.text)

        if p.status_code != 200:
            logging.error("phpipam.sections_delete: Failure %s" % (p.status_code))
            logging.error(sections_delete)
            self.error = p.status_code
            self.error_message = sections_delete['message']
            return self.error, self.error_message

        if not sections_delete['success']:
            logging.error("phpipam.sections_delete: FAILURE: %s" % (sections_delete['code']))
            self.error = sections_delete['code']
            return self.error

        logging.info("phpipam.sections_delete: success %s" % (sections_delete['success']))
        return sections_delete['code']
项目:phpIPAM    作者:michaelluich    | 项目源码 | 文件源码
def subnet_delete(self, subnet_id, ):
        headers = {'token': self.token}
        p = requests.delete(self.base + "subnets/%s/" % (subnet_id), headers=headers)
        print p.text
        subnets_delete = json.loads(p.text)

        if p.status_code != 200:
            logging.error("phpipam.subnets_delete: Failure %s" % (p.status_code))
            logging.error(subnets_delete)
            self.error = p.status_code
            self.error_message = subnets_delete['message']
            return self.error, self.error_message

        if not subnets_delete['success']:
            logging.error("phpipam.subnets_delete: FAILURE: %s" % (subnets_delete['code']))
            self.error = subnets_delete['code']
            return self.error

        logging.info("phpipam.subnets_delete: success %s" % (subnets_delete['success']))
        return subnets_delete['code']
项目:phpIPAM    作者:michaelluich    | 项目源码 | 文件源码
def vlan_delete(self, vlan_id, ):
        headers = {'token': self.token}
        p = requests.delete(self.base + "vlans/%s/" % (vlan_id), headers=headers)
        print p.text
        vlans_delete = json.loads(p.text)

        if p.status_code != 200:
            logging.error("phpipam.vlans_delete: Failure %s" % (p.status_code))
            logging.error(vlans_delete)
            self.error = p.status_code
            self.error_message = vlans_delete['message']
            return self.error, self.error_message

        if not vlans_delete['success']:
            logging.error("phpipam.vlans_delete: FAILURE: %s" % (vlans_delete['code']))
            self.error = vlans_delete['code']
            return self.error

        logging.info("phpipam.vlans_delete: success %s" % (vlans_delete['success']))
        return vlans_delete['code']
项目:fbbotw    作者:JoabMendes    | 项目源码 | 文件源码
def delete_domain_whitelist():
    """ Deletes the domain whitelist set previously .

    :usage:

        >>> response = fbbotw.delete_domain_whitelist()
    :return: `Response object <http://docs.python-requests.org/en/\
    master/api/#requests.Response>`_
    :facebook docs: `/domain-whitelisting#delete <https://developers.facebook.\
    com/docs/messenger-platform/messenger-profile/domain-whitelisting#delete>`_
    """
    url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN)
    payload = {}
    payload['fields'] = ["whitelisted_domains"]
    data = json.dumps(payload)
    status = requests.delete(url, headers=HEADER, data=data)
    return status
项目:desec-stack    作者:desec-io    | 项目源码 | 文件源码
def _pdns_delete(url):
    # We first delete the zone from nslord, the main authoritative source of our DNS data.
    # However, we do not want to wait for the zone to expire on the slave ("nsmaster").
    # We thus issue a second delete request on nsmaster to delete the zone there immediately.
    r1 = requests.delete(settings.NSLORD_PDNS_API + url, headers=headers_nslord)
    if r1.status_code < 200 or r1.status_code >= 300:
        # Deletion technically does not fail if the zone didn't exist in the first place
        if r1.status_code == 422 and 'Could not find domain' in r1.text:
            pass
        else:
            raise PdnsException(r1)

    # Delete from nsmaster as well
    r2 = requests.delete(settings.NSMASTER_PDNS_API + url, headers=headers_nsmaster)
    if r2.status_code < 200 or r2.status_code >= 300:
        # Deletion technically does not fail if the zone didn't exist in the first place
        if r2.status_code == 422 and 'Could not find domain' in r2.text:
            pass
        else:
            raise PdnsException(r2)

    return (r1, r2)
项目:esignature-recipes-python    作者:docusign    | 项目源码 | 文件源码
def logs_download():
    """Download (and then delete) the logs from DocuSign

    The logs are stored in files/log/<account_id>/log_<timestamp>_<orig_name>
    <orig_name> is the original name of the file from the platform. Eg:
        00_OK_GetAccountSharedAccess.txt
        01_OK_GetFolderList.txt
        02_OK_ExecuteLoggedApiBusinessLogic.txt
        03_Created_RequestRecipientToken.txt
        04_OK_GetEnvelope.txt
        05_OK_GetEnvelope.txt
        06_OK_SendEnvelope.txt

    Returns {err: False or a problem string, entries: log_entries, err_code }
        returns an array of just the new log_entries
    """
    global auth
    auth = ds_authentication.get_auth()
    if auth["err"]:
        return {"err": auth["err"], "err_code": auth["err_code"]}
    r = logging_do_download() # returns {err, new_entries}
    return r
项目:python-moira-client    作者:moira-alert    | 项目源码 | 文件源码
def delete(self, path='', **kwargs):
        """

        :param path: str api path
        :param kwargs: additional parameters for request
        :return: dict response

        :raises: HTTPError
        :raises: InvalidJSONError
        """
        r = requests.delete(self._path_join(path), headers=self.auth_custom, auth=self.auth, **kwargs)
        r.raise_for_status()
        try:
            return r.json()
        except ValueError:
            raise InvalidJSONError(r.content)
项目:ATX    作者:NetEaseGame    | 项目源码 | 文件源码
def httpdo(method, url, data=None):
    """
    Do HTTP Request
    """
    if isinstance(data, dict):
        data = json.dumps(data)
    if DEBUG:
        print "Shell: curl -X {method} -d '{data}' '{url}'".format(method=method, data=data or '', url=url)

    fn = dict(GET=requests.get, POST=requests.post, DELETE=requests.delete)[method]
    response = fn(url, data=data)
    retjson = response.json()
    if DEBUG:
        print 'Return:', json.dumps(retjson, indent=4)
    r = convert(retjson)
    if r.status != 0:
        raise WebDriverError(r.status, r.value)
    return r
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def delete_logstore(self, project_name, logstore_name):
        """ delete log store
        Unsuccessful opertaion will cause an LogException.

        :type project_name: string
        :param project_name: the Project name 

        :type logstore_name: string
        :param logstore_name: the logstore name

        :return: DeleteLogStoreResponse

        :raise: LogException
        """
        headers = {}
        params = {}
        resource = "/logstores/" + logstore_name
        (resp, header) = self._send("DELETE", project_name, None, resource, params, headers)
        return DeleteLogStoreResponse(header, resp)
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def delete_shard(self, project_name, logstore_name, shardId):
        """ delete a readonly shard 
        Unsuccessful opertaion will cause an LogException.

        :type project_name: string
        :param project_name: the Project name 

        :type logstore_name: string
        :param logstore_name: the logstore name

        :type shardId: int
        :param shardId: the read only shard id

        :return: ListShardResponse

        :raise: LogException
        """
        headers = {}
        params = {}
        resource = "/logstores/" + logstore_name + "/shards/" + str(shardId)
        (resp, header) = self._send("DELETE", project_name, None, resource, params, headers)
        return DeleteShardResponse(header, resp)
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def delete_index(self, project_name, logstore_name):
        """ delete index of a logstore
        Unsuccessful opertaion will cause an LogException.

        :type project_name: string
        :param project_name: the Project name 

        :type logstore_name: string
        :param logstore_name: the logstore name

        :return: DeleteIndexResponse

        :raise: LogException
        """

        headers = {}
        params = {}
        resource = "/logstores/" + logstore_name + "/index"
        (resp, header) = self._send("DELETE", project_name, None, resource, params, headers)
        return DeleteIndexResponse(header, resp)
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def delete_logtail_config(self, project_name, config_name):
        """ delete logtail config in a project
        Unsuccessful opertaion will cause an LogException.

        :type project_name: string
        :param project_name: the Project name 

        :type config_name: string
        :param config_name: the logtail config name

        :return: DeleteLogtailConfigResponse

        :raise: LogException
        """

        headers = {}
        params = {}
        resource = "/configs/" + config_name
        (resp, headers) = self._send("DELETE", project_name, None, resource, params, headers)
        return DeleteLogtailConfigResponse(headers, resp)