Python StringIO.StringIO 模块,StringIO() 实例源码

我们从 Python 开源项目中提取了以下 28 个代码示例，用于说明如何使用 StringIO.StringIO()。

项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def test_encoding(self):
        """Check ``Browser.encoding()`` against a table of raw header blocks.

        Each case is parsed with ``mimetools.Message``, wrapped in a
        ``urllib.addinfourl`` response with an empty body, handed to the
        browser, and the reported encoding is compared with the expected one.
        """
        import mechanize
        from StringIO import StringIO
        import urllib
        browser = mechanize.Browser()
        fallback = mechanize._html.DEFAULT_ENCODING
        # always take first encoding, since that's the one from the real HTTP
        # headers, rather than from HTTP-EQUIV
        cases = (
            ("", fallback),
            ("Foo: Bar\r\n\r\n", fallback),
            ("Content-Type: text/html; charset=UTF-8\r\n\r\n", "UTF-8"),
            ("Content-Type: text/html; charset=UTF-8\r\n"
             "Content-Type: text/html; charset=KOI8-R\r\n\r\n", "UTF-8"),
        )
        for raw_headers, expected in cases:
            message = mimetools.Message(StringIO(raw_headers))
            response = urllib.addinfourl(
                StringIO(""), message, "http://www.example.com/")
            browser.set_response(response)
            self.assertEqual(browser.encoding(), expected)
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def test_select_form(self):
        """Exercise ``select_form`` by index, by name and by attribute matchers.

        A three-form HTML page is loaded into a ``TestBrowser``; each form is
        selected by position and by name, then by a regex on ``data-ac`` and a
        predicate on ``class``.
        """
        from mechanize import _response
        browser = TestBrowser()
        page = StringIO.StringIO('''<html>
            <form name="a"></form>
            <form name="b" data-ac="123"></form>
            <form name="c" class="x"></form>
            </html>''')
        header_source = StringIO.StringIO("Content-type: text/html")
        headers = mimetools.Message(header_source)
        closeable = _response.closeable_response(
            page, headers, "http://example.com/", 200, "OK")
        browser.set_response(_response.response_seek_wrapper(closeable))
        for index, form_name in enumerate('abc'):
            browser.select_form(nr=index)
            self.assertEqual(browser.form.name, form_name)
            # Go back to the first form, then reselect the same one by name.
            browser.select_form(nr=0)
            browser.select_form(name=form_name)
            self.assertEqual(browser.form.name, form_name)
        browser.select_form(data_ac=re.compile(r'\d+'))
        self.assertEqual(browser.form.name, 'b')
        browser.select_form(class_=lambda x: x == 'x')
        self.assertEqual(browser.form.name, 'c')
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def test_str(self):
        """Check ``str(browser)`` before visiting, after loading, and with a form.

        The three expected renderings are: not visiting a URL, visiting
        ``http://example.com/``, and visiting with the first form selected.
        """
        from mechanize import _response

        browser = TestBrowser()
        self.assertEqual(str(browser), "<TestBrowser (not visiting a URL)>")

        page = StringIO.StringIO(
            '<html><form name="f"><input /></form></html>')
        header_source = StringIO.StringIO("Content-type: text/html")
        headers = mimetools.Message(header_source)
        closeable = _response.closeable_response(
            page, headers, "http://example.com/", 200, "OK")
        browser.set_response(_response.response_seek_wrapper(closeable))
        self.assertEqual(
            str(browser), "<TestBrowser visiting http://example.com/>")

        browser.select_form(nr=0)
        rendered = str(browser)
        expected = """\
<TestBrowser visiting http://example.com/
 selected form:
 <f GET http://example.com/ application/x-www-form-urlencoded
  <TextControl(<None>=)>>
>"""
        self.assertEqual(rendered, expected)
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def monkey_patch_httplib(self, putheader):
        """Swap ``httplib.HTTPConnection`` methods for stand-ins.

        Uses ``self.monkey_patch`` to install, in order: the supplied
        *putheader* callable; a no-op for ``connect``, ``send`` and ``close``;
        and a ``getresponse`` that returns a canned object with status 200,
        reason "OK", an empty message and an empty body.

        :param putheader: replacement for ``HTTPConnection.putheader``
        """
        def do_nothing(*args, **kwds):
            return None

        def getresponse(self_):
            class Response(object):
                status, reason = 200, "OK"
                msg = mimetools.Message(StringIO.StringIO(""))

                def read(self__, sz=-1):
                    return ""

            return Response()

        # Order matches the sequence of individual patch calls it replaces.
        replacements = (
            ("putheader", putheader),
            ("connect", do_nothing),
            ("send", do_nothing),
            ("close", do_nothing),
            ("getresponse", getresponse),
        )
        for method_name, stand_in in replacements:
            self.monkey_patch(httplib.HTTPConnection, method_name, stand_in)
