Python inspect 模块,currentframe() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用inspect.currentframe()

项目:radar    作者:amoose136    | 项目源码 | 文件源码
def run_tests_if_main(show_coverage=False):
    """Run the calling file's tests when that file is executed as a script.

    The caller's local namespace is inspected; nothing happens unless its
    ``__name__`` is ``'__main__'``. Otherwise the working directory is
    changed to ``ROOT_DIR`` and pytest is run on the caller's ``__file__``
    with coverage for ``imageio`` written as an HTML report.

    Args:
        show_coverage: When true, open the generated ``htmlcov/index.html``
            report in a new browser tab after the run.
    """
    caller_vars = inspect.currentframe().f_back.f_locals
    if caller_vars.get('__name__', '') != '__main__':
        return
    # We are in a "__main__" script.
    os.chdir(ROOT_DIR)
    fname = str(caller_vars['__file__'])
    _clear_imageio()
    _enable_faulthandler()
    # pytest.main() expects an argument list. A single command-line string
    # is rejected by current pytest and needed fragile repr() quoting of
    # the file name.
    pytest.main(['-v', '-x', '--color=yes', '--cov', 'imageio',
                 '--cov-config', '.coveragerc', '--cov-report', 'html',
                 fname])
    if show_coverage:
        import webbrowser
        report = os.path.join(ROOT_DIR, 'htmlcov', 'index.html')
        webbrowser.open_new_tab(report)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __ImportFrom(module_xname, xname):
    """
    Import a specified name from a python module into the global namespace. This can be accomplished
    programatically and inside a method or class.

    The imported object is stored under ``xname`` in the globals of every
    frame currently on the call stack, starting with this function's own.

    @param module_xname : name of the module to load x.y.z
    @param xname :  symbol to import from loaded module
    @return method/class : imported item from loaded module, or None when
                           the module cannot be imported or lacks the symbol
    """
    try:
        module = __import__(module_xname, globals(), locals(), [xname])
    except ImportError:
        return None

    try:
        item = getattr(module, xname)
    except AttributeError:
        return None

    if xname:
        # Walk the frame chain via f_back. inspect.currentframe() accepts no
        # depth argument on Python 3, and inspect.stack() would read source
        # context for every frame just to count them.
        frame = inspect.currentframe()
        while frame is not None:
            frame.f_globals[xname] = item
            frame = frame.f_back

    return item
项目:gransk    作者:pcbje    | 项目源码 | 文件源码
def produce(self, topic, doc, payload):
    """
    Deliver an event to every listener registered for a topic.

    The module of the calling object (the caller's local ``self``) is looked
    up only so it can be included in the debug log lines.

    :param topic: The topic of the produced event.
    :param doc: The document to which the event belongs.
    :param payload: The file pointer belonging to the document.
    :type topic: ``str``
    :type doc: ``gransk.core.Document``
    :type payload: ``file``
    """
    caller_frame = inspect.currentframe().f_back
    source_module = caller_frame.f_locals['self'].__module__
    subscribers = self.listeners.get(topic, [])
    basename = os.path.basename(doc.path)

    for subscriber, handler in subscribers:
        LOGGER.debug('[%s] %s -> %s (%s)', topic, source_module, subscriber, basename)
        handler(doc, payload)

    if not subscribers:
        LOGGER.debug('[%s] %s -> no listeners (%s)', topic, source_module, basename)
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def _find_frame_by_self(clz):
    """
    Walk outwards from the current frame and return the first frame whose
    local variable 'self' is an instance of clz, or None if no frame on the
    stack qualifies.
    """

    current = inspect.currentframe()

    while current:
        candidate = current.f_locals.get('self')
        if isinstance(candidate, clz):
            return current
        current = current.f_back

    return None
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def step_listener(self, connection, event, fd):
        """Handle one poll event reported for the listening connection.

        An error event is treated as an internal failure and raises. An
        input event accepts a new connection, registers it for OUTMASK
        polling and queues it in ``self.accepting``. Other events are
        ignored.
        """
        if event & ERRMASK:
            raise Exception('INTERNAL ERROR. LISTENER NEVER DIES')
        if not (event & INMASK):
            return

        accepted = self.accept(connection)
        if not accepted:
            # Happens if the peer hangs up during accept or the control is
            # rejecting new connections.
            return

        self.pollable(accepted.fileno(), accepted, OUTMASK)
        self.accepting.append(accepted)
        # Ignore all further events for this file descriptor in the current
        # step of the main loop. The OS may reuse descriptors aggressively,
        # so the events list may include POLLNVAL for the same descriptor.
        # Such an event needs no handling because that connection was
        # replaced (and GCed) by the self.pollable() call above.
        self.unpend.append(accepted.fileno())
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def step_accepting(self, connection, event, fd):
        """Handle one poll event for a connection that is being accepted.

        On an error event the connection is dropped from ``self.accepting``,
        closed and unregistered from polling. On an output event it is
        removed from ``self.accepting`` and sent ``CHALLENGE`` plus a fresh
        salt; on success the salt is remembered in ``self.authenticating``
        and the descriptor is re-registered for INMASK. If the peer has
        already closed, the connection is closed and unregistered instead.
        """
        if event & ERRMASK:
            self.accepting.remove(connection)
            connection.close()
            self.unpollable(fd)
            return

        if not (event & OUTMASK):
            return

        self.accepting.remove(connection)
        salt = make_salt()
        try:
            connection.put(CHALLENGE + salt)
            self.authenticating[connection] = salt
            self.pollable(fd, connection, INMASK)
        except ConnectionClosed:
            # The peer went away before the challenge could be delivered.
            connection.close()
            self.unpollable(fd)
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def step_listener(self, connection, event, fd):
        """Handle one poll event reported for the listening connection.

        An error event is treated as an internal failure and raises. An
        input event accepts a new connection, registers it for OUTMASK
        polling and queues it in ``self.accepting``. Other events are
        ignored.
        """
        if event & ERRMASK:
            raise Exception('INTERNAL ERROR. LISTENER NEVER DIES')
        if not (event & INMASK):
            return

        accepted = self.accept(connection)
        if not accepted:
            # Happens if the peer hangs up during accept or the control is
            # rejecting new connections.
            return

        self.pollable(accepted.fileno(), accepted, OUTMASK)
        self.accepting.append(accepted)
        # Ignore all further events for this file descriptor in the current
        # step of the main loop. The OS may reuse descriptors aggressively,
        # so the events list may include POLLNVAL for the same descriptor.
        # Such an event needs no handling because that connection was
        # replaced (and GCed) by the self.pollable() call above.
        self.unpend.append(accepted.fileno())
