Python sys 模块,getrefcount() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sys.getrefcount()

项目:code    作者:ActiveState    | 项目源码 | 文件源码
def getrc(defns,depth=1):
    '''Return the reference counts of the expressions named in ``defns``.

    defns   space separated string of expressions; each one is evaluated in
            the namespace of the calling frame.
    depth   how many frames above this function the caller of interest is
            (1 means the direct caller).

    The local namespaces of every enclosing frame that shares the caller's
    globals are merged (innermost frame wins) so that names from outer
    callers in the same module are visible as well.

    Returns a space separated string with one count per expression.  One is
    subtracted from each raw count to discount the temporary reference held
    by getrefcount() itself.

    NOTE: the expressions are passed to eval(); never feed this function
    text that comes from an untrusted source.
    '''
    from sys import getrefcount, _getframe
    f = _getframe(depth)
    G0 = f.f_globals
    L = f.f_locals
    if L is not G0:
        LL = [L]
        while 1:
            f = f.f_back
            # Bottom of the stack (e.g. the entry function of a raw thread):
            # there is no further frame to inspect.
            if f is None: break
            G = f.f_globals
            L = f.f_locals
            # Stop at a frame from another module or at module level.
            if G is not G0 or G is L: break
            LL.append(L)
        L = {}
        # Outermost frame first so that inner frames override outer ones.
        for l in reversed(LL):
            L.update(l)
    else:
        L = L.copy()
    G0 = G0.copy()
    # eval(source, globals, locals): locals must shadow globals, exactly as
    # they do in the frame being inspected.
    return ' '.join([str(getrefcount(eval(x,G0,L))-1) for x in defns.split()])
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test__POINTER_c_char(self):
        # Check that a c_buffer can be stored in a POINTER(c_char) structure
        # field, that the assignment adds exactly one reference to the
        # buffer, and that a plain Python string is rejected.
        class X(Structure):
            _fields_ = [("str", POINTER(c_char))]
        x = X()

        # NULL pointer access
        self.assertRaises(ValueError, getattr, x.str, "contents")
        b = c_buffer("Hello, World")
        from sys import getrefcount as grc
        # Baseline of 2: the local name `b` plus the temporary reference
        # held by the getrefcount() argument.
        self.assertEqual(grc(b), 2)
        x.str = b
        # Storing the buffer in the field is expected to add one reference.
        self.assertEqual(grc(b), 3)

        # POINTER(c_char) and Python string is NOT compatible
        # POINTER(c_char) and c_buffer() is compatible
        for i in range(len(b)):
            # Element-wise comparison of the buffer with what the pointer
            # field reads back.
            self.assertEqual(b[i], x.str[i])

        self.assertRaises(TypeError, setattr, x, "str", "Hello, World")
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_1(self):
        # Check that wrapping a Python function in MyCallback takes
        # references to it, and that all of them are released again once the
        # wrapper has been dropped and garbage collected.
        from sys import getrefcount as grc

        f = dll._testfunc_callback_i_if
        f.restype = ctypes.c_int
        f.argtypes = [ctypes.c_int, MyCallback]

        def callback(value):
            #print "called back with", value
            return value

        # Baseline of 2: the local name plus getrefcount()'s own argument.
        self.assertEqual(grc(callback), 2)
        cb = MyCallback(callback)

        # The wrapper must keep the Python function alive.
        self.assertGreater(grc(callback), 2)
        result = f(-10, cb)
        self.assertEqual(result, -18)
        cb = None

        # Collect before re-measuring so that reference cycles involving the
        # dropped wrapper cannot keep `callback` referenced.
        gc.collect()

        self.assertEqual(grc(callback), 2)
