Python tornado.escape 模块,json_decode() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用tornado.escape.json_decode()

项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_decode_argument(self):
        # Each of these requests is expected to decode to the same unicode
        # character in both the path and the query.
        same_char_urls = (
            "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
            "/decode_arg/%E9?foo=%E9&encoding=latin1",
            "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
        )
        for target in same_char_urls:
            reply = self.fetch(target)
            reply.rethrow()
            decoded = json_decode(reply.body)
            expected = {
                u('path'): [u('unicode'), u('\u00e9')],
                u('query'): [u('unicode'), u('\u00e9')],
            }
            self.assertEqual(decoded, expected)

        # Without an explicit encoding argument the expected payload is
        # tagged 'bytes' and carries the hex string 'c3a9'.
        reply = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {
            u('path'): [u('bytes'), u('c3a9')],
            u('query'): [u('bytes'), u('c3a9')],
        }
        self.assertEqual(decoded, expected)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multipart_form(self):
        # Encodings here are tricky: headers are latin1, while bodies can be
        # anything (utf8 is used by default).
        request_head = [
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ]
        body_lines = [
            b"Content-Disposition: form-data; name=argument",
            b"",
            u("\u00e1").encode("utf-8"),
            b"--1234567890",
            u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
            b"",
            u("\u00fa").encode("utf-8"),
            b"--1234567890--",
            b"",
        ]
        raw_reply = self.raw_fetch(request_head, b"\r\n".join(body_lines))
        decoded = json_decode(raw_reply)
        # Each field of the decoded reply must carry its own accented character.
        self.assertEqual(u("\u00e9"), decoded["header"])
        self.assertEqual(u("\u00e1"), decoded["argument"])
        self.assertEqual(u("\u00f3"), decoded["filename"])
        self.assertEqual(u("\u00fa"), decoded["filebody"])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_chunked_request_body(self):
        # Chunked requests are not widely supported and AsyncHTTPClient has
        # no way to generate them, but HTTPServer will read them, so the
        # request is written to the stream by hand.
        raw_request = b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded

4
foo=
3
bar
0

