Python gc 模块,collect() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用gc.collect()

项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def mem_check(opts):
    """Monitor this process's resident memory in an endless loop.

    Every cycle optionally forces a garbage collection (``opts['gc']``),
    logs the current RSS and, when the RSS is above ``opts['threshold']``,
    calls ``memory_dump(opts)`` followed by ``os.abort()``.  The loop then
    sleeps ``opts['interval']`` seconds.  Exceptions raised while collecting
    or while measuring are logged and the loop carries on.
    """
    while True:
        run_gc = opts['gc']
        if run_gc:
            try:
                gc.collect()
            except Exception as err:
                logging.exception(repr(err) + ' while gc.collect()')

        try:
            process = psutil.Process(os.getpid())
            rss = process.memory_info().rss

            logging.info('current memory used: {rss}'.format(rss=rss))

            over_limit = rss > opts['threshold']
            if over_limit:
                memory_dump(opts)
                os.abort()
        except Exception as err:
            logging.exception(repr(err) + ' while checking memory usage')
        finally:
            # Sleep even after a failed measurement so the loop never spins.
            time.sleep(opts['interval'])
项目:deb-python-pyngus    作者:openstack    | 项目源码 | 文件源码
def test_cleanup(self):
        """Create and tear down a container with two connections and check gc.garbage stays empty."""
        gc.enable()
        gc.collect()
        # Baseline: nothing uncollectable may exist before the scenario starts.
        assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
        container = pyngus.Container("abc")
        c1 = container.create_connection("c1")
        c2 = container.create_connection("c2")
        assert c2
        # Drop the local name and collect; the container is still expected
        # to hand the connection back by name.
        del c2
        gc.collect()
        c2 = container.get_connection("c2")
        assert c2
        c1 = container.get_connection("c1")
        assert c1
        c1.create_receiver("r1")
        c1.create_sender("s1")
        # Release every local reference before the final collection.
        del c1
        del c2
        container.destroy()
        del container
        gc.collect()
        assert not gc.garbage, "Object leak: %s" % str(gc.garbage)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test_getViewWidget_deleted():
    """getViewWidget() must return None once the view widget is no longer alive."""
    view = pg.PlotWidget()
    item = pg.InfiniteLine()
    view.addItem(item)
    assert item.getViewWidget() is view

    # Arrange to have Qt automatically delete the view widget
    obj = pg.QtGui.QWidget()
    view.setParent(obj)
    # Dropping the only Python reference to the parent and collecting is
    # what is expected to take the child view down with it.
    del obj
    gc.collect()

    assert not pg.Qt.isQObjectAlive(view)
    assert item.getViewWidget() is None


#if __name__ == '__main__':
    #view = pg.PlotItem()
    #vref = weakref.ref(view)
    #item = pg.InfiniteLine()
    #view.addItem(item)
    #del view
    #gc.collect()
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def check(self):
        """Collect garbage generation by generation while counts exceed the thresholds.

        Reads the current allocation counts from ``gc.get_count()`` and walks
        generations 0, 1, 2 in order.  A generation is collected only when its
        count is above ``self.threshold[generation]`` and every younger
        generation was collected too.  With ``self.debug`` set, the counts and
        the number of unreachable objects found are printed.
        """
        #return self.debug_cycles() # uncomment to just debug cycles
        l0, l1, l2 = gc.get_count()
        if self.debug:
            print('gc_check called:', l0, l1, l2)
        for generation, count in enumerate((l0, l1, l2)):
            # Stop at the first generation that is still under its threshold.
            if not count > self.threshold[generation]:
                break
            num = gc.collect(generation)
            if self.debug:
                print('collecting gen %d, found: %d unreachable' % (generation, num))
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def _getr(slist, olist, first=True):
    i = 0 
    for e in slist:

        oid = id(e)
        typ = type(e)
        if oid in olist or typ is int:    ## or e in olist:     ## since we're excluding all ints, there is no longer a need to check for olist keys
            continue
        olist[oid] = e
        if first and (i%1000) == 0:
            gc.collect()
        tl = gc.get_referents(e)
        if tl:
            _getr(tl, olist, first=False)
        i += 1        
# The public function.
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def start(self):
        """
        Take a fresh baseline for later calls to diff().

        Calls ``self.collect()``, forgets every reference currently held in
        ``self.startRefs``, replaces the contents of ``self.startRefs`` and
        ``self.startCount`` with the newly collected data, and passes each
        entry of the new refs to ``self.rememberRef``.
        Called automatically on init, but can be called manually as well.
        """
        new_refs, new_count, _objs = self.collect()

        baseline_refs = self.startRefs
        for key in baseline_refs:
            self.forgetRef(baseline_refs[key])
        baseline_refs.clear()
        baseline_refs.update(new_refs)

        for ref in new_refs:
            self.rememberRef(ref)

        baseline_count = self.startCount
        baseline_count.clear()
        baseline_count.update(new_count)
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def test_getViewWidget_deleted():
    """getViewWidget() must return None once the view widget is no longer alive."""
    view = pg.PlotWidget()
    item = pg.InfiniteLine()
    view.addItem(item)
    assert item.getViewWidget() is view

    # Arrange to have Qt automatically delete the view widget
    obj = pg.QtGui.QWidget()
    view.setParent(obj)
    # Dropping the only Python reference to the parent and collecting is
    # what is expected to take the child view down with it.
    del obj
    gc.collect()

    assert not pg.Qt.isQObjectAlive(view)
    assert item.getViewWidget() is None


