Python responses 模块：PUT 实例源码

我们从 Python 开源项目中提取了以下 14 个代码示例，用于说明如何使用 responses.PUT。

项目:drift    作者:dgnorth    | 项目源码 | 文件源码
def _setup_mocking(self):
        """Route mocked HTTP traffic for ``self.host`` into ``self.app``.

        Registers one ``responses`` callback per HTTP verb (GET, POST, PUT,
        DELETE, PATCH) for every URL that starts with ``self.host`` followed
        by a slash. Each intercepted request is replayed by calling the
        attribute of ``self.app`` named after the lower-cased verb
        (presumably a test client -- confirm against the enclosing class),
        and the result is handed back to ``responses`` as a
        ``(status, headers, body)`` tuple.
        """
        def _mock_callback(request):
            # Dispatch on the HTTP verb: "GET" -> self.app.get, and so on.
            handler = getattr(self.app, request.method.lower())
            reply = handler(
                request.path_url,
                data=request.body,
                headers=dict(request.headers),
            )
            return (reply.status_code, reply.headers, reply.data)

        # Escape the host so regex metacharacters in it (most commonly the
        # dots of a domain name) match literally instead of as wildcards.
        pattern = re.compile("{}/(.*)".format(re.escape(self.host)))
        methods = (
            responses.GET,
            responses.POST,
            responses.PUT,
            responses.DELETE,
            responses.PATCH,
        )
        for method in methods:
            responses.add_callback(method, pattern, callback=_mock_callback)
项目:ros-interop    作者:mcgill-robotics    | 项目源码 | 文件源码
def set_put_object_response(self, id, object_, user=1, code=200):
        """Register a mocked reply for PUT /api/odlcs/<id>.

        Note that ``object_`` is updated in place with the ``id`` and
        ``user`` keys before it is serialized.

        Args:
            id (int): Target ID; also used to build the mocked URL.
            object_ (dict): Target data to reply with.
            user (int): User number to include in the reply.
            code (int): Status code of the reply. The JSON body is only
                sent for 200; any other code gets an empty body.
        """
        extra_fields = {"id": id, "user": user}
        object_.update(extra_fields)
        # Serialized unconditionally, even when the body ends up unused.
        serialized = json.dumps(object_)

        if code == 200:
            payload = serialized
        else:
            payload = ""

        self.rsps.add(
            responses.PUT,
            "{}/api/odlcs/{:d}".format(self.url, id),
            status=code,
            body=payload,
            content_type="application/json")
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_resource_save(self):
        """Fetch user 1, change its group and save it back via PUT."""
        # Step 1: only the GET is mocked while the resource is fetched.
        with responses.RequestsMock() as fetch_mock:
            fetched = {
                'id': 1,
                'username': 'user1',
                'group': 'watchers',
            }
            fetch_mock.add(
                responses.GET, MOCK_API_URL + '/users/1', json=fetched)

            user1 = generic_client.users.get(id=1)
            self.assertEqual(user1.username, 'user1')

        # Step 2: only the PUT to the same URL is mocked while saving.
        with responses.RequestsMock() as save_mock:
            saved = {
                'id': 1,
                'username': 'user1',
                'group': 'admins',
            }
            save_mock.add(
                responses.PUT, MOCK_API_URL + '/users/1', json=saved)

            user1.group = 'admins'
            user1.save()
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_resource_save_uuid(self):
        """Same as the id-based save test, but the resource is keyed by 'uuid'."""
        # Step 1: only the GET is mocked while the resource is fetched.
        with responses.RequestsMock() as fetch_mock:
            fetched = {
                'uuid': 1,
                'username': 'user1',
                'group': 'watchers',
            }
            fetch_mock.add(
                responses.GET, MOCK_API_URL + '/users/1', json=fetched)

            user1 = generic_client.users.get(uuid=1)
            self.assertEqual(user1.username, 'user1')

        # Step 2: only the PUT to the same URL is mocked while saving.
        with responses.RequestsMock() as save_mock:
            saved = {
                'uuid': 1,
                'username': 'user1',
                'group': 'admins',
            }
            save_mock.add(
                responses.PUT, MOCK_API_URL + '/users/1', json=saved)

            user1.group = 'admins'
            user1.save()
