Python bson.json_util 模块,dumps() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用bson.json_util.dumps()

项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def post(self):
    """Search for the submitted text and return the matches as JSON.

    Reads ``searchtext`` and ``options`` from the parsed request arguments.
    According to the original notes, ``options`` is meant to look like
    ``{name: 1, description: 1, tags: 1}``, but it currently arrives as
    ``None`` because the request is form-encoded rather than JSON.

    Returns:
        The serialized result of ``connector.Search``, or a JSON error
        object when anything raises.
    """
    try:
        args = parser.parse_args()
        searchtext = args['searchtext']
        options = args['options']
        # Original note: this used to return ``Scan``, which could not work.
        return dumps(connector.Search(searchtext, options))
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:api-manager    作者:nebula-orchestrator    | 项目源码 | 文件源码
def restart_app(app_name):
    """Send a "restart" command for ``app_name`` over RabbitMQ.

    The app document loaded from Mongo is tagged with
    ``"command": "restart"`` and sent to ``app_name + "_fanout"``.

    Returns:
        A ``(body, status)`` tuple: 202 with the sent JSON on success, or
        403 with a JSON marker when the app does not exist or is not running.
    """
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    # try/finally closes the channel on every path, including when the Mongo
    # lookup or the send raises (previously the channel leaked in that case).
    try:
        app_exists, app_json = mongo_get_app(mongo_collection, app_name)
        # check app exists first
        if app_exists is False:
            return "{\"app_exists\": \"False\"}", 403
        # only an app that is currently running can be restarted
        if app_json["running"] is False:
            return "{\"running_before_restart\": \"False\"}", 403
        # post to rabbit to restart app
        app_json["command"] = "restart"
        payload = dumps(app_json)
        rabbit_send(rabbit_channel, app_name + "_fanout", payload)
        return payload, 202
    finally:
        rabbit_close(rabbit_channel)


# rolling restart an app
项目:api-manager    作者:nebula-orchestrator    | 项目源码 | 文件源码
def roll_app(app_name):
    """Send a "roll" (rolling restart) command for ``app_name`` over RabbitMQ.

    The app document loaded from Mongo is tagged with ``"command": "roll"``
    and sent to ``app_name + "_fanout"``.

    Returns:
        A ``(body, status)`` tuple: 202 with the sent JSON on success, or
        403 with a JSON marker when the app does not exist or is not running.
    """
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    # try/finally closes the channel on every path, including when the Mongo
    # lookup or the send raises (previously the channel leaked in that case).
    try:
        app_exists, app_json = mongo_get_app(mongo_collection, app_name)
        # check app exists first
        if app_exists is False:
            return "{\"app_exists\": \"False\"}", 403
        # only an app that is currently running can be rolled
        if app_json["running"] is False:
            return "{\"running_before_restart\": \"False\"}", 403
        # post to rabbit to roll the app
        app_json["command"] = "roll"
        payload = dumps(app_json)
        rabbit_send(rabbit_channel, app_name + "_fanout", payload)
        return payload, 202
    finally:
        rabbit_close(rabbit_channel)


# stop an app
项目:api-manager    作者:nebula-orchestrator    | 项目源码 | 文件源码
def stop_app(app_name):
    """Mark ``app_name`` as not running and send a "stop" command.

    The running state is updated in Mongo first; the returned document is
    tagged with ``"command": "stop"`` and sent to ``app_name + "_fanout"``.

    Returns:
        A ``(body, status)`` tuple: 202 with the sent JSON on success, or
        403 with a JSON marker when the app does not exist.
    """
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    # try/finally closes the channel on every path, including when a Mongo
    # call or the send raises (previously the channel leaked in that case).
    try:
        # check app exists first
        if mongo_check_app_exists(mongo_collection, app_name) is False:
            return "{\"app_exists\": \"False\"}", 403
        # post to db
        app_json = mongo_update_app_running_state(mongo_collection, app_name, False)
        # post to rabbit to stop app
        app_json["command"] = "stop"
        payload = dumps(app_json)
        rabbit_send(rabbit_channel, app_name + "_fanout", payload)
        return payload, 202
    finally:
        rabbit_close(rabbit_channel)


# start an app
项目:api-manager    作者:nebula-orchestrator    | 项目源码 | 文件源码
def start_app(app_name):
    """Mark ``app_name`` as running and send a "start" command.

    The running state is updated in Mongo first; the returned document is
    tagged with ``"command": "start"`` and sent to ``app_name + "_fanout"``.

    Returns:
        A ``(body, status)`` tuple: 202 with the sent JSON on success, or
        403 with a JSON marker when the app does not exist.
    """
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    # try/finally closes the channel on every path, including when a Mongo
    # call or the send raises (previously the channel leaked in that case).
    try:
        # check app exists first
        if mongo_check_app_exists(mongo_collection, app_name) is False:
            return "{\"app_exists\": \"False\"}", 403
        # post to db
        app_json = mongo_update_app_running_state(mongo_collection, app_name, True)
        # post to rabbit to start app
        app_json["command"] = "start"
        payload = dumps(app_json)
        rabbit_send(rabbit_channel, app_name + "_fanout", payload)
        return payload, 202
    finally:
        rabbit_close(rabbit_channel)


