Python tornado.escape 模块,json_encode() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用tornado.escape.json_encode()

项目:react-tornado-graphql-example    作者:yatsu    | 项目源码 | 文件源码
def error_response(func):
    """Decorate a handler method so raised exceptions become JSON error replies.

    When the wrapped method succeeds its return value is passed through.
    When it raises, the response status is set from ``error_status(ex)``, a
    JSON document ``{"errors": error_format(ex)}`` is written to the
    response, and the wrapper returns ``None``.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as ex:
            expected = isinstance(
                ex, (web.HTTPError, ExecutionError, GraphQLError))
            if not expected:
                # Only unexpected exception types get a logged traceback.
                trace = ''.join(traceback.format_exception(*sys.exc_info()))
                app_log.error('Error: {0} {1}'.format(ex, trace))
            self.set_status(error_status(ex))
            payload = json_encode({'errors': error_format(ex)})
            app_log.debug('error_json: %s', payload)
            self.write(payload)

    return wrapper
项目:react-tornado-graphql-example    作者:yatsu    | 项目源码 | 文件源码
def _execute_command(self, command):
        """Send ``command`` to the next job server over a ZeroMQ DEALER socket.

        Servers are taken from ``self.job_servers`` in round-robin order.
        Replies are delivered to ``self.response_handler`` through a
        ZMQStream.  When no job server is registered an error is logged and
        nothing is sent.
        """
        server_count = len(self.job_servers)
        if server_count == 0:
            app_log.error('there is no job server')
            return

        # Round-robin: take the current server, then advance the index.
        server = self.job_servers[self.job_server_index]
        self.job_server_index = (self.job_server_index + 1) % server_count

        zmq_sock = zmq.Context.instance().socket(zmq.DEALER)
        zmq_sock.linger = 1000
        # The socket identity is this process's PID as ASCII bytes.
        zmq_sock.identity = str(os.getpid()).encode('ascii')

        host = server['ip']
        if host == '*':
            # A wildcard address is replaced by localhost for connecting.
            host = 'localhost'
        url = 'tcp://{0}:{1}'.format(host, server['zmq_port'])
        app_log.info('connect %s', url)
        zmq_sock.connect(url)

        command = json_encode({'command': command})
        app_log.info('command: %s', command)
        zmq_sock.send_multipart([b'0', command.encode('ascii')])

        ZMQStream(zmq_sock).on_recv(self.response_handler)
项目:simple-environment-monitor-system    作者:diegorubin    | 项目源码 | 文件源码
def test_positions_save(self):
        """POSTing an ordered label list must set each monitor's position."""
        # The monitors are saved in the reverse of the order posted below.
        for label in ('position1', 'position0'):
            Monitor(label=label,
                    url=SERVICE_URL,
                    monitor_type='TextMonitor',
                    data={'expected': 'LIVE'}).save()

        body = json_encode({'monitors': ['position0', 'position1']})
        self.fetch('/api/monitors/positions', method='POST', body=body)

        # Each monitor's position must equal its index in the posted list.
        for label, expected_position in (('position1', 1), ('position0', 0)):
            monitor = Monitor()
            monitor.load(label)
            self.assertEqual(monitor.get_position(), expected_position)
项目:fg21sim    作者:liweitianux    | 项目源码 | 文件源码
def get(self):
        """
        Handle the READ-ONLY tasks operations.

        Supported actions:
        - get: Get the current status of tasks.

        The action is read from the ``action`` request argument and
        defaults to ``get``.  Any other action is answered with HTTP 400.
        """
        action = self.get_argument("action", "get")
        if action != "get":
            # ERROR: bad action
            reason = "Bad request action: {0}".format(action)
            logger.warning("Request failed: {0}".format(reason))
            self.send_error(400, reason=reason)
            return

        response = {"status": self.status}
        logger.debug("Response: {0}".format(response))
        self.set_header("Content-Type", "application/json; charset=UTF-8")
        self.write(json_encode(response))
项目:fg21sim    作者:liweitianux    | 项目源码 | 文件源码
def _push_configs(self):
        """
        Push the current configurations, together with their validation
        errors, to the client so that it can update the configurations form.
        """
        payload = self.configs.dump(flatten=True)
        payload["userconfig"] = self.configs.userconfig
        # check_all() returns a pair; only its second item (the errors) is used.
        _unused, errors = self.configs.check_all(raise_exception=False)
        message = json_encode({
            "success": True,
            "type": "configs",
            "action": "push",
            "data": payload,
            "errors": errors,
        })
        logger.debug("Message of current configurations: {0}".format(message))
        self.write_message(message)
        logger.info("WebSocket: Pushed current configurations data "
                    "with validation errors to the client")
项目:kafka-rest    作者:gamechanger    | 项目源码 | 文件源码
def _encode_payload(schema_cache, topic, batch):
    """Build the JSON request body for a batch of messages on one topic.

    Each message contributes a record with its serialized value, its
    serialized key (``None`` when the message has no key) and its partition.
    For the value, and for the key when the topic has a key schema, either
    the cached schema id or the JSON-encoded schema itself is attached.

    Returns the JSON-encoded body.
    """
    schemas = schema_cache[topic]
    has_key_schema = schemas.get('key') is not None

    value_serializer = AvroJsonSerializer(
        avro.schema.make_avsc_object(schemas['value'], avro.schema.Names()))
    if has_key_schema:
        key_serializer = AvroJsonSerializer(
            avro.schema.make_avsc_object(schemas['key'], avro.schema.Names()))

    # NOTE(review): key_serializer is only bound when the topic has a key
    # schema; a message carrying a key on a topic without one fails below
    # with a NameError.
    records = [{'value': value_serializer.to_ordered_dict(message.value),
                'key': (None if message.key is None
                        else key_serializer.to_ordered_dict(message.key)),
                'partition': message.partition}
               for message in batch]
    body = {'records': records}

    # The REST proxy's API requires us to double-encode the schemas.
    # Don't ask why, because I have no idea.
    value_id = schemas.get('value-id')
    if value_id is not None:
        body['value_schema_id'] = schemas['value-id']
    else:
        body['value_schema'] = json_encode(schemas['value'])

    if has_key_schema:
        if schemas.get('key-id') is not None:
            body['key_schema_id'] = schemas['key-id']
        else:
            body['key_schema'] = json_encode(schemas['key'])

    return json_encode(body)
