Python io 模块,StringIO() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用io.StringIO()

项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def get_environ(self):
        env = {}
        # The following code snippet does not follow PEP8 conventions
        # but it's formatted the way it is for demonstration purposes
        # to emphasize the required variables and their values
        #
        # Required WSGI variables
        env['wsgi.version']      = (1, 0)
        env['wsgi.url_scheme']   = 'http'
        env['wsgi.input']        = StringIO(self.request_data)
        env['wsgi.errors']       = sys.stderr
        env['wsgi.multithread']  = False
        env['wsgi.multiprocess'] = False
        env['wsgi.run_once']     = False
        # Required CGI variables
        env['REQUEST_METHOD']    = self.request_method    # GET
        env['PATH_INFO']         = self.path              # /hello
        env['SERVER_NAME']       = self.server_name       # localhost
        env['SERVER_PORT']       = str(self.server_port)  # 8888
        return env
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def __init__(self, max_size=0, mode='w+b', buffering=-1,
                 encoding=None, newline=None,
                 suffix=None, prefix=None, dir=None):
        if 'b' in mode:
            self._file = _io.BytesIO()
        else:
            # Setting newline="\n" avoids newline translation;
            # this is important because otherwise on Windows we'd
            # get double newline translation upon rollover().
            self._file = _io.StringIO(newline="\n")
        self._max_size = max_size
        self._rolled = False
        self._TemporaryFileArgs = {'mode': mode, 'buffering': buffering,
                                   'suffix': suffix, 'prefix': prefix,
                                   'encoding': encoding, 'newline': newline,
                                   'dir': dir}
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def rollover(self):
        if self._rolled: return
        file = self._file
        newfile = self._file = TemporaryFile(**self._TemporaryFileArgs)
        del self._TemporaryFileArgs

        newfile.write(file.getvalue())
        newfile.seek(file.tell(), 0)

        self._rolled = True

    # The method caching trick from NamedTemporaryFile
    # won't work here, because _file may change from a
    # BytesIO/StringIO instance to a real file. So we list
    # all the methods directly.

    # Context management protocol
项目:tailbiter    作者:darius    | 项目源码 | 文件源码
def run_in_real_python(self, code):
        real_stdout = sys.stdout

        py_stdout = io.StringIO()
        sys.stdout = py_stdout

        py_value = py_exc = None
        globs = {
            '__builtins__': __builtins__,
            '__name__': '__main__',
            '__doc__': None,
            '__package__': None,
        }

        try:
            py_value = eval(code, globs, globs)
        except AssertionError:              # pragma: no cover
            raise
        except Exception as e:
            py_exc = e
        finally:
            sys.stdout = real_stdout

        return py_value, py_exc, py_stdout
项目:acbs    作者:AOSC-Dev    | 项目源码 | 文件源码
def parse_ab3_defines(defines_file):  # , pkg_name):
    try:
        fp = open(defines_file, 'rt')
        abd_cont = fp.read()
        fp.close()
    except:
        print('[E] Failed to load autobuild defines file! Do you have read permission?')
        return False
    script = "ARCH={}\n".format(
        get_arch_name()) + abd_cont + gen_laundry_list(['PKGNAME', 'PKGDEP', 'BUILDDEP'])
    try:
        # Better to be replaced by subprocess.Popen
        abd_out = subprocess.check_output(script, shell=True)
    except:
        print('[E] Malformed Autobuild defines file found! Couldn\'t continue!')
        return False
    abd_fp = io.StringIO('[wrap]\n' + abd_out.decode('utf-8'))
    abd_config = RawConfigParser()
    abd_config.read_file(abd_fp)
    abd_config_dict = {}
    for i in abd_config['wrap']:
        abd_config_dict[i.upper()] = abd_config['wrap'][i]
    return abd_config_dict
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def _make_csv_writer(self):
        """

        :return:
        """
        self._buffer = StringIO()

        self._bytes_written = 0
        now = datetime.now()
        self._out_csv = open(self.log_folder + '/' + now.strftime('%Y%m%d_%H%M%S.csv'.format(self.make_random(6))), 'w')
        logging.warning("Writing to {} ({} bytes)".format(self._out_csv.name, self.max_bytes))
        self._out_writer = csv.DictWriter(self._buffer, fieldnames=self.fieldnames, restval=None)
        self._out_writer.writeheader()
        self._out_csv.write(self._buffer.getvalue())
        self._reset_buffer()
        self.writerow({'vid': self.vin})
