我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用httplib.OK。
def test_validate_image_status_before_upload_ok_v1(self):
    """Exercise the v1 pre-upload validation with an OK response.

    The mocked connection hands back a response whose status is
    ``httplib.OK`` and whose ``getheader`` returns ``'queued'``.  The test
    asserts that the response was fetched, that its body was read twice and
    that ``check_resp_status_and_retry`` was never called.
    """
    image_url = 'http://fake_host/fake_path/fake_image_id'
    retry_checker = self.mock_patch_object(
        self.glance, 'check_resp_status_and_retry')

    head_response = mock.Mock()
    head_response.status = httplib.OK
    head_response.read.return_value = 'fakeData'
    head_response.getheader.return_value = 'queued'

    connection = mock.Mock()
    connection.getresponse.return_value = head_response

    self.glance.validate_image_status_before_upload_v1(
        connection, image_url, extra_headers=mock.Mock())

    self.assertTrue(connection.getresponse.called)
    self.assertEqual(head_response.read.call_count, 2)
    self.assertFalse(retry_checker.called)
def test_validate_image_status_before_upload_req_head_exception_v1(self):
    """Exercise the v1 pre-upload validation when the request itself fails.

    ``request`` on the mocked connection raises ``Fake_HTTP_Request_Error``.
    The test expects ``RetryableError`` and asserts that the request was
    issued once while the response was neither fetched nor read.
    """
    image_url = 'http://fake_host/fake_path/fake_image_id'

    head_response = mock.Mock()
    head_response.status = httplib.OK
    head_response.read.return_value = 'fakeData'
    head_response.getheader.return_value = 'queued'

    connection = mock.Mock()
    connection.request.side_effect = Fake_HTTP_Request_Error()
    connection.getresponse.return_value = head_response

    with self.assertRaises(self.glance.RetryableError):
        self.glance.validate_image_status_before_upload_v1(
            connection, image_url, extra_headers=mock.Mock())

    connection.request.assert_called_once()
    head_response.read.assert_not_called()
    connection.getresponse.assert_not_called()
def test_validate_image_status_before_upload_ok_v2_using_uwsgi(self):
    """Exercise the v2 pre-upload validation with an OK response.

    The mocked response body is the JSON text ``{"status": "queued"}``.
    The test asserts that the response was fetched, read twice, that
    ``check_resp_status_and_retry`` was not called, and that the last
    request was a GET on the patch path with the supplied headers.
    """
    image_url = 'http://fake_host/fake_path/fake_image_id'
    patch_path = 'fake_patch_path'
    extra_headers = mock.Mock()
    retry_checker = self.mock_patch_object(
        self.glance, 'check_resp_status_and_retry')

    response = mock.Mock()
    response.status = httplib.OK
    response.read.return_value = '{"status": "queued"}'

    connection = mock.Mock()
    connection.getresponse.return_value = response

    self.glance.validate_image_status_before_upload_v2(
        connection, image_url, extra_headers, patch_path)

    self.assertTrue(connection.getresponse.called)
    self.assertEqual(response.read.call_count, 2)
    self.assertFalse(retry_checker.called)
    connection.request.assert_called_with(
        'GET', 'fake_patch_path', headers=extra_headers)
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: forwarded unchanged to open_url
    @param handlers: forwarded unchanged to open_url
    @return: data read from the response
    @raise URLFetchError: if the status code reported by open_url is not
    http_client_.OK; the exception carries the message open_url returned
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code != http_client_.OK:
        raise URLFetchError(return_message)
    # Release the response even when reading its body fails.
    try:
        return response.read()
    finally:
        response.close()
def fetch_from_url_to_file(url, config, output_file, data=None,
                           handlers=None):
    """Writes data retrieved from a URL to a file.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @param data: forwarded unchanged to open_url
    @param handlers: forwarded unchanged to open_url
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    succeeded = return_code == http_client_.OK
    if succeeded:
        # Release the response even when reading its body fails.
        try:
            return_data = response.read()
        finally:
            response.close()
        # The context manager closes the file even if the write raises.
        with open(output_file, "w") as outfile:
            outfile.write(return_data)
    return return_code, return_message, succeeded
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Opens a URL and returns the response object itself.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: the response returned by open_url, left open for the caller
    @rtype: file derived type
    @raise URLFetchError: if the status code is not http_client_.OK
    """
    status, message, stream = open_url(url, config, data=data,
                                       handlers=handlers)
    if not (status and status == http_client_.OK):
        raise URLFetchError(message)
    return stream
