Python django.core.serializers.json 模块,DjangoJSONEncoder() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用django.core.serializers.json.DjangoJSONEncoder()

项目:Play.Excel    作者:abhijithasokan    | 项目源码 | 文件源码
def portfolioDataPush():
    """Push every online user's portfolio to that user's channel as JSON.

    User ids are read from the ``online-users`` redis hash; the hash value
    for a user id is used as the channel name. A failure for one user is
    reported and does not stop the pushes for the remaining users.
    """
    for userid in redis_conn.hkeys("online-users"):
        # redis returns the hash keys as bytes.
        userid = userid.decode("utf-8")
        try:
            portfolioData = portfolio(userid)
            Channel( redis_conn.hget("online-users",userid)).send(
                {
                'text': json.dumps(portfolioData,cls=DjangoJSONEncoder)
                })
        except Exception:
            # Best effort per user. ``Exception`` (not a bare ``except``) so
            # that KeyboardInterrupt/SystemExit still propagate.
            print("Error in portfolioPush")




#======================= SELL CHANNEL ==========================#
项目:SPBU-DBMS-Project    作者:Betekhtin    | 项目源码 | 文件源码
def history(request):
    """Render ``history.html`` for a logged-in user, else redirect to ``/login``.

    The template context holds:
      * ``city`` / ``country`` -- JSON dumps of all city / country rows
        (non-ASCII text is kept as-is via ``ensure_ascii=False``);
      * ``max_date`` -- for each city, the ``date`` of its latest
        ``temperature`` row, in the same order as the cities.
    """
    username = auth.get_user(request).username
    if not username:
        return redirect("/login")

    ct = city.objects.all().values()
    co = country.objects.all().values()

    args = {
        'city': json.dumps(list(ct), cls=DjangoJSONEncoder, ensure_ascii=False),
        'country': json.dumps(list(co), cls=DjangoJSONEncoder, ensure_ascii=False),
        'max_date': [
            temperature.objects.filter(city_id__exact=i['city_id']).latest('date').date
            for i in ct
        ],
    }
    return render_to_response("history.html", args)
项目:SPBU-DBMS-Project    作者:Betekhtin    | 项目源码 | 文件源码
def forecast(request):
    """Render ``forecast.html`` for a logged-in user, else redirect to ``/login``.

    The template context holds:
      * ``city`` / ``country`` -- JSON dumps of all city / country rows
        (non-ASCII text is kept as-is via ``ensure_ascii=False``);
      * ``max_date`` -- for each city, the ``date`` of its latest
        ``temperature`` row, in the same order as the cities.
    """
    username = auth.get_user(request).username
    if not username:
        return redirect("/login")

    ct = city.objects.all().values()
    co = country.objects.all().values()

    args = {
        'city': json.dumps(list(ct), cls=DjangoJSONEncoder, ensure_ascii=False),
        'country': json.dumps(list(co), cls=DjangoJSONEncoder, ensure_ascii=False),
        'max_date': [
            temperature.objects.filter(city_id__exact=i['city_id']).latest('date').date
            for i in ct
        ],
    }

    #args['max_date'] = (temperature.objects.filter(city_id__exact=city).latest('date').date)
    return render_to_response("forecast.html", args)
项目:django-eventstream    作者:fanout    | 项目源码 | 文件源码
def messages(request, room_id):
    """Store a chat message posted to a room and echo it back as JSON.

    Only POST is accepted; any other method gets ``HttpResponseNotAllowed``.
    The room identified by ``room_id`` is created when it does not exist.
    The message is saved and the ``message`` event is sent inside one
    atomic transaction.
    """
    if request.method != 'POST':
        return HttpResponseNotAllowed(['POST'])

    try:
        room = ChatRoom.objects.get(eid=room_id)
    except ChatRoom.DoesNotExist:
        try:
            room = ChatRoom(eid=room_id)
            room.save()
        except IntegrityError:
            # A concurrent request created the room first; just load it.
            room = ChatRoom.objects.get(eid=room_id)

    sender = request.POST['from']
    message_text = request.POST['text']
    with transaction.atomic():
        msg = ChatMessage(room=room, user=sender, text=message_text)
        msg.save()
        channel = 'room-%s' % room_id
        send_event(channel, 'message', msg.to_data())

    payload = json.dumps(msg.to_data(), cls=DjangoJSONEncoder)
    return HttpResponse(payload, content_type='application/json')
项目:django-eventstream    作者:fanout    | 项目源码 | 文件源码
def append_event(self, channel, event_type, data):
        """Persist an event, trim the event log, and return an ``Event``.

        The stored row keeps ``data`` as a JSON string; the returned
        ``Event`` carries the original ``data`` object and the stored
        row's ``eid`` as its id.
        """
        from . import models

        encoded = json.dumps(data, cls=DjangoJSONEncoder)
        stored = models.Event(channel=channel, type=event_type, data=encoded)
        stored.save()

        self.trim_event_log()

        return Event(stored.channel, stored.type, data, id=stored.eid)
项目:django-modern-rpc    作者:alorence    | 项目源码 | 文件源码
def test_jsonrpc_invalid_request_1(live_server):
    """A JSON-RPC payload without the 'method' member is an invalid request."""
    request_headers = {'content-type': 'application/json'}
    # The "method" member is deliberately left out of the payload.
    body = {
        "params": [5, 6],
        "jsonrpc": "2.0",
        "id": random.randint(1, 1000),
    }
    encoded = json.dumps(body, cls=DjangoJSONEncoder)

    endpoint = live_server.url + '/all-rpc/'
    reply = requests.post(endpoint, data=encoded, headers=request_headers).json()

    failure = reply['error']
    assert 'Missing parameter "method"' in failure['message']
    assert RPC_INVALID_REQUEST == failure['code']
