我们从Python开源项目中，提取了以下25个代码示例，用于说明如何使用gc.get_count()。
def check(self):
    """Collect every GC generation whose count exceeds its threshold.

    The current counts come from ``gc.get_count()`` and are compared with
    the matching entries of ``self.threshold``.  Generations are visited
    from youngest to oldest and the walk stops at the first one that is
    still within its limit, so an older generation is only collected when
    every younger one was collected too.  When ``self.debug`` is truthy the
    counts and the result of each collection are printed.
    """
    # return self.debug_cycles()  # uncomment to just debug cycles
    messages = (
        'collecting gen 0, found: %d unreachable',
        'collecting gen 1, found: %d unreachable',
        'collecting gen 2, found: %d unreachable',
    )
    counts = gc.get_count()
    if self.debug:
        print('gc_check called:', *counts)
    for generation, count in enumerate(counts):
        if not count > self.threshold[generation]:
            # This generation is within bounds; older ones are left alone.
            break
        found = gc.collect(generation)
        if self.debug:
            print(messages[generation] % found)
def test_del_newclass(self): # __del__ methods can trigger collection, make this to happen thresholds = gc.get_threshold() gc.enable() gc.set_threshold(1) class A(object): def __del__(self): dir(self) a = A() del a gc.disable() gc.set_threshold(*thresholds) # The following two tests are fragile: # They precisely count the number of allocations, # which is highly implementation-dependent. # For example, disposed tuples are not freed, but reused. # To minimize variations, though, we first store the get_count() results # and check them at the end.
def test_collect_generations(self): gc.collect() # This object will "trickle" into generation N + 1 after # each call to collect(N) x = [] gc.collect(0) # x is now in gen 1 a, b, c = gc.get_count() gc.collect(1) # x is now in gen 2 d, e, f = gc.get_count() gc.collect(2) # x is now in gen 3 g, h, i = gc.get_count() # We don't check a, d, g since their exact values depends on # internal implementation details of the interpreter. self.assertEqual((b, c), (1, 0)) self.assertEqual((e, f), (0, 1)) self.assertEqual((h, i), (0, 0))
def check(self):
    """
    Method called by the garbage collector timer to check whether any
    generation needs collecting.

    The counts from ``gc.get_count()`` are compared with ``self.threshold``
    generation by generation; the method returns as soon as one generation
    is within its limit, so older generations are only collected after all
    younger ones.  Debug messages go to ``logger`` when ``self.debug`` is
    truthy.
    """
    # return self.debug_cycles()  # uncomment to just debug cycles
    gen0, gen1, gen2 = gc.get_count()
    if self.debug:
        logger.debug('gc_check called: {0} {1} {2}'.format(gen0, gen1, gen2))

    if not gen0 > self.threshold[0]:
        return
    found = gc.collect(0)
    if self.debug:
        logger.debug('collecting gen 0, found: {0:d} unreachable'.format(found))

    if not gen1 > self.threshold[1]:
        return
    found = gc.collect(1)
    if self.debug:
        logger.debug('collecting gen 1, found: {0:d} unreachable'.format(found))

    if not gen2 > self.threshold[2]:
        return
    found = gc.collect(2)
    if self.debug:
        logger.debug('collecting gen 2, found: {0:d} unreachable'.format(found))
