Python httplib 模块,OK 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用httplib.OK

项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_validate_image_status_before_upload_ok_v1(self):
        """v1 validation succeeds on an HTTP 200 HEAD response.

        The retry helper is patched out and must not be invoked, while the
        canned response is expected to be read twice.
        """
        image_url = 'http://fake_host/fake_path/fake_image_id'
        retry_patch = self.mock_patch_object(
            self.glance, 'check_resp_status_and_retry')

        # Canned HEAD response: status 200, getheader() answers 'queued'.
        head_resp = mock.Mock()
        head_resp.status = httplib.OK
        head_resp.read.return_value = 'fakeData'
        head_resp.getheader.return_value = 'queued'

        conn = mock.Mock()
        conn.getresponse.return_value = head_resp

        self.glance.validate_image_status_before_upload_v1(
            conn, image_url, extra_headers=mock.Mock())

        self.assertTrue(conn.getresponse.called)
        self.assertEqual(head_resp.read.call_count, 2)
        self.assertFalse(retry_patch.called)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_validate_image_status_before_upload_req_head_exception_v1(self):
        """A failing request() surfaces as RetryableError in v1 validation.

        Because request() itself raises, the response must never be fetched
        nor read.
        """
        image_url = 'http://fake_host/fake_path/fake_image_id'

        # Response that would be served if the request had gone through.
        head_resp = mock.Mock()
        head_resp.status = httplib.OK
        head_resp.read.return_value = 'fakeData'
        head_resp.getheader.return_value = 'queued'

        conn = mock.Mock()
        conn.request.side_effect = Fake_HTTP_Request_Error()
        conn.getresponse.return_value = head_resp

        self.assertRaises(
            self.glance.RetryableError,
            self.glance.validate_image_status_before_upload_v1,
            conn, image_url, extra_headers=mock.Mock())

        conn.request.assert_called_once()
        head_resp.read.assert_not_called()
        conn.getresponse.assert_not_called()
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_validate_image_status_before_upload_ok_v2_using_uwsgi(self):
        """v2 validation succeeds when the body reports status 'queued'.

        The request must be a GET on the patch path with the extra headers,
        and the retry helper must not be invoked.
        """
        image_url = 'http://fake_host/fake_path/fake_image_id'
        patch_path = 'fake_patch_path'
        extra_headers = mock.Mock()
        retry_patch = self.mock_patch_object(
            self.glance, 'check_resp_status_and_retry')

        # Canned response: HTTP 200 with a JSON body carrying the status.
        head_resp = mock.Mock()
        head_resp.status = httplib.OK
        head_resp.read.return_value = '{"status": "queued"}'

        conn = mock.Mock()
        conn.getresponse.return_value = head_resp

        self.glance.validate_image_status_before_upload_v2(
            conn, image_url, extra_headers, patch_path)

        self.assertTrue(conn.getresponse.called)
        self.assertEqual(head_resp.read.call_count, 2)
        self.assertFalse(retry_patch.called)
        conn.request.assert_called_with('GET',
                                        'fake_patch_path',
                                        headers=extra_headers)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data, passed through to open_url
    @param handlers: custom handlers, passed through to open_url
    @return: data read from the response
    @raise URLFetchError: if the returned status code is not OK
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code != http_client_.OK:
        raise URLFetchError(return_message)
    try:
        return response.read()
    finally:
        # Close the response even when read() fails part-way through.
        response.close()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @param data: HTTP POST data, passed through to open_url
    @param handlers: custom handlers, passed through to open_url
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    succeeded = return_code == http_client_.OK
    if succeeded:
        try:
            return_data = response.read()
        finally:
            # Release the response even if reading it fails.
            response.close()
        # The context manager closes the file even if the write fails.
        with open(output_file, "w") as outfile:
            outfile.write(return_data)

    return return_code, return_message, succeeded
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Opens a URL and hands back the response stream without reading it.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: the response object returned by open_url
    @rtype: file derived type
    @raise URLFetchError: if the returned status code is not OK
    """
    status, message, stream = open_url(url, config, data=data,
                                       handlers=handlers)
    fetched_ok = status and status == http_client_.OK
    if not fetched_ok:
        raise URLFetchError(message)
    return stream
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def Delete(name, creds, transport):
  """Remove a tag or digest from its registry.

  Args:
    name: the tag or digest that should be deleted.
    creds: the creds used to authorize the deletion.
    transport: the transport used to contact the registry.
  """
  delete_transport = docker_http.Transport(
      name, creds, transport, docker_http.DELETE)

  url_template = '{scheme}://{registry}/v2/{repository}/manifests/{entity}'
  manifest_url = url_template.format(
      scheme=docker_http.Scheme(name.registry),
      registry=name.registry,
      repository=name.repository,
      entity=_tag_or_digest(name))

  # Both 200 and 202 are accepted response codes for the DELETE.
  resp, unused_content = delete_transport.Request(
      manifest_url,
      method='DELETE',
      accepted_codes=[httplib.OK, httplib.ACCEPTED])
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _content(self, suffix, accepted_mimes=None, cache=True):
    """Return the registry resource at suffix, using the response cache.

    For a Repository name the suffix is first prefixed with the repository.
    A previously stored response is returned as-is; otherwise the resource
    is requested and, when cache is true, stored for later calls.
    """
    if isinstance(self._name, docker_name.Repository):
      suffix = '{repository}/{suffix}'.format(
          repository=self._name.repository,
          suffix=suffix)

    if suffix in self._response:
      return self._response[suffix]

    resource_url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=suffix)
    _, payload = self._transport.Request(
        resource_url,
        accepted_codes=[httplib.OK],
        accepted_mimes=accepted_mimes)

    if cache:
      self._response[suffix] = payload
    return payload
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def blob_size(self, digest):
    """Return the byte size of the raw blob identified by digest.

    The size is read from the 'content-length' entry of the response to a
    HEAD request for the blob.
    """
    blob_path = 'blobs/' + digest
    if isinstance(self._name, docker_name.Repository):
      blob_path = '{repository}/{suffix}'.format(
          repository=self._name.repository,
          suffix=blob_path)

    blob_url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=blob_path)
    head_resp, unused_content = self._transport.Request(
        blob_url, method='HEAD', accepted_codes=[httplib.OK])

    return int(head_resp['content-length'])

  # Large, do not memoize.
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def catalog(self, page_size=100):
    """Yield each entry of 'repositories' from the paginated _catalog listing.

    Args:
      page_size: value sent as the n query parameter of the catalog URL.

    Raises:
      ValueError: if the name is a docker_name.Repository.
      docker_http.BadStateException: if a page lacks the 'repositories' key.
    """
    # TODO(user): Handle docker_name.Repository for /v2/<name>/_catalog
    if isinstance(self._name, docker_name.Repository):
      raise ValueError('Expected docker_name.Registry for "name"')

    catalog_url = '{scheme}://{registry}/v2/_catalog?n={page_size}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        page_size=page_size)
    pages = self._transport.PaginatedRequest(
        catalog_url, accepted_codes=[httplib.OK])

    for _, page_body in pages:
      parsed_page = json.loads(page_body)
      if 'repositories' not in parsed_page:
        raise docker_http.BadStateException(
            'Malformed JSON response: %s' % page_body)

      # TODO(user): This should return docker_name.Repository
      for repo in parsed_page['repositories']:
        yield repo

  # __enter__ and __exit__ allow use as a context manager.
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _content(self, suffix, accepted_mimes=None, cache=True):
    """Return the registry resource at suffix, using the response cache.

    For a Repository name the suffix is first prefixed with the repository.
    A previously stored response is returned as-is; otherwise the resource
    is requested and, when cache is true, stored for later calls.
    """
    if isinstance(self._name, docker_name.Repository):
      suffix = '{repository}/{suffix}'.format(
          repository=self._name.repository,
          suffix=suffix)

    if suffix in self._response:
      return self._response[suffix]

    resource_url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=suffix)
    _, payload = self._transport.Request(
        resource_url,
        accepted_codes=[httplib.OK],
        accepted_mimes=accepted_mimes)

    if cache:
      self._response[suffix] = payload
    return payload
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _put_layer(self, image, layer_id):
    """Send the aufs tarball of one layer to the v1 layer endpoint."""
    # TODO(user): We should stream this instead of loading
    # it into memory.
    layer_url = '{scheme}://{endpoint}/v1/images/{layer}/layer'.format(
        scheme=docker_http.Scheme(self._endpoint),
        endpoint=self._endpoint,
        layer=layer_id)

    docker_http.Request(
        self._transport,
        layer_url,
        self._token_creds,
        accepted_codes=[httplib.OK],
        body=image.layer(layer_id),
        content_type='application/octet-stream')
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def Delete(name, creds, transport):
  """Remove a tag or digest from its registry.

  Args:
    name: the tag or digest that should be deleted.
    creds: the credentials used to authorize the deletion.
    transport: the transport used to contact the registry.
  """
  delete_transport = docker_http.Transport(
      name, creds, transport, docker_http.DELETE)

  url_template = '{scheme}://{registry}/v2/{repository}/manifests/{entity}'
  manifest_url = url_template.format(
      scheme=docker_http.Scheme(name.registry),
      registry=name.registry,
      repository=name.repository,
      entity=_tag_or_digest(name))

  # Both 200 and 202 are accepted response codes for the DELETE.
  resp, unused_content = delete_transport.Request(
      manifest_url,
      method='DELETE',
      accepted_codes=[httplib.OK, httplib.ACCEPTED])
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def _content(self, suffix, cache=True):
    """Return the registry resource at suffix, using the response cache.

    For a Repository name the suffix is first prefixed with the repository.
    A previously stored response is returned as-is; otherwise the resource
    is requested and, when cache is true, stored for later calls.
    """
    if isinstance(self._name, docker_name.Repository):
      suffix = '{repository}/{suffix}'.format(
          repository=self._name.repository,
          suffix=suffix)

    if suffix in self._response:
      return self._response[suffix]

    resource_url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=suffix)
    _, payload = self._transport.Request(
        resource_url, accepted_codes=[httplib.OK])

    if cache:
      self._response[suffix] = payload
    return payload
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def blob_size(self, digest):
    """Return the byte size of the raw blob identified by digest.

    The size is read from the 'content-length' entry of the response to a
    HEAD request for the blob.
    """
    blob_path = 'blobs/' + digest
    if isinstance(self._name, docker_name.Repository):
      blob_path = '{repository}/{suffix}'.format(
          repository=self._name.repository,
          suffix=blob_path)

    blob_url = '{scheme}://{registry}/v2/{suffix}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        suffix=blob_path)
    head_resp, unused_content = self._transport.Request(
        blob_url, method='HEAD', accepted_codes=[httplib.OK])

    return int(head_resp['content-length'])

  # Large, do not memoize.
项目:containerregistry    作者:google    | 项目源码 | 文件源码
def catalog(self, page_size=100):
    """Yield each entry of 'repositories' from the paginated _catalog listing.

    Args:
      page_size: value sent as the n query parameter of the catalog URL.

    Raises:
      ValueError: if the name is a docker_name.Repository.
      docker_http.BadStateException: if a page lacks the 'repositories' key.
    """
    # TODO(user): Handle docker_name.Repository for /v2/<name>/_catalog
    if isinstance(self._name, docker_name.Repository):
      raise ValueError('Expected docker_name.Registry for "name"')

    catalog_url = '{scheme}://{registry}/v2/_catalog?n={page_size}'.format(
        scheme=docker_http.Scheme(self._name.registry),
        registry=self._name.registry,
        page_size=page_size)
    pages = self._transport.PaginatedRequest(
        catalog_url, accepted_codes=[httplib.OK])

    for _, page_body in pages:
      parsed_page = json.loads(page_body)
      if 'repositories' not in parsed_page:
        raise docker_http.BadStateException(
            'Malformed JSON response: %s' % page_body)

      for repo in parsed_page['repositories']:
        # TODO(user): This should return docker_name.Repository instead.
        yield repo

  # __enter__ and __exit__ allow use as a context manager.
项目:irida-miseq-uploader    作者:phac-nml    | 项目源码 | 文件源码
def test_validate_URL_existence_url_ok(self, mock_cs, mock_url):
        """validate_URL_existence returns True for a response whose code is 200.

        mock_url hands out the canned response; urlopen is expected to be
        called with the URL and the API's max_wait_time as timeout.
        """
        ok_response = Foo()
        ok_response.code = httplib.OK

        mock_url.side_effect = [ok_response]
        mock_cs.side_effect = [None]

        api = API.apiCalls.ApiCalls("", "", "", "", "")
        target_url = "http://google.com"

        outcome = api.validate_URL_existence(target_url)

        self.assertEqual(outcome, True)
        API.apiCalls.urlopen.assert_called_with(target_url, timeout=api.max_wait_time)
项目:irida-miseq-uploader    作者:phac-nml    | 项目源码 | 文件源码
def session(self):
        """Return a usable session, re-authenticating when the current one fails.

        An externally supplied session is returned untouched. Otherwise the
        current session is probed with an OPTIONS request against base_URL;
        if the probe raises or answers with a non-OK status, a new session is
        obtained through the OAuth service. The probe and the replacement
        happen while holding the session lock.
        """
        if self._session_set_externally:
            return self._session

        # Acquire outside the try block so that a failed acquire can never
        # lead to releasing a lock that was not taken.
        self._session_lock.acquire()
        try:
            try:
                response = self._session.options(self.base_URL)
                session_ok = response.status_code == httplib.OK
            except Exception:
                # Any ordinary failure of the probe is handled like an
                # expired token; KeyboardInterrupt/SystemExit propagate.
                session_ok = False

            if session_ok:
                logging.debug("Existing session still works, going to reuse it.")
            else:
                logging.debug("Token is probably expired, going to get a new session.")
                oauth_service = self.get_oauth_service()
                access_token = self.get_access_token(oauth_service)
                self._session = oauth_service.get_session(access_token)
        finally:
            self._session_lock.release()

        return self._session
项目:beg-django-e-commerce    作者:Apress    | 项目源码 | 文件源码
def test_view_product(self):
        """ test product view loads

        Fetches the first active product's page and checks the status code,
        the template used, the rendered name/description, and that the
        context carries a cart form and a product_reviews entry.
        """
        product = Product.active.all()[0]
        product_url = product.get_absolute_url()
        url_entry = urlresolvers.resolve(product_url)
        template_name = url_entry[2]['template_name']
        response = self.client.get(product_url)
        # assertTrue/assertNotEqual replace the deprecated fail* aliases.
        self.assertTrue(response)
        self.assertEqual(response.status_code, httplib.OK)
        self.assertTemplateUsed(response, template_name)
        self.assertContains(response, product.name)
        self.assertContains(response, html.escape(product.description))
        # check for cart form in product page response
        cart_form = response.context[0]['form']
        self.assertTrue(cart_form)
        # check that the cart form is instance of correct form class
        self.assertTrue(isinstance(cart_form, ProductAddToCartForm))

        product_reviews = response.context[0].get('product_reviews', None)
        self.assertNotEqual(product_reviews, None)
项目:orangecloud-client    作者:antechrestos    | 项目源码 | 文件源码
def test_get_folder(self):
        """folders.get('folder-id') exposes the fields of the canned response.

        Checks the client call plus the id, name, parent, files and
        subfolders of the returned folder.
        """
        canned_response = mock_api_response('/folders/folder-id',
                                            httplib.OK,
                                            None,
                                            'folders', 'GET_{id}_response.json')
        self.client.get.return_value = canned_response

        fetched = self.folders.get('folder-id')

        self.client.get.assert_called_with(self.client.get.return_value.url, params=None)
        self.assertIsNotNone(fetched)
        # Top-level attributes of the folder itself.
        self.assertEqual('folder-id', fetched.id)
        self.assertEqual('folder-name', fetched.name)
        self.assertEqual('root-id', fetched.parentId)
        # Exactly one file and one subfolder are expected.
        self.assertEqual(1, len(fetched.files))
        self.assertEqual('file-id', fetched.files[0].id)
        self.assertEqual(1, len(fetched.subfolders))
        self.assertEqual('subfolder-id', fetched.subfolders[0].id)
        self.assertEqual('folder-id', fetched.subfolders[0].parentId)
项目:ngas    作者:ICRAR    | 项目源码 | 文件源码
def test_ngas_status():
    """
    Execute the STATUS command against the NGAS server on the host fabric is
    currently pointing at

    Raises IOError (re-raised) when the server cannot be reached and
    ValueError, carrying the response, when the reply does not contain
    Status="SUCCESS".
    """
    try:
        serv = urllib2.urlopen('http://{0}:7777/STATUS'.format(env.host), timeout=5)
    except IOError:
        failure('Problem connecting to server {0}'.format(env.host))
        raise

    try:
        response = serv.read()
    finally:
        # Close the connection even if reading the reply fails.
        serv.close()

    if 'Status="SUCCESS"' not in response:
        failure('Problem with response from {0}, not SUCESS as expected'.format(env.host))
        raise ValueError(response)
    else:
        success('Response from {0} OK'.format(env.host))
项目:ngas    作者:ICRAR    | 项目源码 | 文件源码
def upload_to(host, filename, port=7777):
    """
    Simple method to upload a file into NGAS

    The file is sent in 4096-byte chunks as the body of a POST to /QARCHIVE
    on host:port. Any response status other than OK raises an Exception
    whose message includes the status, headers and body of the reply.
    """
    with contextlib.closing(httplib.HTTPConnection(host, port)) as conn:
        conn.putrequest('POST', '/QARCHIVE?filename=%s' % (urllib2.quote(os.path.basename(filename)),) )
        conn.putheader('Content-Length', os.stat(filename).st_size)
        conn.endheaders()
        # Binary mode: the bytes sent must match the Content-Length taken
        # from os.stat(), which text-mode newline translation could break.
        with open(filename, 'rb') as f:
            for data in iter(functools.partial(f.read, 4096), b''):
                conn.send(data)
        r = conn.getresponse()
        if r.status != httplib.OK:
            raise Exception("Error while QARCHIVE-ing %s to %s:%d:\nStatus: %d\n%s\n\n%s" % (filename, conn.host, conn.port, r.status, r.msg, r.read()))
        else:
            success("{0} successfully archived to {1}!".format(filename, host))
项目:UPPlatform_Python_SDK    作者:Jawbone    | 项目源码 | 文件源码
def _request(self, url, method='GET', data=None, ok_statuses=None):
        """
        Send an HTTP request through self.http, store the response and content on the instance, check the status,
        set the Meta object from the response content, and return the 'data' part of the decoded JSON.

        :param url: endpoint to send the request
        :param method: HTTP method (e.g. GET, POST, etc.), defaults to GET
        :param data: optional values that are URL-encoded into the request body
        :param ok_statuses: statuses passed to _raise_for_status, defaults to [httplib.OK]
        :return: JSON data
        """
        #
        # TODO: clean up the ability to send a POST and add unit tests.
        #
        req_body = None if data is None else urllib.urlencode(data)
        self.resp, self.content = self.http.request(url, method, body=req_body)

        accepted = [httplib.OK] if ok_statuses is None else ok_statuses
        self._raise_for_status(accepted)

        payload = json.loads(self.content)
        self.meta = upapi.meta.Meta(**payload['meta'])
        return payload['data']
项目:UPPlatform_Python_SDK    作者:Jawbone    | 项目源码 | 文件源码
def disconnect(self):
        """
        Issue a delete on the DISCONNECT endpoint to revoke this user's access, then clear the stored credentials.
        """
        revoke_endpoint = upapi.endpoints.DISCONNECT
        self.delete(revoke_endpoint)
        self.credentials = None

    #
    # TODO: finish pubsub implementation.
    #
    # def set_pubsub(self, url):
    #     """
    #     Register a user-specific pubsub webhook.
    #
    #     :param url: user-specific webhook callback URL
    #     """
    #     resp = self.oauth.post(upapi.endpoints.PUBSUB, data={'webhook': url})
    #     self._raise_for_status([httplib.OK], resp)
    #
    # def delete_pubsub(self):
    #     """
    #     Delete a user-specific pubsub webhook.
    #     """
    #     self.delete(upapi.endpoints.PUBSUB)
