Python django.core.cache.cache 模块,get() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.core.cache.cache.get()

项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def for_request(self, request, body=None):
        if body and 'oauth_client_id' in body:
            rv = Tenant.objects.get(pk=body['oauth_client_id'])
            if rv is not None:
                return rv, {}

        jwt_data = request.GET.get('signed_request')

        if not jwt_data:
            header = request.META.get('HTTP_AUTHORIZATION', '')
            jwt_data = header[4:] if header.startswith('JWT ') else None

        if not jwt_data:
            raise BadTenantError('Could not find JWT')

        try:
            oauth_id = jwt.decode(jwt_data, verify=False)['iss']
            client = Tenant.objects.get(pk=oauth_id)
            if client is not None:
                data = jwt.decode(jwt_data, client.secret)
                return client, data
        except jwt.exceptions.DecodeError:
            pass

        raise BadTenantError('Could not find tenant')
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def get_event_from_url_params(self, group_id, event_id=None, slug_vars=None):
        if event_id is not None:
            try:
                event = Event.objects.get(pk=int(event_id))
            except (ValueError, Event.DoesNotExist):
                return None
            group = event.group
            if six.text_type(group.id) != group_id:
                return None
        else:
            try:
                group = Group.objects.get(pk=int(group_id))
            except (ValueError, Group.DoesNotExist):
                return None
            event = group.get_latest_event()
        event = self._ensure_and_bind_event(event)
        if event is None:
            return None

        if slug_vars is not None:
            if slug_vars['org_slug'] != group.organization.slug or \
               slug_vars['proj_slug'] != group.project.slug:
                return None

        return event
项目:django-powerpages    作者:Open-E-WEB    | 项目源码 | 文件源码
def url_by_alias(page_alias):
        """
        Try to read page url from cache. If it's missing then try to find
        matching page that could be missing in cache. If page is found
        then refresh all url list since it's too old.
        If no matching is found then return None so we can throw any
        exception we want in other places
        """
        if page_alias:
            url_to_alias = cache.get(cachekeys.URL_LIST_CACHE)
            if url_to_alias is None:
                url_to_alias = PageURLCache.refresh()
            url = url_to_alias.get(page_alias)
        else:
            url = None
        return url


# Models:
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def clients(request):        

    data = {}

    for word in cache.keys("client_*"):
        client = re.sub(r'^client_', '', word)
        try:

            client_data = cache.get(word)
            data[client] = client_data 

        except:
            raise

    profile_form = ContactForm(instance=Contact.objects.get(user=request.user.id))

    return render(request, 'isubscribe/clients.html', {'DATA':data, 'profile_form': profile_form})
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def subscriptions(request):        

    data = {}

    for word in r.keys("subscription_*"):
        subscription = re.sub(r'^subscription_', '', str(word.decode('utf-8')))
        try:

            subscription_data = r.lrange(word, 0, -1)
            data[subscription] = subscription_data 

        except:
            raise

    profile_form = ContactForm(instance=Contact.objects.get(user=request.user.id))   

    return render(request, 'isubscribe/subscriptions.html', {'DATA':data, 'profile_form': profile_form})




#@login_required(login_url=reverse_lazy('login'))
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def user_settings(request):        

    logger.debug('settings view triggered by %s' % (request.user.username))

    form = ContactForm(request.POST, instance=Contact.objects.get(user=request.user.id))

    if form.is_valid:
        try:
            form.save()
            return HttpResponse('Done', status=200)
        except:        
            return HttpResponse(json.dumps(form.errors), status=409)
    else:
        return HttpResponse(json.dumps(form.errors), status=409)



    return render(request, 'isubscribe/user_settings.html', {'DATA':data, 'form': form})
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def check_config(request):        

    mimetype = 'application/json'   
    data = {}

    if request.method == 'POST' and 'entity' in request.POST and request.POST['entity'] != '':        
        client_name, check_name = request.POST['entity'].split(':')
        #check_name = 'check_gw_tomcat_errors_1h'
        #data = cache.get('check_' + check_name)
        data = cache.get('check_' + request.POST['entity'])


    return HttpResponse(json.dumps(data), mimetype)



