Python traceback 模块,extract_stack() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用traceback.extract_stack()

项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def error(*k, **kw):
    """
    Forward an error message to the module logger and, when the module-level
    ``verbose`` level is above 2, follow it with the call stack that led here.

    :param k: positional arguments passed unchanged to ``log.error``
    :param kw: keyword arguments passed unchanged to ``log.error``
    """
    global log
    log.error(*k, **kw)
    if not verbose > 2:
        return
    frames = traceback.extract_stack()
    if not frames:
        return
    # The last entry is this function's own frame; only the callers matter.
    report = []
    for path, number, func, text in frames[:-1]:
        report.append('  File "%s", line %d, in %s' % (path, number, func))
        if text:
            report.append('    %s' % text.strip())
    if report:
        log.error("\n".join(report))
项目:SWEETer-Cat    作者:DanielAndreasen    | 项目源码 | 文件源码
def worker_int(worker):
    """Log a stack dump of every live thread when the worker is interrupted.

    :param worker: object exposing a ``log`` attribute with ``info`` and
        ``debug`` methods; the dump itself is emitted at debug level.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Imported locally so the hook carries its own dependencies.
    import threading
    import sys
    import traceback

    names_by_ident = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident))
        for path, number, func, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, number, func))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def __init__(self, msg='', ex=None):
        """
        Record the error message together with the stack that led to it.

        The stored stack is the wrapped exception's stack (if any) followed by
        the stack of the code creating this error; ``verbose_msg`` is that
        stack rendered with ``traceback.format_list``.

        :param msg: error message
        :type msg: string
        :param ex: exception causing this error (optional)
        :type ex: exception
        """
        self.msg = msg
        assert not isinstance(msg, Exception)

        self.stack = []
        if ex:
            if not msg:
                self.msg = str(ex)
            if isinstance(ex, WafError):
                # Copy instead of aliasing: the ``+=`` below extends the list
                # in place and must not grow the wrapped error's own stack.
                self.stack = list(ex.stack)
            else:
                self.stack = traceback.extract_tb(sys.exc_info()[2])
        # Drop the last entry, which is this constructor's own frame.
        self.stack += traceback.extract_stack()[:-1]
        self.verbose_msg = ''.join(traceback.format_list(self.stack))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def error(*k, **kw):
    """
    Forward an error message to the module logger and, when the module-level
    ``verbose`` level is above 2, follow it with the call stack that led here.

    :param k: positional arguments passed unchanged to ``log.error``
    :param kw: keyword arguments passed unchanged to ``log.error``
    """
    global log
    log.error(*k, **kw)
    if not verbose > 2:
        return
    frames = traceback.extract_stack()
    if not frames:
        return
    # The last entry is this function's own frame; only the callers matter.
    report = []
    for path, number, func, text in frames[:-1]:
        report.append('  File "%s", line %d, in %s' % (path, number, func))
        if text:
            report.append('    %s' % text.strip())
    if report:
        log.error("\n".join(report))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def __init__(self, msg='', ex=None):
        """
        Record the error message together with the stack that led to it.

        The stored stack is the wrapped exception's stack (if any) followed by
        the stack of the code creating this error; ``verbose_msg`` is that
        stack rendered with ``traceback.format_list``.

        :param msg: error message
        :type msg: string
        :param ex: exception causing this error (optional)
        :type ex: exception
        """
        self.msg = msg
        assert not isinstance(msg, Exception)

        self.stack = []
        if ex:
            if not msg:
                self.msg = str(ex)
            if isinstance(ex, WafError):
                # Copy instead of aliasing: the ``+=`` below extends the list
                # in place and must not grow the wrapped error's own stack.
                self.stack = list(ex.stack)
            else:
                self.stack = traceback.extract_tb(sys.exc_info()[2])
        # Drop the last entry, which is this constructor's own frame.
        self.stack += traceback.extract_stack()[:-1]
        self.verbose_msg = ''.join(traceback.format_list(self.stack))
项目:SoCFoundationFlow    作者:mattaw    | 项目源码 | 文件源码
def error(*k, **kw):
    """
    Forward an error message to the module logger and, when the module-level
    ``verbose`` level is above 2, follow it with the call stack that led here.

    :param k: positional arguments passed unchanged to ``log.error``
    :param kw: keyword arguments passed unchanged to ``log.error``
    """
    global log
    log.error(*k, **kw)
    if not verbose > 2:
        return
    frames = traceback.extract_stack()
    if not frames:
        return
    # The last entry is this function's own frame; only the callers matter.
    report = []
    for path, number, func, text in frames[:-1]:
        report.append('  File "%s", line %d, in %s' % (path, number, func))
        if text:
            report.append('    %s' % text.strip())
    if report:
        log.error("\n".join(report))
项目:CyberScan    作者:medbenali    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:annotated-py-asyncio    作者:hhstore    | 项目源码 | 文件源码
def __init__(self, *, loop=None):
        """Initialize the future.

        ``loop`` optionally names the event loop this future is bound to;
        when it is omitted the loop returned by ``events.get_event_loop()``
        is used instead.
        """
        self._loop = events.get_event_loop() if loop is None else loop

        # The future starts out with no callbacks registered.
        self._callbacks = []

        if self._loop.get_debug():
            # Frame 1 is the caller, so the recorded stack ends where the
            # future was created rather than inside this constructor.
            self._source_traceback = traceback.extract_stack(sys._getframe(1))
项目:MIT-Thesis    作者:alec-heif    | 项目源码 | 文件源码
def first_spark_call():
    """
    Return a CallSite for the first Spark call in the current call stack.

    "Spark" frames are those whose file path starts with the directory of the
    file containing this function. The result names the first such function
    together with the file and line of the frame that called it; if the
    outermost frame is already a Spark frame, that frame is described as is.
    Returns None when the stack is empty.
    """
    frames = traceback.extract_stack()
    if not frames:
        return None

    innermost = len(frames) - 1
    spark_dir = os.path.dirname(frames[innermost][0])
    # Index of the first frame living under the Spark directory; falls back
    # to the innermost frame when none matches.
    boundary = next(
        (index for index, entry in enumerate(frames)
         if entry[0].startswith(spark_dir)),
        innermost,
    )

    if boundary == 0:
        path, lineno, func, _ = frames[0]
        return CallSite(function=func, file=path, linenum=lineno)

    spark_entry = frames[boundary]
    user_entry = frames[boundary - 1]
    return CallSite(function=spark_entry[2], file=user_entry[0], linenum=user_entry[1])
项目:CVE-2016-6366    作者:RiskSense-Ops    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:BlenderRobotDesigner    作者:HBPNeurorobotics    | 项目源码 | 文件源码
def log_callstack_last(back_trace=False):
    """Return ``"file:line"`` for the innermost stack entry that is not filtered.

    :param back_trace: when true, walk the traceback of the exception
        currently being handled; otherwise walk the current call stack
        (without this function's own frame).
    :return: base file name and line of the last entry whose function name is
        in neither ``BACKTRACE_FILTER_FUNC`` nor
        ``BACKTRACE_FILTER_HIDE_CODE``, or ``"empty"`` if there is none.
    """
    if back_trace:
        stack = traceback.extract_tb(sys.exc_info()[2])
    else:
        stack = traceback.extract_stack()[:-1]

    message = "empty"

    print("Parsing stack")

    for path, line, func, code in stack:
        print(path,func)
        if func in BACKTRACE_FILTER_FUNC or func in BACKTRACE_FILTER_HIDE_CODE:
            continue
        # Later (inner) entries overwrite earlier ones, so the last match wins.
        message = "%s:%s" % (os.path.split(path)[-1], line)

    return message
项目:hakkuframework    作者:4shadoww    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from .config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:nanobox-adapter-libcloud    作者:nanobox-io    | 项目源码 | 文件源码
def worker_int(worker):
    """Log a stack dump of every live thread when the worker is interrupted.

    :param worker: object exposing a ``log`` attribute with ``info`` and
        ``debug`` methods; the dump itself is emitted at debug level.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Imported locally so the hook carries its own dependencies.
    import threading
    import sys
    import traceback

    names_by_ident = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident))
        for path, number, func, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, number, func))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