项目:ave    作者:sonyxperiadev    | 项目源码 | 文件源码
def step_accepting(self, connection, event, fd):
        """Handle one poll event for a connection that is being accepted.

        On an error event the connection is dropped from ``self.accepting``,
        closed and unregistered from polling. On an output event it is
        removed from ``self.accepting`` and sent ``CHALLENGE`` plus a fresh
        salt; on success the salt is remembered in ``self.authenticating``
        and the descriptor is re-registered for INMASK. If the peer has
        already closed, the connection is closed and unregistered instead.
        """
        if event & ERRMASK:
            self.accepting.remove(connection)
            connection.close()
            self.unpollable(fd)
            return

        if not (event & OUTMASK):
            return

        self.accepting.remove(connection)
        salt = make_salt()
        try:
            connection.put(CHALLENGE + salt)
            self.authenticating[connection] = salt
            self.pollable(fd, connection, INMASK)
        except ConnectionClosed:
            # The peer went away before the challenge could be delivered.
            connection.close()
            self.unpollable(fd)
项目:dartqc    作者:esteinig    | 项目源码 | 文件源码
def __init__(self, *msg):
        """Build the error text from *msg* and exit the interpreter.

        All positional arguments are stringified and concatenated. The
        reported file name and line number come from the traceback of the
        exception currently being handled or, when there is none, from the
        caller's frame. The formatted text becomes the single element of
        ``self.args`` and ``sys.exit(self)`` is then called, so constructing
        this object always raises ``SystemExit``.
        """
        message = "".join(str(part) for part in msg)

        try:
            tb = sys.exc_info()[-1]
            ln = tb.tb_lineno
            source = traceback.extract_tb(tb)
            if isinstance(source, list):
                source = source[0].filename
        except AttributeError:
            # No exception is in flight (tb is None): fall back to the
            # location of the code that instantiated this object.
            caller = inspect.currentframe().f_back
            ln = caller.f_lineno
            source = inspect.getframeinfo(caller).filename

        if source is not None:
            # Keep only the last path component, whichever separator is
            # used. A raw string avoids the invalid "\/" escape sequence.
            source = re.split(r"[\\/]+", source)[-1]

        self.args = ("Error ({3}:{1}): {2}".format(type(self), ln, message, source),)
        sys.exit(self)

    # def __init__(self, *args, **kwargs):
    #     RuntimeError.__init__(self, args, kwargs)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def guess_app_stacklevel(start=1):
    """
    Guess the ``stacklevel`` to use for an application-facing warning.

    Frames are walked outwards from this function; the first one whose
    module is not part of ``passlib`` (``passlib.tests.*`` counts as outside)
    determines the result, which is never less than 1. If every frame
    belongs to passlib, *start* is returned.
    """
    depth = -start
    current = inspect.currentframe()
    try:
        while current:
            module_name = current.f_globals.get('__name__', "")
            inside_passlib = (module_name.startswith("passlib.") and
                              not module_name.startswith("passlib.tests."))
            if not inside_passlib:
                return max(1, depth)
            depth += 1
            current = current.f_back
        return start
    finally:
        # Drop the frame reference so no reference cycle is left behind.
        del current
