Python threading 模块,main_thread() 实例源码

我们从Python开源项目中,提取了以下39个代码示例,用于说明如何使用threading.main_thread()

项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def cron_task_host():
    """Host loop for cron tasks: run the task scheduler once a minute.

    The loop ends as soon as ``enable_cron_tasks`` is false: a non-main
    thread terminates itself by raising ``SystemExit`` while the main thread
    simply returns to its caller.
    """
    import sys

    while True:
        # Stop hosting as soon as cron tasks have been switched off.
        if not enable_cron_tasks:
            if threading.current_thread() != threading.main_thread():
                # sys.exit() rather than the site-provided exit() helper,
                # which is meant for interactive sessions and may be absent.
                sys.exit()
            else:
                return

        sleep(60)
        try:
            task_scheduler.run()
        except Exception:  # coverage: exclude
            # Keep the host alive when a task fails, but let SystemExit and
            # KeyboardInterrupt propagate instead of swallowing them.
            errprint('ErrorDuringExecutingCronTasks')
            traceback.print_exc()
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def ki_manager(deliver_cb, restrict_keyboard_interrupt_to_checkpoints):
    """Install a custom SIGINT handler around the single ``yield``.

    The handler is only installed when running in the main thread and
    SIGINT still has Python's default handler; otherwise the generator just
    yields once and finishes.  While installed, a SIGINT calls
    ``deliver_cb()`` when KI protection is enabled for the interrupted frame
    or ``restrict_keyboard_interrupt_to_checkpoints`` is true, and raises
    ``KeyboardInterrupt`` directly otherwise.
    """
    can_install = (
        threading.current_thread() == threading.main_thread()
        and signal.getsignal(signal.SIGINT) == signal.default_int_handler
    )
    if not can_install:
        yield
        return

    def handler(signum, frame):
        assert signum == signal.SIGINT
        frame_is_protected = ki_protection_enabled(frame)
        if not (frame_is_protected or restrict_keyboard_interrupt_to_checkpoints):
            raise KeyboardInterrupt
        deliver_cb()

    signal.signal(signal.SIGINT, handler)
    try:
        yield
    finally:
        # Only restore the default if nobody replaced our handler meanwhile.
        still_ours = signal.getsignal(signal.SIGINT) is handler
        if still_ours:
            signal.signal(signal.SIGINT, signal.default_int_handler)
项目:r2-d7    作者:FreakyDug    | 项目源码 | 文件源码
def test_threaded(testbot):
    """Check that ``testbot.load_data()`` succeeds when run in a worker thread."""
    succeeded = threading.Event()

    def threadtest(signal):
        # If a new event loop isn't created for the thread, this will crash
        try:
            assert threading.current_thread() != threading.main_thread()
            testbot.load_data()
        except Exception as error:
            # Pytest will catch this stdout and print it and the signal will
            # fail the test
            print(error)
            signal.clear()
            return
        signal.set()

    worker = threading.Thread(target=threadtest, args=(succeeded, ))
    worker.start()
    worker.join()
    assert succeeded.is_set()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_main_thread_after_fork(self):
    """Check main_thread() inside a child forked from the main thread."""
    # The child prints the main thread's name, then whether its ident equals
    # the current thread's ident and threading.get_ident().
    child_code = """if 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        """
    _, stdout, stderr = assert_python_ok("-c", child_code)
    decoded = stdout.decode()
    output = decoded.replace('\r', '')
    self.assertEqual(stderr, b"")
    self.assertEqual(output, "MainThread\nTrue\nTrue\n")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_main_thread_after_fork_from_nonmain_thread(self):
    """Check main_thread() inside a child forked from a worker thread."""
    # The forking thread is expected to be reported as the child's main
    # thread, hence the "Thread-1" name in the expected output.
    child_code = """if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        """
    _, stdout, stderr = assert_python_ok("-c", child_code)
    decoded = stdout.decode()
    output = decoded.replace('\r', '')
    self.assertEqual(stderr, b"")
    self.assertEqual(output, "Thread-1\nTrue\nTrue\n")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_3_join_in_forked_from_thread(self):
    """Join the original main thread from a child forked in a worker thread."""
    # fork() is called from a worker thread here.
    # In the forked process, the main Thread object must be marked as stopped.
    child_script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
    self._run_and_join(child_script)
