我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 copy_reg.pickle()。
def manageWork(self):
    """Parse every zipped article whose id is listed in the pickled id file.

    Loads a collection of ids from ``SSD_HOME + "pickle/redirects_ids.obj"``,
    walks ``STATIC_HTML_DUMP_ARTICLES_DIR + self.path`` and calls
    ``self.parse_article(file_name, root)`` for each ``*.zip`` file whose
    second ``_``-separated name component is one of those ids.

    An exception raised by ``parse_article`` is printed and the walk
    continues.  A ``.zip`` name without a numeric second component still
    raises, exactly as before.
    """
    # file = open("/home/ddimitrov/20160305_en_wikilinks/tmp/missing_article_ids.p", 'r')
    # Pickle data is binary, so open in 'rb'; the context manager closes the
    # handle, which the original code never did.
    with open(SSD_HOME + "pickle/redirects_ids.obj", 'rb') as ids_file:
        object_file = pickle.load(ids_file)
    # print object_file
    # print type(object_file)
    for root, _dirs, files in os.walk(STATIC_HTML_DUMP_ARTICLES_DIR + self.path):
        for file_name in files:
            if not file_name.endswith(".zip"):
                continue
            parts = file_name.split('_')
            # int() compares and hashes equal to the former long() value, and
            # unlike long() it also exists on Python 3.
            if int(parts[1]) in object_file:
                try:
                    self.parse_article(file_name, root)
                except Exception as e:
                    print("FILENAME_FAIL:" + file_name)
                    print(type(e))  # the exception instance
                    print(e)
                    # Exceptions only carry ``.message`` on Python 2.
                    print(getattr(e, 'message', ''))
def _try_store_in_file(self, request, response):
    """Write the pickled session into its session file.

    Returns False, after saving the session-id cookie, when there is no
    session id, the session is being forgotten, or ``self._unchanged``
    reports no change.  Otherwise the session file is opened and locked if
    needed, the pickled session is written, the cookie is saved and True is
    returned.  ``self._close(response)`` runs on every path.
    """
    try:
        nothing_to_store = (not response.session_id
                            or self._forget
                            or self._unchanged(response))
        if nothing_to_store:
            # self.clear_session_cookies()
            self.save_session_id_cookie()
            return False

        must_open = response.session_new or not response.session_file
        if must_open:
            # Create the session sub-folder if it does not exist yet.
            folder = os.path.dirname(response.session_filename)
            if not os.path.exists(folder):
                os.mkdir(folder)
            response.session_file = recfile.open(response.session_filename, 'wb')
            portalocker.lock(response.session_file, portalocker.LOCK_EX)
            response.session_locked = True

        if response.session_file:
            payload = response.session_pickled
            if not payload:
                payload = pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
            response.session_file.write(payload)
            response.session_file.truncate()
    finally:
        self._close(response)
    self.save_session_id_cookie()
    return True
def doMap(self, i):
    """Run the map step for split ``i`` and write one pickle per reduce task.

    Reads and then deletes ``#split-<path>-<i>``: its first line is passed to
    ``self.Map`` as the key and the rest of the file as the value.  For each
    reduce task ``r`` the items for which ``self.Partition(item) == r`` are
    pickled into ``#map-<path>-<i>-<r>``.

    Returns:
        The list of ``(i, r)`` pairs, one per reduce task.
    """
    split_name = "#split-%s-%s" % (self.path, i)
    # Context managers close the handles even when reading or pickling
    # raises; the explicit close() calls used before were skipped on errors.
    with open(split_name, "r") as f:
        keyvalue = f.readline()
        value = f.read()
    os.unlink(split_name)
    keyvaluelist = self.Map(keyvalue, value)
    for r in range(0, self.reducetask):
        # print "map", i, "#map-%s-%s-%d" % (self.path, i, r)
        with open("#map-%s-%s-%d" % (self.path, i, r), "w+") as f:
            itemlist = [item for item in keyvaluelist if self.Partition(item) == r]
            pickle.dump(itemlist, f)
    return [(i, r) for r in range(0, self.reducetask)]