# POST update an app
项目:api-manager    作者:nebula-orchestrator    | 项目源码 | 文件源码
def release_app(app_name):
    """Send a "release" command for ``app_name`` over RabbitMQ.

    The app document loaded from Mongo is tagged with
    ``"command": "release"`` and sent to ``app_name + "_fanout"``. Unlike
    the restart/roll handlers, the running state is not checked.

    Returns:
        A ``(body, status)`` tuple: 202 with the sent JSON on success, or
        403 with a JSON marker when the app does not exist.
    """
    rabbit_channel = rabbit_login(rabbit_user, rabbit_password, rabbit_host, rabbit_port, rabbit_vhost,
                                  rabbit_heartbeat)
    # try/finally closes the channel on every path, including when the Mongo
    # lookup or the send raises (previously the channel leaked in that case).
    try:
        app_exists, app_json = mongo_get_app(mongo_collection, app_name)
        # check app exists first
        if app_exists is False:
            return "{\"app_exists\": \"False\"}", 403
        # post to rabbit to release app
        app_json["command"] = "release"
        payload = dumps(app_json)
        rabbit_send(rabbit_channel, app_name + "_fanout", payload)
        return payload, 202
    finally:
        rabbit_close(rabbit_channel)


# list apps
项目:pysmap    作者:SMAPPNYU    | 项目源码 | 文件源码
def dump_to_json(self, output_file, num_files=1):
    """Append every tweet to one or more files, one JSON document per line.

    Args:
        output_file: target path. With ``num_files > 1`` an index is
            inserted before the extension, e.g. ``tweets.json`` becomes
            ``tweets_0.json``, ``tweets_1.json``, ...
        num_files: number of files to spread the tweets over, round-robin.
    """
    if num_files == 1:
        paths = [output_file]
    else:
        # Split only the basename at its first extension separator, so dots
        # in directory names (e.g. "./out/tweets.json") are left alone and a
        # name without any extension no longer raises ValueError.
        directory, basename = os.path.split(output_file)
        stem, sep, extension = basename.partition(os.extsep)
        paths = [
            os.path.join(directory, '{}_{}{}{}'.format(stem, i, sep, extension))
            for i in range(num_files)
        ]

    filehandles = []
    try:
        for path in paths:
            filehandles.append(open(path, 'a'))

        # write the tweets as evenly as possible in each file
        tracker = 0
        for tweet in self.get_collection_iterators():
            filehandles[tracker].write(json_util.dumps(tweet) + '\n')
            tracker = (tracker + 1) % num_files
    finally:
        # close whatever was opened, even if opening or writing failed
        for fh in filehandles:
            fh.close()
项目:xunfeng    作者:ysrc    | 项目源码 | 文件源码
def Getplugin():
    """Return name and info of the plugins matching the form filters, as JSON.

    Optional form fields: ``type`` (exact match on ``type``), ``risk`` (exact
    match on ``level``) and ``search`` (URL-unquoted, used as a
    case-insensitive ``$regex`` on ``name``).
    """
    form = request.form
    plugin_type = form.get('type', '')
    risk_level = form.get('risk', '')
    keyword = form.get('search', '')

    filters = {}
    if plugin_type:
        filters['type'] = plugin_type
    if risk_level:
        filters['level'] = risk_level
    if keyword:
        filters['name'] = {"$regex": unquote(keyword), '$options': 'i'}

    plugins = [{'name': doc['name'], 'info': doc['info']}
               for doc in Mongo.coll['Plugin'].find(filters)]
    return json.dumps(plugins)


# ??????
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def delete_substage(request, id):
    """Soft-delete the form attached to the sub stage with primary key ``id``.

    Only ``stage_forms.is_deleted`` is set and saved. The original code also
    contained a notification broadcast, detaching the form from its stage and
    ``sub_stage.delete()``, all of which were commented out and are not run.

    Returns:
        An empty 200 response on success, or a 400 response carrying the
        error text when anything raises.
    """
    try:
        sub_stage = Stage.objects.get(pk=id)
        old_fsxf = sub_stage.stage_forms
        old_fsxf.is_deleted = True
        old_fsxf.save()
        return Response({}, status=status.HTTP_200_OK)
    except Exception as e:
        # str(e) instead of e.message: the attribute was deprecated in
        # Python 2.6 and no longer exists on Python 3 exceptions.
        return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def public_api(request, username, id_string):
    """
    Serve the public details of one form as a JSON document.

    The form is looked up by case-insensitive username and exact id_string;
    a missing form goes through get_object_or_404.
    """
    form = get_object_or_404(XForm,
                             user__username__iexact=username,
                             id_string__exact=id_string)

    def stamp(moment):
        # timestamps are exported as "YYYY-MM-DD HH:MM:SS"
        return moment.strftime('%Y-%m-%d %H:%M:%S')

    payload = {'username': form.user.username,
               'id_string': form.id_string,
               'bamboo_dataset': form.bamboo_dataset,
               'shared': form.shared,
               'shared_data': form.shared_data,
               'downloadable': form.downloadable,
               'title': form.title,
               'date_created': stamp(form.date_created),
               'date_modified': stamp(form.date_modified),
               'uuid': form.uuid,
               }
    return HttpResponse(json.dumps(payload), content_type='application/json')