项目:easypy    作者:weka-io    | 项目源码 | 文件源码
def initialize_exception_listener():
    """Bind a SIGUSR1 handler that re-raises the pending ``LAST_ERROR``.

    Must be invoked in the main thread in "geventless" runs in order for
    raise_in_main_thread to work.  Calling it again after a successful
    registration is a no-op.

    Raises:
        NotMainThread: when called from any thread but the main one.
        SignalAlreadyBound: when SIGUSR1 already has a custom handler.
    """
    global REGISTERED_SIGNAL
    if REGISTERED_SIGNAL:
        return  # already registered

    if threading.current_thread() is not threading.main_thread():
        raise NotMainThread()

    def handle_signal(sig, stack):
        global LAST_ERROR
        # Consume the pending error so it is raised at most once.
        pending, LAST_ERROR = LAST_ERROR, None
        if not pending:
            raise LastErrorEmpty(signal=sig)
        raise pending

    custom_signal = signal.SIGUSR1
    current_handler = signal.getsignal(custom_signal)
    # Anything other than the default/ignore disposition means the signal
    # is already trapped by someone else.
    if current_handler not in (signal.SIG_DFL, signal.SIG_IGN):
        raise SignalAlreadyBound(signal=custom_signal)
    signal.signal(custom_signal, handle_signal)
    REGISTERED_SIGNAL = custom_signal
项目:easypy    作者:weka-io    | 项目源码 | 文件源码
def raise_in_main_thread(exception_type=Exception):
    """Forward exceptions raised around the ``yield`` to the main thread.

    Generator body, presumably wrapped as a context manager outside this
    view.  ``ProcessExiting`` always propagates unchanged.  An
    ``exception_type`` error raised in the main thread is re-raised as is;
    raised elsewhere it is flagged with ``_raised_asynchronously``, stored in
    ``LAST_ERROR`` and handed to ``_rimt``.  If another error is already
    pending, a warning is logged and the new one is re-raised locally.
    """
    global LAST_ERROR

    try:
        yield
    except ProcessExiting:
        # this exception is meant to stay within the thread
        raise
    except exception_type as exc:
        in_main = threading.current_thread() is threading.main_thread()
        if in_main:
            raise
        exc._raised_asynchronously = True

        if LAST_ERROR:
            _logger.warning("a different error (%s) is pending - skipping", type(LAST_ERROR))
            raise
        LAST_ERROR = exc
        _rimt(exc)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_main_thread_after_fork(self):
    """Check main_thread() inside a child forked from the main thread."""
    # The child prints the main thread's name, then whether its ident equals
    # the current thread's ident and threading.get_ident().
    child_code = """if 1:
            import os, threading

            pid = os.fork()
            if pid == 0:
                main = threading.main_thread()
                print(main.name)
                print(main.ident == threading.current_thread().ident)
                print(main.ident == threading.get_ident())
            else:
                os.waitpid(pid, 0)
        """
    _, stdout, stderr = assert_python_ok("-c", child_code)
    decoded = stdout.decode()
    output = decoded.replace('\r', '')
    self.assertEqual(stderr, b"")
    self.assertEqual(output, "MainThread\nTrue\nTrue\n")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_main_thread_after_fork_from_nonmain_thread(self):
    """Check main_thread() inside a child forked from a worker thread."""
    # The forking thread is expected to be reported as the child's main
    # thread, hence the "Thread-1" name in the expected output.
    child_code = """if 1:
            import os, threading, sys

            def f():
                pid = os.fork()
                if pid == 0:
                    main = threading.main_thread()
                    print(main.name)
                    print(main.ident == threading.current_thread().ident)
                    print(main.ident == threading.get_ident())
                    # stdout is fully buffered because not a tty,
                    # we have to flush before exit.
                    sys.stdout.flush()
                else:
                    os.waitpid(pid, 0)

            th = threading.Thread(target=f)
            th.start()
            th.join()
        """
    _, stdout, stderr = assert_python_ok("-c", child_code)
    decoded = stdout.decode()
    output = decoded.replace('\r', '')
    self.assertEqual(stderr, b"")
    self.assertEqual(output, "Thread-1\nTrue\nTrue\n")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_3_join_in_forked_from_thread(self):
    """Join the original main thread from a child forked in a worker thread."""
    # fork() is called from a worker thread here.
    # In the forked process, the main Thread object must be marked as stopped.
    child_script = """if 1:
            main_thread = threading.current_thread()
            def worker():
                childpid = os.fork()
                if childpid != 0:
                    os.waitpid(childpid, 0)
                    sys.exit(0)

                t = threading.Thread(target=joiningfunc,
                                     args=(main_thread,))
                print('end of main')
                t.start()
                t.join() # Should not block: main_thread is already stopped

            w = threading.Thread(target=worker)
            w.start()
            """
    self._run_and_join(child_script)
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def destroy(self, path):
    """Report this node as offline to the librarian and drop the connection.

    Runs on ``fusermount -u`` or SIGINT (control-C).  ``path`` is not used.
    Must run in the main thread.
    """
    offline = FRDnode.SOC_STATUS_OFFLINE
    self.lfs_status = offline

    # Report the SoC as offline with all usage figures zeroed.
    soc_update = self.lcp(
        'update_node_soc_status',
        status=offline,
        cpu_percent=0,
        rootfs_percent=0,
        network_in=0,
        network_out=0,
        mem_percent=0,
    )
    self.librarian(soc_update)

    mc_update = self.lcp('update_node_mc_status',
                         status=FRDFAModule.MC_STATUS_OFFLINE)
    self.librarian(mc_update)

    assert threading.current_thread() is threading.main_thread()
    self.torms.close()
    del self.torms

    # helpers
