Python sys 模块,_current_frames() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sys._current_frames()

项目:satori-rtm-sdk-python    作者:satori-com    | 项目源码 | 文件源码
def print_all_stacktraces():
    """Print the current stack trace of every thread to stdout.

    The dump is framed by START/END banner lines. Each thread gets a
    ``# ThreadID: <id> <name>`` header followed by one ``File: ...`` line per
    stack frame and, when source text is available, the stripped source
    line. The name is empty for ids that ``threading.enumerate()`` does not
    report.
    """
    print("\n*** STACKTRACE - START ***\n")
    # Build the id -> name table once instead of rescanning every Thread
    # object for each entry returned by sys._current_frames().
    names_by_id = {t.ident: t.name for t in threading.enumerate()}
    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append("\n# ThreadID: %s %s"
                    % (thread_id, names_by_id.get(thread_id, '')))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: "%s", line %d, in %s' % (filename,
                                                        lineno, name))
            if line:
                code.append("  %s" % (line.strip()))

    for line in code:
        print(line)
    print("\n*** STACKTRACE - END ***\n")
项目:stackimpact-python    作者:stackimpact    | 项目源码 | 文件源码
def process_sample(self, signal_frame, sample_time, main_thread_id):
        """Fold one sample of every thread's call stack into ``self.profile``.

        Does nothing when ``self.profile`` is falsy. Otherwise each thread's
        stack (as returned by ``self.recover_stack``) is walked in reverse
        order, descending through ``find_or_add_child`` nodes, and the node
        reached last is incremented with ``sample_time``. The clock time
        spent here is added to ``self.profile._overhead``.

        :param signal_frame: frame used in place of the snapshot entry for
            the thread whose id equals ``main_thread_id``.
        :param sample_time: value passed to ``increment`` on the final node.
        :param main_thread_id: id of the thread whose frame is replaced by
            ``signal_frame``.
        """
        if self.profile:
            # time.clock() was removed in Python 3.8. Prefer process_time()
            # and only fall back to clock() on interpreters that lack it.
            cpu_clock = getattr(time, 'process_time', None) or time.clock
            start = cpu_clock()

            current_frames = sys._current_frames()
            items = current_frames.items()
            for thread_id, thread_frame in items:
                if thread_id == main_thread_id:
                    thread_frame = signal_frame

                stack = self.recover_stack(thread_frame)
                if stack:
                    current_node = self.profile
                    for frame in reversed(stack):
                        current_node = current_node.find_or_add_child(str(frame))
                    current_node.increment(sample_time, 1)

                # Drop the references taken in this iteration
                # (presumably so frame objects are not kept alive).
                thread_id, thread_frame, stack = None, None, None

            items = None
            current_frames = None

            self.profile._overhead += (cpu_clock() - start)