项目:rpi-can-logger    作者:JonnoFTW    | 项目源码 | 文件源码
def _make_writer(self):
        """

        :return:
        """
        self._buffer = StringIO()

        self._bytes_written = 0
        now = datetime.now()
        self.fname = self.log_folder + '/' + now.strftime('%Y%m%d_%H%M%S_{}.json'.format(self.make_random(6)))
        self.fname = str(pathlib.Path(self.fname))
        self._out_fh = open(self.fname, 'w')
        self.write_pid()
        logging.warning("Writing to  {} ({} bytes)".format(self._out_fh.name, self.max_bytes))

        # compress any old files still lying around
        for fname in glob(self.log_folder+"/*.json"):
            if fname != self.fname:
                self._compress(fname)
项目:segno    作者:heuer    | 项目源码 | 文件源码
def xpm_as_matrix(buff, border):
    """\
    Returns the XPM QR code as list of [0,1] lists.

    :param io.StringIO buff: Buffer to read the matrix from.
    """
    res = []
    img_data = _img_data(buff.getvalue())
    height = int(img_data[0].split(' ')[0])
    img_data = img_data[3:]
    for i, row in enumerate(img_data):
        if i < border:
            continue
        if i >= height - border:
            break
        r = row[border:-border] if border else row
        res.append([(1 if b == 'X' else 0) for b in r])
    return res
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def test_base(self):
    pipe = test_helper.get_mock_pipeline([helper.RUN_PIPELINE])
    _strings = strings.Subscriber(pipe)
    _strings.setup({
        'min_string_length': 4,
        'max_lines': 2
    })

    doc = document.get_document('mock')
    doc.set_size(12345)

    _strings.consume(doc, StringIO('AAAA\x00BBBB\x00CCCC'))

    # Two child documents produced.
    self.assertEquals(2, len(pipe.consumer.produced))

    expected = 'mock.00000.child'
    actual = pipe.consumer.produced[0][0].path

    self.assertEquals(expected, actual)
项目:xarray-simlab    作者:benbovy    | 项目源码 | 文件源码
def test_info(self, process, process_repr):
        for cls_or_obj in [ExampleProcess, process]:
            buf = StringIO()
            cls_or_obj.info(buf=buf)
            actual = buf.getvalue()
            assert actual == process_repr

        class EmptyProcess(Process):
            pass

        expected = dedent("""\
        Variables:
            *empty*
        Meta:
            time_dependent: True""")

        buf = StringIO()
        EmptyProcess.info(buf=buf)
        actual = buf.getvalue()
        assert actual == expected
项目:generalscnn    作者:zxqfl    | 项目源码 | 文件源码
def process_replay(tup):
    game_string, isValue = tup
    game = json.load(StringIO(game_string))
    assert game[0]['type'] == 'metadata'
    metadata = game[0]
    result = []
    try:
        if isValue:
            result = process_eseq_value(game)
        else:
            assert metadata['won'] == True, "Input for policy features should always be from winning side"
            for state in all_states(game):
                result.extend(training_features(state))
    except Exception as e:
        print("Warning: encountered error `%s` while procssing replay %s; skipping" % (str(e), metadata['replay_id']), file=sys.stderr)
    return result
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def feed(self, markup):
        if isinstance(markup, bytes):
            markup = BytesIO(markup)
        elif isinstance(markup, str):
            markup = StringIO(markup)

        # Call feed() at least once, even if the markup is empty,
        # or the parser won't be initialized.
        data = markup.read(self.CHUNK_SIZE)
        try:
            self.parser = self.parser_for(self.soup.original_encoding)
            self.parser.feed(data)
            while len(data) != 0:
                # Now call feed() on the rest of the data, chunk by chunk.
                data = markup.read(self.CHUNK_SIZE)
                if len(data) != 0:
                    self.parser.feed(data)
            self.parser.close()
        except (UnicodeDecodeError, LookupError, etree.ParserError) as e:
            raise ParserRejectedMarkup(str(e))