""".replace(b"\n", b"\r\n")
        self.stream.write(raw_request)
        read_stream_body(self.stream, self.stop)
        _headers, payload = self.wait()
        decoded = json_decode(payload)
        # The two chunks "foo=" and "bar" must be reassembled into foo=bar.
        self.assertEqual(decoded, {u('foo'): [u('bar')]})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_decode_argument(self):
        # Each of these requests is expected to decode to the same unicode
        # character in both the path and the query.
        same_char_urls = (
            "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
            "/decode_arg/%E9?foo=%E9&encoding=latin1",
            "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
        )
        for target in same_char_urls:
            reply = self.fetch(target)
            reply.rethrow()
            decoded = json_decode(reply.body)
            expected = {
                u('path'): [u('unicode'), u('\u00e9')],
                u('query'): [u('unicode'), u('\u00e9')],
            }
            self.assertEqual(decoded, expected)

        # Without an explicit encoding argument the expected payload is
        # tagged 'bytes' and carries the hex string 'c3a9'.
        reply = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {
            u('path'): [u('bytes'), u('c3a9')],
            u('query'): [u('bytes'), u('c3a9')],
        }
        self.assertEqual(decoded, expected)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_get_argument(self):
        # Query-only lookups: a value, an empty value, and a missing one.
        query_only_cases = (
            ("/get_argument?foo=bar", b"bar"),
            ("/get_argument?foo=", b""),
            ("/get_argument", b"default"),
        )
        for target, expected_body in query_only_cases:
            reply = self.fetch(target)
            self.assertEqual(reply.body, expected_body)

        # Test merging of query and body arguments.
        # In singular form, body arguments take precedence over query arguments.
        form_body = urllib_parse.urlencode({"foo": "hello"})
        reply = self.fetch("/get_argument?foo=bar", method="POST", body=form_body)
        self.assertEqual(reply.body, b"hello")
        # In plural methods they are merged.
        reply = self.fetch("/get_arguments?foo=bar", method="POST", body=form_body)
        merged = json_decode(reply.body)
        self.assertEqual(
            merged,
            {'default': ['bar', 'hello'], 'query': ['bar'], 'body': ['hello']})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_decode_argument(self):
        # Each of these requests is expected to decode to the same unicode
        # character in both the path and the query.
        same_char_urls = (
            "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
            "/decode_arg/%E9?foo=%E9&encoding=latin1",
            "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
        )
        for target in same_char_urls:
            reply = self.fetch(target)
            reply.rethrow()
            decoded = json_decode(reply.body)
            expected = {
                u('path'): [u('unicode'), u('\u00e9')],
                u('query'): [u('unicode'), u('\u00e9')],
            }
            self.assertEqual(decoded, expected)

        # Without an explicit encoding argument the expected payload is
        # tagged 'bytes' and carries the hex string 'c3a9'.
        reply = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {
            u('path'): [u('bytes'), u('c3a9')],
            u('query'): [u('bytes'), u('c3a9')],
        }
        self.assertEqual(decoded, expected)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_get_argument(self):
        # Query-only lookups: a value, an empty value, and a missing one.
        query_only_cases = (
            ("/get_argument?foo=bar", b"bar"),
            ("/get_argument?foo=", b""),
            ("/get_argument", b"default"),
        )
        for target, expected_body in query_only_cases:
            reply = self.fetch(target)
            self.assertEqual(reply.body, expected_body)

        # Test merging of query and body arguments.
        # In singular form, body arguments take precedence over query arguments.
        form_body = urllib_parse.urlencode({"foo": "hello"})
        reply = self.fetch("/get_argument?foo=bar", method="POST", body=form_body)
        self.assertEqual(reply.body, b"hello")
        # In plural methods they are merged.
        reply = self.fetch("/get_arguments?foo=bar", method="POST", body=form_body)
        merged = json_decode(reply.body)
        self.assertEqual(
            merged,
            {'default': ['bar', 'hello'], 'query': ['bar'], 'body': ['hello']})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_multipart_form(self):
        # Encodings here are tricky: headers are latin1, while bodies can be
        # anything (utf8 is used by default).
        request_head = [
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ]
        body_lines = [
            b"Content-Disposition: form-data; name=argument",
            b"",
            u("\u00e1").encode("utf-8"),
            b"--1234567890",
            u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
            b"",
            u("\u00fa").encode("utf-8"),
            b"--1234567890--",
            b"",
        ]
        raw_reply = self.raw_fetch(request_head, b"\r\n".join(body_lines))
        decoded = json_decode(raw_reply)
        # Each field of the decoded reply must carry its own accented character.
        self.assertEqual(u("\u00e9"), decoded["header"])
        self.assertEqual(u("\u00e1"), decoded["argument"])
        self.assertEqual(u("\u00f3"), decoded["filename"])
        self.assertEqual(u("\u00fa"), decoded["filebody"])
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_chunked_request_body(self):
        # Chunked requests are not widely supported and AsyncHTTPClient has
        # no way to generate them, but HTTPServer will read them, so the
        # request is written to the stream by hand.
        raw_request = b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded

4
foo=
3
bar
0

""".replace(b"\n", b"\r\n")
        self.stream.write(raw_request)
        read_stream_body(self.stream, self.stop)
        _headers, payload = self.wait()
        decoded = json_decode(payload)
        # The two chunks "foo=" and "bar" must be reassembled into foo=bar.
        self.assertEqual(decoded, {u('foo'): [u('bar')]})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_get_argument(self):
        # Query-only lookups: a value, an empty value, and a missing one.
        query_only_cases = (
            ("/get_argument?foo=bar", b"bar"),
            ("/get_argument?foo=", b""),
            ("/get_argument", b"default"),
        )
        for target, expected_body in query_only_cases:
            reply = self.fetch(target)
            self.assertEqual(reply.body, expected_body)

        # Test merging of query and body arguments.
        # In singular form, body arguments take precedence over query arguments.
        form_body = urllib_parse.urlencode({"foo": "hello"})
        reply = self.fetch("/get_argument?foo=bar", method="POST", body=form_body)
        self.assertEqual(reply.body, b"hello")
        # In plural methods they are merged.
        reply = self.fetch("/get_arguments?foo=bar", method="POST", body=form_body)
        merged = json_decode(reply.body)
        self.assertEqual(
            merged,
            {'default': ['bar', 'hello'], 'query': ['bar'], 'body': ['hello']})
项目:toshi-id-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_username_query(self):
        """Check that /search/user matches a stored username by partial query.

        Inserts the user "bobsmith", then expects one result for the query
        'bobsm' and no results for 'nancy'.

        NOTE(review): declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError inside a plain ``def``.
        Presumably the suite runs this through an async test decorator that
        is not visible here -- confirm.
        """
        username = "bobsmith"
        positive_query = 'bobsm'
        negative_query = 'nancy'

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)", username, TEST_ADDRESS)

        resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['results']), 1)

        # ensure we got a tracking event; its first element is expected to
        # be None for this unsigned request
        self.assertEqual((await self.next_tracking_event())[0], None)

        resp = await self.fetch("/search/user?query={}".format(negative_query), method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['results']), 0)

        # ensure we got a tracking event for the second search as well
        self.assertEqual((await self.next_tracking_event())[0], None)