项目:ysrc    作者:myDreamShadow    | 项目源码 | 文件源码
def Getplugin():
    """Return name and info of the plugins matching the form filters, as JSON.

    Optional form fields: ``type`` (exact match on ``type``), ``risk`` (exact
    match on ``level``) and ``search`` (URL-unquoted, used as a
    case-insensitive ``$regex`` on ``name``).
    """
    form = request.form
    plugin_type = form.get('type', '')
    risk_level = form.get('risk', '')
    keyword = form.get('search', '')

    filters = {}
    if plugin_type:
        filters['type'] = plugin_type
    if risk_level:
        filters['level'] = risk_level
    if keyword:
        filters['name'] = {"$regex": unquote(keyword), '$options': 'i'}

    plugins = [{'name': doc['name'], 'info': doc['info']}
               for doc in Mongo.coll['Plugin'].find(filters)]
    return json.dumps(plugins)


# ??????
项目:dataIO-project    作者:MacHu-GWU    | 项目源码 | 文件源码
def test_all(self):
    """Round-trip a mixed-type document through json_util.

    Only the int, float, str and boolean fields are asserted; the bytes and
    datetime fields are just printed, followed by a pretty-printed dump.
    """
    original = {
        "int": 100,
        "float": 3.1415926535,
        "str": "string example ?????",
        "bytes": "bytes example ?????".encode("utf-8"),
        "boolean": True,
        "datetime": datetime.now()
    }
    restored = json_util.loads(json_util.dumps(original))

    self.assertEqual(original["int"], restored["int"])
    self.assertAlmostEqual(original["float"], restored["float"], delta=0.0001)
    self.assertEqual(original["str"], restored["str"])
    self.assertEqual(original["boolean"], restored["boolean"])

    print(restored["bytes"])
    print(restored["datetime"])
    pretty = json_util.dumps(original, sort_keys=True, indent=4)
    print(pretty)


#--- Unittest ---
项目:musicbot    作者:AdrienPensart    | 项目源码 | 文件源码
def get_music(artist, album, title):
    '''Show a single track as an HTML page or as JSON.

    The output is chosen by the ``format`` query argument (default ``html``).
    Returns a 404 tuple unless exactly one track matches, and a 400 tuple
    for an unknown format.
    '''
    page_format = request.args.get('format', 'html')
    artist, album, title = unquote(artist), unquote(album), unquote(title)
    collection = app.config['COLLECTION']
    track_filter = MusicFilter(artists=[artist], albums=[album], titles=[title])
    fetch = partial(collection.filter, cursor_factory=RealDictCursor)
    matches = webfilter(fetch, track_filter)
    if len(matches) != 1:
        return ('Music not found', 404)

    track = matches[0]
    if page_format == 'html':
        return render_template("music.html", music=track)
    if page_format == 'json':
        return dumps(track, sort_keys=True, indent=4, separators=(',', ': '))
    return ('Invalid format, available: json,html', 400)
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def get_metadata():
    """Return the metadata stored for the ``file_hash`` query parameter.

    Responds with status 400 for a missing or malformed hash and 404 when
    the hash cannot be resolved to a file id. ``log_event`` is called when
    the metadata lookup itself returns ``None``.
    """
    def reject(status_code, message):
        # set the HTTP status first, then build the JSON error body
        response.status = status_code
        return jsonize({'message': message})

    if request.query.file_hash == '':
        return reject(400, 'file_hash parameter is missing')

    cleaned = clean_hash(request.query.file_hash)
    if not valid_hash(cleaned):
        return reject(400, 'Invalid hash format (use MD5, SHA1 or SHA2)')

    file_id = get_file_id(cleaned)
    if file_id is None:
        return reject(404, 'Metadata not found in the database')

    controller = MetaController()
    metadata = controller.read(file_id)
    if metadata is None:
        log_event("metadata", file_id)
    return dumps(change_date_to_str(metadata))
