我们从Python开源项目中，提取了以下28个代码示例，用于说明如何使用StringIO.StringIO()。
def test_encoding(self):
    # Feed the browser responses with different header blocks and check
    # the encoding it reports for each one.
    import mechanize
    from StringIO import StringIO
    import urllib

    browser = mechanize.Browser()
    fallback = mechanize._html.DEFAULT_ENCODING
    # always take first encoding, since that's the one from the real HTTP
    # headers, rather than from HTTP-EQUIV
    cases = (
        ("", fallback),
        ("Foo: Bar\r\n\r\n", fallback),
        ("Content-Type: text/html; charset=UTF-8\r\n\r\n", "UTF-8"),
        ("Content-Type: text/html; charset=UTF-8\r\n"
         "Content-Type: text/html; charset=KOI8-R\r\n\r\n", "UTF-8"),
    )
    for raw_headers, expected in cases:
        parsed = mimetools.Message(StringIO(raw_headers))
        response = urllib.addinfourl(
            StringIO(""), parsed, "http://www.example.com/")
        browser.set_response(response)
        self.assertEqual(browser.encoding(), expected)
def test_select_form(self):
    # Forms are selected by position, by name, and by attribute
    # predicates (a compiled regex and a callable).
    from mechanize import _response

    browser = TestBrowser()
    page = StringIO.StringIO(
        '<html> <form name="a"></form> '
        '<form name="b" data-ac="123"></form> '
        '<form name="c" class="x"></form> </html>')
    headers = mimetools.Message(
        StringIO.StringIO("Content-type: text/html"))
    closeable = _response.closeable_response(
        page, headers, "http://example.com/", 200, "OK")
    browser.set_response(_response.response_seek_wrapper(closeable))

    for position, form_name in enumerate('abc'):
        browser.select_form(nr=position)
        self.assertEqual(browser.form.name, form_name)
        # Go back to the first form, then pick the same form by name.
        browser.select_form(nr=0)
        browser.select_form(name=form_name)
        self.assertEqual(browser.form.name, form_name)

    browser.select_form(data_ac=re.compile(r'\d+'))
    self.assertEqual(browser.form.name, 'b')
    browser.select_form(class_=lambda value: value == 'x')
    self.assertEqual(browser.form.name, 'c')
def test_str(self):
    # str(browser) is checked in three states: no response set, a
    # response set, and a form selected.
    from mechanize import _response

    browser = TestBrowser()
    self.assertEqual(str(browser), "<TestBrowser (not visiting a URL)>")

    page = StringIO.StringIO('<html><form name="f"><input /></form></html>')
    headers = mimetools.Message(
        StringIO.StringIO("Content-type: text/html"))
    closeable = _response.closeable_response(
        page, headers, "http://example.com/", 200, "OK")
    browser.set_response(_response.response_seek_wrapper(closeable))
    self.assertEqual(
        str(browser), "<TestBrowser visiting http://example.com/>")

    browser.select_form(nr=0)
    # NOTE(review): the line breaks and leading spaces of this expected
    # value are spelled out explicitly -- confirm against Browser.__str__.
    expected = (
        "<TestBrowser visiting http://example.com/\n"
        " selected form:\n"
        " <f GET http://example.com/ application/x-www-form-urlencoded\n"
        "  <TextControl(<None>=)>>\n"
        ">")
    self.assertEqual(str(browser), expected)
def monkey_patch_httplib(self, putheader):
    """Patch ``httplib.HTTPConnection`` with stand-in methods.

    ``putheader`` is replaced by the supplied callable; ``connect``,
    ``send`` and ``close`` become no-ops; ``getresponse`` returns a canned
    object with status 200, reason "OK", an empty header message and an
    empty body.

    :param putheader: callable installed as ``HTTPConnection.putheader``
    """
    def do_nothing(*args, **kwds):
        return None

    def getresponse(self_):
        class Response(object):
            msg = mimetools.Message(StringIO.StringIO(""))
            status = 200
            reason = "OK"

            def read(self__, sz=-1):
                return ""

        return Response()

    connection = httplib.HTTPConnection
    self.monkey_patch(connection, "putheader", putheader)
    for method_name in ("connect", "send", "close"):
        self.monkey_patch(connection, method_name, do_nothing)
    self.monkey_patch(connection, "getresponse", getresponse)