项目:Citation-Fetcher    作者:andrewhead    | 项目源码 | 文件源码
def configure_parser(parser):
    """Configure *parser* with the migration scripts available to run.

    The directory containing this module is scanned for files whose names
    start with four digits and end in ``.py``. Their names without the
    extension become the allowed choices of the positional
    ``migration_name`` argument.
    """
    parser.description = "Run a migration script. These can only be run forward, not in reverse."

    # Inspect the local directory for which migration modules the user can run.
    here = os.path.dirname(
        os.path.abspath(inspect.getfile(inspect.currentframe())))
    migration_names = [
        # splitext removes exactly the extension. rstrip('.py') would strip
        # any trailing '.', 'p' or 'y' characters and mangle names such as
        # "0001_happy.py".
        os.path.splitext(entry)[0]
        for entry in os.listdir(here)
        if re.match(r'^\d{4}.*\.py$', entry)
    ]

    parser.add_argument(
        'migration_name',
        choices=migration_names,
        help="The name of the migration script you want to run."
    )
项目:zeronet-debian    作者:bashrc    | 项目源码 | 文件源码
def new_sslwrap(
        sock, server_side=False, keyfile=None, certfile=None,
        cert_reqs=__ssl__.CERT_NONE, ssl_version=__ssl__.PROTOCOL_SSLv23,
        ca_certs=None, ciphers=None
):
    """Wrap *sock* using an SSLContext built from the given options.

    A context is created for *ssl_version* and configured with the verify
    mode, CA locations, certificate chain and cipher list that were
    supplied. The socket is then wrapped through the context's private
    ``_wrap_socket``, passing the caller's local ``self`` as ``ssl_sock``.
    """
    ssl_context = __ssl__.SSLContext(ssl_version)
    if cert_reqs:
        ssl_context.verify_mode = cert_reqs
    else:
        ssl_context.verify_mode = __ssl__.CERT_NONE

    if ca_certs:
        ssl_context.load_verify_locations(ca_certs)
    if certfile:
        ssl_context.load_cert_chain(certfile, keyfile)
    if ciphers:
        ssl_context.set_ciphers(ciphers)

    # The object whose method called us is handed over as ssl_sock.
    owner = inspect.currentframe().f_back.f_locals['self']
    return ssl_context._wrap_socket(sock, server_side=server_side, ssl_sock=owner)


# Re-add sslwrap to Python 2.7.9+
项目:skybeard-2    作者:LanceMaverick    | 项目源码 | 文件源码
def get_beard_config(config_file="../../config.yml"):
    """Attempts to load a yaml file in the beard directory.

    NOTE: The file location should be relative from where this function is
    called: it is resolved against the directory of the caller's source
    file.

    Returns the object produced by ``yaml.safe_load`` for that file.
    """

    # Sometimes external libraries change the logging level. This is not
    # acceptable, so we assert after importing a beard that it has not changed.
    logger_level_before = logger.getEffectiveLevel()
    logging.debug("logging level: {}".format(logger.getEffectiveLevel()))

    callers_frame = inspect.currentframe().f_back
    logger.debug("This function was called from the file: " +
                 callers_frame.f_code.co_filename)
    base_path = os.path.dirname(callers_frame.f_code.co_filename)
    # Use a context manager so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open(os.path.join(base_path, config_file)) as config_handle:
        config = yaml.safe_load(config_handle)

    logging.debug("logging level: {}".format(logger.getEffectiveLevel()))
    logger_level_after = logger.getEffectiveLevel()
    assert logger_level_before == logger_level_after, \
        "Something has changed the logger level!"

    return config