项目:codex-backend    作者:codexgigassys    | 项目源码 | 文件源码
def export_metadata():
    """Export the metadata of the posted hashes as a downloadable zip.

    Each hash in the ``file_hash[]`` form field is cleaned, read through
    ``MetaController`` and dumped to ``<hash>.txt`` in a temporary folder
    under ``/tmp/meta_export``. The folder is zipped, the zip is handed to
    ``static_file``, and both the folder and the zip are then removed.

    Returns:
        The ``static_file`` response with a ``fileDownload`` cookie set.
    """
    mdc = MetaController()
    hashes = request.forms.dict.get("file_hash[]")
    random_id = id_generator()
    tmp_path = "/tmp/meta_export"
    tmp_folder = os.path.join(tmp_path, random_id)
    zip_path = os.path.join(tmp_path, random_id + '.zip')
    call_with_output(["mkdir", "-p", tmp_folder])
    try:
        for raw_hash in hashes:
            file_hash = clean_hash(raw_hash.replace('\r', ''))
            dump = dumps(mdc.read(file_hash), indent=4)
            file_name = os.path.join(tmp_folder, str(file_hash) + '.txt')
            # "with" closes the file even when the write fails
            with open(file_name, "w") as fd:
                fd.write(dump)
        call_with_output(["zip", "-jr", zip_path, tmp_folder])
        resp = static_file(str(random_id) + '.zip', root=tmp_path, download=True)
        resp.set_cookie('fileDownload', 'true')
    finally:
        # always drop the scratch folder, including on a failed export
        shutil.rmtree(tmp_folder)
    os.remove(zip_path)
    return resp
项目:nebula    作者:nebula-orchestrator    | 项目源码 | 文件源码
def restart_app(app_name):
    """Send a "restart" command for ``app_name`` over RabbitMQ.

    The app document loaded from Mongo is tagged with
    ``"command": "restart"`` and sent to ``app_name + "_fanout"``.

    Returns:
        A ``(body, status)`` tuple: 202 with the sent JSON on success, or
        403 with a JSON marker when the app does not exist or is not running.
    """
    rabbit_channel = rabbit_login()
    # try/finally closes the channel on every path, including when the Mongo
    # lookup or the send raises (previously the channel leaked in that case).
    try:
        app_exists, app_json = mongo_get_app(mongo_collection, app_name)
        # check app exists first
        if app_exists is False:
            return "{\"app_exists\": \"False\"}", 403
        # only an app that is currently running can be restarted
        if app_json["running"] is False:
            return "{\"running_before_restart\": \"False\"}", 403
        # post to rabbit to restart app
        app_json["command"] = "restart"
        payload = dumps(app_json)
        rabbit_send(rabbit_channel, app_name + "_fanout", payload)
        return payload, 202
    finally:
        rabbit_close(rabbit_channel)


# rolling restart an app
项目:nebula    作者:nebula-orchestrator    | 项目源码 | 文件源码
def roll_app(app_name):
    """Send a "roll" (rolling restart) command for ``app_name`` over RabbitMQ.

    The app document loaded from Mongo is tagged with ``"command": "roll"``
    and sent to ``app_name + "_fanout"``.

    Returns:
        A ``(body, status)`` tuple: 202 with the sent JSON on success, or
        403 with a JSON marker when the app does not exist or is not running.
    """
    rabbit_channel = rabbit_login()
    # try/finally closes the channel on every path, including when the Mongo
    # lookup or the send raises (previously the channel leaked in that case).
    try:
        app_exists, app_json = mongo_get_app(mongo_collection, app_name)
        # check app exists first
        if app_exists is False:
            return "{\"app_exists\": \"False\"}", 403
        # only an app that is currently running can be rolled
        if app_json["running"] is False:
            return "{\"running_before_restart\": \"False\"}", 403
        # post to rabbit to roll the app
        app_json["command"] = "roll"
        payload = dumps(app_json)
        rabbit_send(rabbit_channel, app_name + "_fanout", payload)
        return payload, 202
    finally:
        rabbit_close(rabbit_channel)


# stop an app
项目:nebula    作者:nebula-orchestrator    | 项目源码 | 文件源码
def stop_app(app_name):
    """Mark ``app_name`` as not running and send a "stop" command.

    The running state is updated in Mongo first; the returned document is
    tagged with ``"command": "stop"`` and sent to ``app_name + "_fanout"``.

    Returns:
        A ``(body, status)`` tuple: 202 with the sent JSON on success, or
        403 with a JSON marker when the app does not exist.
    """
    rabbit_channel = rabbit_login()
    # try/finally closes the channel on every path, including when a Mongo
    # call or the send raises (previously the channel leaked in that case).
    try:
        # check app exists first
        if mongo_check_app_exists(mongo_collection, app_name) is False:
            return "{\"app_exists\": \"False\"}", 403
        # post to db
        app_json = mongo_update_app_running_state(mongo_collection, app_name, False)
        # post to rabbit to stop app
        app_json["command"] = "stop"
        payload = dumps(app_json)
        rabbit_send(rabbit_channel, app_name + "_fanout", payload)
        return payload, 202
    finally:
        rabbit_close(rabbit_channel)