项目:Leics    作者:LeicsFrameWork    | 项目源码 | 文件源码
def test_two_recursive_children(self):
        # A greenlet that starts a child greenlet of its own: all three
        # appends must happen, and afterwards the outer greenlet must only be
        # referenced by the local name (plus getrefcount()'s argument).
        lst = []

        def f():
            lst.append(1)
            # Hand control back to whichever greenlet started this one.
            greenlet.getcurrent().parent.switch()

        def g():
            lst.append(1)
            # NOTE: this local `g` shadows the enclosing function's name
            # inside the function body only.
            g = greenlet(f)
            g.switch()
            lst.append(1)
        # Rebinds the outer name `g` from the function to its greenlet.
        g = greenlet(g)
        g.switch()
        self.assertEqual(len(lst), 3)
        self.assertEqual(sys.getrefcount(g), 2)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def _check_multi_index(self, arr, index):
        """Check a multi index item getting and simple setting.

        Parameters
        ----------
        arr : ndarray
            Array to be indexed, must be a reshaped arange.
        index : tuple of indexing objects
            Index being tested.
        """
        # Test item getting
        try:
            mimic_get, no_copy = self._get_multi_index(arr, index)
        except Exception:
            prev_refcount = sys.getrefcount(arr)
            assert_raises(Exception, arr.__getitem__, index)
            assert_raises(Exception, arr.__setitem__, index, 0)
            assert_equal(prev_refcount, sys.getrefcount(arr))
            return

        self._compare_index_result(arr, index, mimic_get, no_copy)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def _check_single_index(self, arr, index):
        """Check a single index item getting and simple setting.

        Parameters
        ----------
        arr : ndarray
            Array to be indexed, must be an arange.
        index : indexing object
            Index being tested. Must be a single index and not a tuple
            of indexing objects (see also `_check_multi_index`).
        """
        try:
            mimic_get, no_copy = self._get_multi_index(arr, (index,))
        except Exception:
            prev_refcount = sys.getrefcount(arr)
            assert_raises(Exception, arr.__getitem__, index)
            assert_raises(Exception, arr.__setitem__, index, 0)
            assert_equal(prev_refcount, sys.getrefcount(arr))
            return

        self._compare_index_result(arr, index, mimic_get, no_copy)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def test_dot_3args(self):
        # Exercise dot() with an explicit output array, passed both as the
        # third positional argument and as `out=`: the very same array must be
        # returned and no reference to it may be leaked.
        from numpy.core.multiarray import dot

        # Fixed seed so the random operands are reproducible.
        np.random.seed(22)
        f = np.random.random_sample((1024, 16))
        v = np.random.random_sample((16, 32))

        r = np.empty((1024, 32))
        for i in range(12):
            dot(f, v, r)
        # After the repeated calls only the local name and getrefcount()'s
        # argument may reference the output array.
        assert_equal(sys.getrefcount(r), 2)
        r2 = dot(f, v, out=None)
        assert_array_equal(r2, r)
        assert_(r is dot(f, v, out=r))

        # Same checks with a one-dimensional right-hand operand and output.
        v = v[:, 0].copy()  # v.shape == (16,)
        r = r[:, 0].copy()  # r.shape == (1024,)
        r2 = dot(f, v)
        assert_(r is dot(f, v, r))
        assert_array_equal(r2, r)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def _assert_valid_refcount(op):
    """
    Check that ufuncs don't mishandle refcount of object `1`.
    Used in a few regression tests.
    """
    import numpy as np

    b = np.arange(100*100).reshape(100, 100)
    c = b
    i = 1

    rc = sys.getrefcount(i)
    for j in range(15):
        d = op(b, c)
    assert_(sys.getrefcount(i) >= rc)
    del d  # for pyflakes
项目:python-doublescript    作者:fdintino    | 项目源码 | 文件源码
def test_set_bases_refcount(self):
        # The first base class must end up with the same reference count
        # whether the base was set through a_factory(set_b_base=True) or
        # patched in afterwards with type_set_bases().
        def normally_set_base():
            return a_factory(set_b_base=True)

        A_normal = normally_set_base()
        # Collect first so leftover garbage does not distort the count.
        gc.collect()
        normal_ref_count = sys.getrefcount(A_normal.__bases__[0])

        def patch_set_base():
            # Classes are created inside a helper so that only the returned
            # class stays referenced from this test's frame.
            A = a_factory()
            B = b_factory()
            type_set_bases(A, (B,))
            return A

        A_patch = patch_set_base()

        gc.collect()
        patch_ref_count = sys.getrefcount(A_patch.__bases__[0])

        self.assertEqual(normal_ref_count, patch_ref_count)
项目:python-doublescript    作者:fdintino    | 项目源码 | 文件源码
def test_set_bases_mro_refcount(self):
        # The __mro__ tuple must end up with the same reference count
        # whether the base was set through a_factory(set_b_base=True) or
        # patched in afterwards with type_set_bases().
        def normally_set_base():
            return a_factory(set_b_base=True)

        A_normal = normally_set_base()
        # Collect first so leftover garbage does not distort the count.
        gc.collect()
        normal_ref_count = sys.getrefcount(A_normal.__mro__)

        def patch_set_base():
            # Classes are created inside a helper so that only the returned
            # class stays referenced from this test's frame.
            A = a_factory()
            B = b_factory()
            type_set_bases(A, (B,))
            return A

        A_patch = patch_set_base()

        gc.collect()
        patch_ref_count = sys.getrefcount(A_patch.__mro__)
        self.assertEqual(normal_ref_count, patch_ref_count)
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def test_element_cyclic_gc_none(self):
        # test if cyclic reference can crash etree
        # (checked by comparing the reference count of None before and
        # after a self-referencing list of elements has been collected)
        Element = self.etree.Element

        # must disable tracing as it could change the refcounts
        trace_func = sys.gettrace()
        try:
            sys.settrace(None)
            gc.collect()

            count = sys.getrefcount(None)

            # Build a list that contains itself, i.e. a reference cycle
            # that only the cyclic garbage collector can free.
            l = [Element('name'), Element('name')]
            l.append(l)

            del l
            gc.collect()

            self.assertEqual(sys.getrefcount(None), count)
        finally:
            # Always restore whatever trace function was active before.
            sys.settrace(trace_func)