def _render(self, mode='human', close=False):
    """Write a text rendering of ``self.desc`` and return the stream used.

    The cell addressed by ``self.s`` (row-major, ``self.ncol`` columns) is
    passed through ``utils.colorize`` before the grid is written, followed
    by a line naming the last action (or a blank line if there is none).

    :param mode: ``'ansi'`` writes to a fresh ``StringIO``; any other value
        writes to ``sys.stdout``
    :param close: when true, do nothing and return ``None``
    :return: the stream that was written to
    """
    if close:
        return None

    if mode == 'ansi':
        outfile = StringIO.StringIO()
    else:
        outfile = sys.stdout

    row, col = divmod(self.s, self.ncol)
    grid = self.desc.tolist()
    grid[row][col] = utils.colorize(grid[row][col], "red", highlight=True)

    lines = ["".join(cells) for cells in grid]
    outfile.write("\n".join(lines) + "\n")

    if self.lastaction is None:
        outfile.write("\n")
    else:
        action_name = self.get_action_meanings()[self.lastaction]
        outfile.write(" ({})\n".format(action_name))
    return outfile
def getText(self, program_name, interval):
    """
    Build the text for *interval* with the rewrite operations stored under
    *program_name* applied.

    :type interval: Interval.Interval
    :param program_name: key looked up in ``self.programs``
    :param interval: supplies the ``start`` and ``stop`` token indexes
    :return: ``self.tokens.getText(interval)`` when no operations are
        recorded, otherwise the contents of the output buffer
    """
    rewrites = self.programs.get(program_name)

    # ensure start/end are in range
    stop = min(interval.stop, len(self.tokens.tokens) - 1)
    start = max(interval.start, 0)

    # if no instructions to execute
    if not rewrites:
        return self.tokens.getText(interval)

    buf = StringIO()
    indexToOp = self._reduceToSingleOperationPerIndex(rewrites)
    i = start
    while i <= stop and i < len(self.tokens.tokens):
        op = indexToOp.pop(i, None)
        token = self.tokens.get(i)
        if op is not None:
            # The operation writes its own output and says where to resume.
            i = op.execute(buf)
            continue
        if token.type != Token.EOF:
            buf.write(token.text)
        i += 1

    if stop == len(self.tokens.tokens) - 1:
        # Emit operations left over at or beyond the last token.
        for op in indexToOp.values():
            # TODO: this check is probably not needed
            if op.index >= len(self.tokens.tokens) - 1:
                buf.write(op.text)

    return buf.getvalue()
def execute(self, buf):
    """
    Return this operation's index without writing anything to *buf*.

    :type buf: StringIO.StringIO
    :param buf: output buffer; not used here
    :return: ``self.index``
    """
    next_index = self.index
    return next_index
def __init__(self, code, msg, headers, data, url=None):
    """Initialise the StringIO base with *data* and store the remaining
    arguments as the ``code``, ``msg``, ``headers`` and ``url`` attributes.
    """
    StringIO.StringIO.__init__(self, data)
    self.code = code
    self.msg = msg
    self.headers = headers
    self.url = url
def http_open(self, req):
    """Record a copy of *req* and answer it.

    The first call (``self._count == 0``) reports ``self.code`` through
    ``self.parent.error`` with headers parsed from ``self.headers``; every
    later call stores *req* on ``self.req`` and returns a 200 "OK"
    ``MockResponse`` with an empty body.
    """
    import mimetools
    import copy
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))

    if self._count != 0:
        self.req = req
        blank_headers = mimetools.Message(StringIO("\r\n\r\n"))
        return MockResponse(200, "OK", blank_headers, "", req.get_full_url())

    self._count += 1
    reason = httplib.responses[self.code]
    error_headers = mimetools.Message(StringIO(self.headers))
    return self.parent.error(
        "http", req, MockFile(), self.code, reason, error_headers)
