Python copy_reg 模块,pickle() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用copy_reg.pickle()

项目:wikilinks    作者:trovdimi    | 项目源码 | 文件源码
def manageWork(self):
        """Parse every zipped article under ``self.path`` whose id is in the redirect set.

        Loads the pickled collection of ids from
        ``SSD_HOME + "pickle/redirects_ids.obj"``, walks
        ``STATIC_HTML_DUMP_ARTICLES_DIR + self.path`` and calls
        ``self.parse_article(file_name, root)`` for each ``*.zip`` file whose
        second ``_``-separated name component is one of those ids.  A failure
        while parsing one article is reported on stdout and does not stop
        the walk.
        """
        # The context manager closes the handle even if unpickling fails, and
        # the name no longer shadows the ``file`` builtin.
        # NOTE(review): mode 'r' is kept from the original; a pickle written
        # with a binary protocol would need 'rb' on platforms that translate
        # line endings -- confirm how redirects_ids.obj is produced.
        with open(SSD_HOME+"pickle/redirects_ids.obj",'r') as ids_file:
            wanted_ids = pickle.load(ids_file)

        for root, dirs, files in os.walk(STATIC_HTML_DUMP_ARTICLES_DIR+self.path):
            for file_name in files:
                if not file_name.endswith(".zip"):
                    continue
                parts = file_name.split('_')
                if long(parts[1]) not in wanted_ids:
                    continue
                try:
                    self.parse_article(file_name,root)
                except Exception as e:
                    # Best effort: log the failing file and carry on.
                    print("FILENAME_FAIL:"+file_name)
                    print(type(e))    # the exception class
                    print(e)
                    print (e.message)
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def _try_store_in_file(self, request, response):
        """Write the pickled session to its file, if there is anything to save.

        Returns False (after refreshing the session-id cookie) when there is
        no session id, the session is being forgotten, or it is unchanged.
        Otherwise the session file is opened and exclusively locked if needed,
        the pickle is written, and True is returned.  ``self._close(response)``
        always runs, whichever path is taken.
        """
        try:
            nothing_to_save = (not response.session_id
                               or self._forget
                               or self._unchanged(response))
            if nothing_to_save:
                # self.clear_session_cookies()
                self.save_session_id_cookie()
                return False

            must_open = response.session_new or not response.session_file
            if must_open:
                # Make sure the session sub-folder exists before opening.
                folder = os.path.dirname(response.session_filename)
                if not os.path.exists(folder):
                    os.mkdir(folder)
                response.session_file = recfile.open(response.session_filename, 'wb')
                portalocker.lock(response.session_file, portalocker.LOCK_EX)
                response.session_locked = True

            if response.session_file:
                # Reuse the pickle cached on the response when there is one.
                payload = response.session_pickled
                if not payload:
                    payload = pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
                response.session_file.write(payload)
                response.session_file.truncate()
        finally:
            self._close(response)

        self.save_session_id_cookie()
        return True
项目:Problematica-public    作者:TechMaz    | 项目源码 | 文件源码
def _try_store_in_file(self, request, response):
        """Write the pickled session to its file, if there is anything to save.

        Returns False (after refreshing the session-id cookie) when there is
        no session id, the session is being forgotten, or it is unchanged.
        Otherwise the session file is opened and exclusively locked if needed,
        the pickle is written, and True is returned.  ``self._close(response)``
        always runs, whichever path is taken.
        """
        try:
            nothing_to_save = (not response.session_id
                               or self._forget
                               or self._unchanged(response))
            if nothing_to_save:
                # self.clear_session_cookies()
                self.save_session_id_cookie()
                return False

            must_open = response.session_new or not response.session_file
            if must_open:
                # Make sure the session sub-folder exists before opening.
                folder = os.path.dirname(response.session_filename)
                if not os.path.exists(folder):
                    os.mkdir(folder)
                response.session_file = recfile.open(response.session_filename, 'wb')
                portalocker.lock(response.session_file, portalocker.LOCK_EX)
                response.session_locked = True

            if response.session_file:
                # Reuse the pickle cached on the response when there is one.
                payload = response.session_pickled
                if not payload:
                    payload = pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
                response.session_file.write(payload)
                response.session_file.truncate()
        finally:
            self._close(response)

        self.save_session_id_cookie()
        return True
项目:6.033    作者:kchobo13    | 项目源码 | 文件源码
def doMap(self, i):
        """Run the map step for split ``i`` and write one pickle per reduce task.

        Reads ``#split-<path>-<i>`` (first line is the key, the rest is the
        value), deletes it, applies ``self.Map`` and, for each reduce task
        ``r``, pickles the items whose ``self.Partition(item) == r`` into
        ``#map-<path>-<i>-<r>``.

        Returns the list of ``(i, r)`` pairs, one per reduce task.
        """
        split_name = "#split-%s-%s" % (self.path, i)
        # ``with`` closes each handle even when reading, Map, Partition or
        # pickling raises; the original leaked the handle in those cases.
        with open(split_name, "r") as f:
            keyvalue = f.readline()
            value = f.read()
        os.unlink(split_name)
        keyvaluelist = self.Map(keyvalue, value)
        for r in range(0, self.reducetask):
            itemlist = [item for item in keyvaluelist if self.Partition(item) == r]
            with open("#map-%s-%s-%d" % (self.path, i, r), "w+") as f:
                pickle.dump(itemlist, f)
        return [(i, r) for r in range(0, self.reducetask)]

    # Get reduce regions from maptasks, sort by key, and apply Reduce for each key