# Get reduce regions from maptasks, sort by key, and apply Reduce for each key
def doReduce(self, i):
    """Run reduce task ``i`` over the output of every map task.

    Loads and then deletes ``#map-<path>-<m>-<i>`` for each map task ``m``,
    groups the loaded items by their first element, calls
    ``self.Reduce(key, items)`` for each key in sorted order and pickles the
    list of results into ``#reduce-<path>-<i>``.

    Returns:
        ``i``, unchanged.
    """
    keys = {}
    for m in range(0, self.maptask):
        # print "reduce", i, "#map-%s-%s-%d" % (self.path, m, i)
        map_name = "#map-%s-%s-%d" % (self.path, m, i)
        # The context manager closes the handle even if unpickling fails.
        with open(map_name, "r") as f:
            itemlist = pickle.load(f)
        for item in itemlist:
            # setdefault replaces the Python-2-only dict.has_key() branch.
            keys.setdefault(item[0], []).append(item)
        os.unlink(map_name)
    out = [self.Reduce(k, keys[k]) for k in sorted(keys)]
    with open("#reduce-%s-%d" % (self.path, i), "w+") as f:
        pickle.dump(out, f)
    return i

# The master.
def no_tracing(func):
    """Decorator to temporarily turn off tracing for the duration of a test.

    When ``sys`` has no ``gettrace`` attribute, ``func`` is returned
    unchanged.  Otherwise the returned wrapper clears the trace function,
    calls ``func`` and restores the previous trace function afterwards, even
    when ``func`` raises.
    """
    if not hasattr(sys, 'gettrace'):
        return func

    # Imported here so the decorator does not rely on a module-level import.
    import functools

    # functools.wraps copies __doc__ and __module__ as well as __name__; the
    # previous hand-written ``wrapper.__name__ = ...`` kept only the name.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        original_trace = sys.gettrace()
        try:
            sys.settrace(None)
            return func(*args, **kwargs)
        finally:
            sys.settrace(original_trace)
    return wrapper

# Return True if opcode code appears in the pickle, else False.
def test_long(self):
    """Round-trip long integers of several sizes through dumps/loads."""
    # 256 bytes is where LONG4 begins.
    bit_widths = (1, 8, 8*254, 8*255, 8*256, 8*257)
    for proto in protocols:
        for nbits in bit_widths:
            nbase = long(1) << nbits
            for magnitude in (nbase - 1, nbase, nbase + 1):
                for n in (magnitude, -magnitude):
                    data = self.dumps(n, proto)
                    got = self.loads(data)
                    self.assertEqual(n, got)
    # Try a monster.  This is quadratic-time in protos 0 & 1, so don't
    # bother with those.
    nbase = long("deadbeeffeedface", 16)
    nbase += nbase << 1000000
    for n in (nbase, -nbase):
        p = self.dumps(n, 2)
        got = self.loads(p)
        self.assertEqual(n, got)
def test_singletons(self):
    """None, False and True must unpickle to the same object, via the expected opcode."""
    # Map (proto, singleton) to expected opcode; each row lists the opcodes
    # for protocols 0, 1 and 2 in that order.
    rows = (
        (None, (pickle.NONE, pickle.NONE, pickle.NONE)),
        (True, (pickle.INT, pickle.INT, pickle.NEWTRUE)),
        (False, (pickle.INT, pickle.INT, pickle.NEWFALSE)),
    )
    expected_opcode = {}
    for singleton, opcodes in rows:
        for proto_number, opcode in enumerate(opcodes):
            expected_opcode[proto_number, singleton] = opcode

    for proto in protocols:
        for singleton in (None, False, True):
            pickled = self.dumps(singleton, proto)
            restored = self.loads(pickled)
            self.assertTrue(singleton is restored,
                            (proto, singleton, pickled, restored))
            wanted = expected_opcode[proto, singleton]
            self.assertEqual(opcode_in_pickle(wanted, pickled), True)