# start an app
项目:nebula    作者:nebula-orchestrator    | 项目源码 | 文件源码
def start_app(app_name):
    """Mark ``app_name`` as running and send a "start" command.

    The running state is updated in Mongo first; the returned document is
    tagged with ``"command": "start"`` and sent to ``app_name + "_fanout"``.

    Returns:
        A ``(body, status)`` tuple: 202 with the sent JSON on success, or
        403 with a JSON marker when the app does not exist.
    """
    rabbit_channel = rabbit_login()
    # try/finally closes the channel on every path, including when a Mongo
    # call or the send raises (previously the channel leaked in that case).
    try:
        # check app exists first
        if mongo_check_app_exists(mongo_collection, app_name) is False:
            return "{\"app_exists\": \"False\"}", 403
        # post to db
        app_json = mongo_update_app_running_state(mongo_collection, app_name, True)
        # post to rabbit to start app
        app_json["command"] = "start"
        payload = dumps(app_json)
        rabbit_send(rabbit_channel, app_name + "_fanout", payload)
        return payload, 202
    finally:
        rabbit_close(rabbit_channel)


# update an app
项目:ShadowCat    作者:NerdCats    | 项目源码 | 文件源码
def get_history(asset_id, document_limit=10):
    """Return the most recent history documents of an asset as a JSON array.

    Args:
        asset_id: value matched against the ``asset_id`` field.
        document_limit: maximum number of documents, newest first; it is
            passed through ``int()``.

    Returns:
        The JSON-serialized list of documents, each converted with
        ``utilities.to_isoformat_datetime``. If the query fails, the error is
        logged and an empty JSON array is returned.
    """
    try:
        cursor = app.config['DB_COLL_HISTORY'].find(
            {'asset_id': asset_id}
        ).sort(
            "timestamp", pymongo.DESCENDING
        ).limit(int(document_limit))
        # Materialize inside the try block: PyMongo cursors run the query
        # lazily, so connection errors surface while iterating.
        documents = list(cursor)
    except Exception as e:
        # Previously the error was logged and execution continued with an
        # unbound ``cursor``, which raised NameError right after.
        logger.error("Failed to load history for asset %s: %s", asset_id, e)
        documents = []

    history = [utilities.to_isoformat_datetime(document) for document in documents]
    return dumps(history)
项目:dbs-back    作者:Beit-Hatfutsot    | 项目源码 | 文件源码
def collection_to_csv(coll, f):
    '''
        Write one CSV row per shown item of a collection to ``f``.
        Each row holds the URL and title of the Hebrew page followed by the
        URL and title of the English page; commas inside titles become "|"
        and a missing language is written as empty columns.
    '''
    from bhs_api.item import SHOW_FILTER
    for item in coll.find(SHOW_FILTER):
        urls = slugs_to_urls(item['Slug'])
        try:
            hebrew = [urls['He'], item['Header']['He'].replace(',', '|')]
        except KeyError:
            row = ','
        else:
            row = ','.join(hebrew)
        try:
            columns = [row, urls['En'], item['Header']['En'].replace(',', '|')]
        except KeyError:
            columns = [row, '', '']
        f.write((','.join(columns) + '\n').encode('utf8'))
项目:smart-realestate    作者:stevensshi    | 项目源码 | 文件源码
def searchArea(text):
    """Return the properties for a zip code or a "City, State" query.

    An all-digit ``text`` is treated as a zip code; anything else is parsed
    as ``city,state``. Properties missing from the database are fetched with
    ``getDetailsByZpid`` and handed to ``storeUpdates``.

    Returns:
        A list of JSON-compatible property records; an empty list when
        ``text`` is neither a zip code nor contains a comma.
    """
    if text.isdigit():
        zpids = searchAreaByZip(text)
    else:
        # Split on the bare comma and strip each part, so "City,ST" works as
        # well as "City, ST"; the old split(', ')[1] raised IndexError here.
        parts = [part.strip() for part in text.split(',')]
        if len(parts) < 2:
            return []
        zpids = searchAreaByCityState(parts[0], parts[1])
    print(zpids)

    res = []
    update_list = []
    db = mongodb_client.getDB()
    for zpid in zpids:
        record = db[PROPERTY_TABLE_NAME].find_one({'zpid': zpid})
        if record is not None:
            res.append(record)
        else:
            property_detail = getDetailsByZpid(zpid, False)
            res.append(property_detail)
            update_list.append(property_detail)

    storeUpdates(update_list)

    # Trick: use bson.dumps then re-construct json because of ObjectId.
    return json.loads(dumps(res))
项目:smart-realestate    作者:stevensshi    | 项目源码 | 文件源码
def getDetailsByZpid(zpid, get_prediction=False):
    """Return the property record for ``zpid``.

    The stored record is round-tripped through ``dumps``/``json.loads``; when
    nothing is stored, the record is fetched through the web scraper client.
    With ``get_prediction`` set, an integer ``predicted_value`` is added.
    """
    db = mongodb_client.getDB()
    stored = db[PROPERTY_TABLE_NAME].find_one({'zpid': zpid})
    prop = json.loads(dumps(stored))
    if prop is None:
        prop = zillow_web_scraper_client.get_property_by_zpid(zpid)

    if not get_prediction:
        return prop

    # Get prediction
    estimate = ml_prediction_client.predict(
        prop['zipcode'],
        prop['property_type'],
        prop['bedroom'],
        prop['bathroom'],
        prop['size'])
    prop['predicted_value'] = int(estimate)
    return prop
