Python testtools 模块,CopyStreamResult() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用testtools.CopyStreamResult()

项目:networking-huawei    作者:openstack    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly
                      ))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:os-dpm    作者:openstack    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:networking-ovn    作者:netgroup-polito    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:networking-dpm    作者:openstack    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:kuryr    作者:axbaretto    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:nova-dpm    作者:openstack    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:networking-huawei    作者:libuparayil    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly
                      ))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:fortiosclient    作者:jerryz1982    | 项目源码 | 文件源码
def main():
    args = parse_args()
    stream = subunit.ByteStreamToStreamResult(
        sys.stdin, non_subunit_name='stdout')
    starts = Starts(sys.stdout)
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, sys.stdout,
                          print_failures=args.print_failures,
                          failonly=args.failonly))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([starts, outcomes, summary])
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if args.post_fails:
        print_fails(sys.stdout)
    print_summary(sys.stdout)
    return (0 if summary.wasSuccessful() else 1)
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def __init__(self, repository, partial=False, run_id=None):
        # XXX: Perhaps should factor into a decorator and use an unaltered
        # TestProtocolClient.
        self._repository = repository
        self._run_id = run_id
        if not self._run_id:
            fd, name = tempfile.mkstemp(dir=self._repository.base)
            self.fname = name
            stream = os.fdopen(fd, 'wb')
        else:
            self.fname = os.path.join(self._repository.base, self._run_id)
            stream = open(self.fname, 'ab')
        self.partial = partial
        # The time take by each test, flushed at the end.
        self._times = {}
        self._test_start = None
        self._time = None
        subunit_client = testtools.StreamToExtendedDecorator(
            TestProtocolClient(stream))
        self.hook = testtools.CopyStreamResult([
            subunit_client,
            testtools.StreamToDict(self._handle_test)])
        self._stream = stream
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def startTestRun(self):
        self._subunit = io.BytesIO()
        self.subunit_stream = subunit.v2.StreamResultToBytes(self._subunit)
        self.hook = testtools.CopyStreamResult([
            testtools.StreamToDict(self._handle_test),
            self.subunit_stream])
        self.hook.startTestRun()
        self.start_time = datetime.datetime.utcnow()
        session = self.session_factory()
        if not self._run_id:
            self.run = db_api.create_run(session=session)
            self._run_id = self.run.uuid
        else:
            int_id = db_api.get_run_id_from_uuid(self._run_id, session=session)
            self.run = db_api.get_run_by_id(int_id, session=session)
        session.close()
        self.totals = {}
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def trace(stdin, stdout, print_failures=False, failonly=False,
          enable_diff=False, abbreviate=False, color=False, post_fails=False,
          no_summary=False):
    stream = subunit.ByteStreamToStreamResult(
        stdin, non_subunit_name='stdout')
    outcomes = testtools.StreamToDict(
        functools.partial(show_outcome, stdout,
                          print_failures=print_failures,
                          failonly=failonly,
                          enable_diff=enable_diff,
                          abbreviate=abbreviate,
                          enable_color=color))
    summary = testtools.StreamSummary()
    result = testtools.CopyStreamResult([outcomes, summary])
    result = testtools.StreamResultRouter(result)
    cat = subunit.test_results.CatFiles(stdout)
    result.add_rule(cat, 'test_id', test_id=None)
    start_time = datetime.datetime.utcnow()
    result.startTestRun()
    try:
        stream.run(result)
    finally:
        result.stopTestRun()
    stop_time = datetime.datetime.utcnow()
    elapsed_time = stop_time - start_time

    if count_tests('status', '.*') == 0:
        print("The test run didn't actually run any tests")
        return 1
    if post_fails:
        print_fails(stdout)
    if not no_summary:
        print_summary(stdout, elapsed_time)

    # NOTE(mtreinish): Ideally this should live in testtools streamSummary
    # this is just in place until the behavior lands there (if it ever does)
    if count_tests('status', '^success$') == 0:
        print("\nNo tests were successful during the run")
        return 1
    return 0 if summary.wasSuccessful() else 1
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def assertRunExit(self, cmd, expected, subunit=False, stdin=None):
        if stdin:
            p = subprocess.Popen(
                "%s" % cmd, shell=True, stdin=subprocess.PIPE,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = p.communicate(stdin)
        else:
            p = subprocess.Popen(
                "%s" % cmd, shell=True,
                stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            out, err = p.communicate()

        if not subunit:
            self.assertEqual(
                p.returncode, expected,
                "Stdout: %s; Stderr: %s" % (out, err))
            return (out, err)
        else:
            self.assertEqual(p.returncode, expected,
                             "Expected return code: %s doesn't match actual "
                             "return code of: %s" % (expected, p.returncode))
            output_stream = io.BytesIO(out)
            stream = subunit_lib.ByteStreamToStreamResult(output_stream)
            starts = testtools.StreamResult()
            summary = testtools.StreamSummary()
            tests = []

            def _add_dict(test):
                tests.append(test)

            outcomes = testtools.StreamToDict(functools.partial(_add_dict))
            result = testtools.CopyStreamResult([starts, outcomes, summary])
            result.startTestRun()
            try:
                stream.run(result)
            finally:
                result.stopTestRun()
            self.assertThat(len(tests), testtools.matchers.GreaterThan(0))
            return (out, err)
项目:stestr    作者:mtreinish    | 项目源码 | 文件源码
def startTestRun(self):
        self._subunit = BytesIO()
        serialiser = subunit.v2.StreamResultToBytes(self._subunit)
        self._hook = testtools.CopyStreamResult([
            testtools.StreamToDict(self._handle_test),
            serialiser])
        self._hook.startTestRun()
项目:deb-subunit    作者:openstack    | 项目源码 | 文件源码
def tag_stream(original, filtered, tags):
    """Alter tags on a stream.

    :param original: The input stream.
    :param filtered: The output stream.
    :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
        '-TAG' commands.

        A 'TAG' command will add the tag to the output stream,
        and override any existing '-TAG' command in that stream.
        Specifically:
         * A global 'tags: TAG' will be added to the start of the stream.
         * Any tags commands with -TAG will have the -TAG removed.

        A '-TAG' command will remove the TAG command from the stream.
        Specifically:
         * A 'tags: -TAG' command will be added to the start of the stream.
         * Any 'tags: TAG' command will have 'TAG' removed from it.
        Additionally, any redundant tagging commands (adding a tag globally
        present, or removing a tag globally removed) are stripped as a
        by-product of the filtering.
    :return: 0
    """
    new_tags, gone_tags = tags_to_new_gone(tags)
    source = ByteStreamToStreamResult(original, non_subunit_name='stdout')
    class Tagger(CopyStreamResult):
        def status(self, **kwargs):
            tags = kwargs.get('test_tags')
            if not tags:
                tags = set()
            tags.update(new_tags)
            tags.difference_update(gone_tags)
            if tags:
                kwargs['test_tags'] = tags
            else:
                kwargs['test_tags'] = None
            super(Tagger, self).status(**kwargs)
    output = Tagger([StreamResultToBytes(filtered)])
    source.run(output)
    return 0
项目:deb-subunit    作者:openstack    | 项目源码 | 文件源码
def run_tests_from_stream(input_stream, result, passthrough_stream=None,
    forward_stream=None, protocol_version=1, passthrough_subunit=True):
    """Run tests from a subunit input stream through 'result'.

    Non-test events - top level file attachments - are expected to be
    dropped by v2 StreamResults at the present time (as all the analysis code
    is in ExtendedTestResult API's), so to implement passthrough_stream they
    are diverted and copied directly when that is set.

    :param input_stream: A stream containing subunit input.
    :param result: A TestResult that will receive the test events.
        NB: This should be an ExtendedTestResult for v1 and a StreamResult for
        v2.
    :param passthrough_stream: All non-subunit input received will be
        sent to this stream.  If not provided, uses the ``TestProtocolServer``
        default, which is ``sys.stdout``.
    :param forward_stream: All subunit input received will be forwarded
        to this stream. If not provided, uses the ``TestProtocolServer``
        default, which is to not forward any input. Do not set this when
        transforming the stream - items would be double-reported.
    :param protocol_version: What version of the subunit protocol to expect.
    :param passthrough_subunit: If True, passthrough should be as subunit
        otherwise unwrap it. Only has effect when forward_stream is None.
        (when forwarding as subunit non-subunit input is always turned into
        subunit)
    """
    if 1==protocol_version:
        test = ProtocolTestCase(
            input_stream, passthrough=passthrough_stream,
            forward=forward_stream)
    elif 2==protocol_version:
        # In all cases we encapsulate unknown inputs.
        if forward_stream is not None:
            # Send events to forward_stream as subunit.
            forward_result = StreamResultToBytes(forward_stream)
            # If we're passing non-subunit through, copy:
            if passthrough_stream is None:
                # Not passing non-test events - split them off to nothing.
                router = StreamResultRouter(forward_result)
                router.add_rule(StreamResult(), 'test_id', test_id=None)
                result = CopyStreamResult([router, result])
            else:
                # otherwise, copy all events to forward_result
                result = CopyStreamResult([forward_result, result])
        elif passthrough_stream is not None:
            if not passthrough_subunit:
                # Route non-test events to passthrough_stream, unwrapping them for
                # display.
                passthrough_result = CatFiles(passthrough_stream)
            else:
                passthrough_result = StreamResultToBytes(passthrough_stream)
            result = StreamResultRouter(result)
            result.add_rule(passthrough_result, 'test_id', test_id=None)
        test = ByteStreamToStreamResult(input_stream,
            non_subunit_name='stdout')
    else:
        raise Exception("Unknown protocol version.")
    result.startTestRun()
    test.run(result)
    result.stopTestRun()