def test_newobj_proxies(self):
    """Pickle weakref proxies and check the copy has the referent's type and state."""
    # NEWOBJ should use the __class__ rather than the raw type
    import weakref
    candidates = myclasses[:]
    # Cannot create weakproxies to these classes
    for unproxyable in (MyInt, MyLong, MyStr, MyTuple):
        candidates.remove(unproxyable)
    for proto in protocols:
        for C in candidates:
            B = C.__base__
            original = C(C.sample)
            original.foo = 42
            proxy = weakref.proxy(original)
            pickled = self.dumps(proxy, proto)
            copy = self.loads(pickled)
            self.assertEqual(type(copy), type(original))  # rather than type(proxy)
            detail = (proto, C, B, original, copy, type(copy))
            self.assertEqual(B(original), B(copy), detail)
            self.assertEqual(original.__dict__, copy.__dict__, detail)

# Register a type with copy_reg, with extension code extcode.  Pickle
# an object of that type.  Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.
def test_list_chunking(self):
    """Count APPENDS opcodes produced for a short list and for a long one."""
    cases = (
        (10, False),    # too small to chunk
        (2500, True),   # expect at least two chunks when proto > 0
    )
    for n, expect_chunks in cases:
        x = range(n)
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_appends = count_opcode(pickle.APPENDS, s)
            if not expect_chunks:
                self.assertEqual(num_appends, proto > 0)
            elif proto == 0:
                self.assertEqual(num_appends, 0)
            else:
                self.assertTrue(num_appends >= 2)
def test_dict_chunking(self):
    """Count SETITEMS opcodes produced for a small dict and for a large one."""
    cases = (
        (10, False),    # too small to chunk
        (2500, True),   # expect at least two chunks when proto > 0
    )
    for n, expect_chunks in cases:
        x = dict.fromkeys(range(n))
        for proto in protocols:
            s = self.dumps(x, proto)
            y = self.loads(s)
            self.assertEqual(x, y)
            num_setitems = count_opcode(pickle.SETITEMS, s)
            if not expect_chunks:
                self.assertEqual(num_setitems, proto > 0)
            elif proto == 0:
                self.assertEqual(num_setitems, 0)
            else:
                self.assertTrue(num_setitems >= 2)
def test_reduce_bad_iterator(self):
    """Dumping objects whose __reduce__ returns non-iterators must not crash."""
    # Issue4176: crash when 4th and 5th items of __reduce__()
    # are not iterators
    class C(object):
        def __reduce__(self):
            # 4th item is not an iterator
            return list, (), None, [], None

    class D(object):
        def __reduce__(self):
            # 5th item is not an iterator
            return dict, (), None, None, []

    # Protocol 0 in Python implementation is less strict and also accepts
    # iterables.
    for proto in protocols:
        for bad_class in (C, D):
            try:
                self.dumps(bad_class(), proto)
            except (AttributeError, pickle.PicklingError, cPickle.PicklingError):
                pass
def __init__(self, datadir, loadFunc):
    """Load ``index.pkl`` from ``datadir`` and collect the file names it lists.

    Args:
        datadir: directory containing ``index.pkl``; one trailing ``/`` is
            dropped before it is stored as ``self.datadir``.
        loadFunc: stored unchanged as ``self.loadFunc``.

    Raises:
        ValueError: if ``<datadir>/index.pkl`` does not exist.
    """
    # endswith() is safe on an empty string, where datadir[-1] raised
    # IndexError.
    if datadir.endswith('/'):
        datadir = datadir[:-1]
    self.datadir = datadir
    idxf = datadir + '/index.pkl'
    if not os.path.exists(idxf):
        raise ValueError('Index pickle not found')
    # NOTE(review): unpickling can execute arbitrary code, so only point this
    # at index files you trust.  The context manager closes the handle, which
    # the original never did.
    with open(idxf, 'rb') as index_file:
        idx = pickle.load(index_file)
    try:
        integer_types = (long, int)
    except NameError:  # Python 3 has no separate ``long`` type
        integer_types = (int,)
    fnames = set()
    for k in idx:
        for fname in idx[k]:
            ### Backward compat with earlier versions of this code:
            if isinstance(fname, integer_types):
                fname = '%d.ogg' % (fname)
            if fname is None:
                continue
            fname = fname.split('/')[-1]
            ###
            fnames.add(fname)
    self.idx = idx
    self.fnames = list(fnames)
    self.loadFunc = loadFunc
