/**
 * Reads a {@code Queue} from {@code input}.
 *
 * <p>The stream is consumed in this order: a var-int element count, the class
 * record of the element type, then the elements themselves. If a generic
 * element type was supplied through the {@code genericType} field, that type's
 * serializer is used for every element and the field is cleared afterwards;
 * otherwise each element is read together with its own class record.
 *
 * @param kryo  the Kryo instance driving deserialization
 * @param input the stream to read from
 * @param type  the requested type (not consulted by this method)
 * @return the populated queue
 */
@Override
public Queue read(Kryo kryo, Input input, Class<Queue> type) {
    final int size = input.readVarInt(true);

    // A null class record falls back to Object as the element type.
    final Registration elementRegistration = kryo.readClass(input);
    final Class valueType;
    if (elementRegistration != null) {
        valueType = elementRegistration.getType();
    } else {
        valueType = Object.class;
    }

    final Queue result = new Queue(size, valueType);
    kryo.reference(result);

    if (genericType != null) {
        final Class elementClass = genericType;
        final Serializer elementSerializer = kryo.getSerializer(genericType);
        // The generic hint is consumed by this read only.
        genericType = null;
        if (elementSerializer != null) {
            for (int remaining = size; remaining > 0; remaining--) {
                result.addLast(kryo.readObjectOrNull(input, elementClass, elementSerializer));
            }
            return result;
        }
    }

    // No usable element serializer: every element carries its own class record.
    for (int remaining = size; remaining > 0; remaining--) {
        result.addLast(kryo.readClassAndObject(input));
    }
    return result;
}
/**
 * Tests that the registered classes in Kryo did not change.
 *
 * <p>Once we have proper serializer versioning this test will become obsolete.
 * But currently a change in the serializers can break savepoint backwards
 * compatibility between Flink versions.
 *
 * <p>The expected registrations are read from the classpath resource
 * {@code flink_11-kryo_registrations}, one {@code id,class} entry per line.
 * The test fails with an explicit message if that resource cannot be found.
 */
@Test
public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception {
    final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo();

    final String resourceName = "flink_11-kryo_registrations";
    final java.io.InputStream resource = getClass().getClassLoader().getResourceAsStream(resourceName);
    if (resource == null) {
        // getResourceAsStream returns null for a missing resource; without this check
        // the InputStreamReader constructor would throw an uninformative NullPointerException.
        fail("Could not find resource " + resourceName + " on the classpath");
    }

    // Decode with a fixed charset so the test does not depend on the platform default.
    try (BufferedReader reader = new BufferedReader(
            new InputStreamReader(resource, java.nio.charset.StandardCharsets.UTF_8))) {

        String line;
        while ((line = reader.readLine()) != null) {
            String[] split = line.split(",");
            final int tag = Integer.parseInt(split[0]);
            final String registeredClass = split[1];

            Registration registration = kryo.getRegistration(tag);

            if (registration == null) {
                fail(String.format("Registration for %d = %s got lost", tag, registeredClass));
            } else if (!registeredClass.equals(registration.getType().getName())) {
                fail(String.format("Registration for %d = %s changed to %s",
                        tag, registeredClass, registration.getType().getName()));
            }
        }
    }
}
/**
 * Dumps the default Kryo registrations of a freshly created serializer to a
 * comma separated file, one entry per line:
 *
 * <pre>
 * id,class
 * </pre>
 *
 * <p>The resulting file is what the registration test compares against to
 * detect changed IDs in later Flink versions.
 *
 * <p>Nothing in the tests calls this method; it records how the test file was
 * produced and allows it to be regenerated when required. An already existing
 * file at the target path is deleted first.
 *
 * @param filePath File path to write registrations to
 */
private void writeDefaultKryoRegistrations(String filePath) throws IOException {
    final File target = new File(filePath);
    if (target.exists()) {
        assertTrue(target.delete());
    }

    final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo();
    final int idLimit = kryo.getNextRegistrationId();

    try (BufferedWriter out = new BufferedWriter(new FileWriter(target))) {
        int id = 0;
        while (id < idLimit) {
            final Registration entry = kryo.getRegistration(id);
            final String row = entry.getId() + "," + entry.getType().getName();
            out.write(row, 0, row.length());
            out.newLine();
            id++;
        }

        System.out.println("Created file with registrations at " + target.getAbsolutePath());
    }
}
/**
 * Stores a registration so it can later be looked up by class and, unless it
 * is a name-based registration (id equal to {@code NAME}), by id as well.
 * A registration for a primitive type is additionally stored under the
 * corresponding wrapper class.
 *
 * @param registration the registration to store, must not be null
 * @return the same registration instance
 * @throws IllegalArgumentException if {@code registration} is null
 */
