Python jwt 模块,ExpiredSignatureError() 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用jwt.ExpiredSignatureError()

项目:flask-jwt-auth    作者:realpython    | 项目源码 | 文件源码
def decode_auth_token(auth_token):
        """
        Validates the auth token
        :param auth_token:
        :return: integer|string
        """
        try:
            payload = jwt.decode(auth_token, app.config.get('SECRET_KEY'))
            is_blacklisted_token = BlacklistToken.check_blacklist(auth_token)
            if is_blacklisted_token:
                return 'Token blacklisted. Please log in again.'
            else:
                return payload['sub']
        except jwt.ExpiredSignatureError:
            return 'Signature expired. Please log in again.'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please log in again.'
项目:bucket_api    作者:jokamjohn    | 项目源码 | 文件源码
def decode_auth_token(token):
        """
        Decoding the token to get the payload and then return the user Id in 'sub'
        :param token: Auth Token
        :return:
        """
        try:
            payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms='HS256')
            is_token_blacklisted = BlackListToken.check_blacklist(token)
            if is_token_blacklisted:
                return 'Token was Blacklisted, Please login In'
            return payload['sub']
        except jwt.ExpiredSignatureError:
            return 'Signature expired, Please sign in again'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please sign in again'
项目:baselayer    作者:cesium-ml    | 项目源码 | 文件源码
def authenticate(self, auth_token):
        try:
            token_payload = jwt.decode(auth_token, secret)
            username = token_payload['username']

            self.username = username
            self.authenticated = True
            self.auth_failures = 0
            self.send_json(action='AUTH OK')

            # If we are the first websocket connecting on behalf of
            # a given user, subscribe to the feed for that user
            if len(WebSocket.sockets[username]) == 0:
                WebSocket.subscribe(username)

            WebSocket.sockets[username].add(self)

        except jwt.DecodeError:
            self.send_json(action='AUTH FAILED')
        except jwt.ExpiredSignatureError:
            self.send_json(action='AUTH FAILED')
项目:mangaNotif    作者:Symphoria    | 项目源码 | 文件源码
def is_authenticated(req):
    auth_token = req.headers.get('Authentication-Token')

    if auth_token:
        try:
            auth_token_payload = jwt.decode(auth_token, os.environ["JWT_SECRET"])
            user_id = int(auth_token_payload['user_id'])
            user = User.query.filter_by(id=user_id, is_active=True).first()

            if user:
                return user
            else:
                return False
        except jwt.ExpiredSignatureError:
            return False
    else:
        return False
项目:opserv-backend    作者:OpServ-Monitoring    | 项目源码 | 文件源码
def get_current_user(self):
        """
        Overrides the built-in function to get the user id based on a JWT-token.
        If there is no token included in the Authorization header None is returned.
        In case the token is not valid an corresponding exception is raised.

        :return: The user id as a string or None if the uid couldn't be extracted
        :raises jwt.InvalidIssuedAtError: Raised if the IAT-claim is less than the last time the user password changed
        :raises jwt.ExpiredSignatureError: Raised if the EXT-claim is less than the current UNIX time
        :raises jwt.InvalidTokenError: Raised if the token is invalid for reasons other than the above mentioned
        """
        auth_header = self.request.headers.get('Authorization')

        if auth_header is not None and auth_header.startswith("Bearer "):
            encoded_jwt_token = auth_header[7:]

            payload = TokenGenerator.decode_token(encoded_jwt_token)

            # TODO Should we check if payload["iat"] < last password change?
            if False:
                raise jwt.InvalidIssuedAtError

            return payload["uid"]
        return None