项目:macos-st-packages    作者:zce    | 项目源码 | 文件源码
def testRefCount(self):
        # Assigning null to a JavaScript variable is expected to cost exactly
        # one reference to Python's None, and repeating the same assignment
        # must not add another one.
        count = sys.getrefcount(None)

        class Global(JSClass):
            pass

        with JSContext(Global()) as ctxt:
            ctxt.eval("""
                var none = null;
            """)

            self.assertEqual(count+1, sys.getrefcount(None))

            # Second, identical assignment: the count must stay where it is.
            ctxt.eval("""
                var none = null;
            """)

            self.assertEqual(count+1, sys.getrefcount(None))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_bug1229429(self):
        # this bug was never in reversed, it was in
        # PyObject_CallMethod, and reversed_new calls that sometimes.
        #
        # A non-callable __reversed__ attribute must make reversed() raise
        # TypeError without changing that attribute's reference count.
        if not hasattr(sys, "getrefcount"):
            # Nothing to measure on interpreters without getrefcount();
            # the test then ends here without reporting a skip.
            return
        def f():
            pass
        r = f.__reversed__ = object()
        rc = sys.getrefcount(r)
        # Repeat the failing call so that a per-call leak would show up as
        # a growing count.
        for i in range(10):
            try:
                reversed(f)
            except TypeError:
                pass
            else:
                self.fail("non-callable __reversed__ didn't raise!")
        self.assertEqual(rc, sys.getrefcount(r))
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def xmltoolkit63():
    """
    Build a single "tag" element containing "text" with a TreeBuilder.

    The examples below call the function repeatedly and compare the
    reference count of None before and after.

    Check reference leak.
    >>> xmltoolkit63()
    >>> count = sys.getrefcount(None)  #doctest: +SKIP
    >>> for i in range(1000):
    ...     xmltoolkit63()
    >>> sys.getrefcount(None) - count  #doctest: +SKIP
    0

    """
    tree = ET.TreeBuilder()
    # Start the element with an empty attribute dict, add its text, then
    # close it again; the builder is simply discarded afterwards.
    tree.start("tag", {})
    tree.data("text")
    tree.end("tag")

# --------------------------------------------------------------------
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def check_getitem_with_type(self, tp):
        # Check integer indexing on a view of `tp(self._source)`: valid
        # positive and negative indexes, out-of-range indexes, non-integer
        # indexes, and finally that dropping the view restores the
        # underlying object's reference count.
        item = self.getitem_type
        b = tp(self._source)
        oldrefcount = getrefcount(b)
        m = self._view(b)
        self.assertEqual(m[0], item(b"a"))
        self.assertIsInstance(m[0], bytes)
        self.assertEqual(m[5], item(b"f"))
        self.assertEqual(m[-1], item(b"f"))
        self.assertEqual(m[-6], item(b"a"))
        # Bounds checking
        self.assertRaises(IndexError, lambda: m[6])
        self.assertRaises(IndexError, lambda: m[-7])
        self.assertRaises(IndexError, lambda: m[sys.maxsize])
        self.assertRaises(IndexError, lambda: m[-sys.maxsize])
        # Type checking
        self.assertRaises(TypeError, lambda: m[None])
        self.assertRaises(TypeError, lambda: m[0.0])
        self.assertRaises(TypeError, lambda: m["a"])
        # Drop the view; the count of `b` must return to its old value.
        m = None
        self.assertEqual(getrefcount(b), oldrefcount)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test__POINTER_c_char(self):
        # Check that a c_buffer can be stored in a POINTER(c_char) structure
        # field, that the assignment adds exactly one reference to the
        # buffer, and that a plain Python string is rejected.
        class X(Structure):
            _fields_ = [("str", POINTER(c_char))]
        x = X()

        # NULL pointer access
        self.assertRaises(ValueError, getattr, x.str, "contents")
        b = c_buffer(b"Hello, World")
        from sys import getrefcount as grc
        # Baseline of 2: the local name `b` plus the temporary reference
        # held by the getrefcount() argument.
        self.assertEqual(grc(b), 2)
        x.str = b
        # Storing the buffer in the field is expected to add one reference.
        self.assertEqual(grc(b), 3)

        # POINTER(c_char) and Python string is NOT compatible
        # POINTER(c_char) and c_buffer() is compatible
        for i in range(len(b)):
            # Element-wise comparison of the buffer with what the pointer
            # field reads back.
            self.assertEqual(b[i], x.str[i])

        self.assertRaises(TypeError, setattr, x, "str", "Hello, World")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_callback(self):
        import sys
        try:
            from sys import getrefcount
        except ImportError:
            return unittest.skip("no sys.getrefcount()")

        proto = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_int, ctypes.c_int)
        def func(a, b):
            return a * b * 2
        f = proto(func)

        gc.collect()
        a = sys.getrefcount(ctypes.c_int)
        f(1, 2)
        self.assertEqual(sys.getrefcount(ctypes.c_int), a)
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def _check_multi_index(self, arr, index):
        """Check a multi index item getting and simple setting.

        Parameters
        ----------
        arr : ndarray
            Array to be indexed, must be a reshaped arange.
        index : tuple of indexing objects
            Index being tested.
        """
        # Test item getting
        try:
            mimic_get, no_copy = self._get_multi_index(arr, index)
        except Exception:
            prev_refcount = sys.getrefcount(arr)
            assert_raises(Exception, arr.__getitem__, index)
            assert_raises(Exception, arr.__setitem__, index, 0)
            assert_equal(prev_refcount, sys.getrefcount(arr))
            return

        self._compare_index_result(arr, index, mimic_get, no_copy)
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def _check_single_index(self, arr, index):
        """Check a single index item getting and simple setting.

        Parameters
        ----------
        arr : ndarray
            Array to be indexed, must be an arange.
        index : indexing object
            Index being tested. Must be a single index and not a tuple
            of indexing objects (see also `_check_multi_index`).
        """
        try:
            mimic_get, no_copy = self._get_multi_index(arr, (index,))
        except Exception:
            prev_refcount = sys.getrefcount(arr)
            assert_raises(Exception, arr.__getitem__, index)
            assert_raises(Exception, arr.__setitem__, index, 0)
            assert_equal(prev_refcount, sys.getrefcount(arr))
            return

        self._compare_index_result(arr, index, mimic_get, no_copy)
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def test_dot_3args(self):
        # Exercise dot() with an explicit output array, passed both as the
        # third positional argument and as `out=`: the very same array must be
        # returned and no reference to it may be leaked.
        from numpy.core.multiarray import dot

        # Fixed seed so the random operands are reproducible.
        np.random.seed(22)
        f = np.random.random_sample((1024, 16))
        v = np.random.random_sample((16, 32))

        r = np.empty((1024, 32))
        for i in range(12):
            dot(f, v, r)
        # After the repeated calls only the local name and getrefcount()'s
        # argument may reference the output array.
        assert_equal(sys.getrefcount(r), 2)
        r2 = dot(f, v, out=None)
        assert_array_equal(r2, r)
        assert_(r is dot(f, v, out=r))

        # Same checks with a one-dimensional right-hand operand and output.
        v = v[:, 0].copy()  # v.shape == (16,)
        r = r[:, 0].copy()  # r.shape == (1024,)
        r2 = dot(f, v)
        assert_(r is dot(f, v, r))
        assert_array_equal(r2, r)
