Python httplib 模块,FORBIDDEN 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用httplib.FORBIDDEN

项目:mWorkerService    作者:smices    | 项目源码 | 文件源码
def does_bucket_exist(self, bucket_name, config=None):
        """
        Report whether a bucket with the given name exists.

        A HEAD request is issued for the bucket.  A request that completes
        without error means the bucket exists.  A server error with status
        403 (FORBIDDEN) is also treated as "exists", and 404 (NOT_FOUND) as
        "does not exist".  Every other failure is re-raised to the caller.

        :param bucket_name: name of the bucket to probe
        :type bucket_name: str
        :param config: passed through unchanged to ``_send_request``
        :return: True if the bucket exists, False otherwise
        :rtype: bool
        :raises BceHttpClientError: when the failure is neither a 403 nor a
            404 server error
        """
        try:
            self._send_request(http_methods.HEAD, bucket_name, config=config)
        except BceHttpClientError as err:
            cause = err.last_error
            # Only a server-side error carries a status code we can interpret.
            if not isinstance(cause, BceServerError):
                raise err
            status = cause.status_code
            if status == httplib.FORBIDDEN:
                return True
            if status == httplib.NOT_FOUND:
                return False
            raise err
        else:
            return True
项目:mWorkerService    作者:smices    | 项目源码 | 文件源码
def does_bucket_exist(self, bucket_name, config=None):
        """
        Tell whether a bucket named ``bucket_name`` exists.

        Sends a HEAD request for the bucket.  Success means the bucket is
        there.  If the request fails with a server error, status 403
        (FORBIDDEN) still counts as existing and status 404 (NOT_FOUND)
        counts as missing.  Any other error propagates unchanged.

        :param bucket_name: name of the bucket to look for
        :type bucket_name: str
        :param config: forwarded as-is to ``_send_request``
        :return: True when the bucket exists, False when it does not
        :rtype: bool
        :raises BceHttpClientError: for failures other than a 403/404
            server error
        """
        try:
            self._send_request(http_methods.HEAD, bucket_name, config=config)
        except BceHttpClientError as client_error:
            server_error = client_error.last_error
            if isinstance(server_error, BceServerError):
                status_code = server_error.status_code
                if status_code == httplib.FORBIDDEN:
                    return True
                if status_code == httplib.NOT_FOUND:
                    return False
            # Not a status we know how to interpret: hand it back to the caller.
            raise client_error
        else:
            return True
项目:endpoints-management-python    作者:cloudendpoints    | 项目源码 | 文件源码
def test_should_include_project_id_in_error_text_when_needed(self):
        # A PROJECT_DELETED check error must convert to FORBIDDEN with a
        # message that names the project id.
        deleted_code = sc_messages.CheckError.CodeValueValuesEnum.PROJECT_DELETED
        check_error = sc_messages.CheckError(code=deleted_code)
        response = sc_messages.CheckResponse(checkErrors=[check_error])

        status, message, _ = check_request.convert_response(
            response, self.PROJECT_ID)

        expected_message = u'Project %s has been deleted' % (self.PROJECT_ID,)
        expect(status).to(equal(httplib.FORBIDDEN))
        expect(message).to(equal(expected_message))
项目:endpoints-management-python    作者:cloudendpoints    | 项目源码 | 文件源码
def test_should_include_detail_in_error_text_when_needed(self):
        # An IP_ADDRESS_BLOCKED check error must convert to FORBIDDEN and
        # surface the error's own detail text as the message.
        detail = u'details, details, details'
        blocked_code = sc_messages.CheckError.CodeValueValuesEnum.IP_ADDRESS_BLOCKED
        check_error = sc_messages.CheckError(code=blocked_code, detail=detail)
        response = sc_messages.CheckResponse(checkErrors=[check_error])

        status, message, _ = check_request.convert_response(
            response, self.PROJECT_ID)

        expect(status).to(equal(httplib.FORBIDDEN))
        expect(message).to(equal(detail))
项目:endpoints-management-python    作者:cloudendpoints    | 项目源码 | 文件源码
def test_should_include_project_id_in_error_text_when_needed(self):
        # A PROJECT_DELETED quota error must convert to FORBIDDEN with a
        # message that names the project id.
        deleted_code = sc_messages.QuotaError.CodeValueValuesEnum.PROJECT_DELETED
        quota_error = sc_messages.QuotaError(code=deleted_code)
        response = sc_messages.AllocateQuotaResponse(
            allocateErrors=[quota_error])

        status, message = quota_request.convert_response(
            response, self.PROJECT_ID)

        expected_message = u'Project %s has been deleted' % (self.PROJECT_ID,)
        expect(status).to(equal(httplib.FORBIDDEN))
        expect(message).to(equal(expected_message))
