我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用mimetools.Message()。
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list`` and fills it with one entry per sub-part read
    from ``self.fp``.  When ``self.qs_on_post`` is set, its query-string
    fields are added first as ``MiniFieldStorage`` items.

    Arguments:
        environ, keep_blank_values, strict_parsing: passed through
            unchanged to the constructor of every sub-part.

    Raises:
        ValueError: if ``self.innerboundary`` is rejected by
            ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        # Note: the query string is parsed with the instance-level
        # settings, not with the arguments of this call.
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                             self.keep_blank_values,
                                             self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    # Sub-parts are built with the configured storage class, falling
    # back to the class of this instance.
    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def getbodyparts(self):
    """Return the body of a multipart message as a list of SubMessage objects.

    Only valid for multipart messages.  Each sub-message is numbered
    ``"<self.number>.<k>"`` with ``k`` counting from 1, and behaves
    (almost) like a Message object.

    Raises:
        Error: if the main content type is not ``multipart`` or the
            ``boundary`` parameter is missing or empty.
    """
    if self.getmaintype() != 'multipart':
        raise Error('Content-Type is not multipart/*')
    boundary = self.getparam('boundary')
    if not boundary:
        raise Error('multipart/* without boundary param')

    # Rewind to the start of the body before splitting on the boundary.
    self.fp.seek(self.startofbody)
    reader = multifile.MultiFile(self.fp)
    reader.push(boundary)

    submessages = []
    index = 0
    while reader.next():
        index += 1
        label = "%s.%r" % (self.number, index)
        submessages.append(SubMessage(self.folder, label, reader))
    reader.pop()
    return submessages
def test_encoding(self):
    """Check Browser.encoding() against several raw header blocks.

    Each header block is parsed into a mimetools.Message, wrapped in an
    empty-bodied response and handed to the browser; the reported
    encoding must match the expected value.
    """
    import mechanize
    from StringIO import StringIO
    import urllib

    browser = mechanize.Browser()
    # always take first encoding, since that's the one from the real HTTP
    # headers, rather than from HTTP-EQUIV
    cases = [
        ("", mechanize._html.DEFAULT_ENCODING),
        ("Foo: Bar\r\n\r\n", mechanize._html.DEFAULT_ENCODING),
        ("Content-Type: text/html; charset=UTF-8\r\n\r\n", "UTF-8"),
        ("Content-Type: text/html; charset=UTF-8\r\n"
         "Content-Type: text/html; charset=KOI8-R\r\n\r\n", "UTF-8"),
    ]
    for raw_headers, expected in cases:
        parsed = mimetools.Message(StringIO(raw_headers))
        response = urllib.addinfourl(
            StringIO(""), parsed, "http://www.example.com/")
        browser.set_response(response)
        self.assertEqual(browser.encoding(), expected)
def test_str(self):
    """Check str() of a TestBrowser in three states.

    The states are: before any response is set, after a response is
    set, and after a form on that response has been selected.
    """
    from mechanize import _response
    br = TestBrowser()
    # No response has been set yet.
    self.assertEqual(str(br), "<TestBrowser (not visiting a URL)>")
    # Build a minimal HTML response containing one named form.
    fp = StringIO.StringIO('<html><form name="f"><input /></form></html>')
    headers = mimetools.Message(
        StringIO.StringIO("Content-type: text/html"))
    response = _response.response_seek_wrapper(
        _response.closeable_response(fp, headers, "http://example.com/", 200, "OK"))
    br.set_response(response)
    self.assertEqual(str(br), "<TestBrowser visiting http://example.com/>")
    br.select_form(nr=0)
    # NOTE(review): the line breaks and leading spaces inside the expected
    # text below were reconstructed from a whitespace-collapsed copy of this
    # test -- confirm against the browser's actual str() output.
    self.assertEqual(
        str(br), """\
<TestBrowser visiting http://example.com/
 selected form:
 <f GET http://example.com/ application/x-www-form-urlencoded
  <TextControl(<None>=)>>
>""")
def monkey_patch_httplib(self, putheader):
    """Patch httplib.HTTPConnection so that no real network I/O happens.

    ``putheader`` is installed as the connection's ``putheader``;
    ``connect``, ``send`` and ``close`` become no-ops, and
    ``getresponse`` returns a canned ``200 OK`` response with empty
    headers and an empty body.  All patches go through
    ``self.monkey_patch``.
    """
    def do_nothing(*args, **kwds):
        pass

    def getresponse(self_):
        # A fresh response class (and header object) is built per call.
        class Response(object):
            msg = mimetools.Message(StringIO.StringIO(""))
            status = 200
            reason = "OK"

            def read(self__, sz=-1):
                return ""

        return Response()

    target = httplib.HTTPConnection
    self.monkey_patch(target, "putheader", putheader)
    for method_name in ("connect", "send", "close"):
        self.monkey_patch(target, method_name, do_nothing)
    self.monkey_patch(target, "getresponse", getresponse)