项目:UPPlatform_Python_SDK    作者:Jawbone    | 项目源码 | 文件源码
def test__raise_for_status(self, mock_resp):
        """
        Check that _raise_for_status only raises for statuses outside the accepted list.

        :param mock_resp: mocked httplib Response
        """
        self.up.resp = mock_resp
        self.up.content = ''

        #
        # A status listed in ok_statuses must pass silently.
        #
        mock_resp.status = httplib.CREATED
        try:
            self.up._raise_for_status([httplib.OK, httplib.CREATED])
        except Exception as exc:
            self.fail('_raise_for_status unexpectedly threw {}'.format(exc))

        #
        # A status missing from ok_statuses must raise.
        #
        mock_resp.status = httplib.ACCEPTED
        accepted = [httplib.OK, httplib.CREATED]
        self.assertRaises(
            upapi.exceptions.UnexpectedAPIResponse,
            self.up._raise_for_status,
            accepted)
项目:mWorkerService    作者:smices    | 项目源码 | 文件源码
def _parse_result(http_response, response):
    if http_response.status / 100 == httplib.CONTINUE / 100:
        raise BceClientError('Can not handle 1xx http status code')
    bse = None
    body = http_response.read()
    if body:
        d = json.loads(body)

        if 'message' in d and 'code' in d and 'requestId' in d:
            bse = BceServerError(d['message'], code=d['code'], request_id=d['requestId'])
        elif http_response.status / 100 == httplib.OK / 100:
            response.__dict__.update(json.loads(body, \
                    object_hook=utils.dict_to_python_object).__dict__)
            http_response.close()
            return True
    elif http_response.status / 100 == httplib.OK / 100:
        return True

    if bse is None:
        bse = BceServerError(http_response.reason, request_id=response.metadata.bce_request_id)
    bse.status_code = http_response.status
    raise bse
