Python profile 模块,runctx() 实例源码

我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用profile.runctx()

项目:checkip    作者:china-shang    | 项目源码 | 文件源码
def run(self):
        """Run the IP-check event loop under the profiler until it completes.

        An IP factory is built from ``self.q`` and ``self.iprange``, a
        ``Test_Ip`` instance is created on the current asyncio event loop,
        ``testip.Server()`` is scheduled as a task, and the loop is run until
        ``testip.SuccessStop()`` completes. That run is executed through
        ``profile.runctx`` so it is profiled.

        On ``KeyboardInterrupt`` or ``SystemExit`` a ``testip.stop()`` task is
        scheduled and the loop is run until ``SuccessStop()`` completes. The
        loop is always closed and "Task exit" is printed at the end.
        """
        # Bind the names used by the handlers *before* entering the try block,
        # so an early interrupt cannot turn into a NameError in except/finally
        # that would hide the real cause.
        loop = asyncio.get_event_loop()
        testip = None
        try:
            ipfactory = get_ip.ipFactory(self.q, self.iprange)
            testip = Test_Ip(loop, ipfactory)
            loop.create_task(testip.Server())
            # The statement string is evaluated against these namespaces, so
            # the local names ``loop`` and ``testip`` must keep their names.
            profile.runctx(
                "loop.run_until_complete(testip.SuccessStop())",
                globals(),
                locals())

        except (KeyboardInterrupt, SystemExit):
            # Only attempt the orderly shutdown if the tester was created.
            if testip is not None:
                loop.create_task(testip.stop())
                loop.run_until_complete(testip.SuccessStop())

        finally:
            loop.close()
            print("Task exit")
项目:hdlcc    作者:suoto    | 项目源码 | 文件源码
def main():
    """Entry point used when the checker is run as a standalone program.

    Parses the command-line arguments, configures logging, disables threads
    in the standalone project builder, runs ``runner`` (through
    ``profile.runctx`` when ``debug_profiling`` is set) and logs how long
    the whole process took.
    """
    started_at = time.time()
    runner_args = parseArguments()
    setupLogging(sys.stdout, runner_args.log_level)
    logging.root.setLevel(runner_args.log_level)

    # Tone down the loggers that are too chatty for standalone runs.
    quiet_loggers = (
        ('hdlcc.config_parser', logging.WARNING),
        ('vunit.project', logging.ERROR),
    )
    for logger_name, level in quiet_loggers:
        logging.getLogger(logger_name).setLevel(level)

    # Threads are switched off for standalone runs for two reasons:
    # 1) Interrupting is currently impossible with threads, because each
    #    source file is parsed on its own thread and interrupting a single
    #    thread is not enough (https://github.com/suoto/hdlcc/issues/19).
    # 2) When profiling we want data about the inner hdlcc calls, and with
    #    threads we get no such info
    #    (https://github.com/suoto/hdlcc/issues/16).
    # It's an ugly workaround, but it sidesteps both problems.
    # pylint: disable=protected-access
    StandaloneProjectBuilder._USE_THREADS = False
    # pylint: enable=protected-access

    profile_output = runner_args.debug_profiling
    if not profile_output:
        runner(runner_args)
    else:
        # Profile the run and dump the stats to the requested file.
        profile.runctx(
            'runner(runner_args)',
            globals=globals(),
            locals={'runner_args': runner_args},
            filename=profile_output,
            sort=-1)

    elapsed = time.time() - started_at
    _logger.info("Process took %.2fs", elapsed)
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def profile(name, env, filename=None, verbose=False):
    """Profile 10000 calls of the benchmark created for *name*.

    Args:
        name: Benchmark name; passed to ``create_bench`` and used in the
            printed title or as the output file prefix.
        env: Passed through unchanged to ``create_bench``.
        filename: When truthy, the profiler output file is
            ``name + '-' + filename``. Otherwise a title banner is printed
            and no file name is given to the profiler.
        verbose: When true, profile with ``pprofile`` (printing a hint and
            returning early if it is unavailable); otherwise profile with
            ``cProfile`` sorted by ``tottime``.
    """
    if filename:
        filename = name + '-' + filename
        print('Profiling %s ==> %s' % (name, filename))

    else:
        filename = None

        title = name + ' profile'
        print()
        print('=' * len(title))
        print(title)
        print('=' * len(title))

    func = create_bench(name, env)

    gc.collect()
    code = 'for x in range(10000): func()'

    # runctx() takes (cmd, globals, locals). The namespaces used to be passed
    # the other way round, which made the loop variable ``x`` leak into the
    # module's real globals. The profiled statement only needs ``func``.
    run_globals = globals()
    run_locals = {'func': func}

    if verbose:
        if pprofile is None:
            print('pprofile not found. Please install pprofile and try again.')
            return

        pprofile.runctx(code, run_globals, run_locals, filename=filename)

    else:
        cProfile.runctx(code, run_globals, run_locals,
                        sort='tottime', filename=filename)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def profile(name, env, filename=None, verbose=False):
    """Profile 10000 calls of the benchmark created for *name*.

    Args:
        name: Benchmark name; passed to ``create_bench`` and used in the
            printed title or as the output file prefix.
        env: Passed through unchanged to ``create_bench``.
        filename: When truthy, the profiler output file is
            ``name + '-' + filename``. Otherwise a title banner is printed
            and no file name is given to the profiler.
        verbose: When true, profile with ``pprofile`` (printing a hint and
            returning early if it is unavailable); otherwise profile with
            ``cProfile`` sorted by ``tottime``.
    """
    if filename:
        filename = name + '-' + filename
        print('Profiling %s ==> %s' % (name, filename))

    else:
        filename = None

        title = name + ' profile'
        print()
        print('=' * len(title))
        print(title)
        print('=' * len(title))

    func = create_bench(name, env)

    gc.collect()
    code = 'for x in range(10000): func()'

    # runctx() takes (cmd, globals, locals). The namespaces used to be passed
    # the other way round, which made the loop variable ``x`` leak into the
    # module's real globals. The profiled statement only needs ``func``.
    run_globals = globals()
    run_locals = {'func': func}

    if verbose:
        if pprofile is None:
            print('pprofile not found. Please install pprofile and try again.')
            return

        pprofile.runctx(code, run_globals, run_locals, filename=filename)

    else:
        cProfile.runctx(code, run_globals, run_locals,
                        sort='tottime', filename=filename)
