Python http.client 模块,OK 实例源码

我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用http.client.OK

项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_validate_image_status_before_upload_ok_v1(self):
        mock_conn = mock.Mock()
        fake_url = 'http://fake_host/fake_path/fake_image_id'
        mock_check_resp_status_and_retry = self.mock_patch_object(
            self.glance, 'check_resp_status_and_retry')
        mock_head_resp = mock.Mock()
        mock_head_resp.status = httplib.OK
        mock_head_resp.read.return_value = 'fakeData'
        mock_head_resp.getheader.return_value = 'queued'
        mock_conn.getresponse.return_value = mock_head_resp

        self.glance.validate_image_status_before_upload_v1(
            mock_conn, fake_url, extra_headers=mock.Mock())

        self.assertTrue(mock_conn.getresponse.called)
        self.assertEqual(mock_head_resp.read.call_count, 2)
        self.assertFalse(mock_check_resp_status_and_retry.called)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_validate_image_status_before_upload_req_head_exception_v1(self):
        mock_conn = mock.Mock()
        mock_conn.request.side_effect = Fake_HTTP_Request_Error()
        fake_url = 'http://fake_host/fake_path/fake_image_id'
        mock_head_resp = mock.Mock()
        mock_head_resp.status = httplib.OK
        mock_head_resp.read.return_value = 'fakeData'
        mock_head_resp.getheader.return_value = 'queued'
        mock_conn.getresponse.return_value = mock_head_resp

        self.assertRaises(self.glance.RetryableError,
                          self.glance.validate_image_status_before_upload_v1,
                          mock_conn, fake_url, extra_headers=mock.Mock())
        mock_conn.request.assert_called_once()
        mock_head_resp.read.assert_not_called()
        mock_conn.getresponse.assert_not_called()
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_validate_image_status_before_upload_ok_v2_using_api_service(self):
        mock_conn = mock.Mock()
        fake_url = 'http://fake_host:fake_port/fake_path/fake_image_id'
        mock_check_resp_status_and_retry = self.mock_patch_object(
            self.glance, 'check_resp_status_and_retry')
        mock_head_resp = mock.Mock()
        mock_head_resp.status = httplib.OK
        mock_head_resp.read.return_value = '{"status": "queued"}'
        mock_conn.getresponse.return_value = mock_head_resp
        fake_extra_headers = mock.Mock()
        expected_api_path = '/v2/images/%s' % 'fake_image_id'

        self.glance.validate_image_status_before_upload_v2(
            mock_conn, fake_url, fake_extra_headers, expected_api_path)

        self.assertTrue(mock_conn.getresponse.called)
        self.assertEqual(
            mock_head_resp.read.call_count, 2)
        self.assertFalse(mock_check_resp_status_and_retry.called)
        mock_conn.request.assert_called_with('GET',
                                             '/v2/images/fake_image_id',
                                             headers=fake_extra_headers)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_validate_image_status_before_upload_ok_v2_using_uwsgi(self):
        mock_conn = mock.Mock()
        fake_url = 'http://fake_host/fake_path/fake_image_id'
        mock_check_resp_status_and_retry = self.mock_patch_object(
            self.glance, 'check_resp_status_and_retry')
        mock_head_resp = mock.Mock()
        mock_head_resp.status = httplib.OK
        mock_head_resp.read.return_value = '{"status": "queued"}'
        mock_conn.getresponse.return_value = mock_head_resp

        fake_extra_headers = mock.Mock()
        fake_patch_path = 'fake_patch_path'

        self.glance.validate_image_status_before_upload_v2(
            mock_conn, fake_url, fake_extra_headers, fake_patch_path)

        self.assertTrue(mock_conn.getresponse.called)
        self.assertEqual(
            mock_head_resp.read.call_count, 2)
        self.assertFalse(mock_check_resp_status_and_retry.called)
        mock_conn.request.assert_called_with('GET',
                                             'fake_patch_path',
                                             headers=fake_extra_headers)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @return data retrieved from URL or None
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code and return_code == http_client_.OK:
        return_data = response.read()
        response.close()
        return return_data
    else:
        raise URLFetchError(return_message)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code == http_client_.OK:
        return_data = response.read()
        response.close()
        outfile = open(output_file, "w")
        outfile.write(return_data)
        outfile.close()

    return return_code, return_message, return_code == http_client_.OK
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: data retrieved from URL or None
    @rtype: file derived type
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code and return_code == http_client_.OK:
        return response
    else:
        raise URLFetchError(return_message)
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def test_get_picture__success(self):
        test_utils.setup_images()
        item = Item.create(**TEST_ITEM)
        picture = Picture.create(item=item, **TEST_PICTURE)

        open("{path}/{picture_uuid}.jpg".format(
            path=utils.get_image_folder(),
            picture_uuid=picture.uuid), "wb")

        resp = self.app.get('/pictures/{picture_uuid}'.format(
            picture_uuid=picture.uuid))
        assert resp.status_code == client.OK
        test_picture = TEST_PICTURE.copy()
        test_picture['item_uuid'] = item.uuid
        assert resp.data == b''
        assert resp.headers['Content-Type'] == 'image/jpeg'
        test_utils.clean_images()
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def post(self):
        request_data = request.get_json(force=True)

        if 'email' not in request_data or 'password' not in request_data:
            abort(client.BAD_REQUEST)

        email = request_data['email']
        password = request_data['password']

        try:
            user = User.get(User.email == email)
        except User.DoesNotExist:
            abort(client.BAD_REQUEST)

        if not user.verify_password(password):
            abort(client.UNAUTHORIZED)

        login_user(user)
        return generate_response({}, client.OK)
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def get(self):
        query = request.args.get('query')
        limit = int(request.args.get('limit', -1))
        min_limit, max_limit = 0, 100

        limit_in_range = limit > min_limit and limit <= max_limit

        if query is not None and limit_in_range:
            matches = Item.search(query, Item.select(), limit)
            return generate_response(Item.json_list(matches), client.OK)

        def fmt_error(msg):
            return {'detail': msg}

        errors = {"errors": []}

        if not query:
            errors['errors'].append(fmt_error('Missing query.'))

        if not limit_in_range:
            msg = 'Limit out of range. must be between {} and {}. Requested: {}'
            errors['errors'].append(
                fmt_error(msg.format(min_limit, max_limit, limit)))

        return errors, client.BAD_REQUEST
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @return data retrieved from URL or None
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code and return_code == http_client_.OK:
        return_data = response.read()
        response.close()
        return return_data
    else:
        raise URLFetchError(return_message)
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code == http_client_.OK:
        return_data = response.read()
        response.close()
        outfile = open(output_file, "w")
        outfile.write(return_data)
        outfile.close()

    return return_code, return_message, return_code == http_client_.OK
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: data retrieved from URL or None
    @rtype: file derived type
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code and return_code == http_client_.OK:
        return response
    else:
        raise URLFetchError(return_message)
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @return data retrieved from URL or None
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code and return_code == http_client_.OK:
        return_data = response.read()
        response.close()
        return return_data
    else:
        raise URLFetchError(return_message)
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code == http_client_.OK:
        return_data = response.read()
        response.close()
        outfile = open(output_file, "w")
        outfile.write(return_data)
        outfile.close()

    return return_code, return_message, return_code == http_client_.OK
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @return data retrieved from URL or None
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code and return_code == http_client_.OK:
        return_data = response.read()
        response.close()
        return return_data
    else:
        raise URLFetchError(return_message)
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code == http_client_.OK:
        return_data = response.read()
        response.close()
        outfile = open(output_file, "w")
        outfile.write(return_data)
        outfile.close()

    return return_code, return_message, return_code == http_client_.OK
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: data retrieved from URL or None
    @rtype: file derived type
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code and return_code == http_client_.OK:
        return response
    else:
        raise URLFetchError(return_message)