项目:runcommands    作者:wylee    | 项目源码 | 文件源码
def test_show_config(self):
        config = Config()
        stdout = io.StringIO()
        with replace(sys, 'stdout', stdout):
            show_config(config, flat=True)
        result = stdout.getvalue()
        lines = result.splitlines()
        self.assertIn('run.commands_module => commands.py', lines)
        self.assertIn('run.config_file => runcommands.tests:commands.cfg', lines)
        self.assertIn('run.env => None', lines)
        self.assertIn('run.default_env => None', lines)
        self.assertIn('run.options => ', lines)
        self.assertIn('run.echo => False', lines)
        self.assertIn('run.hide => False', lines)
        self.assertIn('run.debug => False', lines)
        self.assertIn('version => X.Y.Z', lines)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def capture_context_buffer(**kw):
    if kw.pop('bytes_io', False):
        buf = io.BytesIO()
    else:
        buf = io.StringIO()

    kw.update({
        'dialect_name': "sqlite",
        'output_buffer': buf
    })
    conf = EnvironmentContext.configure

    def configure(*arg, **opt):
        opt.update(**kw)
        return conf(*arg, **opt)

    with mock.patch.object(EnvironmentContext, "configure", configure):
        yield buf
项目:PyCIRCLeanMail    作者:CIRCL    | 项目源码 | 文件源码
def __init__(self, raw_email, debug=False):
        '''
            Setup the base options of the copy/convert setup
        '''
        self.raw_email = raw_email
        self.log_processing = StringIO()
        self.log_content = StringIO()
        self.tree(self.raw_email)

        twiggy_out = outputs.StreamOutput(formats.shell_format, stream=self.log_processing)
        emitters['*'] = filters.Emitter(levels.DEBUG, True, twiggy_out)

        self.log_name = log.name('files')

        self.cur_attachment = None

        self.debug = debug
        if self.debug:
            if not os.path.exists('debug_logs'):
                os.makedirs('debug_logs')
            self.log_debug_err = os.path.join('debug_logs', 'debug_stderr.log')
            self.log_debug_out = os.path.join('debug_logs', 'debug_stdout.log')
        else:
            self.log_debug_err = os.devnull
            self.log_debug_out = os.devnull
项目:ConfigSpace    作者:automl    | 项目源码 | 文件源码
def build_forbidden(clause):
    if not isinstance(clause, AbstractForbiddenComponent):
        raise TypeError("build_forbidden must be called with an instance of "
                        "'%s', got '%s'" %
                        (AbstractForbiddenComponent, type(clause)))

    if not isinstance(clause, (ForbiddenEqualsClause, ForbiddenAndConjunction)):
        raise NotImplementedError("IRACE cannot handle '%s' of type %s" %
                                  str(clause), (type(clause)))

    retval = io.StringIO()
    retval.write("(")
    # Really simple because everything is an AND-conjunction of equals
    # conditions
    dlcs = clause.get_descendant_literal_clauses()
    for dlc in dlcs:
        if retval.tell() > 1:
            retval.write(" && ")
        retval.write("%s==%s" % (dlc.hyperparameter.name, dlc.value))
    retval.write(")")
    retval.seek(0)
    return retval.getvalue()
项目:ConfigSpace    作者:automl    | 项目源码 | 文件源码
def build_forbidden(clause):
    if not isinstance(clause, AbstractForbiddenComponent):
        raise TypeError("build_forbidden must be called with an instance of "
                        "'%s', got '%s'" %
                        (AbstractForbiddenComponent, type(clause)))

    retval = StringIO()
    retval.write("{")
    # Really simple because everything is an AND-conjunction of equals
    # conditions
    dlcs = clause.get_descendant_literal_clauses()
    for dlc in dlcs:
        if retval.tell() > 1:
            retval.write(", ")
        retval.write("%s=%s" % (dlc.hyperparameter.name, dlc.value))
    retval.write("}")
    retval.seek(0)
    return retval.getvalue()
项目:ConfigSpace    作者:automl    | 项目源码 | 文件源码
def build_forbidden(clause):
    if not isinstance(clause, AbstractForbiddenComponent):
        raise TypeError("build_forbidden must be called with an instance of "
                        "'%s', got '%s'" %
                        (AbstractForbiddenComponent, type(clause)))

    if not isinstance(clause, (ForbiddenEqualsClause, ForbiddenAndConjunction)):
        raise NotImplementedError("SMAC cannot handle '%s' of type %s" %
                                  str(clause), (type(clause)))

    retval = StringIO()
    retval.write("{")
    # Really simple because everything is an AND-conjunction of equals
    # conditions
    dlcs = clause.get_descendant_literal_clauses()
    for dlc in dlcs:
        if retval.tell() > 1:
            retval.write(", ")
        retval.write("%s=%s" % (dlc.hyperparameter.name, dlc.value))
    retval.write("}")
    retval.seek(0)
    return retval.getvalue()