项目:genericclient-requests    作者:genericclient    | 项目源码 | 文件源码
def test_resource_save(self):
        """Fetch and save user 1 against URLs that end with a trailing slash."""
        # Step 1: only the GET is mocked while the resource is fetched.
        with responses.RequestsMock() as fetch_mock:
            fetched = {
                'id': 1,
                'username': 'user1',
                'group': 'watchers',
            }
            fetch_mock.add(
                responses.GET, MOCK_API_URL + '/users/1/', json=fetched)

            user1 = generic_client.users.get(id=1)
            self.assertEqual(user1.username, 'user1')

        # Step 2: only the PUT to the same URL is mocked while saving.
        with responses.RequestsMock() as save_mock:
            saved = {
                'id': 1,
                'username': 'user1',
                'group': 'admins',
            }
            save_mock.add(
                responses.PUT, MOCK_API_URL + '/users/1/', json=saved)

            user1.group = 'admins'
            user1.save()
项目:dpm-py    作者:oki-archive    | 项目源码 | 文件源码
def test_publish_invalid(self):
        """publish() should surface the server's 400 reply as HTTPStatusError."""
        # GIVEN a datapackage that the dpm client treats as valid
        descriptor = {
            "name": "some-datapackage",
            "resources": [
                {"name": "some-resource", "path": "./data/some_data.csv"},
            ],
        }
        self.valid_dp = datapackage.DataPackage(
            descriptor, default_base_path='.')
        # NOTE(review): both patches are started here and never stopped in
        # this method -- confirm that the test teardown stops them.
        patch('dpm.client.DataPackage', lambda *a: self.valid_dp).start()
        patch('dpm.client.exists', lambda *a: True).start()

        # AND a server that hands a token to any user
        responses.add(
            responses.POST,
            'http://127.0.0.1:5000/api/auth/token',
            json={'token': 'blabla'},
            status=200)
        # AND rejects the datapackage upload as invalid
        responses.add(
            responses.PUT,
            'http://127.0.0.1:5000/api/package/user/some-datapackage',
            json={'message': 'invalid datapackage json'},
            status=400)

        # AND the client
        client = Client(dp1_path, self.config)

        # WHEN publish() is invoked; any exception is captured so that it
        # can be inspected below instead of aborting the test.
        try:
            outcome = client.publish()
        except Exception as error:
            outcome = error

        # THEN an HTTPStatusError should have been raised
        assert isinstance(outcome, HTTPStatusError)
        # AND its message should mention 'invalid datapackage json'
        self.assertRegexpMatches(str(outcome), 'invalid datapackage json')
项目:slims-python-api    作者:genohm    | 项目源码 | 文件源码
def test_add(self):
        """slims.add() should return a Record for the entity the PUT replies with."""
        slims = Slims("testSlims", "http://localhost:9999", "admin", "admin")

        # Single entity echoed back by the mocked Content endpoint.
        created_entity = {
            "pk": 1,
            "tableName": "Content",
            "columns": [],
        }
        responses.add(
            responses.PUT,
            'http://localhost:9999/rest/Content',
            json={"entities": [created_entity]},
            content_type='application/json',
        )

        added = slims.add("Content", {"test": "foo"})
        self.assertIsInstance(added, Record)
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def test_heal(self):
        """Run VedaHeal.discovery() against mocked token, GET and PUT endpoints.

        No assertions are made on the outcome; the test only checks that
        discovery() completes with these mocks in place.
        """
        encoded_video = {
            'url': 'https://testurl.mp4',
            'file_size': 8499040,
            'bitrate': 131,
            'profile': 'mobile_low',
        }
        val_response = {
            'courses': [{u'WestonHS/PFLC1x/3T2015': None}],
            'encoded_videos': [encoded_video],
        }

        # Token endpoint: the third positional argument is the reply body.
        responses.add(
            responses.POST,
            CONFIG_DATA['val_token_url'],
            '{"access_token": "1234567890"}',
            status=200
        )
        # Video lookup replies with the payload built above.
        responses.add(
            responses.GET,
            build_url(CONFIG_DATA['val_api_url'], self.video_id),
            body=json.dumps(val_response),
            content_type='application/json',
            status=200
        )
        # Video update on the same URL replies 200 without an explicit body.
        responses.add(
            responses.PUT,
            build_url(CONFIG_DATA['val_api_url'], self.video_id),
            status=200
        )

        VedaHeal().discovery()