def getreply(self):
    """Read the complete response from ``self.sock`` and parse its head.

    The whole response is buffered into ``self.file`` (a StringIO), the
    status line is split, and the remaining header lines are parsed
    into a ``mimetools.Message``.

    Returns:
        ``(ver, code, msg, headers)`` on success, where ``code`` is an
        int and ``msg`` is the stripped reason phrase (``""`` when the
        status line has no reason phrase).
        ``(-1, line, None)`` when the status line is malformed: fewer
        than two fields, a version that does not start with ``HTTP/``,
        or a status code that is not an integer.  ``line`` is the raw
        status line.
    """
    sock_file = self.sock.makefile('rb')
    try:
        data = ''.join(sock_file.readlines())
    finally:
        # Release the socket file even if reading fails.
        sock_file.close()

    self.file = StringIO(data)
    line = self.file.readline()

    fields = line.split(None, 2)
    if len(fields) == 3:
        ver, code, msg = fields
    elif len(fields) == 2:
        ver, code = fields
        msg = ""
    else:
        return -1, line, None

    if ver[:5] != 'HTTP/':
        return -1, line, None
    try:
        code = int(code)
    except ValueError:
        # A non-numeric status code is a malformed status line too.
        return -1, line, None

    msg = msg.strip()
    headers = mimetools.Message(self.file, 0)
    return ver, code, msg, headers
def found_terminator(self):
    """Handle one terminator-delimited chunk of buffered input.

    While no header has been parsed, ``self.data`` is treated as the
    request head: the first line is split into three fields and the
    rest is parsed into ``self.header``, after which the request is
    passed to ``self.server.handle_request``.  A request line without
    exactly three fields sets ``self.shutdown``.  Once a header exists,
    further data is ignored.
    """
    if self.header:
        # ignore body data, for now
        return

    # parse http header
    stream = StringIO.StringIO(self.data)
    request = stream.readline().split(None, 2)
    if len(request) == 3:
        # parse message header
        self.header = mimetools.Message(stream)
        self.set_terminator("\r\n")
        self.server.handle_request(
            self, request[0], request[1], self.header
        )
        self.close_when_done()
    else:
        # badly formed request; just shut down
        self.shutdown = 1
    self.data = ""
def open_local_file(self, req):
    """Open the local file named by ``req`` and return it as a response.

    Synthetic ``Content-type``, ``Content-length`` and ``Last-modified``
    headers are built from the file's stat data.  The file is served
    only when the request names no host, or names a host without a port
    whose address is one of ``self.get_names()``.

    Raises:
        URLError: if an OSError occurs, or the file is not on the
            local host.
    """
    import email.utils
    import mimetypes

    host = req.get_host()
    filename = req.get_selector()
    localfile = url2pathname(filename)
    try:
        info = os.stat(localfile)
        size = info.st_size
        modified = email.utils.formatdate(info.st_mtime, usegmt=True)
        mtype = mimetypes.guess_type(filename)[0]
        header_text = (
            'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
            (mtype or 'text/plain', size, modified))
        headers = mimetools.Message(StringIO(header_text))
        if host:
            host, port = splitport(host)
        is_local = not host or (
            not port and _safe_gethostbyname(host) in self.get_names())
        if is_local:
            prefix = 'file://' + host if host else 'file://'
            origurl = prefix + filename
            return addinfourl(open(localfile, 'rb'), headers, origurl)
    except OSError as err:
        # urllib2 users shouldn't expect OSErrors coming from urlopen()
        raise URLError(err)
    raise URLError('file not on local host')
