Python threading 模块,_active() 实例源码

我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用threading._active()

项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def __init__(self):
        #_DummyThread_.__init__(self) # pylint:disable=super-init-not-called

        # It'd be nice to use a pattern like "greenlet-%d", but maybe somebody out
        # there is checking thread names...
        self._name = self._Thread__name = __threading__._newname("DummyThread-%d")
        self._set_ident()

        g = getcurrent()
        gid = _get_ident(g) # same as id(g)
        __threading__._active[gid] = self
        rawlink = getattr(g, 'rawlink', None)
        if rawlink is not None:
            # raw greenlet.greenlet greenlets don't
            # have rawlink...
            rawlink(_cleanup)
        else:
            # ... so for them we use weakrefs.
            # See https://github.com/gevent/gevent/issues/918
            global _weakref
            if _weakref is None:
                _weakref = __import__('weakref')
            ref = _weakref.ref(g, _make_cleanup_id(gid))
            self.__raw_ref = ref
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = _thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = _thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = _thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def __bootstrap(self):
    try:
      self._set_ident()
      self._Thread__started.set()
      threading._active_limbo_lock.acquire()
      threading._active[self._Thread__ident] = self
      del threading._limbo[self]
      threading._active_limbo_lock.release()

      if threading._trace_hook:
        sys.settrace(threading._trace_hook)
      if threading._profile_hook:
        sys.setprofile(threading._profile_hook)

      try:
        self.run()
      finally:
        self._Thread__exc_clear()
    finally:
      with threading._active_limbo_lock:
        self._Thread__stop()
        try:
          del threading._active[threading._get_ident()]
        except:
          pass
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def __init__(self):
        #_DummyThread_.__init__(self) # pylint:disable=super-init-not-called

        # It'd be nice to use a pattern like "greenlet-%d", but maybe somebody out
        # there is checking thread names...
        self._name = self._Thread__name = __threading__._newname("DummyThread-%d")
        self._set_ident()

        g = getcurrent()
        gid = _get_ident(g) # same as id(g)
        __threading__._active[gid] = self
        rawlink = getattr(g, 'rawlink', None)
        if rawlink is not None:
            # raw greenlet.greenlet greenlets don't
            # have rawlink...
            rawlink(_cleanup)
        else:
            # ... so for them we use weakrefs.
            # See https://github.com/gevent/gevent/issues/918
            global _weakref
            if _weakref is None:
                _weakref = __import__('weakref')
            ref = _weakref.ref(g, _make_cleanup_id(gid))
            self.__raw_ref = ref
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_foreign_thread(self):
        # Check that a "foreign" thread can use the threading module.
        def f(mutex):
            # Calling current_thread() forces an entry for the foreign
            # thread to get made in the threading._active map.
            threading.current_thread()
            mutex.release()

        mutex = threading.Lock()
        mutex.acquire()
        tid = _thread.start_new_thread(f, (mutex,))
        # Wait for the thread to finish.
        mutex.acquire()
        self.assertIn(tid, threading._active)
        self.assertIsInstance(threading._active[tid], threading._DummyThread)
        del threading._active[tid]

    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
    # exposed at the Python level.  This test relies on ctypes to get at it.
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_orig_thread(self):
        new_mod = """import eventlet
eventlet.monkey_patch()
from eventlet import patcher
import threading
_threading = patcher.original('threading')
def test():
    print(repr(threading.currentThread()))
t = _threading.Thread(target=test)
t.start()
t.join()
print(len(threading._active))
print(len(_threading._active))
"""
        self.write_to_tempfile("newmod", new_mod)
        output, lines = self.launch_subprocess('newmod.py')
        self.assertEqual(len(lines), 4, "\n".join(lines))
        assert lines[0].startswith('<Thread'), lines[0]
        assert lines[1] == '1', lines
        assert lines[2] == '1', lines
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_threading(self):
        new_mod = """import eventlet
eventlet.monkey_patch()
import threading
def test():
    print(repr(threading.currentThread()))
t = threading.Thread(target=test)
t.start()
t.join()
print(len(threading._active))
"""
        self.write_to_tempfile("newmod", new_mod)
        output, lines = self.launch_subprocess('newmod.py')
        self.assertEqual(len(lines), 3, "\n".join(lines))
        assert lines[0].startswith('<_MainThread'), lines[0]
        self.assertEqual(lines[1], "1", lines[1])
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_greenlet(self):
        new_mod = """import eventlet
eventlet.monkey_patch()
from eventlet import event
import threading
evt = event.Event()
def test():
    print(repr(threading.currentThread()))
    evt.send()
eventlet.spawn_n(test)
evt.wait()
print(len(threading._active))
"""
        self.write_to_tempfile("newmod", new_mod)
        output, lines = self.launch_subprocess('newmod.py')
        self.assertEqual(len(lines), 3, "\n".join(lines))
        assert lines[0].startswith('<_MainThread'), lines[0]
        self.assertEqual(lines[1], "1", lines[1])
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def id2thread_name(thread_id):
    return threading.Thread.getName(threading._active[thread_id])
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _cleanup(g):
    __threading__._active.pop(id(g), None)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def __init__(self):
        #_DummyThread_.__init__(self)

        # It'd be nice to use a pattern like "greenlet-%d", but maybe somebody out
        # there is checking thread names...
        self._name = self._Thread__name = __threading__._newname("DummyThread-%d")
        self._set_ident()

        __threading__._active[_get_ident()] = self
        g = getcurrent()
        rawlink = getattr(g, 'rawlink', None)
        if rawlink is not None:
            rawlink(_cleanup)
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def _cleanup(g):
    __threading__._active.pop(id(g), None)
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def _make_cleanup_id(gid):
    def _(_r):
        __threading__._active.pop(gid, None)
    return _
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertFalse(threading.currentThread().ident is None)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        _thread.start_new_thread(f, ())
        done.wait()
        self.assertFalse(ident[0] is None)
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertIsNotNone(threading.currentThread().ident)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        thread.start_new_thread(f, ())
        done.wait()
        self.assertIsNotNone(ident[0])
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertIsNotNone(threading.currentThread().ident)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        thread.start_new_thread(f, ())
        done.wait()
        self.assertIsNotNone(ident[0])
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertFalse(threading.currentThread().ident is None)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        _thread.start_new_thread(f, ())
        done.wait()
        self.assertFalse(ident[0] is None)
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertFalse(threading.currentThread().ident is None)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        thread.start_new_thread(f, ())
        done.wait()
        self.assertFalse(ident[0] is None)
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertFalse(threading.currentThread().ident is None)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        _thread.start_new_thread(f, ())
        done.wait()
        self.assertFalse(ident[0] is None)
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertFalse(threading.currentThread().ident is None)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        thread.start_new_thread(f, ())
        done.wait()
        self.assertFalse(ident[0] is None)
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _cleanup(g):
    __threading__._active.pop(id(g), None)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _make_cleanup_id(gid):
    def _(_r):
        __threading__._active.pop(gid, None)
    return _