项目:tf-tutorial    作者:zchen0211    | 项目源码 | 文件源码
def _render(self, mode='human', close=False):
    """Write the grid, with the current cell highlighted, to a text stream.

    The stream is a fresh ``StringIO`` when *mode* is ``'ansi'`` and
    ``sys.stdout`` otherwise. The grid is followed by the meaning of the last
    action in parentheses, or a blank line when there is no last action.

    Returns the stream written to, or ``None`` when *close* is true.
    """
    if close:
      return None

    if mode == 'ansi':
      stream = StringIO.StringIO()
    else:
      stream = sys.stdout

    # self.s is a flat cell index; split it into grid coordinates.
    r, c = divmod(self.s, self.ncol)
    grid = self.desc.tolist()
    grid[r][c] = utils.colorize(grid[r][c], "red", highlight=True)

    lines = ["".join(cells) for cells in grid]
    stream.write("\n".join(lines)+"\n")
    if self.lastaction is None:
      stream.write("\n")
    else:
      stream.write("  ({})\n".format(self.get_action_meanings()[self.lastaction]))

    return stream
项目:deep-rl-tensorflow    作者:carpedm20    | 项目源码 | 文件源码
def _render(self, mode='human', close=False):
    """Write the grid, with the current cell highlighted, to a text stream.

    The stream is a fresh ``StringIO`` when *mode* is ``'ansi'`` and
    ``sys.stdout`` otherwise. The grid is followed by the meaning of the last
    action in parentheses, or a blank line when there is no last action.

    Returns the stream written to, or ``None`` when *close* is true.
    """
    if close:
      return None

    if mode == 'ansi':
      stream = StringIO.StringIO()
    else:
      stream = sys.stdout

    # self.s is a flat cell index; split it into grid coordinates.
    r, c = divmod(self.s, self.ncol)
    grid = self.desc.tolist()
    grid[r][c] = utils.colorize(grid[r][c], "red", highlight=True)

    lines = ["".join(cells) for cells in grid]
    stream.write("\n".join(lines)+"\n")
    if self.lastaction is None:
      stream.write("\n")
    else:
      stream.write("  ({})\n".format(self.get_action_meanings()[self.lastaction]))

    return stream
项目:HTTPIDL    作者:cikelengfeng    | 项目源码 | 文件源码
def getText(self, program_name, interval):
        """
        Build the text for ``interval`` with the named program's rewrite
        operations applied.

        :type interval: Interval.Interval
        :param program_name: key looked up in ``self.programs``
        :param interval: supplies the ``start`` and ``stop`` token indexes
        :return: the rewritten text, or ``self.tokens.getText(interval)``
            when the program has no rewrites
        """
        rewrites = self.programs.get(program_name)
        first = interval.start
        stop = interval.stop

        # clamp both ends to the valid token index range
        last_index = len(self.tokens.tokens) - 1
        stop = last_index if stop > last_index else stop
        first = 0 if first < 0 else first

        # nothing to apply: defer to the underlying token stream
        if not rewrites:
            return self.tokens.getText(interval)

        out = StringIO()
        ops_by_index = self._reduceToSingleOperationPerIndex(rewrites)
        cursor = first
        while cursor <= stop and cursor < len(self.tokens.tokens):
            pending = ops_by_index.pop(cursor, None)
            token = self.tokens.get(cursor)
            if pending is not None:
                # the operation writes to the buffer and says where to resume
                cursor = pending.execute(out)
                continue
            if token.type != Token.EOF:
                out.write(token.text)
            cursor += 1

        if stop == len(self.tokens.tokens) - 1:
            for op in ops_by_index.values():
                # TODO: this check is probably not needed
                if op.index >= len(self.tokens.tokens) - 1:
                    out.write(op.text)

        return out.getvalue()
