/**
 * Serializes {@code obj} to a byte array with a fresh Kryo instance.
 *
 * <p>The runtime class of {@code obj} is registered with a {@link JavaSerializer}, and the
 * object is written together with its class information via
 * {@code writeClassAndObject}.
 *
 * @param obj the object to serialize; its {@code getClass()} is called, so it must not be null
 * @param <T> the serializable type of {@code obj}
 * @return the serialized bytes
 */
private <T extends Serializable> byte[] serializationObject(T obj) {
    Kryo kryo = new Kryo();
    kryo.setReferences(false);
    kryo.register(obj.getClass(), new JavaSerializer());

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // try-with-resources closes (and thereby flushes) the Output even when writing fails.
    // Flushing/closing the ByteArrayOutputStream itself is a no-op, so the old
    // catch-and-printStackTrace block around it was dead code and has been removed.
    try (Output output = new Output(baos)) {
        kryo.writeClassAndObject(output, obj);
    }
    return baos.toByteArray();
}
/**
 * Builds a keyed time-window fold whose accumulator type is {@link File} and checks, via
 * {@code validateStateDescriptorConfigured}, the state descriptor of the resulting operator.
 */
@Test
public void testFoldWindowState() throws Exception {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    execEnv.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    final DataStream<String> input = execEnv.fromElements("abc");

    // The functions are never executed; only the operator wiring is inspected.
    final KeySelector<String, String> nullKey = new KeySelector<String, String>() {
        @Override
        public String getKey(String value) {
            return null;
        }
    };
    final FoldFunction<String, File> noOpFold = new FoldFunction<String, File>() {
        @Override
        public File fold(File a, String e) {
            return null;
        }
    };

    final SingleOutputStreamOperator<?> folded = input
            .keyBy(nullKey)
            .timeWindow(Time.milliseconds(1000))
            .fold(new File("/"), noOpFold);

    validateStateDescriptorConfigured(folded);
}
/**
 * Builds a keyed time-window reduce over {@link File} elements and checks, via
 * {@code validateStateDescriptorConfigured}, the state descriptor of the resulting operator.
 */
@Test
public void testReduceWindowState() throws Exception {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    execEnv.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    final DataStream<File> input = execEnv.fromElements(new File("/"));

    // The functions are never executed; only the operator wiring is inspected.
    final KeySelector<File, String> nullKey = new KeySelector<File, String>() {
        @Override
        public String getKey(File value) {
            return null;
        }
    };
    final ReduceFunction<File> noOpReduce = new ReduceFunction<File>() {
        @Override
        public File reduce(File value1, File value2) {
            return null;
        }
    };

    final SingleOutputStreamOperator<?> reduced = input
            .keyBy(nullKey)
            .timeWindow(Time.milliseconds(1000))
            .reduce(noOpReduce);

    validateStateDescriptorConfigured(reduced);
}
/**
 * Builds a keyed time window with a {@code WindowFunction} over {@link File} elements and
 * checks, via {@code validateListStateDescriptorConfigured}, the list state descriptor of the
 * resulting operator.
 */
@Test
public void testApplyWindowState() throws Exception {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    execEnv.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    final DataStream<File> input = execEnv.fromElements(new File("/"));

    // The functions are never executed; only the operator wiring is inspected.
    final KeySelector<File, String> nullKey = new KeySelector<File, String>() {
        @Override
        public String getKey(File value) {
            return null;
        }
    };
    final WindowFunction<File, String, String, TimeWindow> noOpWindowFunction =
            new WindowFunction<File, String, String, TimeWindow>() {
                @Override
                public void apply(String s, TimeWindow window, Iterable<File> input, Collector<String> out) {}
            };

    final SingleOutputStreamOperator<?> applied = input
            .keyBy(nullKey)
            .timeWindow(Time.milliseconds(1000))
            .apply(noOpWindowFunction);

    validateListStateDescriptorConfigured(applied);
}
/**
 * Builds a keyed time window with a {@code ProcessWindowFunction} over {@link File} elements
 * and checks, via {@code validateListStateDescriptorConfigured}, the list state descriptor of
 * the resulting operator.
 */