项目:quilt    作者:quiltdata    | 项目源码 | 文件源码
def test_push(self):
        """Push 'foo/bar' twice; blob PUTs are only mocked for the first push."""
        here = os.path.dirname(__file__)
        command.build('foo/bar', os.path.join(here, './build_simple.yml'))

        package = store.PackageStore.find_package('foo', 'bar')
        pkg_hash = package.get_hash()
        assert pkg_hash

        all_hashes = set(find_object_hashes(package.get_contents()))

        # One HEAD/PUT URL pair per blob hash.
        upload_urls = {}
        for blob_hash in all_hashes:
            upload_urls[blob_hash] = {
                'head': "https://example.com/head/{owner}/{hash}".format(owner='foo', hash=blob_hash),
                'put': "https://example.com/put/{owner}/{hash}".format(owner='foo', hash=blob_hash),
            }

        # The package is pushed twice, so every response is mocked twice.
        for blob_hash in all_hashes:
            urls = upload_urls[blob_hash]
            head_url, put_url = urls['head'], urls['put']

            # First push: the s3 HEAD 404s, so a PUT follows.
            self.requests_mock.add(responses.HEAD, head_url, status=404)
            self.requests_mock.add(responses.PUT, put_url)

            # Second push: the s3 HEAD succeeds and no PUT is expected.
            self.requests_mock.add(responses.HEAD, head_url)

        # Registry mocks, once per push.
        for _ in range(2):
            self._mock_put_package('foo/bar', pkg_hash, upload_urls)
            self._mock_put_tag('foo/bar', 'latest')

        # Push a new package.
        command.push('foo/bar')

        # Push it again; this time, verify that there are no s3 uploads.
        command.push('foo/bar')
项目:quilt    作者:quiltdata    | 项目源码 | 文件源码
def _mock_put_package(self, package, pkg_hash, upload_urls):
        """Queue two PUT replies for the package URL: dry run, then real push.

        The first reply carries ``upload_urls`` as JSON; the second is an
        empty JSON object.
        """
        registry = command.get_registry_url()
        pkg_url = '%s/api/package/%s/%s' % (registry, package, pkg_hash)
        # Dry run first, then the real thing.
        for payload in ({'upload_urls': upload_urls}, {}):
            self.requests_mock.add(responses.PUT, pkg_url, json.dumps(payload))
项目:quilt    作者:quiltdata    | 项目源码 | 文件源码
def _mock_put_tag(self, package, tag):
        """Queue a PUT reply with an empty JSON object for the tag URL."""
        registry = command.get_registry_url()
        tag_url = '%s/api/tag/%s/%s' % (registry, package, tag)
        empty_body = json.dumps({})
        self.requests_mock.add(responses.PUT, tag_url, empty_body)
项目:quilt    作者:quiltdata    | 项目源码 | 文件源码
def test_version_add_confirmed(self, mock_input, mock_match_hash):
        """Call version_add() with the input mock answering 'y'.

        ``mock_input`` and ``mock_match_hash`` are mocks supplied from
        outside this method (presumably by patch decorators -- confirm).
        """
        registry_url = command.get_registry_url()
        mock_input.return_value = 'y'
        mock_match_hash.return_value = 'fabc123'

        # version_add does not check the response content, so an OK status
        # on the expected URL is all that needs to be mocked.
        version_url = registry_url + "/api/version/user/test/2.9.12"
        self.requests_mock.add(responses.PUT, version_url, status=200)

        command.version_add('user/test', '2.9.12', 'fabc123')
项目:quilt    作者:quiltdata    | 项目源码 | 文件源码
def test_short_hashes(self):
        """
        Exercise tag_add and version_add with a six-character hash prefix
        """
        # Only the hashes are needed; the generated data itself is discarded.
        _, table_hash = self.make_table_data()
        _, file_hash = self.make_file_data()
        _, contents_hash = self.make_contents(table=table_hash, file=file_hash)
        short_hash = contents_hash[0:6]

        self._mock_log('foo/bar', contents_hash)

        self._mock_tag('foo/bar', 'mytag', short_hash, cmd=responses.PUT)
        command.tag_add('foo/bar', 'mytag', short_hash)

        self._mock_version('foo/bar', '1.0', short_hash, cmd=responses.PUT)
        command.version_add('foo/bar', '1.0', short_hash, force=True)
项目:pytest-vts    作者:bhodorog    | 项目源码 | 文件源码
def setup_recording(self, **kwargs):
        """Reset the mock and register a recording callback for every HTTP verb.

        ``kwargs`` is accepted but not used in this method.
        """
        _logger.info("recording ...")
        self.responses.reset()

        catch_all = re.compile("http.*")
        verbs = (
            responses.GET,
            responses.POST,
            responses.PUT,
            responses.PATCH,
            responses.DELETE,
            responses.HEAD,
            responses.OPTIONS,
        )
        for verb in verbs:
            # self.record() is called once per verb, so every registration
            # receives the result of its own call as the callback.
            self.responses.add_callback(
                verb, catch_all,
                match_querystring=False,
                callback=self.record())