项目:SWEETer-Cat    作者:DanielAndreasen    | 项目源码 | 文件源码
def worker_int(worker):
    """Log a stack dump of every thread when the worker gets INT or QUIT.

    An info-level line records that the signal arrived; the per-thread stack
    listing is then emitted as a single debug-level message.

    :param worker: object whose ``log`` attribute provides ``info`` and
        ``debug``.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Gather traceback information for all threads.
    import sys
    import threading
    import traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for path, line_no, func_name, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
项目:nanobox-adapter-libcloud    作者:nanobox-io    | 项目源码 | 文件源码
def worker_int(worker):
    """Log a stack dump of every thread when the worker gets INT or QUIT.

    An info-level line records that the signal arrived; the per-thread stack
    listing is then emitted as a single debug-level message.

    :param worker: object whose ``log`` attribute provides ``info`` and
        ``debug``.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Gather traceback information for all threads.
    import sys
    import threading
    import traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for path, line_no, func_name, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def sigquit_handler(sig, frame):
    """Print the stack trace of every thread; meant as a SIGQUIT handler.

    Helps when debugging deadlocks: trigger it with ``kill -s QUIT <PID>``
    or by pressing CTRL+backslash. Neither argument is used.
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
    names = {th.ident: th.name for th in threading.enumerate()}
    lines = []
    for ident, top_frame in sys._current_frames().items():
        lines.append("\n# Thread: {}({})".format(names.get(ident, ""), ident))
        for path, line_no, func, text in traceback.extract_stack(top_frame):
            lines.append('File: "{}", line {}, in {}'.format(path, line_no, func))
            if text:
                lines.append("  {}".format(text.strip()))
    print("\n".join(lines))
项目:QTAF    作者:Tencent    | 项目源码 | 文件源码
def _get_current_traceback(self, thread):
        '''Format the current stack of ``thread`` as a traceback-like string.

        :param thread: thread whose stack should be rendered
        :type thread: Thread
        :returns: a header line followed by one entry per stack frame
        :raises RuntimeError: if ``thread.ident`` has no entry in
            ``sys._current_frames()``
        '''
        frames = sys._current_frames()
        ident = thread.ident
        if ident not in frames:
            raise RuntimeError("thread not found")

        parts = ["Traceback ( thread-%d possibly hold at ):\n" % ident]
        for filename, lineno, name, line in traceback.extract_stack(frames[ident]):
            parts.append('  File: "%s", line %d, in %s\n' % (filename, lineno, name))
            if line:
                parts.append("    %s\n" % (line.strip()))
        return "".join(parts)
项目:QTAF    作者:Tencent    | 项目源码 | 文件源码
def get_thread_traceback(thread):
    '''Format the current stack of ``thread`` as a traceback-like string.

    :param thread: thread whose stack should be rendered
    :type thread: Thread
    :returns: a header line followed by one entry per stack frame
    :raises RuntimeError: if ``thread.ident`` has no entry in
        ``sys._current_frames()``
    '''
    frames = sys._current_frames()
    ident = thread.ident
    if ident not in frames:
        raise RuntimeError("thread not found")

    parts = ["Traceback ( thread-%d possibly hold at ):\n" % ident]
    for filename, lineno, name, line in traceback.extract_stack(frames[ident]):
        parts.append('  File: "%s", line %d, in %s\n' % (filename, lineno, name))
        if line:
            parts.append("    %s\n" % (line.strip()))
    return "".join(parts)
项目:ikpdb    作者:audaxis    | 项目源码 | 文件源码
def enable_tracing(self):
        """Enable tracing if it is disabled and the debugged program is running.

        Otherwise do nothing. The trace function is installed on the frames
        of every thread except the debugger thread.

        :return: True if tracing is enabled when the call returns, False
            otherwise.
        """
        _logger.x_debug("enable_tracing()")
        #self.dump_tracing_state("before enable_tracing()")
        if not self.tracing_enabled and self.execution_started:
            # Threads started from now on pick the tracer up automatically.
            threading.settrace(self._tracer)
            # One snapshot for all threads instead of one per thread; .get()
            # tolerates a thread that enumerate() lists but that has no
            # entry in the snapshot (plain indexing raised KeyError).
            frames_by_ident = sys._current_frames()
            for thr in threading.enumerate():
                if thr.ident == self.debugger_thread_ident:
                    continue  # skip the debugger thread
                # Restore or set the trace function on each existing frame.
                a_frame = frames_by_ident.get(thr.ident)
                while a_frame:
                    a_frame.f_trace = self._tracer
                    a_frame = a_frame.f_back
            iksettrace._set_trace_on(self._tracer, self.debugger_thread_ident)
            self.tracing_enabled = True

        #self.dump_tracing_state("after enable_tracing()")
        return self.tracing_enabled
项目:TripMeal    作者:DanielAndreasen    | 项目源码 | 文件源码
def worker_int(worker):
    """Log a stack dump of every thread when the worker gets INT or QUIT.

    An info-level line records that the signal arrived; the per-thread stack
    listing is then emitted as a single debug-level message.

    :param worker: object whose ``log`` attribute provides ``info`` and
        ``debug``.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Gather traceback information for all threads.
    import sys
    import threading
    import traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for path, line_no, func_name, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
项目:data-store    作者:HumanCellAtlas    | 项目源码 | 文件源码
def timeout_response() -> chalice.Response:
    """
    Build the chalice Response returned when request processing times out.

    The JSON body is a problem document whose ``traces`` entry holds the
    formatted stack of every running thread except the calling one.
    """
    own_ident = threading.get_ident()
    traces = {}
    for ident, frame in sys._current_frames().items():
        if ident == own_ident:
            continue
        traces[ident] = traceback.format_stack(frame)

    status = requests.codes.gateway_timeout
    payload = json.dumps({
        'status': status,
        'code': "timed_out",
        'title': "Timed out processing request.",
        'traces': traces,
    })
    return chalice.Response(
        status_code=status,
        headers={"Content-Type": "application/problem+json"},
        body=payload,
    )