项目:Learning-Concurrency-in-Python    作者:PacktPublishing    | 项目源码 | 文件源码
def myChildThread():
    """Print the current and the main thread objects after a 5 second pause."""
    divider = "-------------------------"
    sections = (
        ("Current Thread ----------", threading.current_thread),
        ("Main Thread -------------", threading.main_thread),
    )

    print("Child Thread Starting")
    time.sleep(5)
    for heading, get_thread in sections:
        print(heading)
        print(get_thread())
        print(divider)
    print("Child Thread Ending")
项目:Learning-Concurrency-in-Python    作者:PacktPublishing    | 项目源码 | 文件源码
def myChildThread():
    """Print the current and the main thread objects after a 5 second pause."""
    divider = "-------------------------"
    sections = (
        ("Current Thread ----------", threading.current_thread),
        ("Main Thread -------------", threading.main_thread),
    )

    print("Child Thread Starting")
    time.sleep(5)
    for heading, get_thread in sections:
        print(heading)
        print(get_thread())
        print(divider)
    print("Child Thread Ending")
项目:adblockradio    作者:quasoft    | 项目源码 | 文件源码
def on_title_read(self, title):
    """React to a newly read stream title (main thread only).

    Nothing happens when ``title`` is None or equal to the last seen title.
    Otherwise the title is remembered and printed, the volume/station is
    adjusted depending on whether the title is blacklisted and on
    ``config.block_mode``, and the ``song_changed`` dispatcher is fired.
    """
    assert threading.current_thread() == threading.main_thread()

    if title is None:
        return
    if not (title != self._last_title):
        return

    self._last_title = title

    # TODO: Fade volume gradually
    # TODO: Allow user to choose what to do when an advertisement block is detected.
    #       Ideas for possible options:
    #       * reduce or mute volume
    #       * play random audio file from a local directory
    #       * switch to another radio station
    #       * repeat part of last song
    print("Title changed to %s" % title)

    # If the title contains a blacklisted tag, reduce volume
    blacklisted = BlacklistStorage.is_blacklisted(title)
    if blacklisted and not self._in_ad_block:
        # Entering an advertisement block.
        print('Advertisement tag detected.')
        if config.block_mode in (config.BlockMode.REDUCE_VOLUME, config.BlockMode.REDUCE_AND_SWITCH):
            print('Reducing volume.')
            self.volume = config.ad_block_volume
            self._in_ad_block = True
            self._last_ad_time = time.time()
        elif config.block_mode == config.BlockMode.SWITCH_STATION:
            self.switch_to_another_station()
    elif not blacklisted and self._in_ad_block:
        # Leaving an advertisement block.
        print('Restoring volume to maximum.')
        if config.block_mode in (config.BlockMode.REDUCE_VOLUME, config.BlockMode.REDUCE_AND_SWITCH):
            self.volume = config.max_volume
        self._in_ad_block = False
        self._last_ad_time = None
        self._just_switched = False

    dispatchers.player.song_changed(title)
