Python threading 模块,_MainThread() 实例源码

我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用threading._MainThread()

项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        if constant.MAIN_UI is None:
            raise RuntimeError('Main UI not initialized')

        # Check if this was called in the main thread (the one running Qt EventLoop)
        # noinspection PyProtectedMember
        if isinstance(threading.current_thread(), threading._MainThread):

            # If we are in the EventLoop, there's no need to dispatch the call to the a signal
            # noinspection PyProtectedMember
            constant.MAIN_UI._do('main_ui', self.func.__name__, args, kwargs)
        else:

            # Otherwise, queue the call
            constant.MAIN_UI.do('main_ui', self.func.__name__, *args, **kwargs)


# noinspection PyAbstractClass
项目:jobs    作者:josiahcarlson    | 项目源码 | 文件源码
def handle_auto_shutdown():
    global ATEXIT_SET, SIGNAL_SET, OLD_SIGNAL
    if not ATEXIT_SET:
        ATEXIT_SET = atexit.register(_signal_handler)

    if not SIGNAL_SET and isinstance(threading.currentThread(), threading._MainThread):
        SIGNAL_SET, OLD_SIGNAL = True, signal.signal(signal.SIGTERM, _signal_handler)
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def _init_watcher(self):
        with events._lock:   # ?
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()       # ?????

                # ????????
                if isinstance(threading.current_thread(), threading._MainThread):
                    self._watcher.attach_loop(self._local._loop)

    #
    # ????: ??????
    #
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """
        super().set_event_loop(loop)

        # ????????
        if self._watcher is not None and \
            isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def __enter__(self):
            if not isinstance(threading.current_thread(), threading._MainThread):
                logging.warning("timeout only works on main thread, are you running pyspider in threads?")
                self.seconds = 0
            if self.seconds:
                signal.signal(signal.SIGALRM, self.handle_timeout)
                signal.alarm(int(math.ceil(self.seconds)))
项目:FreeDiscovery    作者:FreeDiscovery    | 项目源码 | 文件源码
def _rename_main_thread():
    """
    This aims to address the fact that joblib wrongly detects uWSGI workers
    as running in the non main thread even when they are not
    see https://github.com/joblib/joblib/issues/180
    """
    import threading
    if isinstance(threading.current_thread(), threading._MainThread) and \
       threading.current_thread().name != 'MainThread':
        print('Warning: joblib: renaming current thread {} to "MainThread".'
              .format(threading.current_thread().name))
        threading.current_thread().name = 'MainThread'
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def _init_watcher(self):
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                if isinstance(threading.current_thread(),
                              threading._MainThread):
                    self._watcher.attach_loop(self._local._loop)
项目:golightan    作者:shirou    | 项目源码 | 文件源码
def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is not None and \
            isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)
项目:mlens    作者:flennerhag    | 项目源码 | 文件源码
def effective_n_jobs(self, n_jobs):
        """Determine the number of jobs which are going to run in parallel.

        This also checks if we are attempting to create a nested parallel
        loop.
        """
        if mp is None:
            return 1

        if mp.current_process().daemon:
            # Daemonic processes cannot have children
            if n_jobs != 1:
                warnings.warn(
                    'Multiprocessing-backed parallel loops cannot be nested,'
                    ' setting n_jobs=1',
                    stacklevel=3)
            return 1

        if not isinstance(threading.current_thread(), threading._MainThread):
            # Prevent posix fork inside in non-main posix threads
            warnings.warn(
                'Multiprocessing-backed parallel loops cannot be nested'
                ' below threads, setting n_jobs=1',
                stacklevel=3)
            return 1

        return super(MultiprocessingBackend, self).effective_n_jobs(n_jobs)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def _init_watcher(self):
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                if isinstance(threading.current_thread(),
                              threading._MainThread):
                    self._watcher.attach_loop(self._local._loop)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is not None and \
            isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        assert self._local._loop is not None, \
               ('There is no current event loop in thread %r.' %
                threading.current_thread().name)
        return self._local._loop
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def _init_watcher(self):
        with events._lock:
            if self._watcher is None:  # pragma: no branch
                self._watcher = SafeChildWatcher()
                if isinstance(threading.current_thread(),
                              threading._MainThread):
                    self._watcher.attach_loop(self._local._loop)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def set_event_loop(self, loop):
        """Set the event loop.

        As a side effect, if a child watcher was set before, then calling
        .set_event_loop() from the main thread will call .attach_loop(loop) on
        the child watcher.
        """

        super().set_event_loop(loop)

        if self._watcher is not None and \
            isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def _ismainthread():
    """
    utility function for testing, returns True if we are currently executing in the main thread"""
    # see:
    # http://stackoverflow.com/questions/23206787/check-if-current-thread-is-main-thread-in-python
    return isinstance(threading.current_thread(), threading._MainThread)
项目:autosub-bootstrapbill    作者:BenjV    | 项目源码 | 文件源码
def block(self, interval=0.1):
        """Wait for the EXITING state, KeyboardInterrupt or SystemExit.

        This function is intended to be called only by the main thread.
        After waiting for the EXITING state, it also waits for all threads
        to terminate, and then calls os.execv if self.execv is True. This
        design allows another thread to call bus.restart, yet have the main
        thread perform the actual execv call (required on some platforms).
        """
        try:
            self.wait(states.EXITING, interval=interval, channel='main')
        except (KeyboardInterrupt, IOError):
            # The time.sleep call might raise
            # "IOError: [Errno 4] Interrupted function call" on KBInt.
            self.log('Keyboard Interrupt: shutting down bus')
            self.exit()
        except SystemExit:
            self.log('SystemExit raised: shutting down bus')
            self.exit()
            raise

        # Waiting for ALL child threads to finish is necessary on OS X.
        # See https://github.com/cherrypy/cherrypy/issues/581.
        # It's also good to let them all shut down before allowing
        # the main thread to call atexit handlers.
        # See https://github.com/cherrypy/cherrypy/issues/751.
        self.log('Waiting for child threads to terminate...')
        for t in threading.enumerate():
            # Validate the we're not trying to join the MainThread
            # that will cause a deadlock and the case exist when
            # implemented as a windows service and in any other case
            # that another thread executes cherrypy.engine.exit()
            if (
                    t != threading.currentThread() and
                    t.isAlive() and
                    not isinstance(t, threading._MainThread)
            ):
                # Note that any dummy (external) threads are always daemonic.
                if hasattr(threading.Thread, 'daemon'):
                    # Python 2.6+
                    d = t.daemon
                else:
                    d = t.isDaemon()
                if not d:
                    self.log('Waiting for thread %s.' % t.getName())
                    t.join()

        if self.execv:
            self._do_execv()
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def block(self, interval=0.1):
        """Wait for the EXITING state, KeyboardInterrupt or SystemExit.

        This function is intended to be called only by the main thread.
        After waiting for the EXITING state, it also waits for all threads
        to terminate, and then calls os.execv if self.execv is True. This
        design allows another thread to call bus.restart, yet have the main
        thread perform the actual execv call (required on some platforms).
        """
        try:
            self.wait(states.EXITING, interval=interval, channel='main')
        except (KeyboardInterrupt, IOError):
            # The time.sleep call might raise
            # "IOError: [Errno 4] Interrupted function call" on KBInt.
            self.log('Keyboard Interrupt: shutting down bus')
            self.exit()
        except SystemExit:
            self.log('SystemExit raised: shutting down bus')
            self.exit()
            raise

        # Waiting for ALL child threads to finish is necessary on OS X.
        # See https://github.com/cherrypy/cherrypy/issues/581.
        # It's also good to let them all shut down before allowing
        # the main thread to call atexit handlers.
        # See https://github.com/cherrypy/cherrypy/issues/751.
        self.log("Waiting for child threads to terminate...")
        for t in threading.enumerate():
            # Validate the we're not trying to join the MainThread
            # that will cause a deadlock and the case exist when
            # implemented as a windows service and in any other case
            # that another thread executes cherrypy.engine.exit()
            if (
                    t != threading.currentThread() and
                    t.isAlive() and
                    not isinstance(t, threading._MainThread)
            ):
                # Note that any dummy (external) threads are always daemonic.
                if hasattr(threading.Thread, "daemon"):
                    # Python 2.6+
                    d = t.daemon
                else:
                    d = t.isDaemon()
                if not d:
                    self.log("Waiting for thread %s." % t.getName())
                    t.join()

        if self.execv:
            self._do_execv()