Python StringIO 模块,StringIO() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用StringIO.StringIO()

项目:ThreatPrep    作者:ThreatResponse    | 项目源码 | 文件源码
def get_iam_credential_report(self):
        report = None
        while report == None:
            try:
                report = self.iam_client.get_credential_report()
            except botocore.exceptions.ClientError as e:
                if 'ReportNotPresent' in e.message:
                    self.iam_client.generate_credential_report()
                else:
                    raise e
                time.sleep(5)
        document = StringIO.StringIO(report['Content'])
        reader = csv.DictReader(document)
        report_rows = []
        for row in reader:
            report_rows.append(row)
        return report_rows
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def scan_file(self, this_file):
        """ Submit a file to be scanned by VirusTotal

        :param this_file: File to be scanned (32MB file size limit)
        :return: JSON response that contains scan_id and permalink.
        """
        params = {'apikey': self.api_key}
        try:
            if type(this_file) == str and os.path.isfile(this_file):
                files = {'file': (this_file, open(this_file, 'rb'))}
            elif isinstance(this_file, StringIO.StringIO):
                files = {'file': this_file.read()}
            else:
                files = {'file': this_file}
        except TypeError as e:
            return dict(error=e.message)

        try:
            response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies)
        except requests.RequestException as e:
            return dict(error=e.message)

        return _return_response_and_status_code(response)
项目:newsreap    作者:caronc    | 项目源码 | 文件源码
def encode_public_key(self):
        """
        Based on spotnab, this is the gzipped version of the key
        with base64 applied to it. We encode it as such and
        return it.

        """
        fileobj = StringIO()
        with GzipFile(fileobj=fileobj, mode="wb") as f:
            try:
                f.write(self.public_pem())
            except TypeError:
                # It wasn't initialized yet
                return None

        return b64encode(fileobj.getvalue())
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def test_pop_zipfile():
    sio = StringIO()
    zf = wheel.install.VerifyingZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.close()

    try:
        zf.pop()
    except RuntimeError:
        pass # already closed
    else:
        raise Exception("expected RuntimeError")

    zf = wheel.install.VerifyingZipFile(sio, 'a')
    zf.pop()
    zf.close()

    zf = wheel.install.VerifyingZipFile(sio, 'r')
    assert len(zf.infolist()) == 1
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_validation_fails_on_upload(self, mock_open):

        invalid_file = StringIO.StringIO()
        invalid_file.write(INVALID_CSV)

        mock_upload = MockFieldStorage(invalid_file, 'invalid.csv')

        dataset = factories.Dataset()

        invalid_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))

        with mock.patch('io.open', return_value=invalid_stream):

            with assert_raises(t.ValidationError) as e:

                    call_action(
                        'resource_create',
                        package_id=dataset['id'],
                        format='CSV',
                        upload=mock_upload
                    )

        assert 'validation' in e.exception.error_dict
        assert 'missing-value' in str(e.exception)
        assert 'Row 2 has a missing value in column 4' in str(e.exception)
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_validation_passes_on_upload(self, mock_open):

        invalid_file = StringIO.StringIO()
        invalid_file.write(VALID_CSV)

        mock_upload = MockFieldStorage(invalid_file, 'invalid.csv')

        dataset = factories.Dataset()

        valid_stream = io.BufferedReader(io.BytesIO(VALID_CSV))

        with mock.patch('io.open', return_value=valid_stream):

            resource = call_action(
                'resource_create',
                package_id=dataset['id'],
                format='CSV',
                upload=mock_upload
            )

        assert_equals(resource['validation_status'], 'success')
        assert 'validation_timestamp' in resource