def Delete(name, creds, transport):
    """Delete a tag or digest.

    Issues a DELETE on the manifest URL of ``name`` and accepts either
    ``httplib.OK`` or ``httplib.ACCEPTED`` as the response code.

    Args:
      name: a tag or digest to be deleted.
      creds: the creds to use for deletion.
      transport: the transport to use to contact the registry.
    """
    docker_transport = docker_http.Transport(
        name, creds, transport, docker_http.DELETE)

    manifest_url = (
        '{scheme}://{registry}/v2/{repository}/manifests/{entity}'.format(
            scheme=docker_http.Scheme(name.registry),
            registry=name.registry,
            repository=name.repository,
            entity=_tag_or_digest(name)))

    # The response pair is not used; only the accepted-code check matters.
    _unused_resp, _unused_content = docker_transport.Request(
        manifest_url,
        method='DELETE',
        accepted_codes=[httplib.OK, httplib.ACCEPTED])
def _content(self, suffix, accepted_mimes=None, cache=True):
    """Fetches content of the resources from registry by http calls.

    The suffix is prefixed with the repository when ``self._name`` is a
    ``docker_name.Repository``.  A previously stored response for the same
    suffix is returned without a new request; a fresh response is stored
    only when ``cache`` is true.
    """
    if isinstance(self._name, docker_name.Repository):
        suffix = '{repository}/{suffix}'.format(
            repository=self._name.repository, suffix=suffix)

    if suffix in self._response:
        return self._response[suffix]

    url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=suffix)
    _unused_resp, body = self._transport.Request(
        url, accepted_codes=[httplib.OK], accepted_mimes=accepted_mimes)

    if cache:
        self._response[suffix] = body
    return body
def blob_size(self, digest):
    """The byte size of the raw blob.

    Sends a HEAD request for the blob and returns the response's
    ``content-length`` entry converted to ``int``.
    """
    blob_path = 'blobs/' + digest
    if isinstance(self._name, docker_name.Repository):
        blob_path = '{repository}/{suffix}'.format(
            repository=self._name.repository, suffix=blob_path)

    url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=blob_path)
    head_resp, _unused_content = self._transport.Request(
        url, method='HEAD', accepted_codes=[httplib.OK])

    return int(head_resp['content-length'])

# Large, do not memoize.
def catalog(self, page_size=100):
    """Yield each entry of 'repositories' from the paginated _catalog call.

    Raises ValueError when ``self._name`` is a ``docker_name.Repository``
    and ``docker_http.BadStateException`` when a page's JSON has no
    'repositories' key.
    """
    # TODO(user): Handle docker_name.Repository for /v2/<name>/_catalog
    if isinstance(self._name, docker_name.Repository):
        raise ValueError('Expected docker_name.Registry for "name"')

    catalog_url = '{scheme}://{registry}/v2/_catalog?n={page_size}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        page_size=page_size)

    pages = self._transport.PaginatedRequest(
        catalog_url, accepted_codes=[httplib.OK])
    for _unused_resp, content in pages:
        page = json.loads(content)
        if 'repositories' not in page:
            raise docker_http.BadStateException(
                'Malformed JSON response: %s' % content)

        # TODO(user): This should return docker_name.Repository
        for repo in page['repositories']:
            yield repo

# __enter__ and __exit__ allow use as a context manager.
def _content(self, suffix, accepted_mimes=None, cache=True):
    """Fetches content of the resources from registry by http calls.

    Returns the stored response for ``suffix`` when there is one; otherwise
    requests it from the registry and, if ``cache`` is true, stores it.
    """
    name = self._name
    if isinstance(name, docker_name.Repository):
        suffix = '{repository}/{suffix}'.format(
            repository=name.repository, suffix=suffix)

    if suffix in self._response:
        return self._response[suffix]

    request_url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=suffix)
    _unused_resp, payload = self._transport.Request(
        request_url,
        accepted_codes=[httplib.OK],
        accepted_mimes=accepted_mimes)

    if cache:
        self._response[suffix] = payload
    return payload
def _put_layer(self, image, layer_id):
    """Upload the aufs tarball for a single layer.

    Sends ``image.layer(layer_id)`` as an ``application/octet-stream`` body
    to the v1 layer URL of ``layer_id``; only ``httplib.OK`` is accepted.
    """
    # TODO(user): We should stream this instead of loading
    # it into memory.
    layer_url = '{scheme}://{endpoint}/v1/images/{layer}/layer'.format(
        scheme=docker_http.Scheme(self._endpoint),
        endpoint=self._endpoint,
        layer=layer_id)
    docker_http.Request(
        self._transport,
        layer_url,
        self._token_creds,
        accepted_codes=[httplib.OK],
        body=image.layer(layer_id),
        content_type='application/octet-stream')
