Python traceback 模块,format_stack() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用traceback.format_stack()

项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def signal_manager(self):
        """
        Deprecated!

        .. deprecated:: 0.5.0
            Use :func:`self.context.signals` instead in your apps.

        :return: Signal manager (global).
        :rtype: pyplanet.core.events.manager._SignalManager
        """
        logger.warning(DeprecationWarning(
            'DEPRECATED: The usage of \'self.instance.signal_manager\' in apps is deprecated, '
            'use \'self.context.signals\' instead!'
        ))
        logger.warning('\n'.join(traceback.format_stack(limit=7)))

        return self.signals
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def tryLock(self, timeout=None, id=None):
        if timeout is None:
            locked = QtCore.QMutex.tryLock(self)
        else:
            locked = QtCore.QMutex.tryLock(self, timeout)

        if self.debug and locked:
            self.l.lock()
            try:
                if id is None:
                    self.tb.append(''.join(traceback.format_stack()[:-1]))
                else:
                    self.tb.append("  " + str(id))
                #print 'trylock', self, len(self.tb)
            finally:
                self.l.unlock()
        return locked
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def tryLock(self, timeout=None, id=None):
        if timeout is None:
            locked = QtCore.QMutex.tryLock(self)
        else:
            locked = QtCore.QMutex.tryLock(self, timeout)

        if self.debug and locked:
            self.l.lock()
            try:
                if id is None:
                    self.tb.append(''.join(traceback.format_stack()[:-1]))
                else:
                    self.tb.append("  " + str(id))
                #print 'trylock', self, len(self.tb)
            finally:
                self.l.unlock()
        return locked
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def __init__(self, *args):
                # build the message from the user
                user_msg = args[0] if args else "An error has occurred"
                # get the traceback from the last error if it exists
                try: tb = traceback.format_exc()
                # otherwise, get the tb prior to this frame and pretend its us
                except: 
                        tb = "Traceback (most recent call last):\n"
                        tb += ''.join(traceback.format_stack()[:-1])
                        tb += self.__class__.__name__ + ": " + user_msg
                        tb += '\n'
                # build the complete log message
                log_msg = user_msg + "\n" + tb
                # and log it
                self.log.error(log_msg)
                # store the args
                self.args = args
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def handle_SIGUSR1(self, signum, frame):
        # propagate the signal to children, but only if they are AVE processes
        for pid in self.get_children():
            name = get_proc_name(pid)
            if name.startswith('ave-'):
                os.kill(pid, signal.SIGUSR1)
        # make the dump directory if it doesn't exist
        hickup_dir = os.path.join(self.home, '.ave', 'hickup')
        try:
            os.makedirs(hickup_dir)
        except OSError, e:
            if e.errno != errno.EEXIST:
                self.log('ERROR: could not create %s: %s' % (hickup_dir, e))
                return
        # create the trace file
        date = time.strftime('%Y%m%d-%H%M%S')
        name = '%s-%s-%d' % (date, self.proc_name, os.getpid())
        path = os.path.join(hickup_dir, name)
        with open(path, 'w') as f:
            f.write('stack:\n%s' % ''.join(traceback.format_stack(frame)))
            f.write('locals:\n%s\n' % frame.f_locals)
            f.write('globals:\n%s' % frame.f_globals)
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def handle_SIGUSR1(self, signum, frame):
        # propagate the signal to children, but only if they are AVE processes
        for pid in self.get_children():
            name = get_proc_name(pid)
            if name.startswith('ave-'):
                os.kill(pid, signal.SIGUSR1)
        # make the dump directory if it doesn't exist
        hickup_dir = os.path.join(self.home, '.ave', 'hickup')
        try:
            os.makedirs(hickup_dir)
        except OSError, e:
            if e.errno != errno.EEXIST:
                self.log('ERROR: could not create %s: %s' % (hickup_dir, e))
                return
        # create the trace file
        date = time.strftime('%Y%m%d-%H%M%S')
        name = '%s-%s-%d' % (date, self.proc_name, os.getpid())
        path = os.path.join(hickup_dir, name)
        with open(path, 'w') as f:
            f.write('stack:\n%s' % ''.join(traceback.format_stack(frame)))
            f.write('locals:\n%s\n' % frame.f_locals)
            f.write('globals:\n%s' % frame.f_globals)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, time, func, args, kw, cancel, reset, seconds=None):
        """
        @param time: Seconds from the epoch at which to call C{func}.
        @param func: The callable to call.
        @param args: The positional arguments to pass to the callable.
        @param kw: The keyword arguments to pass to the callable.
        @param cancel: A callable which will be called with this
            DelayedCall before cancellation.
        @param reset: A callable which will be called with this
            DelayedCall after changing this DelayedCall's scheduled
            execution time. The callable should adjust any necessary
            scheduling details to ensure this DelayedCall is invoked
            at the new appropriate time.
        @param seconds: If provided, a no-argument callable which will be
            used to determine the current time any time that information is
            needed.
        """
        self.time, self.func, self.args, self.kw = time, func, args, kw
        self.resetter = reset
        self.canceller = cancel
        self.seconds = seconds
        self.cancelled = self.called = 0
        self.delayed_time = 0
        if self.debug:
            self.creator = traceback.format_stack()[:-2]
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _startRunCallbacks(self, result):
        if self.called:
            if self.debug:
                if self._debugInfo is None:
                    self._debugInfo = DebugInfo()
                extra = "\n" + self._debugInfo._getDebugTracebacks()
                raise AlreadyCalledError(extra)
            raise AlreadyCalledError
        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            self._debugInfo.invoker = traceback.format_stack()[:-2]
        self.called = True
        self.result = result
        if self.timeoutCall:
            try:
                self.timeoutCall.cancel()
            except:
                pass

            del self.timeoutCall
        self._runCallbacks()
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:universe    作者:openai    | 项目源码 | 文件源码
def format_error(e):
    # errback automatically wraps everything in a Twisted Failure
    if isinstance(e, failure.Failure):
        e = e.value

    if isinstance(e, str):
        err_string = e
    elif six.PY2:
        err_string = traceback.format_exc(e).rstrip()
    else:
        err_string = ''.join(traceback.format_exception(type(e), e, e.__traceback__)).rstrip()

    if err_string == 'None':
        # Reasonable heuristic for exceptions that were created by hand
        last = traceback.format_stack()[-2]
        err_string = '{}\n  {}'.format(e, last)
    # Quick and dirty hack for now.
    err_string = err_string.replace('Connection to the other side was lost in a non-clean fashion', 'Connection to the other side was lost in a non-clean fashion (HINT: this generally actually means we got a connection refused error. Check that the remote is actually running.)')
    return error.Error(err_string)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:mdToRst    作者:kata198    | 项目源码 | 文件源码
