Python copyreg 模块,pickle() 实例源码


项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_long(self):
        """Round-trip integers near power-of-two boundaries through pickle.

        Each protocol in ``protocols`` is exercised with positive and
        negative values adjacent to ``1 << nbits``; afterwards one very large
        integer is round-tripped with protocol 2 only.
        """
        # 256 bytes is where LONG4 begins.
        bit_widths = (1, 8, 8*254, 8*255, 8*256, 8*257)
        for proto in protocols:
            for nbits in bit_widths:
                base = 1 << nbits
                for magnitude in (base - 1, base, base + 1):
                    for value in (magnitude, -magnitude):
                        data = self.dumps(value, proto)
                        restored = self.loads(data)
                        self.assertEqual(value, restored)
        # Try a monster.  This is quadratic-time in protos 0 & 1, so don't
        # bother with those.
        monster = int("deadbeeffeedface", 16)
        monster += monster << 1000000
        for value in (monster, -monster):
            data = self.dumps(value, 2)
            restored = self.loads(data)
            self.assertEqual(value, restored)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_proto(self):
        """Check the PROTO header on dumps and rejection of unknown protocols.

        For every protocol, pickling ``None`` must produce exactly
        ``NONE + STOP``, prefixed by ``PROTO + <proto byte>`` when the
        protocol is 2 or higher.  A pickle announcing a protocol one above
        the last entry of ``protocols`` must fail to load with a
        ``ValueError`` whose message starts with
        "unsupported pickle protocol".
        """
        build_none = pickle.NONE + pickle.STOP
        for proto in protocols:
            expected = build_none
            if proto >= 2:
                expected = pickle.PROTO + bytes([proto]) + expected
            p = self.dumps(None, proto)
            self.assertEqual(p, expected)

        oob = protocols[-1] + 1     # a future protocol
        badpickle = pickle.PROTO + bytes([oob]) + build_none
        try:
            self.loads(badpickle)
        except ValueError as detail:
            self.assertTrue(str(detail).startswith(
                                            "unsupported pickle protocol"))
        else:
            # Loading succeeded, so the unknown protocol was not rejected.
            self.fail("expected bad protocol number to raise ValueError")
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_singletons(self):
        """Check that None/True/False round-trip by identity with the right opcode.

        For each protocol the pickled singleton must load back as the very
        same object, and the pickle must contain the opcode listed in the
        table below.
        """
        # Map (proto, singleton) to expected opcode.
        expected_opcode = {(0, None): pickle.NONE,
                           (1, None): pickle.NONE,
                           (2, None): pickle.NONE,
                           (3, None): pickle.NONE,

                           (0, True): pickle.INT,
                           (1, True): pickle.INT,
                           (2, True): pickle.NEWTRUE,
                           (3, True): pickle.NEWTRUE,

                           (0, False): pickle.INT,
                           (1, False): pickle.INT,
                           (2, False): pickle.NEWFALSE,
                           (3, False): pickle.NEWFALSE,
                           }
        for proto in protocols:
            for x in None, False, True:
                s = self.dumps(x, proto)
                y = self.loads(s)
                self.assertTrue(x is y, (proto, x, s, y))
                # The table stops at protocol 3; clamp so that any higher
                # protocol in `protocols` is looked up under the protocol 3
                # entry instead of raising KeyError.
                expected = expected_opcode[min(proto, 3), x]
                self.assertEqual(opcode_in_pickle(expected, s), True)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_list_chunking(self):
        """Check how many APPENDS opcodes are emitted when pickling lists.

        A 10-element list must produce one APPENDS for protocols above 0 and
        none for protocol 0.  A 2500-element list must produce no APPENDS
        for protocol 0 and at least two for every other protocol.
        """
        n = 10  # too small to chunk
        x = list(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_appends = count_opcode(pickle.APPENDS, s)
            # `proto > 0` is a bool, which compares equal to 0 or 1.
            self.assertEqual(num_appends, proto > 0)

        n = 2500  # expect at least two chunks when proto > 0
        x = list(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_appends = count_opcode(pickle.APPENDS, s)
            if proto == 0:
                self.assertEqual(num_appends, 0)
            else:
                self.assertTrue(num_appends >= 2)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_dict_chunking(self):
        """Check how many SETITEMS opcodes are emitted when pickling dicts.

        A 10-key dict must produce one SETITEMS for protocols above 0 and
        none for protocol 0.  A 2500-key dict must produce no SETITEMS for
        protocol 0 and at least two for every other protocol.
        """
        n = 10  # too small to chunk
        x = dict.fromkeys(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            self.assertIsInstance(s, bytes_types)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_setitems = count_opcode(pickle.SETITEMS, s)
            # `proto > 0` is a bool, which compares equal to 0 or 1.
            self.assertEqual(num_setitems, proto > 0)

        n = 2500  # expect at least two chunks when proto > 0
        x = dict.fromkeys(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_setitems = count_opcode(pickle.SETITEMS, s)
            if proto == 0:
                self.assertEqual(num_setitems, 0)
            else:
                self.assertTrue(num_setitems >= 2)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_reduce_bad_iterator(self):
        """Pickle objects whose __reduce__ returns non-iterator 4th/5th items.

        The test passes as long as ``self.dumps`` either succeeds or raises
        ``pickle.PickleError``; any other outcome (including a crash)
        fails it.
        """
        # Issue4176: crash when 4th and 5th items of __reduce__()
        # are not iterators
        class C(object):
            def __reduce__(self):
                # 4th item is not an iterator
                return list, (), None, [], None
        class D(object):
            def __reduce__(self):
                # 5th item is not an iterator
                return dict, (), None, None, []

        # Protocol 0 is less strict and also accept iterables.
        for proto in protocols:
            try:
                self.dumps(C(), proto)
            except (pickle.PickleError):
                # A clean pickling error is an acceptable outcome.
                pass
            try:
                self.dumps(D(), proto)
            except (pickle.PickleError):
                pass
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def putmessage(self, message):
        """Pickle ``message`` and send it, length-prefixed, over ``self.sock``.

        The payload is preceded by its length packed as a little-endian
        32-bit signed integer (``"<i"``) and written in slices of at most
        ``BUFSIZE`` bytes until nothing is left.

        Raises:
            pickle.PicklingError: if ``message`` cannot be pickled (the
                failure is reported on ``sys.__stderr__`` first).
            IOError: if the socket attribute is missing or unusable.
            socket.error: propagated unchanged from select/send.
        """
        self.debug("putmessage:%d:" % message[0])
        try:
            s = pickle.dumps(message)
        except pickle.PicklingError:
            print("Cannot pickle:", repr(message), file=sys.__stderr__)
            raise
        s = struct.pack("<i", len(s)) + s
        while len(s) > 0:
            try:
                # Block until the socket is writable before sending.
                r, w, x = select.select([], [self.sock], [])
                n = self.sock.send(s[:BUFSIZE])
            except (AttributeError, TypeError):
                raise IOError("socket no longer exists")
            except socket.error:
                raise
            else:
                # Drop only the bytes that were actually sent.
                s = s[n:]
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_long(self):
        """Round-trip integers near power-of-two boundaries through pickle.

        Each protocol in ``protocols`` is exercised with positive and
        negative values adjacent to ``1 << nbits``; afterwards one very large
        integer is round-tripped with protocol 2 only.
        """
        # 256 bytes is where LONG4 begins.
        bit_widths = (1, 8, 8*254, 8*255, 8*256, 8*257)
        for proto in protocols:
            for nbits in bit_widths:
                base = 1 << nbits
                for magnitude in (base - 1, base, base + 1):
                    for value in (magnitude, -magnitude):
                        data = self.dumps(value, proto)
                        restored = self.loads(data)
                        self.assertEqual(value, restored)
        # Try a monster.  This is quadratic-time in protos 0 & 1, so don't
        # bother with those.
        monster = int("deadbeeffeedface", 16)
        monster += monster << 1000000
        for value in (monster, -monster):
            data = self.dumps(value, 2)
            restored = self.loads(data)
            self.assertEqual(value, restored)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_proto(self):
        """Check the PROTO header on dumps and rejection of unknown protocols.

        For every protocol, pickling ``None`` must produce exactly
        ``NONE + STOP``, prefixed by ``PROTO + <proto byte>`` when the
        protocol is 2 or higher.  A pickle announcing a protocol one above
        the last entry of ``protocols`` must fail to load with a
        ``ValueError`` whose message starts with
        "unsupported pickle protocol".
        """
        build_none = pickle.NONE + pickle.STOP
        for proto in protocols:
            expected = build_none
            if proto >= 2:
                expected = pickle.PROTO + bytes([proto]) + expected
            p = self.dumps(None, proto)
            self.assertEqual(p, expected)

        oob = protocols[-1] + 1     # a future protocol
        badpickle = pickle.PROTO + bytes([oob]) + build_none
        try:
            self.loads(badpickle)
        except ValueError as detail:
            self.assertTrue(str(detail).startswith(
                                            "unsupported pickle protocol"))
        else:
            # Loading succeeded, so the unknown protocol was not rejected.
            self.fail("expected bad protocol number to raise ValueError")
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_singletons(self):
        """Check that None/True/False round-trip by identity with the right opcode.

        For each protocol the pickled singleton must load back as the very
        same object, and the pickle must contain the opcode listed in the
        table below.
        """
        # Map (proto, singleton) to expected opcode.
        expected_opcode = {(0, None): pickle.NONE,
                           (1, None): pickle.NONE,
                           (2, None): pickle.NONE,
                           (3, None): pickle.NONE,

                           (0, True): pickle.INT,
                           (1, True): pickle.INT,
                           (2, True): pickle.NEWTRUE,
                           (3, True): pickle.NEWTRUE,

                           (0, False): pickle.INT,
                           (1, False): pickle.INT,
                           (2, False): pickle.NEWFALSE,
                           (3, False): pickle.NEWFALSE,
                           }
        for proto in protocols:
            for x in None, False, True:
                s = self.dumps(x, proto)
                y = self.loads(s)
                self.assertTrue(x is y, (proto, x, s, y))
                # The table stops at protocol 3; clamp so that any higher
                # protocol in `protocols` is looked up under the protocol 3
                # entry instead of raising KeyError.
                expected = expected_opcode[min(proto, 3), x]
                self.assertEqual(opcode_in_pickle(expected, s), True)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_list_chunking(self):
        """Check how many APPENDS opcodes are emitted when pickling lists.

        A 10-element list must produce one APPENDS for protocols above 0 and
        none for protocol 0.  A 2500-element list must produce no APPENDS
        for protocol 0 and at least two for every other protocol.
        """
        n = 10  # too small to chunk
        x = list(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_appends = count_opcode(pickle.APPENDS, s)
            # `proto > 0` is a bool, which compares equal to 0 or 1.
            self.assertEqual(num_appends, proto > 0)

        n = 2500  # expect at least two chunks when proto > 0
        x = list(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_appends = count_opcode(pickle.APPENDS, s)
            if proto == 0:
                self.assertEqual(num_appends, 0)
            else:
                self.assertTrue(num_appends >= 2)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_dict_chunking(self):
        """Check how many SETITEMS opcodes are emitted when pickling dicts.

        A 10-key dict must produce one SETITEMS for protocols above 0 and
        none for protocol 0.  A 2500-key dict must produce no SETITEMS for
        protocol 0 and at least two for every other protocol.
        """
        n = 10  # too small to chunk
        x = dict.fromkeys(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            self.assertIsInstance(s, bytes_types)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_setitems = count_opcode(pickle.SETITEMS, s)
            # `proto > 0` is a bool, which compares equal to 0 or 1.
            self.assertEqual(num_setitems, proto > 0)

        n = 2500  # expect at least two chunks when proto > 0
        x = dict.fromkeys(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_setitems = count_opcode(pickle.SETITEMS, s)
            if proto == 0:
                self.assertEqual(num_setitems, 0)
            else:
                self.assertTrue(num_setitems >= 2)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_reduce_bad_iterator(self):
        """Pickle objects whose __reduce__ returns non-iterator 4th/5th items.

        The test passes as long as ``self.dumps`` either succeeds or raises
        ``pickle.PickleError``; any other outcome (including a crash)
        fails it.
        """
        # Issue4176: crash when 4th and 5th items of __reduce__()
        # are not iterators
        class C(object):
            def __reduce__(self):
                # 4th item is not an iterator
                return list, (), None, [], None
        class D(object):
            def __reduce__(self):
                # 5th item is not an iterator
                return dict, (), None, None, []

        # Protocol 0 is less strict and also accept iterables.
        for proto in protocols:
            try:
                self.dumps(C(), proto)
            except (pickle.PickleError):
                # A clean pickling error is an acceptable outcome.
                pass
            try:
                self.dumps(D(), proto)
            except (pickle.PickleError):
                pass
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_badly_quoted_string(self):
        """Check that malformed protocol-0 string pickles are rejected.

        Each byte string below is passed to ``self.check_unpickling_error``
        together with ``pickle.UnpicklingError``.
        """
        # Issue #17710
        badpickles = [b"S'\n.",
                      b'S\' \n.',
                      b'S" \n.',
                      b"S' ' \n.",
                      b'S" " \n.',
                      b"S ''\n.",
                      b'S ""\n.',
                      b'S \n.',
                      ]
        for p in badpickles:
            self.check_unpickling_error(pickle.UnpicklingError, p)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_long(self):
        """Round-trip integers near power-of-two boundaries through pickle.

        Each protocol in ``protocols`` is exercised with positive and
        negative values adjacent to ``1 << nbits``; afterwards one very large
        integer is round-tripped with protocol 2 only.
        """
        # 256 bytes is where LONG4 begins.
        bit_widths = (1, 8, 8*254, 8*255, 8*256, 8*257)
        for proto in protocols:
            for nbits in bit_widths:
                base = 1 << nbits
                for magnitude in (base - 1, base, base + 1):
                    for value in (magnitude, -magnitude):
                        data = self.dumps(value, proto)
                        restored = self.loads(data)
                        self.assert_is_copy(value, restored)
        # Try a monster.  This is quadratic-time in protos 0 & 1, so don't
        # bother with those.
        monster = int("deadbeeffeedface", 16)
        monster += monster << 1000000
        for value in (monster, -monster):
            data = self.dumps(value, 2)
            restored = self.loads(data)
            # assert_is_copy is very expensive here as it precomputes
            # a failure message by computing the repr() of both values,
            # so the type and equality checks are done by hand instead.
            self.assertIs(type(restored), int)
            self.assertEqual(value, restored)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_proto(self):
        """Check the PROTO header on dumps and rejection of unknown protocols.

        For protocols 2 and above the pickle of ``None`` must start with
        ``PROTO + <proto byte>``; for lower protocols it must contain no
        PROTO opcode at all.  A pickle announcing a protocol one above the
        last entry of ``protocols`` must fail to load with a ``ValueError``
        mentioning "unsupported pickle protocol".
        """
        for proto in protocols:
            pickled = self.dumps(None, proto)
            if proto >= 2:
                proto_header = pickle.PROTO + bytes([proto])
                self.assertTrue(pickled.startswith(proto_header))
            else:
                self.assertEqual(count_opcode(pickle.PROTO, pickled), 0)

        oob = protocols[-1] + 1     # a future protocol
        build_none = pickle.NONE + pickle.STOP
        badpickle = pickle.PROTO + bytes([oob]) + build_none
        try:
            self.loads(badpickle)
        except ValueError as err:
            self.assertIn("unsupported pickle protocol", str(err))
        else:
            # Loading succeeded, so the unknown protocol was not rejected.
            self.fail("expected bad protocol number to raise ValueError")
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_list_chunking(self):
        """Check how many APPENDS opcodes are emitted when pickling lists.

        A 10-element list must produce one APPENDS for protocols above 0 and
        none for protocol 0.  A 2500-element list must produce no APPENDS
        for protocol 0 and at least two for every other protocol.
        """
        n = 10  # too small to chunk
        x = list(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_appends = count_opcode(pickle.APPENDS, s)
            # `proto > 0` is a bool, which compares equal to 0 or 1.
            self.assertEqual(num_appends, proto > 0)

        n = 2500  # expect at least two chunks when proto > 0
        x = list(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_appends = count_opcode(pickle.APPENDS, s)
            if proto == 0:
                self.assertEqual(num_appends, 0)
            else:
                self.assertTrue(num_appends >= 2)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_set_chunking(self):
        """Check how many ADDITEMS opcodes are emitted when pickling sets.

        Below protocol 4 no ADDITEMS opcode may appear.  From protocol 4 on,
        a 10-element set must produce exactly one and a 2500-element set at
        least two.
        """
        n = 10  # too small to chunk
        x = set(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_additems = count_opcode(pickle.ADDITEMS, s)
            if proto < 4:
                self.assertEqual(num_additems, 0)
            else:
                self.assertEqual(num_additems, 1)

        n = 2500  # expect at least two chunks when proto >= 4
        x = set(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_additems = count_opcode(pickle.ADDITEMS, s)
            if proto < 4:
                self.assertEqual(num_additems, 0)
            else:
                self.assertGreaterEqual(num_additems, 2)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_reduce_bad_iterator(self):
        """Pickle objects whose __reduce__ returns non-iterator 4th/5th items.

        The test passes as long as ``self.dumps`` either succeeds or raises
        ``pickle.PickleError``; any other outcome (including a crash)
        fails it.
        """
        # Issue4176: crash when 4th and 5th items of __reduce__()
        # are not iterators
        class C(object):
            def __reduce__(self):
                # 4th item is not an iterator
                return list, (), None, [], None
        class D(object):
            def __reduce__(self):
                # 5th item is not an iterator
                return dict, (), None, None, []

        # Protocol 0 is less strict and also accept iterables.
        for proto in protocols:
            try:
                self.dumps(C(), proto)
            except (pickle.PickleError):
                # A clean pickling error is an acceptable outcome.
                pass
            try:
                self.dumps(D(), proto)
            except (pickle.PickleError):
                pass
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def check_frame_opcodes(self, pickled):
        """
        Check the arguments of FRAME opcodes in a protocol 4+ pickle.

        Every FRAME argument must equal the number of bytes between the end
        of that FRAME opcode (``frame_opcode_size`` bytes after its
        position) and the next FRAME opcode, or the end of the pickle for
        the last frame.
        """
        frame_opcode_size = 9
        last_arg = last_pos = None
        for op, arg, pos in pickletools.genops(pickled):
            if op.name != 'FRAME':
                continue
            if last_pos is not None:
                # The previous frame's size should be equal to the number
                # of bytes up to the current frame.
                frame_size = pos - last_pos - frame_opcode_size
                self.assertEqual(frame_size, last_arg)
            last_arg, last_pos = arg, pos
        # The last frame's size should be equal to the number of bytes up
        # to the pickle's end.
        # NOTE(review): if the pickle contains no FRAME opcode, last_pos is
        # still None here and the subtraction raises TypeError.
        frame_size = len(pickled) - last_pos - frame_opcode_size
        self.assertEqual(frame_size, last_arg)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_huge_bytes_32b(self, size):
        """Check the BINBYTES header written for a large bytes object.

        ``size`` determines the payload: ``b"abcd"`` repeated ``size // 4``
        times.  For every protocol from 3 upwards, the bytes immediately
        preceding the payload in the pickle must be ``BINBYTES`` followed by
        the payload length packed as a little-endian unsigned 32-bit int.
        """
        data = b"abcd" * (size // 4)
        try:
            for proto in protocols:
                if proto < 3:
                    continue
                with self.subTest(proto=proto):
                    try:
                        pickled = self.dumps(data, protocol=proto)
                        header = (pickle.BINBYTES +
                                  struct.pack("<I", len(data)))
                        data_start = pickled.index(data)
                        self.assertEqual(
                            header,
                            pickled[data_start-len(header):data_start])
                    finally:
                        # Drop the reference to the large pickle before the
                        # next protocol is tried.
                        pickled = None
        finally:
            data = None
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_huge_str_32b(self, size):
        """Check the BINUNICODE header written for a large str object.

        ``size`` determines the payload: ``"abcd"`` repeated ``size // 4``
        times.  For every protocol except 0, the bytes immediately preceding
        the payload must be ``BINUNICODE`` followed by the length packed as
        a little-endian unsigned 32-bit int, and the span from the first to
        the last ``b"abcd"`` in the pickle must be ``len(data)`` bytes long.
        """
        data = "abcd" * (size // 4)
        try:
            for proto in protocols:
                if proto == 0:
                    continue
                with self.subTest(proto=proto):
                    try:
                        pickled = self.dumps(data, protocol=proto)
                        header = (pickle.BINUNICODE +
                                  struct.pack("<I", len(data)))
                        data_start = pickled.index(b'abcd')
                        self.assertEqual(
                            header,
                            pickled[data_start-len(header):data_start])
                        self.assertEqual((pickled.rindex(b"abcd") + len(b"abcd") -
                                          pickled.index(b"abcd")), len(data))
                    finally:
                        # Drop the reference to the large pickle before the
                        # next protocol is tried.
                        pickled = None
        finally:
            data = None

    # BINUNICODE (protocols 1, 2 and 3) cannot carry more than 2**32 - 1 bytes
    # of utf-8 encoded unicode. BINUNICODE8 (protocol 4) supports these huge
    # unicode strings however.
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def InitMessage(descriptor, cls):
  """Attach decoder/extension registries and generated members to `cls`.

  Args:
    descriptor: Descriptor object describing the message; its `fields` are
      iterated and it is passed on to each `_Add*` helper.
    cls: The message class being initialised in place.

  Side effects:
    Sets `_decoders_by_tag`, `_extensions_by_name` and
    `_extensions_by_number` on `cls`, runs the `_Add*` helpers, and
    registers a copyreg reducer for `cls`.
  """
  cls._decoders_by_tag = {}
  cls._extensions_by_name = {}
  cls._extensions_by_number = {}
  # NOTE(review): the second half of this condition was missing from the
  # snippet; `message_set_wire_format` is the option that matches the
  # MessageSet decoder installed below -- confirm against upstream protobuf.
  if (descriptor.has_options and
      descriptor.GetOptions().message_set_wire_format):
    cls._decoders_by_tag[decoder.MESSAGE_SET_ITEM_TAG] = (
        decoder.MessageSetItemDecoder(cls._extensions_by_number), None)

  # Attach per-field helpers.
  for field in descriptor.fields:
    _AttachFieldHelpers(cls, field)

  _AddEnumValues(descriptor, cls)
  _AddInitMethod(descriptor, cls)
  _AddPropertiesForFields(descriptor, cls)
  _AddPropertiesForExtensions(descriptor, cls)
  _AddMessageMethods(descriptor, cls)
  _AddPrivateHelperMethods(descriptor, cls)
  # Reduce instances to (cls, no constructor args, __getstate__() result).
  copyreg.pickle(cls, lambda obj: (cls, (), obj.__getstate__()))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_nonIdentityHash(self):
        """Round-trip two Versioned objects that share the same hash value.

        Both instances hash to 0.  They are pickled together, the class is
        then given a newer ``persistenceVersion`` and an upgrade hook, and
        after unpickling each object must still carry its own ``unique``
        attribute.
        """
        # The class must be reachable as a module global so that pickle can
        # locate it by name.
        global ClassWithCustomHash
        class ClassWithCustomHash(styles.Versioned):
            def __init__(self, unique, hash):
                self.unique = unique
                self.hash = hash
            def __hash__(self):
                return self.hash

        first = ClassWithCustomHash('v1', 0)
        second = ClassWithCustomHash('v2', 0)
        payload = pickle.dumps((first, second))
        del first, second

        ClassWithCustomHash.persistenceVersion = 1
        ClassWithCustomHash.upgradeToVersion1 = (
            lambda self: setattr(self, 'upgraded', True))

        restored_first, restored_second = pickle.loads(payload)
        self.assertEqual(restored_first.unique, 'v1')
        self.assertEqual(restored_second.unique, 'v2')
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_upgradeDeserializesObjectsRequiringUpgrade(self):
        """Unpickle an object whose upgrade hook unpickles another Versioned.

        ``ToyClassA``'s upgrade hook loads a pickled ``ToyClassB``; both
        classes are bumped to ``persistenceVersion`` 1 after the pickles
        were created.
        """
        # The classes must be reachable as module globals so that pickle can
        # locate them by name.
        global ToyClassA, ToyClassB
        class ToyClassA(styles.Versioned):
            pass
        class ToyClassB(styles.Versioned):
            pass
        x = ToyClassA()
        y = ToyClassB()
        pklA, pklB = pickle.dumps(x), pickle.dumps(y)
        del x, y
        ToyClassA.persistenceVersion = 1
        def upgradeToVersion1(self):
            self.y = pickle.loads(pklB)
        ToyClassA.upgradeToVersion1 = upgradeToVersion1
        ToyClassB.persistenceVersion = 1
        ToyClassB.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True)

        # NOTE(review): nothing is asserted after this load; the snippet
        # looks truncated (presumably an upgrade trigger and a check on
        # x.y.upgraded are missing) -- confirm against the upstream test.
        x = pickle.loads(pklA)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _pickleFunction(f):
    """
    Reduce, in the sense of L{pickle}'s C{object.__reduce__} special method, a
    function object into its constituent parts.

    @param f: The function to reduce.
    @type f: L{types.FunctionType}

    @return: a 2-tuple of a reference to L{_unpickleFunction} and a tuple of
        its arguments, a 1-tuple of the function's fully qualified name.
    @rtype: 2-tuple of C{callable, native string}

    @raise _UniversalPicklingError: if C{f} is a lambda, which has no
        importable name.
    """
    if f.__name__ == '<lambda>':
        raise _UniversalPicklingError(
            "Cannot pickle lambda function: {}".format(f))
    # Dotted path "<module>.<qualname>" identifying the function.
    qualified_name = ".".join([f.__module__, f.__qualname__])
    return (_unpickleFunction, (qualified_name,))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_long(self):
        """Round-trip integers near power-of-two boundaries through pickle.

        Each protocol in ``protocols`` is exercised with positive and
        negative values adjacent to ``1 << nbits``; afterwards one very large
        integer is round-tripped with protocol 2 only.
        """
        # 256 bytes is where LONG4 begins.
        bit_widths = (1, 8, 8*254, 8*255, 8*256, 8*257)
        for proto in protocols:
            for nbits in bit_widths:
                base = 1 << nbits
                for magnitude in (base - 1, base, base + 1):
                    for value in (magnitude, -magnitude):
                        data = self.dumps(value, proto)
                        restored = self.loads(data)
                        self.assert_is_copy(value, restored)
        # Try a monster.  This is quadratic-time in protos 0 & 1, so don't
        # bother with those.
        monster = int("deadbeeffeedface", 16)
        monster += monster << 1000000
        for value in (monster, -monster):
            data = self.dumps(value, 2)
            restored = self.loads(data)
            # assert_is_copy is very expensive here as it precomputes
            # a failure message by computing the repr() of both values,
            # so the type and equality checks are done by hand instead.
            self.assertIs(type(restored), int)
            self.assertEqual(value, restored)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_proto(self):
        """Check the PROTO header on dumps and rejection of unknown protocols.

        For protocols 2 and above the pickle of ``None`` must start with
        ``PROTO + <proto byte>``; for lower protocols it must contain no
        PROTO opcode at all.  A pickle announcing a protocol one above the
        last entry of ``protocols`` must fail to load with a ``ValueError``
        mentioning "unsupported pickle protocol".
        """
        for proto in protocols:
            pickled = self.dumps(None, proto)
            if proto >= 2:
                proto_header = pickle.PROTO + bytes([proto])
                self.assertTrue(pickled.startswith(proto_header))
            else:
                self.assertEqual(count_opcode(pickle.PROTO, pickled), 0)

        oob = protocols[-1] + 1     # a future protocol
        build_none = pickle.NONE + pickle.STOP
        badpickle = pickle.PROTO + bytes([oob]) + build_none
        try:
            self.loads(badpickle)
        except ValueError as err:
            self.assertIn("unsupported pickle protocol", str(err))
        else:
            # Loading succeeded, so the unknown protocol was not rejected.
            self.fail("expected bad protocol number to raise ValueError")
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_singletons(self):
        """Check that None/True/False round-trip by identity with the right opcode.

        For each protocol the pickled singleton must load back as the very
        same object, and the pickle must contain the opcode listed in the
        table below.  Protocols above 3 are looked up under the protocol 3
        entry.
        """
        # Map (proto, singleton) to expected opcode.
        expected_opcode = {(0, None): pickle.NONE,
                           (1, None): pickle.NONE,
                           (2, None): pickle.NONE,
                           (3, None): pickle.NONE,

                           (0, True): pickle.INT,
                           (1, True): pickle.INT,
                           (2, True): pickle.NEWTRUE,
                           (3, True): pickle.NEWTRUE,

                           (0, False): pickle.INT,
                           (1, False): pickle.INT,
                           (2, False): pickle.NEWFALSE,
                           (3, False): pickle.NEWFALSE,
                           }
        for proto in protocols:
            for x in None, False, True:
                s = self.dumps(x, proto)
                y = self.loads(s)
                self.assertTrue(x is y, (proto, x, s, y))
                expected = expected_opcode[min(proto, 3), x]
                self.assertTrue(opcode_in_pickle(expected, s))
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_list_chunking(self):
        """Check how many APPENDS opcodes are emitted when pickling lists.

        A 10-element list must produce one APPENDS for protocols above 0 and
        none for protocol 0.  A 2500-element list must produce no APPENDS
        for protocol 0 and at least two for every other protocol.
        """
        n = 10  # too small to chunk
        x = list(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_appends = count_opcode(pickle.APPENDS, s)
            # `proto > 0` is a bool, which compares equal to 0 or 1.
            self.assertEqual(num_appends, proto > 0)

        n = 2500  # expect at least two chunks when proto > 0
        x = list(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_appends = count_opcode(pickle.APPENDS, s)
            if proto == 0:
                self.assertEqual(num_appends, 0)
            else:
                self.assertTrue(num_appends >= 2)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_dict_chunking(self):
        """Check how many SETITEMS opcodes are emitted when pickling dicts.

        A 10-key dict must produce one SETITEMS for protocols above 0 and
        none for protocol 0.  A 2500-key dict must produce no SETITEMS for
        protocol 0 and at least two for every other protocol.
        """
        n = 10  # too small to chunk
        x = dict.fromkeys(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            self.assertIsInstance(s, bytes_types)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_setitems = count_opcode(pickle.SETITEMS, s)
            # `proto > 0` is a bool, which compares equal to 0 or 1.
            self.assertEqual(num_setitems, proto > 0)

        n = 2500  # expect at least two chunks when proto > 0
        x = dict.fromkeys(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assert_is_copy(x, y)
            num_setitems = count_opcode(pickle.SETITEMS, s)
            if proto == 0:
                self.assertEqual(num_setitems, 0)
            else:
                self.assertTrue(num_setitems >= 2)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_reduce_bad_iterator(self):
        """Pickle objects whose __reduce__ returns non-iterator 4th/5th items.

        The test passes as long as ``self.dumps`` either succeeds or raises
        ``pickle.PickleError``; any other outcome (including a crash)
        fails it.
        """
        # Issue4176: crash when 4th and 5th items of __reduce__()
        # are not iterators
        class C(object):
            def __reduce__(self):
                # 4th item is not an iterator
                return list, (), None, [], None
        class D(object):
            def __reduce__(self):
                # 5th item is not an iterator
                return dict, (), None, None, []

        # Protocol 0 is less strict and also accept iterables.
        for proto in protocols:
            try:
                self.dumps(C(), proto)
            except (pickle.PickleError):
                # A clean pickling error is an acceptable outcome.
                pass
            try:
                self.dumps(D(), proto)
            except (pickle.PickleError):
                pass
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_badly_quoted_string(self):
        """Check that malformed protocol-0 string pickles are rejected.

        Loading each byte string below through ``self.loads`` must raise
        ``pickle.UnpicklingError``.
        """
        # Issue #17710
        badpickles = [b"S'\n.",
                      b'S\' \n.',
                      b'S" \n.',
                      b"S' ' \n.",
                      b'S" " \n.',
                      b"S ''\n.",
                      b'S ""\n.',
                      b'S \n.',
                      ]
        for p in badpickles:
            self.assertRaises(pickle.UnpicklingError, self.loads, p)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def check_frame_opcodes(self, pickled):
        """
        Check the arguments of FRAME opcodes in a protocol 4+ pickle.

        Every FRAME argument must equal the number of bytes between the end
        of that FRAME opcode (``frame_opcode_size`` bytes after its
        position) and the next FRAME opcode, or the end of the pickle for
        the last frame.
        """
        frame_opcode_size = 9
        last_arg = last_pos = None
        for op, arg, pos in pickletools.genops(pickled):
            if op.name != 'FRAME':
                continue
            if last_pos is not None:
                # The previous frame's size should be equal to the number
                # of bytes up to the current frame.
                frame_size = pos - last_pos - frame_opcode_size
                self.assertEqual(frame_size, last_arg)
            last_arg, last_pos = arg, pos
        # The last frame's size should be equal to the number of bytes up
        # to the pickle's end.
        # NOTE(review): if the pickle contains no FRAME opcode, last_pos is
        # still None here and the subtraction raises TypeError.
        frame_size = len(pickled) - last_pos - frame_opcode_size
        self.assertEqual(frame_size, last_arg)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_huge_str_32b(self, size):
        """Check the BINUNICODE header written for a large str object.

        ``size`` determines the payload: ``"abcd"`` repeated ``size // 4``
        times.  For every protocol except 0, the bytes immediately preceding
        the payload must be ``BINUNICODE`` followed by the length packed as
        a little-endian unsigned 32-bit int, and the span from the first to
        the last ``b"abcd"`` in the pickle must be ``len(data)`` bytes long.
        """
        data = "abcd" * (size // 4)
        try:
            for proto in protocols:
                if proto == 0:
                    continue
                with self.subTest(proto=proto):
                    try:
                        pickled = self.dumps(data, protocol=proto)
                        header = (pickle.BINUNICODE +
                                  struct.pack("<I", len(data)))
                        data_start = pickled.index(b'abcd')
                        self.assertEqual(
                            header,
                            pickled[data_start-len(header):data_start])
                        self.assertEqual((pickled.rindex(b"abcd") + len(b"abcd") -
                                          pickled.index(b"abcd")), len(data))
                    finally:
                        # Drop the reference to the large pickle before the
                        # next protocol is tried.
                        pickled = None
        finally:
            data = None

    # BINUNICODE (protocols 1, 2 and 3) cannot carry more than 2**32 - 1 bytes
    # of utf-8 encoded unicode. BINUNICODE8 (protocol 4) supports these huge
    # unicode strings however.
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def _ufunc_reconstruct(module, name):
    # The `fromlist` kwarg is required to ensure that `mod` points to the
    # inner-most module rather than the parent package when module name is
    # nested. This makes it possible to pickle non-toplevel ufuncs such as
    # scipy.special.expit for instance.
    mod = __import__(module, fromlist=[name])
    return getattr(mod, name)
项目:radar    作者:amoose136    | 项目源码 | 文件源码
def _ufunc_reduce(func):
    """Return a ``(callable, args)`` reduction for ``func``.

    The args are the module name reported by ``pickle.whichmodule`` and the
    function's ``__name__``; the callable is ``_ufunc_reconstruct``.
    """
    from pickle import whichmodule
    func_name = func.__name__
    owner_module = whichmodule(func, func_name)
    return _ufunc_reconstruct, (owner_module, func_name)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def install():
    """Register ``pickle_traceback`` as the copyreg reducer for tracebacks."""
    # Python 2 names the module copy_reg; Python 3 renamed it to copyreg.
    try:
        import copy_reg
    except ImportError:
        import copyreg as copy_reg

    copy_reg.pickle(TracebackType, pickle_traceback)

# Added by gevent

# We have to defer the initialization, and especially the import of platform,
# until runtime. If we're monkey patched, we need to be sure to use
# the original __import__ to avoid switching through the hub due to
# import locks on Python 2. See also gevent's builtins.py for details.
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def _import_dump_load():
    global dumps
    global loads
        import cPickle as pickle
    except ImportError:
        import pickle
    dumps = pickle.dumps
    loads = pickle.loads
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def dump_traceback(tb):
    """Serialize ``tb`` with the module-level ``dumps`` and return the result."""
    # Both _init and dump/load have to be unlocked, because
    # copy_reg and pickle can do imports to resolve class names; those
    # class names are in this module and greenlet safe though
    serialized = dumps(tb)
    return serialized
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def install():
    """Register ``pickle_traceback`` as the copyreg reducer for tracebacks."""
    # Python 2 names the module copy_reg; Python 3 renamed it to copyreg.
    try:
        import copy_reg
    except ImportError:
        import copyreg as copy_reg

    copy_reg.pickle(TracebackType, pickle_traceback)

# Added by gevent

# We have to defer the initialization, and especially the import of platform,
# until runtime. If we're monkey patched, we need to be sure to use
# the original __import__ to avoid switching through the hub due to
# import locks on Python 2. See also gevent's builtins.py for details.
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def _import_dump_load():
    global dumps
    global loads
        import cPickle as pickle
    except ImportError:
        import pickle
    dumps = pickle.dumps
    loads = pickle.loads
项目:RealtimePythonChat    作者:quangtqag    | 项目源码 | 文件源码
def dump_traceback(tb):
    """Serialize ``tb`` with the module-level ``dumps`` and return the result."""
    # Both _init and dump/load have to be unlocked, because
    # copy_reg and pickle can do imports to resolve class names; those
    # class names are in this module and greenlet safe though
    serialized = dumps(tb)
    return serialized
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def save_dbt(self,table, sql_fields_current):
        """Pickle ``sql_fields_current`` into the file named by ``table._dbt``.

        The file is opened through ``self.file_open`` in ``'w'`` mode and is
        released again even when pickling fails.
        """
        tfile = self.file_open(table._dbt, 'w')
        try:
            pickle.dump(sql_fields_current, tfile)
        finally:
            # NOTE(review): file_close is assumed to be the counterpart of
            # file_open on this object -- confirm it exists on this class.
            self.file_close(tfile)
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def import_table_definitions(self, path, migrate=False,
                                 fake_migrate=False, items=None):
        """Define tables from ``items`` or from pickled ``*.table`` files.

        When ``items`` is truthy it is treated as a mapping of table name to
        a dict of table options; an optional ``"items"`` entry in that dict
        maps field names to ``Field`` keyword arguments.  Otherwise every
        file matching ``<path>/<uri_hash>_*.table`` is unpickled and one
        table is defined per file, with its fields ordered by their
        ``'sortable'`` value.

        ``migrate`` and ``fake_migrate`` are forwarded to ``define_table``
        for file-based definitions only.

        NOTE(review): this block targets Python 2 (``iteritems``).  The
        ``else:``/``try:`` structure, the ``type``/``length``/``notnull``
        Field arguments and the tail of the ``define_table`` call were
        missing from the snippet and are reconstructed -- confirm against
        the upstream web2py DAL source.
        """
        pattern = pjoin(path,self._uri_hash+'_*.table')
        if items:
            for tablename, table in items.iteritems():
                # TODO: read all field/table options
                fields = []
                # remove unsupported/illegal Table arguments
                for name in ("name", "fields"):
                    table.pop(name, None)
                if "items" in table:
                    for fieldname, field in table.pop("items").iteritems():
                        # remove unsupported/illegal Field arguments
                        for key in ("requires", "name", "compute", "colname"):
                            field.pop(key, None)
                        fields.append(Field(str(fieldname), **field))
                self.define_table(str(tablename), *fields, **table)
        else:
            for filename in glob.glob(pattern):
                tfile = self._adapter.file_open(filename, 'r')
                try:
                    # SECURITY: pickle.load can execute arbitrary code; the
                    # .table files must come from a trusted location.
                    sql_fields = pickle.load(tfile)
                    # Table name is the part of the file name matched by
                    # the '*' in the pattern (between the hash prefix and
                    # the '.table' suffix).
                    name = filename[len(pattern)-7:-6]
                    mf = [(value['sortable'],
                           Field(key,
                                 type=value['type'],
                                 length=value.get('length',None),
                                 notnull=value.get('notnull',False),
                                 unique=value.get('unique',False)))
                          for key, value in sql_fields.iteritems()]
                    # Order fields by their 'sortable' value only.
                    mf.sort(key=lambda item: item[0])
                    self.define_table(name,*[item[1] for item in mf],
                                      **dict(migrate=migrate,
                                             fake_migrate=fake_migrate))
                finally:
                    self._adapter.file_close(tfile)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def __init__(self, address=None, authkey=None, serializer='pickle'):
        """Store connection settings and start in the INITIAL state.

        ``authkey`` defaults to the current process's authkey when omitted.
        ``serializer`` selects the listener/client pair from
        ``listener_client``.
        """
        key = current_process().authkey if authkey is None else authkey
        self._address = address     # XXX not final address if eg ('', 0)
        self._authkey = AuthenticationString(key)
        state = State()
        self._state = state
        state.value = State.INITIAL
        self._serializer = serializer
        listener_cls, client_cls = listener_client[serializer]
        self._Listener, self._Client = listener_cls, client_cls
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def opcode_in_pickle(code, pickle):
    """Return True if the opcode ``code`` (bytes) occurs in ``pickle``."""
    return any(
        info.code == code.decode("latin-1")
        for info, _arg, _pos in pickletools.genops(pickle)
    )

# Return the number of times opcode code appears in pickle.
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def count_opcode(code, pickle):
    """Return how many times the opcode ``code`` (bytes) occurs in ``pickle``."""
    return sum(
        1
        for info, _arg, _pos in pickletools.genops(pickle)
        if info.code == code.decode("latin-1")
    )
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_float(self):
        """Round-trip a set of positive and negative floats on every protocol."""
        positives = [0.0, 4.94e-324, 1e-310, 7e-308, 6.626e-34, 0.1, 0.5,
                     3.14, 263.44582062374053, 6.022e23, 1e30]
        samples = positives + [-number for number in positives]
        for proto in protocols:
            for sample in samples:
                data = self.dumps(sample, proto)
                restored = self.loads(data)
                self.assertEqual(sample, restored)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_long1(self):
        """Round-trip a large int and check when the LONG1 opcode is used.

        LONG1 must be present in the pickle exactly when the protocol is
        2 or higher.
        """
        big = 12345678910111213141516178920
        for proto in protocols:
            data = self.dumps(big, proto)
            restored = self.loads(data)
            self.assertEqual(big, restored)
            uses_long1 = opcode_in_pickle(pickle.LONG1, data)
            self.assertEqual(uses_long1, proto >= 2)