项目:toshi-id-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_username_query_sql_inject_attampt(self):
        """Check that an SQL-injection style search query does no damage.

        Inserts one user, searches with a URL-quoted string containing
        ``delete from users``, and then verifies the search returned no
        results and the users table still holds exactly one row.

        NOTE(review): declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError inside a plain ``def``.
        Presumably run through an async test decorator not visible here --
        confirm.
        """
        username = "bobsmith"
        inject_attempt = quote_plus("x'; delete from users; select * from users")

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)", username, TEST_ADDRESS)

        resp = await self.fetch("/search/user?query={}".format(inject_attempt), method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['results']), 0)

        # The inserted row must have survived the attempted DELETE.
        async with self.pool.acquire() as con:
            row = await con.fetchrow("SELECT COUNT(*) AS count FROM users")

        self.assertEqual(row['count'], 1)
项目:toshi-id-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_underscore_username_query(self):
        """Check /search/user behaviour for usernames containing underscores.

        Four users are inserted. Each "positive" query is expected to match
        exactly one of them and each "negative" query none.

        NOTE(review): declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError inside a plain ``def``.
        Presumably run through an async test decorator not visible here --
        confirm.
        """
        # (username, name, toshi_id) rows, inserted in this order.
        users = [
            ("wager_weight", "Wager Weight", "0x0000000000000000000000000000000000000001"),
            ("bob_smith", "Robert", "0x0000000000000000000000000000000000000002"),
            ("bob_jack", "Jackie", "0x0000000000000000000000000000000000000003"),
            ("user1234", "user1234", "0x0000000000000000000000000000000000000004"),
        ]
        async with self.pool.acquire() as con:
            for username, name, toshi_id in users:
                await con.execute("INSERT INTO users (username, name, toshi_id) VALUES ($1, $2, $3)",
                                  username, name, toshi_id)

        for positive_query in ["wager", "wager_we", "wager_weight", "bob_smi"]:

            resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['results']), 1, "Failed to get match for search query: {}".format(positive_query))

        for negative_query in ["wager_foo", "bob_bar", "1234", "bobsmith"]:

            resp = await self.fetch("/search/user?query={}".format(negative_query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['results']), 0, "got unexpected match for search query: {}".format(negative_query))
项目:toshi-id-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_inactive_username_query(self):
        """Check that a user inserted with ``active = false`` is not searchable.

        The first search is expected to return no results. After a signed
        PUT to /user, the same search is expected to return one result
        (presumably the PUT marks the user active -- confirm against the
        handler).

        NOTE(review): declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError inside a plain ``def``.
        Presumably run through an async test decorator not visible here --
        confirm.
        """
        username = "bobsmith"
        positive_query = 'bobsm'

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO users (username, toshi_id, active) VALUES ($1, $2, false)", username, TEST_ADDRESS)

        resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['results']), 0)

        resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="PUT", body={
            "payment_address": TEST_ADDRESS
        })
        self.assertResponseCodeEqual(resp, 200)

        resp = await self.fetch("/search/user?query={}".format(positive_query), method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['results']), 1)
项目:toshi-id-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_create_user(self):
        """Check that a signed POST to /user creates a non-app user row.

        Verifies the response carries ``toshi_id == TEST_ADDRESS``, that the
        stored row has a falsy ``is_app`` and a non-null ``username``, and
        that a tracking event was emitted for the encoded address.

        NOTE(review): declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError inside a plain ``def``.
        Presumably run through an async test decorator not visible here --
        confirm.
        """
        resp = await self.fetch("/timestamp")
        self.assertEqual(resp.code, 200)

        resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="POST",
                                       body={'payment_address': TEST_PAYMENT_ADDRESS})

        self.assertResponseCodeEqual(resp, 200)

        body = json_decode(resp.body)

        self.assertEqual(body['toshi_id'], TEST_ADDRESS)

        async with self.pool.acquire() as con:
            row = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", TEST_ADDRESS)

        self.assertIsNotNone(row)
        self.assertFalse(row['is_app'])

        self.assertIsNotNone(row['username'])

        # ensure we got a tracking event
        self.assertEqual((await self.next_tracking_event())[0], encode_id(TEST_ADDRESS))
项目:toshi-id-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_create_app_user(self):
        """Check that a signed POST to /user with ``is_app`` creates an app row.

        Verifies the response carries ``toshi_id == TEST_ADDRESS``, that the
        stored row has a truthy ``is_app``, and that a tracking event was
        emitted for the encoded address.

        NOTE(review): declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError inside a plain ``def``.
        Presumably run through an async test decorator not visible here --
        confirm.
        """
        resp = await self.fetch("/timestamp")
        self.assertEqual(resp.code, 200)

        resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="POST",
                                       body={"is_app": True, 'payment_address': TEST_PAYMENT_ADDRESS})

        self.assertResponseCodeEqual(resp, 200)

        body = json_decode(resp.body)

        self.assertEqual(body['toshi_id'], TEST_ADDRESS)

        async with self.pool.acquire() as con:
            row = await con.fetchrow("SELECT * FROM users WHERE toshi_id = $1", TEST_ADDRESS)

        self.assertIsNotNone(row)
        self.assertTrue(row['is_app'])

        # ensure we got a tracking event
        self.assertEqual((await self.next_tracking_event())[0], encode_id(TEST_ADDRESS))