项目:django-modern-rpc    作者:alorence    | 项目源码 | 文件源码
def test_jsonrpc_invalid_request_2(live_server):
    """A JSON-RPC payload without the 'jsonrpc' member is an invalid request."""
    request_headers = {'content-type': 'application/json'}
    # The "jsonrpc" member is deliberately left out of the payload.
    body = {
        "method": 'add',
        "params": [5, 6],
        "id": random.randint(1, 1000),
    }
    encoded = json.dumps(body, cls=DjangoJSONEncoder)

    endpoint = live_server.url + '/all-rpc/'
    reply = requests.post(endpoint, data=encoded, headers=request_headers).json()

    failure = reply['error']
    assert 'Missing parameter "jsonrpc"' in failure['message']
    assert RPC_INVALID_REQUEST == failure['code']
项目:django-modern-rpc    作者:alorence    | 项目源码 | 文件源码
def test_jsonrpc_invalid_request_3(live_server):
    """A payload whose 'jsonrpc' member is not "2.0" is an invalid request."""
    request_headers = {'content-type': 'application/json'}
    # "jsonrpc" carries an unsupported protocol version on purpose.
    body = {
        "method": 'add',
        "params": [5, 6],
        "jsonrpc": "1.0",
        "id": random.randint(1, 1000),
    }
    encoded = json.dumps(body, cls=DjangoJSONEncoder)

    endpoint = live_server.url + '/all-rpc/'
    reply = requests.post(endpoint, data=encoded, headers=request_headers).json()

    failure = reply['error']
    assert 'The attribute "jsonrpc" must contain "2.0"' in failure['message']
    assert RPC_INVALID_REQUEST == failure['code']
项目:django-twilio-tfa    作者:rtindru    | 项目源码 | 文件源码
def serialize_instance(instance):
    """
    Turn a (possibly incomplete) model instance into JSON-compatible data.

    Since Django 1.6 session items are JSON encoded by default rather than
    pickled. Partially complete models (user, account, token, ...) are kept
    in the session, and standard Django serialization cannot be used for
    them because it complains about missing relations.

    Attributes whose name starts with an underscore, and callables, are
    skipped. Values of ``BinaryField`` fields are base64-encoded to text.
    The result is the round trip of the collected values through
    ``DjangoJSONEncoder``.
    """
    serializable = {}
    for name, value in instance.__dict__.items():
        if not name.startswith('_') and not callable(value):
            try:
                field = instance._meta.get_field(name)
                if isinstance(field, BinaryField):
                    value = force_text(base64.b64encode(value))
            except FieldDoesNotExist:
                # Not a model field: keep the raw attribute value.
                pass
            serializable[name] = value
    encoded = json.dumps(serializable, cls=DjangoJSONEncoder)
    return json.loads(encoded)
项目:gwells    作者:bcgov    | 项目源码 | 文件源码
def common_well_search(request):
    """
        Run a well search from the GET parameters.

        Returns a ``(form, well_results, well_results_json)`` tuple. The JSON
        string stays ``'[]'`` when the form is invalid, when there are no
        results, or when the result count exceeds
        ``SearchForm.WELL_RESULTS_LIMIT``. Used by both the map and text search.
    """
    form = SearchForm(request.GET)
    well_results = form.process() if form.is_valid() else None

    well_results_json = '[]'
    if well_results and len(well_results) <= SearchForm.WELL_RESULTS_LIMIT:
        rows = [well.as_dict() for well in well_results]
        well_results_json = json.dumps(rows, cls=DjangoJSONEncoder)

    return form, well_results, well_results_json
项目:django-chemtrails    作者:inonit    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs):
        """
        Set up the JSON dump/load options, then initialise the parent field.

        :param dump_kwargs: Keyword arguments which will be passed to `json.dumps()`
        :param load_kwargs: Keyword arguments which will be passed to `json.loads()`

        Unless ``null`` is truthy, a ``default`` is supplied when the caller
        did not pass one.
        """
        fallback_dump = {
            'cls': DjangoJSONEncoder,
            'ensure_ascii': False,
            'sort_keys': False,
            'separators': (',', ':')
        }
        fallback_load = {
            'object_pairs_hook': OrderedDict
        }
        self.dump_kwargs = kwargs.pop('dump_kwargs', fallback_dump)
        self.load_kwargs = kwargs.pop('load_kwargs', fallback_load)

        nullable = kwargs.get('null', False)
        if not nullable:
            kwargs.setdefault('default', defaultdict(OrderedDict))
        super(JSONField, self).__init__(*args, **kwargs)
项目:c3nav    作者:c3nav    | 项目源码 | 文件源码
def _preencode(data, magic_marker, in_coords=False, in_groups=False):
    if isinstance(data, dict):
        data = data.copy()
        for name, value in tuple(data.items()):
            if name in ('bounds', 'point', 'locations') and isinstance(value, (tuple, list)):
                data[name] = magic_marker+json.dumps(value, cls=DjangoJSONEncoder)+magic_marker
            else:
                data[name] = _preencode(value, magic_marker,
                                        in_coords=(name == 'coordinates'), in_groups=(name == 'groups'))
        return data
    elif isinstance(data, (tuple, list)):
        if (in_coords and data and isinstance(data[0], (int, float))) or in_groups:
            return magic_marker+json.dumps(data, cls=DjangoJSONEncoder)+magic_marker
        else:
            return tuple(_preencode(value, magic_marker, in_coords) for value in data)
    else:
        return data
