Python imp 模块,lock_held() 实例源码

我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用imp.lock_held()

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def check_parallel_module_init(self):
        if imp.lock_held():
            # This triggers on, e.g., from test import autotest.
            raise unittest.SkipTest("can't run when import lock is held")

        done = threading.Event()
        for N in (20, 50) * 3:
            if verbose:
                print("Trying", N, "threads ...", end=' ')
            # Make sure that random and modulefinder get reimported freshly
            for modname in ['random', 'modulefinder']:
                try:
                    del sys.modules[modname]
                except KeyError:
                    pass
            errors = []
            done_tasks = []
            done.clear()
            for i in range(N):
                thread.start_new_thread(task, (N, done, done_tasks, errors,))
            done.wait(60)
            self.assertFalse(errors)
            if verbose:
                print("OK.")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_main():        # magic name!  see above
    global N, done

    import imp
    if imp.lock_held():
        # This triggers on, e.g., from test import autotest.
        raise unittest.SkipTest("can't run when import lock is held")

    done.acquire()
    for N in (20, 50) * 3:
        if verbose:
            print "Trying", N, "threads ...",
        for i in range(N):
            thread.start_new_thread(task, ())
        done.acquire()
        if verbose:
            print "OK."
    done.release()

    test_import_hangers()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_main():        # magic name!  see above
    global N, done

    import imp
    if imp.lock_held():
        # This triggers on, e.g., from test import autotest.
        raise unittest.SkipTest("can't run when import lock is held")

    done.acquire()
    for N in (20, 50) * 3:
        if verbose:
            print "Trying", N, "threads ...",
        for i in range(N):
            thread.start_new_thread(task, ())
        done.acquire()
        if verbose:
            print "OK."
    done.release()

    test_import_hangers()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_main():        # magic name!  see above
    global N, done

    import imp
    if imp.lock_held():
        # This triggers on, e.g., from test import autotest.
        raise unittest.SkipTest("can't run when import lock is held")

    done.acquire()
    for N in (20, 50) * 3:
        if verbose:
            print "Trying", N, "threads ...",
        for i in range(N):
            thread.start_new_thread(task, ())
        done.acquire()
        if verbose:
            print "OK."
    done.release()

    test_import_hangers()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_main():        # magic name!  see above
    global N, done

    import imp
    if imp.lock_held():
        # This triggers on, e.g., from test import autotest.
        raise unittest.SkipTest("can't run when import lock is held")

    done.acquire()
    for N in (20, 50) * 3:
        if verbose:
            print "Trying", N, "threads ...",
        for i in range(N):
            thread.start_new_thread(task, ())
        done.acquire()
        if verbose:
            print "OK."
    done.release()

    test_import_hangers()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def verify_lock_state(self, expected):
        self.assertEqual(imp.lock_held(), expected,
                             "expected imp.lock_held() to be %r" % expected)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def testLock(self):
        LOOPS = 50

        # The import lock may already be held, e.g. if the test suite is run
        # via "import test.autotest".
        lock_held_at_start = imp.lock_held()
        self.verify_lock_state(lock_held_at_start)

        for i in range(LOOPS):
            imp.acquire_lock()
            self.verify_lock_state(True)

        for i in range(LOOPS):
            imp.release_lock()

        # The original state should be restored now.
        self.verify_lock_state(lock_held_at_start)

        if not lock_held_at_start:
            try:
                imp.release_lock()
            except RuntimeError:
                pass
            else:
                self.fail("release_lock() without lock should raise "
                            "RuntimeError")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_main():
    if imp.lock_held():
        # If the import lock is held, the threads will hang
        raise unittest.SkipTest("can't run when import lock is held")

    test.support.run_unittest(SocketServerTest)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def verify_lock_state(self, expected):
        self.assertEqual(imp.lock_held(), expected,
                             "expected imp.lock_held() to be %r" % expected)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def testLock(self):
        LOOPS = 50

        # The import lock may already be held, e.g. if the test suite is run
        # via "import test.autotest".
        lock_held_at_start = imp.lock_held()
        self.verify_lock_state(lock_held_at_start)

        for i in range(LOOPS):
            imp.acquire_lock()
            self.verify_lock_state(True)

        for i in range(LOOPS):
            imp.release_lock()

        # The original state should be restored now.
        self.verify_lock_state(lock_held_at_start)

        if not lock_held_at_start:
            try:
                imp.release_lock()
            except RuntimeError:
                pass
            else:
                self.fail("release_lock() without lock should raise "
                            "RuntimeError")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_main():
    if imp.lock_held():
        # If the import lock is held, the threads will hang
        raise unittest.SkipTest("can't run when import lock is held")

    test.test_support.run_unittest(SocketServerTest)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def verify_lock_state(self, expected):
        self.assertEqual(imp.lock_held(), expected,
                             "expected imp.lock_held() to be %r" % expected)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def testLock(self):
        LOOPS = 50

        # The import lock may already be held, e.g. if the test suite is run
        # via "import test.autotest".
        lock_held_at_start = imp.lock_held()
        self.verify_lock_state(lock_held_at_start)

        for i in range(LOOPS):
            imp.acquire_lock()
            self.verify_lock_state(True)

        for i in range(LOOPS):
            imp.release_lock()

        # The original state should be restored now.
        self.verify_lock_state(lock_held_at_start)

        if not lock_held_at_start:
            try:
                imp.release_lock()
            except RuntimeError:
                pass
            else:
                self.fail("release_lock() without lock should raise "
                            "RuntimeError")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_main():
    if imp.lock_held():
        # If the import lock is held, the threads will hang
        raise unittest.SkipTest("can't run when import lock is held")

    test.test_support.run_unittest(SocketServerTest)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def find_module(self, name, path=None):
        # Simulate some thread-unsafe behaviour. If calls to find_module()
        # are properly serialized, `x` will end up the same as `numcalls`.
        # Otherwise not.
        assert imp.lock_held()
        with self.lock:
            self.numcalls += 1
        x = self.x
        time.sleep(0.01)
        self.x = x + 1
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def check_parallel_module_init(self):
        if imp.lock_held():
            # This triggers on, e.g., from test import autotest.
            raise unittest.SkipTest("can't run when import lock is held")

        done = threading.Event()
        for N in (20, 50) * 3:
            if verbose:
                print("Trying", N, "threads ...", end=' ')
            # Make sure that random and modulefinder get reimported freshly
            for modname in ['random', 'modulefinder']:
                try:
                    del sys.modules[modname]
                except KeyError:
                    pass
            errors = []
            done_tasks = []
            done.clear()
            for i in range(N):
                t = threading.Thread(target=task,
                                     args=(N, done, done_tasks, errors,))
                t.start()
            self.assertTrue(done.wait(60))
            self.assertFalse(errors)
            if verbose:
                print("OK.")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def verify_lock_state(self, expected):
        self.assertEqual(imp.lock_held(), expected,
                             "expected imp.lock_held() to be %r" % expected)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def testLock(self):
        LOOPS = 50

        # The import lock may already be held, e.g. if the test suite is run
        # via "import test.autotest".
        lock_held_at_start = imp.lock_held()
        self.verify_lock_state(lock_held_at_start)

        for i in range(LOOPS):
            imp.acquire_lock()
            self.verify_lock_state(True)

        for i in range(LOOPS):
            imp.release_lock()

        # The original state should be restored now.
        self.verify_lock_state(lock_held_at_start)

        if not lock_held_at_start:
            try:
                imp.release_lock()
            except RuntimeError:
                pass
            else:
                self.fail("release_lock() without lock should raise "
                            "RuntimeError")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_main():
    if imp.lock_held():
        # If the import lock is held, the threads will hang
        raise unittest.SkipTest("can't run when import lock is held")

    test.support.run_unittest(SocketServerTest)
