Python django.http.request 模块,HttpRequest() 实例源码

我们从Python开源项目中,提取了以下31个代码示例,用于说明如何使用django.http.request.HttpRequest()

项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def viewer_login(request: HttpRequest) -> HttpResponse:

    if request.method == 'POST':
        username = request.POST['username']
        password = request.POST['password']
        user = authenticate(username=username, password=password)
        if user is not None:
            if user.is_active:
                login(request, user)
                next_url = request.POST.get('next', 'viewer:main-page')
                return redirect(next_url)
            else:
                return render_error(request, "This account has been disabled.")
        else:
            return render_error(request, "Invalid login credentials.")
    else:
        next_url = request.GET.get('next', 'viewer:main-page')
        d = {'next': next_url}
        return render(request, 'viewer/login.html', d)
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def image_viewer(request: HttpRequest, archive: int, page: int) -> HttpResponse:

    images = Image.objects.filter(archive=archive, extracted=True)
    if not images:
        raise Http404("Archive " + str(archive) + " has no extracted images")

    paginator = Paginator(images, 1)
    try:
        image = paginator.page(page)
    except (InvalidPage, EmptyPage):
        image = paginator.page(paginator.num_pages)

    image_object = image.object_list[0]

    if image_object.image_width / image_object.image_height > 1:
        image_object.is_horizontal = True

    d = {'image': image, 'backurl': redirect(image.object_list[0].archive).url,
         'images_range': range(1, images.count() + 1), 'image_object': image_object}

    return render(request, "viewer/image_viewer.html", d)
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def gallery_thumb(request: HttpRequest, pk: int) -> HttpResponse:
    try:
        gallery = Gallery.objects.get(pk=pk)
    except Gallery.DoesNotExist:
        raise Http404("Gallery does not exist")
    if not gallery.public and not request.user.is_authenticated:
        raise Http404("Gallery is not public")
    if 'HTTP_X_FORWARDED_HOST' in request.META:
        response = HttpResponse()
        response["Content-Type"] = "image/jpeg"
        # response["Content-Disposition"] = 'attachment; filename*=UTF-8\'\'{0}'.format(
        #         archive.pretty_name)
        response['X-Accel-Redirect'] = "/image/{0}".format(gallery.thumbnail.name)
        return response
    else:
        return HttpResponseRedirect(gallery.thumbnail.url)
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def public_stats(request: HttpRequest) -> HttpResponse:
    """Display public galleries and archives stats."""
    if not crawler_settings.urls.enable_public_stats:
        if not request.user.is_staff:
            raise Http404("Page not found")
        else:
            return render_error(request, "Page disabled by settings (urls: enable_public_stats).")

    stats_dict = {
        "n_archives": Archive.objects.filter(public=True).count(),
        "archive": Archive.objects.filter(public=True).filter(filesize__gt=0).aggregate(
            Avg('filesize'), Max('filesize'), Min('filesize'), Sum('filesize')),
        "n_tags": Tag.objects.filter(gallery_tags__public=True).distinct().count(),
        "top_10_tags": Tag.objects.filter(gallery_tags__public=True).distinct().annotate(
            num_archive=Count('gallery_tags')).order_by('-num_archive')[:10]
    }

    d = {'stats': stats_dict}

    return render(request, "viewer/public_stats.html", d)
项目:Rinzler    作者:feliphebueno    | 项目源码 | 文件源码
def no_route_found(self, request):
        """
        Default callback for route not found
        :param request HttpRequest
        :rtype: Response
        """
        response_obj = OrderedDict()
        response_obj["status"] = False
        response_obj["exceptions"] = {
            "message": "No route found for {0} {1}".format(self.__method, self.__uri),
        }
        response_obj["request"] = {
            "method": self.__method,
            "path_info": self.__uri,
            "content": request.body.decode("utf-8")
        }
        response_obj["message"] = "We are sorry, but something went terribly wrong."

        return Response(response_obj, content_type="application/json", status=404, charset="utf-8")
