Python django.conf.settings 模块,TESTING 实例源码

我们从Python开源项目中,提取了以下24个代码示例,用于说明如何使用django.conf.settings.TESTING

项目:Server    作者:malaonline    | 项目源码 | 文件源码
def test_kuailexue_api2(self):
        """Register a student and a teacher through ``klx`` and link them.

        ``settings.TESTING`` is switched off while the test runs and put back
        to its previous value afterwards, so the change does not leak into
        tests that run later in the same process.
        """
        previous_testing = getattr(settings, 'TESTING', False)
        settings.TESTING = False
        try:
            student_uid = 'test_student_1'
            student_name = '????1'
            teacher_uid = 'debug_210_7VQy5'
            teacher_name = '????1'
            teacher_password = '892673'

            klx_stu = klx.klx_register(
                klx.KLX_ROLE_STUDENT, student_uid, student_name)
            _console.info(klx_stu)
            self.assertIsNotNone(klx_stu)

            klx_tea = klx.klx_register(
                klx.KLX_ROLE_TEACHER, teacher_uid, teacher_name,
                teacher_password)
            _console.info(klx_tea)
            self.assertIsNotNone(klx_tea)

            # Both registrations succeeded; now bind the teacher to the student.
            ok = klx.klx_relation(klx_tea, klx_stu)
            self.assertTrue(ok)
        finally:
            settings.TESTING = previous_testing
项目:open-ledger    作者:creativecommons    | 项目源码 | 文件源码
def clean_description(self):
        """Validate the ``description`` field against spam and profanity.

        Returns the cleaned description unchanged when it passes both checks.

        Raises:
            forms.ValidationError: if Akismet flags the text as spam or the
                word filter reports a blacklisted word.
        """
        desc = self.cleaned_data['description']
        if settings.TESTING:
            # No Akismet call is made while tests are running.
            check_spam = False
        else:
            akismet = Akismet(settings.AKISMET_KEY, blog="CC Search")
            # Django stores request headers in META under HTTP_* keys, so the
            # User-Agent header lives at 'HTTP_USER_AGENT'. The previous
            # 'user-agent' lookup always produced None.
            # NOTE(review): the first argument is request.get_host(); if the
            # Akismet client expects the commenter's IP address here, this
            # should probably be META['REMOTE_ADDR'] - confirm.
            check_spam = akismet.check(
                self.request.get_host(),
                user_agent=self.request.META.get('HTTP_USER_AGENT'),
                comment_author=self.request.user.username,
                comment_content=desc)
        wordfilter = Wordfilter()
        check_words = wordfilter.blacklisted(desc)
        if check_spam or check_words:
            raise forms.ValidationError("This description failed our spam or profanity check; the description has not been updated.")

        return desc
项目:home-data-api    作者:data-skeptic    | 项目源码 | 文件源码
def create_profile(sender, instance, **kwargs):
    """Make sure ``instance`` has a Profile and send the sign-up email.

    A Profile is created when the signal reports a newly created instance,
    or when an existing instance turns out to have no Profile yet. Only a
    freshly created Profile gets a new code and (outside of tests) a
    confirmation email.
    """
    if kwargs['created']:
        profile = Profile.objects.create(user=instance)
    else:
        existing = Profile.objects.filter(user=instance)
        if len(existing) != 0:
            # A profile is already present: nothing to set up.
            return
        profile = Profile.objects.create(user=instance)

    profile.update_code()
    link_text = settings.HOSTNAME + profile.get_confirmation_link()
    if settings.TESTING:
        return
    send_signup_email(instance.email, link_text)