项目:py-ipv8    作者:qstokkink    | 项目源码 | 文件源码
def setUpClass(cls):
        """Start a watchdog thread that force-quits a locked-up test class.

        Stores the start time on ``cls.__lockup_timestamp__`` and launches a
        daemon thread that wakes every two seconds. The thread returns
        quietly once ``cls.__testing__`` is false. If ``cls.MAX_TEST_TIME``
        seconds pass first, it writes the stack of every other thread to
        stderr and terminates the process with exit status 1.
        """
        cls.__lockup_timestamp__ = time.time()
        def check_twisted():
            while time.time() - cls.__lockup_timestamp__ < cls.MAX_TEST_TIME:
                time.sleep(2)
                # If the test class completed normally, exit
                if not cls.__testing__:
                    return
            # If we made it here, there is a serious issue which we cannot recover from.
            # Most likely the Twisted threadpool got into a deadlock while shutting down.
            import os, traceback
            # sys.stderr.write() replaces the ``print >> sys.stderr``
            # statement: that form only works on Python 2 and raises
            # TypeError on Python 3, which killed this thread before the
            # dump and the forced exit could happen.
            sys.stderr.write("The test-suite locked up! Force quitting! Thread dump:\n")
            own_ident = threading.current_thread().ident
            for tid, stack in sys._current_frames().items():
                if tid != own_ident:
                    sys.stderr.write("THREAD#%d\n" % tid)
                    for line in traceback.format_list(traceback.extract_stack(stack)):
                        sys.stderr.write("| %s\n" % line[:-1].replace('\n', '\n|   '))
            os._exit(1)
        t = threading.Thread(target=check_twisted)
        t.daemon = True
        t.start()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_clear_threads_states_after_fork(self):
        # Issue #17094: thread states must be cleared in the child after fork()

        # Launch a batch of short-lived background threads before forking.
        workers = [threading.Thread(target=lambda : time.sleep(0.3))
                   for _ in range(16)]
        for worker in workers:
            worker.start()

        pid = os.fork()
        if pid == 0:
            # Child: exit status 0 only if a single frame entry remains.
            os._exit(0 if len(sys._current_frames()) == 1 else 1)

        # Parent: the child's wait status has to be 0.
        _, status = os.waitpid(pid, 0)
        self.assertEqual(0, status)

        for worker in workers:
            worker.join()
项目:airflow    作者:apache-airflow    | 项目源码 | 文件源码
def sigquit_handler(sig, frame):
    """Print the stack trace of every thread; meant as a SIGQUIT handler.

    Helps when debugging deadlocks: trigger it with ``kill -s QUIT <PID>``
    or by pressing CTRL+backslash. Neither argument is used.
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
    names = {th.ident: th.name for th in threading.enumerate()}
    lines = []
    for ident, top_frame in sys._current_frames().items():
        lines.append("\n# Thread: {}({})".format(names.get(ident, ""), ident))
        for path, line_no, func, text in traceback.extract_stack(top_frame):
            lines.append('File: "{}", line {}, in {}'.format(path, line_no, func))
            if text:
                lines.append("  {}".format(text.strip()))
    print("\n".join(lines))
项目:pypers    作者:frankosan    | 项目源码 | 文件源码
def worker_int(worker):
    """Log a stack dump of every thread when the worker gets INT or QUIT.

    An info-level line records that the signal arrived; the per-thread stack
    listing is then emitted as a single debug-level message.

    :param worker: object whose ``log`` attribute provides ``info`` and
        ``debug``.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Gather traceback information for all threads.
    import sys
    import threading
    import traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for path, line_no, func_name, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
项目:easypy    作者:weka-io    | 项目源码 | 文件源码
def iter_thread_frames():
        """Yield ``(ident, frame)`` pairs for native and greenlet-backed threads.

        Entries of ``sys._current_frames()`` are yielded as they are, except
        the one whose ident maps to ``MAIN_UUID`` in ``IDENT_TO_UUID``: that
        frame is held back. Then, for each enumerated thread with a truthy
        ``_greenlet`` attribute, the greenlet's ``gr_frame`` is yielded, or
        the held-back frame when ``gr_frame`` is falsy.
        """
        held_back_frame = None
        for ident, frame in sys._current_frames().items():
            if IDENT_TO_UUID.get(ident) != MAIN_UUID:
                yield ident, frame
            else:
                # the MainThread should be shown in its "greenlet" version
                held_back_frame = frame

        for thread in threading.enumerate():
            if not getattr(thread, '_greenlet', None):
                # some in-between state, before the greenlet started or after it died?
                continue
            if thread._greenlet.gr_frame:
                yield thread.ident, thread._greenlet.gr_frame
                continue
            # A thread with a greenlet but no gr_frame is reported with the frame
            # taken from sys._current_frames. If another greenlet is switched to
            # before we get here, the result is an inconsistent dup of threads.
            # TODO - make best-effort attempt to show coherent thread dump
            yield thread.ident, held_back_frame
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_clear_threads_states_after_fork(self):
        # Issue #17094: thread states must be cleared in the child after fork()

        # Launch a batch of short-lived background threads before forking.
        workers = [threading.Thread(target=lambda : time.sleep(0.3))
                   for _ in range(16)]
        for worker in workers:
            worker.start()

        pid = os.fork()
        if pid == 0:
            # Child: exit status 0 only if a single frame entry remains.
            os._exit(0 if len(sys._current_frames()) == 1 else 1)

        # Parent: the child's wait status has to be 0.
        _, status = os.waitpid(pid, 0)
        self.assertEqual(0, status)

        for worker in workers:
            worker.join()
