Python cStringIO 模块,StringIO() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用cStringIO.StringIO()

项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def rrset_to_text(m):
    """Render an RRset-style mapping as human-readable text.

    Optional metadata keys found in ``m`` (``bailiwick``, ``count``,
    ``time_first``/``time_last``, ``zone_time_first``/``zone_time_last``)
    are written as ``;;``-prefixed lines, in that order.  When ``rdata`` is
    present, one ``<rrname> IN <rrtype> <rdata>`` line is written per entry.

    :param m: mapping describing the record set; ``rrname`` and ``rrtype``
        are only required when ``rdata`` is present.
    :returns: the assembled text, or an empty string when no known key is
        present.
    """
    s = StringIO()

    if 'bailiwick' in m:
        s.write(';;  bailiwick: %s\n' % m['bailiwick'])

    if 'count' in m:
        # locale.format() is deprecated and no longer exists on Python 3.12+;
        # format_string() accepts the same (format, value, grouping) arguments.
        s.write(';;      count: %s\n' % locale.format_string('%d', m['count'], True))

    # All timestamp fields are rendered the same way; the templates carry the
    # column alignment of each label.
    timestamp_lines = (
        ('time_first', ';; first seen: %s\n'),
        ('time_last', ';;  last seen: %s\n'),
        ('zone_time_first', ';; first seen in zone file: %s\n'),
        ('zone_time_last', ';;  last seen in zone file: %s\n'),
    )
    for key, template in timestamp_lines:
        if key in m:
            s.write(template % sec_to_text(m[key]))

    if 'rdata' in m:
        for rdata in m['rdata']:
            s.write('%s IN %s %s\n' % (m['rrname'], m['rrtype'], rdata))

    return s.getvalue()
项目:CTF    作者:calee0219    | 项目源码 | 文件源码
def vertical_flip(self):
    """Flip the bitmap upside down by reversing the order of its rows.

    Each row occupies ``self.width * 3 + self.padding_size`` units of
    ``self.bitmap_data``; ``self.height`` rows are taken from the start of
    the data and written back in reverse order.

    :returns: ``self``, so calls can be chained.
    """
    if self.empty:
        die("Attempted to call vertical_flip() on an empty BMP object!")

    stride = self.width * 3 + self.padding_size
    data = self.bitmap_data
    # Slice the rows out directly and join once instead of growing a string
    # row by row.
    rows = [data[i * stride:(i + 1) * stride] for i in range(self.height)]
    rows.reverse()
    self.bitmap_data = "".join(rows)

    return self
