Python itsdangerous 模块,TimedJSONWebSignatureSerializer() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用itsdangerous.TimedJSONWebSignatureSerializer()

项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def verify_access_token(access_token):
        """
        Verify an access token and build the user it refers to.

        :param access_token: signed token string to verify
        :return: the ``User`` on success, otherwise ``False``
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(access_token)
        except Exception:
            # Any failure to load the token means it is unusable. Catching
            # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt and
            # SystemExit propagate.
            return False
        # Only tokens issued for the 'access' usage are accepted here.
        if data.get('token_usage') != 'access':
            return False
        user = User(user_id=data.get('token_uid'))
        if not user or not user.user_id or not user.is_active:
            return False
        return user
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def reset_password(self, token, new_pass):
        """Reset this user's password using a signed token.

        The token is generated by :meth:`~User.generate_reset_token`.

        :param token: signed token whose payload carries ``user_id``
        :param new_pass: new password to assign to this user
        :return: ``True`` when the token's ``user_id`` equals ``self.id`` and
            the change was committed, otherwise ``False``
        """
        s = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Token could not be loaded; treat it as invalid. ``Exception``
            # keeps KeyboardInterrupt/SystemExit from being swallowed.
            return False
        if data.get('user_id') != self.id:
            return False
        self.password = new_pass
        db.session.add(self)
        db.session.commit()
        return True
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def change_email(self, token):
        """Change this user's email address using a signed token.

        :param token: signed token carrying ``change_email`` (user id) and
            ``new_email``
        :return: ``True`` if the address was updated, ``False`` if the token
            cannot be loaded, belongs to another user, lacks ``new_email``,
            or the new address is already present in the table
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('change_email') != self.id:
            return False
        new_email = data.get('new_email')
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        # Only added to the session here; no commit is issued in this method.
        db.session.add(self)
        return True