def _try_store_in_file(self, request, response):
    """Write the pickled session into its session file.

    Returns None immediately when ``self._enabled`` is false.  Returns False
    when there is no session id or ``self._unchanged`` reports no change.
    Otherwise the session file is opened and locked if needed, the pickled
    session is written, the session-id cookie is saved and True is returned.
    ``self._close(response)`` runs on every path that enters the ``try``.
    """
    if not self._enabled:
        return
    try:
        skip_write = not response.session_id or self._unchanged(response)
        if skip_write:
            # self.save_session_id_cookie()  # CHECK THIS
            return False

        must_open = response.session_new or not response.session_file
        if must_open:
            # Create the session sub-folder if it does not exist yet.
            folder = os.path.dirname(response.session_filename)
            if not os.path.exists(folder):
                os.mkdir(folder)
            response.session_file = recfile.open(response.session_filename, 'wb')
            portalocker.lock(response.session_file, portalocker.LOCK_EX)
            response.session_locked = True

        if response.session_file:
            payload = response.session_pickled
            if not payload:
                payload = pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
            response.session_file.write(payload)
            response.session_file.truncate()
    finally:
        self._close(response)
    self.save_session_id_cookie()
    return True
def count_opcode(code, pickle):
    """Return how many opcodes in the pickle string have ``.code`` equal to ``code``."""
    return sum(1 for op, _arg, _pos in pickletools.genops(pickle)
               if op.code == code)

# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state.  Code
# should do this:
#
#     e = ExtensionSaver(extension_code)
#     try:
#         fiddle w/ the extension registry's stuff for extension_code
#     finally:
#         e.restore()
def test_proto(self):
    """Check the PROTO prefix per protocol and that a too-new protocol is rejected."""
    build_none = pickle.NONE + pickle.STOP
    for proto in protocols:
        if proto >= 2:
            expected = pickle.PROTO + chr(proto) + build_none
        else:
            expected = build_none
        pickled = self.dumps(None, proto)
        self.assertEqual(pickled, expected)

    oob = protocols[-1] + 1     # a future protocol
    badpickle = pickle.PROTO + chr(oob) + build_none
    try:
        self.loads(badpickle)
    except ValueError as detail:
        message = str(detail)
        self.assertTrue(message.startswith("unsupported pickle protocol"))
    else:
        self.fail("expected bad protocol number to raise ValueError")
def test_reduce_bad_iterator(self):
    """Dumping objects whose __reduce__ returns non-iterators must not crash."""
    # Issue4176: crash when 4th and 5th items of __reduce__()
    # are not iterators
    class C(object):
        def __reduce__(self):
            # 4th item is not an iterator
            return list, (), None, [], None

    class D(object):
        def __reduce__(self):
            # 5th item is not an iterator
            return dict, (), None, None, []

    # Protocol 0 is less strict and also accept iterables.
    for proto in protocols:
        for bad_class in (C, D):
            try:
                self.dumps(bad_class(), proto)
            except (AttributeError, pickle.PickleError, cPickle.PickleError):
                pass
def InitMessage(descriptor, cls):
    """Initialise ``cls`` from ``descriptor`` and register a copyreg reducer for it.

    Resets the decoder and extension tables on ``cls``, adds the message-set
    item decoder when the descriptor's options request that wire format,
    runs the per-field and per-class ``_Add*`` helpers, and finally registers
    a reducer returning ``(cls, (), obj.__getstate__())``.
    """
    cls._decoders_by_tag = {}
    cls._extensions_by_name = {}
    cls._extensions_by_number = {}

    uses_message_set = (descriptor.has_options and
                        descriptor.GetOptions().message_set_wire_format)
    if uses_message_set:
        item_decoder = decoder.MessageSetItemDecoder(cls._extensions_by_number)
        cls._decoders_by_tag[decoder.MESSAGE_SET_ITEM_TAG] = (item_decoder, None)

    # Attach each descriptor field to cls
    for field in descriptor.fields:
        _AttachFieldHelpers(cls, field)

    _AddEnumValues(descriptor, cls)
    _AddInitMethod(descriptor, cls)
    _AddPropertiesForFields(descriptor, cls)
    _AddPropertiesForExtensions(descriptor, cls)
    _AddStaticMethods(cls)
    _AddMessageMethods(descriptor, cls)
    _AddPrivateHelperMethods(descriptor, cls)

    def _reduce_message(obj):
        return (cls, (), obj.__getstate__())

    copyreg.pickle(cls, _reduce_message)