项目:6.033    作者:kchobo13    | 项目源码 | 文件源码
def doReduce(self, i):
        """Run reduce task ``i`` over the output of every map task.

        For each map task ``m`` the pickled item list in
        ``#map-<path>-<m>-<i>`` is loaded and the file removed.  Items are
        grouped by their first element, ``self.Reduce(key, items)`` is called
        for each key in sorted order, and the list of results is pickled to
        ``#reduce-<path>-<i>``.

        Returns ``i``.
        """
        keys = {}
        for m in range(0, self.maptask):
            map_name = "#map-%s-%s-%d" % (self.path, m, i)
            # ``with`` closes the handle even if unpickling fails.
            with open(map_name, "r") as f:
                itemlist = pickle.load(f)
            for item in itemlist:
                # setdefault replaces the deprecated dict.has_key() branch.
                keys.setdefault(item[0], []).append(item)
            os.unlink(map_name)
        out = [self.Reduce(k, keys[k]) for k in sorted(keys)]
        with open("#reduce-%s-%d" % (self.path, i), "w+") as f:
            pickle.dump(out, f)
        return i

    # The master.
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test.

    The trace function that was active before the call is reinstalled
    afterwards, even if ``func`` raises.  When the interpreter has no
    ``sys.gettrace``, ``func`` is returned undecorated.
    """
    if not hasattr(sys, 'gettrace'):
        return func

    # Local import keeps this block self-contained.
    import functools

    # functools.wraps carries over __name__ and also __doc__/__module__,
    # which the previous hand-written ``wrapper.__name__ = ...`` dropped.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        original_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            sys.settrace(original_trace)
    return wrapper


# Return True if opcode code appears in the pickle, else False.
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_long(self):
        for proto in protocols:
            # 256 bytes is where LONG4 begins.
            for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257:
                nbase = 1L << nbits
                for npos in nbase-1, nbase, nbase+1:
                    for n in npos, -npos:
                        pickle = self.dumps(n, proto)
                        got = self.loads(pickle)
                        self.assertEqual(n, got)
        # Try a monster.  This is quadratic-time in protos 0 & 1, so don't
        # bother with those.
        nbase = long("deadbeeffeedface", 16)
        nbase += nbase << 1000000
        for n in nbase, -nbase:
            p = self.dumps(n, 2)
            got = self.loads(p)
            self.assertEqual(n, got)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_singletons(self):
        # Build the (proto, singleton) -> expected opcode table: None always
        # uses NONE; booleans use INT before protocol 2 and the dedicated
        # NEWTRUE/NEWFALSE opcodes from protocol 2 on.
        opcode_for = {}
        for proto in (0, 1, 2):
            old_style = proto < 2
            opcode_for[proto, None] = pickle.NONE
            opcode_for[proto, True] = pickle.INT if old_style else pickle.NEWTRUE
            opcode_for[proto, False] = pickle.INT if old_style else pickle.NEWFALSE

        for proto in protocols:
            for singleton in (None, False, True):
                pickled = self.dumps(singleton, proto)
                restored = self.loads(pickled)
                # Identity, not just equality, must survive the round trip.
                self.assertTrue(singleton is restored,
                                (proto, singleton, pickled, restored))
                wanted = opcode_for[proto, singleton]
                self.assertEqual(opcode_in_pickle(wanted, pickled), True)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_newobj_proxies(self):
        # NEWOBJ should use the __class__ rather than the raw type
        import weakref
        candidates = myclasses[:]
        # Weak proxies cannot be created for instances of these classes.
        for unproxyable in (MyInt, MyLong, MyStr, MyTuple):
            candidates.remove(unproxyable)
        for proto in protocols:
            for cls in candidates:
                base = cls.__base__
                original = cls(cls.sample)
                original.foo = 42
                proxy = weakref.proxy(original)
                pickled = self.dumps(proxy, proto)
                restored = self.loads(pickled)
                # The proxied object's type is expected, not the proxy's.
                self.assertEqual(type(restored), type(original))
                detail = (proto, cls, base, original, restored, type(restored))
                self.assertEqual(base(original), base(restored), detail)
                self.assertEqual(original.__dict__, restored.__dict__, detail)

    # Register a type with copy_reg, with extension code extcode.  Pickle
    # an object of that type.  Check that the resulting pickle uses opcode
    # (EXT[124]) under proto 2, and not in proto 1.
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_list_chunking(self):
        def appends_after_roundtrip(value, proto):
            # Pickle, check the round trip, and count APPENDS opcodes.
            pickled = self.dumps(value, proto)
            restored = self.loads(pickled)
            self.assertEqual(value, restored)
            return count_opcode(pickle.APPENDS, pickled)

        small = range(10)  # too small to chunk
        for proto in protocols:
            self.assertEqual(appends_after_roundtrip(small, proto), proto > 0)

        large = range(2500)  # expect at least two chunks when proto > 0
        for proto in protocols:
            appends = appends_after_roundtrip(large, proto)
            if proto == 0:
                self.assertEqual(appends, 0)
            else:
                self.assertTrue(appends >= 2)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_dict_chunking(self):
        def setitems_after_roundtrip(mapping, proto):
            # Pickle, check the round trip, and count SETITEMS opcodes.
            pickled = self.dumps(mapping, proto)
            restored = self.loads(pickled)
            self.assertEqual(mapping, restored)
            return count_opcode(pickle.SETITEMS, pickled)

        small = dict.fromkeys(range(10))  # too small to chunk
        for proto in protocols:
            self.assertEqual(setitems_after_roundtrip(small, proto), proto > 0)

        # expect at least two chunks when proto > 0
        large = dict.fromkeys(range(2500))
        for proto in protocols:
            setitems = setitems_after_roundtrip(large, proto)
            if proto == 0:
                self.assertEqual(setitems, 0)
            else:
                self.assertTrue(setitems >= 2)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_reduce_bad_iterator(self):
        # Issue4176: the pickler used to crash when the 4th or 5th item
        # returned by __reduce__() was not an iterator.
        class C(object):
            def __reduce__(self):
                # a list, not an iterator, in the 4th slot
                return list, (), None, [], None
        class D(object):
            def __reduce__(self):
                # a list, not an iterator, in the 5th slot
                return dict, (), None, None, []

        # Protocol 0 in the Python implementation is less strict and also
        # accepts iterables, so both success and a pickling error are fine.
        for proto in protocols:
            for bad_reducer in (C, D):
                try:
                    self.dumps(bad_reducer(), proto)
                except (AttributeError, pickle.PicklingError, cPickle.PicklingError):
                    pass
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test.

    The trace function that was active before the call is reinstalled
    afterwards, even if ``func`` raises.  When the interpreter has no
    ``sys.gettrace``, ``func`` is returned undecorated.
    """
    if not hasattr(sys, 'gettrace'):
        return func

    # Local import keeps this block self-contained.
    import functools

    # functools.wraps carries over __name__ and also __doc__/__module__,
    # which the previous hand-written ``wrapper.__name__ = ...`` dropped.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        original_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            sys.settrace(original_trace)
    return wrapper