def test_get_count(self): gc.collect() a, b, c = gc.get_count() x = [] d, e, f = gc.get_count() self.assertEqual((b, c), (0, 0)) self.assertEqual((e, f), (0, 0)) # This is less fragile than asserting that a equals 0. self.assertLess(a, 5) # Between the two calls to get_count(), at least one object was # created (the list). self.assertGreater(d, a)
def test_get_count(self): # Avoid future allocation of method object assertEqual = self._baseAssertEqual gc.collect() assertEqual(gc.get_count(), (0, 0, 0)) a = dict() # since gc.collect(), we created two objects: # the dict, and the tuple returned by get_count() assertEqual(gc.get_count(), (2, 0, 0))
def test_collect_generations(self): # Avoid future allocation of method object assertEqual = self.assertEqual gc.collect() a = dict() gc.collect(0) assertEqual(gc.get_count(), (0, 1, 0)) gc.collect(1) assertEqual(gc.get_count(), (0, 0, 1)) gc.collect(2) assertEqual(gc.get_count(), (0, 0, 0))
def report(self):
    """Report CPU, memory, garbage-collector and thread metrics.

    Every value is handed to ``self.report_metric``.  CPU time and max RSS
    are skipped when ``runtime_info.OS_WIN`` is set; current RSS and VM
    size are only read when ``runtime_info.OS_LINUX`` is set; cumulative GC
    statistics are only read when ``min_version(3, 4)`` is true.  Readers
    that return ``None`` cause the corresponding metric to be skipped.
    """
    # CPU
    if not runtime_info.OS_WIN:
        cpu_time = read_cpu_time()
        if cpu_time is not None:
            cpu_time_metric = self.report_metric(
                Metric.TYPE_COUNTER, Metric.CATEGORY_CPU,
                Metric.NAME_CPU_TIME, Metric.UNIT_NANOSECOND, cpu_time)
            if cpu_time_metric.has_measurement():
                # NOTE(review): the 60 * 1e9 divisor looks like a 60-second
                # reporting window expressed in nanoseconds -- confirm.
                cpu_usage = (cpu_time_metric.measurement.value / (60 * 1e9)) * 100
                try:
                    cpu_usage = cpu_usage / multiprocessing.cpu_count()
                except Exception:
                    # Deliberate best effort: if the core count cannot be
                    # determined, report the un-normalised percentage.
                    pass
                self.report_metric(
                    Metric.TYPE_STATE, Metric.CATEGORY_CPU,
                    Metric.NAME_CPU_USAGE, Metric.UNIT_PERCENT, cpu_usage)

    # Memory
    if not runtime_info.OS_WIN:
        max_rss = read_max_rss()
        if max_rss is not None:
            self.report_metric(
                Metric.TYPE_STATE, Metric.CATEGORY_MEMORY,
                Metric.NAME_MAX_RSS, Metric.UNIT_KILOBYTE, max_rss)

    if runtime_info.OS_LINUX:
        current_rss = read_current_rss()
        if current_rss is not None:
            self.report_metric(
                Metric.TYPE_STATE, Metric.CATEGORY_MEMORY,
                Metric.NAME_CURRENT_RSS, Metric.UNIT_KILOBYTE, current_rss)

        vm_size = read_vm_size()
        if vm_size is not None:
            self.report_metric(
                Metric.TYPE_STATE, Metric.CATEGORY_MEMORY,
                Metric.NAME_VM_SIZE, Metric.UNIT_KILOBYTE, vm_size)

    # GC stats
    gc_count0, gc_count1, gc_count2 = gc.get_count()
    total_gc_count = gc_count0 + gc_count1 + gc_count2
    self.report_metric(
        Metric.TYPE_STATE, Metric.CATEGORY_GC,
        Metric.NAME_GC_COUNT, Metric.UNIT_NONE, total_gc_count)

    if min_version(3, 4):
        gc_stats = gc.get_stats()
        # Require three non-empty per-generation entries before indexing,
        # so a shorter list is skipped instead of raising IndexError.
        if gc_stats and len(gc_stats) >= 3 and all(gc_stats[:3]):
            generations = gc_stats[:3]
            totals_to_report = (
                (Metric.TYPE_COUNTER, Metric.NAME_GC_COLLECTIONS, 'collections'),
                (Metric.TYPE_COUNTER, Metric.NAME_GC_COLLECTED, 'collected'),
                (Metric.TYPE_STATE, Metric.NAME_GC_UNCOLLECTABLE, 'uncollectable'),
            )
            for metric_type, metric_name, key in totals_to_report:
                # Sum the statistic across the three generations.
                total = sum(stats[key] for stats in generations)
                self.report_metric(
                    metric_type, Metric.CATEGORY_GC,
                    metric_name, Metric.UNIT_NONE, total)

    # Runtime
    thread_count = threading.active_count()
    self.report_metric(
        Metric.TYPE_STATE, Metric.CATEGORY_RUNTIME,
        Metric.NAME_THREAD_COUNT, Metric.UNIT_NONE, thread_count)
def collect_metrics(self):
    """Sample process resource usage, GC counters and thread counts.

    Returns a ``Metrics`` object built from ``resource.getrusage`` for the
    current process, a ``GC`` object (only when the collector is enabled)
    and three thread counts.  Most rusage fields and the GC counts are
    reported relative to the previous sample stored in ``self.last_usage``
    / ``self.last_collect``; on the first call, when those are falsy, the
    raw values are used.  ``ru_maxrss``, ``ru_ixrss``, ``ru_idrss`` and
    ``ru_isrss`` are always passed through as absolute values.

    Side effects: ``self.last_usage`` is updated on every call and
    ``self.last_collect`` whenever the collector is enabled.
    """
    usage = resource.getrusage(resource.RUSAGE_SELF)
    previous_usage = self.last_usage

    def usage_delta(index):
        # Without a previous sample there is no baseline, so report the
        # raw value; otherwise report the change since the last sample.
        if not previous_usage:
            return usage[index]
        return usage[index] - previous_usage[index]

    # Query the collector state once so that building the GC record and
    # storing last_collect can never disagree with each other.
    gc_enabled = gc_.isenabled()
    counts = None
    # NOTE(review): with the collector disabled the original code raised
    # NameError when building Metrics; None is passed instead -- confirm
    # that Metrics accepts gc=None.
    gc_metrics = None
    if gc_enabled:
        counts = list(gc_.get_count())
        thresholds = list(gc_.get_threshold())
        previous_counts = self.last_collect
        if previous_counts:
            deltas = [counts[i] - previous_counts[i] for i in range(3)]
        else:
            deltas = counts
        gc_metrics = GC(collect0=deltas[0],
                        collect1=deltas[1],
                        collect2=deltas[2],
                        threshold0=thresholds[0],
                        threshold1=thresholds[1],
                        threshold2=thresholds[2])

    # Count only the threads matching each predicate.  The previous
    # len([...]) form measured a list of booleans, so every counter was
    # simply the total number of threads.
    threads = t.enumerate()
    daemon_threads = sum(1 for th in threads if th.daemon and th.is_alive())
    alive_threads = sum(1 for th in threads if not th.daemon and th.is_alive())
    dead_threads = sum(1 for th in threads if not th.is_alive())

    m = Metrics(ru_utime=usage_delta(0),
                ru_stime=usage_delta(1),
                ru_maxrss=usage[2],
                ru_ixrss=usage[3],
                ru_idrss=usage[4],
                ru_isrss=usage[5],
                ru_minflt=usage_delta(6),
                ru_majflt=usage_delta(7),
                ru_nswap=usage_delta(8),
                ru_inblock=usage_delta(9),
                ru_oublock=usage_delta(10),
                ru_msgsnd=usage_delta(11),
                ru_msgrcv=usage_delta(12),
                ru_nsignals=usage_delta(13),
                ru_nvcs=usage_delta(14),
                ru_nivcsw=usage_delta(15),
                alive_threads=alive_threads,
                dead_threads=dead_threads,
                daemon_threads=daemon_threads,
                gc=gc_metrics)

    self.last_usage = usage
    if gc_enabled:
        self.last_collect = counts
    return m