@Test
public void testProcessWindowState() throws Exception {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    execEnv.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    final DataStream<File> input = execEnv.fromElements(new File("/"));

    // The functions are never executed; only the operator wiring is inspected.
    final KeySelector<File, String> nullKey = new KeySelector<File, String>() {
        @Override
        public String getKey(File value) {
            return null;
        }
    };
    final ProcessWindowFunction<File, String, String, TimeWindow> noOpProcess =
            new ProcessWindowFunction<File, String, String, TimeWindow>() {
                @Override
                public void process(String s, Context ctx, Iterable<File> input, Collector<String> out) {}
            };

    final SingleOutputStreamOperator<?> processed = input
            .keyBy(nullKey)
            .timeWindow(Time.milliseconds(1000))
            .process(noOpProcess);

    validateListStateDescriptorConfigured(processed);
}
/**
 * Builds a non-keyed time window with a {@code ProcessAllWindowFunction} over {@link File}
 * elements and checks, via {@code validateListStateDescriptorConfigured}, the list state
 * descriptor of the resulting operator.
 */
@Test
public void testProcessAllWindowState() throws Exception {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    execEnv.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    final DataStream<File> input = execEnv.fromElements(new File("/"));

    // The function is never executed; only the operator wiring is inspected.
    final ProcessAllWindowFunction<File, String, TimeWindow> noOpProcess =
            new ProcessAllWindowFunction<File, String, TimeWindow>() {
                @Override
                public void process(Context ctx, Iterable<File> input, Collector<String> out) {}
            };

    final SingleOutputStreamOperator<?> processed = input
            .timeWindowAll(Time.milliseconds(1000))
            .process(noOpProcess);

    validateListStateDescriptorConfigured(processed);
}
/**
 * Builds a non-keyed time-window fold whose accumulator type is {@link File} and checks, via
 * {@code validateStateDescriptorConfigured}, the state descriptor of the resulting operator.
 */
@Test
public void testFoldWindowAllState() throws Exception {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    execEnv.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    final DataStream<String> input = execEnv.fromElements("abc");

    // The function is never executed; only the operator wiring is inspected.
    final FoldFunction<String, File> noOpFold = new FoldFunction<String, File>() {
        @Override
        public File fold(File a, String e) {
            return null;
        }
    };

    final SingleOutputStreamOperator<?> folded = input
            .timeWindowAll(Time.milliseconds(1000))
            .fold(new File("/"), noOpFold);

    validateStateDescriptorConfigured(folded);
}
/**
 * Builds a non-keyed time-window reduce over {@link File} elements and checks, via
 * {@code validateStateDescriptorConfigured}, the state descriptor of the resulting operator.
 */
@Test
public void testReduceWindowAllState() throws Exception {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    execEnv.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    final DataStream<File> input = execEnv.fromElements(new File("/"));

    // The function is never executed; only the operator wiring is inspected.
    final ReduceFunction<File> noOpReduce = new ReduceFunction<File>() {
        @Override
        public File reduce(File value1, File value2) {
            return null;
        }
    };

    final SingleOutputStreamOperator<?> reduced = input
            .timeWindowAll(Time.milliseconds(1000))
            .reduce(noOpReduce);

    validateStateDescriptorConfigured(reduced);
}
/**
 * Builds a non-keyed time window with an {@code AllWindowFunction} over {@link File} elements
 * and checks, via {@code validateListStateDescriptorConfigured}, the list state descriptor of
 * the resulting operator.
 */
@Test
public void testApplyWindowAllState() throws Exception {
    final StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    execEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    execEnv.registerTypeWithKryoSerializer(File.class, JavaSerializer.class);

    final DataStream<File> input = execEnv.fromElements(new File("/"));

    // The function is never executed; only the operator wiring is inspected.
    final AllWindowFunction<File, String, TimeWindow> noOpWindowFunction =
            new AllWindowFunction<File, String, TimeWindow>() {
                @Override
                public void apply(TimeWindow window, Iterable<File> input, Collector<String> out) {}
            };

    final SingleOutputStreamOperator<?> applied = input
            .timeWindowAll(Time.milliseconds(1000))
            .apply(noOpWindowFunction);

    validateListStateDescriptorConfigured(applied);
}
private void validateListStateDescriptorConfigured(SingleOutputStreamOperator<?> result) { OneInputTransformation<?, ?> transform = (OneInputTransformation<?, ?>) result.getTransformation(); WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, ?>) transform.getOperator(); StateDescriptor<?, ?> descr = op.getStateDescriptor(); assertTrue(descr instanceof ListStateDescriptor); ListStateDescriptor<?> listDescr = (ListStateDescriptor<?>) descr; // this would be the first statement to fail if state descriptors were not properly initialized TypeSerializer<?> serializer = listDescr.getSerializer(); assertTrue(serializer instanceof ListSerializer); TypeSerializer<?> elementSerializer = listDescr.getElementSerializer(); assertTrue(elementSerializer instanceof KryoSerializer); Kryo kryo = ((KryoSerializer<?>) elementSerializer).getKryo(); assertTrue("serializer registration was not properly passed on", kryo.getSerializer(File.class) instanceof JavaSerializer); }
/**
 * Picks the default serializer for {@code type}.
 *
 * <p>Array types, and types for which {@code ReflectionUtils.checkZeroArgConstructor}
 * returns true, are delegated to the superclass. Every other type gets a new
 * {@link JavaSerializer}, and a warning is logged when warn level is enabled.
 *
 * @param type the class to find a serializer for
 * @return the serializer to use by default for {@code type}
 * @throws IllegalArgumentException if {@code type} is null
 */