项目:home-data-api    作者:data-skeptic    | 项目源码 | 文件源码
def save(self, *args, **kwargs):
        """Geocode the raw address, then save the model.

        The address is looked up through the Google geocoding service, which
        is limited to 2500 requests per day. Nothing here tracks that quota,
        so models beyond the limit are saved without a geocoded address.
        The lookup is skipped entirely when ``settings.TESTING`` is set.
        """
        if not settings.TESTING:
            query = {'address': self.raw_address,
                     'key': settings.G_APPS_KEY}
            response = requests.get(
                'https://maps.googleapis.com/maps/api/geocode/json',
                params=query)
            geocoded_ok = response.status_code == 200
            if geocoded_ok:
                geocoded = Address()
                geocoded.raw = response.text
                geocoded.from_google_json(response.json())
                geocoded.save()
                self.address_object = geocoded
        super(Property, self).save(*args, **kwargs)
项目:DjangoBlog    作者:liangliangyy    | 项目源码 | 文件源码
def article_save_callback(sender, **kwargs):
    """Notify Baidu about a saved Article, Category or Tag.

    Expects ``id`` and ``is_update_views`` in ``kwargs``. The object is
    loaded by id from the model matching ``sender.__name__``; senders other
    than Article, Category and Tag are ignored. No notification is sent
    while testing or when the save only updated the view counter.
    Notification failures are logged and printed, never raised.
    """
    obj_id = kwargs['id']
    is_update_views = kwargs['is_update_views']
    sender_name = sender.__name__
    from blog.models import Article, Category, Tag

    models_by_name = {'Article': Article, 'Category': Category, 'Tag': Tag}
    model = models_by_name.get(sender_name)
    if model is None:
        return
    obj = model.objects.get(id=obj_id)

    if settings.TESTING or is_update_views:
        return
    try:
        notify_url = obj.get_full_url()
        SpiderNotify.baidu_notify([notify_url])
    except Exception as ex:
        # The message needs a placeholder for ``ex``; without one the logging
        # module reports a formatting error instead of the real failure.
        logger.error("notify sipder: %s", ex)
        print(ex)
项目:montage    作者:storyful    | 项目源码 | 文件源码
def _get_image_url(self, image_gcs_filename):
        """
            Resolve the URL used to display an image stored in Cloud Storage.

            When testing, or when DEBUG is off, the serving URL is obtained
            from get_gcs_image_serving_url; an empty filename yields None.

            Otherwise (local development) the value is returned untouched.

            No longer used.
        """
        if not settings.TESTING and settings.DEBUG:
            # Little local dev hack. We're using the Images API with GCS.
            # This doesn't work locally so instead we pass the image dataUri
            # and use it directly
            return image_gcs_filename
        if not image_gcs_filename:
            return None
        return get_gcs_image_serving_url(image_gcs_filename)
