Python test 模块,support() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用test.support()

项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _listener(cls, conn, families):
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen(1)
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        conn.recv()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_run_call_order__error_in_tearDown_default_result(self):

        class Foo(Test.LoggingTestCase):
            def defaultTestResult(self):
                return LoggingResult(self.events)
            def tearDown(self):
                super(Foo, self).tearDown()
                raise RuntimeError('raised by Foo.tearDown')

        events = []
        Foo(events).run()
        expected = ['startTestRun', 'startTest', 'setUp', 'test', 'tearDown',
                    'addError', 'stopTest', 'stopTestRun']
        self.assertEqual(events, expected)

    # "TestCase.run() still works when the defaultTestResult is a TestResult
    # that does not support startTestRun and stopTestRun.
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)


#
# Creates a wrapper for a function which records the time it takes to finish
#
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        self.c = len(struct.pack('c', b' '))
        self.H = len(struct.pack('H', 0))
        self.i = len(struct.pack('i', 0))
        self.l = len(struct.pack('l', 0))
        self.P = len(struct.pack('P', 0))
        # due to missing size_t information from struct, it is assumed that
        # sizeof(Py_ssize_t) = sizeof(void*)
        self.header = 'PP'
        self.vheader = self.header + 'P'
        if hasattr(sys, "gettotalrefcount"):
            self.header += '2P'
            self.vheader += '2P'
        self.longdigit = sys.int_info.sizeof_digit
        import _testcapi
        self.gc_headsize = _testcapi.SIZEOF_PYGC_HEAD
        self.file = open(test.support.TESTFN, 'wb')
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_whichdb(self):
        for module in dbm_iterator():
            # Check whether whichdb correctly guesses module name
            # for databases opened with "module" module.
            # Try with empty files first
            name = module.__name__
            if name == 'dbm.dumb':
                continue   # whichdb can't support dbm.dumb
            delete_files()
            f = module.open(_fname, 'c')
            f.close()
            self.assertEqual(name, dbm.whichdb(_fname))
            # Now add a key
            f = module.open(_fname, 'w')
            f[b"1"] = b"1"
            # and test that we can find it
            self.assertIn(b"1", f)
            # and read it
            self.assertTrue(f[b"1"] == b"1")
            f.close()
            self.assertEqual(name, dbm.whichdb(_fname))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def _run_object_doctest(obj, module):
    finder = doctest.DocTestFinder(verbose=verbose, recurse=False)
    runner = doctest.DocTestRunner(verbose=verbose)
    # Use the object's fully qualified name if it has one
    # Otherwise, use the module's name
    try:
        name = "%s.%s" % (obj.__module__, obj.__name__)
    except AttributeError:
        name = module.__name__
    for example in finder.find(obj, name, module):
        runner.run(example)
    f, t = runner.failures, runner.tries
    if f:
        raise test.support.TestFailed("%d of %d doctests failed" % (f, t))
    if verbose:
        print ('doctest (%s) ... %d tests with zero failures' % (module.__name__, t))
    return f, t
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)