项目:plone.server    作者:plone    | 项目源码 | 文件源码
def get_current_request():
    """Find the current request by searching the call stack.

    Each frame, from the current one outwards, is checked for a local
    ``self`` carrying a non-None ``request`` attribute. Failing that, if
    ``self`` is a ``RequestHandler`` the frame's local ``request`` is
    returned. ``RequestNotFound`` is raised when the stack is exhausted.
    """
    current = inspect.currentframe()
    while current is not None:
        scope = current.f_locals
        owner = scope.get('self')
        found = getattr(owner, 'request', None)
        if found is not None:
            return found
        if isinstance(owner, RequestHandler):
            return scope['request']
        current = current.f_back
    raise RequestNotFound(RequestNotFound.__doc__)
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def __init__(self, **kwargs):
        """Record the file, line and code context where this object was made.

        Frames whose local ``self`` is this object are skipped first, then
        frames whose file name contains ``ngraph/op_graph``. The location of
        the last frame examined is stored on the instance.
        """
        # TODO This is a good first cut for debugging info, but it would be nice to
        # TODO be able to reliably walk the stack back to user code rather than just
        # TODO back past this constructor
        super(DebugInfo, self).__init__(**kwargs)
        frame = None
        try:
            frame = inspect.currentframe()
            # Step out of the constructor chain of this very object.
            while frame.f_locals.get('self', None) is self:
                frame = frame.f_back
            # Step out of the library's own op_graph modules.
            while frame:
                info = inspect.getframeinfo(frame)
                if 'ngraph/op_graph' not in info.filename:
                    break
                frame = frame.f_back

            self.filename = info.filename
            self.lineno = info.lineno
            self.code_context = info.code_context
        finally:
            # Avoid keeping a frame reference alive.
            del frame
项目:brainpipe    作者:EtienneCmb    | 项目源码 | 文件源码
def loadatlas(r=5):
    """Load the Talairach atlas from the brainpipe module.

    The pickle is located relative to this source file. One extra label row
    marking "no gray matter within +/- r mm" is appended and the label table
    is re-indexed with consecutive integers.

    Returns the tuple ``(hdr, mask, gray, label)``.
    """
    atlas_location = join(getfile(currentframe()), '..', '..', '..', 'atlas')
    B3Dpath = dirname(abspath(atlas_location))

    # Load talairach atlas :
    pickle_path = B3Dpath + '/atlas/labels/talairach_atlas.pickle'
    with open(pickle_path, "rb") as f:
        TAL = pickle.load(f)

    no_gray = ['No Gray Matter found within +/-'+str(r)+'mm']
    fallback_row = DataFrame({'hemisphere': [no_gray], 'lobe': [no_gray],
                              'gyrus': [no_gray], 'matter': [no_gray],
                              'brodmann': [0]})
    label = concat([TAL['label'], fallback_row])
    label = label.set_index([list(n.arange(label.shape[0]))])
    return TAL['hdr'], TAL['mask'], TAL['gray'], label
项目:Yugioh-bot    作者:will7200    | 项目源码 | 文件源码
def async_calling_function(callFrame):
    """Build a decorator that logs which callers invoked a function.

    Args:
        callFrame: Passed to ``inspect.getouterframes`` as its second
            argument (the number of source context lines per frame).

    Returns:
        A decorator. The wrapped function logs, at debug level, the names
        of the outer frames whose file path contains ``'Yu-gi'`` and then
        calls the original function unchanged.
    """
    import functools

    def calling_functionn(__function):
        """
        View Calling Function to debug
        """

        # Preserve the decorated function's name and docstring.
        @functools.wraps(__function)
        def wrapper(*args, **kwargs):
            cur_frame = inspect.currentframe()
            cal_frame = inspect.getouterframes(cur_frame, callFrame)
            # t[1] is the frame's file name, t[3] its function name.
            stack = [t[3] for t in cal_frame if 'Yu-gi' in t[1]]
            logger.debug("Called by: " + str(stack))
            return __function(*args, **kwargs)

        return wrapper
    return calling_functionn
项目:calmjs    作者:calmjs    | 项目源码 | 文件源码
def __advice_stack_frame_protection(self, frame):
        """
        Refuse to proceed when ``self.handle`` is anywhere among the callers
        of *frame*.

        Overriding of this is only permitted if and only if your name is
        Megumin and you have a pet/familiar named Chomusuke.
        """

        if frame is None:
            logger.debug(
                'currentframe() returned None; frame protection disabled')
            return

        ancestor = frame.f_back
        while ancestor:
            running_handle = ancestor.f_code is self.handle.__code__
            if running_handle:
                offender = frame.f_code.co_name
                raise RuntimeError(
                    "indirect invocation of '%s' by 'handle' is forbidden" %
                    offender
                )
            ancestor = ancestor.f_back