项目:time2go    作者:twitchyliquid64    | 项目源码 | 文件源码
def generate(self, **kwargs):
        """Generate this template with the given arguments.

        The template namespace is built from the built-in helpers below,
        then ``self.namespace``, then ``kwargs``; later entries override
        earlier ones.  Returns whatever the ``_execute`` callable found in
        the namespace returns.  If it raises, the formatted template code
        is logged and the exception is re-raised.
        """
        namespace = {
            "escape": escape.xhtml_escape,
            "xhtml_escape": escape.xhtml_escape,
            "url_escape": escape.url_escape,
            "json_encode": escape.json_encode,
            "squeeze": escape.squeeze,
            "linkify": escape.linkify,
            "datetime": datetime,
            "_utf8": escape.utf8,  # for internal use
            "_string_types": (unicode, bytes_type),
        }
        # Later updates win: instance namespace first, call-time kwargs last.
        namespace.update(self.namespace)
        namespace.update(kwargs)
        # Python 2 exec statement: runs the compiled code with `namespace`
        # as its globals.
        exec self.compiled in namespace
        # presumably the compiled code defines _execute; the lookup relies on it
        execute = namespace["_execute"]
        try:
            return execute()
        except Exception:
            # Log the generated code to help locate the failure, then re-raise.
            formatted_code = _format_code(self.code).rstrip()
            logging.error("%s code:\n%s", self.name, formatted_code)
            raise
项目:time2go    作者:twitchyliquid64    | 项目源码 | 文件源码
def write(self, chunk):
        """Append ``chunk`` to the output buffer.

        Nothing is sent to the network here; use the flush() method for that.

        A dictionary chunk is written as JSON and the Content-Type of the
        response is set to application/json (to send JSON under a different
        Content-Type, call set_header *after* calling write()).

        Lists are not converted to JSON because of a potential cross-site
        security vulnerability.  All JSON output should be wrapped in a
        dictionary.  More details at
        http://haacked.com/archive/2008/11/20/anatomy-of-a-subtle-json-vulnerability.aspx

        Raises RuntimeError when called after finish().
        """
        if self._finished:
            raise RuntimeError(
                "Cannot write() after finish().  May be caused by using "
                "async operations without the @asynchronous decorator.")
        wants_json = isinstance(chunk, dict)
        if wants_json:
            chunk = escape.json_encode(chunk)
            self.set_header("Content-Type", "application/json; charset=UTF-8")
        encoded = utf8(chunk)
        self._write_buffer.append(encoded)
项目:annotated-py-tornado    作者:hhstore    | 项目源码 | 文件源码
def write(self, chunk):
        """Buffer ``chunk`` for output.

        The buffered output only reaches the network through the flush()
        method.

        When ``chunk`` is a dictionary it is serialized as JSON and the
        response ``Content-Type`` becomes ``application/json`` (call
        set_header *after* write() to use a different ``Content-Type``).

        Lists are intentionally not serialized as JSON because of a
        potential cross-site security vulnerability; wrap all JSON output in
        a dictionary.  More details at
        http://haacked.com/archive/2008/11/20/anatomy-of-a-subtle-json-vulnerability.aspx

        Raises RuntimeError if finish() has already been called.
        """
        if self._finished:
            message = ("Cannot write() after finish().  May be caused "
                       "by using async operations without the "
                       "@asynchronous decorator.")
            raise RuntimeError(message)
        if isinstance(chunk, dict):
            json_text = escape.json_encode(chunk)
            self.set_header("Content-Type", "application/json; charset=UTF-8")
            chunk = json_text
        payload = utf8(chunk)
        self._write_buffer.append(payload)
项目:deprecated_thedap    作者:unitedvote    | 项目源码 | 文件源码
def generate(self, **kwargs):
        """Generate this template with the given arguments.

        Keyword arguments are merged into the template namespace after the
        built-in helpers and ``self.namespace``, so they take precedence.
        The result of calling the namespace's ``_execute`` entry is
        returned; on failure the formatted template code is logged before
        the exception propagates.
        """
        # Helpers made available to every template.
        namespace = {
            "escape": escape.xhtml_escape,
            "xhtml_escape": escape.xhtml_escape,
            "url_escape": escape.url_escape,
            "json_encode": escape.json_encode,
            "squeeze": escape.squeeze,
            "linkify": escape.linkify,
            "datetime": datetime,
            "_utf8": escape.utf8,  # for internal use
            "_string_types": (unicode, bytes_type),
        }
        namespace.update(self.namespace)
        namespace.update(kwargs)
        # Python 2 exec statement: executes the compiled code inside `namespace`.
        exec self.compiled in namespace
        # presumably _execute is defined by the compiled code executed above
        execute = namespace["_execute"]
        try:
            return execute()
        except Exception:
            # Include the generated source in the log, then re-raise unchanged.
            formatted_code = _format_code(self.code).rstrip()
            logging.error("%s code:\n%s", self.name, formatted_code)
            raise
项目:deprecated_thedap    作者:unitedvote    | 项目源码 | 文件源码
def write(self, chunk):
        """Queue ``chunk`` in the output buffer.

        Use the flush() method to actually write the output to the network.

        A dictionary is written as JSON, and the Content-Type of the
        response is set to application/json.  (To send JSON as a different
        Content-Type, call set_header *after* calling write().)

        Lists are not converted to JSON because of a potential cross-site
        security vulnerability.  All JSON output should be wrapped in a
        dictionary.  More details at
        http://haacked.com/archive/2008/11/20/anatomy-of-a-subtle-json-vulnerability.aspx

        Raises RuntimeError when the request is already finished.
        """
        if self._finished:
            raise RuntimeError("Cannot write() after finish().  "
                               "May be caused by using async operations "
                               "without the @asynchronous decorator.")
        is_mapping = isinstance(chunk, dict)
        if is_mapping:
            chunk = escape.json_encode(chunk)
            self.set_header("Content-Type", "application/json; charset=UTF-8")
        as_bytes = utf8(chunk)
        self._write_buffer.append(as_bytes)