项目:JmilkFan-s-Blog    作者:JmilkFan    | 项目源码 | 文件源码
def verify_auth_token(token):
        """Validate *token* and look up the user it identifies.

        Returns ``None`` when loading the token raises ``SignatureExpired``
        or ``BadSignature``; otherwise returns the result of querying
        ``User`` by the ``id`` stored in the token payload.
        """
        signer = Serializer(current_app.config['SECRET_KEY'])
        try:
            # Check the token received with the HTTP request against the
            # application's secret key.
            payload = signer.loads(token)
        except (SignatureExpired, BadSignature):
            return None
        return User.query.filter_by(id=payload['id']).first()
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def confirm(self, token):
        """
        Confirm this user's account using a signed token.

        :param token: signed confirmation token carrying ``confirm_uid``
        :return: ``True`` if the account was marked confirmed, ``False`` if
            the token cannot be loaded or belongs to a different user
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('confirm_uid') != self.user_id:
            return False
        self.confirmed = 1
        # Persist the flag on the stored user document as well.
        mongo.db.users.update_one({
            'user_id': self.user_id
        }, {
            '$set': {
                'confirmed': 1
            }
        })
        return True
项目:Plog    作者:thundernet8    | 项目源码 | 文件源码
def refresh_access_token(refresh_token, expiration=3600):
        """
        Issue a new access token from a refresh token.

        :param refresh_token: signed token whose ``token_usage`` is 'refresh'
        :param expiration: lifetime of the new access token, in seconds
        :return: dict with ``access_token``, ``refresh_token``,
            ``expires_in``, ``expires_at`` and ``token_type`` on success,
            otherwise ``False``
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(refresh_token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('token_usage') != 'refresh':
            return False
        token_uid = data.get('token_uid')
        if not token_uid:
            return False
        sa = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
        access_token = sa.dumps(
            {'token_uid': token_uid, 'token_usage': 'access'}).decode('ascii')
        return dict(access_token=access_token, refresh_token=refresh_token, expires_in=expiration,
                    expires_at=int(time.time())+expiration, token_type='Bearer')

    ##
    # ????
    ##
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def change_email(self, token):
        """Apply an email change described by a signed token.

        :param token: signed token carrying ``change_email`` (user id) and
            ``new_email``
        :return: ``True`` if the address (and its avatar hash) was updated,
            otherwise ``False``
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('change_email') != self.id:
            return False
        new_email = data.get('new_email')
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        # Keep the MD5 avatar hash in sync with the new address.
        self.avatar_hash = hashlib.md5(
            self.email.encode('utf-8')).hexdigest()
        db.session.add(self)
        return True
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def change_email(self, token):
        """Apply an email change described by a signed token.

        :param token: signed token carrying ``change_email`` (user id) and
            ``new_email``
        :return: ``True`` if the address (and its avatar hash) was updated,
            otherwise ``False``
        """
        # The original passed an undefined name ``expiration`` here, which
        # raised NameError on every call; no lifetime is needed to load.
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('change_email') != self.id:
            return False
        new_email = data.get('new_email')
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        # The codec name must be a string literal; the unquoted ``utf-8`` was
        # parsed as the expression ``utf - 8`` and raised NameError.
        self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest()
        db.session.add(self)
        return True
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def confirm_user_account(token):
        """Mark the user referenced by a signed token as confirmed.

        :param token: signed token whose ``confirm`` field holds the user id
        :return: ``True`` if a user was found and saved as confirmed,
            ``False`` if the token cannot be loaded or no user matches
        """
        serializer = Serializer(current_app.config['SECRET_KEY'])

        try:
            data = serializer.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False

        user = user_repository.get_by_id(data.get('confirm'))
        if user is None:
            return False

        user.confirmed = True
        user_repository.save(user)
        return True
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def confirm(token):
    """Confirm an account from a signed token, log the user in and redirect.

    On any failure (token cannot be loaded, no matching user, or the user's
    own ``confirm`` check fails) a "danger" flash message is queued and the
    visitor is redirected to ``auth.unconfirmed``. On success the user is
    logged in and redirected to ``main.index``.

    :param token: signed confirmation token carrying ``confirm`` (user id)
    """

    def _reject():
        # Single place for the failure response used by every error path.
        flash("The confirmation link is invalid or has expired.", "danger")
        return redirect(url_for("auth.unconfirmed"))

    s = Serializer(current_app.config["SECRET_KEY"])
    try:
        data = s.loads(token)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        return _reject()

    u = User.query.get(data.get("confirm"))
    # Short-circuit keeps ``u.confirm`` from being called on a missing user.
    if u is None or not u.confirm(token):
        return _reject()

    # Confirmation complete: log the user in and tell them it worked.
    login_user(u)
    flash("You have confirmed your account!", "success")

    return redirect(url_for("main.index"))
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def activate_account(self, token, name, password, username):
        """Activate this account using a signed activation token.

        :param token: signed token whose ``activation`` field holds a user id
        :param name: name to store on the user
        :param password: password to store on the user
        :param username: username to store on the user
        :return: ``True`` if the account was activated, ``False`` if the
            token cannot be loaded or belongs to a different user
        """
        s = Serializer(current_app.config["SECRET_KEY"])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get("activation") != self.id:
            return False
        self.password = password
        self.name = name
        self.username = username
        self.confirmed = True
        self.active = True
        # Only added to the session here; no commit is issued in this method.
        db.session.add(self)
        current_app.logger.info("User account activated: user id %s (%s)" %
                                (self.id, self.email))
        self.track_event("activated_account")
        return True
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def confirm(self, token):
        """Confirm and activate this account using a signed token.

        :param token: signed token carrying ``confirm`` (user id) and an
            optional ``trial`` flag
        :return: ``True`` if the account was confirmed and committed,
            ``False`` if the token cannot be loaded or belongs to a
            different user
        """
        s = Serializer(current_app.config["SECRET_KEY"])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get("confirm") != self.id:
            return False

        self.confirmed = True
        self.active = True
        db.session.add(self)
        db.session.commit()
        current_app.logger.info("User account confirmed: user id %s (%s)" %
                                (self.id, self.email))
        self.track_event("confirmed_account")
        # Only an explicit boolean True in the payload counts as a trial start.
        if data.get("trial") is True:
            self.track_event("started_free_trial")
        return True
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def change_email(self, token):
        """Change this user's email address using a signed token.

        :param token: signed token carrying ``change_email`` (user id) and
            ``new_email``
        :return: ``True`` if the address was updated and committed,
            ``False`` if the token is unusable or the address is taken
        :raises Exception: "Dirty session" if the commit fails (the session
            is rolled back first)
        """
        s = Serializer(current_app.config["SECRET_KEY"])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get("change_email") != self.id:
            return False
        new_email = data.get("new_email")
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        try:
            db.session.add(self)
            db.session.commit()
        except Exception:
            # Leave the session usable before reporting the failure; process
            # exit signals are no longer rewritten into a generic Exception.
            db.session.rollback()
            raise Exception("Dirty session")

        self.track_event("changed_email")
        return True
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def load_session_token(token):
        """Load the user for a cookie session token.

        :param token: signed token carrying ``user_id`` and ``session_id``
        :return: the ``User`` with its session id set, or ``None`` if the
            token cannot be loaded, the session does not validate, or no
            user row is found
        """
        s = Serializer(current_app.config["SECRET_KEY"],
                       current_app.config.get("SESSION_EXPIRATION"))
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return None

        # Defaults of -1 / "-1" stand in for fields missing from the payload.
        if not SessionCache.validate_session(
                data.get("user_id", -1), data.get("session_id", "-1")):
            return None

        user = User.query.get(data["user_id"])
        if user is None:
            # Session validated but the lookup returned nothing: treat it as
            # "no user" instead of failing with AttributeError below.
            return None
        user.set_session_id(data["session_id"])
        current_app.logger.debug("Loading user %s from cookie session %s" %
                                 (user.id, user.session_id))
        return user
项目:suite    作者:Staffjoy    | 项目源码 | 文件源码
def get_auth_token(self):
        """Build the signed cookie token for this user. Must be secure.

        The payload holds the user id and a session id freshly created via
        ``SessionCache.create_session``; both the token and the session use
        the ``COOKIE_EXPIRATION`` setting.

        :raises Exception: if the user is not authenticated
        """
        config = current_app.config
        secret = config["SECRET_KEY"]
        lifetime = config["COOKIE_EXPIRATION"]
        signer = Serializer(secret, lifetime)
        current_app.logger.debug("Generating auth token for user %s" % self.id)

        if not self.is_authenticated:
            raise Exception("User not authenticated")

        payload = {"user_id": self.id}
        payload["session_id"] = SessionCache.create_session(
            self.id, expiration=lifetime)
        return signer.dumps(payload)
项目:maps4all-signup    作者:hack4impact    | 项目源码 | 文件源码
def change_email(self, token):
        """Verify a signed email-change token and apply the new address.

        Returns ``True`` after committing the change, and ``False`` when the
        token's signature is bad or expired, the token was issued for another
        user, it carries no ``new_email``, or that address is already used.
        """
        signer = Serializer(current_app.config['SECRET_KEY'])
        try:
            payload = signer.loads(token)
        except (BadSignature, SignatureExpired):
            return False

        if payload.get('change_email') != self.id:
            return False

        candidate = payload.get('new_email')
        # The query only runs when a candidate address is present.
        if (candidate is None
                or self.query.filter_by(email=candidate).first() is not None):
            return False

        self.email = candidate
        db.session.add(self)
        db.session.commit()
        return True
项目:learning_flask2    作者:yuyilei    | 项目源码 | 文件源码
def change_email(self, token):
        """Apply an email change described by a signed token.

        :param token: signed token carrying ``change_email`` (user id) and
            ``new_email``
        :return: ``True`` if the address (and its avatar hash) was updated,
            otherwise ``False``
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('change_email') != self.id:
            return False
        new_email = data.get('new_email')
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        # Keep the MD5 avatar hash in sync with the new address.
        self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest()
        db.session.add(self)
        return True
项目:flask-api-boilerplate    作者:mikaelm1    | 项目源码 | 文件源码
def verify_auth_token(cls, token):
        """
        Ensures that the token received from the client can be loaded and
        returns the User it belongs to.

        :param token: str
        :return: User object, or None if the token cannot be loaded, has no
            ``id`` field, no user is found, or the user's stored
            ``session_token`` differs from the token presented
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return None
        user_id = data.get('id')
        if user_id is None:
            # A validly signed token with a different payload would otherwise
            # raise KeyError here instead of being rejected.
            return None
        user = User.query.get(user_id)
        if user and user.session_token == token:
            return user
        return None

    # DB Helpers
项目:project    作者:Junctionzc    | 项目源码 | 文件源码
def change_email(self, token):
        """Apply an email change described by a signed token.

        :param token: signed token carrying ``change_email`` (user id) and
            ``new_email``
        :return: ``True`` if the address (and its avatar hash) was updated,
            otherwise ``False``
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('change_email') != self.id:
            return False
        new_email = data.get('new_email')
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        # Keep the MD5 avatar hash in sync with the new address.
        self.avatar_hash = hashlib.md5(self.email.encode('utf-8')).hexdigest()
        db.session.add(self)
        return True
项目:pypers    作者:frankosan    | 项目源码 | 文件源码
def generate_token(username, password, expiration=600):
    """
    Generate an authorized token

    Stores (upserts) the user's password hash in ``db.sessions`` and returns
    a signed token carrying the username and password.

    :param username: account name; 'serveruser' gets a one-year token when
        ``cfg.ACME_PROD`` or ``cfg.ACME_DEV`` is set
    :param password: plaintext password
    :param expiration: token lifetime in seconds for all other cases
    :return: the serialized, signed token
    """

    doc = {'username':username, 'password_hash':pwd_context.encrypt(password)}
    db.sessions.find_one_and_update(
        {'username': username},
        {"$set": doc},
        upsert=True
    )

    # print() with a single pre-formatted string produces the same output on
    # Python 2 and Python 3 (the original print statements were Python 2 only).
    if (cfg.ACME_PROD or cfg.ACME_DEV) and (username == 'serveruser'):
        EXPIRES_IN_A_YEAR = 365 * 24 * 60 * 60
        print('token that EXPIRES_IN_A_YEAR')
        s = TimedJWSSerializer(app.config['SECRET_KEY'], expires_in=EXPIRES_IN_A_YEAR)
    else:
        print('token that expires %s' % (cfg.ACME_LCL,))
        s = TimedJWSSerializer(app.config['SECRET_KEY'], expires_in=expiration)

    # NOTE(review): SECURITY - the plaintext password is placed in the token
    # payload. A JWS payload is signed but not encrypted, so anyone holding
    # the token can read it. verify_token() depends on this field, so it is
    # flagged here rather than changed.
    return s.dumps({'username': username, 'password': password})
项目:pypers    作者:frankosan    | 项目源码 | 文件源码
def verify_token(username, token):
    """
    Verify validity of token

    Returns ``False`` when the token is expired or badly signed, or when no
    stored session exists for the username inside the token. Otherwise
    returns the result of checking the token's password against the stored
    hash and the token's username against *username*.
    """
    signer = TimedJWSSerializer(app.config['SECRET_KEY'])

    try:
        ut.pretty_print("Trying to load the token")
        payload = signer.loads(token)
    except SignatureExpired:
        ut.pretty_print("ERROR: Expired Token")
        return False
    except BadSignature:
        ut.pretty_print("ERROR: Invalid Token")
        return False

    ut.pretty_print("Token successfully loaded")
    # Most recent session document for this user (sorted by _id descending).
    session_doc = db.sessions.find_one(
        filter={'username': payload['username']}, sort=[('_id',-1)])
    if not session_doc:
        return False

    record = json_util.loads(json_util.dumps(session_doc))
    password_ok = pwd_context.verify(payload['password'], record['password_hash'])
    return password_ok and payload['username'] == username
项目:zheye    作者:mathbugua    | 项目源码 | 文件源码
def change_email(self, token):
        """Change this user's email address using a signed token.

        :param token: signed token carrying ``change_email`` (user id) and
            ``new_email``
        :return: ``False`` if the token is unusable or the address is taken;
            otherwise whatever ``operate_model.db_add(self)`` returns
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('change_email') != self.id:
            return False
        new_email = data.get('new_email')
        if new_email is None:
            return False
        if self.query.filter_by(email=new_email).first() is not None:
            return False
        self.email = new_email
        return operate_model.db_add(self)
项目:Konsole    作者:ColinHaley    | 项目源码 | 文件源码
def change_email(self, token):
        """Verify a signed email-change token and apply the new address.

        Returns ``True`` after committing the change, and ``False`` when the
        token's signature is bad or expired, the token was issued for another
        user, it carries no ``new_email``, or that address is already used.
        """
        signer = Serializer(current_app.config['SECRET_KEY'])
        try:
            payload = signer.loads(token)
        except (BadSignature, SignatureExpired):
            return False

        issued_for_me = payload.get('change_email') == self.id
        if not issued_for_me:
            return False

        address = payload.get('new_email')
        if address is None:
            return False

        existing = self.query.filter_by(email=address).first()
        if existing is not None:
            return False

        self.email = address
        db.session.add(self)
        db.session.commit()
        return True
项目:docklet    作者:unias    | 项目源码 | 文件源码
def generate_auth_token(self, expiration = 3600):
        """Return a base64-encoded, signed token carrying this user's id.

        :param expiration: token lifetime in seconds
        """
        signer = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
        # Local name avoids shadowing the builtin ``str``.
        signed = signer.dumps({'id': self.id})
        encoded = b64encode(signed)
        return encoded.decode('utf-8')
项目:docklet    作者:unias    | 项目源码 | 文件源码
def verify_auth_token(token):
        """Decode a base64-wrapped signed token and return its user.

        :param token: base64 text wrapping a signed token
        :return: the ``User`` looked up by the token's ``id``, or ``None`` if
            the input is not valid base64, the token has expired, or its
            signature is invalid
        """
        s = Serializer(app.config['SECRET_KEY'])
        try:
            raw = b64decode(token)
        except (TypeError, ValueError):
            # Malformed base64 (binascii.Error is a ValueError on Python 3,
            # TypeError on Python 2) previously escaped as an unhandled error.
            return None
        try:
            data = s.loads(raw)
        except SignatureExpired:
            return None # valid token, but expired
        except BadSignature:
            return None # invalid token
        user = User.query.get(data['id'])
        return user
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def generate_confirmation_token(self, expiration=3600):
        """Return a signed token whose ``confirm`` field is this user's id.

        :param expiration: value passed to the serializer as its second
            positional argument (token lifetime)
        """
        secret = current_app.config['SECRET_KEY']
        payload = {'confirm': self.id}
        return Serializer(secret, expiration).dumps(payload)
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def confirm(self, token):
        """Mark this user as confirmed if the signed token matches.

        :param token: signed token whose ``confirm`` field holds a user id
        :return: ``True`` if the user was marked confirmed, ``False`` if the
            token cannot be loaded or belongs to a different user
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('confirm') != self.id:
            return False
        self.confirmed = True
        # Only added to the session here; no commit is issued in this method.
        db.session.add(self)
        return True
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def generate_reset_token(self, expiration=3600):
        """Return a signed token whose ``reset`` field is this user's id.

        :param expiration: value passed to the serializer as its second
            positional argument (token lifetime)
        """
        secret = current_app.config['SECRET_KEY']
        payload = {'reset': self.id}
        return Serializer(secret, expiration).dumps(payload)
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def reset_password(self, token, new_password):
        """Set a new password if the signed reset token matches this user.

        :param token: signed token whose ``reset`` field holds a user id
        :param new_password: password to assign
        :return: ``True`` if the password was assigned, ``False`` if the
            token cannot be loaded or belongs to a different user
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('reset') != self.id:
            return False
        self.password = new_password
        # Only added to the session here; no commit is issued in this method.
        db.session.add(self)
        return True
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def generate_email_change_token(self, new_email, expiration=3600):
        """Return a signed token describing an email change for this user.

        The payload holds this user's id under ``change_email`` and the
        requested address under ``new_email``.
        """
        secret = current_app.config['SECRET_KEY']
        payload = {'change_email': self.id, 'new_email': new_email}
        return Serializer(secret, expiration).dumps(payload)
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def generate_auth_token(self, expiration):
        """Return an ASCII-decoded signed token carrying this user's id.

        :param expiration: passed to the serializer as ``expires_in``
        """
        signer = Serializer(current_app.config['SECRET_KEY'],
                            expires_in=expiration)
        signed = signer.dumps({'id': self.id})
        return signed.decode('ascii')
项目:circleci-demo-python-flask    作者:CircleCI-Public    | 项目源码 | 文件源码
def verify_auth_token(token):
        """Return the user identified by a signed auth token.

        :param token: signed token whose ``id`` field holds a user id
        :return: result of ``User.query.get`` for that id, or ``None`` if the
            token cannot be loaded or has no ``id`` field
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return None
        user_id = data.get('id')
        if user_id is None:
            # Tokens signed with the same key for other purposes (confirm,
            # reset, change_email) carry no 'id'; reject them instead of
            # raising KeyError.
            return None
        return User.query.get(user_id)
项目:flask-vue-example    作者:levi-lq    | 项目源码 | 文件源码
def generate_auth_token(self):
        """
        desc:   Generate a signed token carrying this user's id
        return: token
        date:   2016-10-28

        """
        config = current_app.config
        # NOTE(review): falls back to a fixed, publicly known secret when
        # SECRET_KEY is not configured - confirm this is intended.
        secret = config.get("SECRET_KEY", "No secret key")
        lifetime = config.get("USER_TOKEN_EXPIRATION", 3600)
        signer = TimedJSONWebSignatureSerializer(secret, lifetime)
        return signer.dumps({"user_id": self.id})
项目:flask-vue-example    作者:levi-lq    | 项目源码 | 文件源码
def verify_auth_token(cls, token):
        """Return the user identified by a signed token.

        :raises TokenExpired: if loading the token raises ``SignatureExpired``
        :raises BadToken: if the signature is bad or no user object exists
            for the token's ``user_id``
        """
        secret = current_app.config.get("SECRET_KEY", "No secret key")
        signer = TimedJSONWebSignatureSerializer(secret)
        try:
            payload = signer.loads(token)
        except SignatureExpired:
            raise TokenExpired(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token?????????"}))
        except BadSignature:
            raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"}))

        try:
            return User.get_object(id=payload["user_id"])
        except ObjectNotExists:
            raise BadToken(http_responses.HTTP_400_BAD_REQUEST(msg={"error": u"Token????????"}))
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def set_password(token):
    """Set the initial customer password.

    The template for this route contains bootstrap.css, bootstrap-theme.css
    and main.css. This mirrors the password reset option, except that it has
    a longer expiration time and does not ask for the old password.

    :param token: Token generated by
        :meth:`app.models.User.generate_reset_token`

    :return: a redirect to ``main.index`` when the token is rejected or the
        password was set, otherwise the rendered ``auth/set_password.html``
    """
    signer = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
    try:
        # Only validity matters here; the payload itself is not used.
        signer.loads(token)
    except BadSignature:
        flash('Signature expired.')
        return redirect(url_for('main.index'))

    form = SetPasswordForm()
    if not form.validate_on_submit():
        # Surface the first error of each invalid field, then show the form.
        for messages in form.errors.values():
            flash(messages[0], 'danger')
        return render_template(
            'auth/set_password.html', form=form, token=token)

    User.set_password(token, form.data['password'])
    flash('Your new password has been set.')
    return redirect(url_for('main.index'))
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def generate_reset_token(self, expiry=900):
        """Create the JSON Web Signature used to reset a customer's password.

        For details see :meth:`itsdangerous.JSONWebSignatureSerializer.dumps`

        :param expiry: Token expiration time (seconds)
        :return: signed token whose payload holds this user's id as
            ``user_id``
        """
        secret = current_app.config['SECRET_KEY']
        signer = TimedJSONWebSignatureSerializer(secret, expiry)
        payload = {'user_id': self.id}
        return signer.dumps(payload)
项目:do-portal    作者:certeu    | 项目源码 | 文件源码
def set_password(cls, token, passwd):
        """Set the password of the user named in a signed token.

        Errors raised while loading the token are not caught here and
        propagate to the caller.

        :param token: signed token whose payload carries ``user_id``
        :param passwd: password to assign
        :return: None
        """
        signer = TimedJSONWebSignatureSerializer(current_app.config['SECRET_KEY'])
        payload = signer.loads(token)
        # NOTE(review): assumes cls.get() returns a user for this id - confirm
        account = cls.get(payload.get('user_id'))
        account.password = passwd
        db.session.add(account)
        db.session.commit()
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def generate_reset_token(self, expiration=3600):
        """Create a signed password-reset token for this user.

        The payload holds this user's id under ``reset``.
        """
        secret = current_app.config['SECRET_KEY']
        payload = {'reset': self.id}
        return Serializer(secret, expiration).dumps(payload)
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def reset_password(self, token, new_password):
        """Reset the password if the signed token matches this user.

        :param token: signed token whose ``reset`` field holds a user id
        :param new_password: password to assign
        :return: ``True`` if the password was assigned, ``False`` if the
            token cannot be loaded or belongs to a different user
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('reset') != self.id:
            return False
        self.password = new_password
        # Only added to the session here; no commit is issued in this method.
        db.session.add(self)
        return True
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def generate_email_change_token(self, new_email, expiration=3600):
        """Create a signed token for changing this user's email address.

        The payload holds this user's id under ``change_email`` and the
        requested address under ``new_email``.
        """
        secret = current_app.config['SECRET_KEY']
        payload = {'change_email': self.id, 'new_email': new_email}
        return Serializer(secret, expiration).dumps(payload)
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def generate_auth_token(self, expiration):
        """Create an ASCII-decoded signed authentication token.

        :param expiration: passed to the serializer as ``expires_in``
        """
        signer = Serializer(current_app.config['SECRET_KEY'],
                            expires_in=expiration)
        signed = signer.dumps({'id': self.id})
        return signed.decode('ascii')
项目:1ibrary-gzhu    作者:1ibrary    | 项目源码 | 文件源码
def verify_token(token):
        """Load a signed token, record its session id on ``g`` and return
        the matching user.

        :param token: signed token carrying ``uid`` and ``session``
        :return: result of ``User.query.get`` for the token's ``uid``, or
            ``None`` if anything fails while loading or using the token
        """
        from flask import current_app
        # Falls back to 3600 when EXPIRES_TIME is unset or falsy.
        expire_time = current_app.config.get("EXPIRES_TIME") or 3600
        token_key = current_app.config["APP_KEY"]

        s = TimedJSONWebSignatureSerializer(token_key, expires_in=expire_time)

        try:
            d = s.loads(token)
            user = User.query.get(d["uid"])
            g.session_id = d["session"]
            return user
        except Exception:
            # Deliberately broad (bad token, missing payload keys, lookup
            # errors), but ``Exception`` rather than a bare ``except`` so
            # KeyboardInterrupt/SystemExit are not swallowed.
            return None
项目:1ibrary-gzhu    作者:1ibrary    | 项目源码 | 文件源码
def generate_token(self, session):
        """Return an ASCII-decoded signed token for this user and session.

        The payload holds ``username`` (``self.xh``), ``uid`` (``self.id_``)
        and the given ``session``.
        """
        from flask import current_app
        config = current_app.config
        # Falls back to 3600 when EXPIRES_TIME is unset or falsy.
        lifetime = config.get("EXPIRES_TIME") or 3600
        signing_key = config["APP_KEY"]

        signer = TimedJSONWebSignatureSerializer(signing_key, expires_in=lifetime)
        payload = {"username": self.xh, "uid": self.id_, "session": session}
        return signer.dumps(payload).decode("ascii")
项目:JmilkFan-s-Blog    作者:JmilkFan    | 项目源码 | 文件源码
def post(self):
        """Handle HTTP `POST`: exchange a username/password for a token.

        :return: ``{'token': <signed token>}`` carrying the user's id, with
            ``expires_in=600``
        Aborts with 401 when the user is unknown or the password check fails.
        """

        args = parsers.user_post_parser.parse_args()
        user = User.query.filter_by(username=args['username']).first()

        # An unknown username leaves ``user`` as None; reject it with the same
        # 401 as a wrong password instead of failing with AttributeError.
        if user is None or not user.check_password(args['password']):
            abort(401)

        serializer = Serializer(
            current_app.config['SECRET_KEY'],
            expires_in=600)
        return {'token': serializer.dumps({'id': user.id})}
项目:pyt    作者:python-security    | 项目源码 | 文件源码
def _make_token(self, data, timeout):
        """Sign *data* with the application's secret key.

        :param data: payload to serialize
        :param timeout: value passed to the serializer as its second
            positional argument (token lifetime)
        :return: the serialized, signed token
        """
        secret = current_app.config['SECRET_KEY']
        return Serializer(secret, timeout).dumps(data)
项目:pyt    作者:python-security    | 项目源码 | 文件源码
def _verify_token(self, token):
        """Load *token* and report how loading went.

        :return: tuple ``(expired, invalid, data)``; ``expired`` is True on
            ``SignatureExpired``, ``invalid`` is True on any other exception,
            and ``data`` is the loaded payload or ``None`` on failure
        """
        signer = Serializer(current_app.config['SECRET_KEY'])
        try:
            payload = signer.loads(token)
        except SignatureExpired:
            return True, False, None
        except Exception:
            return False, True, None
        return False, False, payload
项目:Simpleblog    作者:Blackyukun    | 项目源码 | 文件源码
def generate_auth_token(self, expiration):
        """Return a signed token carrying this user's id.

        :param expiration: passed to the serializer as ``expires_in``
        """
        # Config key corrected from the typo 'ECRET_KEY' (which raised
        # KeyError) so tokens are signed with the key used to verify them.
        s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps({'id': self.id})
项目:Simpleblog    作者:Blackyukun    | 项目源码 | 文件源码
def verify_auth_token(token):
        """Return the user identified by a signed auth token.

        :param token: signed token whose ``id`` field holds a user id
        :return: result of ``User.query.get`` for that id, or ``None`` if the
            token cannot be loaded or has no ``id`` field
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return None
        user_id = data.get('id')
        if user_id is None:
            # A validly signed token with a different payload would otherwise
            # raise KeyError here instead of being rejected.
            return None
        return User.query.get(user_id)

    # ??????JSON????????
    # ???????????????????????
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def generate_confirmation_token(self, expiration=3600):
        """Return a signed token whose ``confirm`` field is this user's id.

        :param expiration: value passed to the serializer as its second
            positional argument (token lifetime)
        """
        secret = current_app.config['SECRET_KEY']
        payload = {'confirm': self.id}
        return Serializer(secret, expiration).dumps(payload)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def confirm(self, token):
        """Mark this user as confirmed if the signed token matches.

        :param token: signed token whose ``confirm`` field holds a user id
        :return: ``True`` if the user was marked confirmed, ``False`` if the
            token cannot be loaded or belongs to a different user
        """
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except Exception:
            # Unloadable token; ``Exception`` rather than a bare ``except``
            # so KeyboardInterrupt/SystemExit are not swallowed.
            return False
        if data.get('confirm') != self.id:
            return False
        self.confirmed = True
        # Only added to the session here; no commit is issued in this method.
        db.session.add(self)
        return True