项目:adblockradio    作者:quasoft    | 项目源码 | 文件源码
def fire_state_change(self):
    """Notify the player dispatcher of ``self.is_playing`` (main thread only)."""
    on_main = threading.current_thread() == threading.main_thread()
    assert on_main
    notify = dispatchers.player.playing_state_changed
    notify(self.is_playing)
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def __init__(self):
    """Create the IO completion port and the bookkeeping state around it.

    When constructed in the main thread, the waker's write socket is also
    registered as the signal wakeup fd and the previous fd is remembered in
    ``_old_signal_wakeup_fd``.
    """
    # https://msdn.microsoft.com/en-us/library/windows/desktop/aa363862(v=vs.85).aspx
    # Stay "closed" until the port exists -- presumably so that close() is
    # a no-op if port creation fails.
    self._closed = True
    port = kernel32.CreateIoCompletionPort(INVALID_HANDLE_VALUE, ffi.NULL, 0, 0)
    self._iocp = _check(port)
    self._closed = False

    self._iocp_queue = deque()
    self._iocp_thread = None
    self._overlapped_waiters = {}
    self._completion_key_queues = {}
    # Completion key 0 is reserved for regular IO events
    self._completion_key_counter = itertools.count(1)

    # {stdlib socket object: task}
    # except that wakeup socket is mapped to None
    self._socket_waiters = {"read": {}, "write": {}}
    self._main_thread_waker = WakeupSocketpair()
    self._socket_waiters["read"][self._main_thread_waker.wakeup_sock] = None

    # This is necessary to allow control-C to interrupt select().
    # https://github.com/python-trio/trio/issues/42
    if threading.current_thread() == threading.main_thread():
        wakeup_fd = self._main_thread_waker.write_sock.fileno()
        self._old_signal_wakeup_fd = signal.set_wakeup_fd(wakeup_fd)
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def close(self):
    """Release the completion port and helper resources; no-op when closed.

    The IOCP handle is closed, the IOCP thread (if any) is joined, the waker
    is closed and, when called from the main thread, the previous signal
    wakeup fd is restored.
    """
    if self._closed:
        return
    self._closed = True

    _check(kernel32.CloseHandle(self._iocp))
    iocp_thread = self._iocp_thread
    if iocp_thread is not None:
        iocp_thread.join()
    self._main_thread_waker.close()

    in_main_thread = threading.current_thread() == threading.main_thread()
    if in_main_thread:
        signal.set_wakeup_fd(self._old_signal_wakeup_fd)
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def main_native_thread():
    """Return the main thread object reported by ``__threading__``."""
    native_main = __threading__.main_thread()  # pylint:disable=no-member
    return native_main
项目:ribosome    作者:tek    | 项目源码 | 文件源码
def not_on_main_thread() -> bool:
    """Return True when called from any thread other than the main thread."""
    current = threading.current_thread()
    main = threading.main_thread()
    return current != main
项目:OdooQuant    作者:haogefeifei    | 项目源码 | 文件源码
def remove_heart_log(*args, **kwargs):
    """Forward the call to ``debug_log`` only when running on the main thread."""
    if six.PY2:
        # Python 2 is identified via the conventional main thread name
        # instead of threading.main_thread().
        on_main = threading.current_thread().name == 'MainThread'
    else:
        on_main = threading.current_thread() == threading.main_thread()

    if on_main:
        debug_log(*args, **kwargs)
