/**
 * Encode the message data provided.
 *
 * @param <O> The type of the data to encode.
 * @param data The message data.
 * @throws EncodeMessageContentException Exception thrown if encoding the message content fails.
 */
public <O extends GenericContainer> void encode(O data) throws EncodeMessageContentException {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        this.schema = data.getSchema();
        DatumWriter<O> outputDatumWriter = new SpecificDatumWriter<O>(this.schema);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
        outputDatumWriter.write(data, encoder);
        encoder.flush();
        this.data = baos.toByteArray();
    } catch (Exception ex) {
        throw new EncodeMessageContentException(ex);
    }
}
/**
 * Round-trips a single {@code User} record through Avro binary encoding and
 * decoding, reading it back as a {@code GenericRecord}.
 *
 * @throws Exception the exception
 */
public void processSinglex() throws Exception {
    int base = (int) System.currentTimeMillis();
    User user = User.newBuilder()
        .setName("name" + base)
        .setFavoriteColor("color" + base)
        .setFavoriteNumber(base)
        .build();
    DatumWriter<GenericRecord> datumWriterUser = new GenericDatumWriter<GenericRecord>(User.getClassSchema());
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] byteData = null;
    try {
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
        datumWriterUser.write(user, binaryEncoder);
        binaryEncoder.flush();
        byteData = baos.toByteArray();
    } finally {
        baos.close();
    }
    System.out.println(byteData.length);
    DatumReader<GenericRecord> datumReaderUser = new GenericDatumReader<GenericRecord>(User.getClassSchema());
    GenericRecord genericRecord = datumReaderUser.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
    System.out.println(genericRecord);
    System.out.println(genericRecord.get("name"));
}
@Override
public byte[] serialize(String topic, T payload) {
    try {
        byte[] result = null;
        if (payload != null) {
            LOGGER.debug("data='{}'", payload);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
            DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(payload.getSchema());
            datumWriter.write(payload, binaryEncoder);
            binaryEncoder.flush();
            byteArrayOutputStream.close();
            result = byteArrayOutputStream.toByteArray();
            LOGGER.debug("serialized data='{}'", DatatypeConverter.printHexBinary(result));
        }
        return result;
    } catch (IOException ex) {
        throw new SerializationException("Can't serialize payload='" + payload + "' for topic='" + topic + "'", ex);
    }
}
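/*
 * For context, a minimal sketch of the enclosing Kafka Serializer class the
 * method above assumes. The class name, type bound, and LOGGER declaration
 * are illustrative guesses, not part of the original snippet.
 */
public class AvroGenericSerializer<T extends GenericRecord> implements Serializer<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AvroGenericSerializer.class);
    // serialize(String, T) as above; configure() and close() can be no-ops here.
}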
@Override
public byte[] serialize(String s, WrapperAppMessage wrapperAppMessage) {
    DatumWriter<WrapperAppMessage> datumWriter = new SpecificDatumWriter<>(wrapperAppMessage.getSchema());
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);
    try {
        datumWriter.write(wrapperAppMessage, binaryEncoder);
        // Unlike a directBinaryEncoder, the buffered binaryEncoder must be
        // flushed; otherwise the byte array stays empty.
        binaryEncoder.flush();
        outputStream.flush();
        outputStream.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    return outputStream.toByteArray();
}
public void publish(BrokerStats brokerStats) throws IOException {
    try {
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
        avroEventWriter.write(brokerStats, binaryEncoder);
        binaryEncoder.flush();
        IOUtils.closeQuietly(stream);

        String key = brokerStats.getName() + "_" + System.currentTimeMillis();
        int numPartitions = kafkaProducer.partitionsFor(destTopic).size();
        int partition = brokerStats.getId() % numPartitions;
        Future<RecordMetadata> future = kafkaProducer.send(
            new ProducerRecord(destTopic, partition, key.getBytes(), stream.toByteArray()));
        future.get();

        OpenTsdbMetricConverter.incr("kafka.stats.collector.success", 1, "host=" + HOSTNAME);
    } catch (Exception e) {
        LOG.error("Failure in publish stats", e);
        OpenTsdbMetricConverter.incr("kafka.stats.collector.failure", 1, "host=" + HOSTNAME);
        throw new RuntimeException("Avro serialization failure", e);
    }
}
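/*
 * The publisher above (and sendMessage below) writes through shared Avro
 * members that the snippets do not declare. A plausible sketch, following
 * the pattern of the send(...) methods later in this section; the exact
 * types and initializers are assumptions:
 */
private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();
private static final SpecificDatumWriter<BrokerStats> avroEventWriter =
    new SpecificDatumWriter<>(BrokerStats.SCHEMA$);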
public synchronized void sendMessage(String clusterName, String message) {
    int numRetries = 0;
    while (numRetries < MAX_RETRIES) {
        try {
            long timestamp = System.currentTimeMillis();
            OperatorAction operatorAction = new OperatorAction(timestamp, clusterName, message);

            ByteArrayOutputStream stream = new ByteArrayOutputStream();
            BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
            avroWriter.write(operatorAction, binaryEncoder);
            binaryEncoder.flush();
            IOUtils.closeQuietly(stream);

            String key = "" + System.currentTimeMillis();
            ProducerRecord producerRecord = new ProducerRecord(topic, key.getBytes(), stream.toByteArray());
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            future.get();
            LOG.info("Sent message {} to action report", message);
            break;
        } catch (Exception e) {
            LOG.error("Failed to publish report message {}: {}", clusterName, message, e);
            numRetries++;
        }
    }
}
/**
 * Serialize the record to prepare for publishing.
 *
 * @param record the GenericRecord
 * @param schema the Avro Schema
 * @param ggAvroSchema the internal representation of the Avro schema
 * @return the serialized record
 * @throws IOException if there is a problem
 */
private byte[] serializeRecord(GenericRecord record, Schema schema,
        @SuppressWarnings("unused") AvroSchema ggAvroSchema) throws IOException {
    byte[] rval;
    BinaryEncoder encoder = null;

    // serialize the record into a byte array
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    encoder = EncoderFactory.get().directBinaryEncoder(out, encoder);
    writer.write(record, encoder);
    encoder.flush();
    rval = out.toByteArray();
    //out.close(); // noop in the Apache version, so not bothering
    return rval;
}
public void send(COL_RDBMS event) throws Exception {
    EncoderFactory avroEncoderFactory = EncoderFactory.get();
    SpecificDatumWriter<COL_RDBMS> avroEventWriter = new SpecificDatumWriter<COL_RDBMS>(COL_RDBMS.SCHEMA$);
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
    try {
        avroEventWriter.write(event, binaryEncoder);
        binaryEncoder.flush();
    } catch (IOException e) {
        e.printStackTrace();
        throw e;
    }
    IOUtils.closeQuietly(stream);
    KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(TOPIC, stream.toByteArray());
    producer.send(data);
}
public void send(COL_ONEM2M event) throws Exception {
    EncoderFactory avroEncoderFactory = EncoderFactory.get();
    SpecificDatumWriter<COL_ONEM2M> avroEventWriter = new SpecificDatumWriter<COL_ONEM2M>(COL_ONEM2M.SCHEMA$);
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
    try {
        avroEventWriter.write(event, binaryEncoder);
        binaryEncoder.flush();
    } catch (IOException e) {
        e.printStackTrace();
        throw e;
    }
    IOUtils.closeQuietly(stream);
    KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>(TOPIC, stream.toByteArray());
    producer.send(data);
}
/**
 * @see ClientConnection#offerMessage(ServerToClient)
 */
public boolean offerMessage(ServerToClient message) {
    Session session = getSession();
    if (session == null || !session.isOpen()) {
        return false;
    }
    if (inFlightMessages.incrementAndGet() > MAX_IN_FLIGHT_MESSAGES) {
        inFlightMessages.decrementAndGet();
        return false;
    }
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    try {
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
        writeToClient.write(message, encoder);
        encoder.flush();
    } catch (Exception e) {
        log.warn("Failed to encode message to client", e);
        session.close(StatusCode.SERVER_ERROR, "Internal server error");
        return true;
    }
    session.getRemote().sendBytes(ByteBuffer.wrap(stream.toByteArray()), new WriteCallback() {
        @Override
        public void writeSuccess() {
            inFlightMessages.decrementAndGet();
        }

        @Override
        public void writeFailed(Throwable error) {
            inFlightMessages.decrementAndGet();
            log.info("Sending message to WebSocket client failed: ", error);
        }
    });
    return true;
}
/**
 * Round-trips each Weather record through Avro binary encoding and decoding,
 * reusing the encoder, decoder, and record instances across iterations.
 *
 * @param weathers the weathers
 * @throws Exception the exception
 */
public void processList(List<Weather> weathers) throws Exception {
    long before = System.currentTimeMillis();
    BinaryEncoder binaryEncoder = null;
    BinaryDecoder binaryDecoder = null;
    Weather weatherRead = null;
    for (Weather weather : weathers) {
        DatumWriter<Weather> datumWriterWeather = new SpecificDatumWriter<Weather>(Weather.getClassSchema());
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] byteData = null;
        try {
            // Passing the previous encoder back to the factory lets it be reused.
            binaryEncoder = EncoderFactory.get().binaryEncoder(baos, binaryEncoder);
            datumWriterWeather.write(weather, binaryEncoder);
            binaryEncoder.flush();
            byteData = baos.toByteArray();
        } finally {
            baos.close();
        }
        DatumReader<Weather> datumReaderWeather = new SpecificDatumReader<Weather>(Weather.getClassSchema());
        binaryDecoder = DecoderFactory.get().binaryDecoder(byteData, binaryDecoder);
        weatherRead = datumReaderWeather.read(weatherRead, binaryDecoder);
        // System.out.println("After Binary Read: " + weatherRead.toString());
    }
    System.out.println("size=" + weathers.size() + ", elapsed: " + (System.currentTimeMillis() - before));
}
@Override
public byte[] serialize(String topic, GenericRecord data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    Schema schema = data.getSchema();
    // GenericDatumWriter is the natural writer for GenericRecord data.
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    try {
        writer.write(data, encoder);
        encoder.flush();
        out.close();
        return out.toByteArray();
    } catch (IOException e) {
        LOG.error("Error encoding Avro record into bytes: {}", e.getMessage());
        return null;
    }
}
@Override
public void write(Kryo kryo, Output output, Object object) {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(byteStream, null);
    try {
        writer.write(object, encoder);
        encoder.flush();
        byte[] bytes = byteStream.toByteArray();
        // Length-prefix the Avro payload so the reader knows how many bytes to consume.
        output.writeInt(bytes.length, true);
        output.write(bytes);
        output.flush();
    } catch (IOException e) {
        throw new AppException(e);
    }
}
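/*
 * The Kryo adapter above writes through a pre-built Avro writer field that
 * the snippet does not show. A plausible declaration; the type, the use of
 * ReflectDatumWriter, and the schema variable are assumptions:
 */
private final DatumWriter<Object> writer = new ReflectDatumWriter<>(schema);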
public byte[] serialize(String topic, GenericRecord data) throws SerializationException {
    Schema schema = data.getSchema();
    MD5Digest schemaId = null;
    try {
        schemaId = schemaRegistry.register(topic, schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // MAGIC_BYTE | schemaId-bytes | avro_payload
        out.write(LiAvroSerDeHelper.MAGIC_BYTE);
        out.write(schemaId.asBytes());
        BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        writer.write(data, encoder);
        encoder.flush();
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException | SchemaRegistryException e) {
        throw new SerializationException(e);
    }
}
@Override
public Iterable<byte[]> convertRecord(String outputSchema, GenericRecord inputRecord, WorkUnitState workUnit)
        throws DataConversionException {
    try {
        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytesOut, encoderCache.get());
        encoderCache.set(encoder);
        writer.write(inputRecord, encoder);
        encoder.flush();
        return Collections.singleton(bytesOut.toByteArray());
    } catch (IOException e) {
        throw new DataConversionException("Error serializing record", e);
    }
}
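/*
 * A minimal sketch of the fields the converter above assumes; the exact
 * declarations are not shown in the snippet, so these are illustrative.
 * Caching the BinaryEncoder per thread lets EncoderFactory reuse its
 * internal buffer instead of allocating a new one for every record.
 */
private final ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
private final DatumWriter<GenericRecord> writer =
    new GenericDatumWriter<>(schema); // schema assumed to be resolved during converter setup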
@AfterMethod
public void serializeRecord(ITestResult result) throws IOException {
    if (result.isSuccess() && result.getThrowable() == null) {
        /* Serialize the GenericRecord; this can catch issues in set() that the underlying GenericRecord
         * may not catch until serialize time */
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(recordSchema);
        ByteArrayOutputStream bOs = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bOs, null);
        datumWriter.write(record, encoder);
        encoder.flush();
        bOs.flush();
        Assert.assertTrue(bOs.toByteArray().length > 0);
    }
}
private ByteBuf getBuffer(Event event) {
    DatumWriter writer = new FilteredRecordWriter(event.properties().getSchema(), GenericData.get());
    ByteBuf buffer = DEFAULT.buffer(100);
    buffer.writeByte(2);

    BinaryEncoder encoder = EncoderFactory.get()
        .directBinaryEncoder(new ByteBufOutputStream(buffer), null);

    try {
        encoder.writeString(event.collection());
        writer.write(event.properties(), encoder);
    } catch (Exception e) {
        throw new RuntimeException("Couldn't serialize event", e);
    }

    return buffer;
}
public static byte[] exportAsAvro(QueryResult result) {
    Schema avroSchema = AvroUtil.convertAvroSchema(result.getMetadata());

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter writer = new FilteredRecordWriter(avroSchema, GenericData.get());
    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);

    GenericData.Record record = new GenericData.Record(avroSchema);
    for (List<Object> row : result.getResult()) {
        List<SchemaField> metadata = result.getMetadata();
        for (int i = 0; i < row.size(); i++) {
            record.put(i, getAvroValue(row.get(i), metadata.get(i).getType()));
        }
        try {
            writer.write(record, encoder);
        } catch (Exception e) {
            throw new RuntimeException("Couldn't serialize event", e);
        }
    }
    return out.toByteArray();
}
private byte[] getInsertMessageBytes(InsertMessage insertMessage) {
    byte[] result = null;
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    SpecificDatumWriter<InsertMessage> writer = new SpecificDatumWriter<InsertMessage>(InsertMessage.getClassSchema());
    try {
        writer.write(insertMessage, encoder);
        encoder.flush();
        out.close();
        result = out.toByteArray();
    } catch (IOException e) {
        return null;
    }
    return result;
}
@Override
public void encode(PipelinePack pipelinePack, Handler<Buffer> bufferHandler) {
    final GenericRecord record = pipelinePack.getMessage();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    try {
        writer.write(record, encoder);
        encoder.flush();
        out.close();
    } catch (IOException e) {
        logger.error("Cannot encode data", e);
    }
    final Buffer buffer = Buffer.buffer(out.toByteArray());
    bufferHandler.handle(buffer);
}
/** Serializes the given Avro object to a byte array, caching one writer per class.
 *  (The method was originally misnamed {@code deserialize}.) */
private static <T> byte[] serialize(final T avroObject, Schema avroSchema, Class<T> avroClass)
        throws IOException {
    ByteArrayOutputStream os = new ByteArrayOutputStream();
    BinaryEncoder encoder = ef.binaryEncoder(os, null);
    if (!writers.containsKey(avroClass.toString())) {
        writers.put(avroClass.toString(), new SpecificDatumWriter<T>(avroSchema));
    }
    SpecificDatumWriter<T> writer = writers.get(avroClass.toString());
    writer.write(avroObject, encoder);
    encoder.flush();
    return os.toByteArray();
}
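/*
 * The helper above relies on two class-level members that the snippet does
 * not show. A plausible sketch (the names ef and writers come from the
 * snippet; the exact types and the use of a concurrent map are assumptions;
 * the raw SpecificDatumWriter value type mirrors the unchecked get above):
 */
private static final EncoderFactory ef = EncoderFactory.get();
private static final Map<String, SpecificDatumWriter> writers = new ConcurrentHashMap<>();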
private byte[] serializeAvro(Object datum, Schema schema) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(schema);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    out.reset(); // defensive; a no-op on a freshly created stream
    writer.write(datum, encoder);
    encoder.flush();
    return out.toByteArray();
}
public byte[] serialize(String topic, T data) {
    if (data == null) {
        return null;
    }
    DatumWriter<T> datumWriter = new SpecificDatumWriter<>(data.getSchema());
    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
    // A direct binary encoder writes straight through to the stream, so no flush is needed.
    BinaryEncoder binaryEncoder = EncoderFactory.get().directBinaryEncoder(outputStream, null);
    try {
        datumWriter.write(data, binaryEncoder);
    } catch (IOException e) {
        throw new SerializationException(e.getMessage(), e);
    }
    return outputStream.toByteArray();
}
private static final <T> byte[] toBytes(final DatumWriter<T> writer, final T event) {
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    try {
        writer.write(event, encoder);
        encoder.flush();
        out.close();
    } catch (Exception ex) {
        log.error("error writing object to byte array", ex);
    }
    return out.toByteArray();
}
/**
 * Serialize as appropriate to send a record to Kafka that contains information
 * pertaining to the schema version that applies to this record.
 *
 * @param record a GenericRecord
 * @return a byte array representing the encoded Generic record + schema ID
 * @throws IOException if there is a problem with the encoding
 */
public byte[] serialize(GenericRecord record) throws IOException {
    BinaryEncoder encoder = null;
    Schema schema = record.getSchema();
    byte[] rval;

    // register the schema.
    // TODO: determine if we need getName() or getFullName()
    int schemaId = registerSchema(schema.getName(), schema);

    // serialize the record into a byte array
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(magic_byte);
    out.write(ByteBuffer.allocate(idSize).putInt(schemaId).array());
    //DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
    DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
    encoder = org.apache.avro.io.EncoderFactory.get().directBinaryEncoder(out, encoder);
    writer.write(record, encoder);
    encoder.flush();
    rval = out.toByteArray();
    //out.close(); // noop in the Apache version, so not bothering
    return rval;
}
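/*
 * A sketch of the framing constants the method above assumes. The values
 * mirror the common Confluent-style wire format (one magic byte followed by
 * a 4-byte big-endian schema ID) but are not confirmed by the snippet:
 */
private static final byte magic_byte = 0x0;
private static final int idSize = 4; // width of the schema ID written after the magic byte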
protected byte[] serializeAvro(GenericData.Record record, Schema schema, String topic, byte opType)
        throws IOException {
    short schemaId = getSchemaId(topic, schema);

    EncoderFactory encoderFactory = EncoderFactory.get();
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>();
    writer.setSchema(schema);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Frame layout: PROTO_MAGIC_V0 | opType | schemaId | avro payload
    out.write(PROTO_MAGIC_V0);
    out.write(ByteBuffer.allocate(opTypeSize).put(opType).array());
    out.write(ByteBuffer.allocate(idSize).putShort(schemaId).array());

    BinaryEncoder enc = encoderFactory.binaryEncoder(out, null);
    writer.write(record, enc);
    enc.flush();
    return out.toByteArray();
}
/**
 * Serializes the field object using the datumWriter.
 */
public static <T extends SpecificRecord> void serialize(OutputStream os, SpecificDatumWriter<T> datumWriter,
        Schema schema, T object) throws IOException {
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
    datumWriter.write(object, encoder);
    encoder.flush();
}
/**
 * Serializes the field object using the datumWriter.
 */
public static <T> void serialize(OutputStream os, SpecificDatumWriter<T> datumWriter,
        Schema schema, T object) throws IOException {
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
    datumWriter.write(object, encoder);
    encoder.flush();
}
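/*
 * Usage sketch for the helpers above, reusing the generated User record from
 * the round-trip examples earlier in this section; the field values and the
 * surrounding stream handling are illustrative.
 */
User user = User.newBuilder().setName("n").setFavoriteColor("c").setFavoriteNumber(1).build();
ByteArrayOutputStream os = new ByteArrayOutputStream();
SpecificDatumWriter<User> datumWriter = new SpecificDatumWriter<>(User.getClassSchema());
serialize(os, datumWriter, User.getClassSchema(), user); // writes the Avro binary body to os
byte[] bytes = os.toByteArray();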
@Override
public void encode(T value, OutputStream outStream) throws IOException {
    // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
    BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
    // Save the potentially-new instance for reuse later.
    encoder.set(encoderInstance);
    writer.get().write(value, encoderInstance);
    // Direct binary encoder does not buffer any data and need not be flushed.
}
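/*
 * Sketch of the per-thread caches the coder above assumes. The field names
 * match the snippet; the initializers and the schema variable are assumptions:
 */
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
private final ThreadLocal<BinaryEncoder> encoder = new ThreadLocal<>();
private final ThreadLocal<DatumWriter<T>> writer =
    ThreadLocal.withInitial(() -> new SpecificDatumWriter<>(schema)); // schema resolved at construction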
/**
 * Round-trips a single {@code User} record, including a map field, through
 * Avro binary encoding and decoding.
 *
 * @throws Exception the exception
 */
public void processSingle() throws Exception {
    int base = (int) System.currentTimeMillis();
    Map<CharSequence, CharSequence> myMap = new HashMap<CharSequence, CharSequence>();
    myMap.put("Mike", "one");
    myMap.put("Chris", "two");
    myMap.put("Rob", "three");
    myMap.put("Madeline", "four");
    User user = User.newBuilder()
        .setName("name" + base)
        .setFavoriteColor("color" + base)
        .setFavoriteNumber(base)
        .setMymap(myMap)
        .build();
    DatumWriter<User> datumWriterUser = new SpecificDatumWriter<User>(User.getClassSchema());
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] byteData = null;
    try {
        BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
        datumWriterUser.write(user, binaryEncoder);
        binaryEncoder.flush();
        byteData = baos.toByteArray();
    } finally {
        baos.close();
    }
    System.out.println(byteData.length);
    DatumReader<User> datumReaderUser = new SpecificDatumReader<User>(User.getClassSchema());
    User userRead = datumReaderUser.read(null, DecoderFactory.get().binaryDecoder(byteData, null));
    System.out.println(userRead.getFavoriteColor());
    System.out.println(userRead.getMymap());
    System.out.println(userRead.getMymap().keySet());
    myMap = userRead.getMymap();
    // Avro decodes string map keys as Utf8, so look the entry up with a Utf8 key.
    System.out.println(myMap.get(new Utf8("Mike")));
}