Python itsdangerous 模块,SignatureExpired() 实例源码

我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用itsdangerous.SignatureExpired()

项目:JmilkFan-s-Blog    作者:JmilkFan    | 项目源码 | 文件源码
def verify_auth_token(token):
        """Validate the token whether is night."""

        serializer = Serializer(
            current_app.config['SECRET_KEY'])
        try:
            # serializer object already has tokens in itself and wait for
            # compare with token from HTTP Request /api/posts Method `POST`.
            data = serializer.loads(token)
        except SignatureExpired:
            return None
        except BadSignature:
            return None

        user = User.query.filter_by(id=data['id']).first()
        return user
项目:fame    作者:certsocietegenerale    | 项目源码 | 文件源码
def password_reset(token):
    try:
        user_id = validate_password_reset_token(token)
    except BadTimeSignature:
        flash('Invalid token', 'danger')
        return redirect('/login')
    except SignatureExpired:
        flash('Expired token', 'danger')
        return redirect('/login')

    if request.method == 'POST':
        password = request.form.get('password', '')
        confirm = request.form.get('password_confirmation', '')

        if valid_new_password(password, confirm):
            user = User(get_or_404(User.get_collection(), _id=user_id))
            change_password(user, password)
            flash('Password was successfully changed.', 'success')
            return redirect('/login')

    return render_template('password_reset.html')
项目:maps4all-signup    作者:hack4impact    | 项目源码 | 文件源码
def change_email(self, token):
        """Verify the new email for this user."""
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except (BadSignature, SignatureExpired):
            return False
        if data.get('change_email') != self.id:
            return False
        new_email = data.get('new_email')
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        db.session.add(self)
        db.session.commit()
        return True
项目:pypers    作者:frankosan    | 项目源码 | 文件源码
def verify_token(username, token):
    """
    Verify validity of token
    """
    s = TimedJWSSerializer(app.config['SECRET_KEY'])

    try:
        ut.pretty_print("Trying to load the token")
        data = s.loads(token)
    except SignatureExpired:
        ut.pretty_print("ERROR: Expired Token")
        return False
    except BadSignature:
        ut.pretty_print("ERROR: Invalid Token")
        return False
    else:
        ut.pretty_print("Token successfully loaded")
        stored = db.sessions.find_one(filter={'username': data['username']}, sort=[('_id',-1)])

        if not stored:
            return False
        result = json_util.loads(json_util.dumps(stored))

        return pwd_context.verify(data['password'], result['password_hash']) and data['username'] == username
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def is_valid_token(token, token_expiry):
        """
        Validates if the supplied token is valid, and hasn't expired.
        :param token: Token to check
        :param token_expiry: When the token expires in seconds
        :return: True if token is valid, and user_id contained in token
        """
        entropy = current_app.secret_key if current_app.secret_key else 'un1testingmode'
        serializer = URLSafeTimedSerializer(entropy)

        try:
            tokenised_user_id = serializer.loads(token, max_age=token_expiry)
        except SignatureExpired:
            current_app.logger.debug('Token has expired')
            return False, None
        except BadSignature:
            current_app.logger.debug('Bad Token Signature')
            return False, None

        return True, tokenised_user_id
项目:Konsole    作者:ColinHaley    | 项目源码 | 文件源码
def change_email(self, token):
        """Verify the new email for this user."""
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except (BadSignature, SignatureExpired):
            return False
        if data.get('change_email') != self.id:
            return False
        new_email = data.get('new_email')
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        db.session.add(self)
        db.session.commit()
        return True
项目:restfulpy    作者:Carrene    | 项目源码 | 文件源码
def try_refresh_token(self, session_id):
        morsel = context.cookies.get(self.refresh_token_key)
        if not morsel or morsel.value is None or not morsel.value.strip():
            self.bad()
            return

        refresh_token_encoded = morsel.value
        # Decoding the refresh token
        try:
            refresh_principal = JwtRefreshToken.load(refresh_token_encoded)
            self.ok(
                self.create_principal(member_id=refresh_principal.id, session_id=session_id),
                setup_header=True
            )
        except itsdangerous.SignatureExpired:
            self.bad()
        except itsdangerous.BadData:
            self.bad()
            raise HttpBadRequest()