#if __name__ == '__main__':
    #view = pg.PlotItem()
    #vref = weakref.ref(view)
    #item = pg.InfiniteLine()
    #view.addItem(item)
    #del view
    #gc.collect()
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def check(self):
        """Collect garbage generation by generation while counts exceed the thresholds.

        Reads the current allocation counts from ``gc.get_count()`` and walks
        generations 0, 1, 2 in order.  A generation is collected only when its
        count is above ``self.threshold[generation]`` and every younger
        generation was collected too.  With ``self.debug`` set, the counts and
        the number of unreachable objects found are printed.
        """
        #return self.debug_cycles() # uncomment to just debug cycles
        l0, l1, l2 = gc.get_count()
        if self.debug:
            print('gc_check called:', l0, l1, l2)
        for generation, count in enumerate((l0, l1, l2)):
            # Stop at the first generation that is still under its threshold.
            if not count > self.threshold[generation]:
                break
            num = gc.collect(generation)
            if self.debug:
                print('collecting gen %d, found: %d unreachable' % (generation, num))
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def _getr(slist, olist, first=True):
    i = 0 
    for e in slist:

        oid = id(e)
        typ = type(e)
        if oid in olist or typ is int:    ## or e in olist:     ## since we're excluding all ints, there is no longer a need to check for olist keys
            continue
        olist[oid] = e
        if first and (i%1000) == 0:
            gc.collect()
        tl = gc.get_referents(e)
        if tl:
            _getr(tl, olist, first=False)
        i += 1        
# The public function.
项目:NeoAnalysis    作者:neoanalysis    | 项目源码 | 文件源码
def start(self):
        """
        Take a fresh baseline for later calls to diff().

        Calls ``self.collect()``, forgets every reference currently held in
        ``self.startRefs``, replaces the contents of ``self.startRefs`` and
        ``self.startCount`` with the newly collected data, and passes each
        entry of the new refs to ``self.rememberRef``.
        Called automatically on init, but can be called manually as well.
        """
        new_refs, new_count, _objs = self.collect()

        baseline_refs = self.startRefs
        for key in baseline_refs:
            self.forgetRef(baseline_refs[key])
        baseline_refs.clear()
        baseline_refs.update(new_refs)

        for ref in new_refs:
            self.rememberRef(ref)

        baseline_count = self.startCount
        baseline_count.clear()
        baseline_count.update(new_count)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_lots_of_queries(self):
        """Manual load test: insert one million rows and print memory statistics.

        Object-type statistics are printed before and after the inserts and
        the peak RSS is printed every 25000 rows.  The function always ends
        by raising ``Exception``, so it never reports success.
        """
        import resource
        import objgraph

        class LoadTest(Model):
            k = columns.Integer(primary_key=True)
            v = columns.Integer()

        sync_table(LoadTest)
        gc.collect()
        objgraph.show_most_common_types()

        print("Starting...")

        total_rows = 1000000
        report_every = 25000
        for i in range(total_rows):
            if not i % report_every:
                # print memory statistic
                peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                print("Memory usage: %s" % (peak_rss))

            LoadTest.create(k=i, v=i)

        objgraph.show_most_common_types()

        raise Exception("you shouldn't be here")
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def setUp(self):
        """Create the fixed Vector2 fixtures shared by the test methods.

        Each sample is stored three ways: as a tuple (``t*``), a list (``l*``)
        and a Vector2 (``v*``).  ``s1``/``s2`` are plain scalars.  The values
        are constants; the randomised variants and the gc.collect() call that
        used to live here are disabled.
        """
        self.zeroVec = Vector2()
        self.e1 = Vector2(1, 0)
        self.e2 = Vector2(0, 1)

        first = (1.2, 3.4)
        self.t1 = first
        self.l1 = list(first)
        self.v1 = Vector2(first)

        second = (5.6, 7.8)
        self.t2 = second
        self.l2 = list(second)
        self.v2 = Vector2(second)

        self.s1, self.s2 = 5.6, 7.8
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def setUp(self):
        """Create the fixed Vector3 fixtures shared by the test methods.

        Each sample is stored three ways: as a tuple (``t*``), a list (``l*``)
        and a Vector3 (``v*``).  ``s1``/``s2`` are plain scalars.  The values
        are constants; the randomised variants and the gc.collect() call that
        used to live here are disabled.
        """
        self.zeroVec = Vector3()
        self.e1 = Vector3(1, 0, 0)
        self.e2 = Vector3(0, 1, 0)
        self.e3 = Vector3(0, 0, 1)

        first = (1.2, 3.4, 9.6)
        self.t1 = first
        self.l1 = list(first)
        self.v1 = Vector3(first)

        second = (5.6, 7.8, 2.1)
        self.t2 = second
        self.l2 = list(second)
        self.v2 = Vector3(second)

        self.s1, self.s2 = 5.6, 7.8
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def make_gc_snapShot(filename, name):
    """Append the signatures of all gc-tracked objects to a file.

    A signature is a pair ``(object_id, type_name)``.  The record written is
    the pickled tuple ``(name, signatures)``; successive calls append further
    records to the same file, to be read back with repeated ``pickle.load``.

    On the first call (module-level flag ``first_time``) a full
    ``gc.collect()`` is run before taking the snapshot.

    :param filename: path of the snapshot file (opened in binary append mode).
    :param name: label stored with this snapshot.
    """
    global first_time
    if first_time:
        gc.collect()
        first_time = False
    contents = []
    for o in gc.get_objects():
        try:
            tname = o.__class__.__name__
        except Exception:
            # Arbitrary objects may fail on attribute access in arbitrary
            # ways (e.g. dead weakref proxies); fall back to the real type.
            tname = str(type(o))
        contents.append((id(o), tname))
    # pickle produces bytes, so the file must be binary; the context manager
    # closes it even when dumping fails.
    with open(filename, 'ab') as f:
        pickle.dump((name, contents), f)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def reach(self, ids):
            """
            Return a dict id -> object for every gc-tracked object whose id
            is in *ids*.

            :param ids: Iterable of object id, as returned by x[0],
                with x in the result of (snapshot2 - snapshot1).  Any
                iterable works, including one-shot iterators.

            The objects recorded with these id might have been
            replaced by new ones... so we might end-up seeing objects
            that don't correspond to the original ones. This is
            especially true after a gc.collect()
            """
            # Materialise once: testing membership directly on a list is
            # linear per object, and a generator would be exhausted by the
            # very first test.
            wanted = set(ids)
            result = dict()
            if not wanted:
                return result
            for obj in gc.get_objects():
                if id(obj) in wanted:
                    result[id(obj)] = obj
            return result