项目:django-happymailer    作者:barbuza    | 项目源码 | 文件源码
def export_action(self, request):
        """Return every ``TemplateModel`` with its history as a JSON attachment.

        The JSON is written straight into the response body, indented by 2
        with sorted keys, and offered for download as
        ``templatemodel_export.json``.
        """
        response = HttpResponse(content_type='application/json')
        response['Content-Disposition'] = 'attachment; filename=templatemodel_export.json'

        items = []
        for template in TemplateModel.objects.all():
            entry = {
                'name': template.name,
                'layout': template.layout,
                'subject': template.subject,
                'body': template.body,
                'version': template.version,
                'enabled': template.enabled,
                'has_errors': template.has_errors,
                'created_at': template.created_at,
                'updated_at': template.updated_at,
            }
            # Archived versions of this template.
            entry['history'] = [
                {
                    'layout': past.layout,
                    'subject': past.subject,
                    'body': past.body,
                    'version': past.version,
                    'archived_at': past.archived_at,
                }
                for past in template.history.all()
            ]
            items.append(entry)

        json.dump(items, response, cls=DjangoJSONEncoder, indent=2, sort_keys=True)
        return response
项目:CSCE482-WordcloudPlus    作者:ggaytan00    | 项目源码 | 文件源码
def calc_frequency(areaNum, word_counts, curr_timeslot_word_counts):
    """Return, per shared word, its count as a percentage of the timeslot count.

    Args:
        areaNum: Unused by the computation; kept for interface compatibility.
        word_counts: Mapping of word -> count.
        curr_timeslot_word_counts: Mapping of word -> count for the current
            timeslot.

    Returns:
        A dict with one entry for every word present in both mappings (in
        ``word_counts`` order), whose value is
        ``word_counts[word] / curr_timeslot_word_counts[word] * 100``
        rounded to 2 decimal places.

    Raises:
        ZeroDivisionError: If a shared word has a timeslot count of 0.
    """
    timeslot_area_percentage = {}

    for word, count in word_counts.items():
        # Direct membership test instead of scanning every timeslot key.
        if word in curr_timeslot_word_counts:
            # ``+ 0.00`` forces float division regardless of Python version.
            percent = (count / (curr_timeslot_word_counts[word] + 0.00)) * 100
            timeslot_area_percentage[word] = round(percent, 2)

    return timeslot_area_percentage
    # if areaNum == 1:
    #   return {"area1" : timeslot_area_percentage} # TODO: convert to django friendly using below??
    # elif areaNum == 2:
    #   return {"area2" : timeslot_area_percentage}
    # elif areaNum == 3:
    #   return {"area3" : timeslot_area_percentage}

    #convert site percents to django friendly JSON
    # site1_percentage_json = json.dumps(dict(site1_percentage), cls=DjangoJSONEncoder)
    # site2_percentage_json = json.dumps(dict(site2_percentage), cls=DjangoJSONEncoder)

# takes a url as a string and returns a SET of TUPLES of all of the words
# that are used on that webpage in order of frequency
项目:skp_edu_docker    作者:TensorMSA    | 项目源码 | 文件源码
def get(self, request, nnid):
        """
        Common Network Info Get Method
        ---
        # Class Name : CommonNNInfoList

        # Description:
            Structure : <nninfo> - version - batch version
            Search deinfed list of neural networks
        """
        try:
            # 'all' (any case) is translated to the '%' wildcard, 'seq' (any
            # case) is normalised to lower case, anything else is used as-is.
            requested = str(nnid).lower()
            if requested == 'all':
                nn_id = '%'
            elif requested == 'seq':
                nn_id = 'seq'
            else:
                nn_id = nnid
            condition = {'nn_id': nn_id}

            return_data = NNCommonManager().get_nn_info(condition)
            logging.info(return_data)
            return Response(json.dumps(return_data, cls=DjangoJSONEncoder))
        except Exception as e:
            # Any failure is reported inside the JSON body.
            return_data = {"status": "404", "result": str(e)}
            return Response(json.dumps(return_data, cls=DjangoJSONEncoder))
项目:djangoevents    作者:ApplauseOSS    | 项目源码 | 文件源码
def test_serialize_and_deserialize_1():
    """Round-trip a ``Created`` event through ``UnifiedTranscoder``."""
    transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)

    # --- serialize ---
    event = SampleAggregate.Created(
        entity_id='b089a0a6-e0b3-480d-9382-c47f99103b3d',
        attr1='val1',
        attr2='val2',
        metadata={'command_id': 123},
    )
    stored = transcoder.serialize(event)
    assert stored.event_type == 'sample_aggregate_created'
    assert stored.event_version == 1
    assert stored.event_data == '{"attr1":"val1","attr2":"val2"}'
    assert stored.aggregate_id == 'b089a0a6-e0b3-480d-9382-c47f99103b3d'
    assert stored.aggregate_version == 0
    assert stored.aggregate_type == 'SampleAggregate'
    assert stored.module_name == 'djangoevents.tests.test_unifiedtranscoder'
    assert stored.class_name == 'SampleAggregate.Created'
    assert stored.metadata == '{"command_id":123}'

    # --- deserialize ---
    restored = transcoder.deserialize(stored)
    assert 'metadata' not in restored.__dict__
    # Metadata is not part of the deserialized event, so drop it before comparing.
    event.__dict__.pop('metadata')
    assert event.__dict__ == restored.__dict__