项目:toshi-id-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_set_unknown_categories_fails(self):
        """Check that setting unknown categories on a user is rejected.

        A signed PUT to /user with one out-of-range category id and the
        string 'badcat' is expected to return 400. A follow-up GET of the
        user must still report an empty ``categories`` list.

        NOTE(review): declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError inside a plain ``def``.
        Presumably run through an async test decorator not visible here --
        confirm.
        """
        username = "toshibot"
        name = "ToshiBot"

        categories = await self.setup_categories()

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO users (username, toshi_id, name, is_app, is_public) VALUES ($1, $2, $3, true, true)",
                              username, TEST_ADDRESS, name)

        # categories[-1][0] + 10 is presumably an id beyond the last known
        # category -- confirm against setup_categories().
        resp = await self.fetch_signed("/user", signing_key=TEST_PRIVATE_KEY, method="PUT", body={
            "categories": [categories[-1][0] + 10, 'badcat']
        })
        self.assertResponseCodeEqual(resp, 400)

        resp = await self.fetch("/user/{}".format(TEST_ADDRESS))
        self.assertResponseCodeEqual(resp, 200)
        body = json_decode(resp.body)
        self.assertIn("categories", body)
        self.assertEqual(len(body['categories']), 0)
项目:toshi-id-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_app_underscore_username_query(self):
        """Check /search/apps behaviour for usernames containing underscores.

        Four public app users are inserted. Each "positive" query is expected
        to match exactly one of them and each "negative" query none.

        NOTE(review): declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError inside a plain ``def``.
        Presumably run through an async test decorator not visible here --
        confirm.
        """
        # (username, name, toshi_id) rows, inserted in this order.
        apps = [
            ("wager_weight", "Wager Weight", "0x0000000000000000000000000000000000000001"),
            ("bob_smith", "Robert", "0x0000000000000000000000000000000000000002"),
            ("bob_jack", "Jackie", "0x0000000000000000000000000000000000000003"),
            ("user1234", "user1234", "0x0000000000000000000000000000000000000004"),
        ]
        async with self.pool.acquire() as con:
            for username, name, toshi_id in apps:
                await con.execute("INSERT INTO users (username, name, toshi_id, is_app, is_public) VALUES ($1, $2, $3, true, true)",
                                  username, name, toshi_id)

        for positive_query in ["wager", "wager_we", "wager_weight", "bob_smi"]:

            resp = await self.fetch("/search/apps?query={}".format(positive_query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['results']), 1, "Failed to get match for search query: {}".format(positive_query))

        for negative_query in ["wager_foo", "bob_bar", "1234", "bobsmith"]:

            resp = await self.fetch("/search/apps?query={}".format(negative_query), method="GET")
            self.assertEqual(resp.code, 200)
            body = json_decode(resp.body)
            self.assertEqual(len(body['results']), 0, "got unexpected match for search query: {}".format(negative_query))
项目:globibot    作者:best-coloc-ever    | 项目源码 | 文件源码
async def fetch_definitions(self, term):
        """Fetch the definitions of *term* from ``UrbanDictionary.DEFINE_URL``.

        Sends a request carrying ``self.api_key`` in the ``X-Mashape-Key``
        header and returns the ``'list'`` entry of the decoded JSON response.

        Declared ``async`` because the body awaits the HTTP response, and
        ``await`` inside a plain ``def`` is a SyntaxError.
        """
        url = url_concat(UrbanDictionary.DEFINE_URL, dict(term=term))
        request = HTTPRequest(
            url     = url,
            headers = {
                'X-Mashape-Key': self.api_key,
                'Accept'       : 'text/plain'
            }
        )
        # Bridge the tornado future so it can be awaited as an asyncio future.
        response = await to_asyncio_future(self.client.fetch(request))

        data = json_decode(response.body)

        return data['list']