项目:esdc-ce    作者:erigones    | 项目源码 | 文件源码
def get_dummy_request(dc, method=None, user=None, system_user=False):
    """
    Return dummy request object.
    """
    request = HttpRequest()
    request.csrf_processing_done = True
    request.dc = dc

    if method:
        request = set_request_method(request, method, copy_request=False)

    if system_user:
        from api.task.utils import get_system_task_user
        request.user = get_system_task_user()
    elif user:
        request.user = user

    return request
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def get_form(self, req: HttpRequest, obj: Domain=None, **kwargs: Any) -> type:
        if req.GET.get("_prefill_key", "0") == "1":
            def formfield_callback(field: _ModelField, request: HttpRequest=None, **kwargs: Any) -> Type[_FormField]:
                f = self.formfield_for_dbfield(field, request=request, **kwargs)  # type: _FormField
                # f can be None if the dbfield does not get a FormField (like hidden fields
                # or auto increment IDs). Only the dbfield has a .name attribute.
                if f and field.name == "dkimkey":
                    if obj:
                        obj.dkimkey = RSA.generate(2048).exportKey("PEM").decode("utf-8")
                    else:
                        f.initial = RSA.generate(2048).exportKey("PEM").decode("utf-8")
                return f

            kwargs["formfield_callback"] = functools.partial(formfield_callback, request=req)

        form_t = super().get_form(req, obj, **kwargs)
        return form_t
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def _make_access_token(self, request: HttpRequest, tokenreq: _TokenRequest,
                           rightnow: datetime.datetime, perms: TokenPermissions, for_user: MNUser) -> Dict[str, Any]:
        # TODO: figure out if we need to support more than one access scope
        # This implementation is based around this article, that, among other things,
        # describes the "kid" field required by Docker. The JWK implementation provided
        # by jwcrypto doesn't seem to work.
        # https://umbrella.cisco.com/blog/blog/2016/02/23/implementing-oauth-for-registry-v2/
        _x = []  # type: List[str]
        jwtobj = {
            'exp': int((rightnow + datetime.timedelta(minutes=2)).timestamp()),
            'nbf': int((rightnow - datetime.timedelta(seconds=1)).timestamp()),
            'iat': int(rightnow.timestamp()),
            'iss': request.get_host(),
            'aud': tokenreq.service,
            'sub': str(for_user.pk),
            'access': [{
                "type": perms.type,
                "name": perms.path,
                "actions": _x + (["push"] if perms.push else []) +
                                (["pull"] if perms.pull else []) +
                                (["login"] if perms.type == "login" else [])
            }]
        }  # type: Dict[str, Union[str, int, List[Dict[str, Union[str, List[str]]]]]]
        return jwtobj
项目:micromasters    作者:mitodl    | 项目源码 | 文件源码
def setUp(self):
        super().setUp()
        self.request = HttpRequest()
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def viewer_logout(request: HttpRequest) -> HttpResponse:
    logout(request)
    return HttpResponseRedirect(reverse('viewer:main-page'))
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def session_settings(request: HttpRequest) -> HttpResponse:
    if request.method == 'POST':
        data = json.loads(request.body.decode("utf-8"))
        if 'viewer_parameters' in data:
            if "viewer_parameters" not in request.session:
                request.session["viewer_parameters"] = {}
            for k, v in data.items():
                if not k == 'viewer_parameters':
                    request.session["viewer_parameters"][k] = v
            request.session.modified = True
        return HttpResponse(json.dumps({'result': "ok"}), content_type="application/json; charset=utf-8")
    elif request.method == 'GET':
        data = json.loads(request.body.decode("utf-8"))
        if 'viewer_parameters' in data:
            if "viewer_parameters" not in request.session:
                request.session["viewer_parameters"] = {}
                request.session["viewer_parameters"]["image_width"] = 900
                request.session.modified = True
            return HttpResponse(
                json.dumps(request.session["viewer_parameters"]), content_type="application/json; charset=utf-8"
            )
        return HttpResponse(json.dumps({'result': "error"}), content_type="application/json; charset=utf-8")
    else:
        return HttpResponse(json.dumps({'result': "error"}), content_type="application/json; charset=utf-8")