项目:django-tornado-websockets    作者:Kocal    | 项目源码 | 文件源码
def test_on_message_when_nonexistent_event(self):
        """Send an event that has no bound handler and wait for a reply."""
        ws_connection = yield self.ws_connect('/ws/test')

        # Check that only the 'hello' event is bound.
        self.assertDictEqual(self.ws.events, {'hello': ANY})

        unbound_event = json_encode({
            'event': 'bye',
            'data': {'message': 'Bye !'}
        })
        ws_connection.write_message(unbound_event)

        # This throws TimeoutError and/or StopIteration, because
        # WebSocketHandler.on_message() does not send any message when the
        # client sends a nonexistent event ('bye' in our case).
        # It is unclear how to catch them, because self.assertRaises() and
        # try/except don't work, so @unittest.expectedFailure is used here,
        # which is a bit dirty.
        yield ws_connection.read_message()
项目:get_started_with_respeaker    作者:respeaker    | 项目源码 | 文件源码
def write(self, chunk):
        """Store ``chunk`` in the output buffer.

        The network write itself is done by the flush() method.

        If ``chunk`` is a dictionary, it is written as JSON and the
        ``Content-Type`` of the response is set to ``application/json``
        (to send JSON as a different ``Content-Type``, call set_header
        *after* calling write()).

        Lists are not converted to JSON because of a potential cross-site
        security vulnerability.  All JSON output should be wrapped in a
        dictionary.  More details at
        http://haacked.com/archive/2008/11/20/anatomy-of-a-subtle-json-vulnerability.aspx

        Raises RuntimeError after finish() has been called.
        """
        if self._finished:
            raise RuntimeError(
                "Cannot write() after finish().  May be caused "
                "by using async operations without the @asynchronous "
                "decorator.")
        if isinstance(chunk, dict):
            serialized = escape.json_encode(chunk)
            self.set_header("Content-Type", "application/json; charset=UTF-8")
        else:
            serialized = chunk
        buffered = utf8(serialized)
        self._write_buffer.append(buffered)
项目:toshi-services-lib    作者:toshiapp    | 项目源码 | 文件源码
async def test_invalid_signature(self):
        """A request whose signature does not verify must be answered with 400.

        The signature is produced with FAUCET_PRIVATE_KEY while the request
        claims TEST_ADDRESS.  Both the signed POST and the query-string
        variant are expected to return HTTP 400.

        Declared ``async`` because the body awaits the fetch helpers;
        ``await`` inside a plain ``def`` is a syntax error.
        """
        body = {
            "registration_id": "1234567890"
        }

        timestamp = int(time.time())
        signature = sign_request(FAUCET_PRIVATE_KEY, "POST", "/", timestamp, json_encode(body).encode('utf-8'))

        resp = await self.fetch_signed("/", method="POST", body=body,
                                       address=TEST_ADDRESS, timestamp=timestamp, signature=signature)

        self.assertEqual(resp.code, 400, resp.body)

        # make sure query string also fails
        resp = await self.fetch("/?{}".format(
            generate_query_args(signature=signature, address=TEST_ADDRESS, timestamp=timestamp)))

        self.assertEqual(resp.code, 400, resp.body)
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def create_from_json_oembed(link=None, oembed_doc=None, thumbnail_file_path=None):
        """
        Obtain the Sourcefile for a link and attach its oEmbed document.

        Ideally this is a link right now, specifically a video link.

        A sha1 file key is derived from ``link`` and passed, together with
        the thumbnail path and type='link', to Sourcefile.get_from_file.
        When that yields a source file, its ``data`` field is set to the
        JSON-encoded ``oembed_doc`` and the source file is saved.

        Returns whatever Sourcefile.get_from_file returned.
        """
        file_key = Sourcefile.get_sha1_file_key(file_path=None, file_data=link)
        source = Sourcefile.get_from_file(thumbnail_file_path, file_key, type='link')
        if not source:
            return source
        source.data = json_encode(oembed_doc)
        source.save()
        return source
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, user_name):
        """Subscribe the current user to the user named ``user_name``.

        Raises a 404 HTTPError when no user has that name.  With a truthy
        ``json`` request argument the outcome is written as JSON
        (``subscription_status`` on success, ``error`` on failure);
        otherwise the client is redirected to ``/user/<user_name>``.
        """
        is_json = self.get_argument('json', None)
        user = self.get_current_user_object()

        # Pass the name as a query parameter instead of interpolating it into
        # the SQL fragment: user_name comes straight from the request URL.
        shake_owner = User.get('name=%s', user_name)
        if not shake_owner:
            raise tornado.web.HTTPError(404)

        subscribed = user.subscribe_to_user(shake_owner)
        if not is_json:
            # The non-JSON reply is the same redirect on success and failure.
            return self.redirect('/user/%s' % (user_name))
        if subscribed:
            return self.write(json_encode({'subscription_status': True}))
        return self.write(json_encode({'error':'error'}))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, user_name):
        """Unsubscribe the current user from the user named ``user_name``.

        Raises a 404 HTTPError when no user has that name.  With a truthy
        ``json`` request argument the outcome is written as JSON
        (``subscription_status: False`` on success, ``error`` on failure);
        otherwise the client is redirected to ``/user/<user_name>``.
        """
        is_json = self.get_argument('json', None)
        user = self.get_current_user_object()

        # Pass the name as a query parameter instead of interpolating it into
        # the SQL fragment: user_name comes straight from the request URL.
        shake_owner = User.get('name=%s', user_name)
        if not shake_owner:
            raise tornado.web.HTTPError(404)

        unsubscribed = user.unsubscribe_from_user(shake_owner)
        if not is_json:
            # The non-JSON reply is the same redirect on success and failure.
            return self.redirect('/user/%s' % (user_name))
        if unsubscribed:
            return self.write(json_encode({'subscription_status': False}))
        return self.write(json_encode({'error':'error'}))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, share_key, comment_id):
        """Mark the current user's like on a comment as deleted.

        Raises a 404 HTTPError when the shared file or the comment cannot
        be found.  With a truthy ``json`` request argument, no-cache
        headers are set and a JSON reply carrying the comment's like count
        is written; otherwise the client is redirected to the shared
        file's page.
        """
        shared_file = models.Sharedfile.get_by_share_key(share_key)
        user = self.get_current_user_object()
        comment = Comment.get("id=%s", comment_id)

        if not (shared_file and comment):
            raise tornado.web.HTTPError(404)

        like = models.CommentLike.get("comment_id = %s and user_id = %s",
                                      comment.id, user.id)
        if like:
            like.deleted = 1
            like.save()

        wants_json = self.get_argument("json", False)
        if not wants_json:
            return self.redirect("/p/%s?salty" % (share_key,))

        self.set_header("Cache-Control", "no-store, no-cache, must-revalidate")
        self.set_header("Pragma", "no-cache")
        self.set_header("Expires", 0)
        count = models.CommentLike.where_count("comment_id = %s", comment.id)
        return self.write(json_encode({'response':'ok', 'count': count, 'like' : True }))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def get(self, shake_name):
        """Write a shake's title and description as JSON.

        Both fields are returned XHTML-escaped and raw; missing values are
        reported as empty strings.  Raises a 404 HTTPError when no
        non-deleted shake has the given name.
        """
        shake = Shake.get("name=%s and deleted=0", shake_name)
        if not shake:
            raise tornado.web.HTTPError(404)

        title = shake.title or ''
        description = shake.description or ''
        value = {
            'title' : escape.xhtml_escape(title) if title else '',
            'title_raw' : title,
            'description' : escape.xhtml_escape(description) if description else '',
            'description_raw' : description
        }
        # prevents IE from caching ajax requests.
        for header, setting in (("Cache-Control", "no-store, no-cache, must-revalidate"),
                                ("Pragma", "no-cache"),
                                ("Expires", 0)):
            self.set_header(header, setting)
        return self.write(escape.json_encode(value))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, shake_id):
        """Subscribe the current user to the shake with id ``shake_id``.

        With a truthy ``json`` request argument the outcome is written as
        JSON (``subscription_status`` on success, ``error`` otherwise).
        Without it the client is redirected to the shake's path, or gets a
        404 when no non-deleted shake has that id.
        """
        is_json = self.get_argument('json', None)
        user = self.get_current_user_object()

        shake = Shake.get('id=%s and deleted=0', shake_id)
        if not shake:
            if is_json:
                return self.write(json_encode({'error':'Shake not found.'}))
            # There is no shake whose path could be redirected to; answer 404
            # instead of calling shake.path() on a missing shake.
            return self.send_error(404)

        subscribed = user.subscribe(shake)
        if not is_json:
            # The non-JSON reply is the same redirect on success and failure.
            return self.redirect(shake.path())
        if subscribed:
            return self.write(json_encode({'subscription_status': True}))
        return self.write(json_encode({'error':'error'}))