def debugmsg(msg, msgTupleLambda=None):
        '''
            debugmsg - If #IS_DEVELOPER_DEBUG is changed to True (in the code, changing it after import has no affect ) this will print a debug message

                        to stderr prefixed with "DEBUG: " and suffixed with a newline.

                       Don't create your message like:

                           debugmsg( "Order(id=%s) has %d pepperoni pizzas" %( myOrder.id, len( [ topping for pizza in getPizzas() for topping in pizza.toppings() if 'pepperoni' in topping ] ) ) )

                       Which would evaluate that comprehension whether or not debug mode is on,

                         Instead, use a lambda to return that same tuple, so that code is only evaluated in the #debugmsg function calls your lambda:

                           debugmsg( "Order(id=%s) has %d pepperoni pizzas", lambda : ( myOrder.id, len( [ topping for pizza in getPizzas() for topping in pizza.toppings() if 'pepperoni' in topping ] ) ) )
        '''

        if issubclass(msgTupleLambda.__class__, (tuple, list)):
            sys.stderr.write('''WARNING: debugmsg called with a tuple/list for msgTupleLambda.\n\tShould be a lambda that returns the tuple, so when IS_DEVELOPER_DEBUG=False it is not evaluated. \n\n''' + ''.join(traceback.format_stack()[:-1]) + "\n\n" )

            msgTuple = msgTupleLambda
        else:
            msgTuple = msgTupleLambda()

        sys.stderr.write('DEBUG: %s\n' % msgTuple )
项目:tasker    作者:wavenator    | 项目源码 | 文件源码
def requeue(
        self,
        exception=None,
    ):
        task = self.current_task
        exception_traceback = ''.join(traceback.format_stack())

        if not exception:
            exception = WorkerRequeue()

        self._on_requeue(
            task=task,
            exception=exception,
            exception_traceback=exception_traceback,
            args=task['args'],
            kwargs=task['kwargs'],
        )