def Delete(name, creds, transport):
    """Delete a tag or digest.

    Sends a DELETE for the manifest of ``name``; ``httplib.OK`` and
    ``httplib.ACCEPTED`` are the accepted response codes.

    Args:
      name: a tag or digest to be deleted.
      creds: the credentials to use for deletion.
      transport: the transport to use to contact the registry.
    """
    docker_transport = docker_http.Transport(
        name, creds, transport, docker_http.DELETE)

    url_template = '{scheme}://{registry}/v2/{repository}/manifests/{entity}'
    target = url_template.format(
        scheme=docker_http.Scheme(name.registry),
        registry=name.registry,
        repository=name.repository,
        entity=_tag_or_digest(name))

    # Neither element of the returned pair is needed here.
    _unused_resp, _unused_content = docker_transport.Request(
        target,
        method='DELETE',
        accepted_codes=[httplib.OK, httplib.ACCEPTED])
def _content(self, suffix, cache=True):
    """Fetches content of the resources from registry by http calls.

    A response already stored for ``suffix`` is returned as is; otherwise
    the content is requested and stored when ``cache`` is true.
    """
    if isinstance(self._name, docker_name.Repository):
        suffix = '{repository}/{suffix}'.format(
            repository=self._name.repository, suffix=suffix)

    if suffix in self._response:
        return self._response[suffix]

    resource_url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=suffix)
    _unused_resp, fetched = self._transport.Request(
        resource_url, accepted_codes=[httplib.OK])

    if cache:
        self._response[suffix] = fetched
    return fetched
def catalog(self, page_size=100):
    """Yield each entry of 'repositories' from the paginated _catalog call.

    Raises ValueError when ``self._name`` is a ``docker_name.Repository``
    and ``docker_http.BadStateException`` when a page's JSON has no
    'repositories' key.
    """
    # TODO(user): Handle docker_name.Repository for /v2/<name>/_catalog
    if isinstance(self._name, docker_name.Repository):
        raise ValueError('Expected docker_name.Registry for "name"')

    listing_url = '{scheme}://{registry}/v2/_catalog?n={page_size}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        page_size=page_size)

    responses = self._transport.PaginatedRequest(
        listing_url, accepted_codes=[httplib.OK])
    for _unused_resp, content in responses:
        decoded = json.loads(content)
        if 'repositories' not in decoded:
            raise docker_http.BadStateException(
                'Malformed JSON response: %s' % content)

        for repo in decoded['repositories']:
            # TODO(user): This should return docker_name.Repository instead.
            yield repo

# __enter__ and __exit__ allow use as a context manager.
def test_validate_URL_existence_url_ok(self, mock_cs, mock_url):
    """Check that ``validate_URL_existence`` reports True for an OK reply.

    ``mock_url`` yields one object whose ``code`` is ``httplib.OK``.  The
    test also asserts that ``API.apiCalls.urlopen`` was called with the URL
    and the API object's ``max_wait_time`` as timeout.
    """
    ok_reply = Foo()
    ok_reply.code = httplib.OK
    mock_url.side_effect = [ok_reply]
    mock_cs.side_effect = [None]

    api = API.apiCalls.ApiCalls("", "", "", "", "")
    check_url = api.validate_URL_existence
    target = "http://google.com"

    outcome = check_url(target)

    self.assertEqual(outcome, True)
    API.apiCalls.urlopen.assert_called_with(target,
                                            timeout=api.max_wait_time)
def session(self):
    """Return a usable session, replacing the current one if it fails a probe.

    When ``self._session_set_externally`` is true the stored session is
    returned untouched.  Otherwise, while holding ``self._session_lock``,
    an OPTIONS request is sent to ``self.base_URL`` with the current
    session.  If that request raises or does not answer ``httplib.OK``, a
    new session is obtained from the OAuth service and stored.

    Returns:
        The session stored in ``self._session`` after the check.
    """
    if self._session_set_externally:
        return self._session

    # Acquire before entering the try block so the finally clause can never
    # release a lock that was not taken.
    self._session_lock.acquire()
    try:
        try:
            response = self._session.options(self.base_URL)
            still_valid = response.status_code == httplib.OK
        except Exception:
            # Any failure of the probe means the session cannot be reused.
            # KeyboardInterrupt/SystemExit are deliberately not caught.
            still_valid = False

        if still_valid:
            logging.debug("Existing session still works, going to reuse it.")
        else:
            logging.debug(
                "Token is probably expired, going to get a new session.")
            oauth_service = self.get_oauth_service()
            access_token = self.get_access_token(oauth_service)
            self._session = oauth_service.get_session(access_token)
    finally:
        self._session_lock.release()

    return self._session