项目:cvpysdk    作者:CommvaultEngg    | 项目源码 | 文件源码
def _is_valid_service(self):
        """Checks if the service url is a valid url or not.

            Returns:
                True - if the service url is valid

                False - if the service url is invalid

            Raises:
                requests Connection Error   --  requests.exceptions.ConnectionError

                requests Timeout Error      --  requests.exceptions.Timeout
        """
        try:
            response = requests.get(self._commcell_object._web_service, timeout=184)

            # Valid service if the status code is 200 and response is True
            return response.status_code == httplib.OK and response.ok
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as error:
            raise error
项目:cvpysdk    作者:CommvaultEngg    | 项目源码 | 文件源码
def _logout(self):
        """Posts a logout request to the server.

            Returns:
                str - response string from server upon logout success
        """
        flag, response = self.make_request('POST', self._commcell_object._services['LOGOUT'])

        if flag:
            self._commcell_object._headers['Authtoken'] = None

            if response.status_code == httplib.OK:
                return response.text
            else:
                return 'Failed to logout the user'
        else:
            return 'User already logged out'
项目:erpnext_quickbooks    作者:indictranstech    | 项目源码 | 文件源码
def download_pdf(self, qbbo, item_id):
        url = self.api_url + "/company/{0}/{1}/{2}/pdf".format(self.company_id, qbbo.lower(), item_id)

        if self.session is None:
            self.create_session()

        headers = {
            'Content-Type': 'application/pdf',
            'Accept': 'application/pdf, application/json',
        }

        response = self.session.request("GET", url, True, self.company_id, headers=headers)

        if response.status_code is not httplib.OK:
            try:
                json = response.json()
            except:
                raise QuickbooksException("Error reading json response: {0}".format(response.text), 10000)
            self.handle_exceptions(json["Fault"])
        else:
            return response.content
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_upload_tarball_by_url_http_v1(self):
        fake_conn = mock.Mock()
        mock_HTTPConn = self.mock_patch_object(
            self.glance, '_create_connection', fake_conn)
        mock_validate_image = self.mock_patch_object(
            self.glance, 'validate_image_status_before_upload_v1')
        mock_create_tarball = self.mock_patch_object(
            self.glance.utils, 'create_tarball')
        mock_check_resp_status = self.mock_patch_object(
            self.glance, 'check_resp_status_and_retry')
        self.glance._create_connection().getresponse = mock.Mock()
        self.glance._create_connection().getresponse().status = httplib.OK
        fake_extra_headers = {}
        fake_properties = {}
        fake_endpoint = 'http://fake_netloc/fake_path'
        expected_url = "%(glance_endpoint)s/v1/images/%(image_id)s" % {
            'glance_endpoint': fake_endpoint,
            'image_id': 'fake_image_id'}

        self.glance._upload_tarball_by_url_v1(
            'fake_staging_path', 'fake_image_id', fake_endpoint,
            fake_extra_headers, fake_properties)

        self.assertTrue(mock_HTTPConn.called)
        mock_validate_image.assert_called_with(fake_conn,
                                               expected_url,
                                               fake_extra_headers)
        self.assertTrue(mock_create_tarball.called)
        self.assertTrue(
            mock_HTTPConn.return_value.getresponse.called)
        self.assertFalse(mock_check_resp_status.called)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_upload_tarball_by_url_https_v1(self):
        fake_conn = mock.Mock()
        mock_HTTPSConn = self.mock_patch_object(
            self.glance, '_create_connection', fake_conn)
        mock_validate_image = self.mock_patch_object(
            self.glance, 'validate_image_status_before_upload_v1')
        mock_create_tarball = self.mock_patch_object(
            self.glance.utils, 'create_tarball')
        mock_check_resp_status = self.mock_patch_object(
            self.glance, 'check_resp_status_and_retry')
        self.glance._create_connection().getresponse = mock.Mock()
        self.glance._create_connection().getresponse().status = httplib.OK
        fake_extra_headers = {}
        fake_properties = {}
        fake_endpoint = 'https://fake_netloc/fake_path'
        expected_url = "%(glance_endpoint)s/v1/images/%(image_id)s" % {
            'glance_endpoint': fake_endpoint,
            'image_id': 'fake_image_id'}

        self.glance._upload_tarball_by_url_v1(
            'fake_staging_path', 'fake_image_id', fake_endpoint,
            fake_extra_headers, fake_properties)

        self.assertTrue(mock_HTTPSConn.called)
        mock_validate_image.assert_called_with(fake_conn,
                                               expected_url,
                                               fake_extra_headers)
        self.assertTrue(mock_create_tarball.called)
        self.assertTrue(
            mock_HTTPSConn.return_value.getresponse.called)
        self.assertFalse(mock_check_resp_status.called)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_update_image_meta_ok_v2_using_api_service(self):
        fake_conn = mock.Mock()
        fake_extra_headers = {'fake_type': 'fake_content'}
        fake_properties = {'fake_path': True}
        new_fake_properties = {'path': '/fake-path',
                               'value': "True",
                               'op': 'add'}
        fake_body = [
            {"path": "/container_format", "value": "ovf", "op": "add"},
            {"path": "/disk_format", "value": "vhd", "op": "add"},
            {"path": "/visibility", "value": "private", "op": "add"}]
        fake_body.append(new_fake_properties)
        fake_body_json = json.dumps(fake_body)
        fake_headers = {
            'Content-Type': 'application/openstack-images-v2.1-json-patch'}
        fake_headers.update(**fake_extra_headers)
        fake_conn.getresponse.return_value = mock.Mock()
        fake_conn.getresponse().status = httplib.OK
        expected_api_path = '/v2/images/%s' % 'fake_image_id'

        self.glance._update_image_meta_v2(fake_conn, fake_extra_headers,
                                          fake_properties, expected_api_path)
        fake_conn.request.assert_called_with('PATCH',
                                             '/v2/images/%s' % 'fake_image_id',
                                             body=fake_body_json,
                                             headers=fake_headers)
        fake_conn.getresponse.assert_called()
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_update_image_meta_ok_v2_using_uwsgi_service(self):
        fake_conn = mock.Mock()
        fake_extra_headers = {'fake_type': 'fake_content'}
        fake_properties = {'fake_path': True}
        new_fake_properties = {'path': '/fake-path',
                               'value': "True",
                               'op': 'add'}
        fake_body = [
            {"path": "/container_format", "value": "ovf", "op": "add"},
            {"path": "/disk_format", "value": "vhd", "op": "add"},
            {"path": "/visibility", "value": "private", "op": "add"}]
        fake_body.append(new_fake_properties)
        fake_body_json = json.dumps(fake_body)
        fake_headers = {
            'Content-Type': 'application/openstack-images-v2.1-json-patch'}
        fake_headers.update(**fake_extra_headers)
        fake_conn.getresponse.return_value = mock.Mock()
        fake_conn.getresponse().status = httplib.OK
        expected_wsgi_path = '/fake_path/v2/images/%s' % 'fake_image_id'

        self.glance._update_image_meta_v2(fake_conn, fake_extra_headers,
                                          fake_properties, expected_wsgi_path)
        fake_conn.request.assert_called_with('PATCH',
                                             '/fake_path/v2/images/%s' %
                                             'fake_image_id',
                                             body=fake_body_json,
                                             headers=fake_headers)
        fake_conn.getresponse.assert_called()
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_validate_image_status_before_upload_rep_body_too_long_v1(self):
        mock_conn = mock.Mock()
        fake_url = 'http://fake_host/fake_path/fake_image_id'
        mock_head_resp = mock.Mock()
        mock_head_resp.status = httplib.OK
        mock_head_resp.read.return_value = 'fakeData longer than 8'
        mock_head_resp.getheader.return_value = 'queued'
        mock_conn.getresponse.return_value = mock_head_resp

        self.assertRaises(self.glance.RetryableError,
                          self.glance.validate_image_status_before_upload_v1,
                          mock_conn, fake_url, extra_headers=mock.Mock())
        mock_conn.request.assert_called_once()
        mock_conn.getresponse.assert_called_once()
        mock_head_resp.read.assert_called_once()
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_validate_image_status_before_upload_get_image_failed_v2(self):
        mock_conn = mock.Mock()
        mock_conn.request.side_effect = Fake_HTTP_Request_Error()
        fake_url = 'http://fake_host/fake_path/fake_image_id'
        mock_head_resp = mock.Mock()
        mock_head_resp.status = httplib.OK
        mock_conn.getresponse.return_value = mock_head_resp
        expected_wsgi_path = '/fake_path/v2/images/%s' % 'fake_image_id'

        self.assertRaises(self.glance.RetryableError,
                          self.glance.validate_image_status_before_upload_v2,
                          mock_conn, fake_url, mock.Mock(), expected_wsgi_path)
        mock_conn.request.assert_called_once()
        mock_head_resp.read.assert_not_called()
        mock_conn.getresponse.assert_not_called()
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def test_get_item_pictures__success(self):
        item = Item.create(**TEST_ITEM)
        Picture.create(item=item, **TEST_PICTURE)
        Picture.create(item=item, **TEST_PICTURE2)
        resp = self.app.get('/items/{item_uuid}/pictures/'.format(
            item_uuid=item.uuid))
        assert resp.status_code == client.OK

        test_utils.assert_valid_response(
            resp.data, EXPECTED_RESULTS['get_item_pictures__success'])
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def test_get_items__success(self):
        Item.create(**TEST_ITEM)
        Item.create(**TEST_ITEM2)

        resp = self.app.get('/items/')

        assert resp.status_code == client.OK
        assert_valid_response(resp.data, EXPECTED_RESULTS['get_items__success'])
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def test_get_item__success(self):
        item = Item.create(**TEST_ITEM)
        resp = self.app.get('/items/{item_uuid}'.format(item_uuid=item.uuid))

        assert resp.status_code == client.OK
        assert_valid_response(resp.data, EXPECTED_RESULTS['get_item__success'])
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def test_patch_change1item__success(self):
        item = Item.create(**TEST_ITEM)

        post_data = format_jsonapi_request('item', {'name': 'new-name'})
        resp = self.app.patch('/items/{item_uuid}'.format(item_uuid=item.uuid),
                              data=json.dumps(post_data),
                              content_type='application/json')

        assert resp.status_code == client.OK
        expected_result = EXPECTED_RESULTS['patch_change1item__success']
        assert_valid_response(resp.data, expected_result)

        assert Item.get().name == 'new-name'
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def get(self, item_uuid):
        """Retrieve every picture of an item"""
        pictures = Picture.select().join(Item).where(Item.uuid == item_uuid)

        if pictures:
            data = Picture.json_list(pictures)
            return generate_response(data, client.OK)

        return None, client.NOT_FOUND
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def get(self):
        user_addrs = list(Address.select().where(
            Address.user == auth.current_user))

        return generate_response(Address.json_list(user_addrs), OK)
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def get(self, address_uuid):
        try:
            addr = Address.get(
                Address.user == auth.current_user,
                Address.uuid == address_uuid
            )
            return generate_response(addr.json(), OK)

        except Address.DoesNotExist:
            return None, NOT_FOUND
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def patch(self, address_uuid):
        try:
            obj = Address.get(Address.user == auth.current_user,
                              Address.uuid == address_uuid)
        except Address.DoesNotExist:
            return None, NOT_FOUND

        res = request.get_json(force=True)

        errors = Address.validate_input(res)
        if errors:
            return errors, BAD_REQUEST

        data = res['data']['attributes']

        country = data.get('country')
        city = data.get('city')
        post_code = data.get('post_code')
        address = data.get('address')
        phone = data.get('phone')

        if country and country != obj.country:
            obj.country = country

        if city and city != obj.city:
            obj.city = city

        if post_code and post_code != obj.post_code:
            obj.post_code = post_code

        if address and address != obj.address:
            obj.address = address

        if phone and phone != obj.phone:
            obj.phone = phone

        obj.save()

        return generate_response(obj.json(), OK)
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def get(self):
        """Retrieve every item"""
        data = Item.json_list(Item.select())
        return generate_response(data, client.OK)
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def get(self, item_uuid):
        """Retrieve the item specified by item_uuid"""
        try:
            item = Item.get(Item.uuid == item_uuid)
            return generate_response(item.json(), client.OK)
        except Item.DoesNotExist:
            return None, client.NOT_FOUND
