我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用gc.isenabled()。
def timeit(self, number=default_number): """Time 'number' executions of the main statement. To be precise, this executes the setup statement once, and then returns the time it takes to execute the main statement a number of times, as a float measured in seconds. The argument is the number of times through the loop, defaulting to one million. The main statement, the setup statement and the timer function to be used are passed to the constructor. """ if itertools: it = itertools.repeat(None, number) else: it = [None] * number gcold = gc.isenabled() gc.disable() timing = self.inner(it, self.timer) if gcold: gc.enable() return timing
def timeit(self, number=default_number): """Time 'number' executions of the main statement. To be precise, this executes the setup statement once, and then returns the time it takes to execute the main statement a number of times, as a float measured in seconds. The argument is the number of times through the loop, defaulting to one million. The main statement, the setup statement and the timer function to be used are passed to the constructor. """ if itertools: it = itertools.repeat(None, number) else: it = [None] * number gcold = gc.isenabled() gc.disable() try: timing = self.inner(it, self.timer) finally: if gcold: gc.enable() return timing
def test_load_refcount(): # Check that objects returned by np.load are directly freed based on # their refcount, rather than needing the gc to collect them. f = BytesIO() np.savez(f, [1, 2, 3]) f.seek(0) assert_(gc.isenabled()) gc.disable() try: gc.collect() np.load(f) # gc.collect returns the number of unreachable objects in cycles that # were found -- we are checking that no cycles were created by np.load n_objects_in_cycles = gc.collect() finally: gc.enable() assert_equal(n_objects_in_cycles, 0)
def _run(self, n): self._make_args(n) gcold = gc.isenabled() gc.disable() times = [] for i in range(self._cmd.args.runs): t_start = time.time() self._compute() elapsed = time.time() - t_start times.append(elapsed) if gcold: gc.enable() return times
def timeit(self, number=timeit.default_number): """Time 'number' executions of the main statement. To be precise, this executes the setup statement once, and then returns the time it takes to execute the main statement a number of times, as a float measured in seconds. The argument is the number of times through the loop, defaulting to one million. The main statement, the setup statement and the timer function to be used are passed to the constructor. """ it = itertools.repeat(None, number) gcold = gc.isenabled() gc.disable() try: timing = self.inner(it, self.timer) finally: if gcold: gc.enable() return timing
def test_free_from_gc(self): # Check that freeing of blocks by the garbage collector doesn't deadlock # (issue #12352). # Make sure the GC is enabled, and set lower collection thresholds to # make collections more frequent (and increase the probability of # deadlock). if not gc.isenabled(): gc.enable() self.addCleanup(gc.disable) thresholds = gc.get_threshold() self.addCleanup(gc.set_threshold, *thresholds) gc.set_threshold(10) # perform numerous block allocations, with cyclic references to make # sure objects are collected asynchronously by the gc for i in range(5000): a = multiprocessing.heap.BufferWrapper(1) b = multiprocessing.heap.BufferWrapper(1) # circular references a.buddy = b b.buddy = a # # #
def test_main(): enabled = gc.isenabled() gc.disable() assert not gc.isenabled() debug = gc.get_debug() gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak try: gc.collect() # Delete 2nd generation garbage run_unittest(GCTests, GCTogglingTests) finally: gc.set_debug(debug) # test gc.enable() even if GC is disabled by default if verbose: print("restoring automatic collection") # make sure to always test gc.enable() gc.enable() assert gc.isenabled() if not enabled: gc.disable()
def timeit(self, number=default_number): """Time 'number' executions of the main statement. To be precise, this executes the setup statement once, and then returns the time it takes to execute the main statement a number of times, as a float measured in seconds. The argument is the number of times through the loop, defaulting to one million. The main statement, the setup statement and the timer function to be used are passed to the constructor. """ it = itertools.repeat(None, number) gcold = gc.isenabled() gc.disable() try: timing = self.inner(it, self.timer) finally: if gcold: gc.enable() return timing
def test_main(): enabled = gc.isenabled() gc.disable() assert not gc.isenabled() debug = gc.get_debug() gc.set_debug(debug & ~gc.DEBUG_LEAK) # this test is supposed to leak try: gc.collect() # Delete 2nd generation garbage run_unittest(GCTests, GCTogglingTests, GCCallbackTests) finally: gc.set_debug(debug) # test gc.enable() even if GC is disabled by default if verbose: print("restoring automatic collection") # make sure to always test gc.enable() gc.enable() assert gc.isenabled() if not enabled: gc.disable()
def timeit(self, number=default_number): """Time 'number' executions of the main statement. To be precise, this executes the setup statement once, and then returns the time it takes to execute the main statement a number of times, as a float measured in seconds. The argument is the number of times through the loop, defaulting to one million. The main statement, the setup statement and the timer function to be used are passed to the constructor. """ inner = self.make_inner() gcold = gc.isenabled() if '__pypy__' not in sys.builtin_module_names: gc.disable() # only do that on CPython try: timing = inner(number, self.timer) finally: if gcold: gc.enable() return timing
def test_free_from_gc(self): # Check that freeing of blocks by the garbage collector doesn't deadlock # (issue #12352). # Make sure the GC is enabled, and set lower collection thresholds to # make collections more frequent (and increase the probability of # deadlock). if not gc.isenabled(): gc.enable() self.addCleanup(gc.disable) #thresholds = gc.get_threshold() #self.addCleanup(gc.set_threshold, *thresholds) #gc.set_threshold(10) # perform numerous block allocations, with cyclic references to make # sure objects are collected asynchronously by the gc for i in range(5000): a = multiprocessing.heap.BufferWrapper(1) b = multiprocessing.heap.BufferWrapper(1) # circular references a.buddy = b b.buddy = a # # #
def test_get_stats(self): stats = gc.get_stats() self.assertEqual(len(stats), 3) for st in stats: self.assertIsInstance(st, dict) self.assertEqual(set(st), {"collected", "collections", "uncollectable"}) self.assertGreaterEqual(st["collected"], 0) self.assertGreaterEqual(st["collections"], 0) self.assertGreaterEqual(st["uncollectable"], 0) # Check that collection counts are incremented correctly if gc.isenabled(): self.addCleanup(gc.enable) gc.disable() old = gc.get_stats() gc.collect(0) new = gc.get_stats() self.assertEqual(new[0]["collections"], old[0]["collections"] + 1) self.assertEqual(new[1]["collections"], old[1]["collections"]) self.assertEqual(new[2]["collections"], old[2]["collections"]) gc.collect(2) new = gc.get_stats() self.assertEqual(new[0]["collections"], old[0]["collections"] + 1) self.assertEqual(new[1]["collections"], old[1]["collections"]) self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
def setUp(self): """ Replace L{process} os, fcntl, sys, switchUID, fdesc and pty modules with the mock class L{MockOS}. """ if gc.isenabled(): self.addCleanup(gc.enable) else: self.addCleanup(gc.disable) self.mockos = MockOS() self.mockos.euid = 1236 self.mockos.egid = 1234 self.patch(process, "os", self.mockos) self.patch(process, "fcntl", self.mockos) self.patch(process, "sys", self.mockos) self.patch(process, "switchUID", self.mockos.switchUID) self.patch(process, "fdesc", self.mockos) self.patch(process.Process, "processReaderFactory", DumbProcessReader) self.patch(process.Process, "processWriterFactory", DumbProcessWriter) self.patch(process, "pty", self.mockos) self.mocksig = MockSignal() self.patch(process, "signal", self.mocksig)
def test_mockFork(self): """ Test a classic spawnProcess. Check the path of the client code: fork, exec, exit. """ gc.enable() cmd = b'/mock/ouch' d = defer.Deferred() p = TrivialProcessProtocol(d) try: reactor.spawnProcess(p, cmd, [b'ouch'], env=None, usePTY=False) except SystemError: self.assertTrue(self.mockos.exited) self.assertEqual( self.mockos.actions, [("fork", False), "exec", ("exit", 1)]) else: self.fail("Should not be here") # It should leave the garbage collector disabled. self.assertFalse(gc.isenabled())
def test_getWorkerArguments(self): """ C{_getWorkerArguments} discards options like C{random} as they only matter in the manager, and forwards options like C{recursionlimit} or C{disablegc}. """ self.addCleanup(sys.setrecursionlimit, sys.getrecursionlimit()) if gc.isenabled(): self.addCleanup(gc.enable) self.options.parseOptions(["--recursionlimit", "2000", "--random", "4", "--disablegc"]) args = self.options._getWorkerArguments() self.assertIn("--disablegc", args) args.remove("--disablegc") self.assertEqual(["--recursionlimit", "2000"], args)
def executeSQL(self, sql, connection=None, commit=False): """Execute the given SQL. This will connect to the database for the first time if necessary. This method will also log the SQL to self._sqlEcho, if it is not None. Returns the connection and cursor used and relies on connectionAndCursor() to obtain these. Note that you can pass in a connection to force a particular one to be used and a flag to commit immediately. """ sql = str(sql) # Excel-based models yield Unicode strings which some db modules don't like sql = sql.strip() if aggressiveGC: import gc assert gc.isenabled() gc.collect() self._sqlCount += 1 if self._sqlEcho: timestamp = funcs.timestamp()['pretty'] self._sqlEcho.write('SQL %04i. %s %s\n' % (self._sqlCount, timestamp, sql)) self._sqlEcho.flush() conn, cur = self.connectionAndCursor(connection) self._executeSQL(cur, sql) if commit: conn.commit() return conn, cur
def bench_on(runner, sym, Ns, trials, dtype=None): global args, kernel, out, mkl_layer prepare = globals().get("prepare_"+sym, prepare_default) kernel = globals().get("kernel_"+sym, None) if not kernel: kernel = getattr(np.linalg, sym) out_lvl = runner.__doc__.split('.')[0].strip() func_s = kernel.__doc__.split('.')[0].strip() log.debug('Preparing input data for %s (%s).. ' % (sym, func_s)) args = [prepare(int(i)) for i in Ns] it = range(len(Ns)) # pprint(Ns) out = np.empty(shape=(len(Ns), trials)) b = body(trials) tic, toc = (0, 0) log.debug('Warming up %s (%s).. ' % (sym, func_s)) runner(range(1000), empty_work) kernel(*args[0]) runner(range(1000), empty_work) log.debug('Benchmarking %s on %s: ' % (func_s, out_lvl)) gc_old = gc.isenabled() # gc.disable() tic = time.time() runner(it, b) toc = time.time() - tic if gc_old: gc.enable() if 'reused_pool' in globals(): del globals()['reused_pool'] #calculate average time and min time and also keep track of outliers (max time in the loop) min_time = np.amin(out) max_time = np.amax(out) mean_time = np.mean(out) stdev_time = np.std(out) #print("Min = %.5f, Max = %.5f, Mean = %.5f, stdev = %.5f " % (min_time, max_time, mean_time, stdev_time)) #final_times = [min_time, max_time, mean_time, stdev_time] print('## %s: Outter:%s, Inner:%s, Wall seconds:%f\n' % (sym, out_lvl, mkl_layer, float(toc))) return out
def _exitfunc(cls): # At shutdown invoke finalizers for which atexit is true. # This is called once all other non-daemonic threads have been # joined. reenable_gc = False try: if cls._registry: import gc if gc.isenabled(): reenable_gc = True gc.disable() pending = None while True: if pending is None or finalize._dirty: pending = cls._select_for_exit() finalize._dirty = False if not pending: break f = pending.pop() try: # gc is disabled, so (assuming no daemonic # threads) the following is the only line in # this function which might trigger creation # of a new finalizer f() except Exception: sys.excepthook(*sys.exc_info()) assert f not in cls._registry finally: # prevent any more finalizers from executing during shutdown finalize._shutdown = True if reenable_gc: gc.enable()
def __enter__(self): if self.disable_gc: self.gc_state = gc.isenabled() gc.disable() self.start = self.timer() return self
def __enter__(self): parent = inspect.currentframe().f_back try: if parent.f_code.co_flags & inspect.CO_NEWLOCALS: raise RuntimeError('timers only work when invoked at the module/script level') self._with_start = parent.f_lasti finally: del parent gc_enabled = gc.isenabled() gc.disable() self._gc_enabled = gc_enabled self._start_time = self.time_function() return self
def run(self, iterations=1000): gc_enabled = gc.isenabled() gc.disable() try: return _MeasurementSamples(self.get_sample() for _ in xrange(iterations)) finally: if gc_enabled: gc.enable()
def disable_gc(): have_gc = gc.isenabled() gc.disable() try: yield finally: if have_gc: gc.enable()
def setUp(self): self.using_gc = gc.isenabled() gc.disable()