项目:py_daemoniker    作者:Muterra    | 项目源码 | 文件源码
def _sketch_raise_in_main(exc):
    ''' Sketchy way to raise an exception in the main thread.
    '''
    if isinstance(exc, BaseException):
        exc = type(exc)
    elif issubclass(exc, BaseException):
        pass
    else:
        raise TypeError('Must raise an exception.')

    # Figure out the id of the main thread
    main_id = threading.main_thread().ident
    thread_ref = ctypes.c_long(main_id)
    exc = ctypes.py_object(exc)

    result = ctypes.pythonapi.PyThreadState_SetAsyncExc(
        thread_ref,
        exc
    )

    # 0 Is failed.
    if result == 0:
        raise SystemError('Main thread had invalid ID?')
    # 1 succeeded
    # > 1 failed
    elif result > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(main_id, 0)
        raise SystemError('Failed to raise in main thread.')
项目:py_daemoniker    作者:Muterra    | 项目源码 | 文件源码
def _watch_for_exit(self):
        ''' Automatically watches for termination of the main thread and
        then closes self gracefully.
        '''
        main = threading.main_thread()
        main.join()
        self._stop_nowait()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_main_thread(self):
    """Check main_thread() from the main thread and from a worker thread."""
    main_thread = threading.main_thread()
    self.assertEqual(main_thread.name, 'MainThread')
    self.assertEqual(main_thread.ident, threading.current_thread().ident)
    self.assertEqual(main_thread.ident, threading.get_ident())

    def check_from_worker():
        # Inside a worker, the main thread must differ from the current one.
        main_ident = threading.main_thread().ident
        self.assertNotEqual(main_ident, threading.current_thread().ident)

    worker = threading.Thread(target=check_from_worker)
    worker.start()
    worker.join()
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def on_main_thread():
    """Return True if the caller is running on the main thread."""
    current = threading.current_thread()
    main = threading.main_thread()
    return current is main
项目:gitsome    作者:donnemartin    | 项目源码 | 文件源码
def _on_main_thread():
    """Checks if we are on the main thread or not. Duplicated from xonsh.tools 
    here so that this module only relies on the Python standrd library.
    """
    return threading.current_thread() is threading.main_thread()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def main_native_thread():
    """Return the main thread object reported by ``__threading__``."""
    native_main = __threading__.main_thread()  # pylint:disable=no-member
    return native_main
项目:easypy    作者:weka-io    | 项目源码 | 文件源码
def _patch_module_locks():
    """Monkey patch ``_ModuleLock.release`` to re-own pre-patch locks.

    gevent will not patch existing locks (including ModuleLocks) when it's
    not single threaded, so the release method of ModuleLock objects is
    wrapped instead: a lock still owned by the pre-patching main thread ident
    is re-assigned to the current main thread ident before being released.
    This assumes patching is done early enough that no other locks exist.
    """
    import importlib

    module_lock_cls = importlib._bootstrap._ModuleLock
    original_release = module_lock_cls.release

    def _release(*args, **kw):
        lock = args[0]
        owned_by_old_main = lock.owner == main_thread_ident_before_patching
        if owned_by_old_main:
            lock.owner = threading.main_thread().ident
        original_release(*args, **kw)

    module_lock_cls.release = _release
项目:easypy    作者:weka-io    | 项目源码 | 文件源码
def _rimt(exc):
    """Kill the main thread's greenlet with ``exc``, then re-raise it here."""
    _logger.info('YELLOW<<killing main thread greenlet>>')
    target_greenlet = threading.main_thread()._greenlet
    original_throw = target_greenlet.throw

    # we must override "throw" method so exception will be raised with the original traceback
    def throw(*args):
        if len(args) != 1:
            return original_throw(*args)
        (error,) = args
        return original_throw(error.__class__, error, error.__traceback__)

    target_greenlet.throw = throw
    gevent.kill(target_greenlet, exc)
    _logger.debug('exiting the thread that failed')
    raise exc
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_main_thread(self):
    """Check main_thread() from the main thread and from a worker thread."""
    main_thread = threading.main_thread()
    self.assertEqual(main_thread.name, 'MainThread')
    self.assertEqual(main_thread.ident, threading.current_thread().ident)
    self.assertEqual(main_thread.ident, threading.get_ident())

    def check_from_worker():
        # Inside a worker, the main thread must differ from the current one.
        main_ident = threading.main_thread().ident
        self.assertNotEqual(main_ident, threading.current_thread().ident)

    worker = threading.Thread(target=check_from_worker)
    worker.start()
    worker.join()