项目:VTU-Result-Analyser    作者:abhijith0505    | 项目源码 | 文件源码
def getOneStudentJson(college_code='1mv',year='14',branch='is',regno=45):
    """Return one student's result document as JSON.

    The USN is ``college_code + year + branch + regno`` (``regno``
    zero-padded to three digits), upper-cased. When no document with that
    USN is stored, the result is produced by ``student_results`` and
    inserted before being returned.
    """
    usn = (college_code + year + branch + str(regno).zfill(3)).upper()
    client = MongoClient()
    # A client is created per call, so it is closed again here; previously
    # it was never closed.
    try:
        students = client.results.students
        student = students.find_one({"usn": usn})
        if not student:
            student = student_results(college_code, year, branch, regno)
            students.insert_one(student)
        # dumps converts MongoDB's BSON types to JSON
        return dumps(student)
    finally:
        client.close()
项目:FormShare    作者:qlands    | 项目源码 | 文件源码
def public_api(request, username, id_string):
    """
    Serve the public details of one form as a JSON document.

    The form is looked up by case-insensitive username and id_string; a
    missing form goes through get_object_or_404.
    """
    form = get_object_or_404(XForm,
                             user__username__iexact=username,
                             id_string__iexact=id_string)

    def stamp(moment):
        # timestamps are exported as "YYYY-MM-DD HH:MM:SS"
        return moment.strftime('%Y-%m-%d %H:%M:%S')

    payload = {'username': form.user.username,
               'id_string': form.id_string,
               'bamboo_dataset': form.bamboo_dataset,
               'shared': form.shared,
               'shared_data': form.shared_data,
               'downloadable': form.downloadable,
               'title': form.title,
               'date_created': stamp(form.date_created),
               'date_modified': stamp(form.date_modified),
               'uuid': form.uuid,
               }
    return HttpResponse(json.dumps(payload), content_type='application/json')
项目:pybot    作者:aster1sk    | 项目源码 | 文件源码
def start_socket() :
    """Read lines from the IRC socket and publish each one as a JSON event.

    Connects ``sock`` to riboflav.in:6667, splits the incoming data on
    newlines and publishes one message per complete line to the ``stream``
    exchange. Returns once the peer closes the connection.
    """
    readbuffer = ''
    sock.connect(("riboflav.in", 6667))
    while True:
        chunk = sock.recv(1024)
        if not chunk:
            # recv() returns an empty result once the peer has closed the
            # connection; without this check the loop spun forever.
            break
        readbuffer = readbuffer + chunk
        lines = readbuffer.split("\n")
        # the last element is an incomplete line; keep it for the next read
        readbuffer = lines.pop()
        for line in lines:
            body = {
                'uuid' : str(uuid4()),
                'raw'  : line,
                'args' : line.split(),
                'host' : host,
                'port' : port,
                'time' : time(),
                'event': "socket"
            }
            channel.basic_publish(exchange='stream', routing_key='', body=dumps(body))
项目:smappdragon    作者:SMAPPNYU    | 项目源码 | 文件源码
def dump_to_json(self, output_json, compression=None, mode='a'):
    """Write every tweet of the iterator to ``output_json``, one JSON per line.

    Args:
        output_json: path of the file to write.
        compression: ``'bz2'`` or ``'gzip'`` to write through the matching
            module; any other value writes a plain file.
        mode: file mode; for compressed output it is first passed through
            ``binary_mode``.
    """
    if compression == 'bz2':
        filehandle = bz2.open(output_json, binary_mode(mode))
    elif compression == 'gzip':
        filehandle = gzip.open(output_json, binary_mode(mode))
    else:
        filehandle = open(output_json, mode)

    # close the handle even when serializing or writing a tweet fails
    try:
        for tweet in self.get_iterator():
            filehandle.write(json_util.dumps(tweet) + '\n')
    finally:
        filehandle.close()