项目:GOS    作者:crcresearch    | 项目源码 | 文件源码
def create_agents(self, generator):
        """
        Build ``self.agents`` from the per-country populations in ``self.df``.

        One entry per inhabitant is produced (the row label of ``self.df``
        repeated ``Population`` times), the resulting series is split into
        ``self.processes * self.splits`` chunks, each chunk is mapped through
        ``self._gen_agents`` via ``self.pool.imap`` and the results are
        concatenated and re-indexed from zero.

        :param generator: stored on ``self.generator`` before the agents are
            generated (presumably consumed by ``self._gen_agents`` - confirm).
        """
        self.generator = generator

        per_country = [
            pd.Series([country] * row["Population"])
            for country, row in self.df.iterrows()
        ]
        country_array = pd.concat(per_country)
        country_array.index = range(len(country_array))

        # Garbage collect before creating new processes.
        gc.collect()

        chunk_count = self.processes * self.splits
        chunks = np.array_split(country_array, chunk_count)
        self.agents = pd.concat(self.pool.imap(self._gen_agents, chunks))
        self.agents.index = range(len(self.agents))
项目:GOS    作者:crcresearch    | 项目源码 | 文件源码
def create_agents(self, generator):
        """
        Build ``self.agents`` from the per-country populations in ``self.df``.

        One entry per inhabitant is produced (the row label of ``self.df``
        repeated ``Population`` times), the resulting series is split into
        ``self.processes * self.splits`` chunks, each chunk is mapped through
        ``self._gen_agents`` via ``self.pool.imap`` and the results are
        concatenated and re-indexed from zero.

        :param generator: stored on ``self.generator`` before the agents are
            generated (presumably consumed by ``self._gen_agents`` - confirm).
        """
        self.generator = generator

        per_country = [
            pd.Series([country] * row["Population"])
            for country, row in self.df.iterrows()
        ]
        country_array = pd.concat(per_country)
        country_array.index = range(len(country_array))

        # Garbage collect before creating new processes.
        gc.collect()

        chunk_count = self.processes * self.splits
        chunks = np.array_split(country_array, chunk_count)
        self.agents = pd.concat(self.pool.imap(self._gen_agents, chunks))
        self.agents.index = range(len(self.agents))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer(func, repetitions=100000):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper

#------------------------------------------------------------------------------
# [ timer_X function decorators ]
#   replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_10(func, repetitions=10):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_100(func, repetitions=100):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_1k(func, repetitions=1000):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_10k(func, repetitions=10000):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer(func, repetitions=100000):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper

#------------------------------------------------------------------------------
# [ timer_X function decorators ]
#   replicate the above decorator with different number of repetitions
#------------------------------------------------------------------------------
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_10(func, repetitions=10):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_100(func, repetitions=100):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_1k(func, repetitions=1000):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def timer_10k(func, repetitions=10000):
    """Decorate *func* so that each call runs it *repetitions* times and prints the elapsed time.

    The cyclic garbage collector is collected once and then disabled for the
    timed loop.  Its previous state is restored afterwards, also when *func*
    raises.

    :param func: callable to benchmark.
    :param repetitions: number of times *func* is invoked per call.
    :returns: wrapper returning the result of the last invocation, or
        ``None`` when *repetitions* is 0.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        sys.stdout.write("Starting " + str(repetitions) + " repetitions of " + func.__name__ + "()...")
        sys.stdout.flush()
        print(" ")
        # Start from a clean heap, then keep the collector out of the timing.
        gc.collect()
        gc_was_enabled = gc.isenabled()
        gc.disable()
        result = None  # stays None when the loop body never runs
        try:
            start = time.time()
            for _ in range(repetitions):
                result = func(*args, **kwargs)
            end = time.time()
        finally:
            # Never leave the collector off behind a failing benchmark, and
            # do not switch it on for callers that had it off.
            if gc_was_enabled:
                gc.enable()
        gc.collect()
        print(str(repetitions) + " repetitions of " + func.__name__ + " : " + str(end-start) + " sec")
        return result
    return wrapper
项目:CNN_denoise    作者:weedwind    | 项目源码 | 文件源码
def make_chunk(filename, arr, arr_len):
   """Randomly move part of *arr* into a chunk file and return what is left.

   Every element is selected independently with probability
   ``chunk_num_frms / arr_len`` (one Bernoulli draw per element).  Selected
   elements are written with ``save_hd5(filename, ...)``; the others are
   returned together with *arr_len* reduced by the number of selected
   elements.
   """
   prob = float(chunk_num_frms) / arr_len

   dump_arr = []
   remain_arr = []
   for e in arr:
      chosen = np.random.binomial(1, prob) == 1
      if not chosen:
         remain_arr.append(e)
         continue
      dump_arr.append(e)
      arr_len -= 1

   save_hd5(filename, dump_arr)

   # Empty and drop the dumped elements, then collect right away.
   del dump_arr[:]
   del dump_arr
   gc.collect()

   return remain_arr, arr_len
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def test_it(collectible):
    """Print referrer counts of ``One`` and ``gc.garbage`` around creating and deleting one instance.

    :param collectible: passed to ``One(...)``; also selects the banner text
        ('collectible' vs 'uncollectible').

    NOTE(review): ``dd`` is defined elsewhere; it looks like a debug-print
    helper - confirm.
    """

    dd()
    dd('======= ', ('collectible' if collectible else 'uncollectible'), ' object =======')
    dd()

    # Baseline measurement on a freshly collected heap.
    gc.collect()
    dd('*** init,     nr of referrers: ', len(gc.get_referrers(One)))
    dd('              garbage:         ', gc.garbage)

    one = One(collectible)
    dd('              created:         ', one.typ, ': ', one)
    dd('              nr of referrers: ', len(gc.get_referrers(One)))

    dd('              delete:')
    del one

    gc.collect()

    # Same two measurements again, now after the instance was dropped.
    dd('*** after gc, nr of referrers: ', len(gc.get_referrers(One)))
    dd('              garbage:         ', gc.garbage)
项目:zoocore    作者:dsparrow27    | 项目源码 | 文件源码
def flushUnder(dirpath):
    """Flushes all modules that live under the given directory

    Entries of ``sys.modules`` whose value is ``None`` are removed as well.

    :param dirpath: the name of the top most directory to search under.
    :type dirpath: str
    :return: ``(module name, module file)`` for every module that was unloaded.
    :rtype: list(tuple(str, str))
    """
    modulePaths = list()
    # Iterate over a snapshot: entries are deleted inside the loop, and
    # mutating a dict while iterating it raises RuntimeError on Python 3.
    for name, module in list(sys.modules.items()):
        if module is None:
            del sys.modules[name]
            continue
        try:
            modulePath = inspect.getfile(module)
            moduleDirpath = os.path.realpath(os.path.dirname(modulePath))
            # NOTE(review): plain prefix test, so "/a/b" also matches
            # "/a/bc" - confirm whether sibling directories can occur.
            if moduleDirpath.startswith(dirpath):
                modulePaths.append((name, modulePath))
                del sys.modules[name]
                logger.debug('unloaded module: %s ' % name)

        except TypeError:
            # inspect.getfile() raises TypeError for objects without a
            # source file, such as built-in modules.
            continue

    # Force a garbage collection
    gc.collect()
    return modulePaths
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_diagnostics_life(self):
        """The cursor must stay alive as long as the exception's ``diag`` object is referenced."""
        import gc
        from weakref import ref

        def tmp():
            # Provoke an error and hand back both the cursor and the exception.
            cur = self.conn.cursor()
            try:
                cur.execute("select * from nonexist")
            except psycopg2.Error as exc:
                return cur, exc

        cur, e = tmp()
        diag = e.diag
        w = ref(cur)

        # With only ``diag`` left, the weakly referenced cursor must survive.
        del e, cur
        gc.collect()
        assert(w() is not None)

        self.assertEqual(diag.sqlstate, '42P01')

        # Once ``diag`` is gone too, the cursor is expected to be freed.
        del diag
        gc.collect()
        gc.collect()
        assert(w() is None)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_diagnostics_life(self):
        """The cursor must stay alive as long as the exception's ``diag`` object is referenced."""
        import gc
        from weakref import ref

        def tmp():
            # Provoke an error and hand back both the cursor and the exception.
            cur = self.conn.cursor()
            try:
                cur.execute("select * from nonexist")
            # "except X as name" is valid on Python 2.6+ and Python 3; the
            # comma form is a syntax error on Python 3.
            except psycopg2.Error as exc:
                return cur, exc

        cur, e = tmp()
        diag = e.diag
        w = ref(cur)

        # With only ``diag`` left, the weakly referenced cursor must survive.
        del e, cur
        gc.collect()
        assert(w() is not None)

        self.assertEqual(diag.sqlstate, '42P01')

        # Once ``diag`` is gone too, the cursor is expected to be freed.
        del diag
        gc.collect()
        gc.collect()
        assert(w() is None)