项目:krpcScripts    作者:jwvanderbeck    | 项目源码 | 文件源码
def _assert_valid_refcount(op):
    """
    Check that ufuncs don't mishandle refcount of object `1`.
    Used in a few regression tests.
    """
    import numpy as np

    b = np.arange(100*100).reshape(100, 100)
    c = b
    i = 1

    rc = sys.getrefcount(i)
    for j in range(15):
        d = op(b, c)
    assert_(sys.getrefcount(i) >= rc)
    del d  # for pyflakes
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def xmltoolkit63():
    """
    Build a single "tag" element containing "text" with a TreeBuilder.

    The examples below call the function repeatedly and check that the
    reference count of None is unchanged afterwards.

    Check reference leak.
    >>> xmltoolkit63()
    >>> count = sys.getrefcount(None)
    >>> for i in range(1000):
    ...     xmltoolkit63()
    >>> sys.getrefcount(None) - count
    0

    """
    tree = ET.TreeBuilder()
    # Start the element with an empty attribute dict, add its text, then
    # close it again; the builder is simply discarded afterwards.
    tree.start("tag", {})
    tree.data("text")
    tree.end("tag")

# --------------------------------------------------------------------
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_attributes_writable(self):
        if not self.rw_type:
            self.skipTest("no writable type to test")
        m = self.check_attributes_with_type(self.rw_type)
        self.assertEqual(m.readonly, False)

    # Disabled: unicode uses the old buffer API in 2.x

    #def test_getbuffer(self):
        ## Test PyObject_GetBuffer() on a memoryview object.
        #for tp in self._types:
            #b = tp(self._source)
            #oldrefcount = sys.getrefcount(b)
            #m = self._view(b)
            #oldviewrefcount = sys.getrefcount(m)
            #s = unicode(m, "utf-8")
            #self._check_contents(tp, b, s.encode("utf-8"))
            #self.assertEqual(sys.getrefcount(m), oldviewrefcount)
            #m = None
            #self.assertEqual(sys.getrefcount(b), oldrefcount)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test__POINTER_c_char(self):
        # Check that a c_buffer can be stored in a POINTER(c_char) structure
        # field, that the assignment adds exactly one reference to the
        # buffer, and that a plain Python string is rejected.
        class X(Structure):
            _fields_ = [("str", POINTER(c_char))]
        x = X()

        # NULL pointer access
        self.assertRaises(ValueError, getattr, x.str, "contents")
        b = c_buffer("Hello, World")
        from sys import getrefcount as grc
        # Baseline of 2: the local name `b` plus the temporary reference
        # held by the getrefcount() argument.
        self.assertEqual(grc(b), 2)
        x.str = b
        # Storing the buffer in the field is expected to add one reference.
        self.assertEqual(grc(b), 3)

        # POINTER(c_char) and Python string is NOT compatible
        # POINTER(c_char) and c_buffer() is compatible
        for i in range(len(b)):
            # Element-wise comparison of the buffer with what the pointer
            # field reads back.
            self.assertEqual(b[i], x.str[i])

        self.assertRaises(TypeError, setattr, x, "str", "Hello, World")
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_1(self):
        # Check that wrapping a Python function in MyCallback takes
        # references to it, and that all of them are released again once the
        # wrapper has been dropped and garbage collected.
        from sys import getrefcount as grc

        f = dll._testfunc_callback_i_if
        f.restype = ctypes.c_int
        f.argtypes = [ctypes.c_int, MyCallback]

        def callback(value):
            #print "called back with", value
            return value

        # Baseline of 2: the local name plus getrefcount()'s own argument.
        self.assertEqual(grc(callback), 2)
        cb = MyCallback(callback)

        # The wrapper must keep the Python function alive.
        self.assertGreater(grc(callback), 2)
        result = f(-10, cb)
        self.assertEqual(result, -18)
        cb = None

        # Collect before re-measuring so that reference cycles involving the
        # dropped wrapper cannot keep `callback` referenced.
        gc.collect()

        self.assertEqual(grc(callback), 2)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def xmltoolkit63():
    """
    Build a single "tag" element containing "text" with a TreeBuilder.

    The examples below call the function repeatedly and check that the
    reference count of None is unchanged afterwards.

    Check reference leak.
    >>> xmltoolkit63()
    >>> count = sys.getrefcount(None)
    >>> for i in range(1000):
    ...     xmltoolkit63()
    >>> sys.getrefcount(None) - count
    0

    """
    tree = ET.TreeBuilder()
    # Start the element with an empty attribute dict, add its text, then
    # close it again; the builder is simply discarded afterwards.
    tree.start("tag", {})
    tree.data("text")
    tree.end("tag")