项目:PeekabooAV    作者:scVENUS    | 项目源码 | 文件源码
def known(s):
    """Rule: reuse the stored verdict when the sample is already in the database.

    The ``position`` passed to ``RuleResult`` is ``"<function>:<line>"`` of
    the frame where the stack is captured, i.e. this function.

    :param s: sample handed to ``db.known`` / ``db.sample_info_fetch``
    :return: a ``RuleResult``; further analysis is requested only when the
        sample is not known yet.
    """
    here = traceback.extract_stack()[-1]
    position = "%s:%s" % (here[2], here[1])

    db = get_config().get_db_con()
    if not db.known(s):
        return RuleResult(position,
                          result=Result.unknown,
                          reason="Datei ist dem System noch nicht bekannt",
                          further_analysis=True)

    sample_info = db.sample_info_fetch(s)
    return RuleResult(position,
                      result=sample_info.get_result(),
                      reason=sample_info.reason,
                      further_analysis=False)
项目:PeekabooAV    作者:scVENUS    | 项目源码 | 文件源码
def file_larger_than(s, args):
    """Rule: only samples larger than ``args['byte']`` bytes are analysed further.

    The ``position`` passed to ``RuleResult`` is ``"<function>:<line>"`` of
    the frame where the stack is captured, i.e. this function.

    :param s: sample exposing ``file_size``
    :param args: mapping providing the size limit under the key ``'byte'``
    :return: a ``RuleResult``; ``Result.ignored`` when the file is not larger
        than the limit, ``Result.unknown`` otherwise.
    """
    here = traceback.extract_stack()[-1]
    position = "%s:%s" % (here[2], here[1])

    size = args['byte']
    if not s.file_size > size:
        return RuleResult(position,
                          result=Result.ignored,
                          reason="Datei ist nur %d bytes lang"
                          % s.file_size,
                          further_analysis=False)

    return RuleResult(position,
                      result=Result.unknown,
                      reason="Datei hat mehr als %d bytes"
                      % size,
                      further_analysis=True)