项目:opserv-backend    作者:OpServ-Monitoring    | 项目源码 | 文件源码
def prepare(self):
        """
        Ensures that the caller is a validated user
        """
        super().prepare()

        try:
            if self.current_user is None:
                self.send_error(401)  # TODO Add details

        except jwt.InvalidIssuedAtError:
            log.error("The IAT-claim of the passed token is less than the last time the user password changed")
            self.send_error(401, summary="invalidated token")

        except jwt.ExpiredSignatureError:
            log.error("The EXT-claim of the passed token is less than the current UNIX time")
            self.send_error(401, summary="expired token")

        except jwt.InvalidTokenError:
            log.error("The passed token could not be validated")
            self.send_error(401, summary="invalid token")
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def _user_from_refresh_token(self, jwtstr: str, key_pemstr: str, expected_issuer: Optional[str]=None,
                                 expected_audience: Optional[str]=None) -> Optional[MNUser]:
        _log.debug("Received refresh token: %s", jwtstr)
        try:
            token = jwt.decode(jwtstr, key_pemstr, algorithms=["RS256"], leeway=10,
                               issuer=expected_issuer, audience=expected_audience)
        except (jwt.ExpiredSignatureError, jwt.InvalidAlgorithmError,
                jwt.InvalidIssuerError, jwt.InvalidTokenError) as e:
            _log.warning("Rejected refresh token because of %s", str(e))
            return None

        if "sub" not in token:
            _log.error("BUG? Valid refresh token without user in subject. %s", jwtstr)
            return None

        try:
            user = MNUser.objects.get(pk=token["sub"])  # type: MNUser
        except MNUser.DoesNotExist:
            _log.warning("No such user from valid JWT. %s", jwtstr)
            return None
        return user
项目:IntegraTI-API    作者:discentes-imd    | 项目源码 | 文件源码
def verify_token():
    """
    Verify if the token is valid, not expired and not blacklisted
    """
    if 'Authorization' in request.headers:
        if request.headers['Authorization'] in cache.blacklisted_tokens:
            abort(403, 'Error: invalid token')
        try:
            payload = jwt.decode(request.headers['Authorization'], config.SECRET_KEY)
            g.current_user = payload['id_user']
        except jwt.ExpiredSignatureError:
            abort(403, 'Error: token expired')
        except jwt.DecodeError:
            abort(403, 'Error: invalid token')
项目:flask-microservices-users    作者:realpython    | 项目源码 | 文件源码
def decode_auth_token(auth_token):
        """Decodes the auth token - :param auth_token: - :return: integer|string"""
        try:
            payload = jwt.decode(
                auth_token, current_app.config.get('SECRET_KEY'))
            return payload['sub']
        except jwt.ExpiredSignatureError:
            return 'Signature expired. Please log in again.'
        except jwt.InvalidTokenError:
            return 'Invalid token. Please log in again.'
项目:Casper-HC    作者:jamfit    | 项目源码 | 文件源码
def verify_jwt(signed_token):
    unvalidated_token = jwt.decode(signed_token, verify=False)
    hipchat_room = HipChatRoom.query.filter(HipChatRoom.hipchat_oauth_id == unvalidated_token['iss']).first_or_404()
    try:
        jwt.decode(signed_token, hipchat_room.hipchat_oauth_secret)
    except jwt.exceptions.DecodeError:
        flask.flash('Unable to decode the Java Web Token provided', 'error')
        flask.abort(401)
    except jwt.ExpiredSignatureError:
        flask.flash('The provided Java Web Token is expired: try refreshing the page', 'error')
        flask.abort(401)
    else:
        return hipchat_room
项目:xiaodi    作者:shenaishiren    | 项目源码 | 文件源码
def verify_token(cls, user, token):
        try:
            data = jwt.decode(token, str(user.id) + SECRET_CODE)
        except (jwt.ExpiredSignatureError, jwt.DecodeError, AttributeError):
            return False
        else:
            if data["username"] == user.username:
                return True
            else:
                return False
项目:auth-srv    作者:openpermissions    | 项目源码 | 文件源码
def decode_token(token):
    """
    Get the organisation ID from a token

    :param token: a JSON Web Token
    :returns: str, organisation ID
    :raises:
        jwt.DecodeError: Invalid token or not signed with our key
        jwt.ExpiredSignatureError: Token has expired
        jwt.InvalidAudienceError: Invalid "aud" claim
        jwt.InvalidIssuerError: Invalid "iss" claim
        jwt.MissingRequiredClaimError: Missing a required claim
    """
    cert_file = getattr(options, 'ssl_cert', None) or LOCALHOST_CRT
    with open(cert_file) as f:
        cert = load_pem_x509_certificate(f.read(), default_backend())
        public_key = cert.public_key()

    payload = jwt.decode(token,
                         public_key,
                         audience=audience(),
                         issuer=issuer(),
                         algorithms=[ALGORITHM],
                         verify=True)

    if not payload.get('sub'):
        raise jwt.MissingRequiredClaimError('"sub" claim is required')

    payload['scope'] = Scope(payload['scope'])

    return payload