#@login_required(login_url=reverse_lazy('login'))
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def onduty_members(self):

        OnDuty = []
        if 'OnDuty' in cache.keys('OnDuty'):
            OnDuty = cache.get('OnDuty')      
        else:
            try:
                event_start, event_end, instance = ScheduledOccurrence.objects.filter(event__in=ScheduledEvent.objects.filter(event=0)).next_occurrence()    
                NOW = datetime.datetime.now(datetime.timezone.utc).timestamp()
                if NOW >= event_start.timestamp() and NOW <= event_end.timestamp():
                    for user in instance.event.members_list():
                        OnDuty.append(user.pk)
                    logger.debug('onduty_members found: %s' % OnDuty)
                    #cache.set('OnDuty', OnDuty, timeout=event_end.timestamp())
                    cache.set('OnDuty', OnDuty, timeout=settings.ON_DUTY_CACHE_MEMBERS)
                else:
                    logger.debug('onduty_members can not find onduty_members')
            except:
                logger.error('onduty_members failed finding onduty_members')
                pass

        return OnDuty
项目:sensu_drive    作者:ilavender    | 项目源码 | 文件源码
def user_dnd(self, user_pk):

        if 'DnD_' + str(user_pk) in cache.keys("DnD_*"):
            #DnD = cache.get('DnD_' + str(user_pk))
            DnD = True        
        else:
            DnD = False
            try:
                event_start, event_end, instance = ScheduledOccurrence.objects.filter(event__in=ScheduledEvent.objects.filter(event=1, members__in=[user_pk])).next_occurrence()    
                NOW = datetime.datetime.now(datetime.timezone.utc).timestamp()
                if NOW >= event_start.timestamp() and NOW <= event_end.timestamp():
                    DnD = True
                    cache.set('DnD_' + str(user_pk), DnD, timeout=event_end.timestamp())
            except:
                pass

        return DnD
项目:django-lrucache-backend    作者:kogan    | 项目源码 | 文件源码
def test_binary_string(self):
        # Binary strings should be cacheable
        cache = self.cache
        from zlib import compress, decompress
        value = 'value_to_be_compressed'
        compressed_value = compress(value.encode())

        # Test set
        cache.set('binary1', compressed_value)
        compressed_result = cache.get('binary1')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())

        # Test add
        cache.add('binary1-add', compressed_value)
        compressed_result = cache.get('binary1-add')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())

        # Test set_many
        cache.set_many({'binary1-set_many': compressed_value})
        compressed_result = cache.get('binary1-set_many')
        self.assertEqual(compressed_value, compressed_result)
        self.assertEqual(value, decompress(compressed_result).decode())
项目:django-lrucache-backend    作者:kogan    | 项目源码 | 文件源码
def test_forever_timeout(self):
        """
        Passing in None into timeout results in a value that is cached forever
        """
        cache = self.cache
        cache.set('key1', 'eggs', None)
        self.assertEqual(cache.get('key1'), 'eggs')

        cache.add('key2', 'ham', None)
        self.assertEqual(cache.get('key2'), 'ham')
        added = cache.add('key1', 'new eggs', None)
        self.assertIs(added, False)
        self.assertEqual(cache.get('key1'), 'eggs')

        cache.set_many({'key3': 'sausage', 'key4': 'lobster bisque'}, None)
        self.assertEqual(cache.get('key3'), 'sausage')
        self.assertEqual(cache.get('key4'), 'lobster bisque')
项目:tecken    作者:mozilla-services    | 项目源码 | 文件源码
def test_client_task_tester(client, clear_redis_store):
    url = reverse('task_tester')

    def fake_task(key, value, expires):
        cache.set(key, value, expires)

    _mock_function = 'tecken.views.sample_task.delay'
    with mock.patch(_mock_function, new=fake_task):

        response = client.get(url)
        assert response.status_code == 400
        assert b'Make a POST request to this URL first' in response.content

        response = client.post(url)
        assert response.status_code == 201
        assert b'Now make a GET request to this URL' in response.content

        response = client.get(url)
        assert response.status_code == 200
        assert b'It works!' in response.content