def test_view_product(self):
    """ test product view loads

    Fetches the first active product's page and checks the status code,
    the template used, that the name and escaped description appear, that
    the context holds a ProductAddToCartForm under 'form', and that
    'product_reviews' is present in the context.
    """
    product = Product.active.all()[0]
    product_url = product.get_absolute_url()
    url_entry = urlresolvers.resolve(product_url)
    template_name = url_entry[2]['template_name']
    response = self.client.get(product_url)
    # assertTrue/assertNotEqual replace the deprecated failUnless/failIfEqual
    # aliases; the checks themselves are unchanged.
    self.assertTrue(response)
    self.assertEqual(response.status_code, httplib.OK)
    self.assertTemplateUsed(response, template_name)
    self.assertContains(response, product.name)
    self.assertContains(response, html.escape(product.description))
    # check for cart form in product page response
    cart_form = response.context[0]['form']
    self.assertTrue(cart_form)
    # check that the cart form is instance of correct form class
    self.assertTrue(isinstance(cart_form, ProductAddToCartForm))
    product_reviews = response.context[0].get('product_reviews', None)
    self.assertNotEqual(product_reviews, None)
def test_get_folder(self):
    """Check that ``folders.get`` maps the canned GET response to a folder.

    The client's ``get`` is stubbed with the mocked '/folders/folder-id'
    response; the test then checks the call arguments and the id, name,
    parent, files and subfolders of the returned object.
    """
    canned = mock_api_response('/folders/folder-id', httplib.OK, None,
                               'folders', 'GET_{id}_response.json')
    self.client.get.return_value = canned

    folder = self.folders.get('folder-id')

    self.client.get.assert_called_with(self.client.get.return_value.url,
                                       params=None)
    self.assertIsNotNone(folder)
    self.assertEqual('folder-id', folder.id)
    self.assertEqual('folder-name', folder.name)
    self.assertEqual('root-id', folder.parentId)

    self.assertEqual(1, len(folder.files))
    self.assertEqual('file-id', folder.files[0].id)

    self.assertEqual(1, len(folder.subfolders))
    first_subfolder = folder.subfolders[0]
    self.assertEqual('subfolder-id', first_subfolder.id)
    self.assertEqual('folder-id', folder.subfolders[0].parentId)
def test_ngas_status():
    """
    Execute the STATUS command against the NGAS server on the host fabric is
    currently pointing at.

    Reports through ``failure``/``success``.  Re-raises the IOError when the
    server cannot be reached, and raises ValueError carrying the response
    body when it does not contain ``Status="SUCCESS"``.
    """
    try:
        serv = urllib2.urlopen('http://{0}:7777/STATUS'.format(env.host),
                               timeout=5)
    except IOError:
        failure('Problem connecting to server {0}'.format(env.host))
        raise

    # Close the connection even if reading the body fails.
    try:
        response = serv.read()
    finally:
        serv.close()

    if 'Status="SUCCESS"' not in response:
        failure('Problem with response from {0}, not SUCCESS as expected'
                .format(env.host))
        raise ValueError(response)
    success('Response from {0} OK'.format(env.host))
def upload_to(host, filename, port=7777):
    """
    Simple method to upload a file into NGAS

    POSTs the file to ``/QARCHIVE`` on ``host:port`` in 4096-byte chunks,
    using the file's base name (URL-quoted) as the ``filename`` query
    parameter.  Raises Exception with the status, message and body when the
    server does not answer ``httplib.OK``.
    """
    with contextlib.closing(httplib.HTTPConnection(host, port)) as conn:
        conn.putrequest(
            'POST',
            '/QARCHIVE?filename=%s' % (
                urllib2.quote(os.path.basename(filename)),))
        conn.putheader('Content-Length', os.stat(filename).st_size)
        conn.endheaders()

        # Binary mode keeps the bytes sent identical to the bytes on disk,
        # so they agree with the Content-Length taken from os.stat.
        with open(filename, 'rb') as f:
            while True:
                data = f.read(4096)
                if not data:
                    break
                conn.send(data)

        r = conn.getresponse()
        if r.status != httplib.OK:
            raise Exception("Error while QARCHIVE-ing %s to %s:%d:\nStatus: %d\n%s\n\n%s" % (filename, conn.host, conn.port, r.status, r.msg, r.read()))
        else:
            success("{0} successfully archived to {1}!".format(filename, host))
def _request(self, url, method='GET', data=None, ok_statuses=None):
    """
    Send a request through ``self.http``, check its status, store the
    response's Meta object, and return the 'data' part of the JSON body.

    :param url: endpoint to send the request
    :param method: HTTP method (e.g. GET, POST, etc.), defaults to GET
    :param data: optional mapping, URL-encoded into the request body
    :param ok_statuses: statuses passed to ``_raise_for_status``; defaults
        to ``[httplib.OK]``
    :return: JSON data
    """
    #
    # TODO: clean up the ability to send a POST and add unit tests.
    #
    req_body = None if data is None else urllib.urlencode(data)

    self.resp, self.content = self.http.request(url, method, body=req_body)

    if ok_statuses is None:
        ok_statuses = [httplib.OK]
    self._raise_for_status(ok_statuses)

    decoded = json.loads(self.content)
    self.meta = upapi.meta.Meta(**decoded['meta'])
    return decoded['data']