项目:django_rest_example    作者:devslaw    | 项目源码 | 文件源码
def __basic_publish(self):
        """
        Publish message to exchange

        The body is serialised to JSON and published with the configured
        exchange name and routing key. Nothing is published while
        ``django_settings.TESTING`` is set. The connection is closed in
        every case, including when publishing raises.
        :return:
        """
        try:
            if not django_settings.TESTING:
                body = json.dumps(self.body)

                self.channel.basic_publish(
                    exchange=self.exchange_name,
                    routing_key=self.routing_key,
                    body=body,
                )
                print(" [x] Sent %r" % body)
        finally:
            # Close even on failure so the connection is not leaked.
            self.connection.close()
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def add_region(apps, schema_editor):
    """Load the region tree from the JSON file next to this module.

    ``regions_for_test.json`` is read while testing, ``regions.txt``
    otherwise. The parsed data keeps its key order (``OrderedDict``) and is
    handed to ``dfs`` together with ``apps`` and the starting value ``1``.
    """
    data_file = "regions_for_test.json" if settings.TESTING else "regions.txt"
    data_path = os.path.join(os.path.dirname(__file__), data_file)
    # The context manager closes the file handle once parsing is done.
    with open(data_path) as region_file:
        regions = json.load(region_file, object_pairs_hook=OrderedDict)
    dfs(apps, regions, 1)
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def get_one_subject_report(self, the_subject, parent, params):
        """Build the study report for one subject as a JSON response.

        While testing, only ``subject_id`` is returned. Otherwise the most
        recent paid order of ``parent`` for the subject is looked up; without
        one the response is a 404. The report is then filled from the
        ``_get_*`` helpers, each called with the subject's study URL and
        ``params``.
        """
        subject_name_en = klx.klx_subject_name(the_subject.name)
        url = klx.KLX_STUDY_URL_FMT.format(subject=subject_name_en)
        report = {'subject_id': the_subject.id}
        if settings.TESTING:
            return JsonResponse(report)

        # Most recent paid order of this parent for the subject.
        paid_orders = models.Order.objects.filter(
            parent=parent, status=models.Order.PAID, subject=the_subject)
        last_order = paid_orders.order_by('-created_at').first()
        if not last_order:
            return HttpResponse(status=404)  # Have not joined the course

        report['grade_id'] = last_order.grade_id
        # Totals, merged directly into the top level of the report.
        report.update(self._get_total_nums(url, params))
        report.update(self._get_exercise_total_nums(url, params))

        # Per-section data, one key per helper.
        report['error_rates'] = self._get_error_rates(url, params)
        report['month_trend'] = self._get_month_trend(url, params)
        report['knowledges_accuracy'] = self._get_knowledges_accuracy(
            url, params)
        report['abilities'] = self._get_abilities(
            url, params, klx.KLX_MATH_ABILITY_KEYS)
        report['score_analyses'] = self._get_score_analyses(url, params)

        if settings.DEBUG:
            logger.debug(json.dumps(report))
        return JsonResponse(report)
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def _get_auth_redirect_url(request, state):
    """Return the URL the user is sent to for the WeChat authorisation step.

    While testing this is the local phone page carrying ``state``. Otherwise
    it is ``wx.WX_AUTH_URL`` extended with the OAuth query parameters and a
    ``redirect_uri`` pointing at the ``wechat:check_phone`` view.
    """
    if settings.TESTING:
        return reverse('wechat:phone_page') + '?state=' + str(state)

    check_phone_uri = get_server_host(request) + reverse('wechat:check_phone')
    # redirect_uri is deliberately kept out of this dict: it is appended
    # below as-is rather than passed through urlencode().
    query = urlencode({
        'response_type': "code",
        'scope': "snsapi_base",
        'state': state,
        'connect_redirect': "1"
    })
    return (wx.WX_AUTH_URL + '&' + query
            + '&redirect_uri=' + check_phone_uri + '#wechat_redirect')
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def get_parent(self, request):
        """Return the parent for ``request``.

        When no parent is found and ``settings.TESTING`` is on, the parent
        with primary key 3 is loaded, logged in on the request and returned
        instead.
        """
        parent = _get_parent(request)
        if parent is not None or not settings.TESTING:
            return parent

        # Testing-only fallback: log in a fixed parent record.
        test_parent = models.Parent.objects.get(pk=3)
        test_parent.user.backend = _get_default_bankend_path()
        login(request, test_parent.user)
        return test_parent
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def verify_order(self, request):
        """Verify a WeChat payment and mark the order as paid.

        Reads ``prepay_id`` and ``order_id`` from the POST data. While
        testing, the order is marked paid directly. Otherwise the order is
        queried through ``wx.wx_pay_order_query`` and only marked paid when
        the trade state is ``wx.WX_SUCCESS``.

        Always returns a ``JsonResponse`` with ``ok``, ``msg`` and ``code``:
        0 success, 1 query failed, 2 payment error, 3 other trade state,
        4 ``set_order_paid`` returned 1 or raised
        OrderStatusIncorrect/RefundError, 5 unexpected exception.
        """
        # get request params
        prepay_id = request.POST.get('prepay_id')
        order_id = request.POST.get('order_id')
        if settings.TESTING:
            ret_code = set_order_paid(order_id=order_id)
            if ret_code == 1:
                return JsonResponse(
                    {'ok': False, 'msg': FAIL_HINT_MSG, 'code': 4})
            return JsonResponse({'ok': True, 'msg': '', 'code': 0})

        query_ret = wx.wx_pay_order_query(order_id=order_id)
        # The three failure branches below used to return bare dicts while
        # every other branch returned JsonResponse; they are wrapped so the
        # caller always receives the same response type.
        if not query_ret['ok']:
            return JsonResponse(
                {'ok': False, 'msg': query_ret['msg'], 'code': 1})

        trade_state = query_ret['data']['trade_state']
        if trade_state != wx.WX_SUCCESS:
            if trade_state == wx.WX_PAYERROR:
                return JsonResponse({'ok': False, 'msg': '????', 'code': 2})
            return JsonResponse({'ok': False, 'msg': '???', 'code': 3})

        # Payment succeeded: record it using the identifiers that the query
        # result reports.
        try:
            ret_code = set_order_paid(
                prepay_id=prepay_id,
                order_id=query_ret['data']['out_trade_no'],
                open_id=query_ret['data']['openid'])
            if ret_code == 1:
                return JsonResponse(
                    {'ok': False, 'msg': FAIL_HINT_MSG, 'code': 4})
        except (OrderStatusIncorrect, RefundError):
            return JsonResponse(
                {'ok': False, 'msg': FAIL_HINT_MSG, 'code': 4})
        except Exception as ex:
            logger.exception(ex)
            return JsonResponse(
                {'ok': False, 'msg': '????, ?????', 'code': 5})
        return JsonResponse({'ok': True, 'msg': '', 'code': 0})
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def _get_wx_jsapi_ticket(access_token):
    """Return ``(ticket, msg)`` for the WeChat JS-API.

    The stored ticket is used when there is one; otherwise a new ticket is
    requested with ``access_token``. ``msg`` is ``None`` unless that request
    fails, in which case it carries the failure message. While testing, two
    empty strings are returned.
    """
    if settings.TESTING:
        return "", ""

    stored_ticket = _get_wx_jsapi_ticket_from_db()
    if stored_ticket:
        return stored_ticket, None

    result = wx.wx_get_jsapi_ticket(access_token)
    if result['ok']:
        return result['ticket'], None
    return stored_ticket, result['msg']
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def _get_wx_token():
    """Return ``(token, msg)`` for the WeChat API.

    The stored token is used when there is one; otherwise a new token is
    requested. ``msg`` is ``None`` unless that request fails, in which case
    it carries the failure message. While testing, two empty strings are
    returned.
    """
    if settings.TESTING:
        return "", ""

    stored_token = _get_wx_token_from_db()
    if stored_token:
        return stored_token, None

    result = wx.wx_get_token()
    if result['ok']:
        return result['token'], None
    return stored_token, result['msg']