项目:ANNarchy    作者:vitay    | 项目源码 | 文件源码
def _warning(*var_text):
    """
    Prints a warning message to standard out.
    """
    text = 'WARNING: '
    for var in var_text:
        text += str(var) + ' '
    if not config['suppress_warnings']:
        print(text)
    #     # Print the trace
    #     tb = traceback.format_stack()
    #     for line in tb:
    #         if not '/ANNarchy/core/' in line and \
    #            not '/ANNarchy/parser/' in line and \
    #            not '/ANNarchy/generator/' in line :
    #             print(line)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:kafkatos3    作者:ConnectedHomes    | 项目源码 | 文件源码
def exit_gracefully(self, signum, frame):
        '''exit the consumer gracefully'''
        if not self.message_processing:
            self.logger.info("Fast shutdown available ... exiting (signum %d)" \
                             % (signum))
            self.logger.info("Print stack trace. Don't panic!")
            self.logger.info("-----------------------------------------------")
            for chunk in traceback.format_stack(frame):
                for line in chunk.split("\n"):
                    self.logger.info(line)
            self.logger.info("-----------------------------------------------")
            for part in self.partitions:
                self.partitions[part].writer.close()
            self.consumer.commit()
            sys.exit(0)

        self.logger.info("Graceful shutdown of consumer " +
                         str(self.consumer_id) + " started....")
        self.shutting_down = True
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def __init__(self, time, func, args, kw, cancel, reset, seconds=None):
        """
        @param time: Seconds from the epoch at which to call C{func}.
        @param func: The callable to call.
        @param args: The positional arguments to pass to the callable.
        @param kw: The keyword arguments to pass to the callable.
        @param cancel: A callable which will be called with this
            DelayedCall before cancellation.
        @param reset: A callable which will be called with this
            DelayedCall after changing this DelayedCall's scheduled
            execution time. The callable should adjust any necessary
            scheduling details to ensure this DelayedCall is invoked
            at the new appropriate time.
        @param seconds: If provided, a no-argument callable which will be
            used to determine the current time any time that information is
            needed.
        """
        self.time, self.func, self.args, self.kw = time, func, args, kw
        self.resetter = reset
        self.canceller = cancel
        self.seconds = seconds
        self.cancelled = self.called = 0
        self.delayed_time = 0
        if self.debug:
            self.creator = traceback.format_stack()[:-2]
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _startRunCallbacks(self, result):
        if self.called:
            if self.debug:
                if self._debugInfo is None:
                    self._debugInfo = DebugInfo()
                extra = "\n" + self._debugInfo._getDebugTracebacks()
                raise AlreadyCalledError(extra)
            raise AlreadyCalledError
        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            self._debugInfo.invoker = traceback.format_stack()[:-2]
        self.called = True
        self.result = result
        if self.timeoutCall:
            try:
                self.timeoutCall.cancel()
            except:
                pass

            del self.timeoutCall
        self._runCallbacks()
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:dcos    作者:dcos    | 项目源码 | 文件源码
def setup_thread_debugger():
    """Setup a thread debbuger for pytest session

    This function, based on http://stackoverflow.com/a/133384, is meant to
    add debugging facility to pytest that will allow to debug deadlock that
    may sometimes occur.
    """
    def debug(signal, frame):
        """Interrupt running process and provide a python prompt for
        interactive debugging."""
        d = {'_frame': frame}  # Allow access to frame object.
        d.update(frame.f_globals)  # Unless shadowed by global
        d.update(frame.f_locals)

        i = code.InteractiveConsole(d)
        message = "Signal received : entering python shell.\nTraceback:\n"
        message += ''.join(traceback.format_stack(frame))
        i.interact(message)

    signal.signal(signal.SIGUSR1, debug)  # Register handler
项目:splunk_ta_ps4_f1_2016    作者:jonathanvarley    | 项目源码 | 文件源码
def log(msg, msgx='', level=logging.INFO, need_tb=False):
    """
    Logging in UCC Config Module.
    :param msg: message content
    :param msgx: detail info.
    :param level: logging level
    :param need_tb: if need logging traceback
    :return:
    """
    global LOGGING_STOPPED
    if LOGGING_STOPPED:
        return

    msgx = ' - ' + msgx if msgx else ''
    content = 'UCC Config Module: %s%s' % (msg, msgx)
    if need_tb:
        stack = ''.join(traceback.format_stack())
        content = '%s\r\n%s' % (content, stack)
    stulog.logger.log(level, content, exc_info=1)
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:TA-SyncKVStore    作者:georgestarcher    | 项目源码 | 文件源码
def log(msg, msgx='', level=logging.INFO, need_tb=False):
    """
    Logging in UCC Config Module.
    :param msg: message content
    :param msgx: detail info.
    :param level: logging level
    :param need_tb: if need logging traceback
    :return:
    """
    global LOGGING_STOPPED
    if LOGGING_STOPPED:
        return

    msgx = ' - ' + msgx if msgx else ''
    content = 'UCC Config Module: %s%s' % (msg, msgx)
    if need_tb:
        stack = ''.join(traceback.format_stack())
        content = '%s\r\n%s' % (content, stack)
    stulog.logger.log(level, content, exc_info=1)