项目:easypy    作者:weka-io    | 项目源码 | 文件源码
def __repr__(self):
        owner, count = self._get_data()
        try:
            owner = threading._active[owner].name
        except KeyError:
            pass
        if owner:
            return "<{}, owned by <{}>x{} for {}>".format(self._name, owner, count, self._lease_timer.elapsed)
        else:
            return "<{}, unowned>".format(self._name)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_ident_of_no_threading_threads(self):
        # The ident still must work for the main thread and dummy threads.
        self.assertFalse(threading.currentThread().ident is None)
        def f():
            ident.append(threading.currentThread().ident)
            done.set()
        done = threading.Event()
        ident = []
        _thread.start_new_thread(f, ())
        done.wait()
        self.assertFalse(ident[0] is None)
        # Kill the "immortal" _DummyThread
        del threading._active[ident[0]]

    # run with a small(ish) thread stack size (256kB)
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_tpool(self):
        new_mod = """import eventlet
eventlet.monkey_patch()
from eventlet import tpool
import threading
def test():
    print(repr(threading.currentThread()))
tpool.execute(test)
print(len(threading._active))
"""
        self.write_to_tempfile("newmod", new_mod)
        output, lines = self.launch_subprocess('newmod.py')
        self.assertEqual(len(lines), 3, "\n".join(lines))
        assert lines[0].startswith('<Thread'), lines[0]
        self.assertEqual(lines[1], "1", lines[1])
项目:deb-python-eventlet    作者:openstack    | 项目源码 | 文件源码
def test_greenthread(self):
        new_mod = """import eventlet
eventlet.monkey_patch()
import threading
def test():
    print(repr(threading.currentThread()))
t = eventlet.spawn(test)
t.wait()
print(len(threading._active))
"""
        self.write_to_tempfile("newmod", new_mod)
        output, lines = self.launch_subprocess('newmod.py')
        self.assertEqual(len(lines), 3, "\n".join(lines))
        assert lines[0].startswith('<_GreenThread'), lines[0]
        self.assertEqual(lines[1], "1", lines[1])
项目:Lixiang_zhaoxin    作者:hejaxian    | 项目源码 | 文件源码
def _cleanup(g):
    __threading__._active.pop(id(g), None)
项目:Lixiang_zhaoxin    作者:hejaxian    | 项目源码 | 文件源码
def __init__(self):
        #_DummyThread_.__init__(self)

        # It'd be nice to use a pattern like "greenlet-%d", but maybe somebody out
        # there is checking thread names...
        self._name = self._Thread__name = __threading__._newname("DummyThread-%d")
        self._set_ident()

        __threading__._active[_get_ident()] = self
        g = getcurrent()
        rawlink = getattr(g, 'rawlink', None)
        if rawlink is not None:
            rawlink(_cleanup)