项目:PeekabooAV    作者:scVENUS    | 项目源码 | 文件源码
def cuckoo_score(s, args):
    """Rule: flag the sample as bad when its Cuckoo score reaches the threshold.

    The ``position`` passed to ``RuleResult`` is ``"<function>:<line>"`` of
    the frame where the stack is captured, i.e. this function.

    :param s: sample exposing ``cuckoo_report.score``
    :param args: mapping providing the threshold under the key ``'higher'``
    :return: a ``RuleResult``; ``Result.bad`` when the score is at or above
        the threshold, ``Result.unknown`` (with further analysis) otherwise.
    """
    here = traceback.extract_stack()[-1]
    position = "%s:%s" % (here[2], here[1])

    threshold = args['higher']
    if not s.cuckoo_report.score >= threshold:
        return RuleResult(position,
                          result=Result.unknown,
                          reason="Cuckoo score < %s: %s"
                          % (threshold, s.cuckoo_report.score),
                          further_analysis=True)

    return RuleResult(position,
                      result=Result.bad,
                      reason="Cuckoo score >= %s: %s"
                      % (threshold, s.cuckoo_report.score),
                      further_analysis=False)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def doTraceback(self, module):
        """Check that traceback helpers can recover the raising source line.

        Calls ``module.do_raise()`` and asserts that ``extract_tb``,
        ``extract_stack`` and ``print_tb`` all report ``raise_src`` for the
        frame inside ``module``.
        """
        try:
            module.do_raise()
        except:
            # Skip this method's own frame so the traceback starts in module.
            tb = sys.exc_info()[2].tb_next

            tb_entry = extract_tb(tb, 1)[0]
            self.assertEqual(tb_entry[3], raise_src.strip())

            stack_entry = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(stack_entry[3], raise_src.strip())

            printed = io.StringIO()
            print_tb(tb, 1, printed)
            self.assertTrue(printed.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
项目:pytypes    作者:Stewori    | 项目源码 | 文件源码
def _raise_typecheck_error(msg, is_return=False, value=None, received_type=None,
            expected_type=None, func=None):
    """Report a type-check failure as an exception or, in warning mode, a warning.

    :param msg: message to raise or warn with
    :param is_return: selects the return-type flavour of the error/warning
        instead of the input-type flavour
    :param value, received_type, expected_type, func: accepted but not used
        by this function
    :raises pytypes.ReturnTypeError, pytypes.InputTypeError: when
        ``pytypes.warning_mode`` is false
    """
    if not pytypes.warning_mode:
        if is_return:
            raise pytypes.ReturnTypeError(msg)
        raise pytypes.InputTypeError(msg)

    import traceback
    stack = traceback.extract_stack()
    # The helper picks the stack entry the warning should be attributed to.
    offset = util._calc_traceback_list_offset(stack)
    category = pytypes.ReturnTypeWarning if is_return else pytypes.InputTypeWarning
    warn_explicit(msg, category, stack[offset][0], stack[offset][1])
项目:pytypes    作者:Stewori    | 项目源码 | 文件源码
def _warn_argname(msg, func, slf, clsm, cls=None, warn_tp=pytypes.exceptions.TypeWarning):
    """Emit an argument-name warning attributed to a frame of the call stack.

    Does nothing unless ``pytypes.warn_argnames`` is set. The warning text is
    ``msg`` followed by the dotted name of ``func`` (module, optional class,
    function name).

    :param msg: leading text of the warning
    :param func: function the warning is about
    :param slf, clsm: when either is true and ``cls`` is None, the defining
        class is looked up to qualify the name
    :param cls: class to qualify the name with, if already known
    :param warn_tp: warning category passed to ``warn_explicit``
    """
    if not pytypes.warn_argnames:
        return

    if cls is not None:
        cls_name = cls.__name__
    elif slf or clsm:
        try:
            cls_name = get_class_that_defined_method(func).__name__
        except:
            cls_name = '<unknown class>'
    else:
        cls_name = None

    stack = traceback.extract_stack()
    # The helper picks the stack entry the warning should be attributed to.
    offset = _calc_traceback_list_offset(stack)
    if cls_name is not None:
        text = '%s: %s.%s.%s'%(msg, func.__module__, cls_name, func.__name__)
    else:
        text = '%s: %s.%s'%(msg, func.__module__, func.__name__)
    warn_explicit(text, warn_tp, stack[offset][0], stack[offset][1])
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def sigquit_handler(sig, frame):
    """Print the stack of every thread in this process.

    Intended as a SIGQUIT handler to help debug deadlocks; trigger it with
    ``kill -s QUIT <PID>`` or CTRL+\\. Both arguments are the standard signal
    handler parameters and are not used.
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
    names_by_ident = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, stack in sys._current_frames().items():
        report.append("\n# Thread: {}({})"
                      .format(names_by_ident.get(ident, ""), ident))
        for path, number, func, text in traceback.extract_stack(stack):
            report.append('File: "{}", line {}, in {}'
                          .format(path, number, func))
            if text:
                report.append("  {}".format(text.strip()))
    print("\n".join(report))
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:trex-http-proxy    作者:alwye    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from .config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:pyprob    作者:probprog    | 项目源码 | 文件源码
def extract_address():
    """Build an address string for the code two frames above this call.

    The result has the form ``"<instruction pointer>/<dotted names>"``: the
    bytecode offset (``f_lasti``) of the frame two levels up, followed by the
    function names found while walking outwards from that frame, outermost
    first. If ``_extract_target_of_assignment()`` returns a name, it becomes
    the last component.
    """
    frame = sys._getframe(2)
    ip = frame.f_lasti

    target = _extract_target_of_assignment()
    names = [] if target is None else [target]

    while frame is not None:
        code_name = frame.f_code.co_name
        # Synthetic code objects such as '<module>' end the walk.
        if code_name.startswith('<'):
            break
        names.append(code_name)
        # Stop once the designated function has been included.
        if code_name == _current_function_name:
            break
        frame = frame.f_back

    return "{}/{}".format(ip, '.'.join(names[::-1]))
项目:chuck    作者:Calysto    | 项目源码 | 文件源码
def timeRemaining(seconds=0):
    """Report whether a per-call-site timer still has time left.

    Meant for ``while timeRemaining(n):`` loops. The timer is keyed by the
    file name and line number of the calling frame and kept in the global
    ``_timers`` mapping.

    :param seconds: duration of the timer; 0 means "no limit"
    :return: True while the timer is running (also when it is first started
        or restarted with a different duration); False once more than
        ``seconds`` have elapsed, at which point the timer is discarded.
    """
    global _timers
    if seconds == 0:
        return True

    now = time.time()
    caller = traceback.extract_stack()[-2]
    filename, line_no = caller[0], caller[1]
    # Interactive shell inputs get distinct pseudo file names; fold them
    # into a single one.
    if filename.startswith("<pyshell"):
        filename = "pyshell"
    key = (filename, line_no)

    running = key in _timers
    if running:
        start, duration = _timers[key]
    if not running or seconds != duration:
        _timers[key] = (now, seconds)
        return True

    if now - start > duration:
        del _timers[key]
        return False
    return True
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def doTraceback(self, module):
        """Check that traceback helpers can recover the raising source line.

        Calls ``module.do_raise()`` and asserts that ``extract_tb``,
        ``extract_stack`` and ``print_tb`` all report ``raise_src`` for the
        frame inside ``module``.
        """
        try:
            module.do_raise()
        except:
            # Skip this method's own frame so the traceback starts in module.
            tb = sys.exc_info()[2].tb_next

            tb_entry = extract_tb(tb, 1)[0]
            self.assertEqual(tb_entry[3], raise_src.strip())

            stack_entry = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(stack_entry[3], raise_src.strip())

            printed = StringIO.StringIO()
            print_tb(tb, 1, printed)
            self.assertTrue(printed.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def doTraceback(self, module):
        """Check that traceback helpers can recover the raising source line.

        Calls ``module.do_raise()`` and asserts that ``extract_tb``,
        ``extract_stack`` and ``print_tb`` all report ``raise_src`` for the
        frame inside ``module``.
        """
        try:
            module.do_raise()
        except:
            # Skip this method's own frame so the traceback starts in module.
            tb = sys.exc_info()[2].tb_next

            tb_entry = extract_tb(tb, 1)[0]
            self.assertEqual(tb_entry[3], raise_src.strip())

            stack_entry = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(stack_entry[3], raise_src.strip())

            printed = StringIO.StringIO()
            print_tb(tb, 1, printed)
            self.assertTrue(printed.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
项目:scapy-bpf    作者:guedou    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:Andes    作者:cuihantao    | 项目源码 | 文件源码
def _raw_log(self, logfn, message, exc_info):
        """Emit ``message`` through ``logfn``, optionally prefixed with its origin.

        The origin is taken from the stack entry two levels above this method
        (the caller of this method's caller).

        :param logfn: callable invoked as ``logfn(text, exc_info=exc_info)``
        :param message: text to log
        :param exc_info: forwarded unchanged to ``logfn``
        """
        location = ''
        func_label = ''
        stack = traceback.extract_stack()
        if len(stack) > 2:
            origin = stack[-3]
            if self.show_loc:
                location = '(%s:%d):' % (os.path.basename(origin[0]), origin[1])
            func_label = origin[2]
            if func_label != '<module>':
                # Subclasses prefix the function with their own class name.
                if self.__class__.__name__ != Logger.__name__:
                    func_label = self.__class__.__name__ + '.' + func_label
                func_label += '()'

        if self.show_fcn:
            text = location + func_label + ': ' + message
        else:
            text = message
        logfn(text, exc_info=exc_info)
项目:scapy-radio    作者:BastilleResearch    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:QTAF    作者:Tencent    | 项目源码 | 文件源码
def _get_current_traceback(self, thread ):
        '''Return a formatted stack trace of where ``thread`` currently is.

        :param thread: thread whose current stack should be rendered
        :type thread: Thread
        :raises RuntimeError: if the thread has no entry in
            ``sys._current_frames()``
        '''
        frame = sys._current_frames().get(thread.ident)
        if frame is None:
            raise RuntimeError("thread not found")

        parts = ["Traceback ( thread-%d possibly hold at ):\n" % thread.ident]
        for path, number, func, text in traceback.extract_stack(frame):
            parts.append('  File: "%s", line %d, in %s\n' % (path, number, func))
            if text:
                parts.append("    %s\n" % (text.strip()))
        return "".join(parts)
项目:QTAF    作者:Tencent    | 项目源码 | 文件源码
def get_thread_traceback(thread):
    '''Return a formatted stack trace of where ``thread`` currently is.

    :param thread: thread whose current stack should be rendered
    :type thread: Thread
    :raises RuntimeError: if the thread has no entry in
        ``sys._current_frames()``
    '''
    frame = sys._current_frames().get(thread.ident)
    if frame is None:
        raise RuntimeError("thread not found")

    parts = ["Traceback ( thread-%d possibly hold at ):\n" % thread.ident]
    for path, number, func, text in traceback.extract_stack(frame):
        parts.append('  File: "%s", line %d, in %s\n' % (path, number, func))
        if text:
            parts.append("    %s\n" % (text.strip()))
    return "".join(parts)
项目:StatisKit    作者:StatisKit    | 项目源码 | 文件源码
def caller_trace(back=0):
    """
    Count who called the function that invoked this tracer.

    The counts are stored in the global dicts ``caller_bases`` (calls per
    callee) and ``caller_dicts`` (calls per callee/caller chain).

    :param back: number of additional caller levels to record beyond the
        immediate caller
    """
    global caller_bases, caller_dicts
    import traceback
    # Innermost frame first: [0] is this function, [1] the traced callee.
    frames = traceback.extract_stack(limit=3 + back)[::-1]
    callee = frames[1][:3]
    caller_bases[callee] = caller_bases.get(callee, 0) + 1
    for frame in frames[2:]:
        # Each chain key extends the previous one with the next caller.
        chain = callee + frame[:3]
        counts = caller_dicts.setdefault(callee, {})
        counts[chain] = counts.get(chain, 0) + 1
        callee = chain

# print a single caller and its callers, if any
项目:specto    作者:mrknow    | 项目源码 | 文件源码
def get_interactive_console(thread_id, frame_id, frame, console_message):
    """Return the cached debug console, recreating it for a new thread/frame.

    When the cached console belongs to a different thread or frame, a new
    ``DebugConsole`` is created and cached, and the location of ``frame`` is
    added to ``console_message`` as the current context.

    :rtype: DebugConsole
    """
    cache = InteractiveConsoleCache
    if cache.thread_id == thread_id and cache.frame_id == frame_id:
        return cache.interactive_console_instance

    cache.interactive_console_instance = DebugConsole()
    cache.thread_id = thread_id
    cache.frame_id = frame_id

    stack = traceback.extract_stack(frame, limit=1)
    if stack:
        top = stack[0]  # the single entry requested via limit=1
        context_message = 'File "%s", line %s, in %s' % (top[0], top[1], top[2])
        console_message.add_console_message(CONSOLE_OUTPUT, "[Current context]: %s" % (context_message,))
    return cache.interactive_console_instance
项目:isf    作者:w3h    | 项目源码 | 文件源码
def filter(self, record):
        """Rate-limit repeated warnings coming from the same source line.

        The key is the line number of the last stack frame seen before a frame
        named ``warning``. Within ``conf.warning_threshold`` seconds of the
        first occurrence, two repeats are let through (the second one gets a
        "more " prefix on ``record.msg``); further repeats are dropped.

        :param record: log record; ``record.msg`` may be rewritten in place
        :return: 1 to emit the record, 0 to suppress it
        """
        from config import conf
        threshold = conf.warning_threshold
        if not threshold > 0:
            # Throttling disabled: every record passes.
            return 1

        origin = None
        for _path, lineno, func_name, _text in traceback.extract_stack():
            if func_name == 'warning':
                break
            origin = lineno

        first_seen, repeats = self.warning_table.get(origin, (0, 0))
        now = time.time()
        if now - first_seen > threshold:
            # Window elapsed (or first sighting): start a new window.
            first_seen, repeats = now, 0
        elif repeats < 2:
            repeats += 1
            if repeats == 2:
                record.msg = "more " + record.msg
        else:
            return 0
        self.warning_table[origin] = (first_seen, repeats)
        return 1
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def doTraceback(self, module):
        """Check that traceback helpers can recover the raising source line.

        Calls ``module.do_raise()`` and asserts that ``extract_tb``,
        ``extract_stack`` and ``print_tb`` all report ``raise_src`` for the
        frame inside ``module``.
        """
        try:
            module.do_raise()
        except:
            # Skip this method's own frame so the traceback starts in module.
            tb = sys.exc_info()[2].tb_next

            tb_entry = extract_tb(tb, 1)[0]
            self.assertEqual(tb_entry[3], raise_src.strip())

            stack_entry = extract_stack(tb.tb_frame, 1)[0]
            self.assertEqual(stack_entry[3], raise_src.strip())

            printed = io.StringIO()
            print_tb(tb, 1, printed)
            self.assertTrue(printed.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
项目:g3ar    作者:VillanCh    | 项目源码 | 文件源码
def _worker(self, dictobj):
        """Run one task and publish its outcome under the instance lock.

        :param dictobj: mapping with ``func`` (the callable), ``args``
            (positional arguments) and ``argv`` (keyword arguments)

        When the call raises ``Exception``, the published result is an
        ``(exception, stack)`` tuple, where ``stack`` is
        ``traceback.extract_stack()`` taken inside the handler.
        """
        func = dictobj['func']
        args = dictobj['args']
        argv = dictobj['argv']

        try:
            result = func(*args, **argv)
        except Exception as e:
            result = tuple([e, traceback.extract_stack()])

        # ``with`` releases the lock even if queueing the result raises; a
        # bare acquire()/release() pair would leave it held in that case.
        with self.lock:
            self._executed_task_count = self._executed_task_count + 1
            self._add_result_to_queue(result=result)
项目:g3ar    作者:VillanCh    | 项目源码 | 文件源码
def _worker(self, dictobj):
        """Run one task and publish its outcome under the instance lock.

        :param dictobj: mapping with ``func`` (the callable), ``args``
            (positional arguments) and ``argv`` (keyword arguments)

        When the call raises ``Exception``, the published result is an
        ``(exception, stack)`` tuple, where ``stack`` is
        ``traceback.extract_stack()`` taken inside the handler.
        """
        func = dictobj['func']
        args = dictobj['args']
        argv = dictobj['argv']

        try:
            result = func(*args, **argv)
        except Exception as e:
            result = tuple([e, traceback.extract_stack()])

        # ``with`` releases the lock even if queueing the result raises; a
        # bare acquire()/release() pair would leave it held in that case.
        with self.lock:
            self._executed_task_count = self._executed_task_count + 1
            self._add_result_to_queue(result=result)
项目:TripMeal    作者:DanielAndreasen    | 项目源码 | 文件源码
def worker_int(worker):
    """Log a stack dump of every live thread when the worker is interrupted.

    :param worker: object exposing a ``log`` attribute with ``info`` and
        ``debug`` methods; the dump itself is emitted at debug level.
    """
    worker.log.info("worker received INT or QUIT signal")

    # Imported locally so the hook carries its own dependencies.
    import threading
    import sys
    import traceback

    names_by_ident = {th.ident: th.name for th in threading.enumerate()}
    report = []
    for ident, frame in sys._current_frames().items():
        report.append("\n# Thread: %s(%d)" % (names_by_ident.get(ident, ""), ident))
        for path, number, func, text in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (path, number, func))
            if text:
                report.append("  %s" % (text.strip()))
    worker.log.debug("\n".join(report))
项目:tripoli    作者:DDMAL    | 项目源码 | 文件源码
def mute_errors(self, fn, *args, **kwargs):
        """Call ``fn`` and collect the errors it logs instead of recording them.

        ``BaseValidator.log_error`` is temporarily replaced by a collector
        and restored afterwards, even when ``fn`` raises.
        ``BaseValidator.log_error`` should therefore not be overridden in
        subclasses.

        :param fn: callable to run
        :param args: positional arguments for ``fn``
        :param kwargs: keyword arguments for ``fn``
        :return: tuple of ``fn``'s return value and the set of collected
            ``ValidatorLogError`` objects
        """
        collected = set()

        def patched_log_error(self, field, msg):
            # Same error construction as the real hook, minus its side effects.
            if self.debug:
                tb = traceback.extract_stack()[:-1]
            else:
                tb = None
            collected.add(ValidatorLogError(msg, self._path + (field,), tb))

        original_hook = BaseValidator.log_error
        try:
            BaseValidator.log_error = patched_log_error
            outcome = fn(*args, **kwargs)
        finally:
            BaseValidator.log_error = original_hook
        return outcome, collected
项目:tripoli    作者:DDMAL    | 项目源码 | 文件源码
def log_error(self, field, msg):
        """Record a validation error for ``field``.

        Subclasses should not override this method: doing so is likely to
        break the error and warning coercion decorators.

        :param field: The field the error was raised on.
        :param msg: The message to associate with the error.
        :raises FailFastException: when ``self.fail_fast`` is set
        """
        if self.collect_errors:
            # The stack (without this method's own frame) is only captured
            # in debug mode.
            stack = None
            if self.debug:
                stack = traceback.extract_stack()[:-1]
            error = ValidatorLogError(msg, self._path + (field,), stack)
            if self.verbose:
                self._IIIFValidator.logger.error(str(error))
            self._errors.add(error)
        if self.fail_fast:
            raise FailFastException
项目:modelforge    作者:src-d    | 项目源码 | 文件源码
def refresh():
    """Load override files and copy their public names into this module.

    An ``OVERRIDE_FILE`` is looked for next to every source file on the
    current call stack and, last of all, at the bare relative path
    ``OVERRIDE_FILE``. Each file that exists is executed as a module and
    every name not starting with ``"__"`` is copied into ``globals()``, so
    later files override earlier ones.
    """
    candidates = []
    for frame in traceback.extract_stack():
        path = os.path.join(os.path.dirname(frame[0]), OVERRIDE_FILE)
        if path not in candidates:
            # Inner frames end up first, outer frames last.
            candidates.insert(0, path)
    # The bare relative path always goes last, exactly once.
    if OVERRIDE_FILE in candidates:
        candidates.remove(OVERRIDE_FILE)
    candidates.append(OVERRIDE_FILE)

    def import_path(path):
        if sys.version_info < (3, 5, 0):
            from importlib.machinery import SourceFileLoader
            return SourceFileLoader(__name__, path).load_module()
        import importlib.util
        spec = importlib.util.spec_from_file_location(__name__, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    for candidate in candidates:
        if os.path.isfile(candidate):
            mod = import_path(candidate)
            exported = {n: getattr(mod, n) for n in dir(mod) if not n.startswith("__")}
            globals().update(exported)
项目:py-ipv8    作者:qstokkink    | 项目源码 | 文件源码
def blockingCallFromThread(reactor, f, *args, **kwargs):
    """
    Variant of twisted's blockingCallFromThread that logs the complete
    stacktrace when an exception is raised on the reactor's thread.

    When already running on the reactor thread, ``f`` is simply called and its
    result returned. Otherwise ``f`` is scheduled on the reactor and this
    thread waits on a queue for the outcome. If the outcome is a failure, the
    traceback of this thread and the traceback from the reactor's thread are
    both logged before the failure's exception is re-raised.
    """
    if isInIOThread():
        # Already on the reactor thread: no hand-off needed.
        return f(*args, **kwargs)

    outcome_queue = Queue.Queue()

    def _callFromThread():
        deferred = defer.maybeDeferred(f, *args, **kwargs)
        # Success value or Failure, whichever arrives, is handed back via the queue.
        deferred.addBoth(outcome_queue.put)

    reactor.callFromThread(_callFromThread)
    outcome = outcome_queue.get()
    if not isinstance(outcome, failure.Failure):
        return outcome

    reactor_tb = traceback.extract_tb(outcome.getTracebackObject())
    caller_tb = traceback.extract_stack()
    logger.error("Exception raised on the reactor's thread %s: \"%s\".\n Traceback from this thread:\n%s\n"
                 " Traceback from the reactor's thread:\n %s", outcome.type.__name__, outcome.getErrorMessage(),
                 ''.join(traceback.format_list(caller_tb)), ''.join(traceback.format_list(reactor_tb)))
    outcome.raiseException()
    return outcome
项目:objEnhancer    作者:BabbageCom    | 项目源码 | 文件源码
def caller_trace(back=0):
    """
    Record who called the function that invoked ``caller_trace``.

    The ``(filename, lineno, name)`` triple of the invoking function is
    counted in the global ``caller_bases`` dict. Each further frame up the
    stack (at most ``1 + back`` of them) is counted in the global
    ``caller_dicts`` dict, keyed by the chain built so far; the entry key is
    that chain extended with the next caller's triple.

    :param back: number of additional caller frames to record.
    """
    global caller_bases, caller_dicts
    import traceback
    # Innermost frame first: [0] is this function, [1] the function being traced.
    frames = traceback.extract_stack(limit=3+back)[::-1]
    callee = frames[1][:3]
    caller_bases[callee] = caller_bases.get(callee, 0) + 1
    for frame in frames[2:]:
        chain = callee + frame[:3]
        counts = caller_dicts.setdefault(callee, {})
        counts[chain] = counts.get(chain, 0) + 1
        # The extended chain becomes the key for the next frame up.
        callee = chain

# print a single caller and its callers, if any
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def doTraceback(self, module):
        """Check that traceback helpers report the source line of ``module.do_raise``.

        ``module.do_raise()`` is expected to raise. The source line reported
        by ``extract_tb``, ``extract_stack`` and ``print_tb`` for the raising
        frame is compared against ``raise_src``.

        :param module: object exposing a ``do_raise()`` callable that raises.
        :raises AssertionError: if ``do_raise()`` returns without raising.
        """
        try:
            module.do_raise()
        except:
            # Step past the first traceback entry (this method's own frame).
            raised_tb = sys.exc_info()[2].tb_next
            expected_line = raise_src.strip()

            _, _, _, src_line = extract_tb(raised_tb, 1)[0]
            self.assertEqual(src_line, expected_line)

            _, _, _, src_line = extract_stack(raised_tb.tb_frame, 1)[0]
            self.assertEqual(src_line, expected_line)

            sink = StringIO.StringIO()
            print_tb(raised_tb, 1, sink)
            self.assertTrue(sink.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
项目:pyspark    作者:v-v-vishnevskiy    | 项目源码 | 文件源码
def first_spark_call():
    """
    Return a CallSite representing the first Spark call in the current call stack.

    The Spark directory is taken to be the directory of the innermost frame's
    file (the file this function lives in). The outermost frame whose file
    path starts with that directory is the first Spark frame: its function
    name is reported together with the file and line of the frame just
    outside it, i.e. the code that called into Spark. If the outermost frame
    of the whole stack is already a Spark frame, all three values come from
    that frame. Returns None when the stack is empty.
    """
    stack = traceback.extract_stack()
    if not stack:
        return None

    innermost = len(stack) - 1
    spark_dir = os.path.dirname(stack[innermost][0])
    # Index of the outermost frame located under the Spark directory.
    spark_index = next(
        (index for index, frame in enumerate(stack)
         if frame[0].startswith(spark_dir)),
        innermost,
    )

    if spark_index == 0:
        path, lineno, func_name = stack[0][:3]
        return CallSite(function=func_name, file=path, linenum=lineno)

    spark_func = stack[spark_index][2]
    user_path, user_lineno = stack[spark_index - 1][:2]
    return CallSite(function=spark_func, file=user_path, linenum=user_lineno)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def doTraceback(self, module):
        """Verify that traceback helpers show the source line of ``module.do_raise``.

        ``module.do_raise()`` must raise. For the frame that raised, the
        source text returned by ``extract_tb`` and ``extract_stack`` has to
        equal ``raise_src`` (stripped), and the output of ``print_tb`` has to
        end with ``raise_src``.

        :param module: object exposing a ``do_raise()`` callable that raises.
        :raises AssertionError: if ``do_raise()`` returns without raising.
        """
        try:
            module.do_raise()
        except:
            # tb_next moves from this method's traceback entry to the next one.
            inner_tb = sys.exc_info()[2].tb_next
            wanted = raise_src.strip()

            _, _, _, text = extract_tb(inner_tb, 1)[0]
            self.assertEqual(text, wanted)

            _, _, _, text = extract_stack(inner_tb.tb_frame, 1)[0]
            self.assertEqual(text, wanted)

            buffer = io.StringIO()
            print_tb(inner_tb, 1, buffer)
            self.assertTrue(buffer.getvalue().endswith(raise_src))
        else:
            raise AssertionError("This ought to be impossible")
项目:autoxd    作者:nessessary    | 项目源码 | 文件源码
def getCallerName():
    """Return ``"<file stem>_<function name>"`` for the caller's caller.

    The function name and the file both come from the frame two levels above
    this function, i.e. whoever called the function that called
    ``getCallerName``. The file stem is the base name of that frame's source
    file up to its first ``'.'``.
    """
    import traceback
    # Stack entries: [-1] is this function, [-2] its caller, [-3] the caller's caller.
    grandparent_name = traceback.extract_stack()[-3][2]
    source_path = sys._getframe(2).f_code.co_filename
    file_stem = os.path.basename(source_path).partition('.')[0]
    return "_".join((file_stem, grandparent_name))
项目:autoxd    作者:nessessary    | 项目源码 | 文件源码
def assemble(*args, **kwargs):
    """Return whether every positional argument is truthy.

    When the module-level ``is_allow`` flag is set and the stack has at least
    two frames, the caller's file and line plus this function's own name are
    passed to ``_getFunctionArgs``. Its result is not used here.
    NOTE(review): the call is kept in case it has side effects - confirm.

    args: tuple of bool-like values
    return: bool (numpy boolean), True when all values are truthy
    """
    frames = traceback.extract_stack()
    if len(frames) >= 2 and is_allow:
        # frames[-2] is the call site; frames[-1] is this function's own frame.
        caller_file, caller_line, _caller_func, _caller_src = frames[-2]
        own_name = frames[-1][-2]
        _getFunctionArgs(caller_file, caller_line, own_name)
    return np.array(args).all()