项目:mWorkerService    作者:smices    | 项目源码 | 文件源码
def _parse_result(http_response, response):
    if http_response.status / 100 == httplib.CONTINUE / 100:
        raise BceClientError('Can not handle 1xx http status code')
    bse = None
    body = http_response.read()
    if body:
        d = json.loads(body)

        if 'message' in d and 'code' in d and 'requestId' in d:
            bse = BceServerError(d['message'], code=d['code'], request_id=d['requestId'])
        elif http_response.status / 100 == httplib.OK / 100:
            response.__dict__.update(json.loads(body, \
                    object_hook=utils.dict_to_python_object).__dict__)
            http_response.close()
            return True
    elif http_response.status / 100 == httplib.OK / 100:
        return True

    if bse is None:
        bse = BceServerError(http_response.reason, request_id=response.metadata.bce_request_id)
    bse.status_code = http_response.status
    raise bse
项目:qtip    作者:opnfv    | 项目源码 | 文件源码
def check_endpoint_for_error(resource, operation=None):
    """Build a decorator that maps handler errors onto problem responses.

    The wrapped handler is called with `name`. On success the wrapper
    returns (result, httplib.OK). error.NotFoundError becomes a NOT_FOUND
    problem and error.ToBeDoneError a NOT_IMPLEMENTED problem, both built
    with connexion.problem.
    """
    def _decorator(func):
        def _execute(name=None):
            try:
                result = func(name)
            except error.NotFoundError:
                title = '{} not found'.format(resource)
                detail = ('Requested {} `{}` not found.'
                          .format(resource.lower(), name))
                return connexion.problem(httplib.NOT_FOUND, title, detail)
            except error.ToBeDoneError:
                title = '{} handler not implemented'.format(operation)
                detail = ('Requested operation `{}` on {} not implemented.'
                          .format(operation.lower(), resource.lower()))
                return connexion.problem(httplib.NOT_IMPLEMENTED, title, detail)
            else:
                return result, httplib.OK
        return _execute
    return _decorator
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data, passed through to open_url
    @param handlers: custom handlers, passed through to open_url
    @return: data read from the response
    @raise URLFetchError: if the returned status code is not OK
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code != http_client_.OK:
        raise URLFetchError(return_message)
    try:
        return response.read()
    finally:
        # Close the response even when read() fails part-way through.
        response.close()
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @param data: HTTP POST data, passed through to open_url
    @param handlers: custom handlers, passed through to open_url
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    succeeded = return_code == http_client_.OK
    if succeeded:
        try:
            return_data = response.read()
        finally:
            # Release the response even if reading it fails.
            response.close()
        # The context manager closes the file even if the write fails.
        with open(output_file, "w") as outfile:
            outfile.write(return_data)

    return return_code, return_message, succeeded
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Opens a URL and hands back the response stream without reading it.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: the response object returned by open_url
    @rtype: file derived type
    @raise URLFetchError: if the returned status code is not OK
    """
    status, message, stream = open_url(url, config, data=data,
                                       handlers=handlers)
    fetched_ok = status and status == http_client_.OK
    if not fetched_ok:
        raise URLFetchError(message)
    return stream
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data, passed through to open_url
    @param handlers: custom handlers, passed through to open_url
    @return: data read from the response
    @raise URLFetchError: if the returned status code is not OK
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code != http_client_.OK:
        raise URLFetchError(return_message)
    try:
        return response.read()
    finally:
        # Close the response even when read() fails part-way through.
        response.close()
项目:package-33c3    作者:info-beamer    | 项目源码 | 文件源码
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @param data: HTTP POST data, passed through to open_url
    @param handlers: custom handlers, passed through to open_url
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    succeeded = return_code == http_client_.OK
    if succeeded:
        try:
            return_data = response.read()
        finally:
            # Release the response even if reading it fails.
            response.close()
        # The context manager closes the file even if the write fails.
        with open(output_file, "w") as outfile:
            outfile.write(return_data)

    return return_code, return_message, succeeded
项目:micromaterials-api    作者:lpmi-13    | 项目源码 | 文件源码
def deconstruct_list(response_data, model=None, status_code=OK):
    """Split a list-style view response into (view_name, model, status_code).

    The first element supplies the view name (via required_view_name), the
    optional second element overrides `model`, and the optional third
    overrides `status_code`. A model of None becomes {} and a status code
    of None becomes OK.

    Raises KrumpException when response_data is empty.
    """
    if not response_data:
        raise KrumpException(
            'Response data is empty; the view name at least is required.')

    item_count = len(response_data)
    view_name = None
    if item_count >= 1:
        view_name = required_view_name(response_data[0])
    if item_count >= 2:
        model = response_data[1]
    if item_count >= 3:
        status_code = response_data[2]

    # Fall back to the defaults for anything still unset.
    return (
        view_name,
        {} if model is None else model,
        OK if status_code is None else status_code,
    )
项目:forseti-security    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def test_get_value_for_attribute_with_a_present_attribute(self, mock_meta_req):
        """get_value_for_attribute returns the body of an OK response.

        Setup:
            * Patch httplib.HTTPResponse with a mock_open carrying the body.
            * Give the mocked response an OK status.
            * Use it as the side effect of mock_meta_req.

        Expected results:
            * The returned value equals the canned body.
        """
        expected = 'expected_response'
        fake_response_factory = mock.mock_open(read_data=expected)

        with mock.patch('httplib.HTTPResponse',
                        fake_response_factory) as patched_resp:
            patched_resp.return_value.status = httplib.OK
            mock_meta_req.side_effect = patched_resp

            actual = metadata_server.get_value_for_attribute('')

        self.assertEqual(actual, expected)
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def _refresh_access_token(self):
        """
        Request new access token to send with requests to Brightcove. Access Token expires every 5 minutes.

        The API key and secret are sent as HTTP Basic credentials with a
        client_credentials grant.

        Returns:
            The 'access_token' value of the JSON response, or None when the response status is not OK.
        """
        url = "https://oauth.brightcove.com/v3/access_token"
        params = {"grant_type": "client_credentials"}
        # b64encode yields the same characters as the removed
        # base64.encodestring once its newlines are stripped.
        credentials = '{}:{}'.format(self.api_key, self.api_secret)
        if not isinstance(credentials, bytes):
            credentials = credentials.encode('utf-8')
        auth_string = base64.b64encode(credentials)
        if not isinstance(auth_string, str):
            # Python 3 returns bytes; the header value is built as text.
            auth_string = auth_string.decode('ascii')
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": "Basic " + auth_string
        }
        resp = requests.post(url, headers=headers, data=params)
        if resp.status_code == httplib.OK:
            result = resp.json()
            return result['access_token']
        return None
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def get(self, url, headers=None, can_retry=True):
        """
        Perform a REST GET on the given URL with a bearer token. Can throw ApiClientError or its subclass.

        Arguments:
            url (str): API url to fetch a resource from.
            headers (dict): Extra headers merged over the default Authorization header.
            can_retry (bool): True if, on an UNAUTHORIZED response, the access token may be refreshed and
                the call repeated once.
        Returns:
            Response in python native data format.
        """
        request_headers = {'Authorization': 'Bearer ' + str(self.access_token)}
        if headers is not None:
            request_headers.update(headers)

        resp = requests.get(url, headers=request_headers)
        status = resp.status_code
        if status == httplib.OK:
            return resp.json()
        if status == httplib.UNAUTHORIZED and can_retry:
            # Refresh the token, then retry with retries disabled.
            self.access_token = self._refresh_access_token()
            return self.get(url, headers, can_retry=False)
        raise BrightcoveApiClientError
项目:xblock-video    作者:appsembler    | 项目源码 | 文件源码
def get(self, url, headers=None, can_retry=False):
        """
        Perform a REST GET on the given URL with a bearer token. Can throw ApiClientError or its subclass.

        Arguments:
            url (str): API url to fetch a resource from.
            headers (dict): Extra headers merged over the default Authorization and Accept headers.
            can_retry (bool): Accepted but not used by this implementation.
        Returns:
            Response in python native data format.
        """
        token = self.access_token.encode(encoding='utf-8')
        request_headers = {
            'Authorization': 'Bearer {}'.format(token),
            'Accept': 'application/json'
        }
        if headers is not None:
            request_headers.update(headers)

        resp = requests.get(url, headers=request_headers)
        if resp.status_code != httplib.OK:
            raise VimeoApiClientError(_("Can't fetch requested data from API."))
        return resp.json()
项目:Protector    作者:trivago    | 项目源码 | 文件源码
def connect_intercept(self):
        """
        Handle a CONNECT request by terminating TLS locally with a per-host certificate.

        The certificate is expected at ``certdir/hostname.crt``. When that file does not
        exist it is created by piping the output of ``openssl req`` into ``openssl x509``,
        signed with the configured CA files. The client is then told the connection is
        established, the client socket is wrapped in a server-side TLS layer using that
        certificate, and the handler's read/write files are rebuilt on the wrapped socket.
        """
        hostname = self.path.split(':')[0]
        certpath = "%s/%s.crt" % (self.certdir.rstrip('/'), hostname)

        # Check-and-generate happens under the handler's lock so the certificate file is
        # not produced by two requests at once.
        with self.lock:
            if not os.path.isfile(certpath):
                # Millisecond timestamp used as the certificate serial number.
                epoch = "%d" % (time.time() * 1000)
                p1 = Popen(["openssl", "req", "-new", "-key", self.certkey, "-subj", "/CN=%s" % hostname], stdout=PIPE)
                p2 = Popen(["openssl", "x509", "-req", "-days", "3650", "-CA", self.cacert, "-CAkey", self.cakey,
                            "-set_serial", epoch, "-out", certpath], stdin=p1.stdout, stderr=PIPE)
                # p2 owns the read end now; drop our copy so the descriptor is not leaked.
                p1.stdout.close()
                p2.communicate()
                # Reap the first process as well, otherwise it lingers as a zombie.
                p1.wait()

        self.wfile.write("%s %d %s\r\n" % (self.protocol_version, httplib.OK, 'Connection Established'))
        self.end_headers()

        self.connection = ssl.wrap_socket(self.connection, keyfile=self.certkey, certfile=certpath, server_side=True)
        self.rfile = self.connection.makefile("rb", self.rbufsize)
        self.wfile = self.connection.makefile("wb", self.wbufsize)

        # Honour the client's Proxy-Connection preference for connection reuse.
        conntype = self.headers.get('Proxy-Connection', '')
        if conntype.lower() == 'close':
            self.close_connection = 1
        elif conntype.lower() == 'keep-alive' and self.protocol_version >= "HTTP/1.1":
            self.close_connection = 0
项目:Protector    作者:trivago    | 项目源码 | 文件源码
def connect_relay(self):
        """
        Handle a CONNECT request by relaying raw bytes between the client and the target.

        The target is taken from ``self.path`` in ``host:port`` form; a missing, empty or
        zero port falls back to 443. If the upstream connection cannot be opened the
        client receives a 502 Bad Gateway. Otherwise a 200 response is sent and data is
        copied in both directions until either side closes, a socket reports an
        exceptional condition, or ``select`` times out after ``self.timeout``.
        The upstream socket is always closed before returning.
        """
        host, _sep, port = self.path.partition(':')
        address = (host, int(port or 0) or 443)
        try:
            upstream = socket.create_connection(address, timeout=self.timeout)
        except Exception:
            # Any failure to reach the target is reported to the client as a gateway error.
            self.send_error(httplib.BAD_GATEWAY)
            return
        try:
            self.send_response(httplib.OK, 'Connection Established')
            self.end_headers()

            conns = [self.connection, upstream]
            self.close_connection = 0
            while not self.close_connection:
                rlist, _wlist, xlist = select.select(conns, [], conns, self.timeout)
                if xlist or not rlist:
                    break
                for r in rlist:
                    # Forward whatever arrived on one socket to the other one.
                    other = conns[1] if r is conns[0] else conns[0]
                    data = r.recv(8192)
                    if not data:
                        # Empty read: the peer closed its side, so stop relaying.
                        self.close_connection = 1
                        break
                    other.sendall(data)
        finally:
            upstream.close()
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def fetch_from_url(url, config, data=None, handlers=None):
    """Returns data retrieved from a URL.
    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: passed through to open_url
    @param handlers: passed through to open_url
    @return: data retrieved from URL
    @raise URLFetchError: if the returned status code is missing or is not OK;
        the exception carries the message reported by open_url
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code and return_code == http_client_.OK:
        # Close the response even when reading the body fails.
        try:
            return response.read()
        finally:
            response.close()
    else:
        raise URLFetchError(return_message)
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None):
    """Writes data retrieved from a URL to a file.

    The file is only written when the request returns status OK; the response
    body is stored verbatim (binary mode).

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param output_file: output file
    @type output_file: basestring
    @param data: passed through to open_url
    @param handlers: passed through to open_url
    @return: tuple (
        returned HTTP status code or 0 if an error occurred
        returned message
        boolean indicating whether access was successful)
    """
    return_code, return_message, response = open_url(url, config, data=data,
                                                     handlers=handlers)
    if return_code == http_client_.OK:
        # Close the response even when reading the body fails.
        try:
            return_data = response.read()
        finally:
            response.close()
        # Binary mode keeps the payload byte-for-byte (no newline translation)
        # and accepts the bytes object that read() yields on Python 3.
        with open(output_file, "wb") as outfile:
            outfile.write(return_data)

    return return_code, return_message, return_code == http_client_.OK
