Python django.core.signing 模块,TimestampSigner() 实例源码

我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用django.core.signing.TimestampSigner()

项目:pfb-network-connectivity    作者:azavea    | 项目源码 | 文件源码
def post(self, request):
        errors = []
        fatal = False
        token = request.data.get('token')
        password = request.data.get('password')
        if not token:
            errors.append('Invalid reset token.')
            fatal = True
        if not password:
            errors.append('No password provided.')
        signer = TimestampSigner(salt=settings.RESET_SALT)
        if token:
            try:
                user_uuid = signer.unsign(token, max_age=settings.RESET_TOKEN_LENGTH)
            except BadSignature:
                errors.append('Can not reset password because the reset link used was invalid.')
                fatal = True
        if len(errors) == 0:
            # set password
            user = PFBUser.objects.get(uuid=user_uuid)
            user.set_password(password)
            user.save()
            return Response({'status': 'Success'})
        else:
            return Response({'errors': errors, 'fatal': fatal}, status.HTTP_400_BAD_REQUEST)
项目:bbgo    作者:genonfire    | 项目源码 | 文件源码
def check_validation(request):
    """API check_validation"""
    code = request.POST.get('code')
    email = request.POST.get('email')
    signer = TimestampSigner()

    try:
        value = signer.unsign(code, max_age=settings.VERIFICATION_CODE_VALID)
        code_check = value == email

        if code_check:
            return JsonResponse({'status': 'true'}, status=201)
    except:
        pass

    return JsonResponse({'status': 'false'}, status=400)
项目:pfb-network-connectivity    作者:azavea    | 项目源码 | 文件源码
def get_password_reset_url(request, user):
    """Generates a password reset URL given a Request and user

    Args:
        request (rest_framework.request.Request)
        user (PBBUser): user to generate password reset url for
    """
    signer = TimestampSigner(salt=settings.RESET_SALT)
    token = signer.sign('{}'.format(user.uuid))
    return request.build_absolute_uri('/#/password-reset/?token={}'.format(token))
项目:nightreads    作者:avinassh    | 项目源码 | 文件源码
def generate_key(user, for_subscription=True):
    # not really a proper use of salts
    # but meh
    salt = _get_salt(for_subscription)
    signer = TimestampSigner(settings.SECRET_KEY, salt=salt)
    return signer.sign(str(user.id))
项目:nightreads    作者:avinassh    | 项目源码 | 文件源码
def validate_key(key, user, for_subscription=True):
    salt = _get_salt(for_subscription)
    signer = TimestampSigner(settings.SECRET_KEY, salt=salt)
    value = signer.unsign(key, max_age=settings.EMAIL_LINK_EXPIRY_DAYS)
    return str(user.id) == value
项目:bbgo    作者:genonfire    | 项目源码 | 文件源码
def get_verification_code(request):
    """API get_verification_code"""
    email = request.POST.get('email')

    if User.objects.filter(email__iexact=email).exists():
        msg = _('E-mail exists. Why don\'t you try to find your password?')
        data = {
            'result': False,
            'msg': msg,
        }
        return JsonResponse(data, status=201)

    signer = TimestampSigner()
    value = signer.sign(email)
    subject = _('[%(site_name)s] Verification code for signing in') % {
        'site_name': settings.SITE_NAME
    }
    body = value

    try:
        send_mail(subject, body, settings.EMAIL_HOST_USER, [email], fail_silently=False)
        msg = _('Verification code sent. Please check your E-mail.')
        data = {
            'result': True,
            'msg': msg,
        }
        return JsonResponse(data, status=201)
    except SMTPException:
        return JsonResponse({'status': 'false'}, status=400)
项目:bbgo    作者:genonfire    | 项目源码 | 文件源码
def send_email(request):
    """Send email to user for testing purpose"""
    id_email = request.user.email
    signer = TimestampSigner()
    value = signer.sign(id_email)
    subject = u'Test email.'
    body = u'keyCode: %s' % value

    try:
        send_mail(subject, body, settings.EMAIL_HOST_USER, [id_email], fail_silently=False)
        return error_page(request, "Email sent", status=201)
    except SMTPException:
        return error_page(request, "Error!")