项目:HTTPIDL    作者:cikelengfeng    | 项目源码 | 文件源码
def execute(self, buf):
            """
            Return this operation's index without writing anything to ``buf``.

            :type buf: StringIO.StringIO
            :param buf: output buffer; not touched by this implementation
            :return: ``self.index``
            """
            position = self.index
            return position
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def __init__(self, code, msg, headers, data, url=None):
        """Initialise the ``StringIO`` base with *data* and store the metadata.

        *code*, *msg*, *headers* and *url* are kept as attributes of the
        same names.
        """
        StringIO.StringIO.__init__(self, data)
        self.code = code
        self.msg = msg
        self.headers = headers
        self.url = url
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def http_open(self, req):
        """Record *req*; fail the first call and succeed on later ones.

        A deep copy of every request is appended to ``self.requests``. While
        ``self._count`` is 0 the counter is incremented and the request is
        routed to ``self.parent.error`` with ``self.code`` and headers parsed
        from ``self.headers``. Otherwise the request is stored on
        ``self.req`` and a 200 "OK" ``MockResponse`` is returned.
        """
        import mimetools
        import copy
        from StringIO import StringIO
        self.requests.append(copy.deepcopy(req))
        if self._count != 0:
            self.req = req
            ok_headers = mimetools.Message(StringIO("\r\n\r\n"))
            return MockResponse(200, "OK", ok_headers, "", req.get_full_url())
        self._count += 1
        reason = httplib.responses[self.code]
        error_headers = mimetools.Message(StringIO(self.headers))
        return self.parent.error(
            "http", req, MockFile(), self.code, reason, error_headers)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def __init__(self, code, msg, headers, data, url=None):
        """Initialise the ``StringIO`` base with *data* and store the metadata.

        *code*, *msg*, *headers* and *url* are kept as attributes of the
        same names.
        """
        StringIO.StringIO.__init__(self, data)
        self.code = code
        self.msg = msg
        self.headers = headers
        self.url = url
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def http_open(self, req):
        """Record *req*; fail the first call and succeed on later ones.

        A deep copy of every request is appended to ``self.requests``. While
        ``self._count`` is 0 the counter is incremented and the request is
        routed to ``self.parent.error`` with ``self.code`` and headers parsed
        from ``self.headers``. Otherwise the request is stored on
        ``self.req`` and a 200 "OK" ``MockResponse`` is returned.
        """
        import mimetools
        import copy
        from StringIO import StringIO
        self.requests.append(copy.deepcopy(req))
        if self._count != 0:
            self.req = req
            ok_headers = mimetools.Message(StringIO("\r\n\r\n"))
            return MockResponse(200, "OK", ok_headers, "", req.get_full_url())
        self._count += 1
        reason = httplib.responses[self.code]
        error_headers = mimetools.Message(StringIO(self.headers))
        return self.parent.error(
            "http", req, MockFile(), self.code, reason, error_headers)
