我们从 Python 开源项目中提取了以下 49 个代码示例，用于说明如何使用 cProfile.Profile()。
def __call__(self, *args, **kw):
    """Profile a single call to the wrapped function.

    While ``self.skip`` is positive the call is only counted as skipped, and
    a call made while ``FuncProfile.in_profiler`` is set runs the function
    directly; in both cases no profiler is involved.
    """
    self.ncalls += 1
    if self.skip > 0:
        # Still inside the skip window: count it and run unprofiled.
        self.skip -= 1
        self.skipped += 1
        return self.fn(*args, **kw)
    if FuncProfile.in_profiler:
        # handle recursive calls
        return self.fn(*args, **kw)
    # You cannot reuse the same profiler for many calls and accumulate
    # stats that way, so every call gets its own profiler whose data is
    # merged into self.stats afterwards.
    call_profiler = cProfile.Profile()
    FuncProfile.in_profiler = True
    try:
        return call_profiler.runcall(self.fn, *args, **kw)
    finally:
        FuncProfile.in_profiler = False
        self.stats.add(call_profiler)
        if self.immediate:
            self.print_stats()
            self.reset_stats()
def profileit(name):
    """Build a cProfile decorator that dumps its statistics to a file.

    Every call of the decorated function runs under a fresh profiler whose
    data is written to ``name`` once the call has returned; the dump can be
    inspected with a viewer such as RunSnakeRun.

    :param name: The output file path
    :type name: str
    :return: Decorator that adds the profiling to a function
    """
    def inner(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            profiler = cProfile.Profile()
            result = profiler.runcall(func, *args, **kwargs)
            # ``name`` is taken from the enclosing profileit() call.
            profiler.dump_stats(name)
            return result
        return wrapper
    return inner
def handle(self, *args, **kwargs):  # pylint: disable=unused-argument
    """
    Recreates the index, optionally under cProfile.

    The indexing logger gets a DEBUG-level stream handler on ``self.stderr``
    and its own level is set to INFO. When ``kwargs['profile']`` is truthy the
    run is profiled and the data is dumped to a uniquely named
    ``recreate_index_<uuid>.profile`` file whose name is reported on
    ``self.stdout``.
    """
    log = logging.getLogger(indexing_api_name)
    console = logging.StreamHandler(self.stderr)
    console.setLevel(logging.DEBUG)
    log.addHandler(console)
    log.level = logging.INFO

    if not kwargs['profile']:
        recreate_index()
        return

    # Imported lazily: only needed when profiling was requested.
    import cProfile
    import uuid

    profile = cProfile.Profile()
    profile.enable()
    recreate_index()
    profile.disable()

    filename = 'recreate_index_{}.profile'.format(uuid.uuid4())
    profile.dump_stats(filename)
    self.stdout.write('Output profiling data to: {}'.format(filename))
def load_with_profiler(
        context,
        filepath,
        *,
        global_matrix=None
):
    """Run ``load_web3d`` under cProfile and print a timing report.

    The load is executed with ``PREF_FLAT=True`` and ``PREF_CIRCLE_DIV=16``;
    the statistics are then sorted by internal time and printed with the
    restriction ``0.1``.
    """
    import cProfile
    import pstats

    profiler = cProfile.Profile()
    # The statement is evaluated against this function's locals, so the
    # parameter names it mentions must stay exactly as they are.
    profiler.runctx("load_web3d(context.scene, filepath, PREF_FLAT=True, "
                    "PREF_CIRCLE_DIV=16, global_matrix=global_matrix)",
                    globals(), locals())

    report = pstats.Stats(profiler)
    report.sort_stats("time")
    report.print_stats(0.1)
    # report.print_callers(0.1)
def enable_profiling():
    """Install a module-level ``_profile_hook`` that profiles what it runs.

    After this call, ``_profile_hook(name, func, *args)`` executes
    ``func(*args)`` under cProfile and writes a report sorted by cumulative
    time to ``/tmp/mitogen.stats.<pid>.<name>.log``. The report is written
    whether or not ``func`` raises.
    """
    global _profile_hook
    import cProfile
    import pstats

    def _profile_hook(name, func, *args):
        profiler = cProfile.Profile()
        profiler.enable()
        try:
            return func(*args)
        finally:
            profiler.create_stats()
            log_path = '/tmp/mitogen.stats.%d.%s.log' % (os.getpid(), name)
            with open(log_path, 'w') as fp:
                stats = pstats.Stats(profiler, stream=fp)
                stats.sort_stats('cumulative')
                stats.print_stats()
def process_fp(fp, p, sparse):
    """Parse ``fp`` with ``p``, flush ``sparse`` and time the whole step.

    An ``ExpatError`` raised by the parser is logged and counted in
    ``sparse.exceptions``. When the module-level ``_PROFILE`` flag is set the
    work runs under cProfile and the report, sorted by cumulative time, is
    sent to the application log.

    Returns the ``(beg, end)`` pair of ``time.time()`` stamps taken around
    the parse and upload.
    """
    if _PROFILE:
        pr = cProfile.Profile()
        pr.enable()

    beg = time.time()
    try:
        p.ParseFile(fp)
    except xml.parsers.expat.ExpatError as err:
        app.logger.error("Bad XML: %r" % err)
        sparse.exceptions += 1

    # Bulk upload the remainder
    sparse._bulk_upload()
    end = time.time()

    if _PROFILE:
        pr.disable()
        report_buffer = StringIO.StringIO()
        sort_key = 'cumulative'
        report = pstats.Stats(pr, stream=report_buffer).sort_stats(sort_key)
        report.print_stats()
        app.logger.info(report_buffer.getvalue())

    return beg, end
def run(self):
    """Execute ``self.FUN`` and report progress through the task signals.

    ``taskStarted`` is emitted first and ``taskFinished`` last. With
    ``options.cfg.profile`` set, the call runs under cProfile and the data
    is dumped to ``thread<hex id>.profile`` in the home directory, even when
    the call fails. An exception is stored on the parent (if there is one),
    emitted through ``taskException`` and printed; it is not re-raised.

    Returns the result of ``self.FUN``, or None when it raised.
    """
    self.taskStarted.emit()
    self.exiting = False
    self.quitted = False
    result = None
    try:
        if not options.cfg.profile:
            result = self.FUN(*self.args, **self.kwargs)
        else:
            import cProfile
            profiler = cProfile.Profile()
            try:
                result = profiler.runcall(self.FUN, *self.args, **self.kwargs)
            finally:
                home_dir = general.get_home_dir()
                dump_name = "thread{}.profile".format(hex(id(self)))
                profiler.dump_stats(os.path.join(home_dir, dump_name))
    except Exception as e:
        if self.parent:
            owner = self.parent()
            owner.exc_info = sys.exc_info()
            self.parent().exception = e
        self.taskException.emit(e)
        print("CoqThread.run():", e)
    self.taskFinished.emit()
    return result
def do_cprofile(func):
    """Decorator for profiling a function.

    Each call of the decorated function runs under a fresh cProfile profiler.
    When the call ends, normally or with an exception, the 20 entries with
    the largest internal time are printed.

    :param func: the callable to profile.
    :return: wrapper with the same signature, name and docstring as ``func``.
    """
    # Function-scope import: the file's import block is not visible here.
    import functools

    @functools.wraps(func)
    def profiled_func(*args, **kwargs):
        profile = cProfile.Profile()
        profile.enable()
        try:
            return func(*args, **kwargs)
        finally:
            # Stop collecting before the report is built, on every exit path.
            profile.disable()
            stats = pstats.Stats(profile)
            stats.sort_stats("time").print_stats(20)
    return profiled_func
def __init__(self):
    """Initialise the processor module state.

    Sets up the instruction table, the analysis bookkeeping, the
    disassembler with its cache, and a profiler that is also published on the
    ``hexagondisasm`` module.
    """
    idaapi.processor_t.__init__(self)

    # TODO: logging not working.
    # self.work_folder = ""
    # self.log_fn = self.work_folder + 'work.log'
    # logging.basicConfig(filename=self.log_fn, level=logging.DEBUG, filemode='w')
    # self.logger = open(self.log_fn, 'w')

    # True when the input file name ends in ".o".
    input_is_object = re.search(r'\.o$', GetInputFile())
    self.relocatable_file = input_is_object is not None

    self.init_instructions()

    self.prev_addr_analyzed = -1
    self.current_hex_packet = None

    # TODO: this should be instatiated on demand, because I think the init
    # is called every time IDA starts
    self.hd = HexagonDisassembler()

    # TODO: use orderdict to remove old entries
    self.disasm_cache = {}

    self.profiler = cProfile.Profile()
    # TODO: I don't know how to access this class from the IDA Python
    # console to get the profiler, I do it through the module
    hexagondisasm.profiler = self.profiler
def get_parser(self):
    """Build the command line parser, described by ``self.description``.

    The parser knows the configuration file option, the debug and monitor
    switches, and the options controlling cProfile and line-profiler runs.
    """
    parser = argparse.ArgumentParser(description=self.description)

    def add_flag(*names, **options):
        # Every boolean switch uses the same store_true action.
        parser.add_argument(*names, action='store_true', **options)

    parser.add_argument('-c', '--configuration',
                        default='config.yaml',
                        help='Configuration file')
    add_flag('--debug', dest='debug', help='Log verbose')
    add_flag('-m', '--monitor', dest='monitor', help='Monitor', default=False)
    add_flag('--profile', dest='profile', help='Profile execution',
             default=False)
    parser.add_argument('--profile-output',
                        help='Where to store the output of the profile data',
                        default=None)
    add_flag('--line-profiler', dest='line_profiler',
             help='Line profiler execution', default=False)
    parser.add_argument('--line-profiler-matcher',
                        help='Line profiler execution',
                        default=None)
    parser.add_argument('--line-profiler-output',
                        help='Where to store the output of the line profiler data',
                        default=None)
    parser.set_defaults(debug=False)
    return parser
def server_main(cooker, func, *args):
    """Run ``func(*args)`` between the cooker's pre- and post-serve hooks.

    When ``cooker.configuration.profile`` is set, the call runs under a
    profiler, the raw data is written to ``profile.log`` and handed to
    ``bb.utils.process_profilelog`` for post-processing.

    :param cooker: object providing ``pre_serve()``, ``post_serve()`` and
        ``configuration.profile``.
    :param func: the callable to run.
    :param args: positional arguments for ``func``.
    :return: whatever ``func`` returns.
    """
    cooker.pre_serve()

    if cooker.configuration.profile:
        try:
            import cProfile as profile
        except ImportError:
            # Only a missing C profiler selects the pure-Python fallback; a
            # bare except would also swallow KeyboardInterrupt and the like.
            import profile
        prof = profile.Profile()

        ret = prof.runcall(func, *args)

        prof.dump_stats("profile.log")
        bb.utils.process_profilelog("profile.log")
        print("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
    else:
        ret = func(*args)

    cooker.post_serve()

    return ret
def run(self):
    """Run ``self.realrun``, under a profiler when ``self.profile`` is set.

    In profiling mode the statistics are always dumped, even if ``realrun``
    raises, to ``profile-parse-<process name>.log``, where the name is that
    of the current multiprocessing process.
    """
    if not self.profile:
        self.realrun()
        return

    try:
        import cProfile as profile
    except ImportError:
        # Fall back to the pure-Python profiler only when cProfile is really
        # unavailable; a bare except would hide unrelated failures.
        import profile
    prof = profile.Profile()
    try:
        prof.runcall(self.realrun)
    finally:
        logfile = "profile-parse-%s.log" % multiprocessing.current_process().name
        prof.dump_stats(logfile)
def run_profile(func, sort_order="cumtime", count=1, strip_dir=True, name_filter=""):
    """Wrap ``func`` so that calling it yields a profiling report.

    The wrapper runs ``func`` ``count`` times under one cProfile profiler and
    returns the formatted statistics as text; the return values of ``func``
    itself are discarded.

    sort_order : keywords 'ncalls', 'tottime', 'cumtime', 'filename'
    strip_dir : drop directory names from the reported file paths
    name_filter : restriction passed on to ``print_stats``
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Referenced by name in the profiled statement below.
        def cmd():
            for _ in range(count):
                func(*args, **kwargs)

        profiler = cProfile.Profile()
        finished = profiler.runctx("cmd()", globals(), locals())
        report_buffer = StringIO.StringIO()
        report = pstats.Stats(finished, stream=report_buffer)
        if strip_dir:
            report.strip_dirs()
        report.sort_stats(sort_order)
        report.print_stats(name_filter)
        return report_buffer.getvalue()
    return wrapper
def tween_request_profiling(handler, registry):
    """Create a tween that profiles requests in batches.

    Each request handled by the returned callable runs with the current
    profiler enabled. Once ``profile_requests`` requests (read from the xom
    config in ``registry``) have been handled, the statistics are printed
    sorted by cumulative time and a fresh profiler starts the next batch.
    """
    from cProfile import Profile

    num_profile = registry["xom"].config.args.profile_requests
    # One-element lists serve as cells the closure can update in place, so a
    # new Profile instance can be swapped in without variable scope issues.
    handled = [0]
    current = [Profile()]

    def request_profiling_handler(request):
        current[0].enable()
        try:
            return handler(request)
        finally:
            current[0].disable()
            handled[0] += 1
            if handled[0] >= num_profile:
                current[0].print_stats("cumulative")
                handled[0] = 0
                current[0] = Profile()
    return request_profiling_handler
def c_profile(sort_by='tottime'):
    """Decorator factory that profiles a function and prints a report.

    After every successful call of the decorated function, the statistics
    are sorted by ``sort_by``, limited to 25 entries and printed. The
    function's own return value is passed through.
    """
    def decorator(func):
        @wraps(func)
        def wrapped(*args, **kwargs):
            prof = cProfile.Profile()
            prof.enable()
            outcome = func(*args, **kwargs)
            prof.disable()

            report_buffer = StringIO()
            report = pstats.Stats(prof, stream=report_buffer)
            report = report.sort_stats(sort_by)
            report.print_stats(25)
            print(report_buffer.getvalue())
            return outcome
        return wrapped
    return decorator
# https://zapier.com/engineering/profiling-python-boss/
def profiler(request):
    """Return a context manager that profiles its body on demand.

    Profiling is active only when the ``--profile`` option is set on
    ``request.config``. In that case the collected data is dumped, on exit,
    to a file named after ``request.function``. Otherwise the context manager
    does nothing and yields ``False``.
    """
    class CustomProfiler(object):
        def __init__(self, request):
            if request.config.getoption("--profile"):
                # Imported lazily: only needed when profiling is requested.
                import cProfile
                self.profile = cProfile.Profile()
            else:
                self.profile = None

        def __enter__(self):
            if self.profile is not None:
                self.profile.enable()
                return None
            return False

        def __exit__(self, *args, **kwargs):
            if self.profile is not None:
                self.profile.disable()
                self.profile.dump_stats(request.function.__name__)
            # Never suppress an exception raised inside the with-block.
            return False

    return CustomProfiler(request)
def profileit(func):
    """Decorator that profiles each call of ``func`` and prints two reports.

    After the call, the 20 entries with the largest internal time are
    printed, followed by two blank lines and the 20 entries with the largest
    cumulative time.

    :param func: the callable to profile.
    :return: wrapper with the same signature, name and docstring as ``func``.
    """
    # Function-scope import: the file's import block is not visible here.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        prof = cProfile.Profile()
        # runcall() switches the profiler on and off around the call itself,
        # so no explicit enable()/disable() pair is needed around it.
        retval = prof.runcall(func, *args, **kwargs)
        stats = pstats.Stats(prof)
        stats.sort_stats('tottime').print_stats(20)
        print()
        print()
        stats.sort_stats('cumtime').print_stats(20)
        return retval
    return wrapper
def do_cprofile(func):
    """Decorator that profiles a function with cProfile and prints the stats.

    The statistics are printed when the call ends, normally or with an
    exception.

    Example::

        @do_cprofile
        def expensive_function():
            for x in range(10000):
                _ = x ** x
            return 'OK!'

        result = expensive_function()

    :param func: the callable to profile.
    :return: wrapper with the same signature, name and docstring as ``func``.
    """
    # Function-scope import: the file's import block is not visible here.
    import functools

    @functools.wraps(func)
    def profiled_func(*args, **kwargs):
        profile = cProfile.Profile()
        profile.enable()
        try:
            return func(*args, **kwargs)
        finally:
            # Stop collecting before the report is printed, on every exit path.
            profile.disable()
            profile.print_stats()
    return profiled_func
def profiled_thread(func):
    """Decorator to profile a thread or function.

    Profiling output is written to
    'profile_<process_id>.<thread_name>.<thread_ident>.log', a path relative
    to the current working directory. The dump is attempted even when
    ``func`` raises. A failure to write it is logged and does not propagate.

    :param func: the callable to profile.
    :return: wrapper that returns whatever ``func`` returns.
    """
    # Function-scope import: the file's import block is not visible here.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        profile = Profile()
        profile.enable()
        try:
            return func(*args, **kwargs)
        finally:
            profile.disable()
            try:
                thread = current_thread()
                profile.dump_stats('profile_%s.%s.%s.log' % (getpid(), thread.name, thread.ident))
            except Exception:
                # Best effort only: the dump must not mask the outcome of
                # func, but KeyboardInterrupt/SystemExit are no longer eaten.
                logger.exception('Failed to dump stats')
    return wrapper
def list_logs():
    """Return the rows of the ``logs`` table as a JSON response, profiled.

    Each row is exposed as a dict with the keys ``id``, ``datetime``,
    ``level`` and ``message``. If the request carries a ``filter`` argument,
    only entries whose message matches that regular expression from its
    start are kept. The profiler statistics are printed before the response
    is returned.

    Returns a ``(body, 200, headers)`` tuple.
    """
    prof = cProfile.Profile()
    prof.enable()

    headers = {"Content-Type": "application/json"}

    conn = sqlite3.connect(app.config['DBFILE'])
    cur = conn.cursor()
    cur.execute("SELECT * FROM logs")
    columns = ("id", "datetime", "level", "message")
    logs = [
        {key: row[position] for position, key in enumerate(columns)}
        for row in cur.fetchall()
    ]
    conn.commit()
    conn.close()

    filter_query = request.args.get('filter', None)
    if filter_query:
        # NOTE(review): the pattern comes straight from the request's query
        # arguments; confirm that compiling arbitrary patterns is acceptable.
        pattern = re.compile(filter_query)
        logs = [entry for entry in logs if pattern.match(entry['message'])]

    response = json.dumps(logs), 200, headers

    prof.disable()
    prof.print_stats()
    return response
def __init__(self, file_path, sort_by='time', builtins=False):
    """Create the profiler and remember the report settings.

    ``builtins`` is forwarded to ``cProfile.Profile``. ``file_path`` and
    ``sort_by`` are stored unchanged on the instance.
    """
    profiler = cProfile.Profile(builtins=builtins)
    self._profiler = profiler
    self.file_path = file_path
    self.sort_by = sort_by
def __init__(self, thread_num, session, query, values, num_queries, protocol_version, profile):
    """Initialise the thread and store the benchmark parameters.

    All arguments are kept as attributes of the same name, except
    ``profile``: when it is truthy a fresh ``Profile`` is stored as
    ``self.profiler``, otherwise ``self.profiler`` is None.
    """
    Thread.__init__(self)
    self.thread_num = thread_num
    self.session = session
    self.query = query
    self.values = values
    self.num_queries = num_queries
    self.protocol_version = protocol_version
    if profile:
        self.profiler = Profile()
    else:
        self.profiler = None
def __call__(self, environ, start_response):
    """Profile one WSGI request and return the buffered response body.

    The wrapped application (``self._app``) runs to completion under a fresh
    ``Profile``. Everything it returns or writes is collected and handed
    back as a single-element list. The statistics are dumped to a ``.prof``
    file in ``self._profile_dir`` when that is set; otherwise a report is
    written to ``self._stream``.

    :param environ: WSGI environment of the request.
    :param start_response: the server's WSGI ``start_response`` callable.
    :return: list holding the whole response body as one bytes object.
    """
    response_body = []

    def catching_start_response(status, headers, exc_info=None):
        start_response(status, headers, exc_info)
        # Data passed to the returned write callable is buffered as well.
        return response_body.append

    def runapp():
        appiter = self._app(environ, catching_start_response)
        response_body.extend(appiter)
        if hasattr(appiter, 'close'):
            appiter.close()

    p = Profile()
    start = time.time()
    p.runcall(runapp)
    body = b''.join(response_body)
    elapsed = time.time() - start

    if self._profile_dir is not None:
        # PATH_INFO may be left out of the environ when it is empty, so
        # default to '' rather than calling .strip() on None.
        path_info = environ.get('PATH_INFO') or ''
        prof_filename = os.path.join(self._profile_dir,
                                     '%s.%s.%06dms.%d.prof' % (
            environ['REQUEST_METHOD'],
            path_info.strip('/').replace('/', '.') or 'root',
            elapsed * 1000.0,
            time.time()
        ))
        p.dump_stats(prof_filename)
    else:
        stats = Stats(p, stream=self._stream)
        stats.sort_stats(*self._sort_by)

        self._stream.write('-' * 80)
        self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
        stats.print_stats(*self._restrictions)
        self._stream.write('-' * 80 + '\n\n')

    return [body]
def setUp(self):
    """Seed the random generator and start a profiler if ``PROFILE`` is set."""
    # Fixed seed so that every test run sees the same random sequence.
    random.seed(20)

    if not PROFILE:
        return
    self.profile = cProfile.Profile()
    self.profile.enable()
def setUp(self):
    """Seed the RNG, allocate the level stats and optionally start profiling.

    ``self.level_stats`` gets ``CellId.MAX_LEVEL + 1`` slots. A profiler is
    created and enabled only when the module-level ``PROFILE`` flag is set.
    """
    # Fixed seed so that every test run sees the same random sequence.
    random.seed(20)

    # NOTE(review): list repetition puts the *same* LevelStats object into
    # every slot; confirm that sharing one instance is intended.
    shared_stats = LevelStats()
    self.level_stats = [shared_stats] * (CellId.MAX_LEVEL + 1)

    if not PROFILE:
        return
    self.profile = cProfile.Profile()
    self.profile.enable()
def reset_stats(self):
    """Reset accumulated profiler statistics.

    ``self.stats`` becomes a new ``pstats.Stats`` object and both call
    counters go back to zero.
    """
    # Note: not using self.Profile, since pstats.Stats() fails then
    fresh_profile = profile.Profile()
    self.stats = pstats.Stats(fresh_profile)
    self.ncalls = self.skipped = 0
def _run_profiler(function, *args, **kwargs):
    """Run a profiler on the specified function.

    The statistics, sorted by cumulative time, are printed after the call and
    the function's return value is passed through.
    """
    prof = cProfile.Profile()
    prof.enable()
    outcome = function(*args, **kwargs)
    prof.disable()

    report = pstats.Stats(prof)
    report = report.sort_stats('cumtime')
    report.print_stats()
    return outcome
def main(parser):
    """Run one training loop under a profiler and print the statistics.

    ``opt['torch']`` selects the torch autograd profiler; otherwise cProfile
    is used and its report is sorted by cumulative time. When ``opt['debug']``
    is set, a pdb session is opened after the report.
    """
    opt = parser.parse_args()

    if opt['torch']:
        with torch.autograd.profiler.profile() as prof:
            TrainLoop(parser).train()
        print(prof.total_average())

        sort_cpu = sorted(prof.key_averages(), key=lambda k: k.cpu_time)
        sort_cuda = sorted(prof.key_averages(), key=lambda k: k.cuda_time)

        # Local helpers: they stay reachable from the pdb prompt opened below.
        def cpu():
            for e in sort_cpu:
                print(e)

        def cuda():
            for e in sort_cuda:
                print(e)

        # Only the cpu-sorted listing is printed automatically.
        cpu()

        if opt['debug']:
            print('`cpu()` prints out cpu-sorted list, '
                  '`cuda()` prints cuda-sorted list')

            pdb.set_trace()
    else:
        pr = cProfile.Profile()
        pr.enable()
        TrainLoop(parser).train()
        pr.disable()
        # Render the report into a buffer, then print it in one go.
        s = io.StringIO()
        sortby = 'cumulative'
        ps = pstats.Stats(pr, stream=s).sort_stats(sortby)
        ps.print_stats()
        print(s.getvalue())
        if opt['debug']:
            pdb.set_trace()
def profile(func):
    """Decorator that profiles ``func`` and prints the stats after each call.

    The report has directory names stripped, is sorted by internal time and
    is printed even when the call raises.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        prof = cProfile.Profile()
        try:
            return prof.runcall(func, *args, **kwargs)
        finally:
            report = pstats.Stats(prof)
            report.strip_dirs().sort_stats('tottime').print_stats()
    return wrapper
def profile_execute(self, pstat_file=None):
    """Execute this node group under cProfile and store the report as text.

    The raw statistics are additionally dumped to ``pstat_file`` when one is
    given. The formatted report (directories stripped, sorted by cumulative
    time) is written into the Blender text block ``"<name> Profile"``, which
    is created if it does not exist yet.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    exec_node_group(self)
    profiler.disable()

    if pstat_file is not None:
        profiler.dump_stats(pstat_file)

    report_buffer = io.StringIO()
    sort_key = 'cumulative'
    report = pstats.Stats(profiler, stream=report_buffer)
    report.strip_dirs()
    report.sort_stats(sort_key)
    report.print_stats()

    text_name = self.name + " Profile"
    texts = bpy.data.texts
    text = texts[text_name] if text_name in texts else texts.new(text_name)
    text.from_string(report_buffer.getvalue())
def run_profiled(self, coeff_file=None, kgrid_tp="coarse", write_outputs=True):
    """Call ``self.run`` under cProfile and print a short report.

    The arguments are forwarded to ``self.run``; its return value is
    discarded. The report is written to ``STDOUT`` with directory names
    stripped and sorted by cumulative time.
    """
    profiler = cProfile.Profile()
    profiler.runcall(lambda: self.run(coeff_file, kgrid_tp=kgrid_tp,
                                      write_outputs=write_outputs))

    report = Stats(profiler, stream=STDOUT)
    # only print the top 15 entries (the 15 slowest by cumulative time)
    report.strip_dirs().sort_stats('cumulative').print_stats(15)
def show(self, id):
    """Print the profile stats to stdout, id is the RDD id.

    Nothing is printed when ``self.stats()`` yields no statistics.
    """
    stats = self.stats()
    if not stats:
        return

    banner = "=" * 60
    print(banner)
    print("Profile of RDD<id=%d>" % id)
    print(banner)
    stats.sort_stats("time", "cumulative").print_stats()
def profile(self, func):
    """Run and profile ``func``, adding its stats to the accumulator.

    ``func`` is called without arguments. Nothing is returned: the collected
    statistics are only handed to ``self._accumulator``.
    """
    profiler = cProfile.Profile()
    profiler.runcall(func)

    collected = pstats.Stats(profiler)
    collected.stream = None  # make it picklable
    collected.strip_dirs()

    # Adds a new profile to the existing accumulated value
    self._accumulator.add(collected)
def _ui_event_loop(self):
    """Acquire the semaphore and run the UI, optionally under cProfile.

    When ``self._profile`` is truthy it is used as the sort key for the
    report. After the UI has returned, ``self._profile`` is replaced by the
    report text (top 30 entries, directory names stripped).
    """
    self._sem.acquire()

    if self._profile:
        import StringIO
        import cProfile
        import pstats
        profiler = cProfile.Profile()
        profiler.enable()

    self._ui.start(self)

    if self._profile:
        profiler.disable()
        report_buffer = StringIO.StringIO()
        report = pstats.Stats(profiler, stream=report_buffer)
        report.strip_dirs().sort_stats(self._profile).print_stats(30)
        self._profile = report_buffer.getvalue()
def run(self):
    """Run the thread body under cProfile and dump the stats.

    The regular ``threading.Thread.run`` is executed with profiling enabled;
    afterwards the statistics are written to ``self.filename``.
    """
    import cProfile

    prof = cProfile.Profile()
    prof.enable()
    threading.Thread.run(self)
    prof.disable()

    prof.dump_stats(self.filename)
def profileMain(args, config):  # pragma: no cover
    '''This is the main function for profiling.

    Runs ``main(args)`` under cProfile and writes the report (statistics
    sorted by internal time, followed by the callee and caller listings) to
    stderr. Always returns 0.

    http://code.google.com/appengine/kb/commontasks.html#profiling
    '''
    import cProfile
    import pstats

    eyed3.log.debug("driver profileMain")

    profiler = cProfile.Profile()
    # The statement is evaluated against this function's locals, so the
    # ``args`` parameter must keep its name.
    profiler = profiler.runctx("main(args)", globals(), locals())

    report_buffer = StringIO()
    report = pstats.Stats(profiler, stream=report_buffer)
    report.sort_stats("time")  # Or cumulative
    report.print_stats(100)  # 100 = how many to print

    # The rest is optional.
    report.print_callees()
    report.print_callers()

    sys.stderr.write("Profile data:\n%s\n" % report_buffer.getvalue())
    return 0
def __call__(self, environ, start_response):
    """Profile one WSGI request and return the buffered response body.

    The wrapped application (``self._app``) runs to completion under a fresh
    ``Profile``. Everything it returns or writes is collected and handed
    back as a single-element list. The statistics are dumped to a ``.prof``
    file in ``self._profile_dir`` when that is set; otherwise a report is
    written to ``self._stream``.

    :param environ: WSGI environment of the request.
    :param start_response: the server's WSGI ``start_response`` callable.
    :return: list holding the whole response body as one bytes object.
    """
    response_body = []

    def catching_start_response(status, headers, exc_info=None):
        start_response(status, headers, exc_info)
        # Data passed to the returned write callable is buffered as well.
        return response_body.append

    def runapp():
        appiter = self._app(environ, catching_start_response)
        response_body.extend(appiter)
        if hasattr(appiter, 'close'):
            appiter.close()

    p = Profile()
    start = time.time()
    p.runcall(runapp)
    # WSGI body chunks are bytes, so join with a bytes separator; on
    # Python 2 b'' is the same object type as '' and nothing changes.
    body = b''.join(response_body)
    elapsed = time.time() - start

    if self._profile_dir is not None:
        # PATH_INFO may be left out of the environ when it is empty, so
        # default to '' rather than calling .strip() on None.
        path_info = environ.get('PATH_INFO') or ''
        prof_filename = os.path.join(self._profile_dir,
                                     '%s.%s.%06dms.%d.prof' % (
            environ['REQUEST_METHOD'],
            path_info.strip('/').replace('/', '.') or 'root',
            elapsed * 1000.0,
            time.time()
        ))
        p.dump_stats(prof_filename)
    else:
        stats = Stats(p, stream=self._stream)
        stats.sort_stats(*self._sort_by)

        self._stream.write('-' * 80)
        self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
        stats.print_stats(*self._restrictions)
        self._stream.write('-' * 80 + '\n\n')

    return [body]
def __init__(self, profilingProxy):
    """Store the proxy and create this object's own profiler.

    @param profilingProxy A multiprocessing.BaseProxy instance that should
    be connected to a remote ProfileAggregator.
    """
    own_profiler = cProfile.Profile()
    self.profilingProxy = profilingProxy
    self.profile = own_profiler
def __call__(self, environ, start_response):
    """Profile one WSGI request and return the buffered response body.

    The wrapped application (``self._app``) runs to completion under a fresh
    ``Profile``. Everything it returns or writes is collected and handed
    back as a single-element list. The statistics are dumped to a ``.prof``
    file in ``self._profile_dir`` when that is set; otherwise a report is
    written to ``self._stream``.

    :param environ: WSGI environment of the request.
    :param start_response: the server's WSGI ``start_response`` callable.
    :return: list holding the whole response body as one bytes object.
    """
    response_body = []

    def catching_start_response(status, headers, exc_info=None):
        start_response(status, headers, exc_info)
        # Data passed to the returned write callable is buffered as well.
        return response_body.append

    def runapp():
        appiter = self._app(environ, catching_start_response)
        response_body.extend(appiter)
        if hasattr(appiter, 'close'):
            appiter.close()

    p = Profile()
    start = time.time()
    p.runcall(runapp)
    body = b''.join(response_body)
    elapsed = time.time() - start

    if self._profile_dir is not None:
        # PATH_INFO may be left out of the environ when it is empty, so
        # default to '' rather than calling .strip() on None.
        path_info = environ.get('PATH_INFO') or ''
        prof_filename = os.path.join(self._profile_dir,
                                     '%s.%s.%06dms.%d.prof' % (
            environ['REQUEST_METHOD'],
            path_info.strip('/').replace('/', '.') or 'root',
            elapsed * 1000.0,
            time.time()
        ))
        p.dump_stats(prof_filename)
    else:
        stats = Stats(p, stream=self._stream)
        stats.sort_stats(*self._sort_by)

        self._stream.write('-' * 80)
        self._stream.write('\nPATH: %r\n' % environ.get('PATH_INFO'))
        stats.print_stats(*self._restrictions)
        self._stream.write('-' * 80 + '\n\n')

    return [body]