项目:CTF    作者:calee0219    | 项目源码 | 文件源码
def grayscale(self):
    """Convert the image into a (24-bit) grayscale one, using the Y'UV method.

    Every 3-byte pixel is replaced by three copies of its weighted luma
    byte; each output row is followed by ``self.padding_size`` zero bytes.

    :returns: ``self``, so calls can be chained.
    """
    # Luma weights, see http://en.wikipedia.org/wiki/YUV
    weight_r, weight_g, weight_b = 0.299, 0.587, 0.114

    pieces = []
    src = StringIO(self.bitmap_data)
    for _row in xrange(0, self.height):
        for _col in xrange(0, self.width):
            # The red weight is applied to the third byte and the blue
            # weight to the first one.
            first, second, third = struct.unpack("3B", src.read(3))
            luma = int(weight_r * third + weight_g * second + weight_b * first)
            pieces.append(chr(luma) * 3)

        pieces.append(chr(0x00) * self.padding_size)
        # Skip the padding bytes of the source row.
        src.seek(self.padding_size, 1)

    self.bitmap_data = "".join(pieces)

    return self
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _try_config_test(self, logcfg, epattern, foundTest=None ):
    """Apply a log configuration string and check what it prints to stdout.

    ``logcfg`` is passed to ``ossie.utils.log4py.config.strConfig`` while
    stdout is redirected into a buffer.  Every (output line, pattern) pair
    for which ``re.search`` matches contributes one ``True`` to the result,
    which is then compared with ``foundTest``.

    :param logcfg: configuration text handed to ``strConfig``.
    :param epattern: a single regex string or a sequence of regex strings.
    :param foundTest: expected list of match markers; defaults to one
        ``True`` per pattern.
    """
    import ossie.utils.log4py.config

    with stdout_redirect(cStringIO.StringIO()) as new_stdout:
        ossie.utils.log4py.config.strConfig(logcfg,None)

    new_stdout.seek(0)
    epats = [epattern] if isinstance(epattern, str) else epattern
    if foundTest is None:
        foundTest = len(epats)*[True]

    found = [True
             for line in new_stdout.readlines()
             for epat in epats
             if re.search(epat, line)]

    self.assertEqual(found, foundTest )
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def _showPorts(self, ports, destfile=None):
    """Write a two-column table of port names and interfaces.

    :param ports: mapping whose values are dicts with ``'Port Name'`` and
        ``'Port Interface'`` keys; a falsy value produces the line ``None``.
    :param destfile: file-like object to write to.  When omitted, the output
        is collected in a temporary buffer and shown through ``pydoc.pager``.
    """
    localdef_dest = destfile is None
    if localdef_dest:
        destfile = cStringIO.StringIO()

    if ports:
        table = TablePrinter('Port Name', 'Port Interface')
        for port in ports.values():
            table.append(port['Port Name'], port['Port Interface'])
        table.write(f=destfile)
    else:
        # Same text the print-chevron form emitted: the value plus a newline.
        destfile.write("None\n")

    if localdef_dest:
        pydoc.pager(destfile.getvalue())
        destfile.close()
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def api(self, destfile=None):
    """Write the provides (input) and uses (output) port listings.

    Each section consists of a heading, the table produced by
    ``self._showPorts`` and a blank separator.

    :param destfile: file-like object to write to.  When omitted, the output
        is collected in a temporary buffer and shown through ``pydoc.pager``.
    """
    localdef_dest = destfile is None
    if localdef_dest:
        destfile = cStringIO.StringIO()

    sections = (
        ("Provides (Input) Ports ==============", self._providesPortDict),
        ("Uses (Output) Ports ==============", self._usesPortDict),
    )
    for heading, ports in sections:
        destfile.write(heading + "\n")
        self._showPorts(ports, destfile=destfile)
        # The original printed "\n", i.e. two newline characters in total.
        destfile.write("\n\n")

    if localdef_dest:
        pydoc.pager(destfile.getvalue())
        destfile.close()
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def api(self, destfile=None):
    """Write a table describing the allocation properties.

    One row is written per entry of ``self._allocProps``; for ``struct`` and
    ``structSeq`` properties an indented row is added for every member.  When
    there are no allocation properties the single line ``None`` is written.

    :param destfile: file-like object to write to.  When omitted, the output
        is collected in a temporary buffer and shown through ``pydoc.pager``.
    """
    localdef_dest = destfile is None
    if localdef_dest:
        destfile = cStringIO.StringIO()

    destfile.write('Allocation Properties ======\n')
    if not self._allocProps:
        # No early return here: the pager/close step below must still run
        # when the buffer was created locally, otherwise nothing is shown.
        destfile.write('None\n')
    else:
        table = TablePrinter('Property Name', '(Data Type)', 'Action')
        for prop in self._allocProps:
            table.append(prop.clean_name, '('+prop.type+')', prop.action)
            if prop.type in ('struct', 'structSeq'):
                # A struct sequence describes its members through structDef.
                if prop.type == 'structSeq':
                    structdef = prop.structDef
                else:
                    structdef = prop
                for member in structdef.members.values():
                    table.append('  '+member.clean_name, member.type)
        table.write(f=destfile)

    if localdef_dest:
        pydoc.pager(destfile.getvalue())
        destfile.close()
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def api(self, showComponentName=True, showInterfaces=True, showProperties=True, externalPropInfo=None, destfile=None):
    '''
    Inspect interfaces and properties for the component.

    :param showComponentName: when equal to True, write a
        "<class name> [<name>]:" heading.
    :param showInterfaces: when equal to True, append ``PortSupplier.api``
        output.
    :param showProperties: when equal to True and ``self._properties`` is
        set, append ``PropertySet.api`` output.
    :param externalPropInfo: forwarded unchanged to ``PropertySet.api``.
    :param destfile: file-like object to write to.  When omitted, the output
        is collected in a temporary buffer and shown through ``pydoc.pager``.
    '''
    localdef_dest = destfile is None
    if localdef_dest:
        destfile = cStringIO.StringIO()

    className = self.__class__.__name__
    # The flags keep their original "== True" comparison so that callers
    # passing other truthy values see no change in behaviour.
    if showComponentName == True:
        destfile.write(className + " [" + str(self.name) + "]:\n")
    if showInterfaces == True:
        PortSupplier.api(self, destfile=destfile)
    if showProperties == True and self._properties is not None:
        PropertySet.api(self, externalPropInfo, destfile=destfile)

    if localdef_dest:
        pydoc.pager(destfile.getvalue())
        destfile.close()
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def redirectSTDOUT(filename):
    if _DEBUG == True:
        print "redirectSTDOUT(): redirecting stdout/stderr to filename " + str(filename)
    if type(filename) == str:
        dirname = os.path.dirname(filename)
        if len(dirname) == 0 or \
           (len(dirname) > 0 and os.path.isdir(dirname)):
            try:
                f = open(filename,'w')
                # Send stdout and stderr to provided filename
                sys.stdout = f 
                sys.stderr = f 
            except Exception, e:
                print "redirectSTDOUT(): ERROR - Unable to open file " + str(filename) + " for writing stdout and stderr " + str(e)
    elif type(filename) == cStringIO.OutputType:
        sys.stdout = filename
        sys.stderr = filename
    else:
        print 'redirectSTDOUT(): failed to redirect stdout/stderr to ' + str(filename)
        print 'redirectSTDOUT(): argument must be: string filename, cStringIO.StringIO object'
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def api(self, destfile=None):
    """
    Prints application programming interface (API) information and returns.

    A "Component <class name> :" heading is written, followed by the output
    of ``PortSupplier.api``.

    :param destfile: file-like object to write to.  When omitted, the output
        is collected in a temporary buffer and shown through ``pydoc.pager``.
    """
    localdef_dest = destfile is None
    if localdef_dest:
        destfile = cStringIO.StringIO()

    destfile.write("Component " + self.__class__.__name__ + " :\n")
    PortSupplier.api(self, destfile=destfile)

    if localdef_dest:
        pydoc.pager(destfile.getvalue())
        destfile.close()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def read_string4(f):
    r"""Read a string whose length is given by a leading ``read_int4`` value.

    >>> import StringIO
    >>> read_string4(StringIO.StringIO("\x00\x00\x00\x00abc"))
    ''
    >>> read_string4(StringIO.StringIO("\x03\x00\x00\x00abcdef"))
    'abc'
    >>> read_string4(StringIO.StringIO("\x00\x00\x00\x03abcdef"))
    Traceback (most recent call last):
    ...
    ValueError: expected 50331648 bytes in a string4, but only 6 remain
    """

    size = read_int4(f)
    if size < 0:
        raise ValueError("string4 byte count < 0: %d" % size)
    payload = f.read(size)
    # A short read means the stream ended before the announced length.
    if len(payload) != size:
        raise ValueError("expected %d bytes in a string4, but only %d remain" %
                         (size, len(payload)))
    return payload
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def read_string1(f):
    r"""Read a string whose length is given by a leading ``read_uint1`` value.

    >>> import StringIO
    >>> read_string1(StringIO.StringIO("\x00"))
    ''
    >>> read_string1(StringIO.StringIO("\x03abcdef"))
    'abc'
    """

    size = read_uint1(f)
    assert size >= 0
    payload = f.read(size)
    # A short read means the stream ended before the announced length.
    if len(payload) != size:
        raise ValueError("expected %d bytes in a string1, but only %d remain" %
                         (size, len(payload)))
    return payload
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def read_long1(f):
    r"""Read a long whose byte count is given by a leading ``read_uint1`` value.

    >>> import StringIO
    >>> read_long1(StringIO.StringIO("\x00"))
    0L
    >>> read_long1(StringIO.StringIO("\x02\xff\x00"))
    255L
    >>> read_long1(StringIO.StringIO("\x02\xff\x7f"))
    32767L
    >>> read_long1(StringIO.StringIO("\x02\x00\xff"))
    -256L
    >>> read_long1(StringIO.StringIO("\x02\x00\x80"))
    -32768L
    """

    size = read_uint1(f)
    payload = f.read(size)
    if len(payload) == size:
        return decode_long(payload)
    raise ValueError("not enough data in stream to read long1")
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def read_long4(f):
    r"""Read a long whose byte count is given by a leading ``read_int4`` value.

    >>> import StringIO
    >>> read_long4(StringIO.StringIO("\x02\x00\x00\x00\xff\x00"))
    255L
    >>> read_long4(StringIO.StringIO("\x02\x00\x00\x00\xff\x7f"))
    32767L
    >>> read_long4(StringIO.StringIO("\x02\x00\x00\x00\x00\xff"))
    -256L
    >>> read_long4(StringIO.StringIO("\x02\x00\x00\x00\x00\x80"))
    -32768L
    >>> read_long4(StringIO.StringIO("\x00\x00\x00\x00"))
    0L
    """

    size = read_int4(f)
    if size < 0:
        raise ValueError("long4 byte count < 0: %d" % size)
    payload = f.read(size)
    if len(payload) == size:
        return decode_long(payload)
    raise ValueError("not enough data in stream to read long4")
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _handle_message_delivery_status(self, msg):
    # Writing the headers straight to self's file object would leave an
    # extra newline between the last header block and the boundary, so each
    # part is flattened into its own buffer first.
    def flatten_part(part):
        buf = StringIO()
        self.clone(buf).flatten(part, unixfrom=False)
        text = buf.getvalue()
        lines = text.split('\n')
        # Strip off the unnecessary trailing empty line
        if lines and lines[-1] == '':
            return NL.join(lines[:-1])
        return text

    blocks = [flatten_part(part) for part in msg.get_payload()]
    # Joining separates the blocks from each other without adding anything
    # after the last one.
    self._fp.write(NL.join(blocks))
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def parseString(string, handler, errorHandler=ErrorHandler()):
    """Parse an in-memory document with a parser obtained from make_parser().

    :param string: the document text; it is wrapped in a StringIO and used
        as the byte stream of an InputSource.
    :param handler: content handler installed on the parser.
    :param errorHandler: error handler installed on the parser; ``None``
        selects a fresh ``ErrorHandler()``.
    """
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO

    if errorHandler is None:
        errorHandler = ErrorHandler()

    source = InputSource()
    source.setByteStream(StringIO(string))

    sax_parser = make_parser()
    sax_parser.setContentHandler(handler)
    sax_parser.setErrorHandler(errorHandler)
    sax_parser.parse(source)