项目:tecken    作者:mozilla-services    | 项目源码 | 文件源码
def task_tester(request):
    if request.method == 'POST':
        cache.set('marco', 'ping', 100)
        sample_task.delay('marco', 'polo', 10)
        return http.HttpResponse(
            'Now make a GET request to this URL\n',
            status=201,
        )
    else:
        if not cache.get('marco'):
            return http.HttpResponseBadRequest(
                'Make a POST request to this URL first\n'
            )
        for i in range(3):
            value = cache.get('marco')
            if value == 'polo':
                return http.HttpResponse('It works!\n')
            time.sleep(1)

        return http.HttpResponseServerError(
            'Tried 4 times (4 seconds) and no luck :(\n'
        )
项目:zing    作者:evernote    | 项目源码 | 文件源码
def get_canonical(cls, language_code):
        """Returns the canonical `Language` object matching `language_code`.

        If no language can be matched, `None` will be returned.

        :param language_code: the code of the language to retrieve.
        """
        try:
            return cls.objects.get(code__iexact=language_code)
        except cls.DoesNotExist:
            _lang_code = language_code
            if "-" in language_code:
                _lang_code = language_code.replace("-", "_")
            elif "_" in language_code:
                _lang_code = language_code.replace("_", "-")
            try:
                return cls.objects.get(code__iexact=_lang_code)
            except cls.DoesNotExist:
                return None
项目:zing    作者:evernote    | 项目源码 | 文件源码
def validate_email(self):
        """Ensure emails are unique across the models tracking emails.

        Since it's essential to keep email addresses unique to support our
        workflows, a `ValidationError` will be raised if the email trying
        to be saved is already assigned to some other user.
        """
        lookup = Q(email__iexact=self.email)
        if self.pk is not None:
            # When there's an update, ensure no one else has this address
            lookup &= ~Q(user=self)

        try:
            EmailAddress.objects.get(lookup)
        except EmailAddress.DoesNotExist:
            pass
        else:
            raise ValidationError({
                'email': [_('This email address already exists.')]
            })
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def get(self, request):
        try:
            start, end = get_start_end_paramters(request, default_days_back=1)
        except ValueError as e:
            return HttpResponseBadRequest(str(e))
        combine = request.query_params.get('combine')
        sites = request.query_params.getlist('site')
        telescopes = request.query_params.getlist('telescope')
        try:
            telescope_availability = get_telescope_availability_per_day(
                start, end, sites=sites, telescopes=telescopes
            )
        except ElasticSearchException:
            logger.warning('Error connecting to ElasticSearch. Is SBA reachable?')
            return Response('ConnectionError')
        if combine:
            telescope_availability = combine_telescope_availabilities_by_site_and_class(telescope_availability)
        str_telescope_availability = {str(k): v for k, v in telescope_availability.items()}

        return Response(str_telescope_availability)
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def archive_bearer_token(self):
        # During testing, you will probably have to copy access tokens from prod for this to work
        try:
            app = Application.objects.get(name='Archive')
        except Application.DoesNotExist:
            logger.error('Archive application not found. Oauth applications need to be populated.')
            return ''
        access_token = AccessToken.objects.filter(user=self.user, application=app, expires__gt=timezone.now()).last()
        if not access_token:
            access_token = AccessToken(
                user=self.user,
                application=app,
                token=uuid.uuid4().hex,
                expires=timezone.now() + timedelta(days=30)
            )
            access_token.save()
        return access_token.token
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def get_rise_set_intervals(request_dict, site=''):
    intervals = []
    site = site if site else request_dict['location'].get('site', '')
    telescope_details = configdb.get_telescopes_with_instrument_type_and_location(
            request_dict['molecules'][0]['instrument_name'],
            site,
            request_dict['location'].get('observatory', ''),
            request_dict['location'].get('telescope', '')
    )
    if not telescope_details:
        return intervals

    intervals_by_site = get_rise_set_intervals_by_site(request_dict)
    intervalsets_by_telescope = intervals_by_site_to_intervalsets_by_telescope(intervals_by_site, telescope_details.keys())
    filtered_intervalsets_by_telescope = filter_out_downtime_from_intervalsets(intervalsets_by_telescope)
    filtered_intervalset = Intervals().union(filtered_intervalsets_by_telescope.values())
    filtered_intervals = filtered_intervalset.toTupleList()

    return filtered_intervals
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def __init__(self, start, end, telescopes=None, sites=None, instrument_types=None):
        try:
            self.es = Elasticsearch([settings.ELASTICSEARCH_URL])
        except LocationValueError:
            logger.error('Could not find host. Make sure ELASTICSEARCH_URL is set.')
            raise ImproperlyConfigured('ELASTICSEARCH_URL')

        self.instrument_types = instrument_types
        self.available_telescopes = self._get_available_telescopes()

        sites = list({tk.site for tk in self.available_telescopes}) if not sites else sites
        telescopes = list({tk.telescope for tk in self.available_telescopes if tk.site in sites}) \
            if not telescopes else telescopes

        self.start = start.replace(tzinfo=timezone.utc).replace(microsecond=0)
        self.end = end.replace(tzinfo=timezone.utc).replace(microsecond=0)
        cached_event_data = cache.get('tel_event_data')
        if cached_event_data:
            self.event_data = cached_event_data
        else:
            self.event_data = self._get_es_data(sites, telescopes)
            cache.set('tel_event_data', self.event_data, 1800)
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def get(self):
        telescope_states = {}
        current_lump = dict(reasons=None, types=None, start=None)

        for event in self.event_data:
            if self._telescope(event['_source']) not in self.available_telescopes:
                continue

            if current_lump['start'] is None:
                current_lump = self._set_lump(event)
                continue

            if self._belongs_in_lump(event['_source'], current_lump):
                current_lump = self._update_lump(current_lump, event)
            else:
                lump_end = self._lump_end(current_lump, event['_source'])
                if lump_end >= self.start:
                    telescope_states = self._update_states(telescope_states, current_lump, lump_end)
                    current_lump = self._set_lump(event)

        if current_lump['start']:
            lump_end = self._lump_end(current_lump)
            telescope_states = self._update_states(telescope_states, current_lump, lump_end)

        return telescope_states