# Return True if opcode code appears in the pickle, else False.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_long(self):
        for proto in protocols:
            # 256 bytes is where LONG4 begins.
            for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257:
                nbase = 1L << nbits
                for npos in nbase-1, nbase, nbase+1:
                    for n in npos, -npos:
                        pickle = self.dumps(n, proto)
                        got = self.loads(pickle)
                        self.assertEqual(n, got)
        # Try a monster.  This is quadratic-time in protos 0 & 1, so don't
        # bother with those.
        nbase = long("deadbeeffeedface", 16)
        nbase += nbase << 1000000
        for n in nbase, -nbase:
            p = self.dumps(n, 2)
            got = self.loads(p)
            self.assertEqual(n, got)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_singletons(self):
        # Build the (proto, singleton) -> expected opcode table: None always
        # uses NONE; booleans use INT before protocol 2 and the dedicated
        # NEWTRUE/NEWFALSE opcodes from protocol 2 on.
        opcode_for = {}
        for proto in (0, 1, 2):
            old_style = proto < 2
            opcode_for[proto, None] = pickle.NONE
            opcode_for[proto, True] = pickle.INT if old_style else pickle.NEWTRUE
            opcode_for[proto, False] = pickle.INT if old_style else pickle.NEWFALSE

        for proto in protocols:
            for singleton in (None, False, True):
                pickled = self.dumps(singleton, proto)
                restored = self.loads(pickled)
                # Identity, not just equality, must survive the round trip.
                self.assertTrue(singleton is restored,
                                (proto, singleton, pickled, restored))
                wanted = opcode_for[proto, singleton]
                self.assertEqual(opcode_in_pickle(wanted, pickled), True)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_newobj_proxies(self):
        # NEWOBJ should use the __class__ rather than the raw type
        import weakref
        candidates = myclasses[:]
        # Weak proxies cannot be created for instances of these classes.
        for unproxyable in (MyInt, MyLong, MyStr, MyTuple):
            candidates.remove(unproxyable)
        for proto in protocols:
            for cls in candidates:
                base = cls.__base__
                original = cls(cls.sample)
                original.foo = 42
                proxy = weakref.proxy(original)
                pickled = self.dumps(proxy, proto)
                restored = self.loads(pickled)
                # The proxied object's type is expected, not the proxy's.
                self.assertEqual(type(restored), type(original))
                detail = (proto, cls, base, original, restored, type(restored))
                self.assertEqual(base(original), base(restored), detail)
                self.assertEqual(original.__dict__, restored.__dict__, detail)

    # Register a type with copy_reg, with extension code extcode.  Pickle
    # an object of that type.  Check that the resulting pickle uses opcode
    # (EXT[124]) under proto 2, and not in proto 1.
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_list_chunking(self):
        def appends_after_roundtrip(value, proto):
            # Pickle, check the round trip, and count APPENDS opcodes.
            pickled = self.dumps(value, proto)
            restored = self.loads(pickled)
            self.assertEqual(value, restored)
            return count_opcode(pickle.APPENDS, pickled)

        small = range(10)  # too small to chunk
        for proto in protocols:
            self.assertEqual(appends_after_roundtrip(small, proto), proto > 0)

        large = range(2500)  # expect at least two chunks when proto > 0
        for proto in protocols:
            appends = appends_after_roundtrip(large, proto)
            if proto == 0:
                self.assertEqual(appends, 0)
            else:
                self.assertTrue(appends >= 2)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_dict_chunking(self):
        def setitems_after_roundtrip(mapping, proto):
            # Pickle, check the round trip, and count SETITEMS opcodes.
            pickled = self.dumps(mapping, proto)
            restored = self.loads(pickled)
            self.assertEqual(mapping, restored)
            return count_opcode(pickle.SETITEMS, pickled)

        small = dict.fromkeys(range(10))  # too small to chunk
        for proto in protocols:
            self.assertEqual(setitems_after_roundtrip(small, proto), proto > 0)

        # expect at least two chunks when proto > 0
        large = dict.fromkeys(range(2500))
        for proto in protocols:
            setitems = setitems_after_roundtrip(large, proto)
            if proto == 0:
                self.assertEqual(setitems, 0)
            else:
                self.assertTrue(setitems >= 2)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_reduce_bad_iterator(self):
        # Issue4176: the pickler used to crash when the 4th or 5th item
        # returned by __reduce__() was not an iterator.
        class C(object):
            def __reduce__(self):
                # a list, not an iterator, in the 4th slot
                return list, (), None, [], None
        class D(object):
            def __reduce__(self):
                # a list, not an iterator, in the 5th slot
                return dict, (), None, None, []

        # Protocol 0 in the Python implementation is less strict and also
        # accepts iterables, so both success and a pickling error are fine.
        for proto in protocols:
            for bad_reducer in (C, D):
                try:
                    self.dumps(bad_reducer(), proto)
                except (AttributeError, pickle.PicklingError, cPickle.PicklingError):
                    pass