# this is the parser list used by the make_parser function if no
# alternatives are given as parameters to the function
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def gzip_encode(data):
    """data -> gzip encoded data

    Encode data using the gzip content encoding as described in RFC 1952.
    Raises NotImplementedError when the module-level ``gzip`` name is falsy.
    """
    if not gzip:
        raise NotImplementedError
    sink = StringIO.StringIO()
    compressor = gzip.GzipFile(mode="wb", fileobj=sink, compresslevel=1)
    compressor.write(data)
    # The compressor is closed before the sink is read back.
    compressor.close()
    result = sink.getvalue()
    sink.close()
    return result

##
# Decode a string using the gzip content encoding such as specified by the
# Content-Encoding: gzip
# in the HTTP header, as described in RFC 1952
#
# @param data The encoded data
# @return the unencoded data
# @raises ValueError if data is not correctly coded.
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def gzip_decode(data):
    """gzip encoded data -> unencoded data

    Decode data using the gzip content encoding as described in RFC 1952.

    Raises NotImplementedError when the module-level ``gzip`` name is falsy,
    and ValueError("invalid data") when reading the stream raises IOError.
    """
    if not gzip:
        raise NotImplementedError
    f = StringIO.StringIO(data)
    try:
        gzf = gzip.GzipFile(mode="rb", fileobj=f)
        try:
            decoded = gzf.read()
        except IOError:
            raise ValueError("invalid data")
        finally:
            # Release the reader on the error path as well.
            gzf.close()
    finally:
        f.close()
    return decoded