项目:auth-srv    作者:openpermissions    | 项目源码 | 文件源码
def test_decode_expired_token():
    in_the_past = NOW - timedelta(minutes=EXPIRY * 2 + 1)
    with patch.object(_token.datetime, 'utcnow', return_value=in_the_past):
        token, expiry = generate_token(CLIENT, SCOPE, 'grant_type')
    with pytest.raises(jwt.ExpiredSignatureError):
        decode_token(token)
项目:varapp-backend-py    作者:varapp    | 项目源码 | 文件源码
def verify_jwt(auth_header, secret):
    """Extract the jwt token from the header, verify its signature,
       its expiration time, and return the payload."""
    if not auth_header or auth_header == 'null':
        logging.warning("No Authorization header")
        return [None, "Unauthorized access: missing authentication"]
    method,token = auth_header.split()   # separate 'JWT' from the jwt itself
    token = bytes(token, 'utf-8')
    try:
        payload = jwt.decode(token, secret, algorithms=['HS256'], verify=True)
    except (jwt.ExpiredSignatureError, jwt.DecodeError) as err:
        return [None, str(err)]
    return [payload, '']
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def test_expired_token(app):
    with app.test_request_context():
        delta = timedelta(minutes=-5)
        access_token = create_access_token('username', expires_delta=delta)
        refresh_token = create_refresh_token('username', expires_delta=delta)
        with pytest.raises(ExpiredSignatureError):
            decode_token(access_token)
        with pytest.raises(ExpiredSignatureError):
            decode_token(refresh_token)
项目:barmanapi    作者:emin100    | 项目源码 | 文件源码
def verify_token(self, token):
        try:
            decode = jwt.decode(token, self.token_config.get('secret'), algorithm=self.token_config.get('algorithm'))
        except jwt.ExpiredSignatureError:
            raise AuthException('Time is up', 401, AuthException.TOKEN_TIME_UP)
        except:
            raise AuthException('Token is not valid', 401, AuthException.TOKEN_ERROR)

        return decode
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def check_token(key, access_token):
    import jwt
    """
    Simple request to check if session has expired (TBD)
    :return: Token status
    """
    try:
        contents = jwt.decode(access_token, key, True, algorithms='RS256', audience='adapter')
        # options={'verify_aud': False})
        print('contents', contents)
        return True
    except jwt.ExpiredSignatureError:
        print('Signature has expired')
        return False
项目:mangaNotif    作者:Symphoria    | 项目源码 | 文件源码
def check_token():
    data = request.get_json()
    auth_token = data['token']

    try:
        auth_token_payload = jwt.decode(auth_token, os.environ["JWT_SECRET"])
        expiration_time = auth_token_payload['exp']
        is_valid = datetime.now() + timedelta(minutes=30) <= datetime.fromtimestamp(expiration_time)
    except jwt.ExpiredSignatureError:
        is_valid = False

    return make_response(jsonify({'isValid': is_valid})), 200
项目:django-rest-jwt    作者:aamishbaloch    | 项目源码 | 文件源码
def is_token_valid(token):
        """
        Check if token is valid.
        """
        try:
            jwt.decode(token, 'secret', algorithms=JWTHelper.JWT_ALGORITHM)
            return True, "Valid"
        except jwt.ExpiredSignatureError:
            return False, "Token Expired"
        except jwt.InvalidTokenError:
            return False, "Token is Invalid"
项目:stethoscope    作者:Netflix    | 项目源码 | 文件源码
def decode_token(self, token):
    try:
      decoded = jwt.decode(token, self.config['JWT_SECRET_KEY'],
          algorithms=[self.config['JWT_ALGORITHM']], leeway=self.config.get('JWT_LEEWAY', 0))
    except jwt.ExpiredSignatureError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token is expired.")
    except jwt.DecodeError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token could not be decoded.")
    except jwt.InvalidTokenError:
      raise werkzeug.exceptions.Unauthorized("JWT Error: Token is invalid.")
    return decoded
项目:flask-rest-api    作者:gitgik    | 项目源码 | 文件源码
def decode_token(token):
        """Decode the access token from the Authorization header."""
        try:
            payload = jwt.decode(token, current_app.config.get('SECRET'))
            return payload['sub']
        except jwt.ExpiredSignatureError:
            return "Expired token. Please log in to get a new token"
        except jwt.InvalidTokenError:
            return "Invalid token. Please register or login"