项目:ecommerce_api    作者:nbschool    | 项目源码 | 文件源码
def patch(self, item_uuid):
        """Edit the item specified by item_uuid"""
        try:
            obj = Item.get(Item.uuid == item_uuid)
        except Item.DoesNotExist:
            return None, client.NOT_FOUND

        request_data = request.get_json(force=True)

        errors = Item.validate_input(request_data, partial=True)
        if errors:
            return errors, client.BAD_REQUEST

        data = request_data['data']['attributes']

        name = data.get('name')
        price = data.get('price')
        description = data.get('description')
        availability = data.get('availability')
        category = data.get('category')

        if name:
            obj.name = name

        if price:
            obj.price = price

        if description:
            obj.description = description

        if availability:
            obj.availability = availability

        if category:
            obj.category = category

        obj.save()

        return generate_response(obj.json(), client.OK)
项目:python-phoenixdb    作者:lalinsky    | 项目源码 | 文件源码
def _apply(self, request_data, expected_response_type=None):
        logger.debug("Sending request\n%s", pprint.pformat(request_data))

        request_name = request_data.__class__.__name__
        message = common_pb2.WireMessage()
        message.name = 'org.apache.calcite.avatica.proto.Requests${}'.format(request_name)
        message.wrapped_message = request_data.SerializeToString()
        body = message.SerializeToString()
        headers = {'content-type': 'application/x-google-protobuf'}

        response = self._post_request(body, headers)
        response_body = response.read()

        if response.status != httplib.OK:
            logger.debug("Received response\n%s", response_body)
            if b'<html>' in response_body:
                parse_error_page(response_body)
            else:
                # assume the response is in protobuf format
                parse_error_protobuf(response_body)
            raise errors.InterfaceError('RPC request returned invalid status code', response.status)

        message = common_pb2.WireMessage()
        message.ParseFromString(response_body)

        logger.debug("Received response\n%s", message)

        if expected_response_type is None:
            expected_response_type = request_name.replace('Request', 'Response')

        expected_response_type = 'org.apache.calcite.avatica.proto.Responses$' + expected_response_type
        if message.name != expected_response_type:
            raise errors.InterfaceError('unexpected response type "{}"'.format(message.name))

        return message.wrapped_message