##
# Return a decoded file-like object for the gzip encoding
# as described in RFC 1952.
#
# @param response A stream supporting a read() method
# @return a file-like object that the decoded data can be read() from
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def rollover(self):
    """Move the buffered contents into a real temporary file, once.

    Does nothing when the object has already rolled over.  Otherwise a
    ``TemporaryFile`` is created from the stored constructor arguments, the
    buffered data is copied into it and the previous position is restored.
    """
    if self._rolled:
        return

    in_memory = self._file
    on_disk = TemporaryFile(*self._TemporaryFileArgs)
    self._file = on_disk
    # The constructor arguments are not needed after the switch.
    del self._TemporaryFileArgs

    on_disk.write(in_memory.getvalue())
    on_disk.seek(in_memory.tell(), 0)

    self._rolled = True

    # The method caching trick from NamedTemporaryFile
    # won't work here, because _file may change from a
    # _StringIO instance to a real file. So we list
    # all the methods directly.

    # Context management protocol
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def getbodytext(self, decode = 1):
    """Return the message's body text as string.  This undoes a
    Content-Transfer-Encoding, but does not interpret other MIME
    features (e.g. multipart messages).  To suppress decoding,
    pass 0 as an argument."""
    self.fp.seek(self.startofbody)
    encoding = self.getencoding()

    # These encodings need no transformation, so the raw body is returned.
    passthrough = ('', '7bit', '8bit', 'binary')
    if decode and encoding not in passthrough:
        try:
            from cStringIO import StringIO
        except ImportError:
            from StringIO import StringIO
        decoded = StringIO()
        mimetools.decode(self.fp, decoded, encoding)
        return decoded.getvalue()

    return self.fp.read()
项目:cbapi-python    作者:carbonblack    | 项目源码 | 文件源码
def file(self):
    """
    Returns a file pointer to this binary

    The binary is requested from ``/api/v1/binary/<md5sum>``; the response
    body is opened as a zip archive and its ``filedata`` member is returned.

    :example:

    >>> process_obj = c.select(Process).where("process_name:svch0st.exe").first()
    >>> binary_obj = process_obj.binary
    >>> print(binary_obj.file.read(2))
    MZ
    """
    # TODO: I don't like reaching through to the session...
    url = "/api/v1/binary/{0:s}".format(self.md5sum)
    with closing(self._cb.session.get(url, stream=True)) as response:
        archive = ZipFile(StringIO(response.content))
        return archive.open('filedata')
项目:recipebook    作者:dpapathanasiou    | 项目源码 | 文件源码
def put(url, data, headers=None):
    """Make a PUT request to the url, using data in the message body,
    with the additional headers, if any.

    :param url: target URL.
    :param data: request body; its length is sent as the upload size.
    :param headers: optional mapping of header names to values.
    :returns: the HTTP status code, or -1 when the transfer raised.
    """

    reply = -1 # default, non-http response

    curl = pycurl.Curl()
    try:
        curl.setopt(pycurl.URL, url)
        if headers:
            curl.setopt(pycurl.HTTPHEADER, [k+': '+v for k,v in headers.items()])
        curl.setopt(pycurl.PUT, 1)
        curl.setopt(pycurl.INFILESIZE, len(data))
        databuffer = StringIO(data)
        curl.setopt(pycurl.READFUNCTION, databuffer.read)
        try:
            curl.perform()
            reply = curl.getinfo(pycurl.HTTP_CODE)
        except Exception:
            # Deliberately best-effort: a failed transfer is reported to the
            # caller through the -1 default instead of an exception.
            pass
    finally:
        # Release the handle even when configuring the request fails.
        curl.close()

    return reply