项目:globibot    作者:best-coloc-ever    | 项目源码 | 文件源码
async def access_token(self, code, state):
        """Exchange an authorization *code* for an access token.

        POSTs a url-encoded ``authorization_code`` grant (client id, client
        secret, redirect URI, *code* and *state*) to ``Twitch.TOKEN_URL`` and
        returns the ``'access_token'`` entry of the decoded JSON response.

        Declared ``async`` because the body awaits the HTTP response, and
        ``await`` inside a plain ``def`` is a SyntaxError.
        """
        client = AsyncHTTPClient()

        payload = (
            ('client_id', self.client_id),
            ('client_secret', self.client_secret),
            ('grant_type', 'authorization_code'),
            ('redirect_uri', Twitch.REDIRECT_URI),
            ('code', code),
            ('state', state),
        )

        request = HTTPRequest(
            url = Twitch.TOKEN_URL,
            method = 'POST',
            body = urlencode(payload)
        )
        # Bridge the tornado future so it can be awaited as an asyncio future.
        response = await to_asyncio_future(client.fetch(request))

        data = json_decode(response.body)
        return data['access_token']
项目:toshi-directory-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_username_query(self):
        """Check that /search/apps matches an app name by partial query.

        Inserts the app "TokenBot" with a manifest row, then expects one app
        for the query 'enb' and none for 'TickleFight'.

        NOTE(review): declared ``async`` because the body uses ``await`` and
        ``async with``, which are a SyntaxError inside a plain ``def``.
        Presumably run through an async test decorator not visible here --
        confirm.
        """
        username = "TokenBot"
        positive_query = 'enb'
        negative_query = 'TickleFight'

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO apps (name, token_id) VALUES ($1, $2)", username, TEST_ADDRESS)
            await con.execute("INSERT INTO sofa_manifests (token_id, payment_address) VALUES ($1, $2)", TEST_ADDRESS, TEST_ADDRESS)

        resp = await self.fetch("/search/apps?query={}".format(positive_query), method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['apps']), 1)

        resp = await self.fetch("/search/apps?query={}".format(negative_query), method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['apps']), 0)
项目:django-gateone    作者:jimmy201602    | 项目源码 | 文件源码
def save_term_settings(term, location, session, settings):
    """
    Persists *settings* for the given *term* and *location* to the
    'term_settings.json' file inside the *session* directory (found under
    BASE_DIR/sessions).

    Does nothing when *session* is empty.  If the file already exists its
    contents are merged in before the new *settings* are applied on top.
    """
    if not session:
        return # Just a viewer of a broadcast terminal
    term_key = str(term) # JSON wants strings as keys
    settings_path = os.path.join(
        getsettings('BASE_DIR'), 'sessions', session, 'term_settings.json')
    merged = RUDict()
    merged[location] = {term_key: settings}
    if os.path.exists(settings_path):
        # Pull in what is already on disk, then re-apply the new values so
        # they win over any stored ones for this terminal.
        with io.open(settings_path, encoding='utf-8') as existing:
            merged.update(json_decode(existing.read()))
        merged[location][term_key].update(settings)
    with io.open(settings_path, 'w', encoding='utf-8') as out:
        out.write(json_encode(merged))
项目:django-gateone    作者:jimmy201602    | 项目源码 | 文件源码
def get_log_metadata(golog_path):
    """
    Returns a dict with the metadata stored in the log at *golog_path*.

    An empty dict is returned when the file is empty, when its first frame
    cannot be read (IOError), or when that frame carries no JSON object.
    """
    if os.path.getsize(golog_path) == 0:
        return {} # Empty file; nothing to do
    try:
        first_frame, _distance = retrieve_first_frame(golog_path)
    except IOError:
        # Something wrong with the log...  Probably still being written to
        return {}
    # NOTE(review): the first 14 characters are skipped, presumably a
    # fixed-width frame prefix -- confirm against the log format.
    payload = first_frame[14:]
    if not payload.startswith('{'):
        return {}
    # This is JSON, capture metadata
    return json_decode(payload)
项目:django-gateone    作者:jimmy201602    | 项目源码 | 文件源码
def get_current_user(self):
        """
        Returns the user dict for this connection, or None when no user is
        stored and authentication is enabled.

        Mostly identical to the function of the same name in MainHandler.
        The difference being that when API authentication is enabled the
        WebSocket will expect and perform its own auth of the client.
        """
        timeout_setting = self.settings.get('auth_timeout', "14d")
        # The expiration expressed in days.  It is not used below because
        # the user is read from the HTTP session instead of a secure cookie.
        expiration = (
            float(total_seconds(convert_to_timedelta(timeout_setting)))
            / float(86400))
        # Stored value looks like:
        # {"upn": "ANONYMOUS", "session": "YmM5MDU5MDgyYmVjNDU0M2E5MDMzYTg5NWMzZTI5YTBkN"}
        stored_user = self.message.http_session.get('gateone_user', None)
        if stored_user:
            return json_decode(stored_user)
        if self.settings['auth']:
            return None
        # This can happen if the user's browser isn't allowing persistent
        # cookies (e.g. incognito mode)
        return {'upn': 'ANONYMOUS', 'session': generate_session_id()}