项目:mltshp    作者:MLTSHP    | 项目源码 | 文件源码
def post(self, shake_id):
        """Unsubscribe the current user from the shake with id ``shake_id``.

        With a truthy ``json`` request argument the outcome is written as
        JSON (``subscription_status: False`` on success, ``error``
        otherwise).  Without it the client is redirected to the shake's
        path, or gets a 404 when no non-deleted shake has that id.
        """
        is_json = self.get_argument('json', None)
        user = self.get_current_user_object()

        shake = Shake.get('id=%s and deleted=0', shake_id)
        if not shake:
            if is_json:
                return self.write({'error':'Shake not found.'})
            # There is no shake whose path could be redirected to; answer 404
            # instead of calling shake.path() on a missing shake.
            return self.send_error(404)

        unsubscribed = user.unsubscribe(shake)
        if not is_json:
            # The non-JSON reply is the same redirect on success and failure.
            return self.redirect(shake.path())
        if unsubscribed:
            return self.write(json_encode({'subscription_status': False}))
        return self.write({'error':'error'})
项目:teleport    作者:eomsoft    | 项目源码 | 文件源码
def write_json(self, code, message='', data=None):
        """Write a ``{'code', 'message', 'data'}`` JSON reply and finish.

        When the handler is not in JSON mode a plain-text notice is written
        instead and the request is finished.  ``data`` defaults to an empty
        list.

        Raises RuntimeError when ``code`` is not an int or ``message`` is
        not a str.
        """
        if self._mode != self.MODE_JSON:
            self.write('should be json request.')
            self.finish()
            return

        checks = (
            (code, int, '`code` must be a integer.'),
            (message, str, '`msg` must be a string.'),
        )
        for value, required_type, complaint in checks:
            if not isinstance(value, required_type):
                raise RuntimeError(complaint)

        payload = {
            'code': code,
            'message': message,
            'data': [] if data is None else data,
        }

        self.set_header("Content-Type", "application/json")
        self.write(json_encode(payload))
        self.finish()
项目:doubao_service    作者:roboyun    | 项目源码 | 文件源码
def update_user_info(self,mdn,params):
        """Merge ``params`` into the stored JSON info of the user with ``mdn``.

        The user's id is looked up by ``mdn``.  If a ``user_info`` row
        exists for that id, its decoded ``info_json`` is updated with
        ``params`` and written back; otherwise a new row holding ``params``
        is inserted.
        """
        # NOTE(review): every statement here is assembled with string
        # interpolation, so a quote in mdn or in the encoded JSON ends up in
        # the SQL text.  Consider the DB wrapper's parameter binding.
        # NOTE(review): an unknown mdn makes the [0] lookup fail with
        # IndexError, assuming query() returns an empty list for no rows.
        user_sql = '''select id from user where mdn = '%s' '''%mdn
        user_id = self.mysql_read.query(user_sql)[0]['id']

        info_sql = '''select id,info_json from user_info where user_id = %s ''' %user_id
        rows = self.mysql_read.query(info_sql)

        if not rows:
            insert_sql = '''insert into user_info (user_id,info_json,update_time) values(%s,'%s',now()) ''' %(user_id,json_encode(params))
            self.mysql_write.execute(insert_sql)
            return

        merged = json_decode(rows[0]['info_json'])
        merged.update(params)
        update_sql = '''update user_info set info_json='%s',update_time=now() where id=%s ''' %(json_encode(merged),rows[0]['id'])
        self.mysql_write.execute(update_sql)