项目:djangoevents    作者:ApplauseOSS    | 项目源码 | 文件源码
def test_serialize_and_deserialize_2():
    """Round-trip an ``Updated`` event through ``UnifiedTranscoder``."""
    transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)

    # --- serialize ---
    event = SampleAggregate.Updated(
        entity_id='b089a0a6-e0b3-480d-9382-c47f99103b3d',
        entity_version=10,
        attr1='val1',
        attr2='val2',
        metadata={'command_id': 123},
    )
    stored = transcoder.serialize(event)
    assert stored.event_type == 'overridden_event_type'
    assert stored.event_version == 1
    assert stored.event_data == '{"attr1":"val1","attr2":"val2"}'
    assert stored.aggregate_id == 'b089a0a6-e0b3-480d-9382-c47f99103b3d'
    assert stored.aggregate_version == 10
    assert stored.aggregate_type == 'SampleAggregate'
    assert stored.module_name == 'djangoevents.tests.test_unifiedtranscoder'
    assert stored.class_name == 'SampleAggregate.Updated'
    assert stored.metadata == '{"command_id":123}'

    # --- deserialize ---
    restored = transcoder.deserialize(stored)
    assert 'metadata' not in restored.__dict__
    # Metadata is not part of the deserialized event, so drop it before comparing.
    event.__dict__.pop('metadata')
    assert event.__dict__ == restored.__dict__
项目:djangoevents    作者:ApplauseOSS    | 项目源码 | 文件源码
def test_transcoder_passes_all_attributes_to_event_constructor():
    """Deserializing must call the event constructor with every attribute as a keyword."""
    attributes = {
        'entity_id': 'b089a0a6-e0b3-480d-9382-c47f99103b3d',
        'foo': 0,
        'bar': 1,
    }

    transcoder = UnifiedTranscoder(json_encoder_cls=DjangoJSONEncoder)
    original_event = SampleAggregate.Created(**attributes)
    serialized_event = transcoder.serialize(original_event)

    captured_call = None

    def recording_init(self, *args, **kwargs):
        # Stand-in for __init__ that only records how it was called.
        nonlocal captured_call
        captured_call = (args, kwargs)

    with mock.patch.object(SampleAggregate.Created, '__init__', recording_init):
        transcoder.deserialize(serialized_event)

    positional, keywords = captured_call
    assert positional == tuple()

    for name, expected in attributes.items():
        assert name in keywords
        assert keywords[name] == expected
项目:Provo-Housing-Database    作者:marcopete5    | 项目源码 | 文件源码
def serialize_instance(instance):
    """
    Turn a (possibly incomplete) model instance into JSON-compatible data.

    Since Django 1.6 session items are JSON encoded by default rather than
    pickled. Partially complete models (user, account, token, ...) are kept
    in the session, and standard Django serialization cannot be used for
    them because it complains about missing relations.

    Attributes whose name starts with an underscore, and callables, are
    skipped. Values of ``BinaryField`` fields are base64-encoded to text.
    The result is the round trip of the collected values through
    ``DjangoJSONEncoder``.
    """
    collected = {}
    for attr_name, attr_value in instance.__dict__.items():
        skip = attr_name.startswith('_') or callable(attr_value)
        if skip:
            continue
        try:
            model_field = instance._meta.get_field(attr_name)
            if isinstance(model_field, BinaryField):
                attr_value = force_text(base64.b64encode(attr_value))
        except FieldDoesNotExist:
            # Not a model field: keep the raw attribute value.
            pass
        collected[attr_name] = attr_value
    as_json = json.dumps(collected, cls=DjangoJSONEncoder)
    return json.loads(as_json)
项目:MetaCI    作者:SalesforceFoundation    | 项目源码 | 文件源码
def set_service(self, service_name, service_config):
        """Create or update the ``Service`` row named ``service_name``.

        The service configuration is stored as a JSON string in the row's
        ``json`` field. The same encoder (``DjangoJSONEncoder``) is used
        whether the row already exists or is newly created, so both paths
        accept the same value types and produce the same output.
        """
        service_json = json.dumps(service_config.config, cls=DjangoJSONEncoder)
        try:
            service = Service.objects.get(name=service_name)
            service.json = service_json
        except Service.DoesNotExist:
            service = Service(
                name=service_name,
                json=service_json,
            )
        service.save()
项目:MetaCI    作者:SalesforceFoundation    | 项目源码 | 文件源码
def _init_scratch_org(self, org_config):
        """Record a ``ScratchOrgInstance`` for a scratch org and attach it.

        Does nothing (returns ``None``) for non-scratch orgs. Otherwise reads
        the org's Salesforce DX json file from ``~/.sfdx`` (falling back to
        ``~/.local/.sfdx``), saves an instance holding that file's content
        and the org config as JSON, sets ``org_config.org_instance`` and
        returns ``org_config``.
        """
        if not org_config.scratch:
            # Only run against scratch orgs
            return

        # Set up the logger to output to the build.log field
        init_logger(self.build)

        # Create the scratch org and get its info
        info = org_config.scratch_info

        # Locate and read the org's json file written by Salesforce DX
        home_dir = os.path.expanduser('~')
        dx_filename = info['username'] + '.json'
        json_path = os.path.join(home_dir, '.sfdx', dx_filename)
        if not os.path.isfile(json_path):
            json_path = os.path.join(home_dir, '.local', '.sfdx', dx_filename)
        with open(json_path, 'r') as json_file:
            dx_json = json_file.read()

        # Create a ScratchOrgInstance to store the org info
        instance = ScratchOrgInstance(
            org=org_config.org,
            build=self.build,
            sf_org_id=info['org_id'],
            username=info['username'],
            json_dx=dx_json,
            json=json.dumps(org_config.config, cls=DjangoJSONEncoder),
        )
        instance.save()

        org_config.org_instance = instance
        return org_config