项目:ckanext-validation    作者:frictionlessdata    | 项目源码 | 文件源码
def test_schema_upload_field(self, mock_open):

        schema_file = StringIO.StringIO('{"fields":[{"name":"category"}]}')

        mock_upload = MockFieldStorage(schema_file, 'schema.json')

        dataset = factories.Dataset()

        resource = call_action(
            'resource_create',
            package_id=dataset['id'],
            url='http://example.com/file.csv',
            schema_upload=mock_upload
        )

        assert_equals(resource['schema'], {'fields': [{'name': 'category'}]})

        assert 'schema_upload' not in resource
        assert 'schema_url' not in resource
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def test_apiHostCollocation(self):
        app = self._rhDom.createApplication("/waveforms/through_w/through_w.sad.xml")
        provides_ports = object.__getattribute__(app,'_providesPortDict')
        self.assertEquals(provides_ports, {})
        uses_ports = object.__getattribute__(app,'_usesPortDict')
        self.assertEquals(uses_ports, {})
        _destfile=StringIO.StringIO()
        app.api(destfile=_destfile)
        provides_ports = object.__getattribute__(app,'_providesPortDict')
        self.assertEquals(len(provides_ports), 1)
        self.assertEquals(provides_ports.keys()[0], 'input')
        self.assertEquals(provides_ports['input']['Port Interface'], 'IDL:CF/LifeCycle:1.0')
        self.assertEquals(provides_ports['input']['Port Name'], 'input')
        uses_ports = object.__getattribute__(app,'_usesPortDict')
        self.assertEquals(uses_ports.keys()[0], 'output')
        self.assertEquals(uses_ports['output']['Port Interface'], 'IDL:CF/LifeCycle:1.0')
        self.assertEquals(uses_ports['output']['Port Name'], 'output')
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def dumpPackets(self, pkt_start=0, pkt_end=None, payload_start=0, payload_end=40, raw_payload=False, header_only=False, use_pager=True ):
        genf=self._gen_packet( self.raw_data_, pkt_start ) 
        if pkt_end == None: 
           pkt_end = self.npkts_ 
        else:
           pkt_end = pkt_end + 1
        res = StringIO()
        for i, pkt in enumerate(genf,pkt_start):
            if i < pkt_end:
                print >>res, 'Packet: ', str(i)
                print >>res, pkt.header_and_payload(payload_start, payload_end, header_only=header_only, raw=raw_payload )
            else:
                break

        if use_pager:
            _helpers.Pager( res.getvalue() )
        else:
            print res.getvalue()
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'softPkg'
        rootClass = softPkg
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
##     sys.stdout.write('<?xml version="1.0" ?>\n')
##     rootObj.export(sys.stdout, 0, name_="softPkg",
##         namespacedef_='')
    return rootObj
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'devicepkg'
        rootClass = devicepkg
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
##     sys.stdout.write('<?xml version="1.0" ?>\n')
##     rootObj.export(sys.stdout, 0, name_="devicepkg",
##         namespacedef_='')
    return rootObj
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'deviceconfiguration'
        rootClass = deviceconfiguration
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
##     sys.stdout.write('<?xml version="1.0" ?>\n')
##     rootObj.export(sys.stdout, 0, name_="deviceconfiguration",
##         namespacedef_='')
    return rootObj
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'profile'
        rootClass = profile
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
##     sys.stdout.write('<?xml version="1.0" ?>\n')
##     rootObj.export(sys.stdout, 0, name_="profile",
##         namespacedef_='')
    return rootObj
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'softwareassembly'
        rootClass = softwareassembly
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
##     sys.stdout.write('<?xml version="1.0" ?>\n')
##     rootObj.export(sys.stdout, 0, name_="softwareassembly",
##         namespacedef_='')
    return rootObj
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'softwarecomponent'
        rootClass = softwarecomponent
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
##     sys.stdout.write('<?xml version="1.0" ?>\n')
##     rootObj.export(sys.stdout, 0, name_="softwarecomponent",
##         namespacedef_='')
    return rootObj
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def parseString(inString):
    from StringIO import StringIO
    doc = parsexml_(StringIO(inString))
    rootNode = doc.getroot()
    rootTag, rootClass = get_root_tag(rootNode)
    if rootClass is None:
        rootTag = 'properties'
        rootClass = properties
    rootObj = rootClass.factory()
    rootObj.build(rootNode)
    # Enable Python to collect the space used by the DOM.
    doc = None
