Python responses 模块,HEAD 实例源码

我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用responses.HEAD

项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def test_submit_media_exceptions(self, response):
        """
        Tests media submission exceptions
        """
        responses.add(
            responses.HEAD,
            u'https://s3.amazonaws.com/bkt/video.mp4',
            headers={'Content-Type': u'video/mp4'},
            status=200,
        )
        responses.add(responses.GET, u'https://api.3playmedia.com/caption_imports/available_languages', **{
            'status': 200,
            'body': json.dumps([{
                "iso_639_1_code": "en",
                "language_id": 1,
            }])
        })
        responses.add(responses.POST, u'https://api.3playmedia.com/files', **response)

        three_play_client = ThreePlayMediaClient(**self.video_transcript_preferences)
        with self.assertRaises(ThreePlayMediaPerformTranscriptionError):
            three_play_client.submit_media()
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def test_generate_transcripts_exceptions(self, first_response, second_response, third_response, mock_log):
        """
        Tests the proper exceptions during transcript generation.
        """
        responses.add(responses.HEAD, u'https://s3.amazonaws.com/bkt/video.mp4', **first_response)
        responses.add(
            responses.GET, u'https://api.3playmedia.com/caption_imports/available_languages', **second_response
        )
        responses.add(responses.POST, u'https://api.3playmedia.com/files', **third_response)
        three_play_client = ThreePlayMediaClient(**self.video_transcript_preferences)
        three_play_client.generate_transcripts()

        self.assertFalse(mock_log.info.called)
        mock_log.exception.assert_called_with(
            u'[3PlayMedia] Could not process transcripts for video=%s source_language=%s.',
            VIDEO_DATA['studio_id'],
            VIDEO_DATA['source_language'],
        )
        self.assertEqual(TranscriptProcessMetadata.objects.count(), 0)
项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def set_up_glove(url: str, byt: bytes, change_etag_every: int = 1000):
    # Mock response for the datastore url that returns glove vectors
    responses.add(
            responses.GET,
            url,
            body=byt,
            status=200,
            content_type='application/gzip',
            stream=True,
            headers={'Content-Length': str(len(byt))}
    )

    etags_left = change_etag_every
    etag = "0"
    def head_callback(_):
        """
        Writing this as a callback allows different responses to different HEAD requests.
        In our case, we're going to change the ETag header every `change_etag_every`
        requests, which will allow us to simulate having a new version of the file.
        """
        nonlocal etags_left, etag
        headers = {"ETag": etag}
        # countdown and change ETag
        etags_left -= 1
        if etags_left <= 0:
            etags_left = change_etag_every
            etag = str(int(etag) + 1)
        return (200, headers, "")

    responses.add_callback(
            responses.HEAD,
            url,
            callback=head_callback
    )
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def test_validate_media_url(self, response):
        """
        Tests media url validations.
        """
        responses.add(responses.HEAD, u'https://s3.amazonaws.com/bkt/video.mp4', **response)
        three_play_client = ThreePlayMediaClient(**self.video_transcript_preferences)
        with self.assertRaises(ThreePlayMediaUrlError):
            three_play_client.validate_media_url()
项目:open-ledger    作者:creativecommons    | 项目源码 | 文件源码
def test_remove_from_search_after_sync(self):
        """When an image is removed from the source, it should be removed from the search engine"""
        self._index_img(self.removed)
        s = self.s.query(Q("match", title="removed"))
        r = s.execute()
        self.assertEquals(1, r.hits.total)
        with responses.RequestsMock() as rsps:
            rsps.add(responses.HEAD, FOREIGN_URL + TEST_IMAGE_REMOVED, status=404)
            self.removed.sync()
        signals._update_search_index(self.removed)
        self.es.indices.refresh()
        s = self.s.query(Q("match", title="removed"))
        r = s.execute()
        self.assertEquals(0, r.hits.total)
