Python six.moves 模块,cStringIO() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.moves.cStringIO()

项目:bx-python    作者:bxlab    | 项目源码 | 文件源码
def readline( self ):
        if self.dirty:
            self.fix_dirty()
        if self.at_eof:
            return ""
        rval = []
        while 1:
            line = self.current_block.readline()
            self.file_pos += len( line )
            rval.append( line )
            if len( line ) > 0 and line[-1] == '\n':
                break
            elif self.current_block_index == self.nblocks - 1:
                self.at_eof = True
                break
            else:
                self.current_block_index += 1
                self.current_block = StringIO( self.load_block( self.current_block_index ) )      
        return "".join( rval )
项目:bx-python    作者:bxlab    | 项目源码 | 文件源码
def readline( self ):
        if self.dirty:
            self.fix_dirty()
        if self.at_eof:
            return ""
        rval = []
        while 1:
            line = self.current_block.readline()
            rval.append( line )
            if len( line ) > 0 and line[-1] == '\n':
                break
            elif self.current_block_index == self.nblocks - 1:
                self.at_eof = True
                break
            else:
                self.current_block_index += 1
                self.current_block = StringIO( self.load_block( self.current_block_index ) )      
        return "".join( rval )
项目:pyktrader2    作者:harveywwu    | 项目源码 | 文件源码
def __str__(self):
        tempfile = StringIO()
        tempfile.write("***Bids***\n")
        if self.bids != None and len(self.bids) > 0:
            for key, value in self.bids.price_tree.items(reverse=True):
                tempfile.write('%s' % value)
        tempfile.write("\n***Asks***\n")
        if self.asks != None and len(self.asks) > 0:
            for key, value in list(self.asks.price_tree.items()):
                tempfile.write('%s' % value)
        tempfile.write("\n***Trades***\n")
        if self.tape != None and len(self.tape) > 0:
            num = 0
            for entry in self.tape:
                if num < 10: # get last 5 entries
                    tempfile.write(str(entry['quantity']) + " @ " + str(entry['price']) + " (" + str(entry['timestamp']) + ") " + str(entry['party1'][0]) + "/" + str(entry['party2'][0]) + "\n")
                    num += 1
                else:
                    break
        tempfile.write("\n")
        return tempfile.getvalue()
项目:gpustat    作者:wookayin    | 项目源码 | 文件源码
def test_new_query_mocked(self, N, Process):
        """
        A basic functionality test, in a case where everything is just normal.
        """
        _configure_mock(N, Process)

        gpustats = gpustat.new_query()
        fp = StringIO()
        gpustats.print_formatted(fp=fp, no_color=False, show_user=True, show_cmd=True, show_pid=True, show_power=True)

        result = fp.getvalue()
        print(result)

        unescaped = remove_ansi_codes(result)
        # remove first line (header)
        unescaped = '\n'.join(unescaped.split('\n')[1:])

        self.maxDiff = 4096
        self.assertEqual(unescaped, MOCK_EXPECTED_OUTPUT_FULL)
项目:gpustat    作者:wookayin    | 项目源码 | 文件源码
def test_args_endtoend(self, N, Process):
        """
        End-to-end testing given command line args.
        """
        _configure_mock(N, Process)

        def capture_output(*args):
            f = StringIO()
            import contextlib

            with contextlib.redirect_stdout(f):  # requires python 3.4+
                try:
                    gpustat.main(*args)
                except SystemExit:
                    raise AssertionError("Argparse failed (see above error message)")
            return f.getvalue()

        s = capture_output('gpustat', )
        unescaped = remove_ansi_codes(s)
        unescaped = '\n'.join(unescaped.split('\n')[1:])   # remove first line (header)
        self.maxDiff = 4096
        self.assertEqual(unescaped, MOCK_EXPECTED_OUTPUT_DEFAULT)

        s = capture_output('gpustat', '--no-header')
        self.assertIn("[0]", s.split('\n')[0])
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_copy_dir(self):
        from pecan.scaffolds import PecanScaffold

        class SimpleScaffold(PecanScaffold):
            _scaffold_dir = ('pecan', os.path.join(
                'tests', 'scaffold_fixtures', 'simple'
            ))

        SimpleScaffold().copy_to(os.path.join(
            self.scaffold_destination,
            'someapp'
        ), out_=StringIO())

        assert os.path.isfile(os.path.join(
            self.scaffold_destination, 'someapp', 'foo'
        ))
        assert os.path.isfile(os.path.join(
            self.scaffold_destination, 'someapp', 'bar', 'spam.txt'
        ))
        with open(os.path.join(
            self.scaffold_destination, 'someapp', 'foo'
        ), 'r') as f:
            assert f.read().strip() == 'YAR'