public Registration register (Registration registration) {
    if (registration == null) throw new IllegalArgumentException("registration cannot be null.");

    final int id = registration.getId();
    final Class registeredType = registration.getType();

    if (id == NAME) {
        // Name-based registrations are only reachable through the class map.
        if (TRACE) {
            trace("kryo", "Register class name: " + className(registeredType) + " ("
                + registration.getSerializer().getClass().getName() + ")");
        }
    } else {
        if (TRACE) {
            trace("kryo", "Register class ID " + id + ": " + className(registeredType) + " ("
                + registration.getSerializer().getClass().getName() + ")");
        }
        idToRegistration.put(id, registration);
    }

    classToRegistration.put(registeredType, registration);
    if (registeredType.isPrimitive()) {
        classToRegistration.put(getWrapperClass(registeredType), registration);
    }
    return registration;
}
public Registration readClass (Input input) { int classID = input.readVarInt(true); switch (classID) { case Kryo.NULL: if (TRACE || (DEBUG && kryo.getDepth() == 1)) log("Read", null); return null; case NAME + 2: // Offset for NAME and NULL. return readName(input); } if (classID == memoizedClassId) return memoizedClassIdValue; Registration registration = idToRegistration.get(classID - 2); if (registration == null) throw new KryoException("Encountered unregistered class ID: " + (classID - 2)); if (TRACE) trace("kryo", "Read class " + (classID - 2) + ": " + className(registration.getType())); memoizedClassId = classID; memoizedClassIdValue = registration; return registration; }
protected Registration readName (Input input) { int nameId = input.readVarInt(true); if (nameIdToClass == null) nameIdToClass = new IntMap(); Class type = nameIdToClass.get(nameId); if (type == null) { // Only read the class name the first time encountered in object graph. String className = input.readString(); type = getTypeByName(className); if (type == null) { try { type = Class.forName(className, false, kryo.getClassLoader()); } catch (ClassNotFoundException ex) { throw new KryoException("Unable to find class: " + className, ex); } if (nameToClass == null) nameToClass = new ObjectMap(); nameToClass.put(className, type); } nameIdToClass.put(nameId, type); if (TRACE) trace("kryo", "Read class name: " + className); } else { if (TRACE) trace("kryo", "Read class name reference " + nameId + ": " + className(type)); } return kryo.getRegistration(type); }
/**
 * Stores a registration so it can later be looked up by class and by id.
 *
 * <p>Registrations whose id is {@code NAME} are identified by class name
 * rather than by a numeric id. They are stored only in the class map: all of
 * them share the same sentinel id, so putting them into the id map would make
 * each one overwrite the previous entry under that key.
 *
 * <p>A registration for a primitive type is additionally stored under the
 * corresponding wrapper class.
 *
 * @param registration the registration to store, must not be null
 * @return the same registration instance
 * @throws IllegalArgumentException if {@code registration} is null
 */
public Registration register (Registration registration) {
    if (registration == null) throw new IllegalArgumentException("registration cannot be null.");

    final boolean registeredByName = registration.getId() == NAME;
    if (TRACE) {
        if (registeredByName) {
            trace("kryo", "Register class name: " + className(registration.getType()) + " ("
                + registration.getSerializer().getClass().getName() + ")");
        } else {
            trace("kryo", "Register class ID " + registration.getId() + ": " + className(registration.getType()) + " ("
                + registration.getSerializer().getClass().getName() + ")");
        }
    }

    classToRegistration.put(registration.getType(), registration);
    // Only real ids are indexed; the NAME sentinel is not a unique key.
    if (!registeredByName) idToRegistration.put(registration.getId(), registration);
    if (registration.getType().isPrimitive()) {
        classToRegistration.put(getWrapperClass(registration.getType()), registration);
    }
    return registration;
}
protected Registration readName (Input input) { int nameId = input.readVarInt(true); if (nameIdToClass == null) nameIdToClass = new IntMap(); Class type = nameIdToClass.get(nameId); if (type == null) { // Only read the class name the first time encountered in object graph. String className = input.readString(); if (nameToClass != null) type = nameToClass.get(className); if (type == null) { try { type = Class.forName(className, false, kryo.getClassLoader()); } catch (ClassNotFoundException ex) { throw new KryoException("Unable to find class: " + className, ex); } if (nameToClass == null) nameToClass = new ObjectMap(); nameToClass.put(className, type); } nameIdToClass.put(nameId, type); if (TRACE) trace("kryo", "Read class name: " + className); } else { if (TRACE) trace("kryo", "Read class name reference " + nameId + ": " + className(type)); } return kryo.getRegistration(type); }
/**
 * Registers {@code DefaultTypes} and {@code byte[]} under explicit ids, each
 * of them twice with the same id, registers {@code HasStringField} without an
 * explicit id, and then round-trips a populated {@code DefaultTypes} instance.
 */
public void testRegistration() {
    final int firstId = kryo.getNextRegistrationId();
    final int secondId = firstId + 1;

    // Same class, same id, registered twice.
    kryo.register(DefaultTypes.class, firstId);
    kryo.register(DefaultTypes.class, firstId);

    // byte[] under the next id: once through a Registration object, once through the overload.
    kryo.register(new Registration(byte[].class, kryo.getDefaultSerializer(byte[].class), secondId));
    kryo.register(byte[].class, kryo.getDefaultSerializer(byte[].class), secondId);

    kryo.register(HasStringField.class, kryo.getDefaultSerializer(HasStringField.class));

    final DefaultTypes sample = new DefaultTypes();
    sample.intField = 12;
    sample.StringField = "meow";
    sample.CharacterField = 'z';
    sample.byteArrayField = new byte[] { 0, 1, 2, 3, 4 };
    sample.child = new DefaultTypes();

    roundTrip(75, 95, sample);
}
@Override public Skybox read(Kryo kryo, Input input, Class<Skybox> type) { Registration skyboxClass = kryo.readClass(input); if (skyboxClass.getType() == CubemapSkybox.class) { CubemapSkybox skybox = new CubemapSkybox(null); skybox.rotationDirection.set(kryo.readObject(input, Vector3.class)); skybox.rotationSpeed = input.readFloat(); skybox.setSkyboxAsset(kryo.readObjectOrNull(input, CubemapAsset.class)); return skybox; } else { DayNightSkybox dayNightSkybox = new DayNightSkybox(); dayNightSkybox.setMoonAsset(kryo.readObjectOrNull(input, TextureAsset.class)); dayNightSkybox.setSunAsset(kryo.readObjectOrNull(input, TextureAsset.class)); dayNightSkybox.setSkyMapAsset(kryo.readObjectOrNull(input, TextureAsset.class)); dayNightSkybox.setStarsAlphaAsset(kryo.readObjectOrNull(input, TextureAsset.class)); dayNightSkybox.setSateliteLightingAsset(kryo.readObjectOrNull(input, TextureAsset.class)); // throw new GdxRuntimeException("Implement!"); return dayNightSkybox; } }
@Override public NodeDocument read(Kryo kryo, Input input, Class<NodeDocument> nodeDocumentClass) { long created = input.readLong(true); int mapSize = input.readInt(true); NodeDocument doc = new NodeDocument(documentStore, created); for (int i = 0; i < mapSize; i++) { String key = input.readString(); Registration reg = kryo.readClass(input); Object value; if (reg.getType() == NavigableMap.class) { value = new RevisionedMapSerializer(kryo).read(kryo, input, Map.class); } else { value = kryo.readObject(input, reg.getType()); } doc.put(key, value); } //Seal the doc once all changes done doc.seal(); return doc; }
/** * Register {@code type} and {@code serializer} to {@code kryo} instance. * * @param kryo Kryo instance * @param type type to register * @param serializer Specific serializer to register or null to use default. * @param id type registration id to use */ private void register(Kryo kryo, Class<?> type, Serializer<?> serializer, int id) { Registration existing = kryo.getRegistration(id); if (existing != null) { if (existing.getType() != type) { log.error("{}: Failed to register {} as {}, {} was already registered.", friendlyName(), type, id, existing.getType()); throw new IllegalStateException(String.format( "Failed to register %s as %s, %s was already registered.", type, id, existing.getType())); } // falling through to register call for now. // Consider skipping, if there's reasonable // way to compare serializer equivalence. } Registration r; if (serializer == null) { r = kryo.register(type, id); } else { r = kryo.register(type, serializer, id); } if (r.getId() != id) { log.warn("{}: {} already registed as {}. Skipping {}.", friendlyName(), r.getType(), r.getId(), id); } log.trace("{} registered as {}", r.getType(), r.getId()); }
/** * Tests that the registered classes in Kryo did not change. * * <p>Once we have proper serializer versioning this test will become obsolete. * But currently a change in the serializers can break savepoint backwards * compatibility between Flink versions. */ @Test public void testDefaultKryoRegisteredClassesDidNotChange() throws Exception { final Kryo kryo = new KryoSerializer<>(Integer.class, new ExecutionConfig()).getKryo(); try (BufferedReader reader = new BufferedReader(new InputStreamReader( getClass().getClassLoader().getResourceAsStream("flink_11-kryo_registrations")))) { String line; while ((line = reader.readLine()) != null) { String[] split = line.split(","); final int tag = Integer.parseInt(split[0]); final String registeredClass = split[1]; Registration registration = kryo.getRegistration(tag); if (registration == null) { fail(String.format("Registration for %d = %s got lost", tag, registeredClass)); } else if (registeredClass.equals("org.apache.avro.generic.GenericData$Array")) { // starting with Flink 1.4 Avro is no longer a dependency of core. Avro is // only available if flink-avro is present. There is a special version of // this test in AvroKryoSerializerRegistrationsTest that verifies correct // registration of Avro types if present assertThat( registration.getType().getName(), is("org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass")); } else if (!registeredClass.equals(registration.getType().getName())) { fail(String.format("Registration for %d = %s changed to %s", tag, registeredClass, registration.getType().getName())); } } } }
@Override @SuppressWarnings("rawtypes") public Registration registerImplicit(Class type) { while (getRegistration(nextAvailableRegistrationId) != null) { nextAvailableRegistrationId++; } //logger.debug("adding new classid pair {} => {}", nextAvailableRegistrationId, type.getName()); pairs.add(new ClassIdPair(nextAvailableRegistrationId, type.getName())); return register(new Registration(type, kryo.getDefaultSerializer(type), nextAvailableRegistrationId++)); }
public void registerExplicit(ClassIdPair pair) throws ClassNotFoundException { //logger.debug("registering class {} => {}", pair.classname, pair.id); //pairs.add(pair); Class type = Class.forName(pair.classname, false, Thread.currentThread().getContextClassLoader()); register(new Registration(type, kryo.getDefaultSerializer(type), pair.id)); if (nextAvailableRegistrationId <= pair.id) { nextAvailableRegistrationId = pair.id + 1; } }
/**
 * Looks up the registration stored for {@code type}.
 *
 * <p>The last successful lookup is memoized, so asking for the same class
 * again skips the map. Misses are not memoized.
 *
 * @param type the class to look up
 * @return the registration, or null if the class map has no entry for {@code type}
 */
public Registration getRegistration (Class type) {
    if (type == memoizedClass) return memoizedClassValue;

    final Registration found = classToRegistration.get(type);
    if (found == null) return null;

    memoizedClass = type;
    memoizedClassValue = found;
    return found;
}
/**
 * Writes a class record for {@code type} to {@code output}.
 *
 * <p>A null type is written as the {@code Kryo.NULL} marker. A type whose
 * registration id is {@code NAME} is written through {@code writeName};
 * any other type is written as its registration id plus 2.
 *
 * @param output the stream to write to
 * @param type   the class to write, may be null
 * @return the registration used for {@code type}, or null if {@code type} is null
 */
public Registration writeClass (Output output, Class type) {
    if (type != null) {
        final Registration reg = kryo.getRegistration(type);
        final int id = reg.getId();
        if (id != NAME) {
            if (TRACE) trace("kryo", "Write class " + id + ": " + className(type));
            output.writeVarInt(id + 2, true);
        } else {
            writeName(output, type, reg);
        }
        return reg;
    }

    if (TRACE || (DEBUG && kryo.getDepth() == 1)) log("Write", null);
    output.writeVarInt(Kryo.NULL, true);
    return null;
}
/**
 * Reads a {@code Class} value: a class record followed by one flag byte.
 *
 * <p>The flag byte is always consumed. It only matters when the recorded type
 * is primitive: a flag of 1 yields the primitive type itself, any other value
 * yields its wrapper class. A missing registration or type yields null.
 *
 * @param kryo  the Kryo instance driving deserialization
 * @param input the stream to read from
 * @param type  the requested type (not consulted by this method)
 * @return the class that was read, or null
 */
public Class read (Kryo kryo, Input input, Class<Class> type) {
    final Registration reg = kryo.readClass(input);
    final boolean writtenAsPrimitive = input.read() == 1;

    if (reg == null) return null;
    final Class resolved = reg.getType();
    if (resolved == null) return null;

    if (!resolved.isPrimitive()) return resolved;
    return writtenAsPrimitive ? resolved : getWrapperClass(resolved);
}
/**
 * Reads an {@code EnumSet}: the class record of the enum type, an element
 * count, and then that many elements read with the enum type's serializer.
 *
 * @param kryo  the Kryo instance driving deserialization
 * @param input the stream to read from
 * @param type  the requested type (not consulted by this method)
 * @return the populated set
 */
public EnumSet read (Kryo kryo, Input input, Class<EnumSet> type) {
    final Registration enumRegistration = kryo.readClass(input);
    final EnumSet result = EnumSet.noneOf(enumRegistration.getType());
    final Serializer elementSerializer = enumRegistration.getSerializer();

    int remaining = input.readInt(true);
    while (remaining > 0) {
        result.add(elementSerializer.read(kryo, input, null));
        remaining--;
    }
    return result;
}
public Object[] read (Kryo kryo, Input input, Class<Object[]> type) { int length = input.readVarInt(true); if (length == NULL) return null; Object[] object = (Object[])Array.newInstance(type.getComponentType(), length - 1); kryo.reference(object); Class elementClass = object.getClass().getComponentType(); if (elementsAreSameType || Modifier.isFinal(elementClass.getModifiers())) { Serializer elementSerializer = kryo.getSerializer(elementClass); // if(generics!=null) elementSerializer.setGenerics(kryo, generics); for (int i = 0, n = object.length; i < n; i++) { if (elementsCanBeNull) object[i] = kryo.readObjectOrNull(input, elementClass, elementSerializer); else object[i] = kryo.readObject(input, elementClass, elementSerializer); } } else { for (int i = 0, n = object.length; i < n; i++) { // Propagate generics Registration registration = kryo.readClass(input); if (registration != null) { registration.getSerializer().setGenerics(kryo, generics); object[i] = kryo.readObject(input, registration.getType(), registration.getSerializer()); } else { object[i] = null; } } } return object; }
/**
 * Writes a {@code DataMessage} to {@code output}.
 *
 * <p>The primary type, source id, stream name, element names and the number of
 * data items are written first. The layout of the data is decided by its
 * first item:
 * <ul>
 *   <li>a {@code Map}: the {@code RAW_EVENT} marker, the class record of the
 *       first item, then every item;</li>
 *   <li>anything else: the data is cast to {@code Object[][]} and written as
 *       the {@code RECORD} marker, the column count of the first row, one class
 *       record per column taken from the first row, then every cell using the
 *       serializer obtained for its column.</li>
 * </ul>
 *
 * <p>The first data item is accessed unconditionally, so the data array must
 * not be empty.
 *
 * @param kryo   the Kryo instance driving serialization
 * @param output the stream to write to
 * @param dm     the message to write
 */
@Override
public void write(Kryo kryo, Output output, DataMessage dm) {
    kryo.writeObject(output, dm.getPrimaryType());
    kryo.writeObject(output, dm.getSourceId());
    kryo.writeObject(output, dm.getStreamName());
    kryo.writeObject(output, dm.getElementNames());

    final Object[] payload = dm.getData();
    kryo.writeObject(output, payload.length);

    if (!(payload[0] instanceof Map<?,?>)) {
        kryo.writeObject(output, RECORD);
        final Object[][] rows = (Object[][]) payload;
        final Object[] firstRow = rows[0];
        final int columnCount = firstRow.length;
        kryo.writeObject(output, columnCount);

        // One class record per column; its serializer is reused for every row.
        final Serializer<?>[] columnSerializers = new Serializer<?>[columnCount];
        for (int col = 0; col < columnCount; col++) {
            final Registration columnRegistration = kryo.writeClass(output, firstRow[col].getClass());
            columnSerializers[col] = columnRegistration.getSerializer();
        }

        for (Object[] row : rows) {
            for (int col = 0; col < row.length; col++) {
                kryo.writeObject(output, row[col], columnSerializers[col]);
            }
        }
        return;
    }

    kryo.writeObject(output, RAW_EVENT);
    kryo.writeClass(output, payload[0].getClass());
    for (Object event : payload) {
        kryo.writeObject(output, event);
    }
}
public static int registerClasses(Kryo kryo){ if(allClazzList==null){ allClazzList=getSortedClasses(); } kryo.setRegistrationRequired(false);//ATT kryo.setReferences(true);//ATT for(Class<?> clazz: allClazzList){ //System.out.println(clazz.getSimpleName()); Registration reg=kryo.register(clazz); //log.info("registed class %s, id=%d", clazz.getSimpleName(), reg.getId()); } log.info("registed %d classes for Kryo", allClazzList.size()); return allClazzList.size(); }
/**
 * Reads an {@code ArrayList}: an {@code Integer} element count followed by
 * that many elements, each preceded by its own class record.
 *
 * @param kryo  the Kryo instance driving deserialization
 * @param input the stream to read from
 * @param type  the requested type (not consulted by this method)
 * @return the populated list
 */
@Override
public ArrayList read(Kryo kryo, Input input, Class<ArrayList> type) {
    final int count = kryo.readObject(input, Integer.class);
    final ArrayList elements = new ArrayList(count);

    for (int remaining = count; remaining > 0; remaining--) {
        final Class elementType = kryo.readClass(input).getType();
        elements.add(kryo.readObject(input, elementType));
    }
    return elements;
}
/**
 * Reads a {@code Class} value: a class record followed by one flag byte.
 *
 * <p>The flag byte is always consumed so the stream position stays correct.
 * It only matters when the recorded type is primitive: a flag of 1 yields the
 * primitive type itself, any other value yields its wrapper class.
 *
 * <p>If no registration comes back from {@code kryo.readClass} (a null class
 * record), null is returned instead of failing with a
 * {@link NullPointerException}.
 *
 * @param kryo  the Kryo instance driving deserialization
 * @param input the stream to read from
 * @param type  the requested type (not consulted by this method)
 * @return the class that was read, or null if a null class was recorded
 */
public Class read (Kryo kryo, Input input, Class<Class> type) {
    Registration registration = kryo.readClass(input);
    // Consume the flag before any early return; it is part of the record either way.
    int isPrimitive = input.read();
    if (registration == null) return null;
    Class typ = registration.getType();
    if (typ == null || !typ.isPrimitive()) return typ;
    return (isPrimitive == 1) ? typ : getWrapperClass(typ);
}
/**
 * Reads an {@code Entity}: the group name, the class record of the vertex,
 * the vertex itself, and finally the (nullable) properties, which are copied
 * into the new entity.
 *
 * @param kryo  the Kryo instance driving deserialization
 * @param input the stream to read from
 * @param type  the requested type (not consulted by this method)
 * @return the deserialized entity
 */
@Override
public Entity read(final Kryo kryo, final Input input, final Class<Entity> type) {
    final Entity result = new Entity(input.readString());

    final Class vertexType = kryo.readClass(input).getType();
    result.setVertex(kryo.readObject(input, vertexType));

    result.copyProperties(kryo.readObjectOrNull(input, Properties.class));
    return result;
}
/**
 * Reads an {@code Edge}: the group name, the source vertex and the destination
 * vertex (each preceded by its class record), the directed flag, and the
 * (nullable) properties.
 *
 * @param kryo  the Kryo instance driving deserialization
 * @param input the stream to read from
 * @param type  the requested type (not consulted by this method)
 * @return the deserialized edge
 */
@Override
public Edge read(final Kryo kryo, final Input input, final Class<Edge> type) {
    final String group = input.readString();

    final Class sourceType = kryo.readClass(input).getType();
    final Object source = kryo.readObject(input, sourceType);

    final Class destType = kryo.readClass(input).getType();
    final Object dest = kryo.readObject(input, destType);

    final boolean directed = input.readBoolean();
    final Properties properties = kryo.readObjectOrNull(input, Properties.class);

    return new Edge(group, source, dest, directed, null, properties);
}
/**
 * Reads optional user data: a presence flag and, when the flag is set, a
 * class record followed by an object of that class.
 *
 * @param kryo  the Kryo instance driving deserialization
 * @param input the stream to read from
 * @return the user data, or null if the presence flag was not set
 */
private Object readUserData(Kryo kryo, Input input) {
    if (!input.readBoolean()) {
        return null;
    }
    final Class userDataType = kryo.readClass(input).getType();
    return kryo.readObject(input, userDataType);
}