项目:PyFRAP    作者:alexblaessle    | 项目源码 | 文件源码
def getFunctionCall(idx=1):

    """Return the name of the function or method ``idx`` frames outwards.

    .. note:: ``idx=0`` names this function itself, ``getFunctionCall``.

    Args:
        idx (int): Steps outwards.

    Returns:
        str: Name of function or method, or an empty string when the stack
        has no entry at ``idx``.
    """

    try:
        records = inspect.getouterframes(inspect.currentframe())
        record = records[idx]
        # Element 3 of a frame record is the function name.
        return record[3]
    except IndexError:
        return ""
项目:pyopenvr    作者:cmbruns    | 项目源码 | 文件源码
def shader_string(body, glsl_version='450 core'):
    """
    Call this method from a function that defines a literal shader string as the "body" argument.
    Dresses up a shader string in three ways:
        1) Insert #version at the top
        2) Insert #line number declaration
        3) un-indents
    The line number information can help debug glsl compile errors.
    The version string needs to be the very first characters in the shader,
    which can be distracting, requiring backslashes or other tricks.
    The unindenting allows you to type the shader code at a pleasing indent level
    in your python method, while still creating an unindented GLSL string at the end.

    Only the ``#version`` line is produced here; the body is delegated to
    ``shader_substring``, which is told to look two frames up so that it
    sees the caller of this function.
    """
    # The previously computed line_count/line_number locals were never used
    # and have been dropped.
    return """\
#version %s
%s
""" % (glsl_version, shader_substring(body, stack_frame=2))
项目:guillotina    作者:plone    | 项目源码 | 文件源码
def get_current_request() -> IRequest:
    """
    Find the current request, preferring the task context and falling back
    to a heuristic search of the call stack.

    Raises ``RequestNotFound`` when neither source yields a request.
    """
    try:
        from_task = aiotask_context.get('request')
        if from_task is not None:
            return from_task
    except (ValueError, AttributeError, RuntimeError):
        pass

    # Fallback: look for a `self.request` or a local `request` in each frame.
    current = inspect.currentframe()
    while current is not None:
        scope = current.f_locals
        via_self = getattr(scope.get('self'), 'request', None)
        if via_self is not None:
            return via_self
        if isinstance(scope.get('request'), Request):
            return scope['request']
        current = current.f_back
    raise RequestNotFound(RequestNotFound.__doc__)
项目:endrebot0    作者:endreman0    | 项目源码 | 文件源码
def command(fn, name=None):
    """Decorator for functions that should be exposed as commands.

    The returned wrapper fetches ``ctx`` from the local variables of its
    caller's frame and passes it to *fn* as the first argument. The wrapper
    is registered in the ``commands`` dict of the module that defines *fn*.

    Args:
        fn: The function, method or callable object to expose.
        name: Registry key for the command; defaults to ``fn.__name__``.

    Returns:
        The wrapper (a coroutine function when *fn* is one).
    """
    module = sys.modules[fn.__module__]
    if name is None:
        name = fn.__name__

    # Get the actual function for the coroutine check.
    if inspect.isfunction(fn):
        target = fn
    elif inspect.ismethod(fn):
        target = fn.__func__
    else:
        target = fn.__call__

    if asyncio.iscoroutinefunction(target):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            frame = inspect.currentframe()
            try:
                ctx = frame.f_back.f_locals['ctx']
                return await fn(ctx, *args, **kwargs)
            finally:
                del frame
    else:
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            frame = inspect.currentframe()
            try:
                ctx = frame.f_back.f_locals['ctx']
                return fn(ctx, *args, **kwargs)
            finally:
                del frame
    # Register under the resolved name. Using fn.__name__ here silently
    # ignored an explicitly supplied `name`.
    vars(module).setdefault('commands', {})[name] = wrapper
    return wrapper
项目:data_utilities    作者:fmv1992    | 项目源码 | 文件源码
def is_inside_recursive_test_call():
    """Test if a function is running from a call to du.test().

    Returns True when some frame on the stack has a local ``test`` whose
    ``_testMethodName`` is ``'test_module_level_test_calls'`` and at least
    one more frame lies outside it; False otherwise.
    """
    marker_seen = False
    current = inspect.currentframe()
    while current:
        # A marker found in an inner frame is confirmed by this outer one.
        if marker_seen:
            return True
        candidate = current.f_locals.get('test')
        method_name = getattr(candidate, '_testMethodName', None)
        if method_name == 'test_module_level_test_calls':
            marker_seen = True
        current = current.f_back
    return False