项目:capstone    作者:rackerlabs    | 项目源码 | 文件源码
def test_with_disabled_user(self):
        # Password authentication as the user named 'disabled' is expected
        # to be answered with 403 FORBIDDEN.
        credentials = {
            'name': 'disabled',
            'password': self.password,
            'domain': {'id': uuid.uuid4().hex},
        }
        auth_request = generate_password_auth_data(credentials)
        self.authenticate(auth_request, expected_status=httplib.FORBIDDEN)
项目:capstone    作者:rackerlabs    | 项目源码 | 文件源码
def test_with_domain_id(self):
        # Step 1: obtain a token through plain password authentication.
        password_request = generate_password_auth_data({
            'name': self.username,
            'password': self.password,
        })
        token = self.authenticate(password_request).headers['X-Subject-Token']

        # Step 2: re-authenticate with that token, asking for a domain scope;
        # the request is expected to be answered with 403 FORBIDDEN.
        domain_scope = {'domain': {'id': self.project_id}}
        scoped_request = generate_token_auth_data_with_scope(
            token_id=token,
            scope=domain_scope)
        self.authenticate(scoped_request, httplib.FORBIDDEN)
项目:beg-django-e-commerce    作者:Apress    | 项目源码 | 文件源码
def test_add_to_cart_fails_csrf(self):
        """ adding product fails without including the CSRF token to POST request parameters """
        product_url = self.product.get_absolute_url()

        # the product page itself has to load before we post to it
        page = self.client.get(product_url)
        self.assertEqual(page.status_code, httplib.OK)

        # submit the add-to-cart form data; no CSRF token is included
        form_data = {'product_slug': self.product.slug, 'quantity': 2}
        rejected = self.client.post(product_url, form_data)

        # the missing CSRF input must produce a forbidden response
        self.assertEqual(rejected.status_code, httplib.FORBIDDEN)
项目:python-netatmo-client    作者:antechrestos    | 项目源码 | 文件源码
def _is_token_expired(i):
        if i.status_code == httplib.FORBIDDEN and type(i.body) == dict and i.body.get('error') is not None:
            code = i.body['error'].get('code')
            return code == NetatmoClient.INVALID_ACCESS_TOKEN or code == NetatmoClient.ACCESS_TOKEN_EXPIRED
        else:
            return False
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def check_resp_status_and_retry(resp, image_id, url):
    """Sort an error response from a glance upload and raise accordingly.

    Errors are sorted into permanent, ephemeral and unexpected ones.  Every
    path through this function raises; nothing is returned.

    :param resp: response object; only ``resp.status`` is read
    :param image_id: image identifier, used in the error message
    :param url: glance URL, used in the error message
    :raises PluginError: when the status is in the permanent-error list
    :raises XenAPI.Failure: ``ImageNotFound`` when the status is 404
    :raises RetryableError: when the status is in the ephemeral-error list,
        or is not listed at all
    """
    status = resp.status
    details = (status, image_id, url)

    permanent_statuses = (
        httplib.BAD_REQUEST,                      # 400
        httplib.UNAUTHORIZED,                     # 401
        httplib.PAYMENT_REQUIRED,                 # 402
        httplib.FORBIDDEN,                        # 403
        httplib.METHOD_NOT_ALLOWED,               # 405
        httplib.NOT_ACCEPTABLE,                   # 406
        httplib.PROXY_AUTHENTICATION_REQUIRED,    # 407
        httplib.CONFLICT,                         # 409
        httplib.GONE,                             # 410
        httplib.LENGTH_REQUIRED,                  # 411
        httplib.PRECONDITION_FAILED,              # 412
        httplib.REQUEST_ENTITY_TOO_LARGE,         # 413
        httplib.REQUEST_URI_TOO_LONG,             # 414
        httplib.UNSUPPORTED_MEDIA_TYPE,           # 415
        httplib.REQUESTED_RANGE_NOT_SATISFIABLE,  # 416
        httplib.EXPECTATION_FAILED,               # 417
        httplib.UNPROCESSABLE_ENTITY,             # 422
        httplib.LOCKED,                           # 423
        httplib.FAILED_DEPENDENCY,                # 424
        httplib.UPGRADE_REQUIRED,                 # 426
        httplib.NOT_IMPLEMENTED,                  # 501
        httplib.HTTP_VERSION_NOT_SUPPORTED,       # 505
        httplib.NOT_EXTENDED,                     # 510
    )
    if status in permanent_statuses:
        raise PluginError("Got Permanent Error response [%i] while "
                          "uploading image [%s] to glance [%s]"
                          % details)

    # Nova service would process the exception
    if status == httplib.NOT_FOUND:               # 404
        raise XenAPI.Failure('ImageNotFound')

    # NOTE(nikhil): Only a sub-set of the 500 errors are retryable. We
    # optimistically retry on 500 errors below.
    ephemeral_statuses = (
        httplib.REQUEST_TIMEOUT,                  # 408
        httplib.INTERNAL_SERVER_ERROR,            # 500
        httplib.BAD_GATEWAY,                      # 502
        httplib.SERVICE_UNAVAILABLE,              # 503
        httplib.GATEWAY_TIMEOUT,                  # 504
        httplib.INSUFFICIENT_STORAGE,             # 507
    )
    if status in ephemeral_statuses:
        raise RetryableError("Got Ephemeral Error response [%i] while "
                             "uploading image [%s] to glance [%s]"
                             % details)

    # Note(Jesse): Assume unexpected errors are retryable. If you are
    # seeing this error message, the error should probably be added
    # to either the ephemeral or permanent error list.
    raise RetryableError("Got Unexpected Error response [%i] while "
                         "uploading image [%s] to glance [%s]"
                         % details)