项目:malvo    作者:shivamMg    | 项目源码 | 文件源码
def set_mcqs_in_cache():
    """
    Set MCQs in cache if they have changed or have not been set.
    """
    languages = {
        'C': 'c_mcqs',
        'J': 'java_mcqs',
    }

    # If MCQs have been changed or have not been created
    if not cache.get('mcqs_flag', False):
        for lang_code, cache_key in languages.items():
            mcqs_json = extract_mcqs(lang_code)
            cache.set(cache_key, mcqs_json)

        # Mark MCQs as unchanged
        cache.set('mcqs_flag', True)
项目:malvo    作者:shivamMg    | 项目源码 | 文件源码
def _get_question_statuses(team):
    """
    Returns a dictionary of Question numbers and statuses as key-value pairs.
    Status could be:
        'S': Solved
        'U': Unattempted
    """
    status_dict = {}

    for ques in Question.objects.filter(language=team.lang_pref):
        try:
            team.teammcqanswer_set.get(question_no=ques.question_no)
            status = 'S'
        except TeamMcqAnswer.DoesNotExist:
            status = 'U'

        status_dict[ques.question_no] = status

    return status_dict
项目:django-wechat-base    作者:ChanMo    | 项目源码 | 文件源码
def get_token(self):
        """Get wechat access token.Store in cache"""
        access_token = cache.get('wx_access_token')
        if access_token:
            return access_token
        else:
            param = {
                'grant_type': 'client_credential',
                'appid': self.appid,
                'secret': self.appsecret,
            }
            url = self.get_url('token', param)
            data = self.get_data(url)
            cache.set('wx_access_token', data['access_token'],\
                    int(data['expires_in']))
            return data['access_token']