@Override
public Serializer getDefaultSerializer(Class type) {
    if (type == null) {
        throw new IllegalArgumentException("type cannot be null.");
    }

    // Same short-circuit order as before: the constructor check only runs for non-array types.
    final boolean delegateToParent = type.isArray() || ReflectionUtils.checkZeroArgConstructor(type);
    if (delegateToParent) {
        return super.getDefaultSerializer(type);
    }

    if (logger.isWarnEnabled()) {
        logger.warn(type + " has no zero-arg constructor and this will affect the serialization performance");
    }
    return new JavaSerializer();
}
/**
 * Deserializes an object from {@code obj} with a fresh Kryo instance.
 *
 * <p>{@code clazz} is registered with a {@link JavaSerializer}, and the object is read
 * together with its class information via {@code readClassAndObject}.
 *
 * @param obj the serialized bytes
 * @param clazz the class to register with Kryo before reading
 * @param <T> the serializable type the caller expects
 * @return the object read from {@code obj}; the cast to {@code T} is unchecked, so the
 *         runtime type is not verified against {@code clazz} here
 */
@SuppressWarnings("unchecked")
private <T extends Serializable> T deserializationObject(byte[] obj, Class<T> clazz) {
    Kryo kryo = new Kryo();
    kryo.setReferences(false);
    kryo.register(clazz, new JavaSerializer());

    // The Input was previously never closed; close it deterministically, just as
    // serializationObject closes its Output.
    try (Input input = new Input(new ByteArrayInputStream(obj))) {
        return (T) kryo.readClassAndObject(input);
    }
}
private void validateStateDescriptorConfigured(SingleOutputStreamOperator<?> result) { OneInputTransformation<?, ?> transform = (OneInputTransformation<?, ?>) result.getTransformation(); WindowOperator<?, ?, ?, ?, ?> op = (WindowOperator<?, ?, ?, ?, ?>) transform.getOperator(); StateDescriptor<?, ?> descr = op.getStateDescriptor(); // this would be the first statement to fail if state descriptors were not properly initialized TypeSerializer<?> serializer = descr.getSerializer(); assertTrue(serializer instanceof KryoSerializer); Kryo kryo = ((KryoSerializer<?>) serializer).getKryo(); assertTrue("serializer registration was not properly passed on", kryo.getSerializer(File.class) instanceof JavaSerializer); }
/**
 * Resolves the default serializer for {@code type}.
 *
 * <p>A non-array type for which {@code ReflectionUtils.checkZeroArgConstructor} returns
 * false is given a new {@link JavaSerializer} (with a warning logged when warn level is
 * enabled); all other types are resolved by the superclass.
 *
 * @param type the class to find a serializer for
 * @return the serializer to use by default for {@code type}
 * @throws IllegalArgumentException if {@code type} is null
 */
@Override
public Serializer getDefaultSerializer(Class type) {
    if (type == null) {
        throw new IllegalArgumentException("type cannot be null.");
    }

    // Arrays skip the constructor check entirely, exactly as in the combined condition.
    if (type.isArray()) {
        return super.getDefaultSerializer(type);
    }
    if (ReflectionUtils.checkZeroArgConstructor(type)) {
        return super.getDefaultSerializer(type);
    }

    if (log.isWarnEnabled()) {
        log.warn(type + " has no zero-arg constructor and this will affect the serialization performance");
    }
    return new JavaSerializer();
}
/**
 * Prepares the shared fixtures: a Kryo instance with {@code POJO} registered against a
 * {@link JavaSerializer}, and a fresh {@code POJO}.
 *
 * @return always {@code true}
 */
@Override
public boolean beforeAll() {
    final Kryo configured = new Kryo();
    configured.register(POJO.class, new JavaSerializer());

    kryo = configured;
    pojo = new POJO();
    return true;
}
/**
 * Round-trips two strings and a {@code TestClass} instance after registering their classes
 * with {@link JavaSerializer}. The {@code TestClass} instance is round-tripped three times.
 * NOTE(review): the numeric arguments to {@code roundTrip} are presumably expected
 * serialized lengths; confirm against the helper's definition.
 */
public void testJavaSerializer () {
    kryo.register(String.class, new JavaSerializer());
    roundTrip(50, 50, "abcdefabcdefabcdefabcdefabcdefabcdefabcdef");
    roundTrip(12, 12, "meow");

    kryo.register(TestClass.class, new JavaSerializer());
    final TestClass sample = new TestClass();
    sample.intField = 54321;
    sample.stringField = "fubar";

    for (int pass = 0; pass < 3; pass++) {
        roundTrip(134, 134, sample);
    }
}
/**
 * Creates a codec whose {@code kryo} instance uses {@link JavaSerializer} as its default
 * serializer.
 * NOTE(review): {@code kryo} is presumably initialized by the superclass constructor;
 * confirm against the parent class.
 */
public JavaSerializationStreamCodec() {
    // The no-arg superclass constructor runs implicitly before this statement.
    kryo.setDefaultSerializer(JavaSerializer.class);
}
/**
 * Builds the extra class-to-serializer registrations as an insertion-ordered map.
 *
 * <p>Four classes are looked up by name via {@link Class#forName(String)} and paired with a
 * {@code JavaSerializer}; the remaining classes are referenced directly and paired either
 * with an {@code UnshadedSerializerAdapter} or with {@code null}.
 * NOTE(review): a {@code null} value presumably means "register the class without a custom
 * serializer"; confirm against the code that consumes this map.
 *
 * @return a new {@link LinkedHashMap} that iterates in the insertion order used below
 * @throws IllegalStateException if one of the classes looked up by name cannot be found
 */
private LinkedHashMap<Class<?>, Serializer<?>> getExtraRegistrations() {

    /* The map returned by this method MUST have a fixed iteration order!
     *
     * The order itself is irrelevant, so long as it is completely stable at runtime.
     *
     * LinkedHashMap satisfies this requirement (its contract specifies
     * iteration in key-insertion-order).
     */

    final LinkedHashMap<Class<?>, Serializer<?>> m = new LinkedHashMap<>();
    // The following entries were copied from GryoSerializer's constructor
    // This could be turned into a static collection on GryoSerializer to avoid
    // duplication, but it would be a bit cumbersome to do so without disturbing
    // the ordering of the existing entries in that constructor, since not all
    // of the entries are for TinkerPop (and the ordering is significant).
    try {
        m.put(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer());
        m.put(Class.forName("scala.reflect.ManifestFactory$$anon$1"), new JavaSerializer());
        m.put(Class.forName("org.apache.spark.internal.io.FileCommitProtocol$TaskCommitMessage"), new JavaSerializer());
        m.put(Class.forName("org.apache.spark.internal.io.FileCommitProtocol$EmptyTaskCommitMessage$"), new JavaSerializer());
    } catch (final ClassNotFoundException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }
    m.put(WrappedArray.ofRef.class, null);
    m.put(MessagePayload.class, null);
    m.put(ViewIncomingPayload.class, null);
    m.put(ViewOutgoingPayload.class, null);
    m.put(ViewPayload.class, null);
    m.put(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer()));
    m.put(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>()));
    //
    m.put(HadoopVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexSerializer()));
    m.put(HadoopVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexPropertySerializer()));
    m.put(HadoopProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PropertySerializer()));
    m.put(HadoopEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.EdgeSerializer()));
    //
    m.put(ComputerGraph.ComputerVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexSerializer()));
    m.put(ComputerGraph.ComputerVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexPropertySerializer()));
    m.put(ComputerGraph.ComputerProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PropertySerializer()));
    m.put(ComputerGraph.ComputerEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.EdgeSerializer()));
    //
    m.put(StarGraph.StarEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.EdgeSerializer()));
    m.put(StarGraph.StarVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexSerializer()));
    m.put(StarGraph.StarProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PropertySerializer()));
    m.put(StarGraph.StarVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.VertexPropertySerializer()));
    //
    m.put(MutablePath.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PathSerializer()));
    m.put(ImmutablePath.class, new UnshadedSerializerAdapter<>(new GryoSerializersV1d0.PathSerializer()));
    //
    m.put(CompactBuffer[].class, null);
    // TODO: VoidSerializer is a default serializer and thus, may not be needed (if it is, you can't use FieldSerializer)
    // TODO: We will have to copy/paste the shaded DefaultSerializer.VoidSerializer into an unshaded form.
    //m.put(void.class, null);
    //m.put(Void.class, null);
    return m;
}