项目:remoteControlPPT    作者:htwenning    | 项目源码 | 文件源码
def execute(meth, *args, **kwargs):
    """
    Execute *meth* in a Python thread, blocking the current coroutine/
    greenthread until the method completes.

    The primary use case for this is to wrap an object or module that is not
    amenable to monkeypatching or any of the other tricks that Eventlet uses
    to achieve cooperative yielding.  With tpool, you can force such objects to
    cooperate with green threads by sticking them in native threads, at the cost
    of some overhead.
    """
    setup()
    # if already in tpool, don't recurse into the tpool
    # also, call functions directly if we're inside an import lock, because
    # if meth does any importing (sadly common), it will hang
    my_thread = threading.currentThread()
    if my_thread in _threads or imp.lock_held() or _nthreads == 0:
        return meth(*args, **kwargs)

    e = event.Event()
    _reqq.put((e, meth, args, kwargs))

    rv = e.wait()
    if isinstance(rv, tuple) \
            and len(rv) == 3 \
            and isinstance(rv[1], EXC_CLASSES):
        (c, e, tb) = rv
        if not QUIET:
            traceback.print_exception(c, e, tb)
            traceback.print_stack()
        six.reraise(c, e, tb)
    return rv
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def verify_lock_state(self, expected):
        self.assertEqual(imp.lock_held(), expected,
                             "expected imp.lock_held() to be %r" % expected)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def testLock(self):
        LOOPS = 50

        # The import lock may already be held, e.g. if the test suite is run
        # via "import test.autotest".
        lock_held_at_start = imp.lock_held()
        self.verify_lock_state(lock_held_at_start)

        for i in range(LOOPS):
            imp.acquire_lock()
            self.verify_lock_state(True)

        for i in range(LOOPS):
            imp.release_lock()

        # The original state should be restored now.
        self.verify_lock_state(lock_held_at_start)

        if not lock_held_at_start:
            try:
                imp.release_lock()
            except RuntimeError:
                pass
            else:
                self.fail("release_lock() without lock should raise "
                            "RuntimeError")
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_main():
    if imp.lock_held():
        # If the import lock is held, the threads will hang
        raise unittest.SkipTest("can't run when import lock is held")

    test.test_support.run_unittest(SocketServerTest)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def verify_lock_state(self, expected):
        self.assertEqual(imp.lock_held(), expected,
                             "expected imp.lock_held() to be %r" % expected)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def testLock(self):
        LOOPS = 50

        # The import lock may already be held, e.g. if the test suite is run
        # via "import test.autotest".
        lock_held_at_start = imp.lock_held()
        self.verify_lock_state(lock_held_at_start)

        for i in range(LOOPS):
            imp.acquire_lock()
            self.verify_lock_state(True)

        for i in range(LOOPS):
            imp.release_lock()

        # The original state should be restored now.
        self.verify_lock_state(lock_held_at_start)

        if not lock_held_at_start:
            try:
                imp.release_lock()
            except RuntimeError:
                pass
            else:
                self.fail("release_lock() without lock should raise "
                            "RuntimeError")
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def verify_lock_state(self, expected):
        self.assertEqual(imp.lock_held(), expected,
                             "expected imp.lock_held() to be %r" % expected)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def testLock(self):
        LOOPS = 50

        # The import lock may already be held, e.g. if the test suite is run
        # via "import test.autotest".
        lock_held_at_start = imp.lock_held()
        self.verify_lock_state(lock_held_at_start)

        for i in range(LOOPS):
            imp.acquire_lock()
            self.verify_lock_state(True)

        for i in range(LOOPS):
            imp.release_lock()

        # The original state should be restored now.
        self.verify_lock_state(lock_held_at_start)

        if not lock_held_at_start:
            try:
                imp.release_lock()
            except RuntimeError:
                pass
            else:
                self.fail("release_lock() without lock should raise "
                            "RuntimeError")
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_main():
    if imp.lock_held():
        # If the import lock is held, the threads will hang
        raise unittest.SkipTest("can't run when import lock is held")

    test.test_support.run_unittest(SocketServerTest)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def verify_lock_state(self, expected):
        self.assertEqual(imp.lock_held(), expected,
                             "expected imp.lock_held() to be %r" % expected)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def testLock(self):
        LOOPS = 50

        # The import lock may already be held, e.g. if the test suite is run
        # via "import test.autotest".
        lock_held_at_start = imp.lock_held()
        self.verify_lock_state(lock_held_at_start)

        for i in range(LOOPS):
            imp.acquire_lock()
            self.verify_lock_state(True)

        for i in range(LOOPS):
            imp.release_lock()

        # The original state should be restored now.
        self.verify_lock_state(lock_held_at_start)

        if not lock_held_at_start:
            try:
                imp.release_lock()
            except RuntimeError:
                pass
            else:
                self.fail("release_lock() without lock should raise "
                            "RuntimeError")
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def execute(meth, *args, **kwargs):
    """
    Execute *meth* in a Python thread, blocking the current coroutine/
    greenthread until the method completes.

    The primary use case for this is to wrap an object or module that is not
    amenable to monkeypatching or any of the other tricks that Eventlet uses
    to achieve cooperative yielding.  With tpool, you can force such objects to
    cooperate with green threads by sticking them in native threads, at the cost
    of some overhead.
    """
    setup()
    # if already in tpool, don't recurse into the tpool
    # also, call functions directly if we're inside an import lock, because
    # if meth does any importing (sadly common), it will hang
    my_thread = threading.currentThread()
    if my_thread in _threads or imp.lock_held() or _nthreads == 0:
        return meth(*args, **kwargs)

    e = event.Event()
    _reqq.put((e, meth, args, kwargs))

    rv = e.wait()
    if isinstance(rv, tuple) \
            and len(rv) == 3 \
            and isinstance(rv[1], EXC_CLASSES):
        (c, e, tb) = rv
        if not QUIET:
            traceback.print_exception(c, e, tb)
            traceback.print_stack()
        six.reraise(c, e, tb)
    return rv
项目:pyjvm    作者:coderforlife    | 项目源码 | 文件源码
def __start_core(prefer, opts):
    # Load the JVM dynamic library
    from ._util import jvm_load
    jvm_load(prefer)

    # Start the JVM in its own thread
    import threading, imp
    from ._internal import jvm_create
    jvm = jvm_create() # if the JVM is already created, this will raise an exception
    event = threading.Event()
    locked = imp.lock_held()
    if locked:
        # If we are in the middle of an import (which is definitely possible) we can't start
        # another thread unless we release the import lock. If we do start the thread and then wait
        # for it to start, it will deadlock Python
        imp.release_lock()
    try:
        t = threading.Thread(target=jvm.run, name='JVM-Main', args=(opts,event))
        t.daemon = True
        t.start()
        event.wait() # wait for it to be initialized
    finally:
        if locked: imp.acquire_lock()
    jvm.check_pending_exception()
    global __jvm
    __jvm = jvm