def open_local_file(self, url):
    """Use local file.

    Opens the file named by ``url`` and returns it wrapped with
    synthetic ``Content-Type``, ``Content-Length`` and ``Last-modified``
    headers.  A URL that names a host is accepted only if it has no
    port and the host resolves to this machine.

    Raises:
        IOError: if the file cannot be stat'ed, or the URL points at
            another host.
    """
    import mimetypes
    import mimetools
    import email.utils
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO

    host, path = splithost(url)
    localname = url2pathname(path)
    try:
        stats = os.stat(localname)
    except OSError as e:
        raise IOError(e.errno, e.strerror, e.filename)

    size = stats.st_size
    modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
    mtype = mimetypes.guess_type(url)[0]
    header_text = (
        'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
        (mtype or 'text/plain', size, modified))
    headers = mimetools.Message(StringIO(header_text))

    if host:
        host, port = splitport(host)
        # Reject anything with an explicit port or a non-local address.
        if port or socket.gethostbyname(host) not in (localhost(), thishost()):
            raise IOError('local file error', 'not on local host')

    urlfile = 'file://' + path if path[:1] == '/' else path
    return addinfourl(open(localname, 'rb'), headers, urlfile)
def noheaders():
    """Return an empty mimetools.Message object.

    The object is created on first use and cached in the module-level
    ``_noheaders`` variable; later calls return the cached instance.
    """
    global _noheaders
    if _noheaders is not None:
        return _noheaders

    import mimetools
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO

    _noheaders = mimetools.Message(StringIO(), 0)
    _noheaders.fp.close()   # Recycle file descriptor
    return _noheaders


# Utility classes
def openmessage(self, n):
    """Open a message -- returns a Message object built from this object and ``n``."""
    message = Message(self, n)
    return message
def __init__(self, f, n, fp = None):
    """Constructor.

    Stores ``f`` as ``self.folder`` and ``n`` as ``self.number``.  When
    no file object is supplied, the file named by
    ``f.getmessagefilename(n)`` is opened for reading and used instead.
    """
    self.folder, self.number = f, n
    if fp is None:
        fp = open(f.getmessagefilename(n), 'r')
    mimetools.Message.__init__(self, fp)
def __repr__(self):
    """String representation: ``Message(<repr of folder>, <number>)``."""
    return 'Message(%r, %s)' % (self.folder, self.number)
def open_local_file(self, url):
    """Use local file.

    Opens the file named by ``url`` and returns it wrapped with
    synthetic ``Content-Type``, ``Content-Length`` and ``Last-modified``
    headers.  A URL that names a host is accepted only if it has no
    port and the host resolves to this machine.

    Raises:
        IOError: if the file cannot be stat'ed, or the URL points at
            another host.
        ValueError: if the URL names no host and its path starts
            with ``./``.
    """
    import mimetypes
    import mimetools
    import email.utils
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO

    host, path = splithost(url)
    localname = url2pathname(path)
    try:
        stats = os.stat(localname)
    except OSError as e:
        raise IOError(e.errno, e.strerror, e.filename)

    size = stats.st_size
    modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
    mtype = mimetypes.guess_type(url)[0]
    header_text = (
        'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
        (mtype or 'text/plain', size, modified))
    headers = mimetools.Message(StringIO(header_text))

    if host:
        host, port = splitport(host)
        # Reject anything with an explicit port or a non-local address.
        if port or socket.gethostbyname(host) not in (localhost(), thishost()):
            raise IOError('local file error', 'not on local host')
        urlfile = 'file://' + path if path[:1] == '/' else path
    else:
        # Only the host-less form rejects relative "./" paths.
        if path[:1] == '/':
            urlfile = 'file://' + path
        elif path[:2] == './':
            raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
        else:
            urlfile = path
    return addinfourl(open(localname, 'rb'), headers, urlfile)