项目:restfulpy    作者:Carrene    | 项目源码 | 文件源码
def authenticate_request(self):
        if self.token_key not in context.environ:
            self.bad()
            return

        encoded_token = context.environ[self.token_key]
        if encoded_token is None or not encoded_token.strip():
            self.bad()
            return

        try:
            self.ok(self.verify_token(encoded_token))

        except itsdangerous.SignatureExpired as ex:
            # The token has expired. So we're trying to restore it using refresh-token.
            session_id = ex.payload.get('sessionId')
            if session_id:
                self.try_refresh_token(session_id)
            else:
                self.bad()
                raise HttpUnauthorized()
        except itsdangerous.BadData:
            # The token is Malformed
            self.bad()
            raise HttpBadRequest()
项目:Mocha    作者:mardix    | 项目源码 | 文件源码
def unsign_url_safe(token, secret_key, salt=None, **kw):
    """
    To sign url safe data.
    If expires_in is provided it will Time the signature
    :param token:
    :param secret_key:
    :param salt: (string) a namespace key
    :param kw:
    :return:
    """
    if len(token.split(".")) == 3:
        s = URLSafeTimedSerializer2(secret_key=secret_key, salt=salt, **kw)
        value, timestamp = s.loads(token, max_age=None, return_timestamp=True)
        now = datetime.datetime.utcnow()
        if timestamp > now:
            return value
        else:
            raise itsdangerous.SignatureExpired(
                'Signature age %s < %s ' % (timestamp, now),
                payload=value,
                date_signed=timestamp)
    else:
        s = itsdangerous.URLSafeSerializer(secret_key=secret_key, salt=salt, **kw)
        return s.loads(token)
项目:docklet    作者:unias    | 项目源码 | 文件源码
def verify_auth_token(token):
        s = Serializer(app.config['SECRET_KEY'])
        try:
            data = s.loads(b64decode(token))
        except SignatureExpired:
            return None # valid token, but expired
        except BadSignature:
            return None # invalid token
        user = User.query.get(data['id'])
        return user