def test_nonIdentityHash(self):
    """Two unpickled instances with the same custom hash must both be upgraded."""
    global ClassWithCustomHash

    class ClassWithCustomHash(styles.Versioned):
        def __init__(self, unique, hash):
            self.unique = unique
            self.hash = hash

        def __hash__(self):
            return self.hash

    first = ClassWithCustomHash('v1', 0)
    second = ClassWithCustomHash('v2', 0)
    pkl = pickle.dumps((first, second))
    del first, second

    def mark_upgraded(self):
        setattr(self, 'upgraded', True)

    ClassWithCustomHash.persistenceVersion = 1
    ClassWithCustomHash.upgradeToVersion1 = mark_upgraded
    first, second = pickle.loads(pkl)
    styles.doUpgrade()
    self.assertEqual(first.unique, 'v1')
    self.assertEqual(second.unique, 'v2')
    self.assertTrue(first.upgraded)
    self.assertTrue(second.upgraded)
def test_upgradeDeserializesObjectsRequiringUpgrade(self):
    """An object unpickled inside an upgrade method must itself get upgraded."""
    global ToyClassA, ToyClassB

    class ToyClassA(styles.Versioned):
        pass

    class ToyClassB(styles.Versioned):
        pass

    toy_a = ToyClassA()
    toy_b = ToyClassB()
    pklA = pickle.dumps(toy_a)
    pklB = pickle.dumps(toy_b)
    del toy_a, toy_b

    def upgradeToVersion1(self):
        # Unpickle a ToyClassB while ToyClassA is being upgraded.
        self.y = pickle.loads(pklB)
        styles.doUpgrade()

    def mark_upgraded(self):
        setattr(self, 'upgraded', True)

    ToyClassA.persistenceVersion = 1
    ToyClassA.upgradeToVersion1 = upgradeToVersion1
    ToyClassB.persistenceVersion = 1
    ToyClassB.upgradeToVersion1 = mark_upgraded

    restored = pickle.loads(pklA)
    styles.doUpgrade()
    self.assertTrue(restored.y.upgraded)
def pickleMethod(method):
    """Support function for copy_reg to pickle method refs.

    Returns ``(unpickleMethod, (function name, im_self, im_class))``.
    """
    function_name = method.im_func.__name__
    state = (function_name, method.im_self, method.im_class)
    return unpickleMethod, state
def pickleModule(module):
    """Support function for copy_reg to pickle module refs.

    Returns ``(unpickleModule, (module name,))``.
    """
    state = (module.__name__,)
    return unpickleModule, state
def pickleStringO(stringo):
    """Support function for copy_reg to pickle StringIO.OutputTypes.

    Returns ``(unpickleStringO, (buffer contents, current position))``.
    """
    contents = stringo.getvalue()
    position = stringo.tell()
    return unpickleStringO, (contents, position)
def _ufunc_reconstruct(module, name):
    """Import ``module`` and return its attribute ``name``."""
    # The `fromlist` kwarg is required to ensure that the import returns the
    # inner-most module rather than the parent package when module name is
    # nested.  This makes it possible to pickle non-toplevel ufuncs such as
    # scipy.special.expit for instance.
    return getattr(__import__(module, fromlist=[name]), name)
def _ufunc_reduce(func):
    """Return ``(_ufunc_reconstruct, (module name, function name))`` for ``func``.

    The module name is looked up with ``pickle.whichmodule``.
    """
    import pickle
    name = func.__name__
    module_name = pickle.whichmodule(func, name)
    return _ufunc_reconstruct, (module_name, name)