项目:SIDR    作者:damurdock    | 项目源码 | 文件源码
def runAnalysis(bam, fasta, blastresults, taxdump, modelOutput, output, tokeep, toremove, binary, target, level):
    """Run the whole pipeline: load inputs, build the corpus, train, classify and write output.

    The inputs are loaded in the order taxdump, FASTA, BAM, BLAST results.
    After each loading step and after corpus construction a garbage
    collection is forced and a progress line is echoed via ``click.echo``.
    The trained classifier is applied to the test data and the result is
    handed to ``common.generateOutput``.
    """
    taxdump, _taxid_dict = common.parseTaxdump(taxdump, False)
    gc.collect()
    click.echo("Taxdump parsed, %d taxIDs loaded" % len(taxdump))

    contigs = readFasta(fasta)
    gc.collect()
    click.echo("FASTA loaded, %d contigs returned" % len(contigs))

    contigs = readBAM(bam, contigs)
    gc.collect()
    click.echo("BAM loaded")

    blast_output = readBLAST(blastresults, taxdump, level.lower(), contigs)
    contigs, classMap, classList = blast_output
    gc.collect()
    click.echo("BLAST results loaded")

    contig_list = list(contigs.values())
    corpus, testdata, features = common.constructCorpus(contig_list, classMap, binary, target)
    gc.collect()
    click.echo("Corpus constucted, %d contigs in corpus and %d contigs in test data" % (len(corpus), len(testdata)))

    classifier = common.constructModel(corpus, classList, features, modelOutput)
    result = common.classifyData(classifier, testdata, classMap)
    common.generateOutput(tokeep, toremove, result, contigs.values(), target, output)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_gc(self):
        """test close&term by garbage collection alone"""
        if PYPY:
            raise SkipTest("GC doesn't work ")

        # test credit @dln (GH #137):
        def gcf():
            def inner():
                # Context and socket are never closed explicitly; they only
                # go out of scope when inner() returns.
                ctx = self.Context()
                s = ctx.socket(zmq.PUSH)
            inner()
            gc.collect()
        # Run in a thread so that a hang shows up as a still-alive thread
        # after the one second join timeout instead of blocking the test run.
        t = Thread(target=gcf)
        t.start()
        t.join(timeout=1)
        self.assertFalse(t.is_alive(), "Garbage collection should have cleaned up context")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _cleanUp(self, result):
        """Run post-test cleanup and report its outcome on *result*.

        Optionally forces a garbage collection (``self.forceGarbageCollection``),
        runs the janitor's post-case cleanup, reports every error seen by
        ``self._observer``, flushes logged errors and removes the observer.
        ``result.addSuccess(self)`` is called only if ``self._passed`` is
        still true at the end.

        :param result: test result object receiving errors / success.
        """
        try:
            if self.forceGarbageCollection:
                gc.collect()
            util._Janitor().postCaseCleanup()
        # "except X as name" is valid on Python 2.6+ and Python 3; the comma
        # form is a syntax error on Python 3.
        except util.FailureError as e:
            result.addError(self, e.original)
            self._passed = False
        except:
            # Catch-all: whatever else was raised is captured as a Failure
            # and reported as a cleanup error.
            result.cleanupErrors(failure.Failure())
            self._passed = False
        for error in self._observer.getErrors():
            result.addError(self, error)
            self._passed = False
        self.flushLoggedErrors()
        self._removeObserver()
        if self._passed:
            result.addSuccess(self)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def pytest_runtest_item(self, item):
        """Fail the test item when it leaves additional file descriptors open.

        Generator hook: the open-file list is sampled before the ``yield``
        and again after it.  Entries of the second sample whose first field
        was not present in the first sample are reported through
        ``pytest.fail``.  On PyPy a collection is forced before the second
        sample.
        """
        before = self.get_open_files()
        yield
        if hasattr(sys, "pypy_version_info"):
            gc.collect()
        after = self.get_open_files()

        known_fds = set(entry[0] for entry in before)
        leaked_files = [entry for entry in after if entry[0] not in known_fds]
        if not leaked_files:
            return

        headline = "***** %s FD leakage detected" % len(leaked_files)
        error = [headline]
        error.extend(str(f) for f in leaked_files)
        error.append("*** Before:")
        error.extend(str(f) for f in before)
        error.append("*** After:")
        error.extend(str(f) for f in after)
        # The headline is repeated at the bottom of the report.
        error.append(headline)
        error.append("*** function %s:%s: %s " % item.location)
        pytest.fail("\n".join(error), pytrace=False)