项目:slack_scholar    作者:xLeitix    | 项目源码 | 文件源码
def fetch_stream_from_url(url, config, data=None, handlers=None):
    """Opens a URL and returns the response object itself, unread.

    The response is not closed here; that is left to the caller.

    @param url: URL to attempt to open
    @type url: basestring
    @param config: SSL context configuration
    @type config: Configuration
    @param data: HTTP POST data
    @type data: str
    @param handlers: list of custom urllib2 handlers to add to the request
    @type handlers: iterable
    @return: response obtained from open_url
    @rtype: file derived type
    @raise URLFetchError: if the returned status code is missing or is not OK
    """
    return_code, return_message, response = open_url(
        url, config, data=data, handlers=handlers)
    succeeded = return_code and return_code == http_client_.OK
    if not succeeded:
        raise URLFetchError(return_message)
    return response
项目:wopiserver    作者:cernbox    | 项目源码 | 文件源码
def wopiGetFile(fileid):
  '''Implements the GetFile WOPI call.

  Decodes the access_token query argument, checks that it has not expired and
  returns the content of the file named in the token as an
  application/octet-stream response with status 200.
  An undecodable or expired token yields 'Invalid access token' with status 401;
  any other exception is handed to _logGeneralExceptionAndReturn.'''
  Wopi.refreshconfig()
  try:
    acctok = jwt.decode(flask.request.args['access_token'], Wopi.wopisecret, algorithms=['HS256'])
    if acctok['exp'] < time.time():
      raise jwt.exceptions.ExpiredSignatureError
    # only the last 20 characters of the token are logged on this path
    Wopi.log.info('msg="GetFile" user="%s:%s" filename="%s" fileid="%s" token="%s"' % \
                  (acctok['ruid'], acctok['rgid'], acctok['filename'], fileid, flask.request.args['access_token'][-20:]))
    # stream file from storage to client
    resp = flask.Response(xrdcl.readfile(acctok['filename'], acctok['ruid'], acctok['rgid']), mimetype='application/octet-stream')
    resp.status_code = httplib.OK
    return resp
  except (jwt.exceptions.DecodeError, jwt.exceptions.ExpiredSignatureError):
    Wopi.log.warning('msg="Signature verification failed" client="%s" requestedUrl="%s" token="%s"' % \
                     (flask.request.remote_addr, flask.request.base_url, flask.request.args['access_token']))
    return 'Invalid access token', httplib.UNAUTHORIZED
  except Exception as e:
    return _logGeneralExceptionAndReturn(e, flask.request)