#
# Creates a wrapper for a function which records the time it takes to finish
#
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_whichdb(self):
        for module in dbm_iterator():
            # Check whether whichdb correctly guesses module name
            # for databases opened with "module" module.
            # Try with empty files first
            name = module.__name__
            if name == 'dbm.dumb':
                continue   # whichdb can't support dbm.dumb
            delete_files()
            f = module.open(_fname, 'c')
            f.close()
            self.assertEqual(name, self.dbm.whichdb(_fname))
            # Now add a key
            f = module.open(_fname, 'w')
            f[b"1"] = b"1"
            # and test that we can find it
            self.assertIn(b"1", f)
            # and read it
            self.assertTrue(f[b"1"] == b"1")
            f.close()
            self.assertEqual(name, self.dbm.whichdb(_fname))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def _run_object_doctest(obj, module):
    finder = doctest.DocTestFinder(verbose=verbose, recurse=False)
    runner = doctest.DocTestRunner(verbose=verbose)
    # Use the object's fully qualified name if it has one
    # Otherwise, use the module's name
    try:
        name = "%s.%s" % (obj.__module__, obj.__name__)
    except AttributeError:
        name = module.__name__
    for example in finder.find(obj, name, module):
        runner.run(example)
    f, t = runner.failures, runner.tries
    if f:
        raise test.support.TestFailed("%d of %d doctests failed" % (f, t))
    if verbose:
        print ('doctest (%s) ... %d tests with zero failures' % (module.__name__, t))
    return f, t
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_weakref(self):
        # Check memoryviews are weakrefable
        for tp in self._types:
            b = tp(self._source)
            m = self._view(b)
            L = []
            def callback(wr, b=b):
                L.append(b)
            wr = weakref.ref(m, callback)
            self.assertIs(wr(), m)
            del m
            test.support.gc_collect()
            self.assertIs(wr(), None)
            self.assertIs(L[0], b)

# Variations on source objects for the buffer: bytes-like objects, then arrays
# with itemsize > 1.
# NOTE: support for multi-dimensional objects is unimplemented.
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_recursionlimit_fatalerror(self):
        # A fatal error occurs if a second recursion limit is hit when recovering
        # from a first one.
        code = textwrap.dedent("""
            import sys

            def f():
                try:
                    f()
                except RuntimeError:
                    f()

            sys.setrecursionlimit(%d)
            f()""")
        with test.support.SuppressCrashReport():
            for i in (50, 1000):
                sub = subprocess.Popen([sys.executable, '-c', code % i],
                    stderr=subprocess.PIPE)
                err = sub.communicate()[1]
                self.assertTrue(sub.returncode, sub.returncode)
                self.assertIn(
                    b"Fatal Python error: Cannot recover from stack overflow",
                    err)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_pythontypes(self):
        # check all types defined in Python/
        size = test.support.calcobjsize
        vsize = test.support.calcvobjsize
        check = self.check_sizeof
        # _ast.AST
        import _ast
        check(_ast.AST(), size('P'))
        try:
            raise TypeError
        except TypeError:
            tb = sys.exc_info()[2]
            # traceback
            if tb is not None:
                check(tb, size('2P2i'))
        # symtable entry
        # XXX
        # sys.flags
        check(sys.flags, vsize('') + self.P * len(sys.flags))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_whichdb(self):
        for module in dbm_iterator():
            # Check whether whichdb correctly guesses module name
            # for databases opened with "module" module.
            # Try with empty files first
            name = module.__name__
            if name == 'dbm.dumb':
                continue   # whichdb can't support dbm.dumb
            delete_files()
            f = module.open(_fname, 'c')
            f.close()
            self.assertEqual(name, self.dbm.whichdb(_fname))
            # Now add a key
            f = module.open(_fname, 'w')
            f[b"1"] = b"1"
            # and test that we can find it
            self.assertIn(b"1", f)
            # and read it
            self.assertTrue(f[b"1"] == b"1")
            f.close()
            self.assertEqual(name, self.dbm.whichdb(_fname))
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)


#
# Creates a wrapper for a function which records the time it takes to finish
#
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_no_import_lock_contention(self):
        with test.support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with test.support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _listener(cls, conn, families):
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen(1)
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        conn.recv()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_threads_join(self):
        # Non-daemon threads should be joined at subinterpreter shutdown
        # (issue #18808)
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        code = r"""if 1:
            import os
            import threading
            import time

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(0.05)
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            """ % (w,)
        ret = test.support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The thread was joined properly.
        self.assertEqual(os.read(r, 1), b"x")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_daemon_threads_fatal_error(self):
        subinterp_code = r"""if 1:
            import os
            import threading
            import time

            def f():
                # Make sure the daemon thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(10)
            threading.Thread(target=f, daemon=True).start()
            """
        script = r"""if 1:
            import _testcapi

            _testcapi.run_in_subinterp(%r)
            """ % (subinterp_code,)
        with test.support.SuppressCrashReport():
            rc, out, err = assert_python_failure("-c", script)
        self.assertIn("Fatal Python error: Py_EndInterpreter: "
                      "not the last thread", err.decode())
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_issue22668(self):
        a = array.array('H', [256, 256, 256, 256])
        x = memoryview(a)
        m = x.cast('B')
        b = m.cast('H')
        c = b[0:2]
        d = memoryview(b)

        del b

        self.assertEqual(c[0], 256)
        self.assertEqual(d[0], 256)
        self.assertEqual(c.format, "H")
        self.assertEqual(d.format, "H")

        _ = m.cast('I')
        self.assertEqual(c[0], 256)
        self.assertEqual(d[0], 256)
        self.assertEqual(c.format, "H")
        self.assertEqual(d.format, "H")