def ch_en(self, response):
    """Return the body of *response*, decoded per its Content-Encoding.

    ``gzip``/``x-gzip`` and ``deflate`` bodies are decompressed; with any
    other (or no) Content-Encoding the body is returned as read.

    :param response: object exposing ``info()`` (a mapping-like header
        object with ``get``) and ``read()``
    :return: the decoded body
    :raises zlib.error: if a ``deflate`` body is neither a zlib stream nor
        a raw deflate stream
    """
    # Local import: io.BytesIO works no matter how the file imported
    # StringIO. The original mixed ``StringIO(...)`` and
    # ``StringIO.StringIO(...)``, so one branch always failed.
    import io

    # Read the header once; content-coding names are case-insensitive.
    encoding = (response.info().get('Content-Encoding') or '').strip().lower()
    body = response.read()

    if encoding in ('gzip', 'x-gzip'):
        gz = gzip.GzipFile(fileobj=io.BytesIO(body))
        try:
            return gz.read()
        finally:
            gz.close()

    if encoding == 'deflate':
        try:
            return zlib.decompress(body)
        except zlib.error:
            # Some servers send a raw deflate stream without the zlib
            # header; a negative wbits value makes zlib accept that.
            return zlib.decompress(body, -zlib.MAX_WBITS)

    return body
def http_open(self, req):
    """Record a copy of *req* and answer it.

    On the first call (``self._count == 0``) the counter is bumped and
    ``self.code`` is reported via ``self.parent.error`` together with the
    headers parsed from ``self.headers``. Later calls remember *req* on
    ``self.req`` and return a 200 "OK" ``MockResponse`` with an empty body.
    """
    import mimetools
    import httplib
    import copy
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))

    first_call = self._count == 0
    if not first_call:
        self.req = req
        empty = mimetools.Message(StringIO("\r\n\r\n"))
        return MockResponse(200, "OK", empty, "", req.get_full_url())

    self._count += 1
    status_text = httplib.responses[self.code]
    parsed = mimetools.Message(StringIO(self.headers))
    return self.parent.error(
        "http", req, MockFile(), self.code, status_text, parsed)
def __init__(self, url="http://example.com/", data=None, info=None):
    """Set up the object's URL, body stream and headers.

    :param url: stored as both ``url`` and ``_url``
    :param data: wrapped in a ``StringIO`` and stored as ``fp``
    :param info: passed to ``MockHeaders``; ``None`` means an empty dict
    """
    self.url = url
    self._url = url
    self.fp = StringIO.StringIO(data)
    headers = MockHeaders({} if info is None else info)
    self._info = headers
    self._headers = headers
    self.close_called = False
def http_open(self, req):
    """Record a copy of *req* and answer it.

    The first call (``self._count == 0``) reports ``self.code`` through
    ``self.parent.error`` with headers parsed from ``self.headers``; later
    calls store *req* on ``self.req`` and return an empty
    ``test_response`` for the request URL.
    """
    import mimetools
    import copy
    from StringIO import StringIO

    self.requests.append(copy.deepcopy(req))

    if self._count != 0:
        self.req = req
        return test_response("", [], req.get_full_url())

    self._count += 1
    reason = "Not important"
    parsed = mimetools.Message(StringIO(self.headers))
    return self.parent.error(
        "http", req, test_response(), self.code, reason, parsed)
def test_redirected_robots_txt(self):
    # redirected robots.txt fetch shouldn't result in another attempted
    # robots.txt fetch to check the redirection is allowed!
    import mechanize
    from mechanize import (
        build_opener, HTTPHandler,
        HTTPDefaultErrorHandler, HTTPRedirectHandler,
        HTTPRobotRulesProcessor,
    )

    class MockHTTPHandler(mechanize.BaseHandler):

        def __init__(self):
            self.requests = []

        def http_open(self, req):
            import mimetools
            import httplib
            import copy
            from StringIO import StringIO

            self.requests.append(copy.deepcopy(req))
            if req.get_full_url() != "http://example.com/robots.txt":
                return test_response("Allow: *", [], req.get_full_url())

            # Answer the robots.txt fetch with a 302 to another location.
            location = "Location: http://example.com/en/robots.txt\r\n\r\n"
            parsed = mimetools.Message(StringIO(location))
            return self.parent.error(
                "http", req, test_response(), 302, "Blah", parsed)

    http_handler = MockHTTPHandler()
    error_handler = HTTPDefaultErrorHandler()
    redirect_handler = HTTPRedirectHandler()
    robots_processor = HTTPRobotRulesProcessor()
    opener = build_test_opener(
        http_handler, error_handler, redirect_handler, robots_processor)
    opener.open("http://example.com/")

    fetched = [request.get_full_url() for request in http_handler.requests]
    self.assertEqual(fetched, [
        "http://example.com/robots.txt",
        "http://example.com/en/robots.txt",
        "http://example.com/",
    ])