项目:django-wechat-base    作者:ChanMo    | 项目源码 | 文件源码
def get_token(self):
        """Get wechat access token.Store in cache"""
        access_token = cache.get('wx_access_token')
        if access_token:
            return access_token
        else:
            param = {
                'grant_type': 'client_credential',
                'appid': self.appid,
                'secret': self.appsecret,
            }
            url = self.get_url('token', param)
            data = self.get_data(url)
            cache.set('wx_access_token', data['access_token'],\
                    int(data['expires_in']))
            return data['access_token']
项目:endorsementdb.com    作者:endorsementdb    | 项目源码 | 文件源码
def search_endorsers(request):
    query = request.GET.get('q')
    endorsers = []
    endorser_pks = set()
    if query:
        # First find the endorsers whose names start with this query.
        results = Endorser.objects.filter(name__istartswith=query)
        for endorser in results[:5]:
            endorser_pks.add(endorser.pk)
            endorsers.append(endorser)

        if results.count() < 5:
            results = Endorser.objects.filter(name__icontains=query)
            for endorser in results:
                if endorser.pk in endorser_pks:
                    continue

                endorsers.append(endorser)
                if len(endorsers) == 5:
                    break

    return JsonResponse({
        'endorsers': [{'pk': e.pk, 'name': e.name} for e in endorsers],
    })
项目:endorsementdb.com    作者:endorsementdb    | 项目源码 | 文件源码
def progress_wikipedia_missing(request, slug):
    if slug == NEWSPAPER_SLUG:
        endorsements = Endorsement.objects.filter(
            endorser__importednewspaper=None,
            endorser__tags=Tag.objects.get(name='Publication')
        )
    else:
        position = Position.objects.get(slug=SLUG_MAPPING[slug])
        endorsements = Endorsement.objects.filter(
            position=position,
            endorser__importedendorsement=None
        )

    context = {
        'slug': slug,
        'endorsements': endorsements,
    }
    return render(request, 'progress/wikipedia_missing.html', context)
项目:feincms3    作者:matthiask    | 项目源码 | 文件源码
def render(self, region, context, timeout=None):
        """render(self, region, context, *, timeout=None)
        Render a single region using the context passed

        If ``timeout`` is ``None`` caching is disabled.

        .. note::
           You should treat anything except for the ``region`` and ``context``
           argument as keyword-only.
        """
        if timeout is not None:
            key = self.cache_key(region)
            html = cache.get(key)
            if html is not None:
                return html

        html = mark_safe(''.join(
            self._renderer.render_plugin_in_context(plugin, context)
            for plugin in self._contents[region]
        ))

        if timeout is not None:
            cache.set(key, html, timeout=timeout)
        return html