项目:data_utilities    作者:fmv1992    | 项目源码 | 文件源码
def is_inside_unittest():
    """Test if a function is running from unittest.

    This helps relax the ``__setattr__`` freezing when running inside a
    unittest environment.

    The check is heuristic: calls started by unittest discover have a frame
    holding a local ``argv`` such as
    ``['python3 -m unittest', 'discover', '-vvv', '.']``. Every frame on the
    stack is searched for a truthy ``argv`` whose joined elements contain
    the text ``unittest``.
    """
    current = inspect.currentframe()
    while current:
        argv_local = current.f_locals.get('argv')  # this may be error prone...
        if argv_local and 'unittest' in ''.join(argv_local):
            return True
        current = current.f_back
    return False
项目:REPIC    作者:dpinney    | 项目源码 | 文件源码
def REPIC(obToPrint):
    '''REPIC stands for Read, Evaluate, Print In Comment. Call it with an object obToPrint and the calling file is rewritten in place so that the line making the call ends in a "#OUTPUT: ..." comment holding the object's string form.'''
    caller = inspect.currentframe().f_back
    callingFile = inspect.getfile(caller)
    callingLine = caller.f_lineno
    for line in fileinput.input(callingFile, inplace=1):
        if fileinput.filelineno() != callingLine:
            # Not the REPIC line: copy it through unchanged.
            sys.stdout.write(line)
            continue
        # Newlines in the output would break the comment, so escape them.
        resultString = '#OUTPUT: ' + str(obToPrint).replace('\n','\\n') +'\n'
        if '#' in line:
            # Replace old output and/or any existing comment.
            cut = line.rfind('#')
        elif '\n' in line:
            cut = line.rfind('\n')
        else:
            # Last line of the file without a trailing newline.
            cut = len(line)
        sys.stdout.write(line[0:cut] + resultString)
项目:keras_experiments    作者:avolkov1    | 项目源码 | 文件源码
def execute_only_once():
    """
    Each call site of this function is guaranteed to get True the first
    time it runs and False afterwards. A call site is identified by the
    caller's file name and line number.

    Returns:
        bool: whether this is the first time this function gets called from
            this line of code.

    Example:
        .. code-block:: python

            if execute_only_once():
                # do something only once
    """
    caller = inspect.currentframe().f_back
    call_site = (caller.f_code.co_filename, caller.f_lineno)
    first_time = call_site not in _EXECUTE_HISTORY
    if first_time:
        _EXECUTE_HISTORY.add(call_site)
    return first_time
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def get_linenumber(self):
        '''Return a "run_line" string naming self.file_path and the line number of the frame two levels above this method.'''
        grandparent = currentframe().f_back.f_back
        line_no = grandparent.f_lineno
        return "run_line: file %s line %s " % (self.file_path, line_no)
项目:Starfish    作者:BillWang139967    | 项目源码 | 文件源码
def get_linenumber(self):
        '''Return a "run_line" string naming self.file_path and the line number of the frame two levels above this method.'''
        grandparent = currentframe().f_back.f_back
        line_no = grandparent.f_lineno
        return "run_line: file %s line %s " % (self.file_path, line_no)
项目:python-    作者:secondtonone1    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
项目:my-first-blog    作者:AnkurBegining    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
项目:core-framework    作者:RedhawkSDR    | 项目源码 | 文件源码
def __ImportModule( module_name, asName=None ):
    """
    Import a python module as needed into the global namespace. This can be accomplished
    programatically and inside a method or class.

    When asName is given, the module is stored under that name in the
    globals of every frame currently on the call stack, starting with this
    function's own.

    @param module_name : name of the module to load x.y.z
    @param asName : optional global name to bind the loaded module to
    @return module : python module object that was loaded
    """
    module = __import__(module_name)
    # __import__('x.y.z') returns the top-level package; descend to the leaf.
    for layer in module_name.split('.')[1:]:
        module = getattr(module, layer)
    if asName:
        # Walk the frame chain via f_back. inspect.currentframe() accepts no
        # depth argument on Python 3, and inspect.stack() would read source
        # context for every frame just to count them.
        frame = inspect.currentframe()
        while frame is not None:
            frame.f_globals[asName] = module
            frame = frame.f_back
    return module
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def mycallersname():
    """Returns the name of the caller of the caller of this function
    (hence the name of the caller of the function in which
    "mycallersname()" textually appears).  Returns None if this cannot
    be determined."""

    # http://docs.python.org/library/inspect.html#the-interpreter-stack
    import inspect

    frame = inspect.currentframe()
    if not frame:
        return None
    # Step two frames outwards. A stack that is too shallow yields None, as
    # documented, instead of the IndexError the old
    # getouterframes(frame)[2] lookup raised.
    for _ in range(2):
        frame = frame.f_back
        if frame is None:
            return None
    return frame.f_code.co_name
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def call_contextual_template(template):
    """Render *template* with the caller's globals and locals as its context.

    Locals take precedence over globals of the same name.
    """
    # this is the magic line: grab the frame of whoever called us
    caller = inspect.currentframe().f_back

    # again, we don't care about efficiency, it's not the point here
    context = {}
    context.update(caller.f_globals)
    context.update(caller.f_locals)

    return call_template(template, context)