项目:XssScan-v1.0    作者:dineshbarai    | 项目源码 | 文件源码
def ch_en(self,response):
        """Return the body of *response*, decoded per its Content-Encoding.

        ``gzip`` and ``x-gzip`` bodies are gunzipped and ``deflate`` bodies
        are inflated. Any other (or missing) encoding returns the body as
        read.

        :param response: object exposing ``info()`` (whose result has a
            ``get(name)`` method) and ``read()``
        :return: the decoded body
        :raises zlib.error: if a ``deflate`` body is neither a zlib stream
            nor a raw DEFLATE stream
        """
        # Local import: the block previously depended on an ambiguous
        # ``StringIO`` name, called both as a class and as a module.
        import io

        encoding = response.info().get('Content-Encoding')
        raw = response.read()

        if encoding in ('gzip', 'x-gzip'):
            gz = gzip.GzipFile(fileobj=io.BytesIO(raw))
            try:
                return gz.read()
            finally:
                gz.close()

        if encoding == 'deflate':
            try:
                return zlib.decompress(raw)
            except zlib.error:
                # Some servers send a raw DEFLATE stream without the zlib
                # header; negative wbits tells zlib not to expect one.
                return zlib.decompress(raw, -zlib.MAX_WBITS)

        return raw
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def __init__(self, code, msg, headers, data, url=None):
        """Initialise the ``StringIO`` base with *data* and store the metadata.

        *code*, *msg*, *headers* and *url* are kept as attributes of the
        same names.
        """
        StringIO.StringIO.__init__(self, data)
        self.code = code
        self.msg = msg
        self.headers = headers
        self.url = url
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def http_open(self, req):
        """Record *req*; fail the first call and succeed on later ones.

        A deep copy of every request is appended to ``self.requests``. While
        ``self._count`` is 0 the counter is incremented and the request is
        routed to ``self.parent.error`` with ``self.code`` and headers parsed
        from ``self.headers``. Otherwise the request is stored on
        ``self.req`` and a 200 "OK" ``MockResponse`` is returned.
        """
        import mimetools
        import httplib
        import copy
        from StringIO import StringIO
        self.requests.append(copy.deepcopy(req))
        if self._count != 0:
            self.req = req
            ok_headers = mimetools.Message(StringIO("\r\n\r\n"))
            return MockResponse(200, "OK", ok_headers, "", req.get_full_url())
        self._count += 1
        reason = httplib.responses[self.code]
        error_headers = mimetools.Message(StringIO(self.headers))
        return self.parent.error(
            "http", req, MockFile(), self.code, reason, error_headers)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def __init__(self, code, msg, headers, data, url=None):
        """Initialise the ``StringIO`` base with *data* and store the metadata.

        *code*, *msg*, *headers* and *url* are kept as attributes of the
        same names.
        """
        StringIO.StringIO.__init__(self, data)
        self.code = code
        self.msg = msg
        self.headers = headers
        self.url = url
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def http_open(self, req):
        """Record *req*; fail the first call and succeed on later ones.

        A deep copy of every request is appended to ``self.requests``. While
        ``self._count`` is 0 the counter is incremented and the request is
        routed to ``self.parent.error`` with ``self.code`` and headers parsed
        from ``self.headers``. Otherwise the request is stored on
        ``self.req`` and a 200 "OK" ``MockResponse`` is returned.
        """
        import mimetools
        import httplib
        import copy
        from StringIO import StringIO
        self.requests.append(copy.deepcopy(req))
        if self._count != 0:
            self.req = req
            ok_headers = mimetools.Message(StringIO("\r\n\r\n"))
            return MockResponse(200, "OK", ok_headers, "", req.get_full_url())
        self._count += 1
        reason = httplib.responses[self.code]
        error_headers = mimetools.Message(StringIO(self.headers))
        return self.parent.error(
            "http", req, MockFile(), self.code, reason, error_headers)
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def __init__(self, url="http://example.com/", data=None, info=None):
        """Set up a response stand-in around an in-memory body.

        :param url: stored on both ``self.url`` and ``self._url``
        :param data: passed to ``StringIO.StringIO`` to build ``self.fp``
        :param info: header data wrapped in ``MockHeaders``; an empty dict
            is used when omitted
        """
        self.url = url
        self._url = url
        self.fp = StringIO.StringIO(data)
        header_data = {} if info is None else info
        wrapped = MockHeaders(header_data)
        self._info = wrapped
        self._headers = wrapped
        self.close_called = False
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def __init__(self, code, msg, headers, data, url=None):
        """Initialise the ``StringIO`` base with *data* and store the metadata.

        *code*, *msg*, *headers* and *url* are kept as attributes of the
        same names.
        """
        StringIO.StringIO.__init__(self, data)
        self.code = code
        self.msg = msg
        self.headers = headers
        self.url = url
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def http_open(self, req):
        """Record *req*; fail the first call and succeed on later ones.

        A deep copy of every request is appended to ``self.requests``. While
        ``self._count`` is 0 the counter is incremented and the request is
        routed to ``self.parent.error`` with ``self.code`` and headers parsed
        from ``self.headers``. Otherwise the request is stored on
        ``self.req`` and an empty ``test_response`` for the request URL is
        returned.
        """
        import mimetools
        import copy
        from StringIO import StringIO
        self.requests.append(copy.deepcopy(req))
        if self._count != 0:
            self.req = req
            return test_response("", [], req.get_full_url())
        self._count += 1
        reason = "Not important"
        error_headers = mimetools.Message(StringIO(self.headers))
        return self.parent.error(
            "http", req, test_response(), self.code, reason, error_headers)
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def test_redirected_robots_txt(self):
        """Check the exact request sequence when robots.txt is redirected."""
        # redirected robots.txt fetch shouldn't result in another attempted
        # robots.txt fetch to check the redirection is allowed!
        import mechanize
        from mechanize import (
            build_opener, HTTPHandler,
            HTTPDefaultErrorHandler, HTTPRedirectHandler,
            HTTPRobotRulesProcessor)

        class MockHTTPHandler(mechanize.BaseHandler):
            def __init__(self):
                self.requests = []

            def http_open(self, req):
                import mimetools
                import httplib
                import copy
                from StringIO import StringIO
                self.requests.append(copy.deepcopy(req))
                if req.get_full_url() != "http://example.com/robots.txt":
                    return test_response("Allow: *", [], req.get_full_url())
                # Answer the robots.txt fetch with a 302 to another location.
                location = "Location: http://example.com/en/robots.txt\r\n\r\n"
                redirect_headers = mimetools.Message(StringIO(location))
                return self.parent.error(
                    "http", req, test_response(), 302, "Blah",
                    redirect_headers)

        recorder = MockHTTPHandler()
        default_errors = HTTPDefaultErrorHandler()
        redirects = HTTPRedirectHandler()
        robots = HTTPRobotRulesProcessor()
        opener = build_test_opener(recorder, default_errors, redirects, robots)
        opener.open("http://example.com/")
        visited = [request.get_full_url() for request in recorder.requests]
        self.assertEqual(visited, [
            "http://example.com/robots.txt",
            "http://example.com/en/robots.txt",
            "http://example.com/",
        ])
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def getFileType(self):
        """Return the ``StringIO.StringIO`` class.

        Raises ``SkipTest`` when the ``StringIO`` module cannot be imported.
        """
        try:
            from StringIO import StringIO
        except ImportError:
            raise SkipTest("StringIO.StringIO is not available.")
        return StringIO
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def getFileType(self):
        """Return the ``cStringIO.StringIO`` factory.

        Raises ``SkipTest`` when the ``cStringIO`` module cannot be imported.
        """
        try:
            from cStringIO import StringIO
        except ImportError:
            raise SkipTest("cStringIO.StringIO is not available.")
        return StringIO