def getFileType(self):
    """Return the ``StringIO.StringIO`` class.

    :raises SkipTest: if the ``StringIO`` module cannot be imported
    """
    try:
        from StringIO import StringIO as file_type
    except ImportError:
        raise SkipTest("StringIO.StringIO is not available.")
    return file_type
def getFileType(self):
    """Return the ``cStringIO.StringIO`` factory.

    :raises SkipTest: if the ``cStringIO`` module cannot be imported
    """
    try:
        from cStringIO import StringIO as file_type
    except ImportError:
        raise SkipTest("cStringIO.StringIO is not available.")
    return file_type
def get_schema(self):
    """Return ``self.schema``, loading it first if it is still ``None``.

    A freshly loaded schema is stored on ``BaseTest.schema``.
    """
    if self.schema is not None:
        return self.schema

    schema_text = (
        " <schema> <import package='ZServer'/> "
        "<multisection name='*' type='ZServer.server' "
        "attribute='servers'/> </schema> ")
    BaseTest.schema = ZConfig.loadSchemaFile(StringIO.StringIO(schema_text))
    return self.schema
def load_factory(self, text):
    """Load *text* as a config file against ``self.get_schema()`` and
    return its only ``servers`` entry.

    Asserts that exactly one server was configured.
    """
    schema = self.get_schema()
    source = StringIO.StringIO(text)
    conf, _unused = ZConfig.loadConfigFile(schema, source)
    servers = conf.servers
    self.assertEqual(len(servers), 1)
    return conf.servers[0]
def _trap_warning_output(self):
    """Swap ``sys.stderr`` for an in-memory ``StringIO``.

    The previous stream is kept in ``self._old_stderr`` and the
    replacement in ``self._our_stderr_stream``. Does nothing if
    ``self._old_stderr`` is already set.
    """
    if self._old_stderr is None:
        import sys
        from StringIO import StringIO

        self._old_stderr = sys.stderr
        capture = StringIO()
        self._our_stderr_stream = capture
        sys.stderr = capture
def test_ftp(self):
    # Drive FTPHandler.ftp_open with a handler whose connect_ftp records
    # its arguments and hands back a wrapper serving canned data, then
    # check what the handler parsed out of each URL.
    import ftplib
    import socket

    class MockFTPWrapper:

        def __init__(self, data):
            self.data = data

        def retrfile(self, filename, filetype):
            # Remember what was asked for so the test can inspect it.
            self.filename, self.filetype = filename, filetype
            return StringIO.StringIO(self.data), len(self.data)

    class NullFTPHandler(mechanize.FTPHandler):

        def __init__(self, data):
            self.data = data

        def connect_ftp(self, user, passwd, host, port, dirs, timeout):
            # Record the connection parameters instead of connecting.
            self.user, self.passwd = user, passwd
            self.host, self.port = host, port
            self.dirs = dirs
            self.timeout = timeout
            self.ftpwrapper = MockFTPWrapper(self.data)
            return self.ftpwrapper

    data = "rheum rhaponicum"
    h = NullFTPHandler(data)
    h.parent = MockOpener()

    for url, host, port, type_, dirs, timeout, filename, mimetype in [
        ("ftp://localhost/foo/bar/baz.html", "localhost", ftplib.FTP_PORT,
         "I", ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT,
         "baz.html", "text/html"),
        ("ftp://localhost:80/foo/bar/", "localhost", 80, "D",
         ["foo", "bar"], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT,
         "", None),
        # TODO: really this should guess image/gif
        ("ftp://localhost/baz.gif;type=a", "localhost", ftplib.FTP_PORT,
         "A", [], _sockettimeout._GLOBAL_DEFAULT_TIMEOUT,
         "baz.gif", None),
    ]:
        req = Request(url, timeout=timeout)
        r = h.ftp_open(req)
        # ftp authentication not yet implemented by FTPHandler
        self.assertTrue(h.user == h.passwd == "")
        self.assertEqual(h.host, socket.gethostbyname(host))
        self.assertEqual(h.port, port)
        self.assertEqual(h.dirs, dirs)
        if sys.version_info >= (2, 6):
            # assertEqual rather than the deprecated assertEquals alias,
            # matching every other assertion in this test.
            self.assertEqual(h.timeout, timeout)
        self.assertEqual(h.ftpwrapper.filename, filename)
        self.assertEqual(h.ftpwrapper.filetype, type_)
        headers = r.info()
        self.assertEqual(headers.get("Content-type"), mimetype)
        self.assertEqual(int(headers["Content-length"]), len(data))