项目:mmfeat    作者:douwekiela    | 项目源码 | 文件源码
def __init__(self, datadir, loadFunc):
        """Load ``<datadir>/index.pkl`` and collect the file names it mentions.

        Sets ``self.datadir`` (one trailing ``/`` removed), ``self.idx`` (the
        unpickled index), ``self.fnames`` (list of unique base file names
        found in the index values) and ``self.loadFunc``.

        Raises ValueError when the index pickle does not exist.
        """
        # endswith() also copes with an empty string, where ``datadir[-1]``
        # used to raise IndexError before the intended ValueError below.
        self.datadir = datadir[:-1] if datadir.endswith('/') else datadir

        idxf = datadir + '/index.pkl'
        if not os.path.exists(idxf):
            raise ValueError('Index pickle not found')

        # NOTE(review): unpickling executes arbitrary code; only point this
        # at index files from a trusted source.
        with open(idxf, 'rb') as index_file:  # closed even if loading fails
            idx = pickle.load(index_file)

        try:
            integer_types = (long, int)
        except NameError:  # interpreters without a separate ``long`` type
            integer_types = (int,)

        fnames = set()
        for k in idx:
            for fname in idx[k]:
                ### Backward compat with earlier versions of this code:
                # numeric entries stand for '<n>.ogg'; None entries are skipped.
                if isinstance(fname, integer_types):
                    fname = '%d.ogg' % (fname)
                if fname is None:
                    continue
                fname = fname.split('/')[-1]
                ###

                fnames.add(fname)

        self.idx = idx
        self.fnames = list(fnames)
        self.loadFunc = loadFunc
项目:web3py    作者:web2py    | 项目源码 | 文件源码
def _try_store_in_file(self, request, response):
        """Write the pickled session to its file, if there is anything to save.

        Returns None when sessions are not enabled, False when there is no
        session id or the session is unchanged, and True after the session
        has been written and the session-id cookie saved.
        ``self._close(response)`` runs on every path except the disabled one.
        """
        if not self._enabled:
            return
        try:
            nothing_to_save = (not response.session_id
                               or self._unchanged(response))
            if nothing_to_save:
                # self.save_session_id_cookie() # CHECK THIS
                return False

            must_open = response.session_new or not response.session_file
            if must_open:
                # Make sure the session sub-folder exists before opening.
                folder = os.path.dirname(response.session_filename)
                if not os.path.exists(folder):
                    os.mkdir(folder)
                response.session_file = recfile.open(response.session_filename, 'wb')
                portalocker.lock(response.session_file, portalocker.LOCK_EX)
                response.session_locked = True

            if response.session_file:
                # Reuse the pickle cached on the response when there is one.
                payload = response.session_pickled
                if not payload:
                    payload = pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
                response.session_file.write(payload)
                response.session_file.truncate()
        finally:
            self._close(response)

        self.save_session_id_cookie()
        return True
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def count_opcode(code, pickle):
    """Return how many opcodes in ``pickle`` have their ``code`` equal to ``code``."""
    return sum(1 for op, _arg, _pos in pickletools.genops(pickle)
               if op.code == code)

# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state.  Code
# should do this:
#
#     e = ExtensionSaver(extension_code)
#     try:
#         fiddle w/ the extension registry's stuff for extension_code
#     finally:
#         e.restore()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_proto(self):
        # Pickling None must give exactly NONE + STOP, prefixed by a PROTO
        # header (opcode plus protocol byte) from protocol 2 on.
        build_none = pickle.NONE + pickle.STOP
        for proto in protocols:
            expected = build_none
            if proto >= 2:
                expected = pickle.PROTO + chr(proto) + expected
            p = self.dumps(None, proto)
            self.assertEqual(p, expected)

        # A pickle announcing a protocol newer than any known one must be
        # rejected with a ValueError carrying the expected message.
        oob = protocols[-1] + 1     # a future protocol
        badpickle = pickle.PROTO + chr(oob) + build_none
        try:
            self.loads(badpickle)
        # ``as`` replaces the Python-2-only ``except ValueError, detail``
        # spelling; it is valid from Python 2.6 onwards.
        except ValueError as detail:
            self.assertTrue(str(detail).startswith(
                                            "unsupported pickle protocol"))
        else:
            self.fail("expected bad protocol number to raise ValueError")
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_singletons(self):
        # Build the (proto, singleton) -> expected opcode table: None always
        # uses NONE; booleans use INT before protocol 2 and the dedicated
        # NEWTRUE/NEWFALSE opcodes from protocol 2 on.
        opcode_for = {}
        for proto in (0, 1, 2):
            old_style = proto < 2
            opcode_for[proto, None] = pickle.NONE
            opcode_for[proto, True] = pickle.INT if old_style else pickle.NEWTRUE
            opcode_for[proto, False] = pickle.INT if old_style else pickle.NEWFALSE

        for proto in protocols:
            for singleton in (None, False, True):
                pickled = self.dumps(singleton, proto)
                restored = self.loads(pickled)
                # Identity, not just equality, must survive the round trip.
                self.assertTrue(singleton is restored,
                                (proto, singleton, pickled, restored))
                wanted = opcode_for[proto, singleton]
                self.assertEqual(opcode_in_pickle(wanted, pickled), True)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_dict_chunking(self):
        def setitems_after_roundtrip(mapping, proto):
            # Pickle, check the round trip, and count SETITEMS opcodes.
            pickled = self.dumps(mapping, proto)
            restored = self.loads(pickled)
            self.assertEqual(mapping, restored)
            return count_opcode(pickle.SETITEMS, pickled)

        small = dict.fromkeys(range(10))  # too small to chunk
        for proto in protocols:
            self.assertEqual(setitems_after_roundtrip(small, proto), proto > 0)

        # expect at least two chunks when proto > 0
        large = dict.fromkeys(range(2500))
        for proto in protocols:
            setitems = setitems_after_roundtrip(large, proto)
            if proto == 0:
                self.assertEqual(setitems, 0)
            else:
                self.assertTrue(setitems >= 2)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_reduce_bad_iterator(self):
        # Issue4176: the pickler used to crash when the 4th or 5th item
        # returned by __reduce__() was not an iterator.
        class C(object):
            def __reduce__(self):
                # a list, not an iterator, in the 4th slot
                return list, (), None, [], None
        class D(object):
            def __reduce__(self):
                # a list, not an iterator, in the 5th slot
                return dict, (), None, None, []

        # Protocol 0 is less strict and also accepts iterables, so both
        # success and a pickle error are fine.
        for proto in protocols:
            for bad_reducer in (C, D):
                try:
                    self.dumps(bad_reducer(), proto)
                except (AttributeError, pickle.PickleError, cPickle.PickleError):
                    pass
项目:slugiot-client    作者:slugiot    | 项目源码 | 文件源码
def _try_store_in_file(self, request, response):
        """Write the pickled session to its file, if there is anything to save.

        Returns False (after refreshing the session-id cookie) when there is
        no session id, the session is being forgotten, or it is unchanged.
        Otherwise the session file is opened and exclusively locked if needed,
        the pickle is written, and True is returned.  ``self._close(response)``
        always runs, whichever path is taken.
        """
        try:
            nothing_to_save = (not response.session_id
                               or self._forget
                               or self._unchanged(response))
            if nothing_to_save:
                # self.clear_session_cookies()
                self.save_session_id_cookie()
                return False

            must_open = response.session_new or not response.session_file
            if must_open:
                # Make sure the session sub-folder exists before opening.
                folder = os.path.dirname(response.session_filename)
                if not os.path.exists(folder):
                    os.mkdir(folder)
                response.session_file = recfile.open(response.session_filename, 'wb')
                portalocker.lock(response.session_file, portalocker.LOCK_EX)
                response.session_locked = True

            if response.session_file:
                # Reuse the pickle cached on the response when there is one.
                payload = response.session_pickled
                if not payload:
                    payload = pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
                response.session_file.write(payload)
                response.session_file.truncate()
        finally:
            self._close(response)

        self.save_session_id_cookie()
        return True
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def count_opcode(code, pickle):
    """Return how many opcodes in ``pickle`` have their ``code`` equal to ``code``."""
    return sum(1 for op, _arg, _pos in pickletools.genops(pickle)
               if op.code == code)

# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state.  Code
# should do this:
#
#     e = ExtensionSaver(extension_code)
#     try:
#         fiddle w/ the extension registry's stuff for extension_code
#     finally:
#         e.restore()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_proto(self):
        # Pickling None must give exactly NONE + STOP, prefixed by a PROTO
        # header (opcode plus protocol byte) from protocol 2 on.
        build_none = pickle.NONE + pickle.STOP
        for proto in protocols:
            expected = build_none
            if proto >= 2:
                expected = pickle.PROTO + chr(proto) + expected
            p = self.dumps(None, proto)
            self.assertEqual(p, expected)

        # A pickle announcing a protocol newer than any known one must be
        # rejected with a ValueError carrying the expected message.
        oob = protocols[-1] + 1     # a future protocol
        badpickle = pickle.PROTO + chr(oob) + build_none
        try:
            self.loads(badpickle)
        # ``as`` replaces the Python-2-only ``except ValueError, detail``
        # spelling; it is valid from Python 2.6 onwards.
        except ValueError as detail:
            self.assertTrue(str(detail).startswith(
                                            "unsupported pickle protocol"))
        else:
            self.fail("expected bad protocol number to raise ValueError")
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_singletons(self):
        # Build the (proto, singleton) -> expected opcode table: None always
        # uses NONE; booleans use INT before protocol 2 and the dedicated
        # NEWTRUE/NEWFALSE opcodes from protocol 2 on.
        opcode_for = {}
        for proto in (0, 1, 2):
            old_style = proto < 2
            opcode_for[proto, None] = pickle.NONE
            opcode_for[proto, True] = pickle.INT if old_style else pickle.NEWTRUE
            opcode_for[proto, False] = pickle.INT if old_style else pickle.NEWFALSE

        for proto in protocols:
            for singleton in (None, False, True):
                pickled = self.dumps(singleton, proto)
                restored = self.loads(pickled)
                # Identity, not just equality, must survive the round trip.
                self.assertTrue(singleton is restored,
                                (proto, singleton, pickled, restored))
                wanted = opcode_for[proto, singleton]
                self.assertEqual(opcode_in_pickle(wanted, pickled), True)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_dict_chunking(self):
        def setitems_after_roundtrip(mapping, proto):
            # Pickle, check the round trip, and count SETITEMS opcodes.
            pickled = self.dumps(mapping, proto)
            restored = self.loads(pickled)
            self.assertEqual(mapping, restored)
            return count_opcode(pickle.SETITEMS, pickled)

        small = dict.fromkeys(range(10))  # too small to chunk
        for proto in protocols:
            self.assertEqual(setitems_after_roundtrip(small, proto), proto > 0)

        # expect at least two chunks when proto > 0
        large = dict.fromkeys(range(2500))
        for proto in protocols:
            setitems = setitems_after_roundtrip(large, proto)
            if proto == 0:
                self.assertEqual(setitems, 0)
            else:
                self.assertTrue(setitems >= 2)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_reduce_bad_iterator(self):
        # Issue4176: the pickler used to crash when the 4th or 5th item
        # returned by __reduce__() was not an iterator.
        class C(object):
            def __reduce__(self):
                # a list, not an iterator, in the 4th slot
                return list, (), None, [], None
        class D(object):
            def __reduce__(self):
                # a list, not an iterator, in the 5th slot
                return dict, (), None, None, []

        # Protocol 0 is less strict and also accepts iterables, so both
        # success and a pickle error are fine.
        for proto in protocols:
            for bad_reducer in (C, D):
                try:
                    self.dumps(bad_reducer(), proto)
                except (AttributeError, pickle.PickleError, cPickle.PickleError):
                    pass
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def InitMessage(descriptor, cls):
  """Populate message class ``cls`` from ``descriptor``.

  Resets the class-level decoder and extension registries, attaches helpers
  for every field in ``descriptor.fields``, runs the ``_Add*`` helpers that
  install enum values, the init method, properties and methods, and finally
  registers a pickle reducer for ``cls`` with ``copyreg``.
  """
  # Fresh per-class lookup tables.
  cls._decoders_by_tag = {}
  cls._extensions_by_name = {}
  cls._extensions_by_number = {}
  # Only when the descriptor's options enable message_set_wire_format is a
  # MessageSetItemDecoder registered under the message-set item tag.
  if (descriptor.has_options and
      descriptor.GetOptions().message_set_wire_format):
    cls._decoders_by_tag[decoder.MESSAGE_SET_ITEM_TAG] = (
        decoder.MessageSetItemDecoder(cls._extensions_by_number), None)


  for field in descriptor.fields:
    _AttachFieldHelpers(cls, field)

  # NOTE(review): the order of these helper calls may matter -- keep as is.
  _AddEnumValues(descriptor, cls)
  _AddInitMethod(descriptor, cls)
  _AddPropertiesForFields(descriptor, cls)
  _AddPropertiesForExtensions(descriptor, cls)
  _AddStaticMethods(cls)
  _AddMessageMethods(descriptor, cls)
  _AddPrivateHelperMethods(descriptor, cls)
  # Pickle instances as (cls, no constructor args, state from __getstate__()).
  copyreg.pickle(cls, lambda obj: (cls, (), obj.__getstate__()))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_nonIdentityHash(self):
        # Two Versioned instances that share a hash value must both be
        # upgraded after unpickling.
        # The class is declared global -- presumably so pickle can find it by
        # name at module level; confirm against the enclosing test module.
        global ClassWithCustomHash
        class ClassWithCustomHash(styles.Versioned):
            def __init__(self, unique, hash):
                self.unique = unique
                self.hash = hash
            def __hash__(self):
                return self.hash

        # Distinct objects, identical hash (0).
        v1 = ClassWithCustomHash('v1', 0)
        v2 = ClassWithCustomHash('v2', 0)

        # Pickle first, then bump the class version so that loading the
        # pickle produces instances that need an upgrade.
        pkl = pickle.dumps((v1, v2))
        del v1, v2
        ClassWithCustomHash.persistenceVersion = 1
        ClassWithCustomHash.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)
        v1, v2 = pickle.loads(pkl)
        styles.doUpgrade()
        # Both objects kept their identity-bearing data and were upgraded.
        self.assertEqual(v1.unique, 'v1')
        self.assertEqual(v2.unique, 'v2')
        self.assertTrue(v1.upgraded)
        self.assertTrue(v2.upgraded)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_upgradeDeserializesObjectsRequiringUpgrade(self):
        # An upgrade method that itself unpickles another object needing an
        # upgrade must leave that nested object upgraded as well.
        # The classes are declared global -- presumably so pickle can find
        # them by name at module level; confirm against the test module.
        global ToyClassA, ToyClassB
        class ToyClassA(styles.Versioned):
            pass
        class ToyClassB(styles.Versioned):
            pass
        x = ToyClassA()
        y = ToyClassB()
        # Pickle both before any persistenceVersion is set on the classes.
        pklA, pklB = pickle.dumps(x), pickle.dumps(y)
        del x, y
        ToyClassA.persistenceVersion = 1
        def upgradeToVersion1(self):
            # Upgrading A loads B and triggers a nested doUpgrade().
            self.y = pickle.loads(pklB)
            styles.doUpgrade()
        ToyClassA.upgradeToVersion1 = upgradeToVersion1
        ToyClassB.persistenceVersion = 1
        ToyClassB.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)

        x = pickle.loads(pklA)
        styles.doUpgrade()
        # The B instance attached during A's upgrade was upgraded too.
        self.assertTrue(x.y.upgraded)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def pickleMethod(method):
    """Reduce a bound method for copy_reg.

    Returns ``unpickleMethod`` together with the underlying function's name,
    the object the method is bound to, and the class it belongs to.
    """
    function_name = method.im_func.__name__
    bound_to = method.im_self
    owner_class = method.im_class
    return unpickleMethod, (function_name, bound_to, owner_class)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def pickleModule(module):
    """Reduce a module reference for copy_reg to ``(unpickleModule, (name,))``."""
    module_name = module.__name__
    return unpickleModule, (module_name,)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def pickleStringO(stringo):
    """Reduce a StringIO output object for copy_reg.

    Returns ``unpickleStringO`` with the buffer contents and the current
    position as its arguments.
    """
    contents = stringo.getvalue()
    position = stringo.tell()
    return unpickleStringO, (contents, position)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def _ufunc_reconstruct(module, name):
    """Import ``module`` and return its attribute called ``name``."""
    # Passing ``fromlist`` makes __import__ return the inner-most module
    # instead of the top-level package when ``module`` is a dotted name,
    # which is what allows non-toplevel ufuncs (scipy.special.expit, for
    # example) to be looked up.
    return getattr(__import__(module, fromlist=[name]), name)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def _ufunc_reduce(func):
    """Reduce ``func`` to ``(_ufunc_reconstruct, (module_name, name))``.

    The module name is whatever ``pickle.whichmodule`` reports for ``func``.
    """
    import pickle
    ufunc_name = func.__name__
    owner_module = pickle.whichmodule(func, ufunc_name)
    return _ufunc_reconstruct, (owner_module, ufunc_name)
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def _unchanged(self, response):
        """Report whether this session still matches ``response.session_hash``.

        The session is pickled with the highest protocol and the result is
        stored on ``response.session_pickled`` as a side effect. The return
        value is True when the MD5 hex digest of that pickle equals
        ``response.session_hash``.
        """
        pickled = pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
        # Stored on the response so later code can reuse it.
        response.session_pickled = pickled
        return response.session_hash == hashlib.md5(pickled).hexdigest()
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def _try_store_in_db(self, request, response):
        """Write this session to the database session table when needed.

        Returns False (after refreshing the session-id cookie) when nothing
        is written, and True once the session row has been updated or
        inserted.
        """
        # Nothing is saved for file-based sessions (no db table), for a
        # session being forgotten, or for an existing session whose content
        # has not changed. The checks run in this order and stop at the
        # first one that applies.
        skip = not response.session_db_table or self._forget
        if not skip:
            skip = self._unchanged(response) and not response.session_new
        if skip:
            if not response.session_db_table:
                db_sessions = global_settings.db_sessions
                if (db_sessions is not True
                        and response.session_masterapp in db_sessions):
                    db_sessions.remove(response.session_masterapp)
            # self.clear_session_cookies()
            self.save_session_id_cookie()
            return False

        table = response.session_db_table
        record_id = response.session_db_record_id
        # A new session gets a fresh key; otherwise the stored key is reused.
        unique_key = (web2py_uuid() if response.session_new
                      else response.session_db_unique_key)

        # Reuse the pickle cached on the response when there is one.
        session_pickled = (response.session_pickled
                           or pickle.dumps(self, pickle.HIGHEST_PROTOCOL))

        fields = {
            'locked': False,
            'client_ip': response.session_client,
            'modified_datetime': request.now,
            'session_data': session_pickled,
            'unique_key': unique_key,
        }

        # If the update reports no affected row, fall through to an insert.
        if record_id and not table._db(table.id == record_id).update(**fields):
            record_id = None
        if not record_id:
            record_id = table.insert(**fields)
            response.session_id = '%s:%s' % (record_id, unique_key)
            response.session_db_unique_key = unique_key
            response.session_db_record_id = record_id

        self.save_session_id_cookie()
        return True
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def load_storage(filename):
    """Unpickle *filename* through a ``portalocker.LockedFile`` and wrap it.

    The file is opened in ``'rb'`` mode and closed even when unpickling
    fails. The unpickled object is returned wrapped in ``Storage``.
    """
    locked_file = portalocker.LockedFile(filename, 'rb')
    try:
        loaded = pickle.load(locked_file)
    finally:
        if locked_file:
            locked_file.close()
    return Storage(loaded)
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def save_storage(storage, filename):
    """Pickle *storage* as a plain dict into *filename*.

    The file is opened in ``'wb'`` mode through ``portalocker.LockedFile``
    and closed even when pickling fails.
    """
    locked_file = portalocker.LockedFile(filename, 'wb')
    try:
        plain = dict(storage)
        pickle.dump(plain, locked_file)
    finally:
        if locked_file:
            locked_file.close()
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def TAG_pickler(data):
    """Reduce *data* to ``(TAG_unpickler, (pickled_bytes,))``.

    A fresh ``DIV`` is given *data*'s ``__dict__`` and that ``DIV`` is what
    gets pickled (highest protocol).
    """
    stand_in = DIV()
    # Share the attribute dict rather than copying it.
    stand_in.__dict__ = data.__dict__
    payload = pickle.dumps(stand_in, pickle.HIGHEST_PROTOCOL)
    return (TAG_unpickler, (payload,))
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def install():
    """Register ``pickle_traceback`` as the reducer for ``TracebackType``."""
    # The module is named copy_reg on Python 2 and copyreg on Python 3.
    try:
        import copy_reg as registry
    except ImportError:
        import copyreg as registry

    registry.pickle(TracebackType, pickle_traceback)