项目:erpnext_quickbooks    作者:indictranstech    | 项目源码 | 文件源码
def make_request(self, request_type, url, request_body=None, content_type='application/json'):

        params = {}

        if self.minorversion:
            params['minorversion'] = self.minorversion
        print "params",type (params), params 

        if not request_body:
            request_body = {}

        if self.session is None:
            self.create_session()

        headers = {
            'Content-Type': content_type,
            'Accept': 'application/json'
        }

        req = self.session.request(request_type, url, True, self.company_id, headers=headers, params=params, data=request_body)

        try:
            result = req.json()
        except:
            raise QuickbooksException("Error reading json response: {0}".format(req.text), 10000)

        if req.status_code is not httplib.OK or "Fault" in result:
            self.handle_exceptions(result["Fault"])
        else:
            return result
项目:erpnext_quickbooks    作者:indictranstech    | 项目源码 | 文件源码
def make_request_query(self, request_type, url, request_body=None, content_type='application/json'):

        params = {}
        if self.minorversion:
            params['minorversion'] = self.minorversion
        params['query'] =  request_body

        print "params",type (params), params ,type (params['query'])
        if not request_body:
            request_body = {}

        if self.session is None:
            self.create_session()

        headers = {
            'Content-Type': content_type,
            'Accept': 'application/json'
        }

        req = self.session.request(request_type, url, True, str(self.company_id), headers=headers, params=params)
        # req = self.session.request(request_type, url, True, self.company_id, headers=headers, params=params, data=request_body)

        try:
            result = req.json()
        except:
            raise QuickbooksException("Error reading json response: {0}".format(req.text), 10000)

        if req.status_code is not httplib.OK or "Fault" in result:
            self.handle_exceptions(result["Fault"])
        else:
            return result
项目:flask-openapi    作者:remcohaszing    | 项目源码 | 文件源码
def init_app(self, app):
        """
        Initialize the OpenAPI instance for the given app.

        This should be used for a deferred initialization, supporting
        the Flask factory pattern.

        Args:
            app (flask.Flask): The Flask app to apply this extension to.

        """
        self.app = app
        for key, value in _DEFAULT_CONFIG.items():
            app.config.setdefault(key, value)

        swagger_json_url = self._config('swagger_json_url')
        swagger_yaml_url = self._config('swagger_yaml_url')

        @self.response(OK, {
            'description': 'This OpenAPI specification document.'
        })
        @app.route(swagger_json_url)
        def json_handler():
            """
            Get the `swagger` object in JSON format.

            """
            return jsonify(self.swagger)

        @self.response(OK, {
            'description': 'This OpenAPI specification document.'
        })
        @app.route(swagger_yaml_url)
        def yaml_handler():
            """
            Get the `swagger` object in YAML format.

            """
            # YAML doesn't have an official mime type yet.
            # https://www.ietf.org/mail-archive/web/media-types/current/msg00688.html
            return yaml.dump(self.swagger, default_flow_style=False), {
                'Content-Type': 'application/yaml'
            }