def disconnect(self):
    """
    Revoke access for this user.

    Issues a delete on the DISCONNECT endpoint and then clears the stored
    credentials.
    """
    self.delete(upapi.endpoints.DISCONNECT)
    self.credentials = None

#
# TODO: finish pubsub implementation.
#
# def set_pubsub(self, url):
#     """
#     Register a user-specific pubsub webhook.
#
#     :param url: user-specific webhook callback URL
#     """
#     resp = self.oauth.post(upapi.endpoints.PUBSUB, data={'webhook': url})
#     self._raise_for_status([httplib.OK], resp)
#
# def delete_pubsub(self):
#     """
#     Delete a user-specific pubsub webhook.
#     """
#     self.delete(upapi.endpoints.PUBSUB)
def test__raise_for_status(self, mock_resp):
    """
    Verify that an exception is raised only for unexpected statuses.

    :param mock_resp: mocked httplib Response
    """
    self.up.resp = mock_resp
    self.up.content = ''

    #
    # A status listed in ok_statuses must not raise.
    #
    mock_resp.status = httplib.CREATED
    try:
        self.up._raise_for_status([httplib.OK, httplib.CREATED])
    except Exception as exc:
        self.fail('_raise_for_status unexpectedly threw {}'.format(exc))

    #
    # Any status outside ok_statuses must raise.
    #
    mock_resp.status = httplib.ACCEPTED
    with self.assertRaises(upapi.exceptions.UnexpectedAPIResponse):
        self.up._raise_for_status([httplib.OK, httplib.CREATED])
def _parse_result(http_response, response):
    """Turn an HTTP response into a success flag or a raised error.

    Behaviour, in order:

    * a 1xx status raises ``BceClientError``;
    * a body containing 'message', 'code' and 'requestId' becomes a
      ``BceServerError`` built from those fields;
    * any other body on a 2xx status is decoded into ``response.__dict__``,
      the HTTP response is closed and True is returned;
    * an empty body on a 2xx status returns True;
    * everything else raises ``BceServerError`` (built from the HTTP reason
      when the body did not describe an error), with ``status_code`` set to
      the HTTP status.

    :param http_response: object exposing status, reason, read() and close()
    :param response: object whose ``__dict__`` receives the decoded body
    :return: True on success
    """
    # Floor division gives the status class (1 for 1xx, 2 for 2xx) on both
    # Python 2 and Python 3; with true division only exact multiples of 100
    # would compare equal, so 201 or 204 would be treated as failures.
    status_class = http_response.status // 100
    if status_class == httplib.CONTINUE // 100:
        raise BceClientError('Can not handle 1xx http status code')
    succeeded = status_class == httplib.OK // 100

    bse = None
    body = http_response.read()
    if body:
        d = json.loads(body)
        if 'message' in d and 'code' in d and 'requestId' in d:
            bse = BceServerError(d['message'], code=d['code'],
                                 request_id=d['requestId'])
        elif succeeded:
            response.__dict__.update(
                json.loads(
                    body,
                    object_hook=utils.dict_to_python_object).__dict__)
            http_response.close()
            return True
    elif succeeded:
        return True

    if bse is None:
        bse = BceServerError(http_response.reason,
                             request_id=response.metadata.bce_request_id)
    bse.status_code = http_response.status
    raise bse
def check_endpoint_for_error(resource, operation=None):
    """Build a decorator that maps handler errors to problem responses.

    The decorated function is called with ``name``.  On success the wrapper
    returns ``(result, httplib.OK)``.  ``error.NotFoundError`` becomes a
    ``connexion.problem`` with NOT_FOUND, and ``error.ToBeDoneError``
    becomes one with NOT_IMPLEMENTED.

    :param resource: resource label used in the problem messages
    :param operation: operation label used in the NOT_IMPLEMENTED message;
        may be left as None
    :return: the decorator
    """
    def _decorator(func):
        def _execute(name=None):
            try:
                return func(name), httplib.OK
            except error.NotFoundError:
                return connexion.problem(
                    httplib.NOT_FOUND,
                    '{} not found'.format(resource),
                    'Requested {} `{}` not found.'
                    .format(resource.lower(), name))
            except error.ToBeDoneError:
                # operation defaults to None; calling .lower() on it would
                # replace the intended 501 response with an AttributeError.
                if operation is None:
                    operation_label = operation
                else:
                    operation_label = operation.lower()
                return connexion.problem(
                    httplib.NOT_IMPLEMENTED,
                    '{} handler not implemented'.format(operation),
                    'Requested operation `{}` on {} not implemented.'
                    .format(operation_label, resource.lower()))
        return _execute
    return _decorator