项目:flask-vue-example    作者:levi-lq    | 项目源码 | 文件源码
def verify_auth_token(cls, token):
        s = TimedJSONWebSignatureSerializer(current_app.config.get("SECRET_KEY", "No secret key"))
        try:
            data = s.loads(token)
        except SignatureExpired:
            raise TokenExpired(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token?????????"}))
        except BadSignature:
            raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"}))

        try:
            user = User.get_object(id=data["user_id"])
        except ObjectNotExists:
            raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"}))
        return user
项目:pyt    作者:python-security    | 项目源码 | 文件源码
def _verify_token(self, token):
        s = Serializer(current_app.config['SECRET_KEY'])
        data = None
        expired, invalid = False, False
        try:
            data = s.loads(token)
        except SignatureExpired:
            expired = True
        except Exception:
            invalid = True
        return expired, invalid, data
项目:external_items    作者:natfoster82    | 项目源码 | 文件源码
def lab(token):
    try:
        lab_id, instance_id, exam_id, response_id = external_serializer.loads(token, max_age=60)
    except (BadSignature, SignatureExpired):
        abort(403)

    url_base = '{0}/labapiConnection/ShowLab?labInstanceGuid={1}&fullScreen=False'
    okay_states = {'STARTING', 'ACTIVE'}
    if instance_id:
        lab_instance_url = '{0}/labapi/v1/instance?id={1}'.format(current_app.config['XTREME_URL'],
                                                                  instance_id)
        resp = requests.get(lab_instance_url, auth=HTTPBasicAuth(username=current_app.config['XTREME_ID'],
                                                                 password=current_app.config['XTREME_SECRET']))
        if resp.status_code != 200 or resp.json()['state'] not in okay_states:
            abort(400)
        url = resp.json()['connectionUrl'] or url_base.format(current_app.config['XTREME_URL'], instance_id)
    else:
        payload = {
            'labID': lab_id
        }
        lab_url = '{0}/labapi/v1/Create'.format(current_app.config['XTREME_URL'])
        resp = requests.put(lab_url, json=payload, auth=HTTPBasicAuth(username=current_app.config['XTREME_ID'],
                                                                      password=current_app.config['XTREME_SECRET']))
        if resp.status_code != 200:
            abort(400)
        instance_id = resp.json()['id']
        redis_store.setex(response_id, 3600, instance_id)
        url = url_base.format(current_app.config['XTREME_URL'], instance_id)
    return render_template('xtreme.html', url=url, response_id=response_id, instance_id=instance_id, exam_id=exam_id)
项目:captcha_project    作者:zhanghe06    | 项目源码 | 文件源码
def check_token(self, token_sign):
        """
        ?? token, ?????? token
        """
        from itsdangerous import TimestampSigner, SignatureExpired, BadTimeSignature
        s = TimestampSigner(self._sign_key)
        try:
            token = s.unsign(token_sign, max_age=60)  # 60???
            return {'success': token}
        except SignatureExpired as e:
            # ??????
            return {'error': e.message}
        except BadTimeSignature as e:
            # ??????
            return {'error': e.message}
项目:flask-blog    作者:ClayAndMore    | 项目源码 | 文件源码
def verfy_auth_token(token):

        serializer=Serializer(
            current_app.config['SECRET_KEY']
        )
        try:
            data=serializer.loads(token)
        except SignatureExpired:
            return None
        except BadSignature:
            return None

        user=User.query.filter_by(id=data['id']).first()
        return user
项目:devpi    作者:devpi    | 项目源码 | 文件源码
def _get_auth_status(self, authuser, authpassword):
        try:
            val = self.serializer.loads(authpassword, max_age=self.LOGIN_EXPIRATION)
        except itsdangerous.SignatureExpired:
            return dict(status="expired")
        except itsdangerous.BadData:
            # check if we got user/password direct authentication
            return self._validate(authuser, authpassword)
        else:
            if not isinstance(val, list) or len(val) != 2 or val[0] != authuser:
                threadlog.debug("mismatch credential for user %r", authuser)
                return dict(status="nouser")
            return dict(status="ok", groups=val[1])
项目:fabric8-analytics-server    作者:fabric8-analytics    | 项目源码 | 文件源码
def get_by_token(cls, token):
        s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
        # may raise BadSignature or SignatureExpired
        data = s.loads(token)
        user = current_app.user_datastore.find_user(id=data['id'])
        if not user:
            raise HTTPError(401, 'Unknow user with id {}'.format(data['id']))
        if user.token == token:
            if datetime.datetime.utcnow() < user.token_expires:
                return user
            raise SignatureExpired('bad token')
        raise BadSignature('bad token')
项目:maps4all-signup    作者:hack4impact    | 项目源码 | 文件源码
def confirm_account(self, token):
        """Verify that the provided token is for this user's id."""
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except (BadSignature, SignatureExpired):
            return False
        if data.get('confirm') != self.id:
            return False
        self.confirmed = True
        db.session.add(self)
        db.session.commit()
        return True
项目:maps4all-signup    作者:hack4impact    | 项目源码 | 文件源码
def reset_password(self, token, new_password):
        """Verify the new password for this user."""
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except (BadSignature, SignatureExpired):
            return False
        if data.get('reset') != self.id:
            return False
        self.password = new_password
        db.session.add(self)
        db.session.commit()
        return True
项目:bug_report_distributing_server    作者:XY-e    | 项目源码 | 文件源码
def verify_auth_token(token):
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None  # valid token, but expired
        except BadSignature:
            return None  # invalid token
        user = User.query.get(data['id'])
        return user
项目:Konsole    作者:ColinHaley    | 项目源码 | 文件源码
def confirm_account(self, token):
        """Verify that the provided token is for this user's id."""
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except (BadSignature, SignatureExpired):
            return False
        if data.get('confirm') != self.id:
            return False
        self.confirmed = True
        db.session.add(self)
        db.session.commit()
        return True
项目:Konsole    作者:ColinHaley    | 项目源码 | 文件源码
def reset_password(self, token, new_password):
        """Verify the new password for this user."""
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except (BadSignature, SignatureExpired):
            return False
        if data.get('reset') != self.id:
            return False
        self.password = new_password
        db.session.add(self)
        db.session.commit()
        return True
项目:python-flask-security    作者:weinbergdavid    | 项目源码 | 文件源码
def get_token_status(token, serializer, max_age=None, return_data=False):
    """Get the status of a token.

    :param token: The token to check
    :param serializer: The name of the seriailzer. Can be one of the
                       following: ``confirm``, ``login``, ``reset``
    :param max_age: The name of the max age config option. Can be on of
                    the following: ``CONFIRM_EMAIL``, ``LOGIN``, ``RESET_PASSWORD``
    """
    serializer = getattr(_security, serializer + '_serializer')
    max_age = get_max_age(max_age)
    user, data = None, None
    expired, invalid = False, False

    try:
        data = serializer.loads(token, max_age=max_age)
    except SignatureExpired:
        d, data = serializer.loads_unsafe(token)
        expired = True
    except (BadSignature, TypeError, ValueError):
        invalid = True

    if data:
        user = _datastore.find_user(id=data[0])

    expired = expired and (user is not None)

    if return_data:
        return expired, invalid, user, data
    else:
        return expired, invalid, user
项目:restfulpy    作者:Carrene    | 项目源码 | 文件源码
def verify_token(self, encoded_token):
        principal = super().verify_token(encoded_token)
        if not self.validate_session(principal.session_id):
            raise itsdangerous.SignatureExpired('The token has already invalidated', principal.payload)
        return principal
项目:restfulpy    作者:Carrene    | 项目源码 | 文件源码
def verify_token(self, encoded_token):
        principal = super().verify_token(encoded_token)
        if token_expired:
            raise itsdangerous.SignatureExpired('Simulating', payload=principal.payload)
        return principal
项目:rest_api    作者:opentargets    | 项目源码 | 文件源码
def get_payload_from_token(token):
        s = Serializer(current_app.config['SECRET_KEY'])
        cipher = AESCipher(current_app.config['SECRET_KEY'][:16])
        try:
            data = json.loads(cipher.decrypt(s.loads(token)))
            return data
        except SignatureExpired, se:
            time_offset = (datetime.now()- se.date_signed).total_seconds()
            current_app.logger.error('token expired: %s. signature date %s. offset with current date = %s'%(se.message,str(se.date_signed),str(time_offset)))
            current_app.logger.error('current date %s, token date %s'%(str(datetime.now()), str(se.date_signed)))
            if -1<= time_offset < 0:#allow for 1 seconds out of sync machines
                current_app.logger.info('token time offset within grace period. allowing auth')
                return json.loads(cipher.decrypt(se.payload))
            else:
                LogApiTokenExpired()
                # raise SignatureExpired(se)
                raise TokenExpired()
                # abort(419, message = 'Authentication expired.')
        except BadSignature, e:
            current_app.logger.error('bad signature in token')
            encoded_payload = e.payload
            if encoded_payload is not None:
                try:
                    decoded_payload = s.load_payload(encoded_payload)
                    payload= json.loads(cipher.decrypt(decoded_payload))
                    LogApiTokenInvalid(payload)
                except BadData:
                    LogApiTokenInvalid(dict(error='bad data in token',
                                            token=token))
            abort(401, message = 'bad signature in token')
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparision.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )
    time_limit = _get_config(
        time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        token = s.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    if not safe_str_cmp(session[field_name], token):
        raise ValidationError('The CSRF tokens do not match.')
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparision.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )
    time_limit = _get_config(
        time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        token = s.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    if not safe_str_cmp(session[field_name], token):
        raise ValidationError('The CSRF tokens do not match.')
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def load_token(token):
    """
    Flask-Login token_loader callback.
    The token_loader function asks this function to take the token that was
    stored on the users computer process it to check if its valid and then
    return a User Object if its valid or None if its not valid.

    :param token: Token generated by :meth:`app.models.User.get_auth_token`
    """

    # The Token itself was generated by User.get_auth_token.  So it is up to
    # us to known the format of the token data itself.

    # The Token was encrypted using itsdangerous.URLSafeTimedSerializer which
    # allows us to have a max_age on the token itself.  When the cookie is
    # stored
    # on the users computer it also has a exipry date, but could be changed by
    # the user, so this feature allows us to enforce the exipry date of the
    # token
    # server side and not rely on the users cookie to exipre.

    max_age = current_app.config['REMEMBER_COOKIE_DURATION'].total_seconds()

    # Decrypt the Security Token, data = [username, hashpass, id]
    s = URLSafeTimedSerializer(
        current_app.config['SECRET_KEY'],
        salt='user-auth',
        signer_kwargs=dict(key_derivation='hmac',
                           digest_method=hashlib.sha256))
    try:
        data = s.loads(token, max_age=max_age)
    except (BadTimeSignature, SignatureExpired):
        return None

    # Find the User
    user = User.query.get(data[2])

    # 2FA check
    totp_endpoint = request.endpoint == 'auth.verify_totp'
    if user and user.otp_enabled and not totp_endpoint and len(data) < 4:
        return None

    # Check Password and return user or None
    if user and data[1] == user._password:
        return user
    return None
项目:FileStoreGAE    作者:liantian-cn    | 项目源码 | 文件源码
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparision.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )
    time_limit = _get_config(
        time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        token = s.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    if not safe_str_cmp(session[field_name], token):
        raise ValidationError('The CSRF tokens do not match.')
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparision.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )
    time_limit = _get_config(
        time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        token = s.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    if not safe_str_cmp(session[field_name], token):
        raise ValidationError('The CSRF tokens do not match.')
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparision.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )
    time_limit = _get_config(
        time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        token = s.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    if not safe_str_cmp(session[field_name], token):
        raise ValidationError('The CSRF tokens do not match.')
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparision.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )
    time_limit = _get_config(
        time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        token = s.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    if not safe_str_cmp(session[field_name], token):
        raise ValidationError('The CSRF tokens do not match.')
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparision.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )
    time_limit = _get_config(
        time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        token = s.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    if not safe_str_cmp(session[field_name], token):
        raise ValidationError('The CSRF tokens do not match.')
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def validate_csrf(data, secret_key=None, time_limit=None, token_key=None):
    """Check if the given data is a valid CSRF token. This compares the given
    signed token to the one stored in the session.

    :param data: The signed CSRF token to be checked.
    :param secret_key: Used to securely sign the token. Default is
        ``WTF_CSRF_SECRET_KEY`` or ``SECRET_KEY``.
    :param time_limit: Number of seconds that the token is valid. Default is
        ``WTF_CSRF_TIME_LIMIT`` or 3600 seconds (60 minutes).
    :param token_key: Key where token is stored in session for comparision.
        Default is ``WTF_CSRF_FIELD_NAME`` or ``'csrf_token'``.

    :raises ValidationError: Contains the reason that validation failed.

    .. versionchanged:: 0.14
        Raises ``ValidationError`` with a specific error message rather than
        returning ``True`` or ``False``.
    """

    secret_key = _get_config(
        secret_key, 'WTF_CSRF_SECRET_KEY', current_app.secret_key,
        message='A secret key is required to use CSRF.'
    )
    field_name = _get_config(
        token_key, 'WTF_CSRF_FIELD_NAME', 'csrf_token',
        message='A field name is required to use CSRF.'
    )
    time_limit = _get_config(
        time_limit, 'WTF_CSRF_TIME_LIMIT', 3600, required=False
    )

    if not data:
        raise ValidationError('The CSRF token is missing.')

    if field_name not in session:
        raise ValidationError('The CSRF session token is missing.')

    s = URLSafeTimedSerializer(secret_key, salt='wtf-csrf-token')

    try:
        token = s.loads(data, max_age=time_limit)
    except SignatureExpired:
        raise ValidationError('The CSRF token has expired.')
    except BadData:
        raise ValidationError('The CSRF token is invalid.')

    if not safe_str_cmp(session[field_name], token):
        raise ValidationError('The CSRF tokens do not match.')