# --------------------------------------------------------------------
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_attributes_writable(self):
        if not self.rw_type:
            self.skipTest("no writable type to test")
        m = self.check_attributes_with_type(self.rw_type)
        self.assertEqual(m.readonly, False)

    # Disabled: unicode uses the old buffer API in 2.x

    #def test_getbuffer(self):
        ## Test PyObject_GetBuffer() on a memoryview object.
        #for tp in self._types:
            #b = tp(self._source)
            #oldrefcount = sys.getrefcount(b)
            #m = self._view(b)
            #oldviewrefcount = sys.getrefcount(m)
            #s = unicode(m, "utf-8")
            #self._check_contents(tp, b, s.encode("utf-8"))
            #self.assertEqual(sys.getrefcount(m), oldviewrefcount)
            #m = None
            #self.assertEqual(sys.getrefcount(b), oldrefcount)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test__POINTER_c_char(self):
        # Check that a c_buffer can be stored in a POINTER(c_char) structure
        # field, that the assignment adds exactly one reference to the
        # buffer, and that a plain Python string is rejected.
        class X(Structure):
            _fields_ = [("str", POINTER(c_char))]
        x = X()

        # NULL pointer access
        self.assertRaises(ValueError, getattr, x.str, "contents")
        b = c_buffer("Hello, World")
        from sys import getrefcount as grc
        # Baseline of 2: the local name `b` plus the temporary reference
        # held by the getrefcount() argument.
        self.assertEqual(grc(b), 2)
        x.str = b
        # Storing the buffer in the field is expected to add one reference.
        self.assertEqual(grc(b), 3)

        # POINTER(c_char) and Python string is NOT compatible
        # POINTER(c_char) and c_buffer() is compatible
        for i in range(len(b)):
            # Element-wise comparison of the buffer with what the pointer
            # field reads back.
            self.assertEqual(b[i], x.str[i])

        self.assertRaises(TypeError, setattr, x, "str", "Hello, World")
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_1(self):
        # Check that wrapping a Python function in MyCallback takes
        # references to it, and that all of them are released again once the
        # wrapper has been dropped and garbage collected.
        from sys import getrefcount as grc

        f = dll._testfunc_callback_i_if
        f.restype = ctypes.c_int
        f.argtypes = [ctypes.c_int, MyCallback]

        def callback(value):
            #print "called back with", value
            return value

        # Baseline of 2: the local name plus getrefcount()'s own argument.
        self.assertEqual(grc(callback), 2)
        cb = MyCallback(callback)

        # The wrapper must keep the Python function alive.
        self.assertGreater(grc(callback), 2)
        result = f(-10, cb)
        self.assertEqual(result, -18)
        cb = None

        # Collect before re-measuring so that reference cycles involving the
        # dropped wrapper cannot keep `callback` referenced.
        gc.collect()

        self.assertEqual(grc(callback), 2)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test__POINTER_c_char(self):
        # Check that a c_buffer can be stored in a POINTER(c_char) structure
        # field, that the assignment adds exactly one reference to the
        # buffer, and that a plain Python string is rejected.
        class X(Structure):
            _fields_ = [("str", POINTER(c_char))]
        x = X()

        # NULL pointer access
        self.assertRaises(ValueError, getattr, x.str, "contents")
        b = c_buffer("Hello, World")
        from sys import getrefcount as grc
        # Baseline of 2: the local name `b` plus the temporary reference
        # held by the getrefcount() argument.
        self.assertEqual(grc(b), 2)
        x.str = b
        # Storing the buffer in the field is expected to add one reference.
        self.assertEqual(grc(b), 3)

        # POINTER(c_char) and Python string is NOT compatible
        # POINTER(c_char) and c_buffer() is compatible
        for i in range(len(b)):
            # Element-wise comparison of the buffer with what the pointer
            # field reads back.
            self.assertEqual(b[i], x.str[i])

        self.assertRaises(TypeError, setattr, x, "str", "Hello, World")
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def test_1(self):
        # Check that wrapping a Python function in MyCallback takes
        # references to it, and that all of them are released again once the
        # wrapper has been dropped and garbage collected.
        from sys import getrefcount as grc

        f = dll._testfunc_callback_i_if
        f.restype = ctypes.c_int
        f.argtypes = [ctypes.c_int, MyCallback]

        def callback(value):
            #print "called back with", value
            return value

        # Baseline of 2: the local name plus getrefcount()'s own argument.
        self.assertEqual(grc(callback), 2)
        cb = MyCallback(callback)

        # The wrapper must keep the Python function alive.
        self.assertGreater(grc(callback), 2)
        result = f(-10, cb)
        self.assertEqual(result, -18)
        cb = None

        # Collect before re-measuring so that reference cycles involving the
        # dropped wrapper cannot keep `callback` referenced.
        gc.collect()

        self.assertEqual(grc(callback), 2)