项目:open-ledger    作者:creativecommons    | 项目源码 | 文件源码
def update_search_index(sender, instance, **kwargs):
    """Tell the search engine about a saved Image instance.

    Does nothing while ``settings.TESTING`` is set.
    """
    if settings.TESTING:
        return
    _update_search_index(instance)
项目:handelsregister    作者:Amsterdam    | 项目源码 | 文件源码
def autocomplete_queries(self, query_string):
        """provide autocomplete suggestions

        Runs the vestiging and mac queries built from ``query_string`` and
        returns the list of results from the queries that succeeded. A query
        that fails is logged and skipped.
        """

        analyzer = InputQAnalyzer(query_string)

        query_components = [
            vestiging_query(analyzer),
            mac_query(analyzer)
        ]

        result_data = []

        # Ignore the cache while tests are running
        ignore_cache = settings.TESTING

        # create elk queries
        for q in query_components:  # type: ElasticQueryWrapper

            search = q.to_elasticsearch_object(self.client)

            # get the result from elastic
            try:
                result = search.execute(ignore_cache=ignore_cache)
            except Exception:
                # Exception (not a bare except) so KeyboardInterrupt and
                # SystemExit still propagate.
                log.exception('FAILED ELK SEARCH: %s',
                              json.dumps(search.to_dict(), indent=2))
                continue
            # Get the datas!
            result_data.append(result)

        return result_data