# Added by gevent

# We have to defer the initialization, and especially the import of platform,
# until runtime. If we're monkey patched, we need to be sure to use
# the original __import__ to avoid switching through the hub due to
# import locks on Python 2. See also builtins.py for details.
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _import_dump_load():
    """Bind the module-level ``dumps`` and ``loads`` to a pickle implementation.

    ``cPickle`` is used when it can be imported; otherwise ``pickle``.
    """
    global dumps, loads
    try:
        import cPickle as pickle_impl
    except ImportError:
        import pickle as pickle_impl
    dumps, loads = pickle_impl.dumps, pickle_impl.loads
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def dump_traceback(tb):
    """Return *tb* serialized with the module-level ``dumps``.

    ``_init()`` is called first.
    """
    # Neither _init nor dump/load may be locked: copy_reg and pickle can
    # perform imports while resolving class names. Those class names live
    # in this module and are greenlet safe, though.
    _init()
    serialized = dumps(tb)
    return serialized
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def install():
    """Register ``pickle_traceback`` as the reducer for ``TracebackType``."""
    # The module is named copy_reg on Python 2 and copyreg on Python 3.
    try:
        import copy_reg as registry
    except ImportError:
        import copyreg as registry

    registry.pickle(TracebackType, pickle_traceback)

# Added by gevent

# We have to defer the initialization, and especially the import of platform,
# until runtime. If we're monkey patched, we need to be sure to use
# the original __import__ to avoid switching through the hub due to
# import locks on Python 2. See also builtins.py for details.
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def _import_dump_load():
    """Bind the module-level ``dumps`` and ``loads`` to a pickle implementation.

    ``cPickle`` is used when it can be imported; otherwise ``pickle``.
    """
    global dumps, loads
    try:
        import cPickle as pickle_impl
    except ImportError:
        import pickle as pickle_impl
    dumps, loads = pickle_impl.dumps, pickle_impl.loads