项目:ngas    作者:ICRAR    | 项目源码 | 文件源码
def _getRefCounts():
    """
    Return information about all object allocated and the number of
    references pointing to each object. This can be used to check a running
    server for memory (object) leaks.

    Only objects whose type is types.ClassType and that are reachable as an
    attribute of a loaded module are considered.

    Taken from: http://www.nightmare.com/medusa/memory-leaks.html.

    Returns:   List of (reference count, class) tuples, sorted with the
               highest reference count first (list).
    """
    # NOTE(review): the result is unused here; presumably TRACE() logs the
    # function entry as a side effect - confirm against its definition.
    T = TRACE()

    d = {}
    # Collect all classes
    # Iterate over a snapshot of the module table so that imports triggered
    # by attribute access cannot change it while it is being walked.
    for m in list(sys.modules.values()):
        for sym in dir(m):
            o = getattr (m, sym)
            if type(o) is types.ClassType:
                d[o] = sys.getrefcount (o)
    # sort by refcount
    return sorted([(count, cls) for cls, count in d.items()], reverse=True)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_bug1229429(self):
        # this bug was never in reversed, it was in
        # PyObject_CallMethod, and reversed_new calls that sometimes.
        #
        # A non-callable __reversed__ attribute must make reversed() raise
        # TypeError without changing that attribute's reference count.
        if not hasattr(sys, "getrefcount"):
            # Nothing to measure on interpreters without getrefcount();
            # the test then ends here without reporting a skip.
            return
        def f():
            pass
        r = f.__reversed__ = object()
        rc = sys.getrefcount(r)
        # Repeat the failing call so that a per-call leak would show up as
        # a growing count.
        for i in range(10):
            try:
                reversed(f)
            except TypeError:
                pass
            else:
                self.fail("non-callable __reversed__ didn't raise!")
        self.assertEqual(rc, sys.getrefcount(r))
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def test_ass_subscript (self):
        # Check subscript assignment on PixelArray for several surface bit
        # depths: Ellipsis-based whole-array assignment first, then that
        # array[x, y] = p does not leak references to its index objects.
        for bpp in (8, 16, 24, 32):
            sf = pygame.Surface ((6, 8), 0, bpp)
            sf.fill ((255, 255, 255))
            ar = pygame.PixelArray (sf)

            # Test Ellipsis (...) indexing
            ar[...,...] = (0, 0, 0)
            self.assertEqual (ar[0,0], 0)
            self.assertEqual (ar[1,0], 0)
            self.assertEqual (ar[-1,-1], 0)
            ar[...,] = (0, 0, 255)
            self.assertEqual (ar[0,0], sf.map_rgb ((0, 0, 255)))
            self.assertEqual (ar[1,0], sf.map_rgb ((0, 0, 255)))
            self.assertEqual (ar[-1,-1], sf.map_rgb ((0, 0, 255)))
            ar[:,...] = (255, 0, 0)
            self.assertEqual (ar[0,0], sf.map_rgb ((255, 0, 0)))
            self.assertEqual (ar[1,0], sf.map_rgb ((255, 0, 0)))
            self.assertEqual (ar[-1,-1], sf.map_rgb ((255, 0, 0)))
            ar[...] = (0, 255, 0)
            self.assertEqual (ar[0,0], sf.map_rgb ((0, 255, 0)))
            self.assertEqual (ar[1,0], sf.map_rgb ((0, 255, 0)))
            self.assertEqual (ar[-1,-1], sf.map_rgb ((0, 255, 0)))

            # Ensure x and y are freed for array[x, y] = p
            # Bug fix: reference counting
            # (this check sits inside the bpp loop, so it is repeated once
            # per bit depth although it always uses its own 32-bit surface)
            if hasattr(sys, 'getrefcount'):
                class Int(int):
                    """Unique int instances"""
                    pass

                sf = pygame.Surface ((2, 2), 0, 32)
                ar = pygame.PixelArray (sf)
                x, y = Int(0), Int(1)
                rx_before, ry_before = sys.getrefcount (x), sys.getrefcount (y)
                ar[x, y] = 0
                rx_after, ry_after = sys.getrefcount (x), sys.getrefcount (y)
                self.assertEqual (rx_after, rx_before)
                self.assertEqual (ry_after, ry_before)
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def test_GetView_array_struct(self):
        # Check that BufferProxy can wrap an exporter that only offers
        # __array_struct__, for several dimensionalities, type characters and
        # item sizes, and that fetching __array_struct__ does not leave extra
        # references to the returned object behind.
        from pygame.bufferproxy import BufferProxy

        class Exporter(self.ExporterBase):
            def __init__(self, shape, typechar, itemsize):
                super(Exporter, self).__init__(shape, typechar, itemsize)
                # The struct handed out below comes from a BufferProxy
                # built over this instance's __dict__.
                self.view = BufferProxy(self.__dict__)

            def get__array_struct__(self):
                return self.view.__array_struct__
            __array_struct__ = property(get__array_struct__)
            # Should not cause PgObject_GetBuffer to fail
            __array_interface__ = property(lambda self: None)

        _shape = [2, 3, 5, 7, 11]  # Some prime numbers
        # Vary the number of dimensions (1 through 4).
        for ndim in range(1, len(_shape)):
            o = Exporter(_shape[0:ndim], 'i', 2)
            v = BufferProxy(o)
            self.assertSame(v, o)
        ndim = 2
        shape = _shape[0:ndim]
        # Vary integer type character and item size for a 2-D shape.
        for typechar in ('i', 'u'):
            for itemsize in (1, 2, 4, 8):
                o = Exporter(shape, typechar, itemsize)
                v = BufferProxy(o)
                self.assertSame(v, o)
        # Floating point item sizes.
        for itemsize in (4, 8):
            o = Exporter(shape, 'f', itemsize)
            v = BufferProxy(o)
            self.assertSame(v, o)

        # Check returned cobject/capsule reference count
        try:
            from sys import getrefcount
        except ImportError:
            # PyPy: no reference counting
            pass
        else:
            # NOTE: `typechar` and `itemsize` still hold the last values
            # left by the loops above ('u' and 8).
            o = Exporter(shape, typechar, itemsize)
            self.assertEqual(getrefcount(o.__array_struct__), 1)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def skip_if_no_getrefcount(f):
    """Decorate test method `f` so it only runs when sys.getrefcount exists.

    On interpreters whose `sys` module has no `getrefcount` attribute the
    wrapper calls ``self.skipTest`` instead of running `f`.
    """
    @wraps(f)
    def skip_if_no_getrefcount_(self):
        if hasattr(sys, 'getrefcount'):
            return f(self)
        return self.skipTest('skipped, no sys.getrefcount()')
    return skip_if_no_getrefcount_
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_mogrify_leak_on_multiple_reference(self):
        # issue #81: reference leak when a parameter value is referenced
        # more than once from a dict.
        cur = self.conn.cursor()
        # NOTE(review): the string is built at run time, presumably so that
        # it is a fresh object rather than a shared constant - confirm.
        foo = (lambda x: x)('foo') * 10
        import sys
        nref1 = sys.getrefcount(foo)
        # The same dict key is used three times in the query.
        cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': foo})
        nref2 = sys.getrefcount(foo)
        self.assertEqual(nref1, nref2)
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def skip_if_no_getrefcount(f):
    """Decorate test method `f` so it only runs when sys.getrefcount exists.

    On interpreters whose `sys` module has no `getrefcount` attribute the
    wrapper calls ``self.skipTest`` instead of running `f`.
    """
    @wraps(f)
    def skip_if_no_getrefcount_(self):
        if hasattr(sys, 'getrefcount'):
            return f(self)
        return self.skipTest('skipped, no sys.getrefcount()')
    return skip_if_no_getrefcount_