项目:PyJFuzz    作者:mseclab    | 项目源码 | 文件源码
def __init__(self, sock, mode='rb', bufsize=-1, close=False):
    """Set up the buffering state of a file-like wrapper around ``sock``.

    :param sock: the wrapped socket object.
    :param mode: stored but not otherwise used in this version.
    :param bufsize: negative selects ``self.default_bufsize``; 0 and 1 get
        special receive-buffer sizes (see below).
    :param close: stored as ``self._close``.
    """
    self._sock = sock
    self.mode = mode # Not actually used in this version
    if bufsize < 0:
        bufsize = self.default_bufsize
    self.bufsize = bufsize
    self.softspace = False

    # _rbufsize is the suggested recv buffer size.  It is *strictly*
    # obeyed within readline() for recv calls.  If it is larger than
    # default_bufsize it will be used for recv calls within read().
    if bufsize == 0:
        recv_size = 1
    elif bufsize == 1:
        recv_size = self.default_bufsize
    else:
        recv_size = bufsize
    self._rbufsize = recv_size
    self._wbufsize = bufsize

    # A StringIO is used for the read buffer rather than a list of
    # variously sized strings, which have been known to fragment the heap
    # because of how they are malloc()ed and often realloc()ed down much
    # smaller than their original allocation.
    self._rbuf = StringIO()
    self._wbuf = [] # A list of strings
    self._wbuf_len = 0
    self._close = close
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def __init__(self, ofile, maxresultrows=None):
    """Initialise the buffered CSV writer state.

    :param ofile: stored as the output target; nothing is written to it
        here.
    :param maxresultrows: stored row limit; ``None`` selects 50000.
    """
    self._maxresultrows = 50000 if maxresultrows is None else maxresultrows

    self._ofile = ofile
    self._fieldnames = None
    self._buffer = StringIO()

    # Rows go through a csv.writer bound to the in-memory buffer.
    self._writer = csv.writer(self._buffer, dialect=CsvDialect)
    self._writerow = self._writer.writerow
    self._finished = False
    self._flushed = False

    self._inspector = OrderedDict()
    self._chunk_count = 0
    self._record_count = 0
    # A plain int literal: "0L" is a syntax error on Python 3, and Python 2
    # integers promote to long automatically when they grow.
    self._total_record_count = 0
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _handle_message_delivery_status(self, msg):
    # The headers cannot be written directly to self's file object: that
    # would leave an extra newline between the last header block and the
    # boundary.  Each part is therefore rendered separately.
    rendered = []
    for part in msg.get_payload():
        buf = StringIO()
        generator = self.clone(buf)
        generator.flatten(part, unixfrom=False)
        text = buf.getvalue()
        pieces = text.split('\n')
        # Strip off the unnecessary trailing empty line
        has_trailing_blank = bool(pieces) and pieces[-1] == ''
        rendered.append(NL.join(pieces[:-1]) if has_trailing_blank else text)
    # Joining the blocks separates them from each other without adding an
    # extra separator after the last one.
    self._fp.write(NL.join(rendered))
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_header_splitter(self):
    # add_header() would be more convenient here, but it does not guarantee
    # the order of the parameters, so the raw header value is assigned.
    message = MIMEText('')
    message['X-Foobar-Spoink-Defrobnit'] = (
        'wasnipoop; giraffes="very-long-necked-animals"; '
        'spooge="yummy"; hippos="gargantuan"; marshmallows="gooey"')
    output = StringIO()
    Generator(output).flatten(message)
    # The long header is expected to continue on a tab-indented line.
    self.ndiffAssertEqual(output.getvalue(), '''\
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Foobar-Spoink-Defrobnit: wasnipoop; giraffes="very-long-necked-animals";
\tspooge="yummy"; hippos="gargantuan"; marshmallows="gooey"

''')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_nested_with_same_boundary(self):
    # msg_39.txt is similarly evil in that it's got inner parts that use
    # the same boundary as outer parts.  The structure expected below is
    # believed to be closest to the spirit of RFC 2046.
    message = self._msgobj('msg_39.txt')
    outline = StringIO()
    iterators._structure(message, outline)
    self.ndiffAssertEqual(outline.getvalue(), """\
multipart/mixed
    multipart/mixed
        multipart/alternative
        application/octet-stream
        application/octet-stream
    text/plain
""")
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_invalid_content_type(self):
    eq = self.assertEqual
    msg = Message()

    def expect_text_plain():
        # The three accessors must all report the text/plain fallback.
        eq(msg.get_content_maintype(), 'text')
        eq(msg.get_content_subtype(), 'plain')
        eq(msg.get_content_type(), 'text/plain')

    # RFC 2045, $5.2 says invalid yields text/plain
    msg['Content-Type'] = 'text'
    expect_text_plain()
    # Clear the old value and try something /really/ invalid
    del msg['content-type']
    msg['Content-Type'] = 'foo'
    expect_text_plain()
    # Still, make sure that the message is idempotently generated
    output = StringIO()
    Generator(output).flatten(msg)
    self.ndiffAssertEqual(output.getvalue(), 'Content-Type: foo\n\n')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_generate(self):
    # First craft the message to be encapsulated
    inner = Message()
    inner['Subject'] = 'An enclosed message'
    inner.set_payload('Here is the body of the message.\n')
    # Then wrap it and flatten the wrapper.
    outer = MIMEMessage(inner)
    outer['Subject'] = 'The enclosing message'
    output = StringIO()
    Generator(output).flatten(outer)
    self.assertEqual(output.getvalue(), """\
Content-Type: message/rfc822
MIME-Version: 1.0
Subject: The enclosing message

Subject: An enclosed message

Here is the body of the message.
""")
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_header_splitter(self):
    check = self.ndiffAssertEqual
    # The header value is assigned directly because add_header() does not
    # guarantee an order of the parameters.
    long_value = (
        'wasnipoop; giraffes="very-long-necked-animals"; '
        'spooge="yummy"; hippos="gargantuan"; marshmallows="gooey"')
    text_part = MIMEText('')
    text_part['X-Foobar-Spoink-Defrobnit'] = long_value
    sink = StringIO()
    flattener = Generator(sink)
    flattener.flatten(text_part)
    check(sink.getvalue(), '''\
Content-Type: text/plain; charset="us-ascii"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Foobar-Spoink-Defrobnit: wasnipoop; giraffes="very-long-necked-animals";
\tspooge="yummy"; hippos="gargantuan"; marshmallows="gooey"

''')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_nested_with_same_boundary(self):
    check = self.ndiffAssertEqual
    # msg_39.txt has inner parts that reuse the boundary of the outer
    # parts.  The structure expected below is believed to be closest to the
    # spirit of RFC 2046.
    parsed = self._msgobj('msg_39.txt')
    sink = StringIO()
    Iterators._structure(parsed, sink)
    check(sink.getvalue(), """\
multipart/mixed
    multipart/mixed
        multipart/alternative
        application/octet-stream
        application/octet-stream
    text/plain
""")
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_invalid_content_type(self):
    eq = self.assertEqual
    neq = self.ndiffAssertEqual
    msg = Message()
    # RFC 2045, $5.2 says invalid yields text/plain.  The second value is
    # "really" invalid; the old header is cleared before it is set.
    for position, bad_value in enumerate(('text', 'foo')):
        if position:
            del msg['content-type']
        msg['Content-Type'] = bad_value
        eq(msg.get_content_maintype(), 'text')
        eq(msg.get_content_subtype(), 'plain')
        eq(msg.get_content_type(), 'text/plain')
    # Still, make sure that the message is idempotently generated
    sink = StringIO()
    flattener = Generator(sink)
    flattener.flatten(msg)
    neq(sink.getvalue(), 'Content-Type: foo\n\n')
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_generate(self):
    # Build the message that gets encapsulated ...
    enclosed = Message()
    enclosed['Subject'] = 'An enclosed message'
    enclosed.set_payload('Here is the body of the message.\n')
    # ... and the message/rfc822 container around it.
    container = MIMEMessage(enclosed)
    container['Subject'] = 'The enclosing message'
    sink = StringIO()
    flattener = Generator(sink)
    flattener.flatten(container)
    self.assertEqual(sink.getvalue(), """\
Content-Type: message/rfc822
MIME-Version: 1.0
Subject: The enclosing message

Subject: An enclosed message

Here is the body of the message.
""")
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def remove_blank_lines(source):
    """
    Return 'source' without its blank lines.

    A line counts as blank when nothing is left after stripping whitespace.

    Example:

    .. code-block:: python

        test = "foo"

        test2 = "bar"

    Will become:

    .. code-block:: python

        test = "foo"
        test2 = "bar"
    """
    kept = []
    for line in cStringIO.StringIO(source).readlines():
        if line.strip():
            kept.append(line)
    return "".join(kept)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def rev_readlines2(arg,bufsize=8192):
    """Yield the lines of a file from last to first.

    The file is read backwards in chunks of at most ``bufsize`` bytes.  Each
    yielded line has surrounding whitespace stripped, so a file that ends
    with a newline yields an empty line first.  The final item is whatever
    precedes the first newline of the file.

    :param arg: path of the file, opened in binary mode.
    :param bufsize: maximum number of bytes read per chunk.
    """
    # The with-block closes the handle when the generator finishes or is
    # discarded.
    with open(arg,'rb') as f1:
        f1.seek(0,2)# go to the end
        leftover=b''
        while f1.tell():
            if f1.tell()<bufsize: bufsize=f1.tell()
            f1.seek(-bufsize,1)
            in_memory=f1.read(bufsize)+leftover
            # Step back again so the next round reads the preceding chunk.
            f1.seek(-bufsize,1)
            line=collections.deque()
            for index in range(len(in_memory)-1,-1,-1):
                # Slicing yields a one-element byte string on Python 2 and 3.
                c=in_memory[index:index+1]
                line.appendleft(c)
                if c==b'\n':
                    yield b''.join(line).strip()
                    line.clear()
            # The partial line at the start of the chunk is completed by the
            # next (earlier) chunk.
            leftover=b''.join(line).strip()
        yield leftover