项目:Orator-Google-App-Engine    作者:MakarenaLabs    | 项目源码 | 文件源码
def check_status(status, expected, path, headers=None,
                 resp_headers=None, body=None, extras=None):
  """Verify that an HTTP response status is one of the expected ones.

  Returns silently when `status` is in `expected`. Otherwise builds a
  diagnostic message from all arguments and raises the exception type
  matching the status.

  Args:
    status: HTTP response status. int.
    expected: a list of expected statuses. A list of ints.
    path: filename or a path prefix.
    headers: HTTP request headers.
    resp_headers: HTTP response headers.
    body: HTTP response body.
    extras: extra info to be logged verbatim if error occurs.

  Raises:
    AuthorizationError: if status is 401 (UNAUTHORIZED).
    ForbiddenError: if status is 403 (FORBIDDEN).
    NotFoundError: if status is 404 (NOT_FOUND).
    TimeoutError: if status is 408 (REQUEST_TIMEOUT).
    InvalidRange: if status is 416 (REQUESTED_RANGE_NOT_SATISFIABLE).
    FileClosedError: if status is 200 (OK) while 308 was expected and
      200 was not.
    ServerError: if status is 500 or above.
    FatalError: for any other unexpected status.
  """
  if status in expected:
    return

  report = ('Expect status %r from Google Storage. But got status %d.\n'
            'Path: %r.\n'
            'Request headers: %r.\n'
            'Response headers: %r.\n'
            'Body: %r.\n'
            'Extra info: %r.\n' %
            (expected, status, path, headers, resp_headers, body, extras))

  # Statuses that have a dedicated exception type.
  if status == httplib.UNAUTHORIZED:
    raise AuthorizationError(report)
  if status == httplib.FORBIDDEN:
    raise ForbiddenError(report)
  if status == httplib.NOT_FOUND:
    raise NotFoundError(report)
  if status == httplib.REQUEST_TIMEOUT:
    raise TimeoutError(report)
  if status == httplib.REQUESTED_RANGE_NOT_SATISFIABLE:
    raise InvalidRange(report)

  # Got OK while the caller expected 308 and did not expect OK.
  got_ok_instead_of_308 = (status == httplib.OK and 308 in expected and
                           httplib.OK not in expected)
  if got_ok_instead_of_308:
    raise FileClosedError(report)

  if status >= 500:
    raise ServerError(report)
  raise FatalError(report)
项目:enkiWS    作者:juliettef    | 项目源码 | 文件源码
def check_status(status, expected, path, headers=None,
                 resp_headers=None, body=None, extras=None):
  """Raise a status-specific error unless the HTTP status was expected.

  When `status` is one of `expected` the function returns without doing
  anything else. Otherwise a message describing the request and response
  is built and raised as the exception type that matches the status.

  Args:
    status: HTTP response status. int.
    expected: a list of expected statuses. A list of ints.
    path: filename or a path prefix.
    headers: HTTP request headers.
    resp_headers: HTTP response headers.
    body: HTTP response body.
    extras: extra info to be logged verbatim if error occurs.

  Raises:
    AuthorizationError: if status is 401 (UNAUTHORIZED).
    ForbiddenError: if status is 403 (FORBIDDEN).
    NotFoundError: if status is 404 (NOT_FOUND).
    TimeoutError: if status is 408 (REQUEST_TIMEOUT).
    InvalidRange: if status is 416 (REQUESTED_RANGE_NOT_SATISFIABLE).
    FileClosedError: if status is 200 (OK) while 308 was expected and
      200 was not.
    ServerError: if status is 500 or above.
    FatalError: for any other unexpected status.
  """
  if status in expected:
    return

  details = (expected, status, path, headers, resp_headers, body, extras)
  msg = ('Expect status %r from Google Storage. But got status %d.\n'
         'Path: %r.\n'
         'Request headers: %r.\n'
         'Response headers: %r.\n'
         'Body: %r.\n'
         'Extra info: %r.\n' % details)

  # Exact-match statuses, checked in order, each with its own error type.
  dedicated_errors = (
      (httplib.UNAUTHORIZED, AuthorizationError),
      (httplib.FORBIDDEN, ForbiddenError),
      (httplib.NOT_FOUND, NotFoundError),
      (httplib.REQUEST_TIMEOUT, TimeoutError),
      (httplib.REQUESTED_RANGE_NOT_SATISFIABLE, InvalidRange),
  )
  for code, error_type in dedicated_errors:
    if status == code:
      raise error_type(msg)

  # Got OK while the caller expected 308 and did not expect OK.
  if (status == httplib.OK and 308 in expected and
      httplib.OK not in expected):
    raise FileClosedError(msg)

  if status >= 500:
    raise ServerError(msg)
  raise FatalError(msg)
