我们从Python开源项目中,提取了以下21个代码示例,用于说明如何使用django.core.signing.TimestampSigner()。
def post(self, request): errors = [] fatal = False token = request.data.get('token') password = request.data.get('password') if not token: errors.append('Invalid reset token.') fatal = True if not password: errors.append('No password provided.') signer = TimestampSigner(salt=settings.RESET_SALT) if token: try: user_uuid = signer.unsign(token, max_age=settings.RESET_TOKEN_LENGTH) except BadSignature: errors.append('Can not reset password because the reset link used was invalid.') fatal = True if len(errors) == 0: # set password user = PFBUser.objects.get(uuid=user_uuid) user.set_password(password) user.save() return Response({'status': 'Success'}) else: return Response({'errors': errors, 'fatal': fatal}, status.HTTP_400_BAD_REQUEST)
def check_validation(request): """API check_validation""" code = request.POST.get('code') email = request.POST.get('email') signer = TimestampSigner() try: value = signer.unsign(code, max_age=settings.VERIFICATION_CODE_VALID) code_check = value == email if code_check: return JsonResponse({'status': 'true'}, status=201) except: pass return JsonResponse({'status': 'false'}, status=400)
def get_password_reset_url(request, user): """Generates a password reset URL given a Request and user Args: request (rest_framework.request.Request) user (PBBUser): user to generate password reset url for """ signer = TimestampSigner(salt=settings.RESET_SALT) token = signer.sign('{}'.format(user.uuid)) return request.build_absolute_uri('/#/password-reset/?token={}'.format(token))
def generate_key(user, for_subscription=True): # not really a proper use of salts # but meh salt = _get_salt(for_subscription) signer = TimestampSigner(settings.SECRET_KEY, salt=salt) return signer.sign(str(user.id))
def validate_key(key, user, for_subscription=True): salt = _get_salt(for_subscription) signer = TimestampSigner(settings.SECRET_KEY, salt=salt) value = signer.unsign(key, max_age=settings.EMAIL_LINK_EXPIRY_DAYS) return str(user.id) == value
def get_verification_code(request): """API get_verification_code""" email = request.POST.get('email') if User.objects.filter(email__iexact=email).exists(): msg = _('E-mail exists. Why don\'t you try to find your password?') data = { 'result': False, 'msg': msg, } return JsonResponse(data, status=201) signer = TimestampSigner() value = signer.sign(email) subject = _('[%(site_name)s] Verification code for signing in') % { 'site_name': settings.SITE_NAME } body = value try: send_mail(subject, body, settings.EMAIL_HOST_USER, [email], fail_silently=False) msg = _('Verification code sent. Please check your E-mail.') data = { 'result': True, 'msg': msg, } return JsonResponse(data, status=201) except SMTPException: return JsonResponse({'status': 'false'}, status=400)
def send_email(request): """Send email to user for testing purpose""" id_email = request.user.email signer = TimestampSigner() value = signer.sign(id_email) subject = u'Test email.' body = u'keyCode: %s' % value try: send_mail(subject, body, settings.EMAIL_HOST_USER, [id_email], fail_silently=False) return error_page(request, "Email sent", status=201) except SMTPException: return error_page(request, "Error!")
def get_signer(salt='email_registration'): """ Returns the signer instance used to sign and unsign the registration link tokens """ return signing.TimestampSigner(salt=salt)
def setUp(self): self.signer = TimestampSigner() super().setUpWithPlugin('welcome_commenter')
def add_welcome_comment( pr: MergeRequest, autorespond_text: str='Hi! This is GitMate v2.0!' ): """ Adds a welcome comment to pull requests. """ sign = TimestampSigner().sign(autorespond_text) msg = ('{}\n\n(Powered by [GitMate.io](https://gitmate.io))\n\n' '<!-- Timestamp signature `{}` -->'.format(autorespond_text, sign)) pr.add_comment(msg)
def __init__(self, value=None): """Initialise new AuthKey instance.""" self.signer = TimestampSigner(salt="auth") self._data = { 'clear': value, 'signed': self.signer.sign(value) }
def make_token(user): return TimestampSigner().sign(user.id)
def check_token(user_id, token): try: key = '%s:%s' % (user_id, token) TimestampSigner().unsign(key, max_age=60 * 60 * 48) # Valid for 2 days except (BadSignature, SignatureExpired): return False return True
def generate_key(user): signer = TimestampSigner(settings.SECRET_KEY) return signer.sign(str(user.id))
def validate_key(key, user): signer = TimestampSigner(settings.SECRET_KEY) try: value = signer.unsign(key, max_age=settings.EMAIL_LINK_EXPIRY_DAYS) return str(user.id) == value except (BadSignature, SignatureExpired): return False
def email_confirm(request, template_name='registration/confirmed.html'): user = request.user email_confirm_value = request.GET.get('email_confirm_value', '-') email_confirm_value = user.email + ":" + email_confirm_value.replace(".", ":") success_url = request.GET.get('success_url', None) signer = TimestampSigner(salt=settings.EMAIL_CONFIRM_SALT) try: signer.unsign(email_confirm_value, max_age=timedelta(seconds=300)) registration, created = Registration.objects.get_or_create(user_id=user.id) # If this is a new registration make sure we at least save the email/username. if created: registration.email = user.username registration.email_confirmed = True registration.save() # Set a message. messages.success(request, 'Email has been confirmed.', extra_tags='success', fail_silently=True) except SignatureExpired: messages.error(request, 'This email confirmation code has expired, please try again.', extra_tags='danger', fail_silently=True) except BadSignature: messages.error(request, 'This email confirmation code is invalid, please try again.', extra_tags='danger', fail_silently=True) # Continue on to the next page, if passed. Otherwise render a default page. if success_url: return redirect(success_url) else: return render(request, template_name)
def send_confirmation_email(self, request): user = request.user success_url = request.data.get('success_url', None) signer = TimestampSigner(salt=settings.EMAIL_CONFIRM_SALT) signed_value = signer.sign(user.email) signed_value = signed_value.split(":")[1] + "." + signed_value.split(":")[2] # Build the link URL then just break it all up. confirm_url = settings.CONFIRM_EMAIL_URL + signed_value confirm_url_parts = list(parse.urlparse(confirm_url)) confirm_url_query = dict(parse.parse_qsl(confirm_url_parts[4])) # Add needed key-value pairs to the request. if success_url: confirm_url_query.update({'success_url': success_url}) # Join everything back together. confirm_url_parts[4] = parse.urlencode(confirm_url_query) confirm_url = parse.urlunparse(confirm_url_parts) logger.debug("[SCIREG][DEBUG][send_confirmation_email] Assembled confirmation URL: %s" % confirm_url) email_send("People-Powered Medicine - E-Mail Verification", [user.email], message="verify", extra={"confirm_url": confirm_url, "user_email": user.email}) return HttpResponse("SENT")
def setup_totp(request): if twofa.models.TOTPDevice.objects.active_for_user(request.user).exists(): messages.error(request, _('You may not have multiple Google Authenticators attached to your account.')) return redirect('twofa:list') setup_signer = TimestampSigner('twofa.views.setup_totp:{}'.format(request.user.pk)) if request.method == 'POST' and 'secret' in request.POST: try: b32_secret = setup_signer.unsign(request.POST['secret'], max_age=600) except SignatureExpired: messages.error(request, _('That took too long and your challenge expired. Here\'s a new one.')) return redirect('twofa:setup-totp') except BadSignature: messages.error(request, _('Whoops - something went wrong. Please try again.')) return redirect('twofa:setup-totp') else: b32_secret = base64.b32encode(secrets.token_bytes(10)) signed_secret = setup_signer.sign(b32_secret) url = 'otpauth://totp/Sponge:{}?{}'.format( urlquote(request.user.username), urlencode({ 'secret': b32_secret, 'issuer': 'Sponge'})) img = qrcode.make(url, image_factory=qrcode.image.svg.SvgPathFillImage) img_buf = io.BytesIO() img.save(img_buf) device = twofa.models.TOTPDevice(base32_secret=b32_secret, owner=request.user) device.activated_at = timezone.now() # this won't be saved unless the form is valid form = device.verify_form(secret=signed_secret) if request.method == 'POST': form = device.verify_form(request.POST, secret=signed_secret) if form.is_valid(): # relying on verify_form to save the new device request.user.twofa_enabled = True request.user.save() messages.success(request, _('Your authenticator has been added to your account.')) return _generate_paper_codes_if_needed(request.user, reverse('twofa:list')) return render(request, 'twofa/setup/totp.html', { 'form': form, 'qr_code_svg': img_buf.getvalue(), 'b32_secret': b32_secret})
def sign_up(request): """Sign up""" if request.method == "POST": userform = RegistrationForm(request.POST) if userform.is_valid(): userform.save(commit=False) username = userform.cleaned_data['username'] q = Q(username__iexact=username) | Q(first_name__iexact=username) if User.objects.filter(q).exists() or \ len(username) < settings.ID_MIN_LENGTH or \ len(username) > settings.ID_MAX_LENGTH: errormsg = _('Please check username.') return error_page(request, errormsg) if settings.ENABLE_NICKNAME: nick = userform.cleaned_data['first_name'] if nick: q = Q(username__iexact=nick) | Q(first_name__iexact=nick) if User.objects.filter(q).exists() or \ len(nick) < settings.NICKNAME_MIN_LENGTH or \ len(nick) > settings.NICKNAME_MAX_LENGTH: errormsg = _('Please check nickname.') return error_page(request, errormsg) code = userform.cleaned_data['code'] email = userform.cleaned_data['email'] signer = TimestampSigner() try: value = signer.unsign( code, max_age=settings.VERIFICATION_CODE_VALID) code_check = value == email if code_check: userform.save() return render( request, "accounts/join.html", ) else: errormsg = _('Verification failure. Please check verification code again.') except: errormsg = _('Verification failure. Please check verification code again.') else: errormsg = _('Sorry. Please try again later.') return error_page(request, errormsg) elif request.method == "GET": userform = RegistrationForm() return render( request, "accounts/signup.html", { 'userform': userform, } )
def authorise(request): client_id = request.GET.get("client_id", None) state = request.GET.get("state", None) if not (client_id and state): response = PrettyJsonResponse({ "ok": False, "error": "incorrect parameters supplied" }) response.status_code = 400 return response try: # We only allow the process to happen if the app exists and has not # been flagged as deleted app = App.objects.filter(client_id=client_id, deleted=False)[0] except IndexError: response = PrettyJsonResponse({ "ok": False, "error": "App does not exist for client id" }) response.status_code = 400 return response if app.callback_url is None: response = PrettyJsonResponse({ "ok": False, "error": "No callback URL set for this app." }) response.status_code = 400 return response # Sign the app and state pair before heading to Shibboleth to help protect # against CSRF and XSS attacks signer = TimestampSigner() data = app.client_id + state signed_data = signer.sign(data) # Build Shibboleth callback URL url = os.environ.get("SHIBBOLETH_ROOT") + "/Login?target=" target = request.build_absolute_uri( "/oauth/shibcallback?appdata={}".format(signed_data) ) target = quote(target) url += target # Send the user to Shibboleth to log in return redirect(url)
def userdeny(request): signer = TimestampSigner() try: signed_data = request.POST.get("signed_app_data") raw_data_str = signer.unsign(signed_data, max_age=300) except: response = PrettyJsonResponse({ "ok": False, "error": ("The signed data received was invalid." " Please try the login process again. " "If this issue persists, please contact support.") }) response.status_code = 400 return response try: data = json.loads(raw_data_str) except: response = PrettyJsonResponse({ "ok": False, "error": ("The JSON data was not in the expected format." " Please contact support.") }) response.status_code = 400 return response # We can trust this value because it came from a signed dictionary app = App.objects.get(client_id=data["client_id"]) state = data["state"] redir = "{}?result=denied&state={}".format(app.callback_url, state) # Now check if a token has been granted in the past. If so, invalidate it. # There shouldn't be a situation where more than one user/app token pair # exists but, just in case, let's invalidate them all. try: users = User.objects.filter(employee_id=data["user_upi"]) user = users[0] except (User.DoesNotExist, KeyError): response = PrettyJsonResponse({ "ok": False, "error": "User does not exist. This should never occur. " "Please contact support." }) response.status_code = 400 return response tokens = OAuthToken.objects.filter(app=app, user=user) for token in tokens: token.active = False token.save() # Send the user to the app's denied permission page return redirect(redir)