def deconstruct_list(response_data, model=None, status_code=OK):
    """Split a response sequence into (view_name, model, status_code).

    Position 0 is passed through ``required_view_name``; positions 1 and 2,
    when present, override ``model`` and ``status_code``.  A ``None`` model
    becomes an empty dict and a ``None`` status code becomes ``OK``.

    Raises KrumpException when ``response_data`` is empty.
    """
    if not response_data:
        raise KrumpException(
            'Response data is empty; the view name at least is required.')

    view_name = None
    if len(response_data) >= 1:
        view_name = required_view_name(response_data[0])
    if len(response_data) >= 2:
        model = response_data[1]
    if len(response_data) >= 3:
        status_code = response_data[2]

    model = {} if model is None else model
    status_code = OK if status_code is None else status_code
    return view_name, model, status_code
def test_get_value_for_attribute_with_a_present_attribute(self, mock_meta_req):
    """Test get_value_for_attribute returns correctly.

    Setup:
      * Mock out a httplib.HTTPResponse .
      * Return that from _issue_http_request.

    Expected results:
      * A matching string.
    """
    expected = 'expected_response'
    fake_response_cls = mock.mock_open(read_data=expected)

    with mock.patch('httplib.HTTPResponse', fake_response_cls) as mock_http_resp:
        mock_http_resp.return_value.status = httplib.OK
        mock_meta_req.side_effect = mock_http_resp

        actual = metadata_server.get_value_for_attribute('')

    self.assertEqual(actual, expected)
def _refresh_access_token(self):
    """
    Request new access token to send with requests to Brightcove.
    Access Token expires every 5 minutes.

    Posts a client-credentials grant with HTTP Basic authentication built
    from ``self.api_key`` and ``self.api_secret``.

    Returns:
        The 'access_token' value of the JSON reply when the status is
        ``httplib.OK``; otherwise None.
    """
    url = "https://oauth.brightcove.com/v3/access_token"
    params = {"grant_type": "client_credentials"}
    # b64encode never inserts line breaks, so unlike the deprecated
    # encodestring there are no newlines to strip afterwards.
    auth_string = base64.b64encode(
        '{}:{}'.format(self.api_key, self.api_secret)
    )
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": "Basic " + auth_string
    }
    resp = requests.post(url, headers=headers, data=params)
    if resp.status_code == httplib.OK:
        result = resp.json()
        return result['access_token']
    # Callers receive None when the token could not be refreshed.
    return None
def get(self, url, headers=None, can_retry=True):
    """
    Issue REST GET request to a given URL. Can throw ApiClientError or its subclass.

    On an UNAUTHORIZED reply, and only when ``can_retry`` is true, the
    access token is refreshed and the call is repeated once with
    ``can_retry=False``.

    Arguments:
        url (str): API url to fetch a resource from.
        headers (dict): Headers necessary as per API, e.g. authorization bearer to perform authorised requests.
        can_retry (bool): True if in a case of authentication error it can refresh access token and retry a call.
    Returns:
        Response in python native data format.
    """
    request_headers = {'Authorization': 'Bearer ' + str(self.access_token)}
    if headers is not None:
        request_headers.update(headers)

    resp = requests.get(url, headers=request_headers)

    if resp.status_code == httplib.OK:
        return resp.json()
    if resp.status_code == httplib.UNAUTHORIZED and can_retry:
        self.access_token = self._refresh_access_token()
        return self.get(url, headers, can_retry=False)
    raise BrightcoveApiClientError
def get(self, url, headers=None, can_retry=False):
    """
    Issue REST GET request to a given URL. Can throw ApiClientError or its subclass.

    Arguments:
        url (str): API url to fetch a resource from.
        headers (dict): Headers necessary as per API, e.g. authorization bearer to perform authorised requests.
        can_retry (bool): Accepted but not used by this implementation.
    Returns:
        Response in python native data format.
    """
    bearer = 'Bearer {}'.format(self.access_token.encode(encoding='utf-8'))
    request_headers = {
        'Authorization': bearer,
        'Accept': 'application/json'
    }
    if headers is not None:
        request_headers.update(headers)

    resp = requests.get(url, headers=request_headers)
    if resp.status_code != httplib.OK:
        raise VimeoApiClientError(_("Can't fetch requested data from API."))
    return resp.json()