项目:xgovctf    作者:alphagov    | 项目源码 | 文件源码
def write_metadata(pid, n, data):
    """
    Store the metadata of one autogenerated problem instance.
    The metadata carries any fields that override the original
    problem object.

    Args:
        pid: the problem id
        n: the instance number
        data: the metadata object
    """

    with open(get_metadata_path(pid, n), "w") as metadata_file:
        serialized = json_util.dumps(data)
        metadata_file.write(serialized)
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def post(self):
    """Save a new list and return it as JSON with status 201.

    ``listname`` and ``description`` come from the parsed arguments;
    ``tags``, ``columns`` and ``isprivate`` come from the JSON request body.

    Returns:
        ``(json, 201)`` on success, or a JSON error object when anything
        raises.
    """
    try:
        args = parser.parse_args()
        listname = args['listname']
        description = args['description']
        body = request.json
        tags = body['tags']
        columns = body['columns']
        isprivate = body['isprivate']
        saved = connector.SaveList(listname, description, tags, columns, True, isprivate, False)
        return dumps(saved), 201
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def get(self, token):
    """Return the list identified by ``token`` as JSON.

    Returns a fixed JSON error for an invalid object id and a JSON error
    object carrying the exception text for any other failure.
    """
    try:
        return dumps(connector.ShowList(token))
    except bson.errors.InvalidId:
        return '{"type":"error", "message":"invalid object id"}'
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def get(self):
    """Return all lists as JSON, or a JSON error object on failure."""
    try:
        return dumps(connector.ShowLists())
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def get(self, token):
    """Delete the list identified by ``token``.

    Returns:
        A JSON success object with status 200, or a JSON error object when
        the deletion raises.
    """
    try:
        # The result used to be passed through dumps() and then discarded;
        # only the deletion itself matters here.
        connector.DeleteList(token)
        return '{"type":"success", "message":"ok"}', 200
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def get(self, list_id, scan_group_id):
    """Return the scanned list for a list id and scan group id as JSON.

    A JSON error object is returned when anything raises.
    """
    try:
        return dumps(connector.ShowScannedList(list_id, scan_group_id))
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def get(self, site_id):
    """Return the scan groups of a site as JSON, or a JSON error object."""
    try:
        return dumps(connector.GetScanGroupsBySite(site_id))
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def get(self, list_id):
    """Return the scan groups of a list as JSON, or a JSON error object."""
    try:
        return dumps(connector.GetScanGroupsByList(list_id))
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def get(self, site_id):
    """Return the scan dates of a site as JSON, or a JSON error object."""
    try:
        return dumps(connector.GetScanDates(site_id))
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def get(self, list_id):
    """Return the token of a list as JSON, or a JSON error object."""
    try:
        return dumps(connector.GetToken(list_id))
    except Exception:
        # Serialize the error instead of concatenating strings so quotes,
        # backslashes or newlines in the message cannot break the JSON.
        return dumps({"type": "error", "message": GetException()})
项目:python-bsonjs    作者:mongodb-labs    | 项目源码 | 文件源码
def bsonjs_dumps(doc):
    """Serialize ``doc`` via bsonjs, mirroring the json_util.dumps call shape."""
    bson_bytes = to_bson(doc)
    return bsonjs.dumps(bson_bytes)
项目:python-bsonjs    作者:mongodb-labs    | 项目源码 | 文件源码
def round_trip(self, doc):
    """Assert that ``doc`` survives bsonjs and json_util round trips."""
    encoded = to_bson(doc)
    self.assertEqual(encoded, bsonjs.loads(bsonjs.dumps(encoded)))

    # bsonjs output must be readable by json_util ...
    decoded = json_util.loads(
        bsonjs.dumps(encoded),
        json_options=json_util.STRICT_JSON_OPTIONS)
    self.assertEqual(doc, decoded)

    # ... and json_util output must be readable by bsonjs
    strict_json = json_util.dumps(
        doc, json_options=json_util.STRICT_JSON_OPTIONS)
    self.assertEqual(encoded, bsonjs.loads(strict_json))
项目:python-bsonjs    作者:mongodb-labs    | 项目源码 | 文件源码
def test_dumps_multiple_bson_documents(self):
    """Dumping two concatenated BSON documents yields only the first one."""
    expected = '{ "test" : "me" }'
    single = bsonjs.loads(expected)
    doubled = single + single
    self.assertEqual(expected, bsonjs.dumps(doubled))
项目:django-audit-tools    作者:PeRDy    | 项目源码 | 文件源码
def test_response_json(self):
    """The middleware extracts type, status and content from a JSON response."""
    payload = {'response': 'test'}
    response = HttpResponse(dumps(payload), content_type='application/json')

    extracted = self.middleware._extract_response_data(response)

    self.assertIn('json', extracted['type'])
    self.assertEqual(200, extracted['status_code'])
    self.assertEqual(payload, extracted['content'])
项目:bguFinalProject    作者:liranfar    | 项目源码 | 文件源码
def convert_to_json_list(db_cursor):
    """Return the cursor's documents as a list of plain JSON values."""
    serialized = json_util.dumps(db_cursor)
    return list(json.loads(serialized))
项目:hoplite    作者:ni    | 项目源码 | 文件源码
def hoplite_dumps(obj, *args, **kwargs):
    """
    Serialize ``obj`` by delegating to bson.json_util.dumps, which already
    does exactly what hoplite needs.
    :param obj: Python dictionary to be serialized
    :param args: forwarded unchanged; see the documentation of
        bson.json_util.dumps and json.dumps
    :param kwargs: forwarded unchanged; see the documentation of
        bson.json_util.dumps and json.dumps
    :return: the serialized form of obj
    """
    serialized = dumps(obj, *args, **kwargs)
    return serialized
项目:api-manager    作者:nebula-orchestrator    | 项目源码 | 文件源码
def list_apps():
    """Return all apps wrapped in an ``{"apps": ...}`` JSON body with status 200."""
    apps_json = dumps(mongo_list_apps(mongo_collection))
    body = "".join(["{\"apps\": ", apps_json, " }"])
    return body, 200