##     sys.stdout.write('<?xml version="1.0" ?>\n')
##     rootObj.export(sys.stdout, 0, name_="properties",
##         namespacedef_='')
    return rootObj
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def toXML(self, level=0, version="2.2.2"):
        value = None
        if self.defvalue != None:
            value = to_xmlvalue(self.defvalue, self.type_)

        simp = ossie.parsers.prf.simple(id_=self.id_, 
                                        type_=self.type_,
                                        name=self.name, 
                                        mode=self.mode,
                                        description=self.__doc__,
                                        value=value,
                                        units=self.units,
                                        action=ossie.parsers.prf.action(type_=self.action))

        for kind in self.kinds:
            simp.add_kind(ossie.parsers.prf.kind(kindtype=kind))

        xml = StringIO.StringIO()
        simp.export(xml, level, name_='simple')
        return xml.getvalue()
项目:dabdabrevolution    作者:harryparkdotio    | 项目源码 | 文件源码
def _body(self):
        try:
            read_func = self.environ['wsgi.input'].read
        except KeyError:
            self.environ['wsgi.input'] = BytesIO()
            return self.environ['wsgi.input']
        body_iter = self._iter_chunked if self.chunked else self._iter_body
        body, body_size, is_temp_file = BytesIO(), 0, False
        for part in body_iter(read_func, self.MEMFILE_MAX):
            body.write(part)
            body_size += len(part)
            if not is_temp_file and body_size > self.MEMFILE_MAX:
                body, tmp = TemporaryFile(mode='w+b'), body
                body.write(tmp.getvalue())
                del tmp
                is_temp_file = True
        self.environ['wsgi.input'] = body
        body.seek(0)
        return body
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def _check_rst_data(self, data):
        """Returns warnings when the provided data doesn't compile."""
        source_path = StringIO()
        parser = Parser()
        settings = frontend.OptionParser().get_default_values()
        settings.tab_width = 4
        settings.pep_references = None
        settings.rfc_references = None
        reporter = SilentReporter(source_path,
                          settings.report_level,
                          settings.halt_level,
                          stream=settings.warning_stream,
                          debug=settings.debug,
                          encoding=settings.error_encoding,
                          error_handler=settings.error_encoding_error_handler)

        document = nodes.document(settings, reporter, source=source_path)
        document.note_source(source_path, -1)
        try:
            parser.parse(data, document)
        except AttributeError:
            reporter.messages.append((-1, 'Could not finish the parsing.',
                                      '', {}))

        return reporter.messages
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def parseString(string, handler, errorHandler=ErrorHandler()):
    try:
        from cStringIO import StringIO
    except ImportError:
        from StringIO import StringIO

    if errorHandler is None:
        errorHandler = ErrorHandler()
    parser = make_parser()
    parser.setContentHandler(handler)
    parser.setErrorHandler(errorHandler)

    inpsrc = InputSource()
    inpsrc.setByteStream(StringIO(string))
    parser.parse(inpsrc)

# this is the parser list used by the make_parser function if no
# alternatives are given as parameters to the function
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def gzip_encode(data):
    """data -> gzip encoded data

    Encode data using the gzip content encoding as described in RFC 1952
    """
    if not gzip:
        raise NotImplementedError
    f = StringIO.StringIO()
    gzf = gzip.GzipFile(mode="wb", fileobj=f, compresslevel=1)
    gzf.write(data)
    gzf.close()
    encoded = f.getvalue()
    f.close()
    return encoded