项目:django-ios-notifications    作者:nnsnodnb    | 项目源码 | 文件源码
def setup_test_environment(self, **kwargs):
        """Run the parent runner's setup, then turn ``settings.TESTING`` on."""
        parent_runner = super(TestRunner, self)
        parent_runner.setup_test_environment(**kwargs)
        settings.TESTING = True
项目:django-ios-notifications    作者:nnsnodnb    | 项目源码 | 文件源码
def teardown_test_environment(self, **kwargs):
        """Run the parent runner's teardown, then turn ``settings.TESTING`` off.

        ``setup_test_environment`` sets the flag to True; teardown previously
        set it to True again, so it stayed on after the test run finished.
        """
        super(TestRunner, self).teardown_test_environment(**kwargs)
        settings.TESTING = False
项目:DjangoBlog    作者:liangliangyy    | 项目源码 | 文件源码
def handler(self):
        """Handle one incoming message and return the reply text.

        Keywords are matched case-insensitively and checked in this order:
        an admin sending EXIT resets the session; ADMIN starts admin mode;
        an admin without a verified password has the message checked as the
        password (the session is reset once the attempt counter has reached
        3); a verified admin can ask for help, stage a command, or confirm a
        staged command with Y. Anything else is passed to ``tuling``.
        """
        info = self.message.content
        keyword = info.upper()

        if self.userinfo.isAdmin and keyword == 'EXIT':
            self.userinfo = WxUserInfo()
            self.savesession()
            return "????"

        if keyword == 'ADMIN':
            self.userinfo.isAdmin = True
            self.savesession()
            return "???????"

        if self.userinfo.isAdmin and not self.userinfo.isPasswordSet:
            expected = settings.WXADMIN
            if settings.TESTING:
                expected = '123'
            # The message is compared as a double md5 digest, ignoring case.
            if expected.upper() == get_md5(get_md5(info)).upper():
                self.userinfo.isPasswordSet = True
                self.savesession()
                return "????,???????????????:??helpme????"
            if self.userinfo.Count >= 3:
                self.userinfo = WxUserInfo()
                self.savesession()
                return "??????"
            self.userinfo.Count += 1
            self.savesession()
            return "???????????????:"

        if self.userinfo.isAdmin and self.userinfo.isPasswordSet:
            if self.userinfo.Command != '' and keyword == 'Y':
                return cmdhandler.run(self.userinfo.Command)
            if keyword == 'HELPME':
                return cmdhandler.get_help()
            # Stage the command; it runs once the user confirms with Y.
            self.userinfo.Command = info
            self.savesession()
            return "????: " + info + " ???"

        return tuling.getdata(info)
项目:DjangoBlog    作者:liangliangyy    | 项目源码 | 文件源码
def fileupload(request):
    """Store every file of a POST request and return the resulting URLs.

    Each entry of ``request.FILES`` is written under a dated directory
    (``image`` or ``files`` depending on its extension), or under
    ``BASE_DIR/uploads`` while testing. Images are re-saved with PIL at
    quality 20. Any non-POST request gets the "only for post" response.
    """
    if request.method != 'POST':
        return HttpResponse("only for post")

    image_extensions = ('.jpg', '.png', '.jpeg', '.bmp')
    response = []
    for filename in request.FILES:
        # The name comes from the client. Keep only its final path component
        # so it cannot point outside ``basepath`` (e.g. "../../etc/x").
        safe_name = os.path.basename(str(filename))
        if safe_name in ('', '.', '..'):
            continue

        timestr = datetime.datetime.now().strftime('%Y/%m/%d')
        # Decide by the real extension; a substring search would also treat
        # names such as "mypngnotes.txt" as images.
        extension = os.path.splitext(safe_name)[1].lower()
        isimage = extension in image_extensions
        kind = 'image' if isimage else 'files'

        basepath = r'/var/www/resource/{type}/{timestr}'.format(
            type=kind, timestr=timestr)
        if settings.TESTING:
            basepath = settings.BASE_DIR + '/uploads'
        url = 'https://resource.lylinux.net/{type}/{timestr}/{filename}'.format(
            type=kind, timestr=timestr, filename=safe_name)

        # exist_ok avoids the race between an exists() check and makedirs().
        os.makedirs(basepath, exist_ok=True)
        savepath = os.path.join(basepath, safe_name)
        with open(savepath, 'wb+') as wfile:
            for chunk in request.FILES[filename].chunks():
                wfile.write(chunk)
        if isimage:
            from PIL import Image
            image = Image.open(savepath)
            image.save(savepath, quality=20, optimize=True)
        response.append(url)
    return HttpResponse(response)