项目:django-gateone    作者:jimmy201602    | 项目源码 | 文件源码
def clear_term_settings(self, term):
        """
        Removes any settings associated with the given *term* in the user's
        term_settings.json file (in their session directory).

        Returns without doing anything (and without triggering the
        "terminal:clear_term_settings" event) when that file does not exist.
        """
        term = str(term)
        self.term_log.debug("clear_term_settings(%s)" % term)
        session_dir = os.path.join(
            self.settings['session_dir'], self.ws.session)
        settings_path = os.path.join(session_dir, 'term_settings.json')
        if not os.path.exists(settings_path):
            return # Nothing to do
        # Seed an empty entry for this terminal so the `del` below always has
        # a key to remove (assumes RUDict.update() merges nested dicts --
        # confirm).
        term_settings = RUDict()
        term_settings[self.ws.location] = {term: {}}
        # Read in the existing settings, drop this terminal, write them back.
        with io.open(settings_path, encoding='utf-8') as f:
            term_settings.update(json_decode(f.read()))
        del term_settings[self.ws.location][term]
        with io.open(settings_path, 'w', encoding='utf-8') as f:
            f.write(json_encode(term_settings))
        self.trigger("terminal:clear_term_settings", term)

    #@require(authenticated(), policies('terminal'))
项目:My-Web-Server-Framework-With-Python2.7    作者:syjsu    | 项目源码 | 文件源码
def test_multipart_form(self):
        # Encodings here are tricky: headers are latin1, while bodies can be
        # anything (utf8 is used by default).
        request_head = [
            b"POST /multipart HTTP/1.0",
            b"Content-Type: multipart/form-data; boundary=1234567890",
            b"X-Header-encoding-test: \xe9",
        ]
        body_lines = [
            b"Content-Disposition: form-data; name=argument",
            b"",
            u("\u00e1").encode("utf-8"),
            b"--1234567890",
            u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
            b"",
            u("\u00fa").encode("utf-8"),
            b"--1234567890--",
            b"",
        ]
        raw_reply = self.raw_fetch(request_head, b"\r\n".join(body_lines))
        decoded = json_decode(raw_reply)
        # Each field of the decoded reply must carry its own accented character.
        self.assertEqual(u("\u00e9"), decoded["header"])
        self.assertEqual(u("\u00e1"), decoded["argument"])
        self.assertEqual(u("\u00f3"), decoded["filename"])
        self.assertEqual(u("\u00fa"), decoded["filebody"])
项目:My-Web-Server-Framework-With-Python2.7    作者:syjsu    | 项目源码 | 文件源码
def test_chunked_request_body(self):
        # Chunked requests are not widely supported and AsyncHTTPClient has
        # no way to generate them, but HTTPServer will read them, so the
        # request is written to the stream by hand.
        raw_request = b"""\
POST /echo HTTP/1.1
Transfer-Encoding: chunked
Content-Type: application/x-www-form-urlencoded

4
foo=
3
bar
0

""".replace(b"\n", b"\r\n")
        self.stream.write(raw_request)
        read_stream_body(self.stream, self.stop)
        _headers, payload = self.wait()
        decoded = json_decode(payload)
        # The two chunks "foo=" and "bar" must be reassembled into foo=bar.
        self.assertEqual(decoded, {u('foo'): [u('bar')]})
项目:My-Web-Server-Framework-With-Python2.7    作者:syjsu    | 项目源码 | 文件源码
def test_decode_argument(self):
        # Each of these requests is expected to decode to the same unicode
        # character in both the path and the query.
        same_char_urls = (
            "/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
            "/decode_arg/%E9?foo=%E9&encoding=latin1",
            "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
        )
        for target in same_char_urls:
            reply = self.fetch(target)
            reply.rethrow()
            decoded = json_decode(reply.body)
            expected = {
                u('path'): [u('unicode'), u('\u00e9')],
                u('query'): [u('unicode'), u('\u00e9')],
            }
            self.assertEqual(decoded, expected)

        # Without an explicit encoding argument the expected payload is
        # tagged 'bytes' and carries the hex string 'c3a9'.
        reply = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {
            u('path'): [u('bytes'), u('c3a9')],
            u('query'): [u('bytes'), u('c3a9')],
        }
        self.assertEqual(decoded, expected)