项目:risk-slim    作者:ustunb    | 项目源码 | 文件源码
def ipsh():
    """Open an embedded IPython shell, announcing where the caller stopped."""
    shell = InteractiveShellEmbed(config=cfg, banner1=banner_msg, exit_msg=exit_msg)
    caller = inspect.currentframe().f_back
    location = 'Stopped at {0.f_code.co_filename} at line {0.f_lineno}'.format(caller)
    # Go back one level: the shell is invoked from inside ipsh(), so
    # stack_depth=2 is passed to account for this extra frame.
    shell(location, stack_depth=2)
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def current_line_number():
    """Return the line number at which this function was called.
    :return: line number of the calling statement
    :rtype: int
    """
    from inspect import currentframe
    caller = currentframe().f_back
    return caller.f_lineno
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
项目:formatizer    作者:fgimian    | 项目源码 | 文件源码
def f(format_string):
    """Format *format_string* with a LiteralFormatter, giving it the caller's globals and locals."""
    frame = inspect.currentframe().f_back
    formatter = LiteralFormatter()
    return formatter.format(format_string, frame.f_globals, frame.f_locals)
项目:jira_worklog_scanner    作者:pgarneau    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
项目:sublime-text-3-packages    作者:nickjj    | 项目源码 | 文件源码
def mycallersname():
    """Returns the name of the caller of the caller of this function
    (hence the name of the caller of the function in which
    "mycallersname()" textually appears).  Returns None if this cannot
    be determined."""

    # http://docs.python.org/library/inspect.html#the-interpreter-stack
    import inspect

    frame = inspect.currentframe()
    if not frame:
        return None
    # Step two frames outwards. A stack that is too shallow yields None, as
    # documented, instead of the IndexError the old
    # getouterframes(frame)[2] lookup raised.
    for _ in range(2):
        frame = frame.f_back
        if frame is None:
            return None
    return frame.f_code.co_name
项目:django-virtual-pos    作者:intelligenia    | 项目源码 | 文件源码
def _prepare_debuglog_message(message, caller_level=2):
    """Format a debug-log line for *message* including where it came from.

    Args:
        message: Text to log. A plain ``str`` is converted with
            ``unicode(message, "UTF-8")`` (NOTE(review): this relies on the
            Python 2 ``unicode`` builtin; confirm the target runtime).
        caller_level: Number of frames above this function at which the
            reported caller sits. The default of 2 skips this function and
            its direct caller.

    Returns:
        A unicode line holding the timestamp, ``settings.DOMAIN``, the
        message, and the caller's file, line number and function name.
    """
    # In case a plain str is passed in, detect it and convert it from UTF-8.
    if type(message) == str:
        message = unicode(message, "UTF-8")

    # Current time
    now = timezone.now()

    # Frame records from this function outwards.
    calframes = inspect.getouterframes(inspect.currentframe())

    # caller_level selects which frame is reported. It used to be passed as
    # getouterframes()'s `context` (source-line count) argument while the
    # frame index was hard-coded to 2, so the parameter had no real effect.
    caller_frame = calframes[caller_level][0]
    caller_name = calframes[caller_level][3]

    # Path of the file that made the call
    filename_path = caller_frame.f_code.co_filename
    filename = filename_path.split("/")[-1]

    # Assemble the message
    return u"DjangoVirtualPOS: {0} {1} \"{2}\" at {3}:{5} in {6} ({4}:{5})\n".format(
        now.strftime("%Y-%m-%d %H:%M:%S %Z"), settings.DOMAIN, message,
                     filename, filename_path, caller_frame.f_lineno, caller_name
    )