项目:w4py    作者:Cito    | 项目源码 | 文件源码
def threadDump(signum, frame):
    """Signal handler for dumping thread stack frames to stdout.

    Prints a banner with the time reported by ``asclocaltime()``, then for
    each thread id in ascending order a header carrying the id and the
    frame's reference count, followed by the formatted stack.

    Neither ``signum`` nor the incoming ``frame`` is read; ``frame`` is
    reused as the loop variable.
    """
    # Every print call takes exactly one parenthesised argument so the code
    # behaves the same on Python 2 and 3; the bare print statement used
    # before is a SyntaxError on Python 3.
    print("")
    print("App server has been signaled to attempt a thread dump.")
    print("")
    print("Thread stack frame dump at %s" % (asclocaltime(),))
    sys.stdout.flush()
    frames = sys._current_frames()
    print("")
    print("-" * 79)
    print("")
    for threadID in sorted(frames):
        frame = frames[threadID]
        print("Thread ID: %d (reference count = %d)" % (
            threadID, sys.getrefcount(frame)))
        print(''.join(traceback.format_list(traceback.extract_stack(frame))))
    print("-" * 79)
    sys.stdout.flush()
项目:maas    作者:maas    | 项目源码 | 文件源码
def get_full_thread_dump():
    """Return a string holding a stack trace for every thread.

    The text starts with a "Begin stack trace" banner carrying the current
    UTC time and ends with an "End stack trace" banner. Threads that
    ``threading.enumerate()`` does not report are labelled "unknown".
    """
    timestamp = strftime("%Y-%m-%d %H:%M:%S", gmtime())
    names = {thread.ident: thread.name for thread in threading.enumerate()}
    with io.StringIO() as buf:
        buf.write("\n>>>> Begin stack trace (%s) >>>>\n" % timestamp)
        for ident, frame in current_frames().items():
            buf.write(
                "\n# ThreadID: %s (%s)\n" % (ident, names.get(ident, "unknown")))
            for path, line_no, func, text in traceback.extract_stack(frame):
                buf.write('File: "%s", line %d, in %s\n' % (path, line_no, func))
                if text:
                    buf.write("  %s\n" % (text.strip()))
        buf.write("\n<<<< End stack trace <<<<\n\n")
        return buf.getvalue()
项目:maas    作者:maas    | 项目源码 | 文件源码
def _captureThreadStacks(self):
        """Capture the stacks for all currently running threads.

        Threads reported by `threading.enumerate` are described by their
        ``repr``; any other ident is described as ``<*Unknown* ident>``.

        :return: A list of ``(thread-description, stack)`` tuples. See
            `traceback.format_stack` for the format of ``stack``.
        """
        known = {}
        for thread in threading.enumerate():
            known[thread.ident] = thread

        captured = []
        for ident, frame in sys._current_frames().items():
            if ident in known:
                label = repr(known[ident])
            else:
                label = "<*Unknown* %d>" % ident
            captured.append((label, traceback.format_stack(frame)))
        return captured
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def _debug_lock_release(self):
                """Return a text rendering of the lock owner's current stack.

                The result is an empty string when ``self._RLock__owner`` is
                falsy or when the owner's ident has no entry in
                ``sys._current_frames()``.
                """
                owner = self._RLock__owner
                if not owner:
                        return ""

                # Get stack of current owner, if lock is owned.
                stack = sys._current_frames().get(owner.ident)
                if stack is None:
                        return ""

                chunks = ["Stack of owner:\n"]
                for filenm, lno, func, txt in traceback.extract_stack(stack):
                        chunks.append("  File: \"{0}\", line {1:d},in {2}".format(
                            filenm, lno, func))
                        if txt:
                                chunks.append("\n    {0}".format(txt.strip()))
                        chunks.append("\n")
                return "".join(chunks)
项目:funcserver    作者:deep-compute    | 项目源码 | 文件源码
def dump_stacks(self):
        '''
        Return the stacks of all threads as one string.

        Intended as a debugging aid, e.g. when a deadlock happens. Frames
        whose thread id is not reported by ``threading.enumerate()`` are
        left out.

        borrowed from: http://blog.ziade.org/2012/05/25/zmq-and-gevent-debugging-nightmares/
        '''
        names = {th.ident: th.name for th in threading.enumerate()}

        chunks = []
        for ident, frame in sys._current_frames().items():
            if ident in names:
                chunks.extend((
                    'Thread 0x%x (%s)\n' % (ident, names[ident]),
                    ''.join(traceback.format_stack(frame)),
                    '\n',
                ))

        return ''.join(chunks)