项目:base_function    作者:Rockyzsu    | 项目源码 | 文件源码
def general_image(self, image_format='PNG'):
        fm_width = self.cleaned_data['width']
        fm_height = self.cleaned_data['height']

        key = '{}.{}.{}'.format(fm_width, fm_height, image_format)
        content = cache.get(key)
        if content is None:
            image = Image.new('RGB', (fm_width, fm_height), color=122)

            draw = ImageDraw.Draw(image)
            text = '{}x{}'.format(fm_width, fm_height)
            text_width, text_height = draw.textsize(text)

            if text_width < fm_width and text_height < fm_height:
                text_top = (fm_height - text_height) // 2
                text_left = (fm_width - text_width) // 2

                draw.text((text_top, text_left), text, fill=(255, 255, 255))

            content = BytesIO()
            image.save(content, image_format)
            content.seek(0)
            cache.set(key, content, 60 * 60)

        return content
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_setnx(self):
        # we should ensure there is no test_key_nx in redis
        self.cache.delete("test_key_nx")
        res = self.cache.get("test_key_nx", None)
        self.assertEqual(res, None)

        res = self.cache.set("test_key_nx", 1, nx=True)
        self.assertTrue(res)
        # test that second set will have
        res = self.cache.set("test_key_nx", 2, nx=True)
        self.assertFalse(res)
        res = self.cache.get("test_key_nx")
        self.assertEqual(res, 1)

        self.cache.delete("test_key_nx")
        res = self.cache.get("test_key_nx", None)
        self.assertEqual(res, None)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_setnx_timeout(self):
        # test that timeout still works for nx=True
        res = self.cache.set("test_key_nx", 1, timeout=2, nx=True)
        self.assertTrue(res)
        time.sleep(3)
        res = self.cache.get("test_key_nx", None)
        self.assertEqual(res, None)

        # test that timeout will not affect key, if it was there
        self.cache.set("test_key_nx", 1)
        res = self.cache.set("test_key_nx", 2, timeout=2, nx=True)
        self.assertFalse(res)
        time.sleep(3)
        res = self.cache.get("test_key_nx", None)
        self.assertEqual(res, 1)

        self.cache.delete("test_key_nx")
        res = self.cache.get("test_key_nx", None)
        self.assertEqual(res, None)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_save_dict(self):
        if isinstance(self.cache.client._serializer,
                      json_serializer.JSONSerializer):
            self.skipTest("Datetimes are not JSON serializable")

        if isinstance(self.cache.client._serializer,
                      msgpack_serializer.MSGPackSerializer):
            # MSGPackSerializer serializers use the isoformat for datetimes
            # https://github.com/msgpack/msgpack-python/issues/12
            now_dt = datetime.datetime.now().isoformat()
        else:
            now_dt = datetime.datetime.now()

        test_dict = {"id": 1, "date": now_dt, "name": "Foo"}

        self.cache.set("test_key", test_dict)
        res = self.cache.get("test_key")

        self.assertIsInstance(res, dict)
        self.assertEqual(res["id"], 1)
        self.assertEqual(res["name"], "Foo")
        self.assertEqual(res["date"], now_dt)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_timeout_parameter_as_positional_argument(self):
        self.cache.set("test_key", 222, -1)
        res = self.cache.get("test_key", None)
        self.assertIsNone(res)

        self.cache.set("test_key", 222, 1)
        res1 = self.cache.get("test_key", None)
        time.sleep(2)
        res2 = self.cache.get("test_key", None)
        self.assertEqual(res1, 222)
        self.assertEqual(res2, None)

        # nx=True should not overwrite expire of key already in db
        self.cache.set("test_key", 222, 0)
        self.cache.set("test_key", 222, -1, nx=True)
        res = self.cache.get("test_key", None)
        self.assertEqual(res, 222)
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_sentinel_switching(self):
        if not isinstance(self.cache.client,
                          SentinelClient):
            self.skipTest("Not Sentinel clients use default master-slave setup")
        try:
            cache = caches["sample"]
            client = cache.client
            master = client.get_client(write=True)
            slave = client.get_client(write=False)

            master.set("Foo", "Bar")
            self.assertEqual(slave.get("Foo"), "Bar")
            self.assertEqual(master.info()['role'], "master")
            self.assertEqual(slave.info()['role'], "slave")
        except NotImplementedError:
            pass