项目:MetaCI    作者:SalesforceFoundation    | 项目源码 | 文件源码
def set_org(self, org_config):
        """Create or update the ``Org`` for this build's repo from ``org_config``.

        The org config is stored as JSON. The ``scratch`` flag is set from
        the config's type, and the row is saved only when the org is not a
        scratch org.
        """
        org_json = json.dumps(org_config.config, cls=DjangoJSONEncoder)
        try:
            org = Org.objects.get(repo=self.build.repo, name=org_config.name)
        except Org.DoesNotExist:
            org = Org(
                name=org_config.name,
                json=org_json,
                repo=self.build.repo,
            )
        else:
            org.json = org_json

        org.scratch = isinstance(org_config, ScratchOrgConfig)
        if org.scratch:
            # Scratch orgs are not persisted here.
            return
        org.save()
项目:django-jchart    作者:matthisk    | 项目源码 | 文件源码
def __init__(self, height=None, width=None,
                 html_id=None, json_encoder_class=DjangoJSONEncoder):
        """Store the chart's size, HTML id and JSON encoder class.

        Raises ``ImproperlyConfigured`` when a height or width is given while
        ``self.responsive`` is ``None`` or truthy.
        """
        self.height = height
        self.width = width
        self.json_encoder_class = json_encoder_class
        self.html_id = html_id

        has_fixed_size = height is not None or width is not None
        if has_fixed_size:
            responsive = self.responsive
            if responsive is None or responsive:
                raise ImproperlyConfigured(
                    'Using the height/width parameter will have no '
                    'effect if the chart is in responsive mode. '
                    'Disable responsive mode by setting chart.responsive '
                    'to False')
项目:django-jchart    作者:matthisk    | 项目源码 | 文件源码
def __init__(self, height=None, width=None,
                 html_id=None, json_encoder_class=DjangoJSONEncoder):
        """Store the chart's size, HTML id and JSON encoder class.

        Raises ``ImproperlyConfigured`` when a height or width is given while
        ``self.responsive`` is ``None`` or truthy.
        """
        self.height = height
        self.width = width
        self.json_encoder_class = json_encoder_class
        self.html_id = html_id

        if height is None and width is None:
            # No explicit size: nothing to validate.
            return

        responsive = self.responsive
        if responsive is None or responsive:
            raise ImproperlyConfigured(
                'Using the height/width parameter will have no '
                'effect if the chart is in responsive mode. '
                'Disable responsive mode by setting chart.responsive '
                'to False')
项目:CodingDojo    作者:ComputerSocietyUNB    | 项目源码 | 文件源码
def __init__(self, data, encoder=DjangoJSONEncoder, safe=True,
                 json_dumps_params=None, **kwargs):
        """Build a response whose content is ``data`` serialised as JSON.

        With ``safe`` true, ``data`` must be a dict, otherwise ``TypeError``
        is raised. ``json_dumps_params`` are extra keyword arguments for
        ``json.dumps``. ``content_type`` defaults to ``application/json``.
        """
        data_is_dict = isinstance(data, dict)
        if safe and not data_is_dict:
            raise TypeError('In order to allow non-dict objects to be '
                'serialized set the safe parameter to False')

        dumps_options = {} if json_dumps_params is None else json_dumps_params
        kwargs.setdefault('content_type', 'application/json')
        serialized = json.dumps(data, cls=encoder, **dumps_options)
        super(JsonResponse, self).__init__(content=serialized, **kwargs)
项目:django-admino    作者:erdem    | 项目源码 | 文件源码
def api_detail(self, request, *args, **kwargs):
        """Return the object identified by the ``pk`` keyword argument as JSON."""
        target = self.get_object(request, object_id=kwargs.get("pk"))
        form_class = self.get_form(request, obj=target)
        bound_form = form_class(instance=target)
        payload = self.obj_as_dict(request, bound_form.instance)
        body = json.dumps(payload, cls=DjangoJSONEncoder)
        return HttpResponse(body, content_type='application/json')
项目:django-admino    作者:erdem    | 项目源码 | 文件源码
def api_create(self, request, *args, **kwargs):
        """Create an object from the JSON request body.

        On success the saved object is returned as JSON; when the form is
        invalid the form errors are returned under ``errors`` with HTTP
        status 400.
        """
        submitted = json.loads(request.body)

        form_class = self.get_form(request, obj=None)
        form = form_class(data=submitted, files=request.FILES)

        if not form.is_valid():
            failure = {
                "errors": json.loads(form.errors.as_json())
            }
            return HttpResponse(json.dumps(failure), status=400, content_type="application/json")

        created = form.save()
        payload = self.obj_as_dict(request, created)
        return HttpResponse(json.dumps(payload, cls=DjangoJSONEncoder), content_type="application/json")
项目:Play.Excel    作者:abhijithasokan    | 项目源码 | 文件源码
def niftyChannelDataPush():
    """Send the result of ``niftyData()`` as JSON to the ``nifty-channel`` group."""
    snapshot = niftyData()
    group = Group('nifty-channel')
    group.send({'text': json.dumps(snapshot, cls=DjangoJSONEncoder)})