项目:toshi-ethereum-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_invalid_signature_in_pn_registration(self):
        """An APN registration with a non-verifying signature is rejected.

        The request is signed with FAUCET_PRIVATE_KEY but claims
        TEST_ADDRESS.  It must return HTTP 400 and leave no
        ``notification_registrations`` row for either TEST_ADDRESS or
        FAUCET_ADDRESS.

        Declared ``async`` because the body uses ``await`` and
        ``async with``, which are syntax errors inside a plain ``def``.
        """
        body = {
            "registration_id": TEST_APN_ID,
        }

        timestamp = int(time.time())
        signature = sign_request(FAUCET_PRIVATE_KEY, "POST", "/v1/apn/register", timestamp, json_encode(body).encode('utf-8'))

        resp = await self.fetch_signed("/apn/register", method="POST", body=body,
                                       signature=signature, address=TEST_ADDRESS, timestamp=timestamp)

        self.assertEqual(resp.code, 400, resp.body)

        async with self.pool.acquire() as con:

            rows1 = await con.fetch("SELECT * FROM notification_registrations WHERE toshi_id = $1", TEST_ADDRESS)
            rows2 = await con.fetch("SELECT * FROM notification_registrations WHERE toshi_id = $1", FAUCET_ADDRESS)

        # Neither the claimed address nor the signing address was registered.
        self.assertEqual(len(rows1), 0)
        self.assertEqual(len(rows2), 0)
项目:toshi-ethereum-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_invalid_signature_in_deregistration(self):
        """An APN deregistration with a non-verifying signature is rejected.

        A registration row for TEST_ADDRESS is inserted first.  The
        deregistration request is signed with FAUCET_PRIVATE_KEY but claims
        TEST_ADDRESS; it must return HTTP 400 and the row must still exist.

        Declared ``async`` because the body uses ``await`` and
        ``async with``, which are syntax errors inside a plain ``def``.
        """
        async with self.pool.acquire() as con:

            await con.fetchrow("INSERT INTO notification_registrations VALUES ($2, 'apn', $1, $2)",
                               TEST_APN_ID, TEST_ADDRESS)

        body = {
            "registration_id": TEST_APN_ID
        }
        timestamp = int(time.time())
        signature = sign_request(FAUCET_PRIVATE_KEY, "POST", "/v1/apn/deregister", timestamp, json_encode(body).encode('utf-8'))

        resp = await self.fetch_signed("/apn/deregister", method="POST", body=body, timestamp=timestamp, signature=signature, address=TEST_ADDRESS)

        self.assertEqual(resp.code, 400, resp.body)

        async with self.pool.acquire() as con:

            rows = await con.fetch("SELECT * FROM notification_registrations WHERE toshi_id = $1", TEST_ADDRESS)

        # The registration inserted above must be untouched.
        self.assertEqual(len(rows), 1)
项目:wtee    作者:gvalkov    | 项目源码 | 文件源码
def on_stdin(self, fd, events):
        """Read stdin data, broadcast its complete lines and echo the data.

        The data read from ``fd`` is decoded as UTF-8 (undecodable bytes
        are replaced) and split into lines.  A trailing fragment without a
        newline is held back in ``self.last_line`` and prepended to the
        first line of the next read.  The JSON-encoded list of complete
        lines is broadcast to ``self.clients`` and the raw data is written
        to ``self.stdout_buffer``.  When the read returns nothing, stdin is
        marked closed and its handler is disabled.

        ``events`` is not used.
        """
        data = fd.read()

        # TODO: Use the value of '--input-encoding' here.
        decoded_data = data.decode('utf8', errors='replace')
        lines = decoded_data.splitlines(True)
        if lines:
            # Complete the fragment held back from the previous read first.
            # Doing this unconditionally keeps the fragment attached to the
            # right line even when this read also ends in a fragment.
            if self.last_line:
                lines[0] = ''.join(self.last_line) + lines[0]
                self.last_line = []
            # Hold back a trailing fragment until its newline arrives.
            if not lines[-1].endswith('\n'):
                self.last_line.append(lines.pop())

        self.broadcast(self.clients, escape.json_encode(lines))
        self.stdout_buffer.write(data)
        self.stdout_buffer.flush()

        # TODO: Empty string and None mean different things with os.O_NONBLOCK.
        if not data:
            log.debug('stdin closed')
            self.stdin_closed = True
            self.disable_stdin_handler()
项目:enterprise_gateway    作者:jupyter-incubator    | 项目源码 | 文件源码
def test_kernel_env(self):
        """Kernel should start with environment vars defined in the request."""
        self.app.personality.env_whitelist = ['TEST_VAR']
        requested_env = {
            'KERNEL_FOO': 'kernel-foo-value',
            'NOT_KERNEL': 'ignored',
            'KERNEL_GATEWAY': 'overridden',
            'TEST_VAR': 'allowed'
        }
        kernel_body = json.dumps({'name': 'python', 'env': requested_env})
        ws = yield self.spawn_kernel(kernel_body)

        # Have the kernel print the four variables it was asked to set.
        req = self.execute_request('import os; print(os.getenv("KERNEL_FOO"), os.getenv("NOT_KERNEL"), os.getenv("KERNEL_GATEWAY"), os.getenv("TEST_VAR"))')
        ws.write_message(json_encode(req))
        content = yield self.await_stream(ws)

        self.assertEqual(content['name'], 'stdout')
        output = content['text']
        self.assertIn('kernel-foo-value', output)
        # The requested values of these two variables must not appear.
        self.assertNotIn('ignored', output)
        self.assertNotIn('overridden', output)
        self.assertIn('allowed', output)

        ws.close()
