Python testtools 模块,StreamToDict() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用testtools.StreamToDict()

项目:networking-huawei    作者:openstack    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly
                      ))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:os-dpm    作者:openstack    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:networking-ovn    作者:netgroup-polito    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:networking-dpm    作者:openstack    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:kuryr    作者:axbaretto    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:nova-dpm    作者:openstack    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:networking-huawei    作者:libuparayil    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly
                      ))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:fortiosclient    作者:jerryz1982    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def _find_failing(repo):
    run = repo.get_failing()
    case = run.get_test()
    ids = []

    def gather_errors(test_dict):
        if test_dict['status'] == 'fail':
            ids.append(test_dict['id'])

    result = testtools.StreamToDict(gather_errors)
    result.startTestRun()
    try:
        case.run(result)
    finally:
        result.stopTestRun()
    return ids
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def __init__(self, repository, partial=False, run_id=None):
        # XXX: Perhaps should factor into a decorator and use an unaltered
        # TestProtocolClient.
        self._repository = repository
        self._run_id = run_id
        if not self._run_id:
            fd, name = tempfile.mkstemp(dir=self._repository.base)
            self.fname = name
            stream = os.fdopen(fd, 'wb')
        else:
            self.fname = os.path.join(self._repository.base, self._run_id)
            stream = open(self.fname, 'ab')
        self.partial = partial
        # The time take by each test, flushed at the end.
        self._times = {}
        self._test_start = None
        self._time = None
        subunit_client = testtools.StreamToExtendedDecorator(
            TestProtocolClient(stream))
        self.hook = testtools.CopyStreamResult([
            subunit_client,
            testtools.StreamToDict(self._handle_test)])
        self._stream = stream
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def startTestRun(self):
        self._subunit = io.BytesIO()
        self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
        self.hook = testtools.CopyStreamResult([
            testtools.StreamToDict(self._handle_test),
            self.subunit_stream])
        self.hook.startTestRun()
        self.start_time = datetime.datetime.utcnow()
        session = self.session_factory()
        if not self._run_id:
            self.run = db_api.create_run(session=session)
            self._run_id = self.run.uuid
        else:
            int_id = db_api.get_run_id_from_uuid(self._run_id, session=session)
            self.run = db_api.get_run_by_id(int_id, session=session)
        session.close()
        self.totals = {}
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def trace(stdin, stdout, print_failures=False, failonly=False,
          enable_diff=False, abbreviate=False, color=False, post_fails=False,
          no_summary=False):
    stream = subunit.ByteStreamToStreamResult(
        stdin, non_subunit_name='stdout')
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, stdout,
                          print_failures=print_failures,
                          failonly=failonly,
                          enable_diff=enable_diff,
                          abbreviate=abbreviate,
                          enable_color=color))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([outcomes, summary])
    result = testtools.StreamResultRouter(result)
    cat = subunit.test_results.CatFiles(stdout)
    result.add_rule(cat, 'test_id', test_id=None)
    start_time = datetime.datetime.utcnow()
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    stop_time = datetime.datetime.utcnow()
    elapsed_time = stop_time - start_time

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if post_fails:
        print_fails(stdout)
    if not no_summary:
        print_summary(stdout, elapsed_time)

    # NOTE(mtreinish): Ideally this should live in testtools streamSummary
    # this is just in place until the behavior lands there (if it ever does)
    if count_tests('status', '^success$') == 0:
        print("\nNo tests were successful during the run")
        return 1
    return 0 if summary.wasSuccessful() else 1
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def _prior_tests(self, run, failing_id):
        """Calculate what tests from the test run run ran before test_id.

        Tests that ran in a different worker are not included in the result.
        """
        if not getattr(self, '_worker_to_test', False):
            case = run.get_test()
            # Use None if there is no worker-N tag
            # If there are multiple, map them all.
            # (worker-N -> [testid, ...])
            worker_to_test = {}
            # (testid -> [workerN, ...])
            test_to_worker = {}

            def map_test(test_dict):
                tags = test_dict['tags']
                id = test_dict['id']
                workers = []
                for tag in tags:
                    if tag.startswith('worker-'):
                        workers.append(tag)
                if not workers:
                    workers = [None]
                for worker in workers:
                    worker_to_test.setdefault(worker, []).append(id)
                test_to_worker.setdefault(id, []).extend(workers)

            mapper = testtools.StreamToDict(map_test)
            mapper.startTestRun()
            try:
                case.run(mapper)
            finally:
                mapper.stopTestRun()
            self._worker_to_test = worker_to_test
            self._test_to_worker = test_to_worker
        failing_workers = self._test_to_worker[failing_id]
        prior_tests = []
        for worker in failing_workers:
            worker_tests = self._worker_to_test[worker]
            prior_tests.extend(worker_tests[:worker_tests.index(failing_id)])
        return prior_tests
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def assertRunExit(self, cmd, expected, subunit=False, stdin=None):
        if stdin:
            p = subprocess.Popen(
                "%s" % cmd, shell=True, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = p.communicate(stdin)
        else:
            p = subprocess.Popen(
                "%s" % cmd, shell=True,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = p.communicate()

        if not subunit:
            self.assertEqual(
                p.returncode, expected,
                "Stdout: %s; Stderr: %s" % (out, err))
            return (out, err)
        else:
            self.assertEqual(p.returncode, expected,
                             "Expected return code: %s doesn't match actual "
                             "return code of: %s" % (expected, p.returncode))
            output_stream = io.BytesIO(out)
            stream = subunit_lib.ByteStreamToStreamResult(output_stream)
            starts = testtools.StreamResult()
            summary = testtools.StreamSummary()
            tests = []

            def _add_dict(test):
                tests.append(test)

            outcomes = testtools.StreamToDict(functools.partial(_add_dict))
            result = testtools.CopyStreamResult([starts, outcomes, summary])
            result.startTestRun()
            try:
                stream.run(result)
            finally:
                result.stopTestRun()
            self.assertThat(len(tests), testtools.matchers.GreaterThan(0))
            return (out, err)
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def startTestRun(self):
        self._subunit = BytesIO()
        serialiser = subunit.v2.StreamResultToBytes(self._subunit)
        self._hook = testtools.CopyStreamResult([
            testtools.StreamToDict(self._handle_test),
            serialiser])
        self._hook.startTestRun()
项目:deb-subunit    作者:openstack    | 项目源码 | 文件源码
def to_disk(argv=None, stdin=None, stdout=None):
    if stdout is None:
        stdout = sys.stdout
    if stdin is None:
        stdin = sys.stdin
    parser = optparse.OptionParser(
        description="Export a subunit stream to files on disk.",
        epilog=dedent("""\
            Creates a directory per test id, a JSON file with test
            metadata within that directory, and each attachment
            is written to their name relative to that directory.

            Global packages (no test id) are discarded.

            Exits 0 if the export was completed, or non-zero otherwise.
            """))
    parser.add_option(
        "-d", "--directory", help="Root directory to export to.",
        default=".")
    options, args = parser.parse_args(argv)
    if len(args) > 1:
        raise Exception("Unexpected arguments.")
    if len(args):
        source = io.open(args[0], 'rb')
    else:
        source = stdin
    exporter = DiskExporter(options.directory)
    result = StreamToDict(exporter.export)
    run_tests_from_stream(source, result, protocol_version=2)
    return 0