# Prints the debug message
项目:nn4nlp-code    作者:neubig    | 项目源码 | 文件源码
def fully_aligned_distance(p1, p2):
    """Score two passages by aligning each comparable node to its best match.

    Nodes are aligned in both directions (p1 to p2, then p2 to p1 with the
    word mapping reversed). The number of aligned pairs accepted by
    ``compare`` in each direction is fed, with the node counts, to
    ``two_sided_f``. The score is also printed.
    """
    word2word = align_yields(p1, p2)
    nodes1 = {node for node in p1.layer(layer1.LAYER_ID).all if is_comparable(node)}
    nodes2 = {node for node in p2.layer(layer1.LAYER_ID).all if is_comparable(node)}

    forward = align_nodes(nodes1, nodes2, word2word)
    word2word = reverse_mapping(word2word)
    backward = align_nodes(nodes2, nodes1, word2word)

    count1 = len({pair for pair in forward.items() if compare(*pair)})
    count2 = len({pair for pair in backward.items() if compare(*pair)})
    size1 = len(nodes1)
    size2 = len(nodes2)
    print(inspect.currentframe().f_code.co_name, " returns ", two_sided_f(count1, count2, size1, size2))
    return two_sided_f(count1, count2, size1, size2)
项目:nn4nlp-code    作者:neubig    | 项目源码 | 文件源码
def token_distance(p1, p2, map_by=buttom_up_by_levels_align):
    """Score two passages considering only the main relation of each node.

    Token matches are counted in both directions and combined, through
    ``two_sided_f``, with the number of comparable nodes in each passage
    whose label is in ``MAIN_RELATIONS``. Intermediate values are printed.
    """
    count1 = token_matches(p1, p2, map_by)
    count2 = token_matches(p2, p1, map_by)
    nodes1 = {node for node in p1.layer(layer1.LAYER_ID).all
              if is_comparable(node) and label(node) in MAIN_RELATIONS}
    nodes2 = {node for node in p2.layer(layer1.LAYER_ID).all
              if is_comparable(node) and label(node) in MAIN_RELATIONS}
    size1 = len(nodes1)
    size2 = len(nodes2)
    print(inspect.currentframe().f_code.co_name)
    print("counts", count1, count2)
    print("lens", size1, size2)
    print(two_sided_f(count1, count2, size1, size2))
    return two_sided_f(count1, count2, size1, size2)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
项目:mongodb_consistent_backup    作者:Percona-Lab    | 项目源码 | 文件源码
def get_oplog_cursor_since(self, caller, ts=None):
        """Return an oplog cursor for entries at or after *ts*.

        When *ts* is falsy the value of ``self.get_oplog_tail_ts()`` is
        used. The cursor carries a comment built from ``caller.__name__``
        and the function, file and line of the code that called this method.
        """
        call_site = getframeinfo(currentframe().f_back)
        comment = "%s:%s;%s:%i" % (
            caller.__name__, call_site.function,
            call_site.filename, call_site.lineno,
        )
        if not ts:
            ts = self.get_oplog_tail_ts()
        query = {'ts': {'$gte': ts}}
        logging.debug("Querying oplog on %s with query: %s" % (self.uri, query))
        # http://api.mongodb.com/python/current/examples/tailable.html
        cursor = self.get_oplog_rs().find(
            query, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
        return cursor.comment(comment)
项目:python-application    作者:AGProjects    | 项目源码 | 文件源码
def __enter__(self):
        """Start timing; only allowed at module/script level.

        The calling frame must not be a function frame (CO_NEWLOCALS),
        otherwise RuntimeError is raised. The caller's current bytecode
        offset is remembered, the garbage collector is disabled (its
        previous state is saved) and the start time is taken last.
        """
        caller = inspect.currentframe().f_back
        try:
            inside_function = caller.f_code.co_flags & inspect.CO_NEWLOCALS
            if inside_function:
                raise RuntimeError('timers only work when invoked at the module/script level')
            self._with_start = caller.f_lasti
        finally:
            # Do not keep the caller's frame alive.
            del caller
        was_enabled = gc.isenabled()
        gc.disable()
        self._gc_enabled = was_enabled
        self._start_time = self.time_function()
        return self
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)
项目:Sci-Finder    作者:snverse    | 项目源码 | 文件源码
def run(self):
        """Run the install, choosing between the legacy and egg paths.

        An explicit request for an old-style install
        (``old_and_unmanageable`` or ``single_version_externally_managed``)
        is delegated to ``orig.install.run`` and its result returned.
        Otherwise ``_called_from_setup`` is asked about the current frame:
        when it reports true an egg install is performed, else the original
        ``install.run`` is used.
        """
        wants_legacy = (self.old_and_unmanageable or
                        self.single_version_externally_managed)
        if wants_legacy:
            return orig.install.run(self)

        if self._called_from_setup(inspect.currentframe()):
            self.do_egg_install()
        else:
            # Backward-compatibility mode to support bdist_* commands.
            orig.install.run(self)