项目:django-authlib    作者:matthiask    | 项目源码 | 文件源码
def get_signer(salt='email_registration'):
    """
    Returns the signer instance used to sign and unsign the registration
    link tokens
    """
    return signing.TimestampSigner(salt=salt)
项目:gitmate-2    作者:GitMateIO    | 项目源码 | 文件源码
def setUp(self):
        self.signer = TimestampSigner()
        super().setUpWithPlugin('welcome_commenter')
项目:gitmate-2    作者:GitMateIO    | 项目源码 | 文件源码
def add_welcome_comment(
    pr: MergeRequest,
    autorespond_text: str='Hi! This is GitMate v2.0!'
):
    """
    Adds a welcome comment to pull requests.
    """
    sign = TimestampSigner().sign(autorespond_text)
    msg = ('{}\n\n(Powered by [GitMate.io](https://gitmate.io))\n\n'
           '<!-- Timestamp signature `{}` -->'.format(autorespond_text, sign))
    pr.add_comment(msg)
项目:djangolg    作者:wolcomm    | 项目源码 | 文件源码
def __init__(self, value=None):
        """Initialise new AuthKey instance."""
        self.signer = TimestampSigner(salt="auth")
        self._data = {
            'clear': value,
            'signed': self.signer.sign(value)
        }
项目:perdiem-django    作者:RevolutionTech    | 项目源码 | 文件源码
def make_token(user):
    return TimestampSigner().sign(user.id)
项目:perdiem-django    作者:RevolutionTech    | 项目源码 | 文件源码
def check_token(user_id, token):
    try:
        key = '%s:%s' % (user_id, token)
        TimestampSigner().unsign(key, max_age=60 * 60 * 48)  # Valid for 2 days
    except (BadSignature, SignatureExpired):
        return False
    return True
项目:della    作者:avinassh    | 项目源码 | 文件源码
def generate_key(user):
    signer = TimestampSigner(settings.SECRET_KEY)
    return signer.sign(str(user.id))
项目:della    作者:avinassh    | 项目源码 | 文件源码
def validate_key(key, user):
    signer = TimestampSigner(settings.SECRET_KEY)
    try:
        value = signer.unsign(key, max_age=settings.EMAIL_LINK_EXPIRY_DAYS)
        return str(user.id) == value
    except (BadSignature, SignatureExpired):
        return False
项目:SciReg    作者:hms-dbmi    | 项目源码 | 文件源码
def email_confirm(request, template_name='registration/confirmed.html'):
    user = request.user

    email_confirm_value = request.GET.get('email_confirm_value', '-')
    email_confirm_value = user.email + ":" + email_confirm_value.replace(".", ":")
    success_url = request.GET.get('success_url', None)

    signer = TimestampSigner(salt=settings.EMAIL_CONFIRM_SALT)

    try:
        signer.unsign(email_confirm_value, max_age=timedelta(seconds=300))
        registration, created = Registration.objects.get_or_create(user_id=user.id)

        # If this is a new registration make sure we at least save the email/username.
        if created:
            registration.email = user.username

        registration.email_confirmed = True
        registration.save()

        # Set a message.
        messages.success(request, 'Email has been confirmed.',
                         extra_tags='success', fail_silently=True)

    except SignatureExpired:
        messages.error(request, 'This email confirmation code has expired, please try again.',
                       extra_tags='danger', fail_silently=True)

    except BadSignature:
        messages.error(request, 'This email confirmation code is invalid, please try again.',
                       extra_tags='danger', fail_silently=True)

    # Continue on to the next page, if passed. Otherwise render a default page.
    if success_url:
        return redirect(success_url)
    else:
        return render(request, template_name)