def _unchanged(self, response):
    """Tell whether the pickled session still matches ``response.session_hash``.

    Side effect: the freshly pickled session is stored on
    ``response.session_pickled``.
    """
    blob = pickle.dumps(self, pickle.HIGHEST_PROTOCOL)
    response.session_pickled = blob
    current_hash = hashlib.md5(blob).hexdigest()
    return response.session_hash == current_hash
def _try_store_in_db(self, request, response):
    """Store the pickled session in the session table.

    Returns False (after saving the session-id cookie) when nothing is
    written, True after the record has been updated or inserted.
    """
    # don't save if file-based sessions,
    # no session id, or session being forgotten
    # or no changes to session (Unless the session is new)
    skip = (not response.session_db_table
            or self._forget
            or (self._unchanged(response) and not response.session_new))
    if skip:
        if (not response.session_db_table
                and global_settings.db_sessions is not True
                and response.session_masterapp in global_settings.db_sessions):
            global_settings.db_sessions.remove(response.session_masterapp)
        # self.clear_session_cookies()
        self.save_session_id_cookie()
        return False

    table = response.session_db_table
    record_id = response.session_db_record_id
    if response.session_new:
        unique_key = web2py_uuid()
    else:
        unique_key = response.session_db_unique_key

    session_pickled = response.session_pickled or pickle.dumps(self, pickle.HIGHEST_PROTOCOL)

    dd = {
        'locked': False,
        'client_ip': response.session_client,
        'modified_datetime': request.now,
        'session_data': session_pickled,
        'unique_key': unique_key,
    }
    # A failed update of an existing record falls through to an insert.
    if record_id and not table._db(table.id == record_id).update(**dd):
        record_id = None
    if not record_id:
        record_id = table.insert(**dd)
        response.session_id = '%s:%s' % (record_id, unique_key)
        response.session_db_unique_key = unique_key
        response.session_db_record_id = record_id

    self.save_session_id_cookie()
    return True
def load_storage(filename):
    """Unpickle ``filename`` through a ``portalocker.LockedFile`` and return it as a ``Storage``."""
    locked = portalocker.LockedFile(filename, 'rb')
    try:
        loaded = pickle.load(locked)
    finally:
        if locked:
            locked.close()
    return Storage(loaded)
def save_storage(storage, filename):
    """Pickle ``dict(storage)`` into ``filename`` through a ``portalocker.LockedFile``."""
    locked = portalocker.LockedFile(filename, 'wb')
    try:
        plain = dict(storage)
        pickle.dump(plain, locked)
    finally:
        if locked:
            locked.close()
def TAG_pickler(data):
    """Reduce ``data`` for pickling.

    A fresh ``DIV`` is given ``data``'s ``__dict__`` and pickled with the
    highest protocol; the result is returned as
    ``(TAG_unpickler, (pickled_bytes,))``.
    """
    stand_in = DIV()
    stand_in.__dict__ = data.__dict__
    payload = pickle.dumps(stand_in, pickle.HIGHEST_PROTOCOL)
    return (TAG_unpickler, (payload,))
def install():
    """Register ``pickle_traceback`` as the copy_reg reducer for ``TracebackType``."""
    try:
        import copy_reg as registry
    except ImportError:
        # Fall back to the ``copyreg`` spelling of the module name.
        import copyreg as registry
    registry.pickle(TracebackType, pickle_traceback)

# Added by gevent
# We have to defer the initialization, and especially the import of platform,
# until runtime. If we're monkey patched, we need to be sure to use
# the original __import__ to avoid switching through the hub due to
# import locks on Python 2. See also builtins.py for details.
def _import_dump_load():
    """Bind the module-level ``dumps`` and ``loads`` to cPickle's, or pickle's if cPickle is missing."""
    global dumps
    global loads
    try:
        import cPickle as serializer
    except ImportError:
        import pickle as serializer
    dumps, loads = serializer.dumps, serializer.loads
def dump_traceback(tb):
    """Call ``_init()`` and return ``dumps(tb)``."""
    # Both _init and dump/load have to be unlocked, because
    # copy_reg and pickle can do imports to resolve class names; those
    # class names are in this module and greenlet safe though
    _init()
    serialized = dumps(tb)
    return serialized