项目:enterprise_gateway    作者:jupyter-incubator    | 项目源码 | 文件源码
def test_seed_language_support(self):
        """Kernel should have variables preseeded from notebook."""
        ws = yield self.spawn_kernel()

        # Pick the print syntax that matches the running Python major version.
        on_python2 = sys.version_info.major == 2
        code = 'print this.s' if on_python2 else 'print(this.s)'

        # Print the encoded "zen of python" string, the kernel should have
        # it imported
        ws.write_message(json_encode(self.execute_request(code)))
        content = yield self.await_stream(ws)
        self.assertEqual(content['name'], 'stdout')
        self.assertIn('Gur Mra bs Clguba', content['text'])

        ws.close()
项目:react-tornado-graphql-example    作者:yatsu    | 项目源码 | 文件源码
def handle_graqhql(self):
        """Execute the GraphQL request and write its data as JSON.

        Raises ExecutionError carrying the result's errors when the result
        is flagged invalid; otherwise writes ``{"data": result.data}``.
        """
        result = self.execute_graphql()
        app_log.debug('GraphQL result data: %s errors: %s invalid %s',
                      result.data, result.errors, result.invalid)
        if result and result.invalid:
            ex = ExecutionError(errors=result.errors)
            # warning() replaces the deprecated warn() alias
            # (assumes app_log is a standard logging.Logger).
            app_log.warning('GraphQL Error: %s', ex)
            raise ex

        response = {'data': result.data}
        self.write(json_encode(response))
项目:react-tornado-graphql-example    作者:yatsu    | 项目源码 | 文件源码
def on_subscribe(self, subid, data):
        """Register a subscription id and acknowledge it to the client.

        The operation name derived from the query in ``data`` is mapped to
        ``subid`` in ``self.subscriptions``, replacing any previous entry
        for that name.  A ``subscription_success`` message carrying the id
        is then written to the client.
        """
        query = data.get('query')
        op_name = self._get_op_name(query)
        app_log.info('subscrption start: subid=%s query=%s op_name=%s',
                     subid, query, op_name)
        # Drop any earlier entry first, then register the new id.
        self.subscriptions.pop(op_name, None)
        self.subscriptions[op_name] = subid
        app_log.debug('subscriptions: %s', self.subscriptions)
        ack = {'type': 'subscription_success', 'id': subid}
        self.write_message(json_encode(ack))