项目:SciReg    作者:hms-dbmi    | 项目源码 | 文件源码
def send_confirmation_email(self, request):
        user = request.user
        success_url = request.data.get('success_url', None)

        signer = TimestampSigner(salt=settings.EMAIL_CONFIRM_SALT)
        signed_value = signer.sign(user.email)

        signed_value = signed_value.split(":")[1] + "." + signed_value.split(":")[2]

        # Build the link URL then just break it all up.
        confirm_url = settings.CONFIRM_EMAIL_URL + signed_value
        confirm_url_parts = list(parse.urlparse(confirm_url))
        confirm_url_query = dict(parse.parse_qsl(confirm_url_parts[4]))

        # Add needed key-value pairs to the request.
        if success_url:
            confirm_url_query.update({'success_url': success_url})

        # Join everything back together.
        confirm_url_parts[4] = parse.urlencode(confirm_url_query)
        confirm_url = parse.urlunparse(confirm_url_parts)

        logger.debug("[SCIREG][DEBUG][send_confirmation_email] Assembled confirmation URL: %s" % confirm_url)

        email_send("People-Powered Medicine - E-Mail Verification", [user.email],
                   message="verify",
                   extra={"confirm_url": confirm_url, "user_email": user.email})

        return HttpResponse("SENT")
项目:SpongeAuth    作者:lukegb    | 项目源码 | 文件源码
def setup_totp(request):
    if twofa.models.TOTPDevice.objects.active_for_user(request.user).exists():
        messages.error(request, _('You may not have multiple Google Authenticators attached to your account.'))
        return redirect('twofa:list')

    setup_signer = TimestampSigner('twofa.views.setup_totp:{}'.format(request.user.pk))

    if request.method == 'POST' and 'secret' in request.POST:
        try:
            b32_secret = setup_signer.unsign(request.POST['secret'], max_age=600)
        except SignatureExpired:
            messages.error(request, _('That took too long and your challenge expired. Here\'s a new one.'))
            return redirect('twofa:setup-totp')
        except BadSignature:
            messages.error(request, _('Whoops - something went wrong. Please try again.'))
            return redirect('twofa:setup-totp')
    else:
        b32_secret = base64.b32encode(secrets.token_bytes(10))
    signed_secret = setup_signer.sign(b32_secret)

    url = 'otpauth://totp/Sponge:{}?{}'.format(
        urlquote(request.user.username),
        urlencode({
            'secret': b32_secret,
            'issuer': 'Sponge'}))
    img = qrcode.make(url, image_factory=qrcode.image.svg.SvgPathFillImage)
    img_buf = io.BytesIO()
    img.save(img_buf)

    device = twofa.models.TOTPDevice(base32_secret=b32_secret, owner=request.user)
    device.activated_at = timezone.now()  # this won't be saved unless the form is valid
    form = device.verify_form(secret=signed_secret)
    if request.method == 'POST':
        form = device.verify_form(request.POST, secret=signed_secret)

        if form.is_valid():
            # relying on verify_form to save the new device
            request.user.twofa_enabled = True
            request.user.save()

            messages.success(request, _('Your authenticator has been added to your account.'))
            return _generate_paper_codes_if_needed(request.user, reverse('twofa:list'))

    return render(request, 'twofa/setup/totp.html', {
        'form': form, 'qr_code_svg': img_buf.getvalue(), 'b32_secret': b32_secret})