项目:awsume    作者:trek10inc    | 项目源码 | 文件源码
def test_start_auto_refresher(self, mock_write_auto_session, mock_kill_all):
        capturedOut = StringIO()
        sys.stdout = capturedOut

        args = argparse.Namespace()
        args.profile_name = 'profile-name'
        userSession = collections.OrderedDict()
        config = collections.OrderedDict()
        config['role_arn'] = 'the-role-arn'
        credentials = collections.OrderedDict()
        credentials['__name__'] = 'creds-name'

        mock_out_data = mock.Mock()
        mock_set_data = mock.Mock()
        mock_out_data.set_data = mock_set_data

        awsumepy.start_auto_refresher(args, userSession, config, credentials, mock_out_data)
        mock_set_data.assert_called_with('Auto auto-refresh-profile-name profile-name')
项目:OpenMDAO    作者:OpenMDAO    | 项目源码 | 文件源码
def _indented_print(f_locals, d, indent, excludes=('__init__',), file=sys.stdout):
    """
    Print trace info, indenting based on call depth.
    """
    sindent = tab * indent
    sep = '=' if d is f_locals else ':'

    for name in sorted(d, key=lambda a: str(a)):
        if name not in excludes:
            if isinstance(d[name], (dict, OrderedDict)):
                f = cStringIO()
                _indented_print(f_locals, d[name], 0, file=f)
                s = "  %s%s%s{%s}" % (sindent, name, sep, f.getvalue())
            else:
                s = "  %s%s%s%s" % (sindent, name, sep, d[name])
            if ' object at ' in s:
                s = addr_regex.sub('', s)
            linelen = len(s)
            leneq = len(s.split(sep, 1)[0])
            if linelen > MAXLINE:
                if '\n' in s:
                    # change indent
                    s = s.replace("\n", "\n%s" % (' '*leneq))
            print(s, file=file)
项目:OpenMDAO    作者:OpenMDAO    | 项目源码 | 文件源码
def test_list_inputs(self):
        self.prob.run_model()

        stream = cStringIO()
        inputs = self.prob.model.list_inputs(out_stream=stream)
        self.assertEqual(sorted(inputs), [
            ('comp2.a', [1.]),
            ('comp2.b', [-4.]),
            ('comp2.c', [3.]),
            ('comp3.a', [1.]),
            ('comp3.b', [-4.]),
            ('comp3.c', [3.])
        ])
        text = stream.getvalue()
        self.assertEqual(text.count('comp2.'), 3)
        self.assertEqual(text.count('comp3.'), 3)
        self.assertEqual(text.count('value:'), 6)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def get_publisherinfo(self, header=None, ccancel=None):
                """Get publisher information from the repository."""

                try:
                        pubs = self._frepo.get_publishers()
                        buf = cStringIO()
                        p5i.write(buf, pubs)
                except Exception as e:
                        reason = "Unable to retrieve publisher configuration " \
                            "data:\n{0}".format(e)
                        ex = tx.TransportProtoError("file", errno.EPROTO,
                            reason=reason, repourl=self._url)
                        self.__record_proto_error(ex)
                        raise ex
                buf.seek(0)
                return buf
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def get_publisherinfo(self, header=None, ccancel=None):
                """Get publisher information from the repository."""

                try:
                        pubs = self._arc.get_publishers()
                        buf = cStringIO()
                        p5i.write(buf, pubs)
                except Exception as e:
                        reason = "Unable to retrieve publisher configuration " \
                            "data:\n{0}".format(e)
                        ex = tx.TransportProtoError("file", errno.EPROTO,
                            reason=reason, repourl=self._url)
                        self.__record_proto_error(ex)
                        raise ex
                buf.seek(0)
                return buf
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def get_versions(self, header=None, ccancel=None):
                """Query the repo for versions information.
                Returns a file-like object."""

                buf = cStringIO()
                vops = {
                    "catalog": ["1"],
                    "file": ["0"],
                    "manifest": ["0"],
                    "publisher": ["0", "1"],
                    "versions": ["0"],
                    "status": ["0"]
                }

                buf.write("pkg-server {0}\n".format(pkg.VERSION))
                buf.write("\n".join(
                    "{0} {1}".format(op, " ".join(vers))
                    for op, vers in six.iteritems(vops)
                ) + "\n")
                buf.seek(0)
                self.__stats.record_tx()
                return buf