项目:My-Web-Server-Framework-With-Python2.7    作者:syjsu    | 项目源码 | 文件源码
def test_get_argument(self):
        # Query-only lookups: a value, an empty value, and a missing one.
        query_only_cases = (
            ("/get_argument?foo=bar", b"bar"),
            ("/get_argument?foo=", b""),
            ("/get_argument", b"default"),
        )
        for target, expected_body in query_only_cases:
            reply = self.fetch(target)
            self.assertEqual(reply.body, expected_body)

        # Test merging of query and body arguments.
        # In singular form, body arguments take precedence over query arguments.
        form_body = urllib_parse.urlencode({"foo": "hello"})
        reply = self.fetch("/get_argument?foo=bar", method="POST", body=form_body)
        self.assertEqual(reply.body, b"hello")
        # In plural methods they are merged.
        reply = self.fetch("/get_arguments?foo=bar", method="POST", body=form_body)
        merged = json_decode(reply.body)
        self.assertEqual(
            merged,
            {'default': ['bar', 'hello'], 'query': ['bar'], 'body': ['hello']})
项目:react-tornado-graphql-example    作者:yatsu    | 项目源码 | 文件源码
def graphql_request(self):
        """Return the current request's body decoded from JSON."""
        raw_body = self.request.body
        return json_decode(raw_body)
项目:react-tornado-graphql-example    作者:yatsu    | 项目源码 | 文件源码
def on_message(self, message):
        """Decode a JSON *message* and dispatch it by its 'type' field.

        'subscription_start' goes to on_subscribe and 'subscription_end' to
        on_unsubscribe, each with the message 'id' and the decoded data.

        Raises ValueError for any other type.
        """
        data = json_decode(message)
        subid = data.get('id')
        kind = data.get('type')
        if kind == 'subscription_start':
            self.on_subscribe(subid, data)
            return
        if kind == 'subscription_end':
            self.on_unsubscribe(subid, data)
            return
        raise ValueError('Invalid type: {0}'.format(kind))
项目:react-tornado-graphql-example    作者:yatsu    | 项目源码 | 文件源码
def response_handler(self, msg):
        """Forward a decoded response to the 'commandExecute' subscriber.

        *msg* is a two-item sequence whose second item holds the JSON
        response bytes.  Nothing is written when there is no
        'commandExecute' subscription.
        """
        _ident, resp_bytes = msg
        resp = json_decode(to_unicode(resp_bytes))
        app_log.debug('resp: %s', resp)

        subid = self.subscriptions.get('commandExecute')
        if subid is None:
            return
        envelope = {
            'type': 'subscription_data',
            'id': subid,
            'payload': {
                'data': resp
            }
        }
        self.write_message(json_encode(envelope))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_types(self):
        # Issue the type-check request as a GET and then as a POST with the
        # same cookie; each reply is expected to be an empty JSON object.
        cookie_headers = {"Cookie": "foo=bar"}
        reply = self.fetch("/typecheck?foo=bar", headers=cookie_headers)
        decoded = json_decode(reply.body)
        self.assertEqual(decoded, {})

        reply = self.fetch("/typecheck", method="POST", body="foo=bar",
                           headers=cookie_headers)
        decoded = json_decode(reply.body)
        self.assertEqual(decoded, {})