# XXX copied from execnet's conftest.py - needs to be merged
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getmodulecol(self,  source, configargs=(), withinit=False):
        """Return the module collection node for ``source``.

        ``source`` is written to a file with :py:meth:`makepyfile`, named
        after the requesting test function; the configuration is parsed for
        that file (and stored on ``self.config``) and the collection node
        for the test module is returned.

        :param source: The source code of the module to collect.

        :param configargs: Any extra arguments to pass to
           :py:meth:`parseconfigure`.

        :param withinit: Whether to also write a ``__init__.py`` file
           to the temporary directory to ensure it is a package.

        """
        module_name = self.request.function.__name__
        path = self.makepyfile(**{module_name: Source(source).strip()})
        if withinit:
            self.makepyfile(__init__ = "#")
        config = self.parseconfigure(path, *configargs)
        self.config = config
        return self.getnode(config, path)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_fom_buffer(self):
        """from_buffer() must share memory with the source array and keep it alive."""
        a = array.array("i", range(16))
        x = (c_int * 16).from_buffer(a)

        y = X.from_buffer(a)
        self.assertEqual(y.c_int, a[0])
        self.assertFalse(y.init_called)

        self.assertEqual(x[:], a.tolist())

        # Writes to the array must be visible through the ctypes view.
        a[0], a[-1] = 200, -200
        self.assertEqual(x[:], a.tolist())

        # The ctypes object holds a reference to the source array.
        self.assertIn(a, x._objects.values())

        self.assertRaises(ValueError,
                          c_int.from_buffer, a, -1)

        # Dropping our own name for the array must not invalidate the view.
        expected = x[:]
        del a; gc.collect(); gc.collect(); gc.collect()
        self.assertEqual(x[:], expected)

        # A str argument is expected to be rejected with TypeError.
        self.assertRaises(TypeError,
                          (c_char * 16).from_buffer, "a" * 16)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_from_buffer_copy(self):
        """from_buffer_copy() must take an independent copy of the source buffer."""
        a = array.array("i", range(16))
        x = (c_int * 16).from_buffer_copy(a)

        y = X.from_buffer_copy(a)
        self.assertEqual(y.c_int, a[0])
        self.assertFalse(y.init_called)

        self.assertEqual(x[:], range(16))

        # Later writes to the array must not show up in the copy.
        a[0], a[-1] = 200, -200
        self.assertEqual(x[:], range(16))

        # Being a copy, the ctypes object keeps no reference to the source.
        self.assertEqual(x._objects, None)

        self.assertRaises(ValueError,
                          c_int.from_buffer, a, -1)

        # The copy must stay intact after the source array is dropped.
        del a; gc.collect(); gc.collect(); gc.collect()
        self.assertEqual(x[:], range(16))

        x = (c_char * 16).from_buffer_copy("a" * 16)
        self.assertEqual(x[:], "a" * 16)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_1(self):
        """A ctypes callback object must release its reference to the Python callable when dropped."""
        from sys import getrefcount as grc

        f = dll._testfunc_callback_i_if
        f.restype = ctypes.c_int
        f.argtypes = [ctypes.c_int, MyCallback]

        def callback(value):
            #print "called back with", value
            return value

        # Baseline reference count of the callable before it is wrapped.
        self.assertEqual(grc(callback), 2)
        cb = MyCallback(callback)

        # Wrapping must add at least one reference.
        self.assertGreater(grc(callback), 2)
        result = f(-10, cb)
        self.assertEqual(result, -18)
        cb = None

        gc.collect()

        # With the wrapper gone, the count must be back at the baseline.
        self.assertEqual(grc(callback), 2)
项目:Recruit    作者:Weiyanyu    | 项目源码 | 文件源码
def showImage(self):
        """Generate the two chart images, show them in the UI and continue with showStaff().

        The images are generated under ``<cwd>/resource/<self.type>/``.  If
        generation fails, ``self.networkError()`` is called and the method
        still tries to load whatever image files exist.

        NOTE(review): '1.png' is generated from position data but shown in
        ``SalaryImage`` (and '2.png' vice versa) - confirm this is intended.
        """
        # (original comment was lost to an encoding error; it mentioned the
        # "main thread is not in main loop" problem)
        image = None  # stays None when GenImage() itself fails, so the cleanup below is safe
        try:
            image = GenImage(os.getcwd() + '/resource/%s/' % (self.type))
            image.generateImage('position_for_image.csv','1.png','bar')
            image.generateImage('salary_for_image.csv','2.png','pie')
        except Exception:
            self.networkError()

        PixMapSalary = QtGui.QPixmap(os.getcwd() + '/resource/%s/images/1.png' % (self.type)).scaled(400,600)
        self.SalaryImage.setPixmap(PixMapSalary)
        PixMapPosition = QtGui.QPixmap(os.getcwd() + '/resource/%s/images/2.png' % (self.type)).scaled(500,500)
        self.PositionImage.setPixmap(PixMapPosition)

        # Release the generator object before moving on.
        del image
        gc.collect()

        self.showStaff()

    #????????????????????????????webbrowser???