项目:Concurrency-With-Python    作者:elliotforbes    | 项目源码 | 文件源码
def myChildThread():
    """Print the current and the main thread objects after a 5 second pause."""
    divider = "-------------------------"
    sections = (
        ("Current Thread ----------", threading.current_thread),
        ("Main Thread -------------", threading.main_thread),
    )

    print("Child Thread Starting")
    time.sleep(5)
    for heading, get_thread in sections:
        print(heading)
        print(get_thread())
        print(divider)
    print("Child Thread Ending")
项目:Concurrency-With-Python    作者:elliotforbes    | 项目源码 | 文件源码
def myChildThread():
    """Print the current and the main thread objects after a 5 second pause."""
    divider = "-------------------------"
    sections = (
        ("Current Thread ----------", threading.current_thread),
        ("Main Thread -------------", threading.main_thread),
    )

    print("Child Thread Starting")
    time.sleep(5)
    for heading, get_thread in sections:
        print(heading)
        print(get_thread())
        print(divider)
    print("Child Thread Ending")
项目:asyncio_extras    作者:agronholm    | 项目源码 | 文件源码
async def test_call_in_executor(executor):
    """Test that call_in_executor actually runs the target in a worker thread."""
    # ``await`` is only valid inside a coroutine function, so this test has
    # to be declared with ``async def``.
    ran_on_main_thread = await call_in_executor(
        lambda: current_thread() is main_thread(), executor=executor)
    assert not ran_on_main_thread
项目:tm-librarian    作者:FabricAttachedMemory    | 项目源码 | 文件源码
def handleOOB(self):
    """Log every pending out-of-band message as a warning, then clear them.

    Must run in the main thread.
    """
    # ALTERNATIVE: Put the message on a Queue for the main thread.
    assert threading.current_thread() is threading.main_thread()
    connection = self.torms
    for message in connection.inOOB:
        self.logger.warning('\t\t!!!!!!!!!!!!!!!!!!!!!!!! %s' % message)
    connection.clearOOB()
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def _run(name, queue, options):
    """
    The actual process that runs the separate controller instance.

    :param name: name of the process
    :param queue: Queue of the binding parent.
    :param options: Custom Options
    :type name: str
    """
    from pyplanet.core.instance import Controller
    from pyplanet.utils.log import initiate_logger, QueueHandler
    import logging

    # Tokio Asyncio (EXPERIMENTAL).
    if 'tokio' in options and options['tokio'] is True:
        import tokio
        import asyncio
        policy = tokio.TokioLoopPolicy()
        asyncio.set_event_loop_policy(policy)
        asyncio.set_event_loop(tokio.new_event_loop())
        logging.warning('Using experimental Tokio Asyncio Loop!')

    # Logging to queue. Only set up when the start method is not 'fork'.
    if multiprocessing.get_start_method() != 'fork':  # pragma: no cover
        initiate_logger()
        root_logger = logging.getLogger()
        formatter = ColoredFormatter(
            '%(log_color)s%(levelname)-8s%(reset)s %(yellow)s[%(threadName)s][%(name)s]%(reset)s %(blue)s%(message)s'
        )
        queue_handler = QueueHandler(queue)
        queue_handler.setFormatter(formatter)
        root_logger.addHandler(queue_handler)

    logging.getLogger(__name__).info('Starting pool process for \'{}\'...'.format(name))

    # Setting thread name to our process name. Assign the ``name``
    # attribute directly: Thread.setName() is deprecated since Python 3.10.
    threading.main_thread().name = name

    # Start instance.
    instance = Controller.prepare(name).instance
    instance._queue = queue
    instance.start()
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def cron_task_container(task_dict, add_task_only=False):
    """
    Run one cron task now (unless told otherwise) and schedule its next run.

    :param task_dict: description of the task, dict
      { "target": callable to run (required),
        "interval": delay passed to the scheduler for the next run (default 300),
        "priority": scheduler priority (default 999),
        "name": display name used in the log line (default str(target)),
        "args": positional arguments for target, e.g. (arg1, arg2),
        "kwargs": keyword arguments for target, e.g. {key: value},
      }
    :param add_task_only: when true, only schedule the task without running it now
    """
    import sys

    global task_scheduler
    if not add_task_only:
        # Run the task; failures are reported but never abort the scheduler.
        try:
            # Validate the target first so a missing one is reported with
            # the explicit ValueError instead of a KeyError from the log line.
            target_func = task_dict.get('target')
            if target_func is None:
                raise ValueError("target is not given in " + str(task_dict))

            infoprint('CronTask:', task_dict.get('name', str(target_func)), 'Target:', str(target_func))

            target_func(
                *(task_dict.get('args', ())),
                **(task_dict.get('kwargs', {}))
            )
        except Exception:  # coverage: exclude
            # Exception (not a bare except) so that SystemExit and
            # KeyboardInterrupt raised by a task still propagate.
            errprint('ErrorWhenProcessingCronTasks', task_dict)
            traceback.print_exc()

    # Do not re-schedule once cron tasks have been switched off.
    if not enable_cron_tasks:
        if threading.current_thread() != threading.main_thread():
            # sys.exit() rather than the site-provided exit() helper,
            # which is meant for interactive sessions and may be absent.
            sys.exit()
        else:
            return

    # Schedule the next run of this very task.
    task_scheduler.enter(
        task_dict.get('interval', 300),
        task_dict.get('priority', 999),
        cron_task_container,
        (task_dict,)
    )