# get app info
项目:api-manager    作者:nebula-orchestrator    | 项目源码 | 文件源码
def get_app(app_name):
    """Return the app document with status 200, or a 403 marker if it is missing.

    Nothing is returned when ``app_exists`` is neither ``True`` nor ``False``.
    """
    app_exists, app_json = mongo_get_app(mongo_collection, app_name)
    if app_exists is True:
        return dumps(app_json), 200
    if app_exists is False:
        return "{\"app_exists\": \"False\"}", 403


# set json header
项目:fame    作者:certsocietegenerale    | 项目源码 | 文件源码
def bson_renderer(data, template=None, ctx=None):
    """Render ``data`` with dumps(); ``template`` and ``ctx`` are not used."""
    rendered = dumps(data)
    return rendered
项目:pysmap    作者:SMAPPNYU    | 项目源码 | 文件源码
def dump_to_json(self, output_file):
    """Append every tweet of the collection to ``output_file``, one JSON per line."""
    # "with" closes the file even when serializing or writing a tweet fails
    with open(output_file, 'a') as filehandle:
        for tweet in self.collection.get_iterator():
            filehandle.write(json_util.dumps(tweet) + '\n')
项目:xunfeng    作者:ysrc    | 项目源码 | 文件源码
def Addtask():
    """Create one scan task per selected plugin from the submitted form.

    Targets come from the ``Info`` collection (queried with the parsed
    ``condition``) when ``resultcheck`` is ``'true'``, otherwise from the
    comma-separated ``ip:port`` entries in ``ids``.

    Returns:
        ``'success'`` when a plugin was given and every insert returned a
        truthy result, otherwise ``'fail'``.
    """
    form = request.form
    title = form.get('title', '')
    plugin = form.get('plugin', '')
    condition = unquote(form.get('condition', ''))
    plan = form.get('plan', 0)
    ids = form.get('ids', '')
    isupdate = form.get('isupdate', '0')
    resultcheck = form.get('resultcheck', '0')
    if not plugin:
        return 'fail'

    # The query depends only on the condition, so build it once instead of
    # once per plugin (assumes querylogic has no side effects).
    query = querylogic(condition.strip().split(';'))
    if resultcheck == 'true':
        targets = [[doc['ip'], doc['port']] for doc in Mongo.coll['Info'].find(query)]
    else:
        targets = []
        for entry in ids.split(','):
            # An empty "ids" field used to raise IndexError; skip blanks.
            if not entry.strip():
                continue
            parts = entry.split(':')
            targets.append([parts[0], int(parts[1])])

    plan_value = int(plan)
    update_flag = int(isupdate)
    serialized_query = dumps(query)
    all_inserted = True
    for plugin_name in plugin.split(','):
        item = {'status': 0, 'title': title, 'plugin': plugin_name, 'condition': condition,
                'time': datetime.now(), 'target': targets, 'plan': plan_value,
                'isupdate': update_flag, 'query': serialized_query}
        # keep inserting the remaining tasks even if one insert fails
        if not Mongo.coll['Task'].insert(item):
            all_inserted = False
    return 'success' if all_inserted else 'fail'


# ??????
项目:xunfeng    作者:ysrc    | 项目源码 | 文件源码
def CheckUpdate():
    """Return the not-yet-installed updates as JSON, sorted by ``unicode`` descending."""
    pending = Mongo.coll['Update'].find({'isInstall': 0}).sort('unicode', -1)
    updates = [
        {'unicode': entry['unicode'], 'name': entry['name'], 'info': entry['info'],
         'time': entry['pushtime'], 'author': entry['author']}
        for entry in pending
    ]
    return dumps(updates)


# ?????????
项目:fieldsight-kobocat    作者:awemulya    | 项目源码 | 文件源码
def delete_mainstage(request, id):
    """Delete a main stage together with all of its sub stages.

    Runs inside one transaction. For every sub stage that has a form, the
    form is flagged as deleted and detached from the stage before the sub
    stage is removed. The notification broadcast that followed the form
    update in the original code was commented out and is not run.

    Returns:
        An empty 200 response on success, or a 400 response carrying the
        error text when anything raises.
    """
    try:
        with transaction.atomic():
            stage = Stage.objects.get(pk=id)
            # Only the disabled notification code used ``org``; the lookup is
            # kept because it can still raise for a stage without a project.
            org = stage.site.project.organization if stage.site else stage.project.organization
            for sub_stage in Stage.objects.filter(stage=stage):
                if hasattr(sub_stage, 'stage_forms'):
                    old_fsxf = sub_stage.stage_forms
                    old_fsxf.is_deleted = True
                    old_fsxf.stage = None
                    old_fsxf.save()
                sub_stage.delete()
            stage.delete()
        return Response({}, status=status.HTTP_200_OK)
    except Exception as e:
        # str(e) instead of e.message: the attribute was deprecated in
        # Python 2.6 and no longer exists on Python 3 exceptions.
        return Response({'error': str(e)}, status=status.HTTP_400_BAD_REQUEST)