项目:ConfigSpace    作者:automl    | 项目源码 | 文件源码
def __repr__(self) -> str:
        self._populate_values()

        representation = io.StringIO()
        representation.write("Configuration:\n")

        hyperparameters = self.configuration_space.get_hyperparameters()
        hyperparameters.sort(key=lambda t: t.name)
        for hyperparameter in hyperparameters:
            hp_name = hyperparameter.name
            if hp_name in self._values and self._values[hp_name] is not None:
                representation.write("  ")

                value = repr(self._values[hp_name])
                if isinstance(hyperparameter, Constant):
                    representation.write("%s, Constant: %s" % (hp_name, value))
                else:
                    representation.write("%s, Value: %s" % (hp_name, value))
                representation.write("\n")

        return representation.getvalue()
项目:TACTIC-Handler    作者:listyque    | 项目源码 | 文件源码
def remove_blank_lines(source):
    """
    Removes blank lines from *source* and returns the result.

    Example:

    .. code-block:: python

        test = "foo"

        test2 = "bar"

    Will become:

    .. code-block:: python

        test = "foo"
        test2 = "bar"
    """
    io_obj = io.StringIO(source)
    source = [a for a in io_obj.readlines() if a.strip()]
    return "".join(source)
项目:scheduled-bots    作者:SuLab    | 项目源码 | 文件源码
def get_assembly_report(self, taxid):
        if self.ass_sum is None:
            self.get_assembly_summaries()
        df = self.ass_sum.query("taxid == {} & refseq_category == 'reference genome'".format(taxid))
        if len(df) == 0:
            # try "representative genome" (needed for mouse and rat)
            df = self.ass_sum.query("taxid == {} & refseq_category == 'representative genome'".format(taxid))
        if len(df) != 1:
            raise ValueError("unknown reference: {}".format(df))
        print(df)
        ftp_path = list(df.ftp_path)[0]
        assembly = os.path.split(ftp_path)[1]
        url = os.path.join(ftp_path, assembly + "_assembly_report.txt")
        print(url)
        # read the column names from the file
        table = request.urlopen(request.Request(url)).read().decode()
        names = [x for x in table.split("\n") if x.startswith("#")][-1].strip().replace("# ", "").split("\t")
        self.chr_df[taxid] = pd.read_csv(StringIO(table), sep="\t", names=names, comment='#')
        self.chr_df[taxid] = self.chr_df[taxid].rename(columns={'Sequence-Name': 'SequenceName', 'Sequence-Role': 'SequenceRole',
                                                                'Assigned-Molecule': 'AssignedMolecule',
                                                                'Assigned-Molecule-Location/Type': 'AssignedMoleculeLocationType',
                                                                'GenBank-Accn': 'GenBankAccn', 'RefSeq-Accn': 'RefSeqAccn',
                                                                'UCSC-style-name': 'UCSCstylename'})
        #print(self.chr_df[taxid].query("SequenceRole == 'assembled-molecule'"))
项目:typot    作者:chakki-works    | 项目源码 | 文件源码
def _get_added(cls, diff):
        patches = PatchSet(StringIO(diff))

        diff_contents = []
        for p in patches:
            if p.added > 0:
                contents = []
                for h in p:
                    added = []
                    for i, line in enumerate(h):
                        if line.is_added:
                            added_line = Line(line.target_line_no, line.value, i + 1)
                            added.append(added_line)
                    contents += added
                diff_contents.append(
                    DiffContent(p.path, contents)
                    )

        return diff_contents
项目:gitnet    作者:networks-lab    | 项目源码 | 文件源码
def test_default(self):
        """Does the default method print the proper information?"""
        with patch('sys.stdout', new=StringIO()) as fake_out:
            self.my_log.describe(mode="default")
            output = fake_out.getvalue()
            self.assertIn("Log containing 4 records from local git created at ", output)
            self.assertIn("\nOrigin:", output)
            self.assertNotIn("Filters:", output)
            self.assertIn("\nNumber of authors: 4\n", output)
            self.assertIn("\nNumber of files: 7\n", output)
            self.assertIn("\nMost common email address domains:", output)
            self.assertIn("\n\t @gmail.com [4 users]\n", output)
            self.assertIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
            self.assertIn("\nChange distribution summary:\n", output)
            self.assertIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
            self.assertIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
            self.assertIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
            self.assertIn("\nNumber of merges: 0\n", output)
            self.assertIn("\nNumber of parsing errors: 0\n", output)