项目:XssScan-v1.1    作者:dineshbarai    | 项目源码 | 文件源码
def ch_en(self,response):
        """Return the body of *response*, decoded per its Content-Encoding.

        ``gzip`` and ``x-gzip`` bodies are gunzipped and ``deflate`` bodies
        are inflated. Any other (or missing) encoding returns the body as
        read.

        :param response: object exposing ``info()`` (whose result has a
            ``get(name)`` method) and ``read()``
        :return: the decoded body
        :raises zlib.error: if a ``deflate`` body is neither a zlib stream
            nor a raw DEFLATE stream
        """
        # Local import: the block previously depended on an ambiguous
        # ``StringIO`` name, called both as a class and as a module.
        import io

        encoding = response.info().get('Content-Encoding')
        raw = response.read()

        if encoding in ('gzip', 'x-gzip'):
            gz = gzip.GzipFile(fileobj=io.BytesIO(raw))
            try:
                return gz.read()
            finally:
                gz.close()

        if encoding == 'deflate':
            try:
                return zlib.decompress(raw)
            except zlib.error:
                # Some servers send a raw DEFLATE stream without the zlib
                # header; negative wbits tells zlib not to expect one.
                return zlib.decompress(raw, -zlib.MAX_WBITS)

        return raw
项目:ZServer    作者:zopefoundation    | 项目源码 | 文件源码
def get_schema(self):
        """Return the ZConfig schema, loading it on first use.

        When ``self.schema`` is ``None`` the inline schema text is loaded with
        ``ZConfig.loadSchemaFile`` and stored on ``BaseTest.schema``.
        """
        if self.schema is not None:
            return self.schema
        schema_source = StringIO.StringIO("""
            <schema>
              <import package='ZServer'/>
              <multisection name='*'
                            type='ZServer.server'
                            attribute='servers'/>
            </schema>
            """)
        BaseTest.schema = ZConfig.loadSchemaFile(schema_source)
        return self.schema