项目:POTCO-PS    作者:ksmit799    | 项目源码 | 文件源码
def quickProfile(name="unnamed"):
    """Return a decorator that times or profiles the decorated function.

    The decorator returns the function untouched unless the ``use-profiler``
    config flag is set. Otherwise each call is either timed with
    ``globalClock`` and reported on stdout (``profile-debug`` unset), or run
    under ``profile.runctx`` with the resulting ``pstats.Stats`` object
    appended to ``base.stats[name]`` (``profile-debug`` set).

    The wrapper returns whatever the wrapped function returns, so decorating
    a function no longer discards its result.

    Args:
        name: Key under which detailed profiles are collected in
            ``base.stats``.
    """
    import pstats

    def profileDecorator(f):
        if not config.GetBool("use-profiler", 0):
            return f

        def _profiled(*args, **kArgs):
            # must do this in here because we don't have base/simbase
            # at the time that PythonUtil is loaded
            if not config.GetBool("profile-debug", 0):
                # dumb timings
                st = globalClock.getRealTime()
                result = f(*args, **kArgs)
                s = globalClock.getRealTime() - st
                # Single parenthesised argument: valid as both the Python 2
                # print statement and the Python 3 print function.
                print("Function %s.%s took %s seconds" % (f.__module__, f.__name__, s))
                return result

            import profile as prof
            # detailed profile, stored in base.stats under `name`
            if not hasattr(base, "stats"):
                base.stats = {}
            if not base.stats.get(name):
                base.stats[name] = []

            # The statement runs with ``ctx`` as its namespace, so the call's
            # result can be read back from it afterwards.
            ctx = {'f': f, 'args': args, 'kArgs': kArgs}
            prof.runctx('result = f(*args, **kArgs)', ctx, None, "t.prof")
            s = pstats.Stats("t.prof")
            s.strip_dirs()
            s.sort_stats("cumulative")
            base.stats[name].append(s)
            return ctx.get('result')

        _profiled.__doc__ = f.__doc__
        return _profiled
    return profileDecorator