项目:topotests    作者:FRRouting    | 项目源码 | 文件源码
def checkAddressSanitizerError(output, router, component):
    """Check ``output`` for an AddressSanitizer error report.

    When an error header is found, a notice is written to stderr. If the
    report body delimited by the ``==PID==`` marker is found as well, it is
    echoed to stderr and appended to ``/tmp/AddressSanitzer.txt`` together
    with the file name and function name of the caller two frames up.

    :param output: text to scan.
    :param router: router name used in the messages.
    :param component: component name used in the stderr notice.
    :return: True if an AddressSanitizer error header was found, else False.
    """
    # Raw string: '\s' in a plain literal is an invalid escape sequence.
    addressSantizerError = re.search(r'(==[0-9]+==)ERROR: AddressSanitizer: ([^\s]*) ', output)
    if not addressSantizerError:
        return False

    sys.stderr.write("%s: %s triggered an exception by AddressSanitizer\n" % (router, component))
    # Sanitizer Error found in log
    pidMark = addressSantizerError.group(1)
    addressSantizerLog = re.search('%s(.*)%s' % (pidMark, pidMark), output, re.DOTALL)
    if addressSantizerLog:
        # sys._getframe(2) is the caller's caller in *this* thread. The old
        # sys._current_frames().values()[0] lookup fails on Python 3 (dict
        # views are not subscriptable) and could pick another thread's frame.
        caller = sys._getframe(2)
        callingTest = os.path.basename(caller.f_globals['__file__'])
        callingProc = caller.f_code.co_name
        logLines = addressSantizerLog.group(1).splitlines()
        with open("/tmp/AddressSanitzer.txt", "a") as addrSanFile:
            sys.stderr.write('\n'.join(logLines) + '\n')
            addrSanFile.write("## Error: %s\n\n" % addressSantizerError.group(2))
            addrSanFile.write("### AddressSanitizer error in topotest `%s`, test `%s`, router `%s`\n\n" % (callingTest, callingProc, router))
            addrSanFile.write('    '+ '\n    '.join(logLines) + '\n')
            addrSanFile.write("\n---------------\n")
    return True
