Python gc module: get_threshold() example source code

The following 46 code examples, extracted from open-source Python projects, illustrate how to use gc.get_threshold().
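
As a quick reference before the project examples: gc.get_threshold() returns the (threshold0, threshold1, threshold2) tuple that drives CPython's generational collector (the default is (700, 10, 10)), and gc.set_threshold() overrides it. The minimal sketch below shows the save/modify/restore pattern that almost every example on this page uses.

import gc

# Read the current collection thresholds for generations 0, 1 and 2.
thresholds = gc.get_threshold()   # e.g. (700, 10, 10) with CPython defaults

# Temporarily lower the generation-0 threshold so collections happen more
# often, then restore the saved values regardless of what the test code does.
try:
    gc.set_threshold(100, thresholds[1], thresholds[2])
    # ... allocate objects / run the code under test ...
finally:
    gc.set_threshold(*thresholds)
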

Project: zippy    Author: securesystemslab
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

Project: zippy    Author: securesystemslab
def check_gc_during_creation(self, makeref):
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        gc.collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)
Project: zippy    Author: securesystemslab
def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

Project: oil    Author: oilshell
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

Project: oil    Author: oilshell
def check_gc_during_creation(self, makeref):
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        gc.collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)
Project: oil    Author: oilshell
def check_len_race(self, dict_type, cons):
        # Extended sanity checks for len() in the face of cyclic collection
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.iteritems()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
Project: oil    Author: oilshell
def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

Project: python2-tracer    Author: extremecoders-re
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

Project: python2-tracer    Author: extremecoders-re
def check_gc_during_creation(self, makeref):
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        gc.collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)
Project: python2-tracer    Author: extremecoders-re
def check_len_race(self, dict_type, cons):
        # Extended sanity checks for len() in the face of cyclic collection
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.iteritems()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
Project: python2-tracer    Author: extremecoders-re
def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

Project: web_ctp    Author: molebot
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

Project: web_ctp    Author: molebot
def check_gc_during_creation(self, makeref):
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        gc.collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)
Project: web_ctp    Author: molebot
def check_len_race(self, dict_type, cons):
        # Extended sanity checks for len() in the face of cyclic collection
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.items()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
Project: web_ctp    Author: molebot
def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

Project: qudi    Author: Ulm-IQO
def __init__(self, interval=1.0, debug=False):
        """
        Initializes garbage collector

        @param interval float: timeout interval in seconds. Default: 1s
        @param debug bool: debug output. Default: False
        """
        super().__init__()
        self.debug = debug
        if debug:
            gc.set_debug(gc.DEBUG_LEAK)

        self.timer = QTimer()
        self.timer.timeout.connect(self.check)

        self.threshold = gc.get_threshold()
        gc.disable()
        self.timer.start(interval * 1000)
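
The check slot that the timer calls is not part of this excerpt. A plausible sketch, assuming the pyqtgraph-style pattern this class follows (attribute names taken from __init__ above, not from the project's actual code): compare gc.get_count() against the saved thresholds and manually collect the deepest generation that is due.

def check(self):
    # Sketch only; the project's real implementation is not shown above.
    # Compare live allocation counters with the thresholds saved in __init__
    # and run the corresponding manual collection.
    l0, l1, l2 = gc.get_count()
    if l0 > self.threshold[0]:
        num = gc.collect(0)
        if self.debug:
            print('gen0 collection freed %d unreachable objects' % num)
        if l1 > self.threshold[1]:
            num = gc.collect(1)
            if self.debug:
                print('gen1 collection freed %d unreachable objects' % num)
            if l2 > self.threshold[2]:
                num = gc.collect(2)
                if self.debug:
                    print('gen2 collection freed %d unreachable objects' % num)
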
Project: pefile.pypy    Author: cloudtracer
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        #thresholds = gc.get_threshold()
        #self.addCleanup(gc.set_threshold, *thresholds)
        #gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

Project: pefile.pypy    Author: cloudtracer
def check_len_race(self, dict_type, cons):
        # Extended sanity checks for len() in the face of cyclic collection
        #self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            test_support.gc_collect()
            #gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.iteritems()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
Project: pefile.pypy    Author: cloudtracer
def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

Project: ouroboros    Author: pybee
def check_gc_during_creation(self, makeref):
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        gc.collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)
Project: ouroboros    Author: pybee
def check_len_race(self, dict_type, cons):
        # Extended sanity checks for len() in the face of cyclic collection
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.items()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
Project: ouroboros    Author: pybee
def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

Project: ouroboros    Author: pybee
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

Project: ndk-python    Author: gittor
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

Project: ndk-python    Author: gittor
def check_gc_during_creation(self, makeref):
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        gc.collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)
Project: ndk-python    Author: gittor
def check_len_race(self, dict_type, cons):
        # Extended sanity checks for len() in the face of cyclic collection
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.iteritems()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
Project: ndk-python    Author: gittor
def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

Project: kbe_server    Author: xiaohaoppy
def check_gc_during_creation(self, makeref):
        thresholds = gc.get_threshold()
        gc.set_threshold(1, 1, 1)
        gc.collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            gc.set_threshold(*thresholds)
Project: kbe_server    Author: xiaohaoppy
def check_len_race(self, dict_type, cons):
        # Extended sanity checks for len() in the face of cyclic collection
        self.addCleanup(gc.set_threshold, *gc.get_threshold())
        for th in range(1, 100):
            N = 20
            gc.collect(0)
            gc.set_threshold(th, th, th)
            items = [RefCycle() for i in range(N)]
            dct = dict_type(cons(o) for o in items)
            del items
            # All items will be collected at next garbage collection pass
            it = dct.items()
            try:
                next(it)
            except StopIteration:
                pass
            n1 = len(dct)
            del it
            n2 = len(dct)
            self.assertGreaterEqual(n1, 0)
            self.assertLessEqual(n1, N)
            self.assertGreaterEqual(n2, 0)
            self.assertLessEqual(n2, n1)
Project: kbe_server    Author: xiaohaoppy
def test_del_newclass(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A(object):
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)

Project: kbe_server    Author: xiaohaoppy
def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for i in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a

Project: NeoAnalysis    Author: neoanalysis
def __init__(self, interval=1.0, debug=False):
        self.debug = debug
        if debug:
            gc.set_debug(gc.DEBUG_LEAK)

        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.check)

        self.threshold = gc.get_threshold()
        gc.disable()
        self.timer.start(interval * 1000)
Project: NeoAnalysis    Author: neoanalysis
def __init__(self, interval=1.0, debug=False):
        self.debug = debug
        if debug:
            gc.set_debug(gc.DEBUG_LEAK)

        self.timer = QtCore.QTimer()
        self.timer.timeout.connect(self.check)

        self.threshold = gc.get_threshold()
        gc.disable()
        self.timer.start(interval * 1000)
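
Both NeoAnalysis excerpts are the same pyqtgraph-style collector: automatic collection is disabled and a Qt timer drives collections from the main thread, a pattern commonly used to make sure cycle collection only runs in the GUI thread. Because __init__ keeps the original thresholds in self.threshold, handing control back to the automatic collector later is straightforward; a hedged sketch of such an inverse method (not shown in the excerpt, attribute names assumed from __init__ above):

def disable(self):
    # Sketch only: stop the timer-driven collections and give control back
    # to the automatic collector with its original thresholds.
    self.timer.stop()
    gc.set_threshold(*self.threshold)
    gc.enable()
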
Project: zippy    Author: securesystemslab
def test_del(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)
Project: oil    Author: oilshell
def test_del(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)
Project: python2-tracer    Author: extremecoders-re
def test_del(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)
Project: ngas    Author: ICRAR
def _genObjectStatus():
    """
    Generate a report with information about objects allocated, numbers of
    references to each object and other information that can be used to
    track down memory (object) leaks in the server.

    Returns:  Object report (string).
    """
    T = TRACE()

    rep = "NG/AMS SERVER OBJECT STATUS REPORT\n\n"
    import gc
    gc.set_debug(gc.DEBUG_COLLECTABLE | gc.DEBUG_UNCOLLECTABLE |
                 gc.DEBUG_INSTANCES | gc.DEBUG_OBJECTS)
    rep += "=Garbage Collector Status:\n\n"
    rep += "Enabled:             %d\n" % gc.isenabled()
    rep += "Unreachable objects: %d\n" % gc.collect()
    rep += "Threshold:           %s\n" % str(gc.get_threshold())
    #rep += "Objects:\n"
    #rep += str(gc.get_objects()) + "\n"
    rep += "Garbage:\n"
    rep += str(gc.garbage) + "\n\n"

    # Dump the reference count status of all allocated objects into the report.
    rep += "=Object Reference Counts:\n\n"
    for objInfo in _genRefCountRep():
        rep += "%-4d %s\n" % (objInfo[0], objInfo[1])

    rep += "\n=EOF"
    return rep
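
_genRefCountRep() is referenced above but not included in the excerpt. A hypothetical stand-in (an assumption, not the project's actual helper) that would satisfy the loop, tallying live objects per type from gc.get_objects() and yielding (count, type name) pairs, largest first:

import gc
from collections import Counter

def _genRefCountRep():
    # Hypothetical helper: count the live objects tracked by the collector,
    # grouped by type name, and yield (count, name) pairs in descending order.
    counts = Counter(type(obj).__name__ for obj in gc.get_objects())
    for name, count in counts.most_common():
        yield (count, name)
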
Project: web_ctp    Author: molebot
def test_del(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)
Project: pefile.pypy    Author: cloudtracer
def check_gc_during_creation(self, makeref):
        if test_support.check_impl_detail():
            import gc
            thresholds = gc.get_threshold()
            gc.set_threshold(1, 1, 1)
        gc_collect()
        class A:
            pass

        def callback(*args):
            pass

        referenced = A()

        a = A()
        a.a = a
        a.wr = makeref(referenced)

        try:
            # now make sure the object and the ref get labeled as
            # cyclic trash:
            a = A()
            weakref.ref(referenced, callback)

        finally:
            if test_support.check_impl_detail():
                gc.set_threshold(*thresholds)
Project: pefile.pypy    Author: cloudtracer
def test_del(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)
Project: ouroboros    Author: pybee
def test_del(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)
Project: ndk-python    Author: gittor
def test_del(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)
Project: kbe_server    Author: xiaohaoppy
def test_del(self):
        # __del__ methods can trigger collection; make this happen
        thresholds = gc.get_threshold()
        gc.enable()
        gc.set_threshold(1)

        class A:
            def __del__(self):
                dir(self)
        a = A()
        del a

        gc.disable()
        gc.set_threshold(*thresholds)
Project: Leics    Author: LeicsFrameWork
def test_implicit_parent_with_threads(self):
        if not gc.isenabled():
            return # cannot test with disabled gc
        N = gc.get_threshold()[0]
        if N < 50:
            return # cannot test with such a small N
        def attempt():
            lock1 = threading.Lock()
            lock1.acquire()
            lock2 = threading.Lock()
            lock2.acquire()
            recycled = [False]
            def another_thread():
                lock1.acquire() # wait for gc
                greenlet.getcurrent() # update ts_current
                lock2.release() # release gc
            t = threading.Thread(target=another_thread)
            t.start()
            class gc_callback(object):
                def __del__(self):
                    lock1.release()
                    lock2.acquire()
                    recycled[0] = True
            class garbage(object):
                def __init__(self):
                    self.cycle = self
                    self.callback = gc_callback()
            l = []
            x = range(N*2)
            current = greenlet.getcurrent()
            g = garbage()
            for i in x:
                g = None # lose reference to garbage
                if recycled[0]:
                    # gc callback called prematurely
                    t.join()
                    return False
                last = greenlet()
                if recycled[0]:
                    break # yes! gc called in green_new
                l.append(last) # increase allocation counter
            else:
                # gc callback not called when expected
                gc.collect()
                if recycled[0]:
                    t.join()
                return False
            self.assertEqual(last.parent, current)
            for g in l:
                self.assertEqual(g.parent, current)
            return True
        for i in range(5):
            if attempt():
                break
Project: python-sensor    Author: instana
def collect_metrics(self):
        u = resource.getrusage(resource.RUSAGE_SELF)
        if gc_.isenabled():
            c = list(gc_.get_count())
            th = list(gc_.get_threshold())
            g = GC(collect0=c[0] if not self.last_collect else c[0] - self.last_collect[0],
                   collect1=c[1] if not self.last_collect else c[
                       1] - self.last_collect[1],
                   collect2=c[2] if not self.last_collect else c[
                       2] - self.last_collect[2],
                   threshold0=th[0],
                   threshold1=th[1],
                   threshold2=th[2])

        thr = t.enumerate()
        daemon_threads = len([tr.daemon and tr.is_alive() for tr in thr])
        alive_threads = len([not tr.daemon and tr.is_alive() for tr in thr])
        dead_threads = len([not tr.is_alive() for tr in thr])

        m = Metrics(ru_utime=u[0] if not self.last_usage else u[0] - self.last_usage[0],
                    ru_stime=u[1] if not self.last_usage else u[1] - self.last_usage[1],
                    ru_maxrss=u[2],
                    ru_ixrss=u[3],
                    ru_idrss=u[4],
                    ru_isrss=u[5],
                    ru_minflt=u[6] if not self.last_usage else u[6] - self.last_usage[6],
                    ru_majflt=u[7] if not self.last_usage else u[7] - self.last_usage[7],
                    ru_nswap=u[8] if not self.last_usage else u[8] - self.last_usage[8],
                    ru_inblock=u[9] if not self.last_usage else u[9] - self.last_usage[9],
                    ru_oublock=u[10] if not self.last_usage else u[10] - self.last_usage[10],
                    ru_msgsnd=u[11] if not self.last_usage else u[11] - self.last_usage[11],
                    ru_msgrcv=u[12] if not self.last_usage else u[12] - self.last_usage[12],
                    ru_nsignals=u[13] if not self.last_usage else u[13] - self.last_usage[13],
                    ru_nvcs=u[14] if not self.last_usage else u[14] - self.last_usage[14],
                    ru_nivcsw=u[15] if not self.last_usage else u[15] - self.last_usage[15],
                    alive_threads=alive_threads,
                    dead_threads=dead_threads,
                    daemon_threads=daemon_threads,
                    gc=g)

        self.last_usage = u
        if gc_.isenabled():
            self.last_collect = c

        return m
Project: weakmon    Author: rtmrtmrtmrtm
def main():
    # gc.set_debug(gc.DEBUG_STATS)
    thv = gc.get_threshold()
    gc.set_threshold(10*thv[0], 10*thv[1], 10*thv[2])

    filenames = [ ]
    desc = None
    bench = None
    opt = None
    send_msg = None

    i = 1
    while i < len(sys.argv):
        if sys.argv[i] == "-in":
            desc = sys.argv[i+1]
            i += 2
        elif sys.argv[i] == "-file":
            filenames.append(sys.argv[i+1])
            i += 2
        elif sys.argv[i] == "-bench":
            bench = sys.argv[i+1]
            i += 2
        elif sys.argv[i] == "-opt":
            opt = sys.argv[i+1]
            i += 2
        elif sys.argv[i] == "-send":
            send_msg = sys.argv[i+1]
            i += 2
        else:
            usage()

    if send_msg != None:
        js = JT65Send()
        js.send(send_msg)
        sys.exit(0)

    if bench != None:
        benchmark(bench, True)
        sys.exit(0)

    if opt != None:
        optimize(opt)
        sys.exit(0)

    if len(filenames) > 0 and desc == None:
        r = JT65()
        r.verbose = True
        for filename in filenames:
            r.gowav(filename, 0)
    elif len(filenames) == 0 and desc != None:
        r = JT65()
        r.verbose = True
        r.opencard(desc)
        r.gocard()
    else:
        usage()