# TODO: Generalize this script for several providers.
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def panda_userscript(request: HttpRequest) -> HttpResponse:

    return render(
        request,
        'viewer/panda.user.js',
        {
            "api_key": crawler_settings.api_key,
            "img_url": request.build_absolute_uri(crawler_settings.urls.static_url + 'favicon-160.png'),
            "server_url": request.build_absolute_uri(reverse('viewer:json-parser'))
        },
        content_type='application/javascript'
    )
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def user_archive_preferences(request: HttpRequest, archive_pk: int, setting: str) -> HttpResponse:
    """Archive user favorite toggle."""
    try:
        Archive.objects.get(pk=archive_pk)
    except Archive.DoesNotExist:
        raise Http404("Archive does not exist")

    if setting == 'favorite':
        current_user_archive_preferences, created = UserArchivePrefs.objects.get_or_create(
            user=User.objects.get(pk=request.user.id),
            archive=Archive.objects.get(pk=archive_pk),
            defaults={'favorite_group': 1}
        )
        if not created:
            current_user_archive_preferences.favorite_group = 1
            current_user_archive_preferences.save()
    elif setting == 'unfavorite':
        current_user_archive_preferences, created = UserArchivePrefs.objects.get_or_create(
            user=User.objects.get(pk=request.user.id),
            archive=Archive.objects.get(pk=archive_pk),
            defaults={'favorite_group': 0}
        )
        if not created:
            current_user_archive_preferences.favorite_group = 0
            current_user_archive_preferences.save()
    else:
        return render_error(request, "Unknown user preference.")
    return HttpResponseRedirect(request.META["HTTP_REFERER"],
                                {'user_archive_preferences': current_user_archive_preferences})
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def render_error(request: HttpRequest, message: str) -> HttpResponseRedirect:
    messages.error(request, message, extra_tags='danger')
    if 'HTTP_REFERER' in request.META:
        return HttpResponseRedirect(request.META["HTTP_REFERER"])
    else:
        return HttpResponseRedirect(reverse('viewer:main-page'))
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def about(request: HttpRequest) -> HttpResponse:

    return render(
        request,
        'viewer/about.html'
    )
项目:Rinzler    作者:feliphebueno    | 项目源码 | 文件源码
def default_route_options(request: HttpRequest):
        """
        Default callback for OPTIONS request
        :param request HttpRequest
        :rtype: Response
        """
        response_obj = OrderedDict()

        response_obj["status"] = True
        response_obj["data"] = "Ok"

        return Response(response_obj, content_type="application/json", charset="utf-8")
项目:django-modern-rpc    作者:alorence    | 项目源码 | 文件源码
def test_http_basic_get_user():

    # Basic django request, without authentication info
    request = HttpRequest()

    # Standard middlewares were not applied on this request
    user = http_basic_auth_get_user(request)

    assert user is not None
    assert user_is_anonymous(user)
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def test_model(self):
        req = HttpRequest()
        self.article.publish()
        self.article_2.publish()
        # test the listing contains the published article
        self.assertTrue(self.article.get_published() in self.listing.get_items_to_list(req))
        # ...not the draft one
        self.assertTrue(self.article not in self.listing.get_items_to_list(req))
        # ...not an article that isn't associated with the listing
        self.assertTrue(self.article_2 not in self.listing.get_items_to_list(req))
        self.assertTrue(self.article_2.get_published() not in self.listing.get_items_to_list(req))
        self.article.unpublish()
        self.article_2.unpublish()
项目:graduate-adventure    作者:dnsdhrj    | 项目源码 | 文件源码
def init_request(self):
        request = HttpRequest()
        request.data = QueryDict().copy()
        request.session = DummySession()
        return request