项目:react-tornado-graphql-example    作者:yatsu    | 项目源码 | 文件源码
def response_handler(self, msg):
        """Forward a job-server reply to the ``commandExecute`` subscriber.

        ``msg`` is a two-part message; its second part is decoded from
        JSON.  When a ``commandExecute`` subscription is registered, the
        decoded reply is sent to the client as ``subscription_data``;
        otherwise nothing is sent.
        """
        _ident, resp_bytes = msg
        resp = json_decode(to_unicode(resp_bytes))
        app_log.debug('resp: %s', resp)

        subid = self.subscriptions.get('commandExecute')
        if subid is None:
            return

        envelope = {
            'type': 'subscription_data',
            'id': subid,
            'payload': {'data': resp},
        }
        self.write_message(json_encode(envelope))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def generate(self, **kwargs):
        """Generate this template with the given arguments.

        The namespace is assembled from the built-in helpers, then
        ``self.namespace``, then ``kwargs``; later entries override earlier
        ones.  Returns the result of the ``_tt_execute`` callable found in
        the namespace after the compiled code has run.
        """
        namespace = {
            "escape": escape.xhtml_escape,
            "xhtml_escape": escape.xhtml_escape,
            "url_escape": escape.url_escape,
            "json_encode": escape.json_encode,
            "squeeze": escape.squeeze,
            "linkify": escape.linkify,
            "datetime": datetime,
            "_tt_utf8": escape.utf8,  # for internal use
            "_tt_string_types": (unicode_type, bytes),
            # __name__ and __loader__ allow the traceback mechanism to find
            # the generated source code.
            "__name__": self.name.replace('.', '_'),
            "__loader__": ObjectDict(get_source=lambda name: self.code),
        }
        for overrides in (self.namespace, kwargs):
            namespace.update(overrides)
        exec_in(self.compiled, namespace)
        render = namespace["_tt_execute"]
        # Clear the traceback module's cache of source data now that
        # we've generated a new template (mainly for this module's
        # unittests, where different tests reuse the same name).
        linecache.clearcache()
        return render()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def finish(self):
            """Reply 200 OK with the JSON-encoded ``self.chunk_lengths`` as body."""
            payload = utf8(json_encode(self.chunk_lengths))
            start_line = ResponseStartLine('HTTP/1.1', 200, 'OK')
            headers = HTTPHeaders({'Content-Length': str(len(payload))})
            self.connection.write_headers(start_line, headers)
            self.connection.write(payload)
            self.connection.finish()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_json_encode(self):
        """Round-trip a non-ASCII character through json_encode/json_decode."""
        # json deals with strings, not bytes.  On python 2 byte strings will
        # convert automatically if they are utf8; on python 3 byte strings
        # are not allowed.
        e_acute = u("\u00e9")
        self.assertEqual(json_decode(json_encode(e_acute)), e_acute)
        if bytes is str:
            # Python 2 only: utf8 byte strings round-trip, other bytes fail.
            self.assertEqual(json_decode(json_encode(utf8(e_acute))), e_acute)
            self.assertRaises(UnicodeDecodeError, json_encode, b"\xe9")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def write(self, chunk):
        """Append ``chunk`` to the output buffer.

        The output only reaches the network through the flush() method.

        A dictionary chunk is written as JSON and the ``Content-Type`` of
        the response is set to ``application/json`` (to send JSON as a
        different ``Content-Type``, call set_header *after* calling
        write()).

        Lists are not converted to JSON because of a potential cross-site
        security vulnerability.  All JSON output should be wrapped in a
        dictionary.  More details at
        http://haacked.com/archive/2009/06/25/json-hijacking.aspx/ and
        https://github.com/facebook/tornado/issues/1009

        Raises RuntimeError after finish(), and TypeError for chunks that
        are not bytes, unicode or dict.
        """
        if self._finished:
            raise RuntimeError("Cannot write() after finish()")
        accepted_types = (bytes, unicode_type, dict)
        if not isinstance(chunk, accepted_types):
            parts = ["write() only accepts bytes, unicode, and dict objects"]
            if isinstance(chunk, list):
                parts.append(". Lists not accepted for security reasons; see http://www.tornadoweb.org/en/stable/web.html#tornado.web.RequestHandler.write")
            raise TypeError("".join(parts))
        if isinstance(chunk, dict):
            chunk = escape.json_encode(chunk)
            self.set_header("Content-Type", "application/json; charset=UTF-8")
        encoded = utf8(chunk)
        self._write_buffer.append(encoded)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def generate(self, **kwargs):
        """Run the compiled template and return what ``_tt_execute`` yields.

        The execution namespace starts from the built-in helpers below, is
        then overlaid with ``self.namespace``, and finally with ``kwargs``,
        so later layers override earlier ones on name clashes.
        """
        builtin_helpers = {
            "escape": escape.xhtml_escape,
            "xhtml_escape": escape.xhtml_escape,
            "url_escape": escape.url_escape,
            "json_encode": escape.json_encode,
            "squeeze": escape.squeeze,
            "linkify": escape.linkify,
            "datetime": datetime,
            "_tt_utf8": escape.utf8,  # for internal use
            "_tt_string_types": (unicode_type, bytes),
            # The traceback machinery locates the generated source code
            # through __name__ and __loader__.
            "__name__": self.name.replace('.', '_'),
            "__loader__": ObjectDict(get_source=lambda name: self.code),
        }
        namespace = dict(builtin_helpers)
        for overrides in (self.namespace, kwargs):
            namespace.update(overrides)
        exec_in(self.compiled, namespace)
        render = namespace["_tt_execute"]
        # A new template has just been generated, so drop the traceback
        # module's cached source data (this mostly matters for this
        # module's unittests, where different tests reuse the same name).
        linecache.clearcache()
        return render()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_json_encode(self):
        # JSON works on text, not bytes.  Python 2 byte strings are converted
        # implicitly when they hold valid utf8; Python 3 rejects byte strings.
        e_acute = u("\u00e9")
        self.assertEqual(json_decode(json_encode(e_acute)), e_acute)
        if bytes is not str:
            # Python 3: the byte-string checks below do not apply.
            return
        # Python 2 only: utf8 bytes round-trip, non-utf8 bytes are an error.
        self.assertEqual(json_decode(json_encode(utf8(e_acute))), e_acute)
        self.assertRaises(UnicodeDecodeError, json_encode, b"\xe9")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def write(self, chunk):
        """Append ``chunk`` to the output buffer.

        Nothing is sent to the network here; use the flush() method for
        that.

        A dictionary chunk is serialized as JSON and the response
        Content-Type is set to ``application/json``.  (To send JSON with
        a different ``Content-Type``, call set_header *after* write().)

        Lists are deliberately not converted to JSON because of a
        potential cross-site security vulnerability; wrap all JSON output
        in a dictionary.  More details at
        http://haacked.com/archive/2009/06/25/json-hijacking.aspx/ and
        https://github.com/facebook/tornado/issues/1009

        Raises RuntimeError when called after finish(), and TypeError
        when ``chunk`` is not bytes, unicode, or a dict.
        """
        if self._finished:
            raise RuntimeError("Cannot write() after finish()")
        if isinstance(chunk, dict):
            # Dicts are the only type that is JSON-encoded automatically.
            payload = escape.json_encode(chunk)
            self.set_header("Content-Type", "application/json; charset=UTF-8")
        elif isinstance(chunk, (bytes, unicode_type)):
            payload = chunk
        else:
            message = "write() only accepts bytes, unicode, and dict objects"
            if isinstance(chunk, list):
                message += ". Lists not accepted for security reasons; see http://www.tornadoweb.org/en/stable/web.html#tornado.web.RequestHandler.write"
            raise TypeError(message)
        self._write_buffer.append(utf8(payload))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def generate(self, **kwargs):
        """Run the compiled template and return what ``_tt_execute`` yields.

        The execution namespace starts from the built-in helpers below, is
        then overlaid with ``self.namespace``, and finally with ``kwargs``,
        so later layers override earlier ones on name clashes.
        """
        builtin_helpers = {
            "escape": escape.xhtml_escape,
            "xhtml_escape": escape.xhtml_escape,
            "url_escape": escape.url_escape,
            "json_encode": escape.json_encode,
            "squeeze": escape.squeeze,
            "linkify": escape.linkify,
            "datetime": datetime,
            "_tt_utf8": escape.utf8,  # for internal use
            "_tt_string_types": (unicode_type, bytes),
            # The traceback machinery locates the generated source code
            # through __name__ and __loader__.
            "__name__": self.name.replace('.', '_'),
            "__loader__": ObjectDict(get_source=lambda name: self.code),
        }
        namespace = dict(builtin_helpers)
        for overrides in (self.namespace, kwargs):
            namespace.update(overrides)
        exec_in(self.compiled, namespace)
        render = namespace["_tt_execute"]
        # A new template has just been generated, so drop the traceback
        # module's cached source data (this mostly matters for this
        # module's unittests, where different tests reuse the same name).
        linecache.clearcache()
        return render()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def finish(self):
            """Send ``self.chunk_lengths`` as a JSON body with a 200 response.

            The body is JSON-encoded and passed through utf8(); its length
            is advertised in a Content-Length header before the body is
            written and the connection is finished.
            """
            payload = utf8(json_encode(self.chunk_lengths))
            content_length = str(len(payload))
            self.connection.write_headers(
                ResponseStartLine('HTTP/1.1', 200, 'OK'),
                HTTPHeaders({'Content-Length': content_length}))
            self.connection.write(payload)
            self.connection.finish()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_json_encode(self):
        # JSON works on text, not bytes.  Python 2 byte strings are converted
        # implicitly when they hold valid utf8; Python 3 rejects byte strings.
        e_acute = u("\u00e9")
        self.assertEqual(json_decode(json_encode(e_acute)), e_acute)
        if bytes is not str:
            # Python 3: the byte-string checks below do not apply.
            return
        # Python 2 only: utf8 bytes round-trip, non-utf8 bytes are an error.
        self.assertEqual(json_decode(json_encode(utf8(e_acute))), e_acute)
        self.assertRaises(UnicodeDecodeError, json_encode, b"\xe9")