项目:django-redis-sentinel-redux    作者:danigosa    | 项目源码 | 文件源码
def test_invalid_key(self):
        # Submitting an invalid session key (either by guessing, or if the db has
        # removed the key) results in a new key being generated.
        try:
            session = self.backend('1')
            try:
                session.save()
            except AttributeError:
                self.fail(
                    "The session object did not save properly. "
                    "Middleware may be saving cache items without namespaces."
                )
            self.assertNotEqual(session.session_key, '1')
            self.assertEqual(session.get('cat'), None)
            session.delete()
        finally:
            # Some backends leave a stale cache entry for the invalid
            # session key; make sure that entry is manually deleted
            session.delete('1')
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_user_by_id(user_id=0, use_cache=True):
        """
        ????????
        :param user_id: ???ID
        :param use_cache: ??????
        """
        key = CACHE_KEY+str(user_id)
        if use_cache:
            account = cache.get(key)
            if account:
                return account
        try:
            account = UserAccount.objects.get(id=user_id)
            cache.set(key, account, CACHE_TIME)
            return account
        except UserAccount.DoesNotExist:
            return None
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_token_by_user_id(user_id, use_cache=True):
        """
        ?????ID?????Token
        :param user_id: ??ID
        :param use_cache: ??????
        """
        key = CACHE_TOKEN_ID+str(user_id)
        if use_cache:
            token = cache.get(key)
            if token:
                return token
        try:
            token = AccessToken.objects.order_by("-id").filter(status=1).get(user_id=user_id)
            cache.set(key, token, CACHE_TIME)
            return token
        except AccessToken.DoesNotExist:
            return None
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_token(token, use_cache=True):
        """
        ????token??????
        """
        key = CACHE_TOKEN+token
        if use_cache:
            token_result = cache.get(key)
            if token_result:
                return token_result

        try:
            token = AccessToken.objects.order_by("-id").filter(status=1).get(access_token=token)
            cache.set(key, token, CACHE_TIME)
            return token
        except AccessToken.DoesNotExist:
            return None
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_user_meta_count(user_id, is_follow=True, use_cache=True):
        """
        ???????????????
        :param user_id: ???????ID
        :param is_follow: ???True???????????????
        :param use_cache: ??????
        """
        cache_key = CACHE_FANS_COUNT_KEY + str(user_id)
        if is_follow:
            cache_key = CACHE_FOLLOW_COUNT_KEY + str(user_id)

        if use_cache:
            count = cache.get(cache_key)
            if count:
                return count

        if is_follow:
            count = UserFollow.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
        else:
            count = UserFollow.objects.filter(follow_user=user_id, status=1).aggregate(Count("id"))

        count = count["id__count"]
        cache.set(cache_key, count, CACHE_COUNT_TIME)

        return count
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_format_info_by_ease_mob(ease_mob, use_cache=True):
        """
        ?????????????????
        """
        key = CACHE_EASE_MOB_KEY + ease_mob
        if use_cache:
            result = cache.get(key)
            if result:
                return result

        try:
            user_info = UserInfo.objects.get(ease_mob=ease_mob)
            format_user_info = UserInfo.format_user_info(user_info)
            cache.set(key, format_user_info, CACHE_TIME)
            return format_user_info
        except UserInfo.DoesNotExist:
            return None
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_format_info_by_user_id(user_id, use_cache=True):
        """
        ????ID??????
        :param user_id: ??ID
        :param use_cache: ??????
        """
        key = CACHE_KEY + str(user_id)
        if use_cache:
            result = cache.get(key)
            if result:
                return result

        try:
            user_info = UserInfo.objects.get(user_id=user_id)
            format_user_info = UserInfo.format_user_info(user_info)
            cache.set(key, format_user_info, CACHE_TIME)
            return format_user_info
        except UserInfo.DoesNotExist:
            return None
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_published_article_count(user_id, use_cache=True):
        """
        ??????????????
        :param user_id: ??ID
        :param use_cache: ??????
        """
        cache_key = CACHE_ARTICLE_COUNT + str(user_id)
        if use_cache:
            count = cache.get(cache_key)
            if count:
                return count

        count = BlogArticle.objects.filter(user_id=user_id, status=1).aggregate(Count("id"))
        count = count["id__count"]
        if count:
            cache.set(cache_key, count, CACHE_TIME)
        return count
项目:pony    作者:Eastwu5788    | 项目源码 | 文件源码
def query_article_by_id(article_id=0, use_cache=True):
        """
        ????ID???????
        :param article_id: ??ID
        :param use_cache: ??????
        """
        key = CACHE_KEY_ID + str(article_id)
        if use_cache:
            cache_result = cache.get(key)
            if cache_result:
                return cache_result

        try:
            article = BlogArticle.objects.get(id=article_id)
            article = BlogArticle.format_article(article)
            cache.set(key, article, CACHE_TIME)
            return article
        except BlogArticle.DoesNotExist:
            return None
项目:steemprojects.com    作者:noisy    | 项目源码 | 文件源码
def commits_over_52(self):
        cache_name = self.cache_namer(self.commits_over_52)
        value = cache.get(cache_name)
        if value is not None:
            return value
        now = datetime.now()
        commits = self.commit_set.filter(
            commit_date__gt=now - timedelta(weeks=52),
        ).values_list('commit_date', flat=True)

        weeks = [0] * 52
        for cdate in commits:
            age_weeks = (now - cdate).days // 7
            if age_weeks < 52:
                weeks[age_weeks] += 1

        value = ','.join(map(str, reversed(weeks)))
        cache.set(cache_name, value)
        return value