项目:occasions-server    作者:travisbloom    | 项目源码 | 文件源码
def mutate_and_get_payload(cls, input, context, info):
        """Create a Transaction for the current user and charge it via Stripe.

        Raises MutationException when the user has no Stripe account yet.
        The input is validated with CreateTransactionSerializer, the
        transaction is saved, and then charged - with the mock charge
        function while ``settings.TESTING`` is set.
        """
        user = context.user
        if not user.stripe_user_id:
            error_msg = 'User tried to create a transaction before they have a stripe account'
            logger.warning(error_msg, extra={'request': context})
            raise MutationException(error_msg)

        formatted_input = convert_input_global_ids_to_pks(input)
        serializer_context = {
            'user': context.user,
            'receiving_person_id': formatted_input.get('receiving_person_id')
        }
        serializer = CreateTransactionSerializer(
            data=formatted_input, context=serializer_context)
        serializer.is_valid(raise_exception=True)

        product = Product.objects.get(pk=formatted_input.get('product_id'))
        associated_event = AssociatedEvent.objects.get(
            pk=formatted_input.get('associated_event_id'))
        transaction = Transaction(
            **formatted_input,
            associated_event_date=associated_event.event.next_date,
            cost_usd=product.cost_usd,
            user=context.user)
        transaction.save()

        # Only the selected charge function is looked up and called.
        charge = mock_create_stripe_charge if settings.TESTING else create_stripe_charge
        charge(user=context.user, transaction=transaction, request=context)
        return CreateTransaction(transaction=transaction)