def connect_intercept(self):
    """Answer a CONNECT by wrapping the client socket in server-side TLS.

    The host part of ``self.path`` selects a certificate file under
    ``self.certdir``.  If that file is missing it is produced, under
    ``self.lock``, by piping ``openssl req`` into ``openssl x509``.  The
    'Connection Established' status line is then written, the connection
    is wrapped with ``ssl.wrap_socket`` and ``rfile``/``wfile`` are rebuilt
    on top of it.  ``close_connection`` is updated from the
    Proxy-Connection header.
    """
    hostname = self.path.split(':')[0]
    certpath = "%s/%s.crt" % (self.certdir.rstrip('/'), hostname)

    with self.lock:
        if not os.path.isfile(certpath):
            epoch = "%d" % (time.time() * 1000)
            request_cmd = ["openssl", "req", "-new", "-key", self.certkey,
                           "-subj", "/CN=%s" % hostname]
            p1 = Popen(request_cmd, stdout=PIPE)
            signing_cmd = ["openssl", "x509", "-req", "-days", "3650",
                           "-CA", self.cacert, "-CAkey", self.cakey,
                           "-set_serial", epoch, "-out", certpath]
            p2 = Popen(signing_cmd, stdin=p1.stdout, stderr=PIPE)
            p2.communicate()

    status_line = "%s %d %s\r\n" % (self.protocol_version, httplib.OK,
                                    'Connection Established')
    self.wfile.write(status_line)
    self.end_headers()

    self.connection = ssl.wrap_socket(self.connection,
                                      keyfile=self.certkey,
                                      certfile=certpath,
                                      server_side=True)
    self.rfile = self.connection.makefile("rb", self.rbufsize)
    self.wfile = self.connection.makefile("wb", self.wbufsize)

    conntype = self.headers.get('Proxy-Connection', '').lower()
    if conntype == 'close':
        self.close_connection = 1
    elif conntype == 'keep-alive' and self.protocol_version >= "HTTP/1.1":
        self.close_connection = 0
def connect_relay(self):
    """Answer a CONNECT by relaying raw bytes to the requested address.

    ``self.path`` is read as ``host[:port]``; a missing, empty or zero port
    falls back to 443.  If the upstream connection cannot be opened,
    BAD_GATEWAY is sent.  Otherwise 'Connection Established' is sent and
    data is copied in both directions until either side stops sending,
    ``select`` times out, or a socket reports an exceptional condition.
    """
    host, _, port_text = self.path.partition(':')
    # A path without ":port" previously raised IndexError; treat it like a
    # zero port and use the default.
    port = (int(port_text) if port_text else 0) or 443

    try:
        s = socket.create_connection((host, port), timeout=self.timeout)
    except Exception:
        self.send_error(httplib.BAD_GATEWAY)
        return

    # Close the upstream socket however the relay loop ends.
    try:
        self.send_response(httplib.OK, 'Connection Established')
        self.end_headers()

        conns = [self.connection, s]
        self.close_connection = 0
        while not self.close_connection:
            rlist, wlist, xlist = select.select(conns, [], conns,
                                                self.timeout)
            if xlist or not rlist:
                break
            for r in rlist:
                # Forward whatever one side sent to the other side.
                other = conns[1] if r is conns[0] else conns[0]
                data = r.recv(8192)
                if not data:
                    self.close_connection = 1
                    break
                other.sendall(data)
    finally:
        s.close()
def wopiGetFile(fileid):
    '''Implements the GetFile WOPI call.

    Decodes the 'access_token' request argument with the WOPI secret,
    rejects it when its 'exp' is in the past, and streams the file named in
    the token as application/octet-stream with status OK.  A token that
    fails to decode or is expired yields ('Invalid access token',
    UNAUTHORIZED); any other exception is handed to
    _logGeneralExceptionAndReturn.
    '''
    Wopi.refreshconfig()
    try:
        acctok = jwt.decode(flask.request.args['access_token'],
                            Wopi.wopisecret, algorithms=['HS256'])
        if acctok['exp'] < time.time():
            raise jwt.exceptions.ExpiredSignatureError
        Wopi.log.info('msg="GetFile" user="%s:%s" filename="%s" fileid="%s" token="%s"' %
                      (acctok['ruid'], acctok['rgid'], acctok['filename'], fileid,
                       flask.request.args['access_token'][-20:]))
        # stream file from storage to client
        resp = flask.Response(
            xrdcl.readfile(acctok['filename'], acctok['ruid'], acctok['rgid']),
            mimetype='application/octet-stream')
        resp.status_code = httplib.OK
        return resp
    except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError):
        Wopi.log.warning('msg="Signature verification failed" client="%s" requestedUrl="%s" token="%s"' %
                         (flask.request.remote_addr, flask.request.base_url,
                          flask.request.args['access_token']))
        return 'Invalid access token', httplib.UNAUTHORIZED
    except Exception as e:
        # "as" form is valid on Python 2.6+ and Python 3, unlike "Exception, e".
        return _logGeneralExceptionAndReturn(e, flask.request)