项目:checkip    作者:china-shang    | 项目源码 | 文件源码
def main():
    """Run the IP checker in multi-process or single-process mode.

    With ``ProcessSum > 1`` one ``CheckProcess`` per slot is started, every
    IP received from ``ipList`` is appended to ``ip.txt`` separated by "|",
    and on exit the ``find_log0.txt`` file is merged with the dictionaries
    queued in ``GoodIpRange`` and rewritten as ``key:value`` lines.

    Otherwise the check runs in this process: the asyncio loop is run until
    ``testip.SuccessStop()`` completes, profiled through ``profile.runctx``.
    On ``KeyboardInterrupt``/``SystemExit`` a ``testip.stop()`` task is
    scheduled and the loop is run to completion again; the loop is always
    closed and "Task exit" printed.
    """
    print("Process Sum:", ProcessSum)
    if ProcessSum > 1:
        q = Queue()
        q.put(ipHasFind)
        for i in range(ProcessSum):
            print("Start Process:", i)
            p = CheckProcess(q, iprange)
            p.start()

        try:
            with open("ip.txt", "w") as f:
                found = 0
                # Runs until interrupted; each received IP is recorded.
                while True:
                    ip = ipList.get()
                    f.write(ip + "|")
                    found += 1
                    print("All Sucess Ip:%4d" % found)
        except (KeyboardInterrupt, SystemExit):
            print("main exited")
        finally:
            file_name = "find_log" + "0" + ".txt"
            with open(file_name) as f:
                lines = f.readlines()

            # Each line is "key:value". int() ignores surrounding whitespace,
            # so the trailing newline needs no special handling and a last
            # line without one is no longer truncated. Blank lines are skipped.
            d = {}
            for line in lines:
                if not line.strip():
                    continue
                fields = line.split(":")
                d[int(fields[0])] = int(fields[-1])

            while GoodIpRange.qsize() > 0:
                nowdict = GoodIpRange.get()
                d.update(nowdict)

            with open(file_name, "w") as f:
                for key, value in d.items():
                    print(key, ":", value)
                    f.write(str(key) + ":" + str(value) + "\n")
    else:
        # Bind the names used by the handlers before the try block so an
        # early interrupt cannot raise NameError in except/finally.
        q = Queue()
        q.put(ipHasFind)
        loop = asyncio.get_event_loop()
        testip = None
        try:
            ipfactory = get_ip.ipFactory(q, iprange)
            testip = Test_Ip(loop, ipfactory)
            loop.create_task(testip.Server())
            # The statement string is evaluated against these namespaces, so
            # the local names ``loop`` and ``testip`` must keep their names.
            profile.runctx(
                "loop.run_until_complete(testip.SuccessStop())",
                globals(),
                locals())

        except (KeyboardInterrupt, SystemExit):
            # Only attempt the orderly shutdown if the tester was created.
            if testip is not None:
                loop.create_task(testip.stop())
                loop.run_until_complete(testip.SuccessStop())

        finally:
            loop.close()
            print("Task exit")
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def execute(self, argv):
        """Parse *argv* and feed the selected input to ``self.parseStream``.

        In interactive mode lines are read from a ``>>> `` prompt and parsed
        one at a time until EOF or Ctrl-C. Otherwise a single input stream is
        built from ``options.input``, from the file named by the only
        positional argument (unless it is ``-``), or from ``self.stdin``, and
        parsed once: under ``profile``/``cProfile`` when ``options.profile``
        is set, under ``hotshot`` when ``options.hotshot`` is set, or
        directly.
        """
        options, args = self.parseOptions(argv)
        self.setUp(options)

        if options.interactive:
            while True:
                try:
                    line = raw_input(">>> ")
                except (EOFError, KeyboardInterrupt):
                    self.stdout.write("\nBye.\n")
                    break

                self.parseStream(options, antlr3.ANTLRStringStream(line))
            return

        # Pick the input source. The name ``inStream`` must stay as is
        # because the profiled statement strings below refer to it.
        if options.input is not None:
            inStream = antlr3.ANTLRStringStream(options.input)
        elif len(args) == 1 and args[0] != '-':
            inStream = antlr3.ANTLRFileStream(
                args[0], encoding=options.encoding)
        else:
            inStream = antlr3.ANTLRInputStream(
                self.stdin, encoding=options.encoding)

        if options.profile:
            # Prefer the C implementation, fall back to the pure-Python one.
            try:
                import cProfile as profile
            except ImportError:
                import profile

            profile.runctx(
                'self.parseStream(options, inStream)',
                globals(), locals(), 'profile.dat')

            # Report the 100 entries with the highest internal time.
            import pstats
            report = pstats.Stats('profile.dat')
            report.strip_dirs().sort_stats('time').print_stats(100)

        elif options.hotshot:
            import hotshot

            hotshot_profiler = hotshot.Profile('hotshot.dat')
            hotshot_profiler.runctx(
                'self.parseStream(options, inStream)',
                globals(), locals())

        else:
            self.parseStream(options, inStream)