项目:quilt    作者:quiltdata    | 项目源码 | 文件源码
def test_push(self):
        mydir = os.path.dirname(__file__)
        build_path = os.path.join(mydir, './build_simple.yml')
        command.build('foo/bar', build_path)

        pkg_obj = store.PackageStore.find_package('foo', 'bar')
        pkg_hash = pkg_obj.get_hash()
        assert pkg_hash
        contents = pkg_obj.get_contents()

        all_hashes = set(find_object_hashes(contents))
        upload_urls = {
            blob_hash: dict(
                head="https://example.com/head/{owner}/{hash}".format(owner='foo', hash=blob_hash),
                put="https://example.com/put/{owner}/{hash}".format(owner='foo', hash=blob_hash)
            ) for blob_hash in all_hashes
        }

        # We will push the package twice, so we're mocking all responses twice.

        for blob_hash in all_hashes:
            urls = upload_urls[blob_hash]

            # First time the package is pushed, s3 HEAD 404s, and we get a PUT.
            self.requests_mock.add(responses.HEAD, urls['head'], status=404)
            self.requests_mock.add(responses.PUT, urls['put'])

            # Second time, s3 HEAD succeeds, and we're not expecting a PUT.
            self.requests_mock.add(responses.HEAD, urls['head'])

        self._mock_put_package('foo/bar', pkg_hash, upload_urls)
        self._mock_put_tag('foo/bar', 'latest')

        self._mock_put_package('foo/bar', pkg_hash, upload_urls)
        self._mock_put_tag('foo/bar', 'latest')

        # Push a new package.
        command.push('foo/bar')

        # Push it again; this time, we're verifying that there are no s3 uploads.
        command.push('foo/bar')
项目:pytest-vts    作者:bhodorog    | 项目源码 | 文件源码
def setup_recording(self, **kwargs):
        _logger.info("recording ...")
        self.responses.reset()
        all_requests_re = re.compile("http.*")
        methods = (responses.GET, responses.POST, responses.PUT,
                   responses.PATCH, responses.DELETE, responses.HEAD,
                   responses.OPTIONS)
        for http_method in methods:
            self.responses.add_callback(
                http_method, all_requests_re,
                match_querystring=False,
                callback=self.record())
项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def test_get_from_cache(self):
        url = 'http://fake.datastore.com/glove.txt.gz'
        set_up_glove(url, self.glove_bytes, change_etag_every=2)

        filename = get_from_cache(url, cache_dir=self.TEST_DIR)
        assert filename == os.path.join(self.TEST_DIR, url_to_filename(url, etag="0"))

        # We should have made one HEAD request and one GET request.
        method_counts = Counter(call.request.method for call in responses.calls)
        assert len(method_counts) == 2
        assert method_counts['HEAD'] == 1
        assert method_counts['GET'] == 1

        # And the cached file should have the correct contents
        with open(filename, 'rb') as cached_file:
            assert cached_file.read() == self.glove_bytes

        # A second call to `get_from_cache` should make another HEAD call
        # but not another GET call.
        filename2 = get_from_cache(url, cache_dir=self.TEST_DIR)
        assert filename2 == filename

        method_counts = Counter(call.request.method for call in responses.calls)
        assert len(method_counts) == 2
        assert method_counts['HEAD'] == 2
        assert method_counts['GET'] == 1

        with open(filename2, 'rb') as cached_file:
            assert cached_file.read() == self.glove_bytes

        # A third call should have a different ETag and should force a new download,
        # which means another HEAD call and another GET call.
        filename3 = get_from_cache(url, cache_dir=self.TEST_DIR)
        assert filename3 == os.path.join(self.TEST_DIR, url_to_filename(url, etag="1"))

        method_counts = Counter(call.request.method for call in responses.calls)
        assert len(method_counts) == 2
        assert method_counts['HEAD'] == 3
        assert method_counts['GET'] == 2

        with open(filename3, 'rb') as cached_file:
            assert cached_file.read() == self.glove_bytes