项目:cb-defense-splunk-app    作者:carbonblack    | 项目源码 | 文件源码
def log(msg, msgx='', level=logging.INFO, need_tb=False):
    """
    Logging in UCC Config Module.
    :param msg: message content
    :param msgx: detail info.
    :param level: logging level
    :param need_tb: if need logging traceback
    :return:
    """
    global LOGGING_STOPPED
    if LOGGING_STOPPED:
        return

    msgx = ' - ' + msgx if msgx else ''
    content = 'UCC Config Module: %s%s' % (msg, msgx)
    if need_tb:
        stack = ''.join(traceback.format_stack())
        content = '%s\r\n%s' % (content, stack)
    stulog.logger.log(level, content, exc_info=1)
项目:data-store    作者:HumanCellAtlas    | 项目源码 | 文件源码
def timeout_response() -> chalice.Response:
    """
    Produce a chalice Response object that indicates a timeout.  Stacktraces for all running threads, other than the
    current thread, are provided in the response object.
    """
    frames = sys._current_frames()
    current_threadid = threading.get_ident()
    trace_dump = {
        thread_id: traceback.format_stack(frame)
        for thread_id, frame in frames.items()
        if thread_id != current_threadid}

    problem = {
        'status': requests.codes.gateway_timeout,
        'code': "timed_out",
        'title': "Timed out processing request.",
        'traces': trace_dump,
    }
    return chalice.Response(
        status_code=problem['status'],
        headers={"Content-Type": "application/problem+json"},
        body=json.dumps(problem),
    )
项目:qudi    作者:Ulm-IQO    | 项目源码 | 文件源码
def tryLock(self, timeout=None, id=None):
        """ Try to lock  the mutex.

            @param timeout int: give up after this many milliseconds
            @param id: debug id

            @return bool: whether locking succeeded
        """
        if timeout is None:
            locked = QtCore.QMutex.tryLock(self)
        else:
            locked = QtCore.QMutex.tryLock(self, timeout)

        if self.debug and locked:
            self.l.lock()
            try:
                if id is None:
                    self.tb.append(''.join(traceback.format_stack()[:-1]))
                else:
                    self.tb.append("  " + str(id))
                #print 'trylock', self, len(self.tb)
            finally:
                self.l.unlock()
        return locked