项目:django-mypy    作者:caulagi    | 项目源码 | 文件源码
def detail(request: HttpRequest, question_id: int) -> HttpResponse:
    question = get_object_or_404(Question, pk=question_id)
    if not question.is_active:
        return 'invalid request'
    return render(request, 'polls/detail.html', {'question': question})
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def health(request: HttpRequest) -> HttpResponse:
    c = connection.cursor()  # type: CursorWrapper
    try:
        c.execute("SELECT 1")
        res = c.fetchone()
    except DatabaseError as e:
        return HttpResponseServerError(("Health check failed: %s" % str(e)).encode("utf-8"),
                                       content_type="text/plain; charset=utf-8")
    else:
        return HttpResponse(b'All green', status=200,
                            content_type="text/plain; charset=utf-8")
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def nothing(request: HttpRequest) -> HttpResponse:
    return HttpResponse(b'nothing to see here', status=200,
                        content_type="text/plain; charset=utf-8")
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def test_error(request: HttpRequest) -> HttpResponse:
    if settings.DEBUG:
        raise Exception("This is a test")
    else:
        return HttpResponseRedirect("/")
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def branding(request: HttpRequest) -> Dict[str, str]:
    return {
        "company_name": settings.COMPANY_NAME,
        "company_logo": settings.COMPANY_LOGO_URL,
    }
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def validate_refresh_token(self, refresh_token: str, client: MNApplication,
                               request: HttpRequest, *args: Any, **kwargs: Any) -> bool:
        res = super().validate_refresh_token(refresh_token, client, request, *args, **kwargs)
        if res:
            # our base validated the refresh token, let's check if the client or user permissions
            # changed
            missing_permissions = find_missing_permissions(client, request.user)
            return len(missing_permissions) == 0
        else:
            return False
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def _make_refresh_token(self, request: HttpRequest, tokenreq: _TokenRequest,
                            rightnow: datetime.datetime, for_user: MNUser) -> Dict[str, Any]:
        jwtobj = {
            'exp': int((rightnow + datetime.timedelta(hours=2)).timestamp()),
            'nbf': int((rightnow - datetime.timedelta(seconds=1)).timestamp()),
            'iat': int(rightnow.timestamp()),
            'iss': request.get_host(),
            'aud': tokenreq.service,
            'sub': str(for_user.pk),
            'malleable': True,
            'client_id': tokenreq.client_id,
        }
        return jwtobj
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def dispatch(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        return super().dispatch(request, *args, **kwargs)
项目:django-telegram-bot    作者:jlmadurga    | 项目源码 | 文件源码
def render(self):
        if not self.template_name:
            return None
        try:
            logger.debug("Template name: %s" % self.template_name)
            template = get_template(self.template_name)      
        except TemplateDoesNotExist:
            logger.debug("Template not found: %s" % self.template_name)
            return None
        # TODO: Avoid using a null HttRequest to context processors
        ctx = RequestContext(HttpRequest(), self.ctx)
        return template.render(ctx)
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def gallery_details(request: HttpRequest, pk: int, tool: str=None) -> HttpResponse:
    try:
        gallery = Gallery.objects.get(pk=pk)
    except Gallery.DoesNotExist:
        raise Http404("Gallery does not exist")
    if not (gallery.public or request.user.is_authenticated):
        raise Http404("Gallery does not exist")

    if request.user.is_staff and tool == "download":
        if 'downloader' in request.GET:
            current_settings = Settings(load_from_config=crawler_settings.config)
            current_settings.allow_downloaders_only([request.GET['downloader']], True, True, True)
            current_settings.workers.web_queue.enqueue_args_list((gallery.get_link(),), override_options=current_settings)
        else:
            # Since this is used from the gallery page mainly to download an already added gallery using
            # downloader settings, force replace_metadata
            current_settings = Settings(load_from_config=crawler_settings.config)
            current_settings.replace_metadata = True
            current_settings.workers.web_queue.enqueue_args_list((gallery.get_link(),), override_options=current_settings)
        return HttpResponseRedirect(request.META["HTTP_REFERER"])

    if request.user.is_staff and tool == "toggle-hidden":
        gallery.hidden = not gallery.hidden
        gallery.save()
        return HttpResponseRedirect(request.META["HTTP_REFERER"])

    if request.user.is_staff and tool == "toggle-public":
        gallery.public_toggle()
        return HttpResponseRedirect(request.META["HTTP_REFERER"])

    if request.user.is_staff and tool == "mark-deleted":
        gallery.mark_as_deleted()
        return HttpResponseRedirect(request.META["HTTP_REFERER"])

    if request.user.is_staff and tool == "recall-api":

        current_settings = Settings(load_from_config=crawler_settings.config)

        current_settings.set_update_metadata_options(providers=(gallery.provider,))

        current_settings.workers.web_queue.enqueue_args_list((gallery.get_link(),), override_options=current_settings)

        frontend_logger.info(
            'Updating gallery API data for gallery: {} and related archives'.format(
                gallery.get_absolute_url()
            )
        )

        return HttpResponseRedirect(request.META["HTTP_REFERER"])

    tag_lists = sort_tags(gallery.tags.all())

    d = {'gallery': gallery, 'tag_lists': tag_lists, 'settings': crawler_settings}
    return render(request, "viewer/gallery.html", d)
项目:Rinzler    作者:feliphebueno    | 项目源码 | 文件源码
def route(self, request: HttpRequest):
        """
        Prepares for the CallBackResolver and handles the response and exceptions
        :param request HttpRequest
        :rtype: HttpResponse
        """
        self.flush()
        self.__request = request
        self.__uri = request.path[1:]
        self.__method = request.method
        self.__bound_routes = dict()
        self.register('log', logging.getLogger(os.urandom(3).hex().upper()))
        self.register('router', RouteMapping())

        self.__app['router'].flush_routes()
        routes = self.__callable().connect(self.__app)

        self.__bound_routes = routes['router'].get__routes()

        request_headers = request.META
        if 'HTTP_USER_AGENT' in request_headers:
            indent = 2 if re.match("[Mozilla]{7}", request_headers['HTTP_USER_AGENT']) else 0
        else:
            indent = 0

        if self.set_end_point_uri() is False:
            return self.set_response_headers(self.no_route_found(self.__request).render(indent))

        acutal_params = self.get_url_params(self.get_end_point_uri())

        try:
            response = self.exec_route_callback(acutal_params)
            if type(response) == Response:
                return self.set_response_headers(response.render(indent))
            else:
                return self.set_response_headers(response)
        except InvalidInputException:
            self.__app['log'].error("< 400", exc_info=True)
            return self.set_response_headers(Response(None, status=400).render(indent))
        except AuthException as e:
            self.__app['log'].error("< 403", exc_info=True)
            return self.set_response_headers(Response(None, status=403).render(indent))
        except NotFoundException:
            self.__app['log'].error("< 404", exc_info=True)
            return self.set_response_headers(Response(None, status=404).render(indent))
        except RequestDataTooBig:
            self.__app['log'].error("< 413", exc_info=True)
            return self.set_response_headers(Response(None, status=413).render(indent))
        except BaseException as e:
            self.__app['log'].error("< 500", exc_info=True)
            return self.set_response_headers(Response(None, status=500).render(indent))
        finally:
            del self
项目:authserver    作者:jdelic    | 项目源码 | 文件源码
def post(self, request: HttpRequest, *args: Any, **kwargs: Any) -> HttpResponse:
        # POST received:
        # <QueryDict: {
        #       'client_id': ['docker'],
        #       'refresh_token': ['boink'],
        #       'service': ['registry.maurusnet.test'],
        #       'scope': ['repository:dev/destalinator:push,pull'],
        #       'grant_type': ['refresh_token']}>
        if "refresh_token" in request.POST and request.POST["grant_type"] == "refresh_token":
            tr = _tkr_parse(request.POST)

            if tr.scope:
                tp = TokenPermissions.parse_scope(tr.scope)
            else:
                return HttpResponseBadRequest("Can't issue access token without valid scope (scope=%s)", tr.scope)

            try:
                client = DockerRegistry.objects.get(client_id=tr.service)  # type: DockerRegistry
            except DockerRegistry.DoesNotExist:
                return HttpResponseNotFound("No such registry/client from refresh token(%s)" % str(tr))

            user = self._user_from_refresh_token(request.POST["refresh_token"], client.public_key_pem(),
                                                 expected_issuer=request.get_host(),
                                                 expected_audience=tr.service)
            if user:
                try:
                    drepo = DockerRepo.objects.get(name=tp.path, registry_id=client.id)
                except DockerRepo.DoesNotExist:
                    if settings.DOCKERAUTH_ALLOW_UNCONFIGURED_REPOS:
                        drepo = DockerRepo()
                        drepo.name = tp.path
                        drepo.registry = client
                        drepo.unauthenticated_read = True
                        drepo.unauthenticated_write = True
                    else:
                        return HttpResponseNotFound("No such repo '%s'" % tp.path)

                if drepo.registry.has_access(user, tp) or drepo.has_access(user, tp):
                    rightnow = datetime.datetime.now(tz=pytz.UTC)
                    return HttpResponse(content=json.dumps({
                        "access_token": self._create_jwt(
                            self._make_access_token(request, tr, rightnow, tp, user),
                            client.private_key_pem(),
                        ),
                        "scope": tr.scope,
                        "expires_in": 119,
                        "refresh_token": self._create_jwt(
                            self._make_refresh_token(request, tr, rightnow, user),
                            client.private_key_pem(),
                        )
                    }), status=200, content_type="application/json")
                else:
                    return HttpResponseForbidden("User %s doesn't have access to repo %s" % (user.pk, tp.path))
            else:
                return HttpResponse("Unauthorized", status=401)
        else:
            return HttpResponseBadRequest("POSTing to this endpoint requires a refresh_token")