Python threading 模块,_limbo() 实例源码

我们从Python开源项目中,提取了以下10个代码示例,用于说明如何使用threading._limbo()

项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def __bootstrap(self):
    try:
      self._set_ident()
      self._Thread__started.set()
      threading._active_limbo_lock.acquire()
      threading._active[self._Thread__ident] = self
      del threading._limbo[self]
      threading._active_limbo_lock.release()

      if threading._trace_hook:
        sys.settrace(threading._trace_hook)
      if threading._profile_hook:
        sys.setprofile(threading._profile_hook)

      try:
        self.run()
      finally:
        self._Thread__exc_clear()
    finally:
      with threading._active_limbo_lock:
        self._Thread__stop()
        try:
          del threading._active[threading._get_ident()]
        except:
          pass
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise threading.ThreadError()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(threading.ThreadError, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise thread.error()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise thread.error()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise threading.ThreadError()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(threading.ThreadError, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise thread.error()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise threading.ThreadError()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(threading.ThreadError, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise thread.error()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(thread.error, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def start(self):
    """Starts this background thread."""
    if not self._Thread__initialized:
      raise RuntimeError('thread.__init__() not called')
    if self._Thread__started.is_set():
      raise RuntimeError('threads can only be started once')
    with threading._active_limbo_lock:
      threading._limbo[self] = self
    try:
      start_new_background_thread(self.__bootstrap, ())
    except Exception:
      with threading._active_limbo_lock:
        del threading._limbo[self]
      raise
    self._Thread__started.wait()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_limbo_cleanup(self):
        # Issue 7481: Failure to start thread should cleanup the limbo map.
        def fail_new_thread(*args):
            raise threading.ThreadError()
        _start_new_thread = threading._start_new_thread
        threading._start_new_thread = fail_new_thread
        try:
            t = threading.Thread(target=lambda: None)
            self.assertRaises(threading.ThreadError, t.start)
            self.assertFalse(
                t in threading._limbo,
                "Failed to cleanup _limbo map on failure of Thread.start().")
        finally:
            threading._start_new_thread = _start_new_thread