#================ Leaderboard channel =======================#

# @http_session_user
# @isLoggedInCh
# def connect_to_leaderboard_channel(message):
#   Group('leaderboard-channel').add(message.reply_channel)
#   message.reply_channel.send({
#       'text' : json.dumps({"accept": True}) #{ "close" : True }
#       })
#   print("New leaderboard listener added!")


# def disconnect_from_leaderboard_channel(message):
#   Group('leaderboard-channel').discard(message.reply_channel)

# def leaderboardChannelDataPush():

#   leaderboard_data = leaderboardData()
#   Group('leaderboard-channel').send( 
#       {
#           'text': json.dumps(leaderboard_data,cls=DjangoJSONEncoder)
#       })


#=============== Graph Channel =======================#
项目:Play.Excel    作者:abhijithasokan    | 项目源码 | 文件源码
def graphDataPush():
    """Send the ``'NIFTY 50'`` graph data as JSON to the ``NIFTY-50`` group."""
    series = graph('NIFTY 50')
    group = Group('NIFTY-50')
    group.send({'text': json.dumps(series, cls=DjangoJSONEncoder)})






#==================== portfolio channel ========================#
项目:Play.Excel    作者:abhijithasokan    | 项目源码 | 文件源码
def sellDataPush():
    """Push every online seller's sell data to that seller's channel as JSON.

    User ids are read from the ``online-sellers`` redis hash; the hash value
    for a user id is used as the channel name. A failure for one user is
    reported and does not stop the pushes for the remaining users.
    """
    for userid in redis_conn.hkeys("online-sellers"):
        # redis returns the hash keys as bytes.
        userid = userid.decode("utf-8")
        try:
            sellData = sell_data(userid)
            Channel( redis_conn.hget("online-sellers",userid) ).send(
                {
                'text' : json.dumps(sellData,cls=DjangoJSONEncoder)
                })
        except Exception:
            # Best effort per user. ``Exception`` (not a bare ``except``) so
            # that KeyboardInterrupt/SystemExit still propagate.
            print("sellDataPush failed!")



#================ Ticker Channel =======================#
项目:Play.Excel    作者:abhijithasokan    | 项目源码 | 文件源码
def tickerDataPush():
    """Send the result of ``ticker_data()`` as JSON to the ``Ticker`` group."""
    latest = ticker_data()
    group = Group('Ticker')
    group.send({'text': json.dumps(latest, cls=DjangoJSONEncoder)})
项目:Play.Excel    作者:abhijithasokan    | 项目源码 | 文件源码
def kryptos_leader_channel_push(data):
    """Send ``data`` as JSON to the ``kryptos-leader-channel`` group."""
    group = Group('kryptos-leader-channel')
    group.send({'text': json.dumps(data, cls=DjangoJSONEncoder)})
项目:Play.Excel    作者:abhijithasokan    | 项目源码 | 文件源码
def echo_leader_channel_push(data):
    """Send ``data`` as JSON to the ``echo-leader-channel`` group."""
    group = Group('echo-leader-channel')
    group.send({'text': json.dumps(data, cls=DjangoJSONEncoder)})
项目:Play.Excel    作者:abhijithasokan    | 项目源码 | 文件源码
def userDataPush(userid,data):
    """Send ``data`` as JSON to the channel registered for ``userid``.

    The channel name is looked up in the ``online-users`` redis hash.
    """
    Channel( redis_conn.hget("online-users",userid) ).send(
        {
        # Serialise the ``data`` argument passed by the caller.
        'text' : json.dumps(data,cls=DjangoJSONEncoder)
        }
    )
项目:Play.Excel    作者:abhijithasokan    | 项目源码 | 文件源码
def hashinclude_channel_push(data):
    """Send ``data`` as JSON to the ``hashinclude-channel`` group."""
    group = Group('hashinclude-channel')
    group.send({'text': json.dumps(data, cls=DjangoJSONEncoder)})
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def test_no_pond_blocks_no_state_changed(self, modify_mock):
        """With an empty pond block list the isDirty endpoint reports not dirty."""
        now = timezone.now()
        window_start = now - timedelta(days=2)
        window_end = now + timedelta(days=1)
        mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=window_start,
                             end=window_end)

        # Stub the pond endpoint so that it returns no blocks.
        empty_blocks = []
        responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/',
                      body=json.dumps(empty_blocks, cls=DjangoJSONEncoder), status=200, content_type='application/json')

        one_week_ahead = timezone.now() + timedelta(weeks=1)
        url = reverse('api:isDirty') + '?last_query_time=' + parse.quote(one_week_ahead.isoformat())
        response_json = self.client.get(url).json()

        self.assertFalse(response_json['isDirty'])
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def test_pond_blocks_no_state_changed(self, modify_mock):
        """Incomplete pond blocks leave every request and the user request PENDING."""
        now = timezone.now()
        mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=now - timedelta(days=2),
                             end=now + timedelta(days=1))

        # One group of three unfinished molecules for each of the first three requests.
        molecule_groups = [
            basic_mixer.cycle(3).blend(PondMolecule, completed=False, failed=False,
                                       request_num=self.requests[idx].id,
                                       tracking_num=self.ur.id, event=[])
            for idx in range(3)
        ]
        blocks = basic_mixer.cycle(3).blend(PondBlock, molecules=(m for m in molecule_groups),
                                            start=now + timedelta(minutes=30), end=now + timedelta(minutes=40))
        block_dicts = [block._to_dict() for block in blocks]
        responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/',
                      body=json.dumps(block_dicts, cls=DjangoJSONEncoder), status=200, content_type='application/json')

        one_week_ahead = timezone.now() + timedelta(weeks=1)
        url = reverse('api:isDirty') + '?last_query_time=' + parse.quote(one_week_ahead.isoformat())
        response_json = self.client.get(url).json()

        self.assertFalse(response_json['isDirty'])
        for req in self.requests:
            req.refresh_from_db()
            self.assertEqual(req.state, 'PENDING')
        self.ur.refresh_from_db()
        self.assertEqual(self.ur.state, 'PENDING')