#
# The following operations are all called on POST /wopi/files/<fileid>
#
项目:wopiserver    作者:cernbox    | 项目源码 | 文件源码
def wopiUnlock(fileid, reqheaders, acctok):
  '''Implements the Unlock WOPI call: if the lock sent in the X-WOPI-Lock header matches
  the retrieved one, clean up the lock state and answer OK, otherwise answer with a conflict'''
  requestedLock = reqheaders['X-WOPI-Lock']
  currentLock = _retrieveWopiLock(fileid, 'UNLOCK', requestedLock, acctok)
  if not _compareWopiLocks(currentLock, requestedLock):
    return _makeConflictResponse('UNLOCK', currentLock, requestedLock, '', acctok['filename'])
  # the locks match: every cleanup step below is best-effort and its failure is ignored
  filename = acctok['filename']
  try:
    # remove the file named by _getLockName for this document
    xrdcl.removefile(_getLockName(filename), Wopi.lockruid, Wopi.lockrgid, 1)
  except IOError:
    # not worth reporting
    pass
  try:
    # drop the extended attribute stored under LASTSAVETIMEKEY
    xrdcl.rmxattr(filename, acctok['ruid'], acctok['rgid'], LASTSAVETIMEKEY)
  except IOError:
    # not worth reporting either
    pass
  try:
    # forget the file in the internal list of opened files
    del Wopi.openfiles[filename]
  except KeyError:
    # it was not in the list (anymore)
    pass
  return 'OK', httplib.OK