项目:docker-zenoss4    作者:krull    | 项目源码 | 文件源码
def _authenticate_with_kerberos(conn_info, url, agent, gss_client=None):
    """Perform the initial kerberos authentication request.

    This is a generator: it yields the results of
    ``gss_client.get_base64_client_data()``, ``agent.request(...)`` and,
    on the error path, the response body, and delivers its result through
    ``defer.returnValue``.  NOTE(review): presumably wrapped by
    ``defer.inlineCallbacks`` where it is defined -- the decorator is not
    visible here; confirm.

    :param conn_info: connection info; ``scheme``, ``hostname`` and
        ``username`` are read here, and the object is handed to
        ``AuthGSSClient`` when a client has to be created
    :param url: URL the empty POST request is sent to
    :param agent: object whose ``request(method, url, headers, body)`` is
        used to send the request
    :param gss_client: existing GSS client to reuse; a new ``AuthGSSClient``
        is created when None
    :returns: (via ``defer.returnValue``) the GSS client
    :raises UnauthorizedError: on a 401 response
    :raises ForbiddenError: on a 403 response
    :raises Exception: on a kerberos GSS error during the 401 handling, on
        any other non-200 response, or when no auth details could be
        extracted from the ``WWW-Authenticate`` header
    """
    service = '{0}@{1}'.format(conn_info.scheme.upper(), conn_info.hostname)
    if gss_client is None:
        gss_client = AuthGSSClient(
            service,
            conn_info)

    base64_client_data = yield gss_client.get_base64_client_data()
    auth = 'Kerberos {0}'.format(base64_client_data)
    k_headers = Headers(_CONTENT_TYPE)
    k_headers.addRawHeader('Authorization', auth)
    k_headers.addRawHeader('Content-Length', '0')
    response = yield agent.request('POST', url, k_headers, None)

    # The response may carry no WWW-Authenticate header at all (for example
    # on a 403).  Indexing the lookup result directly would then fail with a
    # TypeError and hide the status-specific errors raised below.  This
    # assumes getRawHeaders() follows the Twisted Headers API and returns
    # None for a missing header.
    raw_auth_headers = response.headers.getRawHeaders('WWW-Authenticate')
    auth_header = raw_auth_headers[0] if raw_auth_headers else None
    if auth_header is not None:
        auth_details = get_auth_details(auth_header)
    else:
        auth_details = None

    if response.code == httplib.UNAUTHORIZED:
        try:
            if auth_details:
                # Stepping the GSS context is done only so that a kerberos
                # failure can be reported with its own error code below.
                gss_client._step(auth_details)
        except kerberos.GSSError as e:
            msg = "HTTP Unauthorized received on kerberos initialization.  "\
                "Kerberos error code {0}: {1}.".format(e.args[1][1], e.args[1][0])
            raise Exception(msg)
        raise UnauthorizedError(
            "HTTP Unauthorized received on initial kerberos request.  Check username and password")
    elif response.code == httplib.FORBIDDEN:
        raise ForbiddenError(
            "Forbidden. Check WinRM port and version.")
    elif response.code != httplib.OK:
        # Read and decrypt the body so it can be included in the error text.
        proto = _StringProtocol()
        response.deliverBody(proto)
        xml_str = yield proto.d
        xml_str = gss_client.decrypt_body(xml_str)
        raise Exception(
            "status code {0} received on initial kerberos request {1}"
            .format(response.code, xml_str))
    if not auth_details:
        raise Exception(
            'negotiate not found in WWW-Authenticate header: {0}'
            .format(auth_header))
    k_username = gss_client.get_username(auth_details)
    log.debug('kerberos auth successful for user: {0} / {1} '
              .format(conn_info.username, k_username))
    defer.returnValue(gss_client)