项目:qudi    作者:Ulm-IQO    | 项目源码 | 文件源码
def lock(self, id=None):
        """ Lock mutex. Will try again every 5 seconds.

            @param id: debug id
        """
        c = 0
        waitTime = 5000  # in ms
        while True:
            if self.tryLock(waitTime, id):
                break
            c += 1
            if self.debug:
                self.l.lock()
                try:
                    logger.debug('Waiting for mutex lock ({:.1} sec). '
                    'Traceback follows:'.format(c*waitTime/1000.))
                    logger.debug(''.join(traceback.format_stack()))
                    if len(self.tb) > 0:
                        logger.debug('Mutex is currently locked from: {0}\n'
                                ''.format(self.tb[-1]))
                    else:
                        logger.debug('Mutex is currently locked from [???]')
                finally:
                    self.l.unlock()
        #print 'lock', self, len(self.tb)
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:gardenbot    作者:GoestaO    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:frontera-docs-zh_CN    作者:xsren    | 项目源码 | 文件源码
def run(self):
        def log_failure(failure):
            logger.exception(failure.value)
            if failure.frames:
                logger.critical(str("").join(format_tb(failure.getTracebackObject())))

        def errback_main(failure):
            log_failure(failure)
            self.task.start(interval=0).addErrback(errback_main)

        def errback_flush_states(failure):
            log_failure(failure)
            self._flush_states_task.start(interval=300).addErrback(errback_flush_states)

        def debug(sig, frame):
            logger.critical("Signal received: printing stack trace")
            logger.critical(str("").join(format_stack(frame)))

        self.task.start(interval=0).addErrback(errback_main)
        self._logging_task.start(interval=30)
        self._flush_states_task.start(interval=300).addErrback(errback_flush_states)
        signal(SIGUSR1, debug)
        reactor.addSystemEventTrigger('before', 'shutdown', self.stop)
        reactor.run()
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _startRunCallbacks(self, result):
        if self.called:
            if self._suppressAlreadyCalled:
                self._suppressAlreadyCalled = False
                return
            if self.debug:
                if self._debugInfo is None:
                    self._debugInfo = DebugInfo()
                extra = "\n" + self._debugInfo._getDebugTracebacks()
                raise AlreadyCalledError(extra)
            raise AlreadyCalledError
        if self.debug:
            if self._debugInfo is None:
                self._debugInfo = DebugInfo()
            self._debugInfo.invoker = traceback.format_stack()[:-2]
        self.called = True
        self.result = result
        self._runCallbacks()
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def get_file(file_id, api=None):
    # TODO: remove this function and just use the Pillar SDK directly.
    if file_id is None:
        return None

    if api is None:
        api = system_util.pillar_api()

    try:
        return File.find(file_id, api=api)
    except ResourceNotFound:
        f = sys.exc_info()[2].tb_frame.f_back
        tb = traceback.format_stack(f=f, limit=2)
        log.warning('File %s not found, but requested from %s\n%s',
                    file_id, request.url, ''.join(tb))
        return None
项目:deb-python-httpretty    作者:openstack    | 项目源码 | 文件源码
def debug(self, truesock_func, *a, **kw):
            if self.is_http:
                frame = inspect.stack()[0][0]
                lines = list(map(utf8, traceback.format_stack(frame)))

                message = [
                    "HTTPretty intercepted and unexpected socket method call.",
                    ("Please open an issue at "
                     "'https://github.com/gabrielfalcao/HTTPretty/issues'"),
                    "And paste the following traceback:\n",
                    "".join(decode_utf8(lines)),
                ]
                raise RuntimeError("\n".join(message))
            if not self.truesock:
                raise UnmockedError()
            return getattr(self.truesock, truesock_func)(*a, **kw)
项目:micro-blog    作者:nickChenyx    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:python-flask-security    作者:weinbergdavid    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:django-snoopy    作者:Pradeek    | 项目源码 | 文件源码
def execute_insert_sql(self, *args, **kwargs):
    stack_trace = traceback.format_stack()
    query_dict = {
        'query': [],
        'query_type': QUERY_TYPE_WRITE,
        'traceback': stack_trace,
        'model': "%s.%s" % (self.query.model.__module__, self.query.model.__name__),
        'start_time': datetime.datetime.now(),
    }
    for sql, params in self.as_sql():
        query_dict['query'].append(sql % params)

    try:
        return self._snoopy_execute_insert_sql(*args, **kwargs)
    finally:
        # This gets called just before the `return`
        query_dict['end_time'] = datetime.datetime.now()
        SnoopyRequest.record_query_data(query_dict)
项目:watcher    作者:nosmokingbandit    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:Lixiang_zhaoxin    作者:hejaxian    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn
项目:maas    作者:maas    | 项目源码 | 文件源码
def _captureThreadStacks(self):
        """Capture the stacks for all currently running threads.

        :return: A list of ``(thread-description, stack)`` tuples. See
            `traceback.format_stack` for the format of ``stack``.
        """
        threads = {t.ident: t for t in threading.enumerate()}

        def describe(ident):
            if ident in threads:
                return repr(threads[ident])
            else:
                return "<*Unknown* %d>" % ident

        return [
            (describe(ident), traceback.format_stack(frame))
            for ident, frame in sys._current_frames().items()
        ]
项目:flask    作者:bobohope    | 项目源码 | 文件源码
def _do_get(self):
        if self._checked_out:
            if self._checkout_traceback:
                suffix = ' at:\n%s' % ''.join(
                    chop_traceback(self._checkout_traceback))
            else:
                suffix = ''
            raise AssertionError("connection is already checked out" + suffix)

        if not self._conn:
            self._conn = self._create_connection()

        self._checked_out = True
        if self._store_traceback:
            self._checkout_traceback = traceback.format_stack()
        return self._conn