##
# Decode a string using the gzip content encoding such as specified by the
# Content-Encoding: gzip
# in the HTTP header, as described in RFC 1952
#
# @param data The encoded data
# @return the unencoded data
# @raises ValueError if data is not correctly coded.
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def gzip_decode(data):
    """gzip encoded data -> unencoded data

    Decode data using the gzip content encoding as described in RFC 1952
    """
    if not gzip:
        raise NotImplementedError
    f = StringIO.StringIO(data)
    gzf = gzip.GzipFile(mode="rb", fileobj=f)
    try:
        decoded = gzf.read()
    except IOError:
        raise ValueError("invalid data")
    f.close()
    gzf.close()
    return decoded

##
# Return a decoded file-like object for the gzip encoding
# as described in RFC 1952.
#
# @param response A stream supporting a read() method
# @return a file-like object that the decoded data can be read() from
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def rollover(self):
        if self._rolled: return
        file = self._file
        newfile = self._file = TemporaryFile(*self._TemporaryFileArgs)
        del self._TemporaryFileArgs

        newfile.write(file.getvalue())
        newfile.seek(file.tell(), 0)

        self._rolled = True

    # The method caching trick from NamedTemporaryFile
    # won't work here, because _file may change from a
    # _StringIO instance to a real file. So we list
    # all the methods directly.

    # Context management protocol
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def getbodytext(self, decode = 1):
        """Return the message's body text as string.  This undoes a
        Content-Transfer-Encoding, but does not interpret other MIME
        features (e.g. multipart messages).  To suppress decoding,
        pass 0 as an argument."""
        self.fp.seek(self.startofbody)
        encoding = self.getencoding()
        if not decode or encoding in ('', '7bit', '8bit', 'binary'):
            return self.fp.read()
        try:
            from cStringIO import StringIO
        except ImportError:
            from StringIO import StringIO
        output = StringIO()
        mimetools.decode(self.fp, output, encoding)
        return output.getvalue()
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_message(self, key):
        """Return a Message representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip '1,' line specifying labels.
        original_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == '*** EOOH ***' + os.linesep or line == '':
                break
            original_headers.write(line.replace(os.linesep, '\n'))
        visible_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == os.linesep or line == '':
                break
            visible_headers.write(line.replace(os.linesep, '\n'))
        body = self._file.read(stop - self._file.tell()).replace(os.linesep,
                                                                 '\n')
        msg = BabylMessage(original_headers.getvalue() + body)
        msg.set_visible(visible_headers.getvalue())
        if key in self._labels:
            msg.set_labels(self._labels[key])
        return msg
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def get_string(self, key):
        """Return a string representation or raise a KeyError."""
        start, stop = self._lookup(key)
        self._file.seek(start)
        self._file.readline()   # Skip '1,' line specifying labels.
        original_headers = StringIO.StringIO()
        while True:
            line = self._file.readline()
            if line == '*** EOOH ***' + os.linesep or line == '':
                break
            original_headers.write(line.replace(os.linesep, '\n'))
        while True:
            line = self._file.readline()
            if line == os.linesep or line == '':
                break
        return original_headers.getvalue() + \
               self._file.read(stop - self._file.tell()).replace(os.linesep,
                                                                 '\n')
项目:plugin.video.exodus    作者:lastship    | 项目源码 | 文件源码
def _get_result(response, limit=None):
    if limit == '0':
        result = response.read(224 * 1024)
    elif limit:
        result = response.read(int(limit) * 1024)
    else:
        result = response.read(5242880)

    try:
        encoding = response.info().getheader('Content-Encoding')
    except:
        encoding = None
    if encoding == 'gzip':
        result = gzip.GzipFile(fileobj=StringIO.StringIO(result)).read()

    return result
项目:PyJFuzz    作者:mseclab    | 项目源码 | 文件源码
def __init__(self, sock, mode='rb', bufsize=-1, close=False):
        self._sock = sock
        self.mode = mode # Not actually used in this version
        if bufsize < 0:
            bufsize = self.default_bufsize
        self.bufsize = bufsize
        self.softspace = False
        # _rbufsize is the suggested recv buffer size.  It is *strictly*
        # obeyed within readline() for recv calls.  If it is larger than
        # default_bufsize it will be used for recv calls within read().
        if bufsize == 0:
            self._rbufsize = 1
        elif bufsize == 1:
            self._rbufsize = self.default_bufsize
        else:
            self._rbufsize = bufsize
        self._wbufsize = bufsize
        # We use StringIO for the read buffer to avoid holding a list
        # of variously sized string objects which have been known to
        # fragment the heap due to how they are malloc()ed and often
        # realloc()ed down much smaller than their original allocation.
        self._rbuf = StringIO()
        self._wbuf = [] # A list of strings
        self._wbuf_len = 0
        self._close = close
项目:jx-sqlite    作者:mozilla    | 项目源码 | 文件源码
def ini2value(ini_content):
    """
    INI FILE CONTENT TO Data
    """
    from ConfigParser import ConfigParser

    buff = StringIO.StringIO(ini_content)
    config = ConfigParser()
    config._read(buff, "dummy")

    output = {}
    for section in config.sections():
        output[section]=s = {}
        for k, v in config.items(section):
            s[k]=v
    return wrap(output)
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def test_base(self):
    pipe = test_helper.get_mock_pipeline([helper.RUN_PIPELINE])
    _strings = strings.Subscriber(pipe)
    _strings.setup({
        'min_string_length': 4,
        'max_lines': 2
    })

    doc = document.get_document('mock')
    doc.set_size(12345)

    _strings.consume(doc, StringIO('AAAA\x00BBBB\x00CCCC'))

    # Two child documents produced.
    self.assertEquals(2, len(pipe.consumer.produced))

    expected = 'mock.00000.child'
    actual = pipe.consumer.produced[0][0].path

    self.assertEquals(expected, actual)
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
项目:DeepSea    作者:SUSE    | 项目源码 | 文件源码
def render(file_name):
        """
        This function makes use of slsutil salt module to render sls files
        Args:
            file_name (str): the sls file path
        """
        err = StringIO.StringIO()
        out = StringIO.StringIO()
        exception = None
        with redirect_stderr(err):
            with redirect_stdout(out):
                try:
                    result = SLSRenderer.caller.cmd('slsutil.renderer', file_name)
                except salt.exceptions.SaltException as ex:
                    exception = StageRenderingException(file_name, ex.strerror)

        if exception:
            # pylint: disable=E0702
            raise exception

        logger.info("Rendered SLS file %s, stdout\n%s", file_name, out.getvalue())
        logger.debug("Rendered SLS file %s, stderr\n%s", file_name, err.getvalue())
        return result, out.getvalue(), err.getvalue()
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def topngbytes(name, rows, x, y, **k):
    """Convenience function for creating a PNG file "in memory" as a
    string.  Creates a :class:`Writer` instance using the keyword arguments,
    then passes `rows` to its :meth:`Writer.write` method.  The resulting
    PNG file is returned as a string.  `name` is used to identify the file for
    debugging.
    """

    import os

    print (name)
    f = BytesIO()
    w = Writer(x, y, **k)
    w.write(f, rows)
    if os.environ.get('PYPNG_TEST_TMP'):
        w = open(name, 'wb')
        w.write(f.getvalue())
        w.close()
    return f.getvalue()
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def testPtrns(self):
        "Test colour type 3 and tRNS chunk (and 4-bit palette)."
        a = (50,99,50,50)
        b = (200,120,120,80)
        c = (255,255,255)
        d = (200,120,120)
        e = (50,99,50)
        w = Writer(3, 3, bitdepth=4, palette=[a,b,c,d,e])
        f = BytesIO()
        w.write_array(f, array('B', (4, 3, 2, 3, 2, 0, 2, 0, 1)))
        r = Reader(bytes=f.getvalue())
        x,y,pixels,meta = r.asRGBA8()
        self.assertEqual(x, 3)
        self.assertEqual(y, 3)
        c = c+(255,)
        d = d+(255,)
        e = e+(255,)
        boxed = [(e,d,c),(d,c,a),(c,a,b)]
        flat = map(lambda row: itertools.chain(*row), boxed)
        self.assertEqual(map(list, pixels), map(list, flat))
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def testPAMin(self):
        """Test that the command line tool can read PAM file."""
        def do():
            return _main(['testPAMin'])
        s = BytesIO()
        s.write(strtobytes('P7\nWIDTH 3\nHEIGHT 1\nDEPTH 4\nMAXVAL 255\n'
                'TUPLTYPE RGB_ALPHA\nENDHDR\n'))
        # The pixels in flat row flat pixel format
        flat =  [255,0,0,255, 0,255,0,120, 0,0,255,30]
        asbytes = seqtobytes(flat)
        s.write(asbytes)
        s.flush()
        s.seek(0)
        o = BytesIO()
        testWithIO(s, o, do)
        r = Reader(bytes=o.getvalue())
        x,y,pixels,meta = r.read()
        self.assertTrue(r.alpha)
        self.assertTrue(not r.greyscale)
        self.assertEqual(list(itertools.chain(*pixels)), flat)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def fetch_quote(symbols, timestamp, cached_file=None):
    url = URL % '+'.join(symbols)

    if not cached_file:
        # fetch
        log('Fetching %s' % url)
        fp = urllib.urlopen(url)
        try:
            data = fp.read()
        finally:
            fp.close()

        # log result
        if LOG_DATA_FETCHED:
            log_filename = LOG_FILENAME % timestamp.replace(':','-')
            out = open(log_filename, 'wb')
            try:
                log('Fetched %s bytes logged in %s' % (len(data), log_filename))
                out.write(data)
            finally:
                out.close()
    else:
        data = open(cached_file,'rb').read()

    return StringIO(data)
项目:Instagram-API    作者:danleyb2    | 项目源码 | 文件源码
def request(self, endpoint, post=None):
        buffer = BytesIO()

        ch = pycurl.Curl()
        ch.setopt(pycurl.URL, Constants.API_URL + endpoint)
        ch.setopt(pycurl.USERAGENT, self.userAgent)
        ch.setopt(pycurl.WRITEFUNCTION, buffer.write)
        ch.setopt(pycurl.FOLLOWLOCATION, True)
        ch.setopt(pycurl.HEADER, True)
        ch.setopt(pycurl.VERBOSE, False)
        ch.setopt(pycurl.COOKIEFILE, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))
        ch.setopt(pycurl.COOKIEJAR, os.path.join(self.IGDataPath, self.username, self.username + "-cookies.dat"))

        if post is not None:
            ch.setopt(pycurl.POST, True)
            ch.setopt(pycurl.POSTFIELDS, post)

        if self.proxy:
            ch.setopt(pycurl.PROXY, self.proxyHost)
            if self.proxyAuth:
                ch.setopt(pycurl.PROXYUSERPWD, self.proxyAuth)

        ch.perform()
        resp = buffer.getvalue()
        header_len = ch.getinfo(pycurl.HEADER_SIZE)
        header = resp[0: header_len]
        body = resp[header_len:]

        ch.close()

        if self.debug:
            print("REQUEST: " + endpoint)
            if post is not None:
                if not isinstance(post, list):
                    print("DATA: " + str(post))
            print("RESPONSE: " + body)

        return [header, json_decode(body)]
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def run_test(innerHTML, input, expected, errors, treeClass):
    try:
        p = html5parser.HTMLParser(tree = treeClass["builder"])
        if innerHTML:
            document = p.parseFragment(StringIO.StringIO(input), innerHTML)
        else:
            document = p.parse(StringIO.StringIO(input))
    except constants.DataLossWarning:
        #Ignore testcases we know we don't pass
        return

    document = treeClass.get("adapter", lambda x: x)(document)
    try:
        output = convertTokens(treeClass["walker"](document))
        output = attrlist.sub(sortattrs, output)
        expected = attrlist.sub(sortattrs, convertExpected(expected))
        assert expected == output, "\n".join([
                "", "Input:", input,
                "", "Expected:", expected,
                "", "Received:", output
                ])
    except NotImplementedError:
        pass # Amnesty for those that confess...
项目:kaggle-spark-ml    作者:imgoodman    | 项目源码 | 文件源码
def loadRecord(line):
    """
    ????csv??
    """
    input_line=StringIO.StringIO(line)
    #row=unicodecsv.reader(input_line, encoding="utf-8")
    #return row.next()
    #reader=csv.DictReader(input_line,fieldnames=["id","qid1","qid2","question1","question2","is_duplicate"])
    reader=csv.reader(input_line)
    return reader.next()
    #data=[]
    #for row in reader:
    #    print row
    #    data.append([unicode(cell,"utf-8") for cell in row])
    #return data[0]
    #return reader.next()

#raw_data=sc.textFile(train_file_path).map(loadRecord)
#print raw_data.take(10)
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def feed(self, markup):
        if isinstance(markup, bytes):
            markup = BytesIO(markup)
        elif isinstance(markup, unicode):
            markup = StringIO(markup)

        # Call feed() at least once, even if the markup is empty,
        # or the parser won't be initialized.
        data = markup.read(self.CHUNK_SIZE)
        try:
            self.parser = self.parser_for(self.soup.original_encoding)
            self.parser.feed(data)
            while len(data) != 0:
                # Now call feed() on the rest of the data, chunk by chunk.
                data = markup.read(self.CHUNK_SIZE)
                if len(data) != 0:
                    self.parser.feed(data)
            self.parser.close()
        except (UnicodeDecodeError, LookupError, etree.ParserError), e:
            raise ParserRejectedMarkup(str(e))
项目:QQBot    作者:springhack    | 项目源码 | 文件源码
def CurlPOST(url, data, cookie):
    c = pycurl.Curl()
    b = StringIO.StringIO()
    c.setopt(pycurl.URL, url)
    c.setopt(pycurl.POST, 1)
    c.setopt(pycurl.HTTPHEADER,['Content-Type: application/json'])
    # c.setopt(pycurl.TIMEOUT, 10)
    c.setopt(pycurl.WRITEFUNCTION, b.write)
    c.setopt(pycurl.COOKIEFILE, cookie)
    c.setopt(pycurl.COOKIEJAR, cookie)
    c.setopt(pycurl.POSTFIELDS, data)
    c.perform()
    html = b.getvalue()
    b.close()
    c.close()
    return html
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def test_pop_zipfile():
    sio = StringIO()
    zf = wheel.install.VerifyingZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.close()

    try:
        zf.pop()
    except RuntimeError:
        pass # already closed
    else:
        raise Exception("expected RuntimeError")

    zf = wheel.install.VerifyingZipFile(sio, 'a')
    zf.pop()
    zf.close()

    zf = wheel.install.VerifyingZipFile(sio, 'r')
    assert len(zf.infolist()) == 1
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _decompressContent(response, new_content):
    content = new_content
    try:
        encoding = response.get('content-encoding', None)
        if encoding in ['gzip', 'deflate']:
            if encoding == 'gzip':
                content = gzip.GzipFile(fileobj=StringIO.StringIO(new_content)).read()
            if encoding == 'deflate':
                content = zlib.decompress(content)
            response['content-length'] = str(len(content))
            # Record the historical presence of the encoding in a way the won't interfere.
            response['-content-encoding'] = response['content-encoding']
            del response['content-encoding']
    except IOError:
        content = ""
        raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content)
    return content
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def __init__(self, body, mimetype='application/octet-stream',
               chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
    """Create a new MediaInMemoryUpload.

  DEPRECATED: Use MediaIoBaseUpload with either io.TextIOBase or StringIO for
  the stream.

  Args:
    body: string, Bytes of body content.
    mimetype: string, Mime-type of the file or default of
      'application/octet-stream'.
    chunksize: int, File will be uploaded in chunks of this many bytes. Only
      used if resumable=True.
    resumable: bool, True if this is a resumable upload. False means upload
      in a single request.
    """
    fd = StringIO.StringIO(body)
    super(MediaInMemoryUpload, self).__init__(fd, mimetype, chunksize=chunksize,
                                              resumable=resumable)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def emit(events, stream=None, Dumper=Dumper,
        canonical=None, indent=None, width=None,
        allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.
    """
    getvalue = None
    if stream is None:
        from StringIO import StringIO
        stream = StringIO()
        getvalue = stream.getvalue
    dumper = Dumper(stream, canonical=canonical, indent=indent, width=width,
            allow_unicode=allow_unicode, line_break=line_break)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        dumper.dispose()
    if getvalue:
        return getvalue()
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_pop_zipfile():
    sio = StringIO()
    zf = wheel.install.VerifyingZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.close()

    try:
        zf.pop()
    except RuntimeError:
        pass # already closed
    else:
        raise Exception("expected RuntimeError")

    zf = wheel.install.VerifyingZipFile(sio, 'a')
    zf.pop()
    zf.close()

    zf = wheel.install.VerifyingZipFile(sio, 'r')
    assert len(zf.infolist()) == 1
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def test_pop_zipfile():
    sio = StringIO()
    zf = wheel.install.VerifyingZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.close()

    try:
        zf.pop()
    except RuntimeError:
        pass # already closed
    else:
        raise Exception("expected RuntimeError")

    zf = wheel.install.VerifyingZipFile(sio, 'a')
    zf.pop()
    zf.close()

    zf = wheel.install.VerifyingZipFile(sio, 'r')
    assert len(zf.infolist()) == 1
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def test_pop_zipfile():
    sio = StringIO()
    zf = wheel.install.VerifyingZipFile(sio, 'w')
    zf.writestr("one", b"first file")
    zf.writestr("two", b"second file")
    zf.close()

    try:
        zf.pop()
    except RuntimeError:
        pass # already closed
    else:
        raise Exception("expected RuntimeError")

    zf = wheel.install.VerifyingZipFile(sio, 'a')
    zf.pop()
    zf.close()

    zf = wheel.install.VerifyingZipFile(sio, 'r')
    assert len(zf.infolist()) == 1
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def test_object_pairs_hook(self):
        s = '{"xkd":1, "kcw":2, "art":3, "hxm":4, "qrt":5, "pad":6, "hoy":7}'
        p = [("xkd", 1), ("kcw", 2), ("art", 3), ("hxm", 4),
             ("qrt", 5), ("pad", 6), ("hoy", 7)]
        self.assertEqual(self.loads(s), eval(s))
        self.assertEqual(self.loads(s, object_pairs_hook=lambda x: x), p)
        self.assertEqual(self.json.load(StringIO(s),
                                        object_pairs_hook=lambda x: x), p)
        od = self.loads(s, object_pairs_hook=OrderedDict)
        self.assertEqual(od, OrderedDict(p))
        self.assertEqual(type(od), OrderedDict)
        # the object_pairs_hook takes priority over the object_hook
        self.assertEqual(self.loads(s, object_pairs_hook=OrderedDict,
                                    object_hook=lambda x: None),
                         OrderedDict(p))
        # check that empty object literals work (see #17368)
        self.assertEqual(self.loads('{}', object_pairs_hook=OrderedDict),
                         OrderedDict())
        self.assertEqual(self.loads('{"empty": {}}',
                                    object_pairs_hook=OrderedDict),
                         OrderedDict([('empty', OrderedDict())]))