项目:deb-python-functools32    作者:openstack    | 项目源码 | 文件源码
def test_main(verbose=None):
    """Run the functools test classes; in verbose debug builds also sample total refcounts.

    :param verbose: when true and ``sys.gettotalrefcount`` exists, the suite
        is re-run five times and the total reference count after each run is
        printed, so a steadily growing series can be spotted.
    """
    test_classes = (
        TestPartial,
        TestPartialSubclass,
        TestPythonPartial,
        TestUpdateWrapper,
        TestTotalOrdering,
        TestCmpToKey,
        TestWraps,
        TestReduce,
        TestLRU,
        TestOrderedDict,
    )
    support.run_unittest(*test_classes)

    # verify reference counting
    if verbose and hasattr(sys, "gettotalrefcount"):
        import gc
        # The result list is allocated up front and filled by index
        # (presumably so it does not grow between samples - confirm).
        counts = [None] * 5
        for i in range(len(counts)):
            support.run_unittest(*test_classes)
            gc.collect()
            counts[i] = sys.gettotalrefcount()
        print(counts)
项目:deb-python-functools32    作者:openstack    | 项目源码 | 文件源码
def gc_collect():
    """Run the garbage collector three times, pausing briefly on Jython.

    Implementations other than CPython do not guarantee timely
    deallocation, and even CPython needs the collector for reference
    cycles.  As a consequence __del__ methods may run late and weakrefs may
    outlive their referents for a while.  Calling this function is a
    best-effort way to make all garbage objects disappear.
    """
    gc.collect()
    if is_jython:
        time.sleep(0.1)
    for _ in range(2):
        gc.collect()


#=======================================================================
# Decorator for running a function in a different locale, correctly resetting
# it afterwards.
项目:zing    作者:evernote    | 项目源码 | 文件源码
def delete(self, *args, **kwargs):
        """Delete the project one translation project at a time, then its directory.

        A plain delete would first load every related object into memory
        (translation projects, stores, units, quality checks, suggestions,
        submissions), which can easily take down a process.  Deleting the
        translation projects individually, with a forced garbage collection
        after each, keeps memory use manageable.
        """
        # Remember the directory now; it is deleted last.
        directory = self.directory

        import gc
        gc.collect()
        translation_projects = self.translationproject_set.iterator()
        for translation_project in translation_projects:
            translation_project.delete()
            gc.collect()

        super(Project, self).delete(*args, **kwargs)

        directory.delete()
项目:Leics    作者:LeicsFrameWork    | 项目源码 | 文件源码
def test_threaded_leak(self):
            """Check that the main greenlets of finished threads get collected."""
            main_refs = []

            def record_main_greenlet():
                # only main greenlet present
                current = greenlet.getcurrent()
                main_refs.append(weakref.ref(current))

            for _ in range(2):
                thread = threading.Thread(target=record_main_greenlet)
                thread.start()
                thread.join()
                # Drop our reference to the thread object right away.
                del thread

            greenlet.getcurrent() # update ts_current
            self.recycle_threads()
            greenlet.getcurrent() # update ts_current
            gc.collect()
            greenlet.getcurrent() # update ts_current

            # Every weak reference recorded by the workers must now be dead.
            for ref in main_refs:
                self.assertTrue(ref() is None)