项目:gitnet    作者:networks-lab    | 项目源码 | 文件源码
def test_not_default(self):
        """ Does a non-default method print the proper information?
            Note: At this point, default is the only setting so they end up being the same."""
        with patch('sys.stdout', new=StringIO()) as fake_out:
            self.my_log.describe(mode="not default")
            output = fake_out.getvalue()
            self.assertIn("Log containing 4 records from local git created at ", output)
            self.assertIn("\nOrigin:", output)
            self.assertNotIn("Filters:", output)
            self.assertIn("\nNumber of authors: 4\n", output)
            self.assertIn("\nNumber of files: 7\n", output)
            self.assertIn("\nMost common email address domains:", output)
            self.assertIn("\n\t @gmail.com [4 users]\n", output)
            self.assertIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
            self.assertIn("\nChange distribution summary:\n", output)
            self.assertIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
            self.assertIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
            self.assertIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
            self.assertIn("\nNumber of merges: 0\n", output)
            self.assertIn("\nNumber of parsing errors: 0\n", output)
项目:gitnet    作者:networks-lab    | 项目源码 | 文件源码
def test_whole(self):
        """Is the entire output as expected?"""
        with patch('sys.stdout', new=StringIO()) as fake_out:
            self.my_log.describe()
            out = fake_out.getvalue()

            self.assertRegex(out, "Log containing 4 records from local git created at ....-..-.. ..:..:..\.......\.\n"
                                  "Origin:  .*\n"
                                  "Number of authors: 4\n"
                                  "Number of files: 7\n"
                                  "Most common email address domains:\n"
                                  "\t @gmail.com \[4 users\]\n"
                                  "Date range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n"
                                  "Change distribution summary:\n"
                                  "\t Files changed: Mean = 2.75, SD = 0.829\n"
                                  "\t Line insertions: Mean = 2.75, SD = 0.829\n"
                                  "\t Line deletions: Mean = nan, SD = nan\n"
                                  "Number of merges: 0\n"
                                  "Number of parsing errors: 0\n")
项目:gitnet    作者:networks-lab    | 项目源码 | 文件源码
def test_exclude(self):
        """Does exclude prevent statistics from being printed?"""
        with patch('sys.stdout', new=StringIO()) as fake_out:
            self.my_log.describe(exclude=['merges', 'errors', 'files', 'summary', 'changes', 'path', 'filters',
                                          'authors', 'dates', 'emails'])
            output = fake_out.getvalue()
            self.assertNotIn("Log containing 4 records from local git created at ", output)
            self.assertNotIn("\nOrigin:", output)
            self.assertNotIn("Filters:", output)
            self.assertNotIn("\nNumber of authors: 4\n", output)
            self.assertNotIn("\nNumber of files: 7\n", output)
            self.assertNotIn("\nMost common email address domains:", output)
            self.assertNotIn("\n\t @gmail.com [4 users]\n", output)
            self.assertNotIn("\nDate range: 2016-05-20 09:19:20-04:00 to 2016-05-26 11:21:03-04:00\n", output)
            self.assertNotIn("\nChange distribution summary:\n", output)
            self.assertNotIn("\n\t Files changed: Mean = 2.75, SD = 0.829\n", output)
            self.assertNotIn("\n\t Line insertions: Mean = 2.75, SD = 0.829\n", output)
            self.assertNotIn("\n\t Line deletions: Mean = nan, SD = nan\n", output)
            self.assertNotIn("\nNumber of merges: 0\n", output)
            self.assertNotIn("\nNumber of parsing errors: 0\n", output)
            self.assertEqual(output, "")