项目:bbgo    作者:genonfire    | 项目源码 | 文件源码
def sign_up(request):
    """Sign up"""
    if request.method == "POST":
        userform = RegistrationForm(request.POST)
        if userform.is_valid():
            userform.save(commit=False)

            username = userform.cleaned_data['username']
            q = Q(username__iexact=username) | Q(first_name__iexact=username)
            if User.objects.filter(q).exists() or \
                len(username) < settings.ID_MIN_LENGTH or \
                    len(username) > settings.ID_MAX_LENGTH:
                errormsg = _('Please check username.')
                return error_page(request, errormsg)

            if settings.ENABLE_NICKNAME:
                nick = userform.cleaned_data['first_name']
                if nick:
                    q = Q(username__iexact=nick) | Q(first_name__iexact=nick)
                    if User.objects.filter(q).exists() or \
                        len(nick) < settings.NICKNAME_MIN_LENGTH or \
                            len(nick) > settings.NICKNAME_MAX_LENGTH:
                        errormsg = _('Please check nickname.')
                        return error_page(request, errormsg)

            code = userform.cleaned_data['code']
            email = userform.cleaned_data['email']
            signer = TimestampSigner()

            try:
                value = signer.unsign(
                    code, max_age=settings.VERIFICATION_CODE_VALID)
                code_check = value == email

                if code_check:
                    userform.save()
                    return render(
                        request,
                        "accounts/join.html",
                    )
                else:
                    errormsg = _('Verification failure. Please check verification code again.')
            except:
                errormsg = _('Verification failure. Please check verification code again.')
        else:
            errormsg = _('Sorry. Please try again later.')

        return error_page(request, errormsg)
    elif request.method == "GET":
        userform = RegistrationForm()

    return render(
        request,
        "accounts/signup.html",
        {
            'userform': userform,
        }
    )
项目:uclapi    作者:uclapi    | 项目源码 | 文件源码
def authorise(request):
    client_id = request.GET.get("client_id", None)
    state = request.GET.get("state", None)
    if not (client_id and state):
        response = PrettyJsonResponse({
            "ok": False,
            "error": "incorrect parameters supplied"
        })
        response.status_code = 400
        return response

    try:
        # We only allow the process to happen if the app exists and has not
        # been flagged as deleted
        app = App.objects.filter(client_id=client_id, deleted=False)[0]
    except IndexError:
        response = PrettyJsonResponse({
            "ok": False,
            "error": "App does not exist for client id"
        })
        response.status_code = 400
        return response

    if app.callback_url is None:
        response = PrettyJsonResponse({
            "ok": False,
            "error": "No callback URL set for this app."
        })
        response.status_code = 400
        return response

    # Sign the app and state pair before heading to Shibboleth to help protect
    # against CSRF and XSS attacks
    signer = TimestampSigner()
    data = app.client_id + state
    signed_data = signer.sign(data)

    # Build Shibboleth callback URL
    url = os.environ.get("SHIBBOLETH_ROOT") + "/Login?target="
    target = request.build_absolute_uri(
        "/oauth/shibcallback?appdata={}".format(signed_data)
    )
    target = quote(target)
    url += target

    # Send the user to Shibboleth to log in
    return redirect(url)
项目:uclapi    作者:uclapi    | 项目源码 | 文件源码
def userdeny(request):
    signer = TimestampSigner()

    try:
        signed_data = request.POST.get("signed_app_data")
        raw_data_str = signer.unsign(signed_data, max_age=300)
    except:
        response = PrettyJsonResponse({
            "ok": False,
            "error": ("The signed data received was invalid."
                      " Please try the login process again. "
                      "If this issue persists, please contact support.")
        })
        response.status_code = 400
        return response

    try:
        data = json.loads(raw_data_str)
    except:
        response = PrettyJsonResponse({
            "ok": False,
            "error": ("The JSON data was not in the expected format."
                      " Please contact support.")
        })
        response.status_code = 400
        return response

    # We can trust this value because it came from a signed dictionary
    app = App.objects.get(client_id=data["client_id"])
    state = data["state"]

    redir = "{}?result=denied&state={}".format(app.callback_url, state)

    # Now check if a token has been granted in the past. If so, invalidate it.
    # There shouldn't be a situation where more than one user/app token pair
    # exists but, just in case, let's invalidate them all.
    try:
        users = User.objects.filter(employee_id=data["user_upi"])
        user = users[0]
    except (User.DoesNotExist, KeyError):
        response = PrettyJsonResponse({
            "ok": False,
            "error":
                "User does not exist. This should never occur. "
                "Please contact support."
        })
        response.status_code = 400
        return response

    tokens = OAuthToken.objects.filter(app=app, user=user)
    for token in tokens:
        token.active = False
        token.save()

    # Send the user to the app's denied permission page
    return redirect(redir)