def __init__(self, ct, f, next=None, uribase='thismessage:/', seekable=0, **kw):
    """Split a MIME multipart stream into (headers, body) parts.

    Arguments:
        ct: the Content-Type header value; its ``boundary`` parameter
            (optionally quoted) delimits the parts.
        f: file-like object positioned at the multipart data.
        next: stored as ``self.next``.
        uribase: stored as ``self.base``.
        seekable: passed through to ``multifile.MultiFile``.
        **kw: accepted and ignored.

    After construction ``self.parts`` lists every part as a
    ``(head, body)`` tuple, where ``body`` is a StringIO holding the
    decoded content rewound to position 0.  ``self.id_dict`` maps
    Content-ID values (surrounding angle brackets removed) and
    ``self.loc_dict`` maps Content-Location values to the same tuples.

    Raises:
        ValueError: if ``ct`` has no ``boundary`` parameter or the
            boundary is empty.
    """
    # Get the boundary.  It's too bad I have to write this myself,
    # but no way am I going to import cgi for 10 lines of code!
    boundary = None
    for param in ct.split(';'):
        a = param.strip()
        # MIME parameter names are case-insensitive.
        if a[:9].lower() != 'boundary=':
            continue
        value = a[9:]
        # Slicing (not indexing) keeps an empty value from raising.
        if value[:1] in ('"', "'"):
            value = value[1:-1]
        boundary = value
        break
    if not boundary:
        raise ValueError('boundary parameter not found')

    self.id_dict, self.loc_dict, self.parts = {}, {}, []
    self.next = next
    self.base = uribase

    mf = multifile.MultiFile(f, seekable)
    mf.push(boundary)
    while mf.next():
        head = mimetools.Message(mf)
        body = StringIO.StringIO()
        mimetools.decode(mf, body, head.getencoding())
        body.seek(0)
        part = (head, body)
        self.parts.append(part)
        key = head.get('content-id')
        if key:
            if key[0] == '<' and key[-1] == '>':
                key = key[1:-1]
            self.id_dict[key] = part
        key = head.get('content-location')
        if key:
            self.loc_dict[key] = part
    mf.pop()
def send_error(self, code, message=None):
    """Send and log an error reply.

    Arguments are the error code and an optional detailed message; the
    message defaults to the short entry for the code in
    ``self.responses`` (``'???'`` for unknown codes).

    This sends an error response (so it must be called before any
    output has been generated), logs the error, and finally sends a
    piece of HTML explaining the error to the user.  The HTML body is
    omitted for 1xx, 204, 205 and 304 replies, and is not written for
    HEAD requests.
    """
    try:
        short_text, long_text = self.responses[code]
    except KeyError:
        short_text, long_text = '???', '???'
    if message is None:
        message = short_text
    explain = long_text

    self.log_error("code %d, message %s", code, message)
    self.send_response(code, message)
    self.send_header('Connection', 'close')

    # Message body is omitted for cases described in:
    #  - RFC7230: 3.3. 1xx, 204(No Content), 304(Not Modified)
    #  - RFC7231: 6.3.6. 205(Reset Content)
    allows_body = code >= 200 and code not in (204, 205, 304)
    content = None
    if allows_body:
        # HTML encode to prevent Cross Site Scripting attacks
        # (see bug #1100201)
        fields = {
            'code': code,
            'message': _quote_html(message),
            'explain': explain,
        }
        content = self.error_message_format % fields
        self.send_header("Content-Type", self.error_content_type)
    self.end_headers()

    if self.command != 'HEAD' and content:
        self.wfile.write(content)
def test_info(self):
    """info() on the returned object must yield a mimetools.Message."""
    info = self.returned_obj.info()
    self.assertIsInstance(info, mimetools.Message)