项目:Leics    作者:LeicsFrameWork    | 项目源码 | 文件源码
def test_threaded_adv_leak(self):
            """Like test_threaded_leak, but each thread also runs extra greenlets.

            Every worker thread stores a list of its finished greenlets on its
            main greenlet (attribute ``ll``) before recording a weak reference
            to that main greenlet.
            """
            main_refs = []

            def run_finished_greenlets():
                # main and additional *finished* greenlets
                finished = []
                greenlet.getcurrent().ll = finished

                def remember_self():
                    finished.append(greenlet.getcurrent())

                for _ in range(2):
                    greenlet.greenlet(remember_self).switch()
                main_refs.append(weakref.ref(greenlet.getcurrent()))

            for _ in range(2):
                thread = threading.Thread(target=run_finished_greenlets)
                thread.start()
                thread.join()
                # Drop our reference to the thread object right away.
                del thread

            greenlet.getcurrent() # update ts_current
            self.recycle_threads()
            greenlet.getcurrent() # update ts_current
            gc.collect()
            greenlet.getcurrent() # update ts_current

            # Every weak reference recorded by the workers must now be dead.
            for ref in main_refs:
                self.assertTrue(ref() is None)
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def __call__(self, result = None):
        """Run the wrapped test, then fail it if COM objects were leaked.

        The interface and gateway counts reported by ``pythoncom`` are sampled
        before the test and again after the refcount leak tests; any
        difference is recorded on ``result`` as a failure of the wrapped test.
        """
        # For the COM suite's sake, always ensure we don't leak
        # gateways/interfaces
        from pythoncom import _GetInterfaceCount, _GetGatewayCount

        gc.collect()
        interfaces_before = _GetInterfaceCount()
        gateways_before = _GetGatewayCount()

        self.real_test(result)
        if result.shouldStop or not result.wasSuccessful():
            # Failed - no point checking anything else
            return

        self._do_leak_tests(result)
        gc.collect()
        leaked_interfaces = _GetInterfaceCount() - interfaces_before
        leaked_gateways = _GetGatewayCount() - gateways_before
        if not (leaked_interfaces or leaked_gateways):
            return

        failure = AssertionError(
            "%d interface objects and %d gateway objects leaked"
            % (leaked_interfaces, leaked_gateways))
        result.addFailure(self.real_test, (failure.__class__, failure, None))
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def _do_leak_tests(self, result = None):
        """Re-run the wrapped test and report changes in the total refcount.

        The check relies on ``sys.gettotalrefcount``; when the interpreter
        does not provide it, the method returns without doing anything.  It
        also returns immediately when ``self.num_leak_iters`` is not positive.

        ``self.real_test`` is run up to ``self.num_leak_iters`` times (stopping
        early if ``result.shouldStop`` becomes true).  If the per-iteration
        change in the total reference count is non-zero, a failure carrying an
        ``AssertionError`` instance is added to ``result``.
        """
        try:
            gtrc = sys.gettotalrefcount
        except AttributeError:
            return # can't do leak tests in this build
        num_iters = self.num_leak_iters
        if num_iters <= 0:
            # Nothing to run; without this guard ``del i`` below would raise
            # NameError and the division would have a zero divisor.
            return
        # Assume already called once, to prime any caches etc
        gc.collect()
        trc = gtrc()
        for i in range(num_iters):
            self.real_test(result)
            if result.shouldStop:
                break
        del i # created after we remembered the refcount!
        # int division here means one or 2 stray references won't force
        # failure, but one per loop
        gc.collect()
        lost = (gtrc() - trc) // num_iters
        if lost < 0:
            msg = "LeakTest: %s appeared to gain %d references!!" % (self.real_test, -lost)
        elif lost > 0:
            msg = "LeakTest: %s lost %d references" % (self.real_test, lost)
        else:
            return
        # Both branches report an (exc_type, exc_value, traceback) triple with
        # a real exception instance in the value slot.
        exc = AssertionError(msg)
        result.addFailure(self.real_test, (exc.__class__, exc, None))
项目:human-name-py    作者:djudd    | 项目源码 | 文件源码
def test_no_memory_leak():
    """Parse the same name many times and check that memory use stays bounded.

    The resident set size reported by ``ps`` for this process is sampled
    before and after 100000 parse calls; the test fails if the later value is
    not below 1.25 times the earlier one.
    """
    import gc
    import os

    def rss():
        """Return this process's RSS as printed by ``ps``, after a collection."""
        gc.collect()
        # Close the pipe deterministically instead of leaving it to the
        # garbage collector.
        with os.popen("ps -o rss= -p %d" % os.getpid()) as pipe:
            out = pipe.read()
        return int(out.strip())

    before = rss()

    for _ in range(100000):
        n = Name.parse("Reallyverylongfirstname Reallyverylonglastname")
        # Read the attributes so their accessors are exercised as well.
        n.given_name
        n.surname

    after = rss()

    assert after < 1.25 * before, (
        "RSS grew from %d to %d" % (before, after))
项目:SwiftKitten    作者:johncsnyder    | 项目源码 | 文件源码
def test_ffi_type_not_immortal():
    """Check which ctype objects survive once their references are dropped.

    Two scenarios are exercised, each with its own FFI instance: dropping the
    outer pointer type while keeping the inner one, and dropping the inner
    pointer type while keeping the outer one.
    """
    import weakref, gc
    # Scenario 1: keep only the inner type ("int *").
    ffi = _cffi1_backend.FFI()
    t1 = ffi.typeof("int **")
    t2 = ffi.typeof("int *")
    w1 = weakref.ref(t1)
    w2 = weakref.ref(t2)
    # Drop the outer type and the FFI instance; t2 is still referenced here.
    del t1, ffi
    gc.collect()
    # The "int **" type must be gone, while "int *" is still the same object.
    assert w1() is None
    assert w2() is t2
    # A brand-new FFI instance must hand back that very same "int *" object
    # for the item of a freshly allocated "int **".
    ffi = _cffi1_backend.FFI()
    assert ffi.typeof(ffi.new("int **")[0]) is t2
    #
    # Scenario 2: keep only the outer type ("int ***").
    ffi = _cffi1_backend.FFI()
    t1 = ffi.typeof("int ***")
    t2 = ffi.typeof("int **")
    w1 = weakref.ref(t1)
    w2 = weakref.ref(t2)
    # Drop the inner type and the FFI instance; t1 is still referenced here.
    del t2, ffi
    gc.collect()
    assert w1() is t1
    assert w2() is not None   # kept alive by t1
    # A differently spelled "int * *" must resolve to the item type of t1.
    ffi = _cffi1_backend.FFI()
    assert ffi.typeof("int * *") is t1.item