项目:novajoin    作者:openstack    | 项目源码 | 文件源码
def run_pylint():
    buff = StringIO()
    reporter = text.ParseableTextReporter(output=buff)
    args = ["--include-ids=y", "-E", "cinder"]
    lint.Run(args, reporter=reporter, exit=False)
    val = buff.getvalue()
    buff.close()
    return val
项目:webapp2    作者:GoogleCloudPlatform    | 项目源码 | 文件源码
def blank(cls, path, environ=None, base_url=None,
              headers=None, **kwargs):  # pragma: no cover
        """Adds parameters compatible with WebOb > 1.2: POST and **kwargs."""
        try:
            request = super(Request, cls).blank(
                path,
                environ=environ,
                base_url=base_url,
                headers=headers,
                **kwargs
            )

            if cls._request_charset and not cls._request_charset == 'utf-8':
                return request.decode(cls._request_charset)
            return request

        except TypeError:
            if not kwargs:
                raise

        data = kwargs.pop('POST', None)
        if data is not None:
            environ = environ or {}
            environ['REQUEST_METHOD'] = 'POST'
            if hasattr(data, 'items'):
                data = list(data.items())
            if not isinstance(data, str):
                data = urlencode(data)
            environ['wsgi.input'] = cStringIO(data)
            environ['webob.is_body_seekable'] = True
            environ['CONTENT_LENGTH'] = str(len(data))
            environ['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'

        base = super(Request, cls).blank(path, environ=environ,
                                         base_url=base_url, headers=headers)
        if kwargs:
            obj = cls(base.environ, **kwargs)
            obj.headers.update(base.headers)
            return obj
        else:
            return base
项目:vsphere-automation-sdk-python    作者:vmware    | 项目源码 | 文件源码
def pp(value):
    """ Utility method used to print the data nicely. """
    output = cStringIO()
    PrettyPrinter(stream=output).pprint(value)
    return output.getvalue()
项目:transform    作者:tensorflow    | 项目源码 | 文件源码
def __init__(self, delimiter):
      """Initializes the writer wrapper.

      Args:
        delimiter: A one-character string used to separate fields.
      """
      self._state = (delimiter)
      self._buffer = moves.cStringIO()

      # Since we use self._writer to encode individual rows, we set
      # lineterminator='' so that self._writer doesn't add a newline.
      self._writer = csv.writer(
          self._buffer,
          lineterminator='',
          delimiter=delimiter)
项目:bx-python    作者:bxlab    | 项目源码 | 文件源码
def fix_dirty( self ):
        chunk, offset = self.get_block_and_offset( self.file_pos )
        if self.current_block_index != chunk:
            self.current_block = StringIO( self.load_block( chunk ) )
            self.current_block.read( offset )
            self.current_block_index = chunk
        else:
            self.current_block.seek( offset )
        self.dirty = False
项目:bx-python    作者:bxlab    | 项目源码 | 文件源码
def fix_dirty( self ):
        chunk, offset = self.get_block_and_offset( self.file_pos )
        if self.current_block_index != chunk:
            self.current_block = StringIO( self.load_block( chunk ) )
            self.current_block.read( offset )
            self.current_block_index = chunk
        else:
            self.current_block.seek( offset )
        self.dirty = False
项目:mock    作者:rpm-software-management    | 项目源码 | 文件源码
def __init__(self, config, buildroot):
        self.config = config
        self.buildroot = buildroot
        self.rundir = buildroot.make_chroot_path(RUNDIR)
        self.socket_path = os.path.join(self.rundir, SOCKET_NAME)
        self.executed_commands = []
        # util.do cannot return output when the command fails, we need to
        # capture it's logging
        self.log_buffer = StringIO()
        self.log = logging.getLogger("mockbuild.plugin.pm_request")
        self.log.level = logging.DEBUG
        self.log.addFilter(OutputFilter())
        self.log.propagate = False
        self.log.addHandler(logging.StreamHandler(self.log_buffer))
项目:gpustat    作者:wookayin    | 项目源码 | 文件源码
def __repr__(self):
        return self.print_to(StringIO()).getvalue()
项目:typed-astunparse    作者:mbdevpl    | 项目源码 | 文件源码
def _JoinedStr(self, t):
        self.write("f")
        strings = []
        for value in t.values:
            if isinstance(value, ast.Str) or isinstance(value, typed_ast.ast3.Str):
                strings.append(value.s)
                continue
            unparser = type(self)(value, cStringIO())
            s = unparser.f.getvalue().rstrip()
            for delimiter in ['"""', "'''", '"', "'"]:
                if s.startswith(delimiter) and s.endswith(delimiter):
                    s = s[len(delimiter):-len(delimiter)]
                    break
            strings.append(s)
        self.write(repr(''.join(strings)))
项目:typed-astunparse    作者:mbdevpl    | 项目源码 | 文件源码
def unparse(tree: t.Union[ast.AST, typed_ast.ast3.AST]) -> str:
    """Unparse the abstract syntax tree into a str.

    Behave just like astunparse.unparse(tree), but handle trees which are typed, untyped, or mixed.
    In other words, a mixture of ast.AST-based and typed_ast.ast3-based nodes will be unparsed.
    """
    stream = cStringIO()
    Unparser(tree, file=stream)
    return stream.getvalue()
项目:typed-astunparse    作者:mbdevpl    | 项目源码 | 文件源码
def dump(
        tree: t.Union[ast.AST, typed_ast.ast3.AST], annotate_fields: bool=True,
        include_attributes: bool=False) -> str:
    """Behave just like astunparse.dump(tree), but handle typed_ast.ast3-based trees."""
    stream = cStringIO()
    Printer(
        file=stream, annotate_fields=annotate_fields,
        include_attributes=include_attributes).visit(tree)
    return stream.getvalue()
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
            try:
                return self.app(environ, start_response)
            except Exception as exc:
                # get a formatted exception
                out = StringIO()
                print_exc(file=out)
                LOG.exception(exc)

                # get formatted WSGI environment
                formatted_environ = pformat(environ)

                # render our template
                result = debug_template.render(
                    traceback=out.getvalue(),
                    environment=formatted_environ
                )

                # construct and return our response
                response = Response()
                if isinstance(exc, HTTPException):
                    response.status_int = exc.status
                else:
                    response.status_int = 500
                response.unicode_body = result
                return response(environ, start_response)
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        super(TestScaffoldUtils, self).setUp()
        self.scaffold_destination = tempfile.mkdtemp()
        self.out = sys.stdout

        sys.stdout = StringIO()
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_destination_directory_levels_deep(self):
        from pecan.scaffolds import copy_dir
        f = StringIO()
        copy_dir(
            (
                'pecan', os.path.join('tests', 'scaffold_fixtures', 'simple')
            ),
            os.path.join(self.scaffold_destination, 'some', 'app'),
            {},
            out_=f
        )

        assert os.path.isfile(os.path.join(
            self.scaffold_destination, 'some', 'app', 'foo')
        )
        assert os.path.isfile(os.path.join(
            self.scaffold_destination, 'some', 'app', 'bar', 'spam.txt')
        )
        with open(os.path.join(
            self.scaffold_destination, 'some', 'app', 'foo'
        ), 'r') as f:
            assert f.read().strip() == 'YAR'
        with open(os.path.join(
            self.scaffold_destination, 'some', 'app', 'bar', 'spam.txt'
        ), 'r') as f:
            assert f.read().strip() == 'Pecan'
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_destination_directory_already_exists(self):
        from pecan.scaffolds import copy_dir
        f = StringIO()
        copy_dir(
            (
                'pecan', os.path.join('tests', 'scaffold_fixtures', 'simple')
            ),
            os.path.join(self.scaffold_destination),
            {},
            out_=f
        )
        assert 'already exists' in f.getvalue()
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_copy_dir_with_file_content_substitution(self):
        from pecan.scaffolds import copy_dir
        copy_dir(
            (
                'pecan',
                os.path.join('tests', 'scaffold_fixtures', 'content_sub'),
            ),
            os.path.join(
                self.scaffold_destination, 'someapp'
            ),
            {'package': 'thingy'},
            out_=StringIO()
        )

        assert os.path.isfile(os.path.join(
            self.scaffold_destination, 'someapp', 'foo')
        )
        assert os.path.isfile(os.path.join(
            self.scaffold_destination, 'someapp', 'bar', 'spam.txt')
        )
        with open(os.path.join(
            self.scaffold_destination, 'someapp', 'foo'
        ), 'r') as f:
            assert f.read().strip() == 'YAR thingy'
        with open(os.path.join(
            self.scaffold_destination, 'someapp', 'bar', 'spam.txt'
        ), 'r') as f:
            assert f.read().strip() == 'Pecan thingy'
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_logging_setup(self):
        class RootController(object):
            @expose()
            def index(self):
                import logging
                logging.getLogger('pecantesting').info('HELLO WORLD')
                return "HELLO WORLD"

        f = StringIO()

        app = TestApp(make_app(RootController(), logging={
            'loggers': {
                'pecantesting': {
                    'level': 'INFO', 'handlers': ['memory']
                }
            },
            'handlers': {
                'memory': {
                    'level': 'INFO',
                    'class': 'logging.StreamHandler',
                    'stream': f
                }
            }
        }))

        app.get('/')
        assert f.getvalue() == 'HELLO WORLD\n'
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_logging_setup_with_config_obj(self):
        class RootController(object):
            @expose()
            def index(self):
                import logging
                logging.getLogger('pecantesting').info('HELLO WORLD')
                return "HELLO WORLD"

        f = StringIO()

        from pecan.configuration import conf_from_dict
        app = TestApp(make_app(RootController(), logging=conf_from_dict({
            'loggers': {
                'pecantesting': {
                    'level': 'INFO', 'handlers': ['memory']
                }
            },
            'handlers': {
                'memory': {
                    'level': 'INFO',
                    'class': 'logging.StreamHandler',
                    'stream': f
                }
            }
        })))

        app.get('/')
        assert f.getvalue() == 'HELLO WORLD\n'
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_basic_single_default_hook(self):

        _stdout = StringIO()

        class RootController(object):
            @expose()
            def index(self):
                return 'Hello, World!'

        app = TestApp(
            make_app(
                RootController(), hooks=lambda: [
                    RequestViewerHook(writer=_stdout)
                ]
            )
        )
        response = app.get('/')

        out = _stdout.getvalue()

        assert response.status_int == 200
        assert response.body == b_('Hello, World!')
        assert 'path' in out
        assert 'method' in out
        assert 'status' in out
        assert 'method' in out
        assert 'params' in out
        assert 'hooks' in out
        assert '200 OK' in out
        assert "['RequestViewerHook']" in out
        assert '/' in out
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_bad_response_from_app(self):
        """When exceptions are raised the hook deals with them properly"""

        _stdout = StringIO()

        class RootController(object):
            @expose()
            def index(self):
                return 'Hello, World!'

        app = TestApp(
            make_app(
                RootController(), hooks=lambda: [
                    RequestViewerHook(writer=_stdout)
                ]
            )
        )
        response = app.get('/404', expect_errors=True)

        out = _stdout.getvalue()

        assert response.status_int == 404
        assert 'path' in out
        assert 'method' in out
        assert 'status' in out
        assert 'method' in out
        assert 'params' in out
        assert 'hooks' in out
        assert '404 Not Found' in out
        assert "['RequestViewerHook']" in out
        assert '/' in out
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_single_blacklist_item(self):

        _stdout = StringIO()

        class RootController(object):
            @expose()
            def index(self):
                return 'Hello, World!'

        app = TestApp(
            make_app(
                RootController(),
                hooks=lambda: [
                    RequestViewerHook(
                        config={'blacklist': ['/']}, writer=_stdout
                    )
                ]
            )
        )
        response = app.get('/')

        out = _stdout.getvalue()

        assert response.status_int == 200
        assert response.body == b_('Hello, World!')
        assert out == ''
项目:deb-python-pecan    作者:openstack    | 项目源码 | 文件源码
def test_item_not_in_defaults(self):

        _stdout = StringIO()

        class RootController(object):
            @expose()
            def index(self):
                return 'Hello, World!'

        app = TestApp(
            make_app(
                RootController(),
                hooks=lambda: [
                    RequestViewerHook(
                        config={'items': ['date']}, writer=_stdout
                    )
                ]
            )
        )
        response = app.get('/')

        out = _stdout.getvalue()

        assert response.status_int == 200
        assert response.body == b_('Hello, World!')
        assert 'date' in out
        assert 'method' not in out
        assert 'status' not in out
        assert 'method' not in out
        assert 'params' not in out
        assert 'hooks' not in out
        assert '200 OK' not in out
        assert "['RequestViewerHook']" not in out
        assert '/' not in out
项目:awsume    作者:trek10inc    | 项目源码 | 文件源码
def test_print_version(self):
        capturedOut = StringIO()
        sys.stdout = capturedOut
        awsumepy.print_version()
        sys.stdout = sys.__stdout__
        self.assertEqual(capturedOut.getvalue(), 'Version ' + awsumepy.__version__ + '\n')
项目:awsume    作者:trek10inc    | 项目源码 | 文件源码
def test_handle_profiles(self,
                             mock_validate_profiles,
                             mock_is_role_profile,
                             mock_requires_mfa):
        config = collections.OrderedDict()
        credentials = collections.OrderedDict()
        arguments = argparse.Namespace()
        arguments.profile_name = 'profile-admin'

        mock_requires_mfa.return_value = False
        mock_is_role_profile.return_value = False
        capturedOut = StringIO()
        sys.stdout = capturedOut
        mock_set_data = mock.Mock()
        mock_out_data = mock.Mock()
        mock_out_data.set_data = mock_set_data


        awsumepy.handle_profiles(config, credentials, arguments, mock_out_data)
        mock_set_data.assert_called_with('Awsume None None None None profile-admin')

        mock_is_role_profile.return_value = True
        awsumepy.handle_profiles(config, credentials, arguments, mock_out_data)

        mock_requires_mfa.return_value = True
        mock_is_role_profile.return_value = False
        awsumepy.handle_profiles(config, credentials, arguments, mock_out_data)

        mock_is_role_profile.return_value = True
        awsumepy.handle_profiles(config, credentials, arguments, mock_out_data)

        self.assertEqual(mock_validate_profiles.call_count, 4)
项目:awsume    作者:trek10inc    | 项目源码 | 文件源码
def test_print_formatted_data(self):
        capturedOut = StringIO()
        sys.stderr = capturedOut
        awsumepy.print_formatted_data('some-formatted-data')
        sys.stderr = sys.__stderr__
        self.assertNotEqual(capturedOut.getvalue(), '')
项目:OpenMDAO    作者:OpenMDAO    | 项目源码 | 文件源码
def test_inp_inp_conn_no_src(self):
        raise unittest.SkipTest("no setup testing yet")
        self.p.model.connect('G3.G4.C3.x', 'G3.G4.C4.x')

        stream = cStringIO()
        self.p.setup(out_stream=stream)

        self.p['G3.G4.C3.x'] = 999.
        self.assertEqual(self.p.model.G3.G4.C3._inputs['x'], 999.)
        self.assertEqual(self.p.model.G3.G4.C4._inputs['x'], 999.)

        content = stream.getvalue()
        self.assertTrue("The following parameters have no associated unknowns:\nG1.G2.C1.x\nG3.G4.C3.x\nG3.G4.C4.x" in content)
        self.assertTrue("The following components have no connections:\nG1.G2.C1\nG1.G2.C2\nG3.G4.C3\nG3.G4.C4\n" in content)
        self.assertTrue("No recorders have been specified, so no data will be saved." in content)
项目:OpenMDAO    作者:OpenMDAO    | 项目源码 | 文件源码
def test_list_explicit_outputs(self):
        self.prob.run_model()

        stream = cStringIO()
        outputs = self.prob.model.list_outputs(implicit=False, out_stream=stream)
        self.assertEqual(sorted(outputs), [
            ('comp1.a', [1.]),
            ('comp1.b', [-4.]),
            ('comp1.c', [3.])
        ])
        text = stream.getvalue()
        self.assertEqual(text.count('comp1.'), 3)
        self.assertEqual(text.count('value:'), 3)
        self.assertEqual(text.count('residual:'), 3)
项目:OpenMDAO    作者:OpenMDAO    | 项目源码 | 文件源码
def test_list_implicit_outputs(self):
        self.prob.run_model()

        stream = cStringIO()
        states = self.prob.model.list_outputs(explicit=False, out_stream=stream)
        self.assertEqual(sorted(states), [
            ('comp2.x', [3.]),
            ('comp3.x', [3.])
        ])
        text = stream.getvalue()
        self.assertEqual(text.count('comp2.x'), 1)
        self.assertEqual(text.count('comp3.x'), 1)
        self.assertEqual(text.count('value:'), 2)
        self.assertEqual(text.count('residual:'), 2)
项目:OpenMDAO    作者:OpenMDAO    | 项目源码 | 文件源码
def test_linesearch_vector_bound_enforcement(self):
        top = self.top

        ls = top.model.nonlinear_solver.linesearch = BoundsEnforceLS(bound_enforcement='vector')
        ls.options['print_bound_enforce'] = True

        # Setup again because we assigned a new linesearch
        top.setup(check=False)

        # Test lower bounds: should go to the lower bound and stall
        top['px.x'] = 2.0
        top['comp.y'] = 0.
        top['comp.z'] = 1.6
        top.run_model()
        for ind in range(3):
            assert_rel_error(self, top['comp.z'][ind], [1.5], 1e-8)

        # Test upper bounds: should go to the minimum upper bound and stall
        top['px.x'] = 0.5
        top['comp.y'] = 0.
        top['comp.z'] = 2.4

        stdout = sys.stdout
        strout = StringIO()

        sys.stdout = strout
        try:
            top.run_model()
        finally:
            sys.stdout = stdout

        txt = strout.getvalue()

        self.assertTrue("'comp.z' exceeds upper bound" in txt)

        for ind in range(3):
            assert_rel_error(self, top['comp.z'][ind], [2.5], 1e-8)
项目:OpenMDAO    作者:OpenMDAO    | 项目源码 | 文件源码
def run_model(prob):
    """Call `run_model` on problem and capture output."""
    stdout = sys.stdout
    strout = StringIO()

    sys.stdout = strout
    try:
        prob.run_model()
    finally:
        sys.stdout = stdout

    return strout.getvalue()
项目:OpenMDAO    作者:OpenMDAO    | 项目源码 | 文件源码
def test_find_cite_with_write(self):

        p = self.prob
        p.setup()

        dest = StringIO()
        find_citations(p, out_stream=dest)

        expected = """Class: <class 'openmdao.core.problem.Problem'>
    @inproceedings{2014_openmdao_derivs,
        Author = {Justin S. Gray and Tristan A. Hearn and Kenneth T. Moore
                  and John Hwang and Joaquim Martins and Andrew Ning},
        Booktitle = {15th AIAA/ISSMO Multidisciplinary Analysis and Optimization Conference},
        Doi = {doi:10.2514/6.2014-2042},
        Month = {2014/07/08},
        Publisher = {American Institute of Aeronautics and Astronautics},
        Title = {Automatic Evaluation of Multidisciplinary Derivatives Using
                 a Graph-Based Problem Formulation in OpenMDAO},
        Year = {2014}
    }
Class: <class 'openmdao.core.group.Group'>
    foobar model
Class: <class 'openmdao.solvers.nonlinear.nonlinear_runonce.NonlinearRunOnce'>
    foobar nonlinear_solver
Class: <class 'openmdao.solvers.linear.linear_runonce.LinearRunOnce'>
    foobar linear_solver
Class: <class 'openmdao.components.exec_comp.ExecComp'>
    foobar exec comp"""

        self.assertEqual(expected, dest.getvalue().strip())
项目:mxbox    作者:Lyken17    | 项目源码 | 文件源码
def __repr__(self):
        return self.print_to(StringIO()).getvalue()
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def publisher_0(self, *tokens):
                """Returns a pkg(7) information datastream based on the
                repository configuration's publisher information."""

                prefix = self._get_req_pub()
                pubs = [
                   pub for pub in self.repo.get_publishers()
                   if not prefix or pub.prefix == prefix
                ]
                if prefix and not pubs:
                        # Publisher specified in request is unknown.
                        e = srepo.RepositoryUnknownPublisher(prefix)
                        cherrypy.log("Request failed: {0}".format(str(e)))
                        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))

                buf = cStringIO()
                try:
                        p5i.write(buf, pubs)
                except Exception as e:
                        # Treat any remaining error as a 404, but log it and
                        # include the real failure information.
                        cherrypy.log("Request failed: {0}".format(str(e)))
                        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))
                buf.seek(0)
                self.__set_response_expires("publisher", 86400*365, 86400*365)
                # Page handlers MUST return bytes.
                return misc.force_bytes(buf.getvalue())
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def publisher_1(self, *tokens):
                """Returns a pkg(7) information datastream based on the
                the request's publisher or all if not specified."""

                prefix = self._get_req_pub()

                pubs = []
                if not prefix:
                        pubs = self.repo.get_publishers()
                else:
                        try:
                                pub = self.repo.get_publisher(prefix)
                                pubs.append(pub)
                        except Exception as e:
                                # If the Publisher object creation fails, return
                                # a not found error to the client so it will
                                # treat it as an unsupported operation.
                                cherrypy.log("Request failed: {0}".format(
                                    str(e)))
                                raise cherrypy.HTTPError(http_client.NOT_FOUND,
                                    str(e))

                buf = cStringIO()
                try:
                        p5i.write(buf, pubs)
                except Exception as e:
                        # Treat any remaining error as a 404, but log it and
                        # include the real failure information.
                        cherrypy.log("Request failed: {0}".format(str(e)))
                        raise cherrypy.HTTPError(http_client.NOT_FOUND, str(e))
                buf.seek(0)
                self.__set_response_expires("publisher", 86400*365, 86400*365)
                return buf.getvalue()
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def _parse_html_error(content):
                """Parse a html document that contains error information.
                Return the html as a plain text string."""

                msg = None
                if not content:
                        return msg

                from xml.dom.minidom import Document, parse
                dom = parse(cStringIO(content))
                msg = ""

                paragraphs = []
                if not isinstance(dom, Document):
                        # Assume the output was the message.
                        msg = content
                else:
                        paragraphs = dom.getElementsByTagName("p")

                # XXX this is specific to the depot server's current
                # error output style.
                for p in paragraphs:
                        for c in p.childNodes:
                                if c.nodeType == c.TEXT_NODE:
                                        value = c.nodeValue
                                        if value is not None:
                                                msg += ("\n{0}".format(value))

                return msg
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def get_versions(self, header=None, ccancel=None):
                """Query the repo for versions information.
                Returns a file-like object."""

                buf = cStringIO()
                vops = {
                    "abandon": ["0"],
                    "add": ["0"],
                    "admin": ["0"],
                    "append": ["0"],
                    "catalog": ["1"],
                    "close": ["0"],
                    "file": ["0", "1"],
                    "manifest": ["0", "1"],
                    "open": ["0"],
                    "publisher": ["0", "1"],
                    "search": ["1"],
                    "status": ["0"],
                    "versions": ["0"],
                }

                buf.write("pkg-server {0}\n".format(pkg.VERSION))
                buf.write("\n".join(
                    "{0} {1}".format(op, " ".join(vers))
                    for op, vers in six.iteritems(vops)
                ) + "\n")
                buf.seek(0)
                self.__stats.record_tx()
                return buf
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def intercept_log_messages():
    try:
        mylog = logging.getLogger('nova')
        stream = cStringIO()
        handler = logging.logging.StreamHandler(stream)
        handler.setFormatter(formatters.ContextFormatter())
        mylog.logger.addHandler(handler)
        yield stream
    finally:
        mylog.logger.removeHandler(handler)
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def _check_error(self,args,code,expected,**kw):
        original = sys.stderr
        try:
            actual = cStringIO()
            sys.stderr = actual
            try:
                shell.main(args,**kw)
            except SystemExit as e:
                self.assertEqual(code,e.args[0])
            else:
                self.fail('No exception raised')
        finally:
            sys.stderr = original
        actual = actual.getvalue()
        self.assertTrue(expected in actual,'%r not in:\n"""\n%s\n"""'%(expected,actual))