项目:psycopg2-for-aws-lambda    作者:iwitaly    | 项目源码 | 文件源码
def test_mogrify_leak_on_multiple_reference(self):
        # issue #81: reference leak when a parameter value is referenced
        # more than once from a dict.
        cur = self.conn.cursor()
        # NOTE(review): the string is built at run time, presumably so that
        # it is a fresh object rather than a shared constant - confirm.
        foo = (lambda x: x)('foo') * 10
        import sys
        nref1 = sys.getrefcount(foo)
        # The same dict key is used three times in the query.
        cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': foo})
        nref2 = sys.getrefcount(foo)
        self.assertEqual(nref1, nref2)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def await_gc(obj, rc):
    """wait for refcount on an object to drop to an expected value

    Necessary because of the zero-copy gc thread,
    which can take some time to receive its DECREF message.

    Polls up to 50 times, sleeping 0.05 s after each unsuccessful poll, and
    returns as soon as the count is at or below the target.  If the target
    is never reached the function still returns normally, so callers have
    to re-check the count themselves.

    Args:
        obj: object whose reference count is polled.
        rc: reference count the caller expects to measure afterwards.
    """
    for i in range(50):
        # rc + 2 because of the refs in this function
        if grc(obj) <= rc + 2:
            return
        time.sleep(0.05)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_above_30(self):
        """Message above 30 bytes are never copied by 0MQ."""
        # NOTE(review): `x` is defined outside this block; presumably a
        # one-byte string so that `s` has the sizes listed below - confirm.
        for i in range(5, 16):  # 32, 64,..., 65536
            s = (2**i)*x
            # Baseline: the local name plus getrefcount()'s argument.
            self.assertEqual(grc(s), 2)
            m = zmq.Frame(s)
            # Wrapping the buffer in a Frame is expected to add two refs.
            self.assertEqual(grc(s), 4)
            del m
            # The references may be released asynchronously; wait for them.
            await_gc(s, 2)
            self.assertEqual(grc(s), 2)
            del s
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_lifecycle1(self):
        """Run through a ref counting cycle with a copy."""
        # `rc` tracks the reference count expected for `s` after every step.
        # NOTE(review): `x`, `view_rc` and `b` are defined outside this
        # block; `view_rc` is used as the number of references one buffer
        # view adds - confirm against its definition.
        for i in range(5, 16):  # 32, 64,..., 65536
            s = (2**i)*x
            rc = 2
            self.assertEqual(grc(s), rc)
            m = zmq.Frame(s)
            # A new Frame is expected to add two references.
            rc += 2
            self.assertEqual(grc(s), rc)
            m2 = copy.copy(m)
            # A copy of the Frame is expected to add one more.
            rc += 1
            self.assertEqual(grc(s), rc)
            buf = m2.buffer

            rc += view_rc
            self.assertEqual(grc(s), rc)

            self.assertEqual(s, b(str(m)))
            self.assertEqual(s, bytes(m2))
            self.assertEqual(s, m.bytes)
            # self.assert_(s is str(m))
            # self.assert_(s is str(m2))
            del m2
            rc -= 1
            self.assertEqual(grc(s), rc)
            rc -= view_rc
            del buf
            self.assertEqual(grc(s), rc)
            del m
            rc -= 2
            # The last release may happen asynchronously; wait for it.
            await_gc(s, rc)
            self.assertEqual(grc(s), rc)
            # Back at the baseline the loop iteration started with.
            self.assertEqual(rc, 2)
            del s
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_lifecycle2(self):
        """Run through a different ref counting cycle with a copy."""
        # Same bookkeeping as test_lifecycle1, but the buffer view is taken
        # from the original Frame and the original is deleted before the
        # copy.
        # NOTE(review): `x`, `view_rc` and `b` are defined outside this
        # block; `view_rc` is used as the number of references one buffer
        # view adds - confirm against its definition.
        for i in range(5, 16):  # 32, 64,..., 65536
            s = (2**i)*x
            rc = 2
            self.assertEqual(grc(s), rc)
            m = zmq.Frame(s)
            # A new Frame is expected to add two references.
            rc += 2
            self.assertEqual(grc(s), rc)
            m2 = copy.copy(m)
            # A copy of the Frame is expected to add one more.
            rc += 1
            self.assertEqual(grc(s), rc)
            buf = m.buffer
            rc += view_rc
            self.assertEqual(grc(s), rc)
            self.assertEqual(s, b(str(m)))
            self.assertEqual(s, bytes(m2))
            self.assertEqual(s, m2.bytes)
            self.assertEqual(s, m.bytes)
            # self.assert_(s is str(m))
            # self.assert_(s is str(m2))
            del buf
            # Deleting the local view alone must not change the count.
            self.assertEqual(grc(s), rc)
            del m
            # m.buffer is kept until m is del'd
            rc -= view_rc
            rc -= 1
            self.assertEqual(grc(s), rc)
            del m2
            rc -= 2
            # The last release may happen asynchronously; wait for it.
            await_gc(s, rc)
            self.assertEqual(grc(s), rc)
            # Back at the baseline the loop iteration started with.
            self.assertEqual(rc, 2)
            del s
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def view_get_refcount(self, perspective):
        # Return the current reference count of this object as reported by
        # sys.getrefcount(); `perspective` is accepted but not used.
        return sys.getrefcount(self)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test03_leak_assignment(self) :
        # set_private() must hold exactly one reference to the stored
        # object and release it again when the slot is overwritten.
        a = "example of private object"
        refcount = sys.getrefcount(a)
        self.obj.set_private(a)
        self.assertEqual(refcount+1, sys.getrefcount(a))
        # Replacing the private object with None must drop that reference.
        self.obj.set_private(None)
        self.assertEqual(refcount, sys.getrefcount(a))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test04_leak_GC(self) :
        # Dropping the owning object must release the reference it holds
        # to its private object.
        a = "example of private object"
        refcount = sys.getrefcount(a)
        self.obj.set_private(a)
        # Discard the owner instead of clearing the private slot.
        self.obj = None
        self.assertEqual(refcount, sys.getrefcount(a))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_PyString_FromString(self):
        # Call PyString_FromString through ctypes.pythonapi and check that
        # the input string's reference count is unaffected by the call and
        # by discarding the returned object.
        pythonapi.PyString_FromString.restype = py_object
        pythonapi.PyString_FromString.argtypes = (c_char_p,)

        s = "abc"
        refcnt = grc(s)
        pyob = pythonapi.PyString_FromString(s)
        self.assertEqual(grc(s), refcnt)
        # The result must compare equal to the input.
        self.assertEqual(s, pyob)
        del pyob
        self.assertEqual(grc(s), refcnt)

    # This test is unreliable, because it is possible that code in
    # unittest changes the refcount of the '42' integer.  So, it
    # is disabled by default.
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_PyObj_FromPtr(self):
        # PyObj_FromPtr() must hand back the very same object for its
        # address and account for it with exactly one extra reference.
        s = "abc def ghi jkl"
        ref = grc(s)
        # id(python-object) is the address
        pyobj = PyObj_FromPtr(id(s))
        self.assertIs(s, pyobj)

        # The new name `pyobj` accounts for one additional reference.
        self.assertEqual(grc(s), ref + 1)
        del pyobj
        self.assertEqual(grc(s), ref)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_pyobject(self):
        # For several kinds of objects, check that a repeated
        # check_type(py_object, o) call does not change the reference count
        # any further after the first call.
        o = ()
        from sys import getrefcount as grc
        for o in (), [], object():
            # NOTE: `initial` is recorded but never compared.
            initial = grc(o)
            # This call leaks a reference to 'o'...
            self.check_type(py_object, o)
            before = grc(o)
            # ...but this call doesn't leak any more.  Where is the refcount?
            self.check_type(py_object, o)
            after = grc(o)
            # `o` is part of both tuples so a failure message shows which
            # object was being checked.
            self.assertEqual((after, o), (before, o))
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def test_X(self):
        # Exploratory check (Python 2 print statements, no assertions):
        # store a pointer to a c_char_p in a structure field, print the
        # c_char_p's reference count before and after, and keep reading
        # through the pointer across allocations and a gc run.
        class X(Structure):
            _fields_ = [("p", POINTER(c_char_p))]
        x = X()
        i = c_char_p("abc def")
        from sys import getrefcount as grc
        print "2?", grc(i)
        x.p = pointer(i)
        print "3?", grc(i)
        # NOTE: the loop variable rebinds `i`, so the c_char_p created
        # above is no longer referenced by that name from here on.
        for i in range(320):
            c_int(99)
            x.p[0]
        print x.p[0]
##        del x
##        print "2?", grc(i)
##        del i
        import gc
        gc.collect()
        for i in range(320):
            c_int(99)
            x.p[0]
        print x.p[0]
        print x.p.contents
##        print x._objects

        x.p[0] = "spam spam"
##        print x.p[0]
        print "+" * 42
        print x._objects