项目:valhalla    作者:LCOGT    | 项目源码 | 文件源码
def test_pond_blocks_state_change_completed(self, modify_mock):
        """Completed molecules mark their request COMPLETED; the others expire."""
        now = timezone.now()
        mixer.cycle(3).blend(Window, request=(r for r in self.requests), start=now - timedelta(days=2),
                             end=now - timedelta(days=1))

        # Only the molecules of the first request are completed.
        molecule_groups = [
            basic_mixer.cycle(3).blend(PondMolecule, completed=done, failed=False,
                                       request_num=self.requests[idx].id,
                                       tracking_num=self.ur.id, event=[])
            for idx, done in enumerate((True, False, False))
        ]
        blocks = basic_mixer.cycle(3).blend(PondBlock, molecules=(m for m in molecule_groups),
                                            start=now - timedelta(minutes=30), end=now - timedelta(minutes=20))
        block_dicts = [block._to_dict() for block in blocks]
        responses.add(responses.GET, settings.POND_URL + '/pond/pond/blocks/new/',
                      body=json.dumps(block_dicts, cls=DjangoJSONEncoder), status=200, content_type='application/json')

        response_json = self.client.get(reverse('api:isDirty')).json()

        self.assertTrue(response_json['isDirty'])

        expected_states = ['COMPLETED', 'WINDOW_EXPIRED', 'WINDOW_EXPIRED']
        for position, req in enumerate(self.requests):
            req.refresh_from_db()
            self.assertEqual(req.state, expected_states[position])
        self.ur.refresh_from_db()
        self.assertEqual(self.ur.state, 'COMPLETED')
项目:wagtailannotatedimage    作者:takeflight    | 项目源码 | 文件源码
def get_prep_value(self, value):
        """Return ``value`` serialised to a JSON string with ``DjangoJSONEncoder``."""
        serialized = json.dumps(value, cls=DjangoJSONEncoder)
        return serialized
项目:django-eventstream    作者:fanout    | 项目源码 | 文件源码
def sse_encode_event(event_type, data, event_id=None, escape=False):
    """Format one event as ``event:`` / ``id:`` / ``data:`` lines.

    ``data`` is JSON-encoded. With ``escape`` true, both the event type and
    the encoded data are passed through ``build_id_escape``. The ``id:``
    line is only emitted for a truthy ``event_id``. The result ends with a
    blank line.
    """
    data_str = json.dumps(data, cls=DjangoJSONEncoder)
    if escape:
        event_type = build_id_escape(event_type)
        data_str = build_id_escape(data_str)

    lines = ['event: %s\n' % event_type]
    if event_id:
        lines.append('id: %s\n' % event_id)
    lines.append('data: %s\n\n' % data_str)
    return ''.join(lines)
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def get_filter_info(self, request, **kwargs):
        """Return the filter named by the ``name`` GET parameter as indented JSON.

        Filters are set up first; a non-empty ``search`` GET parameter is
        applied before the filter is converted with ``to_json``.
        """
        self.setup_filters(**kwargs)

        search_term = request.GET.get("search", None)
        if search_term:
            self.apply_search(search_term)

        filter_name = request.GET.get("name", None)
        selected_filter = self.filter_map.get_filter(filter_name)
        payload = selected_filter.to_json(self.queryset)
        return json.dumps(payload, indent=2, cls=DjangoJSONEncoder)
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def get(self, request, *args, **kwargs):
        """Return type-ahead search results as a JSON ``HttpResponse``.

        The response body is ``{'results': [...], 'error': <str>}``:

        * without a ``search`` GET parameter the results are empty and the
          error is ``"ok"``;
        * otherwise the search is run (scoped to the project given by the
          ``pid`` keyword argument when present) and truncated to
          ``ToasterTypeAhead.MAX_RESULTS``;
        * when the first result fails field validation, ``error`` holds the
          validation error message instead of ``"ok"``.
        """
        def response(data):
            return HttpResponse(json.dumps(data,
                                           indent=2,
                                           cls=DjangoJSONEncoder),
                                content_type="application/json")

        error = "ok"

        search_term = request.GET.get("search", None)
        if search_term is None:
            # We got no search value so return empty response
            return response({'error': error, 'results': []})

        try:
            prj = Project.objects.get(pk=kwargs['pid'])
        except KeyError:
            # No project id supplied: search without a project.
            prj = None

        results = self.apply_search(search_term,
                                    prj,
                                    request)[:ToasterTypeAhead.MAX_RESULTS]

        if len(results) > 0:
            try:
                self.validate_fields(results[0])
            except self.MissingFieldsException as e:
                # Exception objects are not JSON serialisable; report the
                # message text so building the response cannot fail.
                error = str(e)

        data = {'results': results,
                'error': error}

        return response(data)