项目:wopiserver    作者:cernbox    | 项目源码 | 文件源码
def wopiGetLock(fileid, reqheaders_unused, acctok):
  '''Implements the GetLock WOPI call: answers 200 with the retrieved lock in the
  X-WOPI-Lock header, and keeps track of which users have the file open'''
  resp = flask.Response()
  # the original author notes that this lookup throws an exception if there is no lock
  resp.headers['X-WOPI-Lock'] = _retrieveWopiLock(fileid, 'GETLOCK', '', acctok)
  resp.status_code = httplib.OK
  if not resp.headers['X-WOPI-Lock']:
    # empty lock value: nothing to record
    return resp
  # statistics only: record the user among those having this file open
  try:
    editors = Wopi.openfiles[acctok['filename']][1]
    if acctok['username'] not in editors:
      # first time we see this user on an already opened file
      editors.add(acctok['username'])
      if len(editors) > 1:
        # log explicitly that at least two users are editing, for later monitoring
        Wopi.log.info('msg="Collaborative editing detected" filename="%s" token="%s" users="%s"' % \
                       (acctok['filename'], flask.request.args['access_token'][-20:], list(editors)))
  except KeyError:
    # a lock exists but the file is not in Wopi.openfiles: start tracking it now
    Wopi.openfiles[acctok['filename']] = (time.asctime(), sets.Set([acctok['username']]))
  return resp
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def _copy(gcs_stub, filename, headers):
  """Copy a file inside the stub.

  The source is read from the _XGoogCopySource header. If a HEAD on the source
  reports NOT_FOUND, that result is returned and nothing is copied.

  Args:
    gcs_stub: an instance of gcs stub.
    filename: dst filename of format /bucket/filename
    headers: a dict of request headers. Must contain _XGoogCopySource header.
      The 'x-goog-metadata-directive' entry, if present, is removed from it.

  Returns:
    An _FakeUrlFetchResult instance.
  """
  source = _XGoogCopySource(headers).value
  head_result = _handle_head(gcs_stub, source)
  if head_result.status_code == httplib.NOT_FOUND:
    return head_result
  # The request headers are only forwarded to put_copy for the 'REPLACE'
  # directive; any other value (default 'COPY') forwards None.
  replace_metadata = headers.pop('x-goog-metadata-directive', 'COPY') == 'REPLACE'
  gcs_stub.put_copy(source, filename, headers if replace_metadata else None)
  return _FakeUrlFetchResult(httplib.OK, {}, '')
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def _handle_head(gcs_stub, filename):
  """Handle HEAD request.

  Args:
    gcs_stub: an instance of gcs stub.
    filename: name of the object to look up via gcs_stub.head_object.

  Returns:
    An _FakeUrlFetchResult with an empty body: NOT_FOUND and no headers when
    head_object returns nothing, otherwise OK with headers built from the
    object's stat fields plus its metadata entries, if any.
  """
  info = gcs_stub.head_object(filename)
  if not info:
    status = httplib.NOT_FOUND
    response_headers = {}
  else:
    status = httplib.OK
    last_modified = common.posix_time_to_http(info.st_ctime)
    response_headers = {
        'x-goog-stored-content-length': info.st_size,
        'content-length': 0,
        'content-type': info.content_type,
        'etag': info.etag,
        'last-modified': last_modified
    }
    # Metadata entries are merged last, so they override same-named headers.
    if info.metadata:
      response_headers.update(info.metadata)

  return _FakeUrlFetchResult(status, response_headers, '')