#different approach and much faster
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def write(self, data):
    """Buffer the input, then send as many bytes as possible.

    ``data`` is appended to ``self.buffer``.  When ``self.writable()`` is
    true, the whole pending buffer is handed to ``self.sock.send`` and the
    buffer is reset to the part that was not sent.  If sending raises, the
    length of ``data`` is used as the sent count, so that many bytes are
    dropped from the front of the pending buffer.
    """
    self.buffer.write(data)
    if self.writable():
        buff = self.buffer.getvalue()
        # next try/except clause suggested by Robert Brown
        try:
            sent = self.sock.send(buff)
        except Exception:
            # Catch socket exceptions and abort writing the buffer.
            # KeyboardInterrupt and SystemExit are no longer swallowed.
            sent = len(data)

        # reset the buffer to the data that has not yet been sent
        self.buffer=cStringIO.StringIO()
        self.buffer.write(buff[sent:])
项目:Splunk_CBER_App    作者:MHaggis    | 项目源码 | 文件源码
def __init__(self, ofile, maxresultrows=None):
    """Prepare the state of a buffered CSV writer.

    :param ofile: stored as the output target; it is not written to here.
    :param maxresultrows: stored row limit; 50000 is used when it is
        ``None``.
    """
    if maxresultrows is None:
        maxresultrows = 50000
    self._maxresultrows = maxresultrows

    self._ofile = ofile
    self._fieldnames = None
    self._buffer = StringIO()

    # The csv.writer targets the in-memory buffer, not ofile.
    self._writer = csv.writer(self._buffer, dialect=CsvDialect)
    self._writerow = self._writer.writerow
    self._finished = False
    self._flushed = False

    self._inspector = OrderedDict()
    self._chunk_count = 0
    self._record_count = 0
    # "0L" does not parse on Python 3; Python 2 ints promote to long on
    # their own, so a plain zero is equivalent.
    self._total_record_count = 0
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def runTokenizerTest(test):
    # Runs one tokenizer test case (a dict with 'input', 'output' and
    # 'initialState' entries) and asserts that the received tokens match the
    # expected ones.  Note that a missing 'lastStartTag' entry is added to
    # the dict that was passed in.
    #XXX - move this out into the setup function
    #concatenate all consecutive character tokens into a single token
    if 'doubleEscaped' in test:
        test = unescape_test(test)

    expected = concatenateCharacterTokens(test['output'])
    if 'lastStartTag' not in test:
        test['lastStartTag'] = None
    outBuffer = cStringIO.StringIO()
    stdout = sys.stdout
    sys.stdout = outBuffer
    try:
        parser = TokenizerTestParser(test['initialState'],
                                     test['lastStartTag'])
        tokens = parser.parse(test['input'])
    finally:
        # Put the real stdout back, even when parsing fails; otherwise all
        # later output of the process ends up in the throw-away buffer.
        sys.stdout = stdout
    tokens = concatenateCharacterTokens(tokens)
    received = normalizeTokens(tokens)
    errorMsg = u"\n".join(["\n\nInitial state:",
                          test['initialState'] ,
                          "\nInput:", unicode(test['input']),
                          "\nExpected:", unicode(expected),
                          "\nreceived:", unicode(tokens)])
    errorMsg = errorMsg.encode("utf-8")
    ignoreErrorOrder = test.get('ignoreErrorOrder', False)
    assert tokensMatch(expected, received, ignoreErrorOrder), errorMsg
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def remove_blank_lines(source):
    """
    Return *source* with every blank line removed.

    A line is dropped when stripping whitespace leaves nothing behind.

    Example:

    .. code-block:: python

        test = "foo"

        test2 = "bar"

    Will become:

    .. code-block:: python

        test = "foo"
        test2 = "bar"
    """
    all_lines = io.StringIO(source).readlines()
    return "".join(line for line in all_lines if line.strip())
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def make_input_stream(input, charset):
    """Return a binary input stream for ``input``.

    :param input: an object with a ``read`` attribute, ``None``, bytes, or
        text that is encoded with ``charset``.
    :param charset: encoding applied to non-bytes input.
    :raises TypeError: when a stream is given on a non-PY2 interpreter and
        no binary reader can be found for it.
    """
    if not hasattr(input, 'read'):
        if input is None:
            payload = b''
        elif isinstance(input, bytes):
            payload = input
        else:
            payload = input.encode(charset)
        return StringIO(payload) if PY2 else io.BytesIO(payload)

    # Is already an input stream.
    if PY2:
        return input
    reader = _find_binary_reader(input)
    if reader is None:
        raise TypeError('Could not find binary reader for input stream.')
    return reader
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def open_resource(name):
    """Load the object from the datastore

    Returns a StringIO over the ``data`` of the ``Zoneinfo`` entity called
    ``name``.  If that lookup fails with AttributeError and no ``GMT``
    entity exists either, ``init_zoneinfo()`` is run and the lookup is
    retried; if ``GMT`` does exist the error is logged and re-raised.
    """
    import logging
    from cStringIO import StringIO
    try:
        data = ndb.Key('Zoneinfo', name, namespace=NDB_NAMESPACE).get().data
    except AttributeError:
        # Missing zone info; test for GMT - which would be there if the
        # Zoneinfo has been initialized.
        gmt_entity = ndb.Key('Zoneinfo', 'GMT', namespace=NDB_NAMESPACE).get()
        if gmt_entity:
            # the user asked for a zone that doesn't seem to exist.
            logging.exception("Requested zone '%s' is not in the database." %
                name)
            raise

        # we need to initialize the database
        init_zoneinfo()
        return open_resource(name)
    else:
        return StringIO(data)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def emit(events, stream=None, Dumper=Dumper,
        canonical=None, indent=None, width=None,
        allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.
    """
    return_text = stream is None
    if return_text:
        from StringIO import StringIO
        stream = StringIO()
    dumper = Dumper(stream, canonical=canonical, indent=indent, width=width,
            allow_unicode=allow_unicode, line_break=line_break)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        # The dumper is disposed of even when an event cannot be emitted.
        dumper.dispose()
    if return_text:
        return stream.getvalue()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def emit(events, stream=None, Dumper=Dumper,
        canonical=None, indent=None, width=None,
        allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.
    """
    collected = None
    if stream is None:
        from StringIO import StringIO
        collected = stream = StringIO()
    options = dict(canonical=canonical, indent=indent, width=width,
            allow_unicode=allow_unicode, line_break=line_break)
    dumper = Dumper(stream, **options)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        dumper.dispose()
    # Only a locally created buffer is turned into a return value.
    if collected is not None:
        return collected.getvalue()
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def runProfiler(logger, func, args=tuple(), kw={}, verbose=True, nb_func=25, sort_by=('cumulative', 'calls')):
    """Run ``func(*args, **kw)`` under the profiler and log the statistics.

    :param logger: object whose ``warning``/``error`` methods receive the
        progress messages and the report lines.
    :param nb_func: passed to ``print_stats`` to limit the report.
    :param sort_by: sort keys passed to ``sort_stats``.
    :param verbose: not used by this function.
    :returns: whatever ``func`` returned.

    The profile data is written to the fixed path ``/tmp/profiler``, which
    is removed again in all cases.
    """
    profile_filename = "/tmp/profiler"
    prof = Profile(profile_filename)
    try:
        logger.warning("Run profiler")
        result = prof.runcall(func, *args, **kw)
        prof.close()

        logger.error("Profiler: Process data...")
        stat = loadStats(profile_filename)
        stat.strip_dirs()
        stat.sort_stats(*sort_by)

        logger.error("Profiler: Result:")
        # The report is captured in memory and replayed line by line.
        report = StringIO()
        stat.stream = report
        stat.print_stats(nb_func)
        report.seek(0)
        for report_line in report:
            logger.error(report_line.rstrip())
        return result
    finally:
        unlink(profile_filename)
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def __init__(self, buf=None):
    """
    Create the internal buffer, optionally seeded from C{buf}.

    C{buf} may be a string, an object with C{getvalue}, or an object with
    C{read}, C{seek} and C{tell} (its position is restored after reading).

    @raise TypeError: Unable to coerce C{buf} to C{StringIO}.
    """
    self._buffer = StringIO()
    target = self._buffer

    if isinstance(buf, python.str_types):
        target.write(buf)
    elif hasattr(buf, 'getvalue'):
        target.write(buf.getvalue())
    elif all(hasattr(buf, name) for name in ('read', 'seek', 'tell')):
        # Read everything from the start, then put the caller's stream
        # back where it was.
        resume_at = buf.tell()
        buf.seek(0)
        target.write(buf.read())
        buf.seek(resume_at)
    elif buf is not None:
        raise TypeError("Unable to coerce buf->StringIO got %r" % (buf,))

    self._get_len()
    self._len_changed = False
    self._buffer.seek(0, 0)
项目:Tinychat-Bot--Discontinued    作者:Tinychat    | 项目源码 | 文件源码
def truncate(self, size=0):
    """
    Truncates the stream to the specified length.

    A size of 0 replaces the buffer with an empty one.  Otherwise the first
    C{size} units are kept and the previous position is sought again.

    @param size: The length of the stream, in bytes.
    @type size: C{int}
    """
    if size != 0:
        resume_at = self.tell()
        self.seek(0)
        kept = self.read(size)
        # Start over with a fresh buffer that holds only the kept prefix.
        self._buffer = StringIO()
        self._buffer.write(kept)
        self.seek(resume_at)
    else:
        self._buffer = StringIO()

    self._len_changed = True
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def _decode(self, headers, fileobj):
    """Return a file-like object with the transfer encoding undone.

    The encoding is taken from the first ``content-transfer-encoding`` value
    of the last entry in ``headers`` (default ``7bit``), case-insensitively.
    ``base64`` and ``quoted-printable`` bodies are decoded into a new
    StringIO; anything else returns ``fileobj`` itself.
    """
    encoding = headers[-1].get_all("content-transfer-encoding", ["7bit"])[0].lower()

    if encoding == "quoted-printable":
        decoded = StringIO()
        quopri.decode(fileobj, decoded)
        # Rewind so the caller reads from the beginning.
        decoded.seek(0)
        return decoded

    if encoding != "base64":
        return fileobj

    try:
        data = base64.b64decode(fileobj.read())
    except TypeError as error:
        self.log.error("Base64 decoding failed ({0})".format(error))
        idiokit.stop(False)
    return StringIO(data)
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def format(self, parts, events, filename, *args):
    """Attach the formatted events as a zip file and return an empty text.

    ``self.formatter.format(parts, events, *args)`` produces the content,
    which is UTF-8 encoded, stored as the single member of a deflated zip
    archive and appended to ``parts`` as a base64-encoded
    ``application/zip`` attachment.

    :param filename: name of the attachment.  A ``.zip`` extension (any
        case) is used as given and stripped for the member name; otherwise
        ``.zip`` is appended for the attachment.
    :returns: ``u""``.
    """
    prefix, ext = os.path.splitext(filename)
    already_zip = ext.lower() == ".zip"
    zip_name = filename if already_zip else filename + ".zip"
    raw_name = prefix if already_zip else filename

    data = self.formatter.format(parts, events, *args)

    archive_bytes = StringIO()
    zipped = zipfile.ZipFile(archive_bytes, 'w', zipfile.ZIP_DEFLATED)
    zipped.writestr(raw_name, data.encode("utf-8"))
    # Closing the archive writes its directory into the buffer.
    zipped.close()

    part = MIMEBase("application", "zip")
    part.set_payload(archive_bytes.getvalue())
    encode_base64(part)
    part.add_header("Content-Disposition", "attachment", filename=zip_name)
    parts.append(part)

    return u""