项目:flask-jwt-extended    作者:vimalloc    | 项目源码 | 文件源码
def _set_error_handler_callbacks(self, app):
        """
        Sets the error handler callbacks used by this extension
        """
        @app.errorhandler(NoAuthorizationError)
        def handle_auth_error(e):
            return self._unauthorized_callback(str(e))

        @app.errorhandler(CSRFError)
        def handle_auth_error(e):
            return self._unauthorized_callback(str(e))

        @app.errorhandler(ExpiredSignatureError)
        def handle_expired_error(e):
            return self._expired_token_callback()

        @app.errorhandler(InvalidHeaderError)
        def handle_invalid_header_error(e):
            return self._invalid_token_callback(str(e))

        @app.errorhandler(InvalidTokenError)
        def handle_invalid_token_error(e):
            return self._invalid_token_callback(str(e))

        @app.errorhandler(JWTDecodeError)
        def handle_jwt_decode_error(e):
            return self._invalid_token_callback(str(e))

        @app.errorhandler(WrongTokenError)
        def handle_wrong_token_error(e):
            return self._invalid_token_callback(str(e))

        @app.errorhandler(RevokedTokenError)
        def handle_revoked_token_error(e):
            return self._revoked_token_callback()

        @app.errorhandler(FreshTokenRequired)
        def handle_fresh_token_required(e):
            return self._needs_fresh_token_callback()

        @app.errorhandler(UserLoadError)
        def handler_user_load_error(e):
            # The identity is already saved before this exception was raised,
            # otherwise a different exception would be raised, which is why we
            # can safely call get_jwt_identity() here
            identity = get_jwt_identity()
            return self._user_loader_error_callback(identity)

        @app.errorhandler(UserClaimsVerificationError)
        def handle_failed_user_claims_verification(e):
            return self._claims_verification_failed_callback()
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def check_token_status(self):
        """
        Simple request to check if session has expired (TBD)
        :return: Token status
        """

        if self.access_token is None:
            try:
                token_file = self.platform['credentials']['token_file']
                token_path = os.path.join(
                    self.workspace.workspace_root,
                    self.workspace.platforms_dir,
                    token_file)

                # Construct the POST login request
                with open(token_path, "r") as _file:
                    self.access_token = _file.read
            except:
                return True

        print('Public_key=', self.platform_public_key)

        if self.platform_public_key is None:
            return True

        # Some old PyJWT versions crash with public key binary string, instead add
        # self.platform_public_key.decode('utf-8')
        try:
            print('access_token=', self.access_token)
            print('platform_public_key=', self.platform_public_key.decode('utf-8'))
            decoded = jwt.decode(self.access_token, self.platform_public_key.decode('utf-8'),
                                 True, algorithms='RS256', audience='adapter')
            # options={'verify_aud': False})
            print('contents', decoded)
            try:
                self.username = decoded['preferred_username']
                return True
            except:
                return True
        except jwt.DecodeError:
            print('Token cannot be decoded because it failed validation')
            return False
        except jwt.ExpiredSignatureError:
            print('Signature has expired')
            return False
        except jwt.InvalidIssuerError:
            return False
        except jwt.InvalidIssuedAtError:
            return False
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def check_token_status():
    """
    Simple request to check if session has expired (TBD)
    :return: Token status
    """
    import jwt

    access_token = ''
    platform_public_key = b''
    print('Public_key=', platform_public_key)

    if platform_public_key is None:
        return True

    # Some old PyJWT versions crash with public key binary string, instead add
    # self.platform_public_key.decode('utf-8')
    try:
        print('access_token=', access_token)
        print('platform_public_key=', platform_public_key.decode('utf-8'))
        decoded = jwt.decode(access_token, platform_public_key.decode('utf-8'),
                             True, algorithms='RS256', audience='adapter')
        # options={'verify_aud': False})
        print('contents', decoded)
        try:
            username = decoded['preferred_username']
            return True
        except:
            return True
    except jwt.DecodeError:
        print('Token cannot be decoded because it failed validation')
        return False
    except jwt.ExpiredSignatureError:
        print('Signature has expired')
        return False
    except jwt.InvalidIssuerError:
        return False
    except jwt.InvalidIssuedAtError:
        return False


# generate_keypair()
# token = user_login('user04', '1234')
# print(token)
# key = get_platform_public_key()
# check_token(key, token)
# sign()

# result = check_token_status()
# print("RESULT=", result)