项目:django-multiple-file-chunked-upload    作者:tcztzy    | 项目源码 | 文件源码
def __init__(self, content, status=200, *args, **kwargs):
        """Initialise the response with ``content`` encoded as JSON.

        The content type is always ``application/json``; remaining
        arguments are forwarded to the parent class.
        """
        encoded = DjangoJSONEncoder().encode(content)
        super(UploadResponse, self).__init__(
            *args,
            content=encoded,
            content_type='application/json',
            status=status,
            **kwargs
        )
项目:wagtailsurveys    作者:torchbox    | 项目源码 | 文件源码
def process_form_submission(self, form):
        """Create a submission holding the form's cleaned data as JSON, this page and the form's user."""
        submission_class = self.get_submission_class()
        encoded = json.dumps(form.cleaned_data, cls=DjangoJSONEncoder)
        submission_class.objects.create(form_data=encoded, page=self, user=form.user)
项目:wagtailsurveys    作者:torchbox    | 项目源码 | 文件源码
def process_form_submission(self, form):
        """
        Create a submission instance from a form with submitted data.

        The form's cleaned data is stored as a JSON string together with
        this page.

        Override this method for custom creation logic, for example to
        also save a reference to a user.
        """
        submission_class = self.get_submission_class()
        encoded = json.dumps(form.cleaned_data, cls=DjangoJSONEncoder)
        submission_class.objects.create(form_data=encoded, page=self)
项目:morango    作者:learningequality    | 项目源码 | 文件源码
def serialized_facility_factory(identifier):
    """Return the JSON encoding of a ``Facility`` built from ``identifier``.

    The facility uses ``identifier`` as its id and in its name.
    """
    name = "Facility {}".format(identifier)
    serialized = Facility(name=name, id=identifier).serialize()
    return DjangoJSONEncoder().encode(serialized)
项目:django-rest-framework-client    作者:qvantel    | 项目源码 | 文件源码
def __get__(self, instance, owner):
        """Descriptor access: expose a REST sub-resource call.

        Returns a function that performs the call, or, when
        ``self.as_property`` is set, the result of calling it immediately
        with no arguments.
        """
        from restframeworkclient.models import Model

        def to_request_value(raw):
            # dicts travel as JSON strings, the "now" callables are resolved
            # at call time, and model instances are sent as their pk.
            if isinstance(raw, dict):
                return json.dumps(raw, cls=DjangoJSONEncoder)
            if raw in (datetime.datetime.now, timezone.now):
                return raw()
            return raw.pk if isinstance(raw, Model) else raw

        # The inner function keeps its original name (which shadows the builtin).
        def callable(**kwargs):
            payload = {key: to_request_value(item) for key, item in kwargs.items()}
            arg_name = 'data' if self.method == 'POST' else 'params'

            # Static sub-resources hang off the collection URL, the others
            # off the URL of this particular instance.
            if self.static:
                target = owner
                base_url = owner._resources_url()
            else:
                target = instance
                base_url = instance._resource_url(instance.pk)
            url = base_url + self.subresource + '/'
            data = target._rest_call(url, method=self.method, **{arg_name: payload})

            if self.unwrapping_key:
                data = data[self.unwrapping_key]
            if not self.model:
                return data
            obj = self.model(**data)
            obj._persisted = True
            return obj

        return callable() if self.as_property else callable
项目:kolibri    作者:learningequality    | 项目源码 | 文件源码
def kolibri_set_server_time():
    """Return a safe-marked ``<script>`` tag that passes the current time to ``serverClock.setServerTime``.

    The time is the JSON encoding of ``now()``; the JavaScript object name
    comes from ``settings.KOLIBRI_CORE_JS_NAME``.
    """
    template = ("<script type='text/javascript'>"
                "{0}.utils.serverClock.setServerTime({1});"
                "</script>")
    core_name = settings.KOLIBRI_CORE_JS_NAME
    server_time = json.dumps(now(), cls=DjangoJSONEncoder)
    return mark_safe(template.format(core_name, server_time))
项目:django-oscar-bluelight    作者:thelabnyc    | 项目源码 | 文件源码
def _store_form_kwargs(self, form):
        """Save the form's cleaned data as JSON in this wizard's session entry.

        Related objects under ``range``, ``benefit``, ``condition`` and
        ``offer_group`` are replaced by their ids so that the instances
        themselves are not serialised.
        """
        session_data = self.request.session.setdefault(self.wizard_name, {})

        # Work on a copy so the form's own cleaned_data keeps the instances.
        form_data = form.cleaned_data.copy()
        for prop in ('range', 'benefit', 'condition', 'offer_group'):
            related = form_data.get(prop)
            if related is None:
                continue
            form_data[prop] = related.id

        session_data[self._key()] = json.dumps({'data': form_data}, cls=DjangoJSONEncoder)
        self.request.session.save()
项目:DjangoBlog    作者:0daybug    | 项目源码 | 文件源码
def __init__(self, data, encoder=DjangoJSONEncoder, safe=True, **kwargs):
        """Build a response whose content is ``data`` serialised as JSON.

        With ``safe`` true, ``data`` must be a dict, otherwise ``TypeError``
        is raised. ``content_type`` defaults to ``application/json``.
        """
        data_is_dict = isinstance(data, dict)
        if safe and not data_is_dict:
            raise TypeError('In order to allow non-dict objects to be '
                'serialized set the safe parameter to False')

        kwargs.setdefault('content_type', 'application/json')
        serialized = json.dumps(data, cls=encoder)
        super(JsonResponse, self).__init__(content=serialized, **kwargs)