项目:tornadopy    作者:xubigshu    | 项目源码 | 文件源码
def worker_int(worker):
    """Log a stack dump of every thread when the worker gets INT or QUIT.

    An info-level line records that the signal arrived; the per-thread stack
    listing is then emitted as a single debug-level message.

    :param worker: object whose ``log`` attribute provides ``info`` and
        ``debug``.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Gather traceback information for all threads.
    import sys
    import threading
    import traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for path, line_no, func_name, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def run(self):
        """Print the stacks of all other threads until asked to stop.

        Each pass checks ``self._stop`` while holding ``self.lock`` and
        returns if it is ``True``. Otherwise a banner goes to stdout, every
        thread except the calling one has its stack printed with
        ``traceback.print_stack``, and the loop sleeps for ``self.interval``
        seconds.
        """
        while True:
            with self.lock:
                stop_requested = self._stop is True
            if stop_requested:
                return

            print("\n=============  THREAD FRAMES:  ================")
            for ident, frame in sys._current_frames().items():
                # Leave out the thread that is producing the dump.
                if ident != threading.current_thread().ident:
                    print("<< thread %d >>" % ident)
                    traceback.print_stack(frame)
            print("===============================================\n")

            time.sleep(self.interval)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def run(self):
        """Print the stacks of all other threads until asked to stop.

        Each pass checks ``self._stop`` while holding ``self.lock`` and
        returns if it is ``True``. Otherwise a banner goes to stdout, every
        thread except the calling one has its stack printed with
        ``traceback.print_stack``, and the loop sleeps for ``self.interval``
        seconds.
        """
        while True:
            with self.lock:
                stop_requested = self._stop is True
            if stop_requested:
                return

            print("\n=============  THREAD FRAMES:  ================")
            for ident, frame in sys._current_frames().items():
                # Leave out the thread that is producing the dump.
                if ident != threading.current_thread().ident:
                    print("<< thread %d >>" % ident)
                    traceback.print_stack(frame)
            print("===============================================\n")

            time.sleep(self.interval)
项目:iotronic    作者:openstack    | 项目源码 | 文件源码
def _print_nativethreads():
    """Print each thread's id, its current stack, and then a blank line.

    The id and the blank line go through ``print``; the stack is emitted by
    ``traceback.print_stack``.
    """
    frames_by_ident = sys._current_frames()
    for ident in frames_by_ident:
        print(ident)
        traceback.print_stack(frames_by_ident[ident])
        print()
项目:storperf    作者:opnfv    | 项目源码 | 文件源码
def delete(self):
        """Terminate the running workloads and report the result as JSON.

        The current thread-frame mapping is logged at info level and also
        printed. The JSON response carries the value returned by
        ``storperf.terminate_workloads()`` under ``'Slaves'``; any exception
        raised while producing it is passed to ``abort`` with status 400.
        """
        # One snapshot serves both the log line and the print.
        frames = sys._current_frames()
        self.logger.info("Threads: %s" % frames)
        # print() call form: the print statement is a SyntaxError on
        # Python 3, while this form behaves the same on Python 2 and 3.
        print(frames)
        try:
            return jsonify({'Slaves': storperf.terminate_workloads()})
        except Exception as e:
            abort(400, str(e))
项目:site    作者:alphageek-xyz    | 项目源码 | 文件源码
def worker_int(worker):
    """Log a stack dump of every thread when the worker gets INT or QUIT.

    An info-level line records that the signal arrived; the per-thread stack
    listing is then emitted as a single debug-level message.

    :param worker: object whose ``log`` attribute provides ``info`` and
        ``debug``.
    """
    worker.log.info("worker received INT or QUIT signal")
    import sys
    import threading
    import traceback

    thread_names = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (thread_names.get(ident, ""), ident))
        for path, line_no, func_name, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, line_no, func_name))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_current_frames(self):
        # Choose the variant of the check by whether the low-level thread
        # module can be imported.
        try:
            import _thread
        except ImportError:
            self.current_frames_without_threads()
        else:
            self.current_frames_with_threads()

    # Test sys._current_frames() in a WITH_THREADS build.
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def current_frames_without_threads(self):
        # With a single thread there is little to verify: the mapping has
        # exactly one entry, keyed by the artificial "thread id" 0, and its
        # value is the frame that is executing right now.
        frames = sys._current_frames()
        self.assertEqual(len(frames), 1)
        self.assertIn(0, frames)
        self.assertTrue(frames[0] is sys._getframe())
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_getframe(self):
        # Wrong argument count and an absurd depth must both be rejected.
        self.assertRaises(TypeError, sys._getframe, 42, 42)
        self.assertRaises(ValueError, sys._getframe, 2000000000)
        # The running frame must carry this very method's code object.
        own_code = SysModuleTest.test_getframe.im_func.func_code
        self.assertTrue(own_code is sys._getframe().f_code)

    # sys._current_frames() is a CPython-only gimmick.
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_current_frames(self):
        # Choose the variant of the check by whether the low-level thread
        # module can be imported.
        try:
            import thread
        except ImportError:
            self.current_frames_without_threads()
        else:
            self.current_frames_with_threads()

    # Test sys._current_frames() in a WITH_THREADS build.
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def current_frames_without_threads(self):
        # With a single thread there is little to verify: the mapping has
        # exactly one entry, keyed by the artificial "thread id" 0, and its
        # value is the frame that is executing right now.
        frames = sys._current_frames()
        self.assertEqual(len(frames), 1)
        self.assertIn(0, frames)
        self.assertTrue(frames[0] is sys._getframe())
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_getframe(self):
        # Wrong argument count and an absurd depth must both be rejected.
        self.assertRaises(TypeError, sys._getframe, 42, 42)
        self.assertRaises(ValueError, sys._getframe, 2000000000)
        # The running frame must carry this very method's code object.
        own_code = SysModuleTest.test_getframe.im_func.func_code
        self.assertTrue(own_code is sys._getframe().f_code)

    # sys._current_frames() is a CPython-only gimmick.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_current_frames(self):
        # Choose the variant of the check by whether the low-level thread
        # module can be imported.
        try:
            import thread
        except ImportError:
            self.current_frames_without_threads()
        else:
            self.current_frames_with_threads()

    # Test sys._current_frames() in a WITH_THREADS build.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def current_frames_without_threads(self):
        # With a single thread there is little to verify: the mapping has
        # exactly one entry, keyed by the artificial "thread id" 0, and its
        # value is the frame that is executing right now.
        frames = sys._current_frames()
        self.assertEqual(len(frames), 1)
        self.assertIn(0, frames)
        self.assertTrue(frames[0] is sys._getframe())
项目:pyDriveWire    作者:n6il    | 项目源码 | 文件源码
def dumpstacks(self, data):
        """Return a CRLF-joined listing of every thread's current stack.

        Each thread contributes a ``# Thread: <name>(<id>)`` header, one
        ``File: ...`` line per frame and, when available, the stripped
        source line. The name is empty for ids that
        ``threading.enumerate()`` does not report.

        :param data: not used by this method.
        """
        # The indentation of this method was mangled (the loop bodies sat at
        # the same level as their ``for``/``if`` headers), which is not valid
        # Python; the nesting below restores the structure.
        import threading, sys, traceback
        id2name = dict([(th.ident, th.name) for th in threading.enumerate()])
        code = []
        for threadId, stack in sys._current_frames().items():
            code.append("\n# Thread: %s(%d)" % (id2name.get(threadId,""), threadId))
            for filename, lineno, name, line in traceback.extract_stack(stack):
                code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
                if line:
                    code.append("  %s" % (line.strip()))
        return "\r\n".join(code)
项目:specto    作者:mrknow    | 项目源码 | 文件源码
def iter_frames(self, t):
        """Return a list holding the topmost frame of thread ``t``.

        The list is empty when ``t.ident`` has no entry in
        ``sys._current_frames()``.
        """
        # sys._current_frames() maps thread id -> topmost frame
        top_frame = sys._current_frames().get(t.ident)
        return [] if top_frame is None else [top_frame]

    # IFDEF CYTHON
    # def create_db_frame(self, *args, **kwargs):
    #     raise AssertionError('This method should not be called on cython (PyDbFrame should be used directly).')
    # ELSE
    # just create the db frame directly
项目:specto    作者:mrknow    | 项目源码 | 文件源码
def create_db_frame(self, args):
        """Create a PyDBFrame for ``args``, register it, and return it.

        The new object's ``frame`` attribute is set to the last element of
        ``args`` and the object is handed to ``self._AddDbFrame``.
        """
        # The frame has to be cached as a weak-ref; what we return is the
        # actual db frame, which stays alive for as long as its
        # trace_dispatch method is still referenced. This is a large
        # workaround because:
        # 1. weak references to Python frame objects are not possible
        # 2. the interpreter only offers _current_frames from 2.5 onwards
        new_frame = PyDBFrame(args)
        new_frame.frame = args[-1]
        self._AddDbFrame(new_frame)
        return new_frame
项目:specto    作者:mrknow    | 项目源码 | 文件源码
def __str__(self):
        """Summarise the state, step fields, kill flag and frame count."""
        fields = (
            self.pydev_state,
            self.pydev_step_stop,
            self.pydev_step_cmd,
            self.pydev_notify_kill,
            len(self.iter_frames(None)),
        )
        return 'State:%s Stop:%s Cmd: %s Kill:%s Frames:%s' % fields

#=======================================================================================================================
# NOW, WE HAVE TO DEFINE WHICH THREAD INFO TO USE
# (whether we have to keep references to the frames or not)
# from version 2.5 onwards, we can use sys._current_frames to get a dict with the threads
# and frames, but to support other versions, we can't rely on that.
#=======================================================================================================================
项目:specto    作者:mrknow    | 项目源码 | 文件源码
def run(self):
                """Sleep ten seconds, then write a dump of all thread stacks to stderr.

                The dump opens with a banner saying that threads were still
                found running after the tests finished. Each thread is
                labelled with its name and daemon flag, or with its raw id
                when no name was collected. When a thread's topmost frame has
                a local called ``self``, its ``str()`` is written to stderr
                right away, ahead of the assembled dump.
                """
                time.sleep(10)

                names_by_id = {}
                try:
                    for thread in threading.enumerate():
                        names_by_id[thread.ident] = '%s  (daemon: %s)' % (thread.name, thread.daemon)
                except:
                    # Best effort: threads without a name show their raw id.
                    pass

                report = [
                    '===============================================================================',
                    'pydev pyunit runner: Threads still found running after tests finished',
                    '================================= Thread Dump =================================',
                ]

                for ident, top_frame in sys._current_frames().items():
                    report.extend((
                        '\n-------------------------------------------------------------------------------',
                        " Thread %s" % names_by_id.get(ident, ident),
                        '',
                    ))

                    if 'self' in top_frame.f_locals:
                        sys.stderr.write(str(top_frame.f_locals['self']) + '\n')

                    for path, line_no, func, text in traceback.extract_stack(top_frame):
                        report.append(' File "%s", line %d, in %s' % (path, line_no, func))
                        if text:
                            report.append("   %s" % (text.strip()))
                report.append('\n=============================== END Thread Dump ===============================')
                sys.stderr.write('\n'.join(report))
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_getframe(self):
        # Wrong argument count and an absurd depth must both be rejected.
        self.assertRaises(TypeError, sys._getframe, 42, 42)
        self.assertRaises(ValueError, sys._getframe, 2000000000)
        # The running frame must carry this very method's code object.
        own_code = SysModuleTest.test_getframe.__code__
        self.assertTrue(own_code is sys._getframe().f_code)

    # sys._current_frames() is a CPython-only gimmick.
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_current_frames(self):
        # Choose the variant of the check by whether the low-level thread
        # module can be imported.
        try:
            import _thread
        except ImportError:
            self.current_frames_without_threads()
        else:
            self.current_frames_with_threads()

    # Test sys._current_frames() in a WITH_THREADS build.
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def current_frames_without_threads(self):
        # With a single thread there is little to verify: the mapping has
        # exactly one entry, keyed by the artificial "thread id" 0, and its
        # value is the frame that is executing right now.
        frames = sys._current_frames()
        self.assertEqual(len(frames), 1)
        self.assertIn(0, frames)
        self.assertTrue(frames[0] is sys._getframe())
项目:eva-didi    作者:eljefec    | 项目源码 | 文件源码
def dumpstacks(self):
        """Log every thread's current stack through ``self.logger.debug``.

        One debug record is emitted per thread header, per stack frame and
        per available source line.
        """
        names = {th.ident: th.name for th in threading.enumerate()}
        for ident, frame in sys._current_frames().items():
            self.logger.debug("# Thread: %s(%d)" % (names.get(ident, ""), ident))
            for path, line_no, func, text in traceback.extract_stack(frame):
                self.logger.debug('File: "%s", line %d, in %s' % (path, line_no, func))
                if text:
                    self.logger.debug("  %s" % (text.strip()))
项目:ikpdb    作者:audaxis    | 项目源码 | 文件源码
def dump_tracing_state(self, context):
        """A debug tool that prints the tracing state of all threads.

        Prints the debugger's tracing attributes, then for every enumerated
        thread one line per stack frame showing the frame, its file, line
        and function, and its ``f_trace``. Frames equal to
        ``self.frame_beginning`` or to this method's own frame are flagged
        "beginning" and "current" respectively.

        :param context: label echoed in the first output line.
        """
        # Every print call takes one parenthesised argument so the output is
        # the same on Python 2 and 3; the print statement used before is a
        # SyntaxError on Python 3.
        print("Dumping all threads Tracing state: (%s)" % context)
        print("    self.tracing_enabled=%s" % self.tracing_enabled)
        print("    self.execution_started=%s" % self.execution_started)
        print("    self.frame_beginning=%s" % self.frame_beginning)
        print("    self.debugger_thread_ident=%s" % self.debugger_thread_ident)
        current_ident = threading.current_thread().ident
        own_frame = inspect.currentframe()
        # One snapshot for all threads; .get() tolerates a thread that has
        # no entry in it (plain indexing raised KeyError).
        frames_by_ident = sys._current_frames()
        for thr in threading.enumerate():
            is_current_thread = thr.ident == current_ident
            print("    Thread: %s, %s %s" % (thr.name, thr.ident, "<= Current*" if is_current_thread else ''))
            a_frame = frames_by_ident.get(thr.ident)
            while a_frame:
                flags = []
                if a_frame == self.frame_beginning:
                    flags.append("beginning")
                if a_frame == own_frame:
                    flags.append("current")
                if flags:
                    flags_str = "**"+",".join(flags)
                else:
                    flags_str = ""
                print("        => %s, %s:%s(%s) | %s %s" % (a_frame,
                                                            a_frame.f_code.co_filename,
                                                            a_frame.f_lineno,
                                                            a_frame.f_code.co_name,
                                                            a_frame.f_trace,
                                                            flags_str))
                a_frame = a_frame.f_back
项目:AmqpCode    作者:SVADemoAPP    | 项目源码 | 文件源码
def stackdump(sig, frm):
  """Print the current stack of every thread to stdout.

  The signature matches a signal handler; neither argument is used. Each
  thread contributes a ``# ThreadID: <id>`` header, one ``File: ...`` line
  per frame and, when available, the stripped source line.
  """
  code = []
  for threadId, stack in sys._current_frames().items():
    code.append("\n# ThreadID: %s" % threadId)
    for filename, lineno, name, line in traceback.extract_stack(stack):
      code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
      if line:
        code.append("  %s" % (line.strip()))
  # print() with one parenthesised argument behaves the same on Python 2
  # and 3; the print statement is a SyntaxError on Python 3.
  print("\n".join(code))
项目:qpid-python    作者:apache    | 项目源码 | 文件源码
def stackdump(sig, frm):
  """Print the current stack of every thread to stdout.

  The signature matches a signal handler; neither argument is used. Each
  thread contributes a ``# ThreadID: <id>`` header, one ``File: ...`` line
  per frame and, when available, the stripped source line.
  """
  code = []
  for threadId, stack in sys._current_frames().items():
    code.append("\n# ThreadID: %s" % threadId)
    for filename, lineno, name, line in traceback.extract_stack(stack):
      code.append('File: "%s", line %d, in %s' % (filename, lineno, name))
      if line:
        code.append("  %s" % (line.strip()))
  # print() with one parenthesised argument behaves the same on Python 2
  # and 3; the print statement is a SyntaxError on Python 3.
  print("\n".join(code))
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def threads_handler():
    """Return plain-text headers and a stack listing of all threads.

    :return: a ``(headers, body)`` tuple. ``headers`` declares the content
        type ``text/plain``; ``body`` starts with the thread count, followed
        by one ``Thread 0x<id>:`` section per thread holding its stack.
    """
    frames = sys._current_frames()
    sections = ['%d threads found\n\n' % len(frames)]
    for ident, frame in frames.items():
        stack_text = ''.join(traceback.format_stack(frame))
        sections.append('Thread 0x%x:\n%s\n' % (ident, stack_text))
    body = ''.join(sections)
    return {'Content-Type': 'text/plain'}, body
项目:sadm    作者:prologin    | 项目源码 | 文件源码
def threads_handler():
    """Return plain-text headers and a stack listing of all threads.

    :return: a ``(headers, body)`` tuple. ``headers`` declares the content
        type ``text/plain``; ``body`` starts with the thread count, followed
        by one ``Thread 0x<id>:`` section per thread holding its stack.
    """
    frames = sys._current_frames()
    sections = ['%d threads found\n\n' % len(frames)]
    for ident, frame in frames.items():
        stack_text = ''.join(traceback.format_stack(frame))
        sections.append('Thread 0x%x:\n%s\n' % (ident, stack_text))
    body = ''.join(sections)
    return {'Content-Type': 'text/plain'}, body