项目:petronia    作者:groboclown    | 项目源码 | 文件源码
def emit(events, stream=None, Dumper=Dumper,
        canonical=None, indent=None, width=None,
        allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.
    """
    getvalue = None
    if stream is None:
        stream = io.StringIO()
        getvalue = stream.getvalue
    dumper = Dumper(stream, canonical=canonical, indent=indent, width=width,
            allow_unicode=allow_unicode, line_break=line_break)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        dumper.dispose()
    if getvalue:
        return getvalue()
项目:esmcheckds2    作者:andywalden    | 项目源码 | 文件源码
def _insert_zone_names(self):
        """
        Args:
            _zonetree (str): set in __init__

        Returns:
            List of dicts (str: str) devices by zone
        """
        self._zone_name = None
        self._zonetree_io = StringIO(self._zonetree)
        self._zonetree_csv = csv.reader(self._zonetree_io, delimiter=',')
        self._zonetree_lod = []

        for self._row in self._zonetree_csv:
            if self._row[0] == '1':
                self._zone_name = self._row[1]
                if self._zone_name == 'Undefined':
                    self._zone_name = ''
                continue
            for self._dev in self._devtree:
                if self._dev['ds_id'] == self._row[2]:
                    self._dev['zone_name'] = self._zone_name
        return self._devtree
项目:azure-search-ta    作者:yokawasa    | 项目源码 | 文件源码
def textanalyze(self,index_name, analyzer, text):
        # Create JSON string for request body
        reqobject={}
        reqobject['text'] = text
        reqobject['analyzer'] = analyzer
        io=StringIO()
        json.dump(reqobject, io)
        req_body = io.getvalue()
        # HTTP request to Azure search REST API
        conn = httplib.HTTPSConnection(self.__api_url)
        conn.request("POST",
                u"/indexes/{0}/analyze?api-version={1}".format(index_name, _AZURE_SEARCH_API_VERSION),
                req_body, self.headers)
        response = conn.getresponse()
        #print "status:", response.status, response.reason
        data = (response.read()).decode('utf-8')
        #print("data:{}".format(data))
        conn.close()
        return data
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def topngbytes(name, rows, x, y, **k):
    """Convenience function for creating a PNG file "in memory" as a
    string.  Creates a :class:`Writer` instance using the keyword arguments,
    then passes `rows` to its :meth:`Writer.write` method.  The resulting
    PNG file is returned as a string.  `name` is used to identify the file for
    debugging.
    """

    import os

    print(name)
    f = BytesIO()
    w = Writer(x, y, **k)
    w.write(f, rows)
    if os.environ.get('PYPNG_TEST_TMP'):
        w = open(name, 'wb')
        w.write(f.getvalue())
        w.close()
    return f.getvalue()
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def testPtrns(self):
        "Test colour type 3 and tRNS chunk (and 4-bit palette)."
        a = (50,99,50,50)
        b = (200,120,120,80)
        c = (255,255,255)
        d = (200,120,120)
        e = (50,99,50)
        w = Writer(3, 3, bitdepth=4, palette=[a,b,c,d,e])
        f = BytesIO()
        w.write_array(f, array('B', (4, 3, 2, 3, 2, 0, 2, 0, 1)))
        r = Reader(bytes=f.getvalue())
        x,y,pixels,meta = r.asRGBA8()
        self.assertEqual(x, 3)
        self.assertEqual(y, 3)
        c = c+(255,)
        d = d+(255,)
        e = e+(255,)
        boxed = [(e,d,c),(d,c,a),(c,a,b)]
        flat = [itertools.chain(*row) for row in boxed]
        self.assertEqual(list(map(list, pixels)), list(map(list, flat)))
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def testPAMin(self):
        """Test that the command line tool can read PAM file."""
        def do():
            return _main(['testPAMin'])
        s = BytesIO()
        s.write(strtobytes('P7\nWIDTH 3\nHEIGHT 1\nDEPTH 4\nMAXVAL 255\n'
                'TUPLTYPE RGB_ALPHA\nENDHDR\n'))
        # The pixels in flat row flat pixel format
        flat =  [255,0,0,255, 0,255,0,120, 0,0,255,30]
        asbytes = seqtobytes(flat)
        s.write(asbytes)
        s.flush()
        s.seek(0)
        o = BytesIO()
        testWithIO(s, o, do)
        r = Reader(bytes=o.getvalue())
        x,y,pixels,meta = r.read()
        self.assertTrue(r.alpha)
        self.assertTrue(not r.greyscale)
        self.assertEqual(list(itertools.chain(*pixels)), flat)
项目:webvtt-py    作者:glut23    | 项目源码 | 文件源码
def test_write_captions(self):
        os.makedirs(OUTPUT_DIR)
        copy(self._get_file('one_caption.vtt'), OUTPUT_DIR)

        out = io.StringIO()
        self.webvtt.read(os.path.join(OUTPUT_DIR, 'one_caption.vtt'))
        new_caption = Caption('00:00:07.000', '00:00:11.890', ['New caption text line1', 'New caption text line2'])
        self.webvtt.captions.append(new_caption)
        self.webvtt.write(out)

        out.seek(0)
        lines = [line.rstrip() for line in out.readlines()]

        expected_lines = [
            'WEBVTT',
            '',
            '00:00:00.500 --> 00:00:07.000',
            'Caption text #1',
            '',
            '00:00:07.000 --> 00:00:11.890',
            'New caption text line1',
            'New caption text line2'
        ]

        self.assertListEqual(lines, expected_lines)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_copy_text(self):
        self.conn.set_client_encoding('latin1')
        self._create_temp_table()  # the above call closed the xn

        if sys.version_info[0] < 3:
            abin = ''.join(map(chr, list(range(32, 127)) + list(range(160, 256))))
            about = abin.decode('latin1').replace('\\', '\\\\')

        else:
            abin = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
            about = abin.replace('\\', '\\\\')

        curs = self.conn.cursor()
        curs.execute('insert into tcopy values (%s, %s)',
            (42, abin))

        import io
        f = io.StringIO()
        curs.copy_to(f, 'tcopy', columns=('data',))
        f.seek(0)
        self.assertEqual(f.readline().rstrip(), about)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def _copy_from(self, curs, nrecs, srec, copykw):
        f = StringIO()
        for i, c in zip(range(nrecs), cycle(string.ascii_letters)):
            l = c * srec
            f.write("%s\t%s\n" % (i, l))

        f.seek(0)
        curs.copy_from(MinimalRead(f), "tcopy", **copykw)

        curs.execute("select count(*) from tcopy")
        self.assertEqual(nrecs, curs.fetchone()[0])

        curs.execute("select data from tcopy where id < %s order by id",
                (len(string.ascii_letters),))
        for i, (l,) in enumerate(curs):
            self.assertEqual(l, string.ascii_letters[i] * srec)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_copy_no_column_limit(self):
        cols = ["c%050d" % i for i in range(200)]

        curs = self.conn.cursor()
        curs.execute('CREATE TEMPORARY TABLE manycols (%s)' % ',\n'.join(
            ["%s int" % c for c in cols]))
        curs.execute("INSERT INTO manycols DEFAULT VALUES")

        f = StringIO()
        curs.copy_to(f, "manycols", columns=cols)
        f.seek(0)
        self.assertEqual(f.read().split(), ['\\N'] * len(cols))

        f.seek(0)
        curs.copy_from(f, "manycols", columns=cols)
        curs.execute("select count(*) from manycols;")
        self.assertEqual(curs.fetchone()[0], 2)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_copy_rowcount(self):
        curs = self.conn.cursor()

        curs.copy_from(StringIO('aaa\nbbb\nccc\n'), 'tcopy', columns=['data'])
        self.assertEqual(curs.rowcount, 3)

        curs.copy_expert(
            "copy tcopy (data) from stdin",
            StringIO('ddd\neee\n'))
        self.assertEqual(curs.rowcount, 2)

        curs.copy_to(StringIO(), "tcopy")
        self.assertEqual(curs.rowcount, 5)

        curs.execute("insert into tcopy (data) values ('fff')")
        curs.copy_expert("copy tcopy to stdout", StringIO())
        self.assertEqual(curs.rowcount, 6)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_copy_text(self):
        self.conn.set_client_encoding('latin1')
        self._create_temp_table()  # the above call closed the xn

        if sys.version_info[0] < 3:
            abin = ''.join(map(chr, range(32, 127) + range(160, 256)))
            about = abin.decode('latin1').replace('\\', '\\\\')

        else:
            abin = bytes(range(32, 127) + range(160, 256)).decode('latin1')
            about = abin.replace('\\', '\\\\')

        curs = self.conn.cursor()
        curs.execute('insert into tcopy values (%s, %s)',
            (42, abin))

        import io
        f = io.StringIO()
        curs.copy_to(f, 'tcopy', columns=('data',))
        f.seek(0)
        self.assertEqual(f.readline().rstrip(), about)
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def _get_config(self, unit, filename):
        """Get a ConfigParser object for parsing a unit's config file."""
        file_contents = unit.file_contents(filename)

        # NOTE(beisner):  by default, ConfigParser does not handle options
        # with no value, such as the flags used in the mysql my.cnf file.
        # https://bugs.python.org/issue7005
        config = configparser.ConfigParser(allow_no_value=True)
        config.readfp(io.StringIO(file_contents))
        return config
项目:flora    作者:Lamden    | 项目源码 | 文件源码
def post(self):
        payload = {
            'owner' : request.form['owner'],
            'package' : request.form['package'],
            'data' : request.form['data']
        }

        owner = request.form['owner']
        package = request.form['package']
        data = request.form['data']
        b = ENGINE.get_named_secret(owner)
        print(b)
        secret = rsa.decrypt(eval(b), KEY[1])

        # data is a python tuple of the templated solidity at index 0 and an example payload at index 1
        # compilation of this code should return true
        # if there are errors, don't commit it to the db
        # otherwise, commit it
        raw_data = decrypt(secret, eval(data))
        package_data = json.loads(raw_data.decode('utf8'))
        '''
        payload = {
            'tsol' : open(code_path[0]).read(),
            'example' : example
        }
        '''

        # assert that the code compiles with the provided example
        tsol.compile(StringIO(package_data['tsol']), package_data['example'])

        template = pickle.dumps(package_data['tsol'])
        example = pickle.dumps(package_data['example'])

        if ENGINE.add_package(owner, package, template, example) == True:
            return success_payload(None, 'Package successfully uploaded.')
        return error_payload('Problem uploading package. Try again.')
项目:python-driver    作者:bblfsh    | 项目源码 | 文件源码
def _restart_data(self, format_: str='json') -> None:
        assert format_ == 'json'

        with open(join(CURDIR, 'data', 'helloworld.py')) as f:
            testcode = f.read()

        self.data = Request({
            'filepath': 'test.py',
            'action': 'ParseAST',
            'content': testcode,
            'language': 'python',
        })

        bufferclass = io.StringIO if format_ == 'json' else io.BytesIO

        # This will mock the python_driver stdin
        self.sendbuffer = bufferclass()
        # This will mock the python_driver stdout
        self.recvbuffer = bufferclass()
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def load_test_fixture(fixture_path):
    path = os.path.dirname(os.path.abspath(__file__))
    fixture_file = open(path + '/' + fixture_path)
    input = fixture_file.read()
    fixture_file.close()
    sys.stdin = StringIO(input)
    sys.stdout = StringIO()
项目:Cortex-Analyzers    作者:CERT-BDF    | 项目源码 | 文件源码
def load_data(file: str):
    with io.open(os.path.join(__abspath__, 'test_data', file)) as afile:
        input_str = afile.read().replace('PATH', os.path.join(__abspath__, 'test_data'))
    sys.stdin = io.StringIO(input_str)
    sys.stdout = io.StringIO()
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def open(self):
        """Open the ssh connection"""
        key_str = io.StringIO(self.config['key'])
        pkey = paramiko.RSAKey.from_private_key(key_str)
        self.client = paramiko.SSHClient()
        self.client.set_missing_host_key_policy(
            paramiko.AutoAddPolicy())
        self.client.connect(
            self.config['hostname'], username=self.config['username'],
            pkey=pkey, timeout=60, banner_timeout=60)
        self.transport = self.client.get_transport()
        self.transport.set_keepalive(60)
        self.script_filename = self.get_tmp_script_filename()
项目:spoonybard    作者:notnownikki    | 项目源码 | 文件源码
def run_script(self, script):
        self.stream = StringIO(script)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def openStream(self, source):
        """Produces a file object from source.

        source can be either a file object, local filename or a string.

        """
        # Already a file object
        if hasattr(source, 'read'):
            stream = source
        else:
            stream = StringIO(source)

        return stream
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    Taken from Lib/support/__init__.py in the CPython repo.
    """
    orig_stdout = getattr(sys, stream_name)
    setattr(sys, stream_name, StreamWrapper.from_stream(orig_stdout))
    try:
        yield getattr(sys, stream_name)
    finally:
        setattr(sys, stream_name, orig_stdout)
项目:scibot    作者:SciCrunch    | 项目源码 | 文件源码
def export(request):
    print('starting csv export')
    output_rows, DATE = export_impl()    
    data = StringIO()
    writer = csv.writer(data)
    writer.writerows(sorted(output_rows))

    r = Response(gzip.compress(data.getvalue().encode()))
    r.content_type = 'text/csv'
    r.headers.update({
        'Content-Disposition':'attachment;filename = RRID-data-%s.csv' % DATE,
        'Content-Encoding':'gzip'
        })

    return r
项目:tailbiter    作者:darius    | 项目源码 | 文件源码
def run_in_vm(self, code):
        real_stdout = sys.stdout

        # Run the code through our VM.

        vm_stdout = io.StringIO()
        if CAPTURE_STDOUT:              # pragma: no branch
            sys.stdout = vm_stdout

        vm_value = vm_exc = None
        try:
            vm_value = run(code, None, None)
        except VirtualMachineError:         # pragma: no cover
            # If the VM code raises an error, show it.
            raise
        except AssertionError:              # pragma: no cover
            # If test code fails an assert, show it.
            raise
        except Exception as e:
            # Otherwise, keep the exception for comparison later.
            if not CAPTURE_EXCEPTION:       # pragma: no cover
                raise
            vm_exc = e
        finally:
            sys.stdout = real_stdout
            real_stdout.write("-- stdout ----------\n")
            real_stdout.write(vm_stdout.getvalue())

        return vm_value, vm_exc, vm_stdout