项目:trio    作者:python-trio    | 项目源码 | 文件源码
def catch_signals(signals):
    """A context manager for catching signals.

    While the context is active the given signals are listened for and an
    async iterator is provided; leaving the context stops listening.

    The async iterator blocks until at least one signal has arrived, and
    then yields a :class:`set` with every signal received since the previous
    iteration.

    If the ``with`` block is left while the iterator still holds signals
    that were never extracted, those are re-delivered through Python's
    regular signal handling logic. This avoids a race condition when a
    signal arrives just before the ``with`` block is exited.

    Args:
      signals: a set of signals to listen for.

    Raises:
      RuntimeError: if used anywhere except Python's main thread. (This is
          a Python limitation.)

    Example:

      Unix daemons conventionally reload their configuration on ``SIGHUP``.
      A sketch of that using :func:`catch_signals`::

         with trio.catch_signals({signal.SIGHUP}) as batched_signal_aiter:
             async for batch in batched_signal_aiter:
                 # Only one signal is listened for, so the batch is always
                 # {signal.SIGHUP}; with more signals it could vary.
                 for signum in batch:
                     assert signum == signal.SIGHUP
                     reload_configuration()

    """
    in_main_thread = threading.current_thread() == threading.main_thread()
    if not in_main_thread:
        raise RuntimeError(
            "Sorry, catch_signals is only possible when running in the "
            "Python interpreter's main thread"
        )

    trio_token = _core.current_trio_token()
    signal_queue = SignalQueue()

    def enqueue_signal(signum, _):
        # Hand the signal number over to the queue via the trio token.
        trio_token.run_sync_soon(signal_queue._add, signum, idempotent=True)

    try:
        with _signal_handler(signals, enqueue_signal):
            yield signal_queue
    finally:
        signal_queue._redeliver_remaining()