# Variations on source objects for the buffer: bytes-like objects, then arrays
# with itemsize > 1.
# NOTE: support for multi-dimensional objects is unimplemented.
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_recursionlimit_fatalerror(self):
        # A fatal error occurs if a second recursion limit is hit when recovering
        # from a first one.
        code = textwrap.dedent("""
            import sys

            def f():
                try:
                    f()
                except RuntimeError:
                    f()

            sys.setrecursionlimit(%d)
            f()""")
        with test.support.SuppressCrashReport():
            for i in (50, 1000):
                sub = subprocess.Popen([sys.executable, '-c', code % i],
                    stderr=subprocess.PIPE)
                err = sub.communicate()[1]
                self.assertTrue(sub.returncode, sub.returncode)
                self.assertIn(
                    b"Fatal Python error: Cannot recover from stack overflow",
                    err)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_pythontypes(self):
        # check all types defined in Python/
        size = test.support.calcobjsize
        vsize = test.support.calcvobjsize
        check = self.check_sizeof
        # _ast.AST
        import _ast
        check(_ast.AST(), size('P'))
        try:
            raise TypeError
        except TypeError:
            tb = sys.exc_info()[2]
            # traceback
            if tb is not None:
                check(tb, size('2P2i'))
        # symtable entry
        # XXX
        # sys.flags
        check(sys.flags, vsize('') + self.P * len(sys.flags))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_whichdb(self):
        for module in dbm_iterator():
            # Check whether whichdb correctly guesses module name
            # for databases opened with "module" module.
            # Try with empty files first
            name = module.__name__
            if name == 'dbm.dumb':
                continue   # whichdb can't support dbm.dumb
            delete_files()
            f = module.open(_fname, 'c')
            f.close()
            self.assertEqual(name, self.dbm.whichdb(_fname))
            # Now add a key
            f = module.open(_fname, 'w')
            f[b"1"] = b"1"
            # and test that we can find it
            self.assertIn(b"1", f)
            # and read it
            self.assertTrue(f[b"1"] == b"1")
            f.close()
            self.assertEqual(name, self.dbm.whichdb(_fname))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test."""
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)


#
# Creates a wrapper for a function which records the time it takes to finish
#
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    raise AssertionError('expected RuntimeError')
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _listener(cls, conn, families):
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen(1)
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        conn.recv()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_threads_join(self):
        # Non-daemon threads should be joined at subinterpreter shutdown
        # (issue #18808)
        r, w = os.pipe()
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        code = r"""if 1:
            import os
            import threading
            import time

            def f():
                # Sleep a bit so that the thread is still running when
                # Py_EndInterpreter is called.
                time.sleep(0.05)
                os.write(%d, b"x")
            threading.Thread(target=f).start()
            """ % (w,)
        ret = test.support.run_in_subinterp(code)
        self.assertEqual(ret, 0)
        # The thread was joined properly.
        self.assertEqual(os.read(r, 1), b"x")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def tearDown(self):
        sys.stdout = self.orig_stdout
        sys.stderr = self.orig_stderr
        sys.displayhook = self.orig_displayhook
        test.support.reap_children()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_excepthook(self):
        with test.support.captured_output("stderr") as stderr:
            sys.excepthook(1, '1', 1)
        self.assertTrue("TypeError: print_exception(): Exception expected for " \
                         "value, str found" in stderr.getvalue())

    # FIXME: testing the code for a lost or replaced excepthook in
    # Python/pythonrun.c::PyErr_PrintEx() is tricky.
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_getwindowsversion(self):
        # Raise SkipTest if sys doesn't have getwindowsversion attribute
        test.support.get_attribute(sys, "getwindowsversion")
        v = sys.getwindowsversion()
        self.assertEqual(len(v), 5)
        self.assertIsInstance(v[0], int)
        self.assertIsInstance(v[1], int)
        self.assertIsInstance(v[2], int)
        self.assertIsInstance(v[3], int)
        self.assertIsInstance(v[4], str)
        self.assertRaises(IndexError, operator.getitem, v, 5)
        self.assertIsInstance(v.major, int)
        self.assertIsInstance(v.minor, int)
        self.assertIsInstance(v.build, int)
        self.assertIsInstance(v.platform, int)
        self.assertIsInstance(v.service_pack, str)
        self.assertIsInstance(v.service_pack_minor, int)
        self.assertIsInstance(v.service_pack_major, int)
        self.assertIsInstance(v.suite_mask, int)
        self.assertIsInstance(v.product_type, int)
        self.assertEqual(v[0], v.major)
        self.assertEqual(v[1], v.minor)
        self.assertEqual(v[2], v.build)
        self.assertEqual(v[3], v.platform)
        self.assertEqual(v[4], v.service_pack)

        # This is how platform.py calls it. Make sure tuple
        #  still has 5 elements
        maj, min, buildno, plat, csd = sys.getwindowsversion()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def tearDown(self):
        self.file.close()
        test.support.unlink(test.support.TESTFN)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def delete_files():
    # we don't know the precise name the underlying database uses
    # so we use glob to locate all names
    for f in glob.glob(_fname + "*"):
        test.support.unlink(f)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        delete_files()
        self.filename = test.support.TESTFN
        self.d = dbm.open(self.filename, 'c')
        self.d.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_main():
    classes = [WhichDBTestCase]
    for mod in dbm_iterator():
        classes.append(type("TestCase-" + mod.__name__, (AnyDBMTestCase,),
                            {'module': mod}))
    test.support.run_unittest(*classes)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        self._threads = test.support.threading_setup()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def tearDown(self):
        test.support.threading_cleanup(*self._threads)
        test.support.reap_children()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_various_ops_small_stack(self):
        if verbose:
            print('with 256kB thread stack size...')
        try:
            threading.stack_size(262144)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)

    # run with a large thread stack size (1MB)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_various_ops_large_stack(self):
        if verbose:
            print('with 1MB thread stack size...')
        try:
            threading.stack_size(0x100000)
        except _thread.error:
            raise unittest.SkipTest(
                'platform does not support changing thread stack size')
        self.test_various_ops()
        threading.stack_size(0)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_main():
    test.support.run_unittest(LockTests, PyRLockTests, CRLockTests, EventTests,
                              ConditionAsRLockTests, ConditionTests,
                              SemaphoreTests, BoundedSemaphoreTests,
                              ThreadTests,
                              ThreadJoinOnShutdown,
                              ThreadingExceptionTests,
                              BarrierTests
                              )
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_writable_readonly(self):
        # Issue #10451: memoryview incorrectly exposes a readonly
        # buffer as writable causing a segfault if using mmap
        tp = self.ro_type
        if tp is None:
            return
        b = tp(self._source)
        m = self._view(b)
        i = io.BytesIO(b'ZZZZ')
        self.assertRaises(TypeError, i.readinto, m)

# Variations on source objects for the buffer: bytes-like objects, then arrays
# with itemsize > 1.
# NOTE: support for multi-dimensional objects is unimplemented.
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_main():
    test.support.run_unittest(__name__)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)

#
#
#