项目:django-cmsplugin-diff    作者:doctormo    | 项目源码 | 文件源码
def record_plugin_history(self, sender, instance, **kwargs):
        """When a plugin is created or edited"""
        from cms.models import CMSPlugin, Page
        from .models import EditHistory
        if not isinstance(instance, CMSPlugin):
            return

        user_id = cache.get('cms-user-id')
        comment = cache.get('cms-comment')
        content = generate_content(instance)
        if content is None:
            return

        # Don't record a history of change if nothing changed.
        history = EditHistory.objects.filter(plugin_id=instance.id)
        if history.count() > 0:
            # Temporary history object for uuid
            this = EditHistory(content=content)
            latest = history.latest()
            if latest.content == content or this.uuid == latest.uuid:
                return

        EditHistory.objects.record(instance, user_id, comment, content)
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def get_token(self, token_only=True, scopes=None):
        if scopes is None:
            scopes = ['send_notification', 'view_room']

        cache_key = 'hipchat-tokens:%s:%s' % (self.id, ','.join(scopes))

        def gen_token():
            data = {
                'grant_type': 'client_credentials',
                'scope': ' '.join(scopes),
            }
            resp = requests.post(
                self.token_url, data=data, auth=HTTPBasicAuth(self.id, self.secret), timeout=10
            )
            if resp.status_code == 200:
                return resp.json()
            elif resp.status_code == 401:
                raise OauthClientInvalidError(self)
            else:
                raise Exception('Invalid token: %s' % resp.text)

        if token_only:
            token = cache.get(cache_key)
            if not token:
                data = gen_token()
                token = data['access_token']
                cache.set(cache_key, token, data['expires_in'] - 20)
            return token
        return gen_token()
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def update_room_info(self, commit=True):
        headers = {
            'Authorization': 'Bearer %s' % self.get_token(),
            'Content-Type': 'application/json'
        }
        room = requests.get(
            urljoin(self.api_base_url, 'room/%s') % self.room_id, headers=headers, timeout=5
        ).json()
        self.room_name = room['name']
        self.room_owner_id = six.text_type(room['owner']['id'])
        self.room_owner_name = room['owner']['name']
        if commit:
            self.save()
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def __exit__(self, exc_type, exc_value, tb):
        # If we get an invalid oauth client we better clean up the tenant
        # and swallow the error.
        if isinstance(exc_value, OauthClientInvalidError):
            self.tenant.delete()
            return True
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def for_request(request, body=None):
        """Creates the context for a specific request."""
        tenant, jwt_data = Tenant.objects.for_request(request, body)
        webhook_sender_id = jwt_data.get('sub')
        sender_data = None

        if body and 'item' in body:
            if 'sender' in body['item']:
                sender_data = body['item']['sender']
            elif 'message' in body['item'] and 'from' in body['item']['message']:
                sender_data = body['item']['message']['from']

        if sender_data is None:
            if webhook_sender_id is None:
                raise BadTenantError('Cannot identify sender in tenant')
            sender_data = {'id': webhook_sender_id}

        return Context(
            tenant=tenant,
            sender=HipchatUser(
                id=sender_data.get('id'),
                name=sender_data.get('name'),
                mention_name=sender_data.get('mention_name'),
            ),
            signed_request=request.GET.get('signed_request'),
            context=jwt_data.get('context') or {},
        )
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def get_event(self, event_id):
        try:
            event = Event.objects.get(pk=int(event_id))
        except (ValueError, Event.DoesNotExist):
            return None
        return self._ensure_and_bind_event(event)
项目:django-powerpages    作者:Open-E-WEB    | 项目源码 | 文件源码
def process_request(self, request, extra_context=None):
        """Main page processing logic"""
        context = self.get_rendering_context(request)
        if extra_context:
            context.update(extra_context)
        cache_key, seconds = self.get_cache_settings(request)
        if cache_key:
            content = cache.get(cache_key)
            if content is None:
                content = self.render(context)
                cache.set(cache_key, content, seconds)
        else:
            content = self.render(context)
        return self.create_response(request, content)