# This is kind of hacky, but run some of the HTTPServer tests through
# WSGIContainer and WSGIApplication to make sure everything survives
# repeated disassembly and reassembly.
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_json(self, *args, **kwargs):
        """Fetch with the given arguments and return the JSON-decoded body.

        Calls rethrow() on the response before decoding it.
        """
        reply = self.fetch(*args, **kwargs)
        reply.rethrow()
        decoded = json_decode(reply.body)
        return decoded
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_query_string_encoding(self):
        # The percent-encoded query value is expected back as U+00E9.
        reply = self.fetch("/echo?foo=%C3%A9")
        decoded = json_decode(reply.body)
        expected = {u("foo"): [u("\u00e9")]}
        self.assertEqual(decoded, expected)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_empty_query_string(self):
        # Two empty values for the same key must both be echoed back.
        reply = self.fetch("/echo?foo=&foo=")
        decoded = json_decode(reply.body)
        expected = {u("foo"): [u(""), u("")]}
        self.assertEqual(decoded, expected)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_types(self):
        # Issue the type-check request as a GET and then as a POST with the
        # same cookie; each reply is expected to be an empty JSON object.
        cookie_headers = {"Cookie": "foo=bar"}
        reply = self.fetch("/typecheck?foo=bar", headers=cookie_headers)
        decoded = json_decode(reply.body)
        self.assertEqual(decoded, {})

        reply = self.fetch("/typecheck", method="POST", body="foo=bar",
                           headers=cookie_headers)
        decoded = json_decode(reply.body)
        self.assertEqual(decoded, {})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_double_slash(self):
        # A path beginning with "//" would be parsed as a protocol-relative
        # url by urlparse.urlsplit, which tornado.httpserver used to use
        # incorrectly.
        reply = self.fetch("//doubleslash")
        self.assertEqual(200, reply.code)
        decoded = json_decode(reply.body)
        self.assertEqual(decoded, {})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_uncompressed(self):
        # A plain form-encoded POST body is expected to come back as
        # {"foo": ["bar"]}.
        response = self.fetch('/', method='POST', body='foo=bar')
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(json_decode(response.body), {u('foo'): [u('bar')]})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_gzip(self):
        # The body sent through self.post_gzip is expected to come back as
        # {"foo": ["bar"]}.
        response = self.post_gzip('foo=bar')
        # assertEqual replaces the deprecated assertEquals alias, which was
        # removed from unittest in Python 3.12.
        self.assertEqual(json_decode(response.body), {u('foo'): [u('bar')]})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_chunk_sizes(self, **kwargs):
        """POST to ``/`` and return the JSON-decoded chunk sizes.

        Extra keyword arguments are passed through to ``self.fetch``. The
        sizes must sum to ``len(self.BODY)``, and each one must be greater
        than 0 and no larger than ``self.CHUNK_SIZE``.
        """
        result = self.fetch('/', method='POST', **kwargs)
        result.rethrow()
        sizes = json_decode(result.body)
        self.assertEqual(len(self.BODY), sum(sizes))
        # Render the full listing once; it is only used in failure messages.
        listing = str(sizes)
        for size in sizes:
            self.assertLessEqual(size, self.CHUNK_SIZE,
                                 'oversized chunk: ' + listing)
            self.assertGreater(size, 0, 'empty chunk: ' + listing)
        return sizes
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_json(self, path):
        """Fetch ``path`` and return its response body decoded from JSON."""
        response = self.fetch(path)
        return json_decode(response.body)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def fetch_json(self, *args, **kwargs):
        """Return the JSON-decoded body of ``self.fetch(*args, **kwargs)``.

        ``rethrow()`` is called on the response before the body is decoded.
        """
        fetched = self.fetch(*args, **kwargs)
        fetched.rethrow()
        decoded = json_decode(fetched.body)
        return decoded
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_types(self):
        # A signed "asdf" cookie is sent with a GET and then a POST to the
        # typecheck endpoint; both bodies are expected to decode to {}.
        cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
                                                      "asdf", "qwer"))
        headers = {"Cookie": "asdf=" + cookie_value}

        response = self.fetch("/typecheck/asdf?foo=bar", headers=headers)
        data = json_decode(response.body)
        self.assertEqual(data, {})

        response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
                              headers=headers, body="foo=bar")
        # The POST response was previously fetched but never inspected;
        # check it the same way as the GET response above.
        data = json_decode(response.body)
        self.assertEqual(data, {})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_pos(self):
        # The "foo" path segment is expected under 'args', with 'kwargs'
        # left empty.
        reply = self.fetch('/pos/foo')
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {'args': ['foo'], 'kwargs': {}}
        self.assertEqual(decoded, expected)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_kw(self):
        # The "foo" path segment is expected under kwargs['path'], with
        # 'args' left empty.
        reply = self.fetch('/kw/foo')
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected = {'args': [], 'kwargs': {'path': 'foo'}}
        self.assertEqual(decoded, expected)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_catch_error(self):
        # The body returned for '/' is expected to name the missing argument
        # and carry the matching log message.
        reply = self.fetch('/')
        decoded = json_decode(reply.body)
        expected = {
            'arg_name': 'foo',
            'log_message': 'Missing argument foo',
        }
        self.assertEqual(decoded, expected)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_flow_control_fixed_body(self):
        # For the 26-letter POST body the reported method sequence is
        # expected to be 'prepare', three 'data_received', then 'post'.
        reply = self.fetch('/', method='POST',
                           body='abcdefghijklmnopqrstuvwxyz')
        reply.rethrow()
        decoded = json_decode(reply.body)
        expected_methods = ['prepare'] + ['data_received'] * 3 + ['post']
        self.assertEqual(decoded, {'methods': expected_methods})
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_flow_control_compressed_body(self):
        # The 26-letter body is gzip-compressed in memory and POSTed with
        # Content-Encoding: gzip; the reported method sequence is expected to
        # be 'prepare', three 'data_received', then 'post'.
        bytesio = BytesIO()
        # The with-block closes the gzip stream even if write() raises.
        # Closing it is what flushes the gzip trailer, so getvalue() must
        # only be read after the block exits.
        with gzip.GzipFile(mode='w', fileobj=bytesio) as gzip_file:
            gzip_file.write(b'abcdefghijklmnopqrstuvwxyz')
        compressed_body = bytesio.getvalue()
        response = self.fetch('/', body=compressed_body, method='POST',
                              headers={'Content-Encoding': 'gzip'})
        response.rethrow()
        self.assertEqual(json_decode(response.body),
                         dict(methods=['prepare', 'data_received',
                                       'data_received', 'data_received',
                                       'post']))