项目:bibcat    作者:KnowledgeLinks    | 项目源码 | 文件源码
def _index_subjects(self):
        """ quereies the triplestore for all subject uri"""

        lg = logging.getLogger("%s.%s" % (self.ln, inspect.stack()[0][3]))
        lg.setLevel(self.log_level)

        # if the subjects have been indexed and there are no new subjects exit
        if self.data_status.get("indexed") and not self.new_subjects:
            return
        # get a list of all the loc_subject URIs
        sparql = """
            SELECT ?s 
            {
                ?s skos:inScheme <http://id.loc.gov/authorities/subjects> .
            }"""
        results = run_sparql_query(sparql=sparql)
        # Start processing through
        self.time_start = datetime.datetime.now()
        batch_size = 12000
        if len(results) > batch_size:
            batch_end = batch_size
        else:
            batch_end = len(results) - 1
        batch_start = 0
        batch_num = 1
        self.batch_data = {}
        self.batch_data[batch_num] = []
        end = False
        last = False
        while not end:
            lg.debug("batch %s: %s-%s", batch_num, batch_start, batch_end)
            for i, subj in enumerate(results[batch_start:batch_end]):
                th = threading.Thread(name=batch_start + i + 1,
                                      target=self._index_subject_item,
                                      args=(iri(subj['s']['value']),
                                            i+1,batch_num,))
                th.start()
                #self._index_subject_item(iri(subj['s']['value']),i+1)
            print(datetime.datetime.now() - self.time_start)
            main_thread = threading.main_thread()
            for t in threading.enumerate():
                if t is main_thread:
                    continue
                #print('joining %s', t.getName())
                t.join()
            action_list = \
                    self.es_worker.make_action_list(self.batch_data[batch_num])
            self.es_worker.bulk_save(action_list)
            del self.batch_data[batch_num]
            batch_end += batch_size
            batch_start += batch_size
            if last:
                end = True
            if len(results) <= batch_end:
                batch_end = len(results)
                last = True
            batch_num += 1
            self.batch_data[batch_num] = []
            print(datetime.datetime.now() - self.time_start)
项目:easypy    作者:weka-io    | 项目源码 | 文件源码
def get_thread_tree(including_this=True):
    """Build a tree of ``Bunch`` nodes describing all threads.

    Threads are grouped under their parent thread (dead parents included).
    Each node carries the thread's name, daemon marker, ident, logging
    context line, formatted stack with a timestamp, and its children.

    :param including_this: when false, the calling thread's stack is
        replaced by a ``<this frame>`` placeholder.
    :return: the root ``Bunch`` whose ``children`` are the threads whose
        parent is ``None``.
    """
    from .logging import THREAD_LOGGING_CONTEXT
    from .bunch import Bunch

    tree = {}             # parent thread -> list of child threads
    dead_threads = set()  # dead parents already placed in the tree
    contexts = {}         # thread ident -> flattened logging context
    stacks = {}           # thread ident -> (formatted stack, timestamp)

    def add_to_tree(thread):
        contexts[thread.ident] = THREAD_LOGGING_CONTEXT.flatten(thread.uuid)
        parent = get_thread_parent(thread)
        is_new_dead_parent = isinstance(parent, DeadThread) and parent not in dead_threads
        if is_new_dead_parent:
            # Dead parents are not enumerated; add them (and their own
            # ancestry) by hand, once.
            dead_threads.add(parent)
            add_to_tree(parent)
        tree.setdefault(parent, []).append(thread)

    for live_thread in threading.enumerate():
        add_to_tree(live_thread)

    current_ident = threading.current_thread().ident
    main_ident = threading.main_thread().ident

    for thread_ident, frame in iter_thread_frames():
        if thread_ident == current_ident and not including_this:
            formatted = "  <this frame>"
        elif not frame:
            formatted = ''
        else:
            # show the entire stack if it's this thread or the main thread,
            # don't skip ('after_module') anything
            if thread_ident in (current_ident, main_ident):
                skip_modules = []
            else:
                skip_modules = _BOOTSTRAPPERS
            formatted = format_thread_stack(frame, skip_modules=skip_modules)
        stacks[thread_ident] = formatted, time.time()

    def add_thread(parent_thread, parent):
        siblings = sorted(tree[parent_thread], key=lambda t: t.name)
        for thread in siblings:
            ident = thread.ident or 0
            stack, ts = stacks.get(ident, ("", 0))
            context = contexts.get(ident, {})
            context_parts = [
                "%s: %s" % (k, context[k])
                for k in "host context".split()
                if context.get(k)
            ]
            context_line = ", ".join(context_parts)

            this = Bunch(
                name=thread.name,
                daemon="[D]" if getattr(thread, "daemon", False) else "",
                ident=ident,
                context_line="({})".format(context_line) if context_line else "",
                stack=stack,
                timestamp=ts,
                children=[],
            )
            parent.children.append(this)
            if thread in tree:
                add_thread(thread, this)
        return parent

    return add_thread(None, Bunch(children=[]))