项目:gelyung    作者:denglj    | 项目源码 | 文件源码
def finish(self, chunk=None):
        """Finish the request, JSON-encoding ``chunk`` first when given.

        A ``chunk`` of ``None`` is forwarded to the parent ``finish``
        unchanged; any other value is passed through json_encode() first.
        """
        encoded = None if chunk is None else json_encode(chunk)
        super(RESTfulHandler, self).finish(encoded)
项目:globibot    作者:best-coloc-ever    | 项目源码 | 文件源码
def respond_json(method):
    """Decorate a handler method so its return value is written as JSON.

    The wrapped method is called with the original arguments.  When it
    returns a truthy value, the handler's Content-Type is set to
    ``application/json`` and the JSON-encoded value is written; a falsy
    return value produces no output.  The wrapper itself returns ``None``.
    """

    @wraps(method)
    def responder(self, *args, **kwargs):
        payload = method(self, *args, **kwargs)
        if not payload:
            # Nothing to send (None, empty container, 0, ...).
            return
        self.set_header('Content-Type', 'application/json')
        self.write(json_encode(payload))

    return responder
项目:globibot    作者:best-coloc-ever    | 项目源码 | 文件源码
def respond_json_async(method):
    """Decorate a handler method whose awaited result is written as JSON.

    The wrapped method is called with the original arguments.  A falsy
    return value ends the call immediately; otherwise the value is
    awaited.  When the awaited result is truthy, the handler's
    Content-Type is set to ``application/json`` and the JSON-encoded
    result is written.  The wrapper coroutine itself returns ``None``.
    """

    @wraps(method)
    async def responder(self, *args, **kwargs):
        pending = method(self, *args, **kwargs)
        if not pending:
            # The method produced nothing to wait for.
            return
        payload = await pending
        if not payload:
            # Awaited result is empty: send nothing.
            return
        self.set_header('Content-Type', 'application/json')
        self.write(json_encode(payload))

    return responder
项目:mopidy-jukebox    作者:palfrey    | 项目源码 | 文件源码
def get(self):
        """Write the JSON encoding of ``sources(connection)`` as the response.

        A connection is obtained from ``self._connect()`` and used as a
        context manager for the duration of the lookup and the write.
        """
        with self._connect() as connection:
            items = sources(connection)
            # json_encode() returns a string, and Tornado only switches the
            # Content-Type to JSON automatically for dict chunks; without
            # this header the body is served with the default text/html.
            self.set_header('Content-Type', 'application/json; charset=UTF-8')
            self.write(json_encode(items))
项目:mopidy-jukebox    作者:palfrey    | 项目源码 | 文件源码
def get(self, url):
        """Write the JSON encoding of ``source(connection, url)``.

        A connection is obtained from ``self._connect()`` and used as a
        context manager.  If an IndexError is raised while looking up or
        writing the item (presumably by ``source()`` when ``url`` is
        unknown -- confirm against its implementation), the response
        status is set to 404 with a reason naming ``url``.
        """
        with self._connect() as connection:
            try:
                item = source(connection, url)
                # json_encode() returns a string, and Tornado only switches
                # the Content-Type to JSON automatically for dict chunks;
                # without this header the body is served as text/html.
                self.set_header('Content-Type',
                                'application/json; charset=UTF-8')
                self.write(json_encode(item))
            except IndexError:
                self.set_status(404, "Can't find '%s'" % url)
项目:mopidy-jukebox    作者:palfrey    | 项目源码 | 文件源码
def get(self):
        """Write the JSON encoding of ``sources(connection)`` as the response.

        A connection is obtained from ``self._connect()`` and used as a
        context manager for the duration of the lookup and the write.
        """
        with self._connect() as connection:
            items = sources(connection)
            # json_encode() returns a string, and Tornado only switches the
            # Content-Type to JSON automatically for dict chunks; without
            # this header the body is served with the default text/html.
            self.set_header('Content-Type', 'application/json; charset=UTF-8')
            self.write(json_encode(items))
项目:mopidy-jukebox    作者:palfrey    | 项目源码 | 文件源码
def get(self, url):
        """Write the JSON encoding of ``source(connection, url)``.

        A connection is obtained from ``self._connect()`` and used as a
        context manager.  If an IndexError is raised while looking up or
        writing the item (presumably by ``source()`` when ``url`` is
        unknown -- confirm against its implementation), the response
        status is set to 404 with a reason naming ``url``.
        """
        with self._connect() as connection:
            try:
                item = source(connection, url)
                # json_encode() returns a string, and Tornado only switches
                # the Content-Type to JSON automatically for dict chunks;
                # without this header the body is served as text/html.
                self.set_header('Content-Type',
                                'application/json; charset=UTF-8')
                self.write(json_encode(item))
            except IndexError:
                self.set_status(404, "Can't find '%s'" % url)
项目:wxnotify    作者:mxgnene01    | 项目源码 | 文件源码
def reply(self, data):
        """Finish the request with ``data`` serialized as a JSON body.

        The Content-Type header is set to UTF-8 JSON before ``data`` is
        encoded and handed to ``finish``.
        """
        self.set_header('Content-Type', 'application/json; charset=UTF-8')
        encoded = json_encode(data)
        self.finish(encoded)
项目:simple-environment-monitor-system    作者:diegorubin    | 项目源码 | 文件源码
def test_create(self):
        # POST a TextMonitor description to the API, then check that the
        # request succeeded and that exactly one Monitor is now stored.
        monitor_spec = {
            'type': 'TextMonitor',
            'label': 'server',
            'url': SERVICE_URL,
            'data': {'expected': 'LIVE'},
        }
        body = json_encode(monitor_spec)
        response = self.fetch('/api/monitors', method='POST', body=body)
        self.assertEqual(response.code, 200)
        stored_monitors = Monitor().all()
        self.assertEqual(len(stored_monitors), 1)