#
# The following operations are all called on POST /wopi/files/<fileid>
#
def wopiUnlock(fileid, reqheaders, acctok):
    '''Implements the Unlock WOPI call.

    Returns a conflict response when the retrieved lock does not match the
    X-WOPI-Lock request header.  Otherwise removes the lock file and the
    last-save-time attribute (ignoring IOError), drops the file from the
    open-files table, and returns ('OK', httplib.OK).
    '''
    lock = reqheaders['X-WOPI-Lock']
    retrievedLock = _retrieveWopiLock(fileid, 'UNLOCK', lock, acctok)
    locks_match = _compareWopiLocks(retrievedLock, lock)
    if not locks_match:
        return _makeConflictResponse('UNLOCK', retrievedLock, lock, '',
                                     acctok['filename'])

    # The lock matches: remove any extended attribute related to locks and
    # conflicts handling.
    try:
        xrdcl.removefile(_getLockName(acctok['filename']),
                         Wopi.lockruid, Wopi.lockrgid, 1)
    except IOError:
        # not worth reporting anything here
        pass

    try:
        xrdcl.rmxattr(acctok['filename'], acctok['ruid'], acctok['rgid'],
                      LASTSAVETIMEKEY)
    except IOError:
        # same as above
        pass

    # Update the internal list of opened files as well.
    try:
        del Wopi.openfiles[acctok['filename']]
    except KeyError:
        # already removed?
        pass

    return 'OK', httplib.OK
def wopiGetLock(fileid, reqheaders_unused, acctok):
    '''Implements the GetLock WOPI call.

    Builds a response whose X-WOPI-Lock header is the retrieved lock and
    whose status is OK.  When that header is non-empty, the token's user is
    added to the file's entry in the open-files table, creating the entry
    if a KeyError shows it is missing.
    '''
    resp = flask.Response()
    # throws exception if no lock
    resp.headers['X-WOPI-Lock'] = _retrieveWopiLock(fileid, 'GETLOCK', '',
                                                    acctok)
    resp.status_code = httplib.OK

    # Bookkeeping for statistics only: nothing to do without a lock.
    if not resp.headers['X-WOPI-Lock']:
        return resp

    try:
        # the file was already opened for write, check whether this is a
        # new user
        if acctok['username'] not in Wopi.openfiles[acctok['filename']][1]:
            # yes it's a new user
            Wopi.openfiles[acctok['filename']][1].add(acctok['username'])
            if len(Wopi.openfiles[acctok['filename']][1]) > 1:
                # for later monitoring, explicitly log that this file is
                # being edited by at least two users
                Wopi.log.info('msg="Collaborative editing detected" filename="%s" token="%s" users="%s"' %
                              (acctok['filename'],
                               flask.request.args['access_token'][-20:],
                               list(Wopi.openfiles[acctok['filename']][1])))
    except KeyError:
        # existing lock but missing Wopi.openfiles[acctok['filename']] ?
        Wopi.openfiles[acctok['filename']] = (
            time.asctime(), sets.Set([acctok['username']]))
    return resp
def _copy(gcs_stub, filename, headers):
    """Copy file.

    The 'x-goog-metadata-directive' entry is popped from ``headers``
    (default 'COPY').  Only for 'REPLACE' are the remaining headers passed
    on to ``put_copy``; otherwise None is passed.

    Args:
      gcs_stub: an instance of gcs stub.
      filename: dst filename of format /bucket/filename
      headers: a dict of request headers. Must contain _XGoogCopySource header.

    Returns:
      An _FakeUrlFetchResult instance: the HEAD result when the source was
      not found, otherwise an empty OK result.
    """
    source = _XGoogCopySource(headers).value
    head_result = _handle_head(gcs_stub, source)
    if head_result.status_code == httplib.NOT_FOUND:
        return head_result

    directive = headers.pop('x-goog-metadata-directive', 'COPY')
    copy_headers = headers if directive == 'REPLACE' else None
    gcs_stub.put_copy(source, filename, copy_headers)
    return _FakeUrlFetchResult(httplib.OK, {}, '')
def _handle_head(gcs_stub, filename):
    """Handle HEAD request.

    Returns an empty NOT_FOUND result when ``head_object`` yields nothing.
    Otherwise returns an OK result whose headers are built from the file
    stat and then updated with the stat's metadata, if any.
    """
    filestat = gcs_stub.head_object(filename)
    if not filestat:
        return _FakeUrlFetchResult(httplib.NOT_FOUND, {}, '')

    response_headers = {
        'x-goog-stored-content-length': filestat.st_size,
        'content-length': 0,
        'content-type': filestat.content_type,
        'etag': filestat.etag,
        'last-modified': common.posix_time_to_http(filestat.st_ctime),
    }
    if filestat.metadata:
        response_headers.update(filestat.metadata)

    return _FakeUrlFetchResult(httplib.OK, response_headers, '')