项目:ZServer    作者:zopefoundation    | 项目源码 | 文件源码
def load_factory(self, text):
        """Load *text* as a config against the schema and return its one server.

        Asserts that the loaded configuration has exactly one entry in
        ``servers``.
        """
        schema = self.get_schema()
        config_source = StringIO.StringIO(text)
        # The second element of the result is not used here.
        conf, _unused = ZConfig.loadConfigFile(schema, config_source)
        servers = conf.servers
        self.assertEqual(len(servers), 1)
        return servers[0]
项目:ZServer    作者:zopefoundation    | 项目源码 | 文件源码
def _trap_warning_output(self):

        if self._old_stderr is not None:
            return

        import sys
        from StringIO import StringIO

        self._old_stderr = sys.stderr
        self._our_stderr_stream = sys.stderr = StringIO()
项目:mechanize    作者:python-mechanize    | 项目源码 | 文件源码
def test_ftp(self):
        """Check how ``FTPHandler.ftp_open`` interprets FTP URLs.

        A handler subclass records the arguments passed to ``connect_ftp``
        and returns a wrapper that records the requested file name and
        transfer type. For each URL the test checks host, port, directory
        list, timeout, file name, transfer type and the response's
        ``Content-type`` and ``Content-length`` headers.
        """
        class MockFTPWrapper:
            def __init__(self, data):
                self.data = data

            def retrfile(self, filename, filetype):
                # Remember what was asked for so the test can assert on it.
                self.filename, self.filetype = filename, filetype
                return StringIO.StringIO(self.data), len(self.data)

        class NullFTPHandler(mechanize.FTPHandler):
            def __init__(self, data):
                self.data = data

            def connect_ftp(self, user, passwd, host, port, dirs, timeout):
                # Capture the connection parameters instead of connecting.
                self.user, self.passwd = user, passwd
                self.host, self.port = host, port
                self.dirs = dirs
                self.timeout = timeout
                self.ftpwrapper = MockFTPWrapper(self.data)
                return self.ftpwrapper

        import ftplib
        import socket
        data = "rheum rhaponicum"
        h = NullFTPHandler(data)
        h.parent = MockOpener()

        for url, host, port, type_, dirs, timeout, filename, mimetype in [
            ("ftp://localhost/foo/bar/baz.html", "localhost", ftplib.FTP_PORT,
             "I", ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT,
             "baz.html", "text/html"),
            ("ftp://localhost:80/foo/bar/", "localhost", 80, "D",
             ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT, "", None),
            ("ftp://localhost/baz.gif;type=a", "localhost", ftplib.FTP_PORT,
             "A", [], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT, "baz.gif",
             None),  # TODO: really this should guess image/gif
        ]:
            req = Request(url, timeout=timeout)
            r = h.ftp_open(req)
            # ftp authentication not yet implemented by FTPHandler
            self.assertTrue(h.user == h.passwd == "")
            self.assertEqual(h.host, socket.gethostbyname(host))
            self.assertEqual(h.port, port)
            self.assertEqual(h.dirs, dirs)
            if sys.version_info >= (2, 6):
                # assertEqual, not the deprecated assertEquals alias
                self.assertEqual(h.timeout, timeout)
            self.assertEqual(h.ftpwrapper.filename, filename)
            self.assertEqual(h.ftpwrapper.filetype, type_)
            headers = r.info()
            self.assertEqual(headers.get("Content-type"), mimetype)
            self.assertEqual(int(headers["Content-length"]), len(data))