项目:django-query-logger    作者:4word    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        """Configure query logging from keyword arguments and settings.

        Each option is taken from ``kwargs`` when given, otherwise from the
        matching ``LOG_QUERY_*`` setting, otherwise from a built-in default.
        """
        def option(name, setting_name, fallback):
            # The keyword argument wins over the setting, the setting over
            # the fallback.
            return kwargs.get(name, getattr(settings, setting_name, fallback))

        self.connection_name = option(
            'connection_name', 'LOG_QUERY_DATABASE_CONNECTION', 'default')
        self.log_duplicate_queries = option(
            'log_duplicate_queries', 'LOG_QUERY_DUPLICATE_QUERIES', True)
        self.log_tracebacks = option(
            'log_tracebacks', 'LOG_QUERY_TRACEBACKS', False)
        self.log_long_running_time = option(
            'log_long_running_time', 'LOG_QUERY_TIME_ABSOLUTE_LIMIT', 1000)

        self.logging_extras = kwargs.get('logging_extra_dict', {})

        # For internal testing only. If you add unit tests of your own for
        # the query debugging mixin, define a settings.TESTING variable that
        # is True while unit tests are running.
        self.testing = getattr(settings, 'TESTING', False)
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def get_subjects_summary(self, parent, params):
        """Return a JSON summary of subjects for ``parent``.

        Each subject found in the parent's paid orders is listed once (newest
        order first) and flagged as supported or not depending on
        ``klx.KLX_REPORT_SUBJECTS``; supported ones also carry their totals.
        Supported subjects that were never purchased are appended at the end.
        While testing, the result list is empty.
        """
        subjects_list = []
        if settings.TESTING:
            return JsonResponse({'results': subjects_list})

        seen_names = []
        # get subject from order
        paid_orders = (
            models.Order.objects
            .filter(parent=parent, status=models.Order.PAID)
            .values('subject', 'grade', 'created_at')
            .order_by('-created_at'))
        for order_row in paid_orders:
            # only support math presently
            subject = models.Subject.objects.get(id=order_row['subject'])
            subject_name = subject.name
            if subject_name in seen_names:
                continue
            seen_names.append(subject_name)

            if subject_name not in klx.KLX_REPORT_SUBJECTS:
                subjects_list.append({
                    'subject_id': subject.id,
                    'purchased': True,
                    'grade_id': order_row['grade'],
                    'supported': False,
                })
                continue

            name_en = klx.klx_subject_name(subject_name)
            url = klx.KLX_STUDY_URL_FMT.format(subject=name_en)
            entry = {
                'subject_id': subject.id, 'supported': True,
                'purchased': True, 'grade_id': order_row['grade']}
            # Merge the totals for this subject into its entry.
            entry.update(self._get_total_nums(url, params))
            subjects_list.append(entry)

        # subjects supported, but user did not purchase
        not_purchased = [name for name in klx.KLX_REPORT_SUBJECTS
                         if name not in seen_names]
        if not_purchased:
            for subject in models.Subject.objects.filter(
                    name__in=not_purchased):
                subjects_list.append({
                    'subject_id': subject.id,
                    'supported': True,
                    'purchased': False,
                })

        if settings.DEBUG:
            logger.debug(json.dumps(subjects_list))
        return JsonResponse({'results': subjects_list})
项目:Server    作者:malaonline    | 项目源码 | 文件源码
def klx_register(role, uid, name, password=None, subject=None):
    '''
    Register a user with the kuailexue server.

    While settings.TESTING is set no request is sent: the built parameters
    are only checked with klx_verify_sign and '1' or '0' is returned.

    :param role: role code sent to kuailexue (the original note lists the
        values 1 and 2; which is which is unreadable in this copy)
    :param uid: models.User.id
    :param name: Student Name or Teacher Name
    :param password: optional, only sent when given (presumably the server
        falls back to 123456 - TODO confirm)
    :param subject: optional, only sent when given (presumably required for
        one of the roles - TODO confirm)
    :return: kuailexue username, or None when the server cannot be reached,
        answers with a non-200 status, or returns no data
    '''
    klx_url = settings.KUAILEXUE_SERVER + '/third-partner/register'
    params = {'role': role, 'uid': uid, 'name': name}
    for key, value in (('password', password), ('subject', subject)):
        if value is not None:
            params[key] = value
    params = klx_build_params(params, True)

    if settings.TESTING:
        return '1' if klx_verify_sign(params) else '0'

    try:
        resp = requests.post(klx_url, data=params, timeout=10)
    except Exception as err:
        _logger.error('cannot reach kuailexue server')
        _logger.exception(err)
        return None

    if resp.status_code != 200:
        _logger.error('cannot reach kuailexue server, http_status is %s' % (resp.status_code))
        return None

    if _get_is_cold_testing():
        _console.warning(klx_url+'?'+urllib.parse.urlencode(params))
    ret_json = json.loads(resp.content.decode('utf-8'))
    if _get_is_cold_testing():
        _console.info(ret_json)

    # A response without 'data' is treated as a failure.
    ret_data = ret_json.get('data')
    if ret_data is not None:
        return ret_data.get('username')

    req_url = klx_url+'?'+urllib.parse.urlencode(params)
    _logger.error('kuailexue reponse data error, CODE: %s, MSG: %s. (URL=%s)' % (ret_json.get('code'), ret_json.get('message'), req_url))
    return None