static void convertAvroToJson(InputStream inputStream, OutputStream outputStream, Schema schema)
    throws IOException {
  DatumReader<Object> reader = new GenericDatumReader<>(schema);
  DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
  BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(inputStream, null);
  JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, outputStream, true);
  Object datum = null;
  while (!binaryDecoder.isEnd()) {
    datum = reader.read(datum, binaryDecoder);
    writer.write(datum, jsonEncoder);
    jsonEncoder.flush();
  }
  outputStream.flush();
}
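A complementary sketch (not part of the original source) for the reverse direction, JSON back to Avro binary with the same schema, using the matching jsonDecoder/binaryEncoder pair; catching EOFException is the usual way to detect the end of a JSON input stream:

static void convertJsonToAvro(InputStream inputStream, OutputStream outputStream, Schema schema)
    throws IOException {
  DatumReader<Object> reader = new GenericDatumReader<>(schema);
  DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
  JsonDecoder jsonDecoder = DecoderFactory.get().jsonDecoder(schema, inputStream);
  BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);
  Object datum = null;
  while (true) {
    try {
      datum = reader.read(datum, jsonDecoder);
    } catch (EOFException eof) {
      break; // end of the JSON input stream
    }
    writer.write(datum, binaryEncoder);
  }
  binaryEncoder.flush();
  outputStream.flush();
}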
/**
 * Round-trips each {@link Weather} record through Avro binary encoding and decoding,
 * reusing the encoder and decoder instances, then prints the elapsed time.
 *
 * @param weathers the weather records to process
 * @throws Exception if encoding or decoding fails
 */
public void processList(List<Weather> weathers) throws Exception {
  long before = System.currentTimeMillis();
  BinaryEncoder binaryEncoder = null;
  BinaryDecoder binaryDecoder = null;
  Weather weatherRead = null;
  for (Weather weather : weathers) {
    DatumWriter<Weather> datumWriterWeather = new SpecificDatumWriter<Weather>(Weather.getClassSchema());
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    byte[] byteData = null;
    try {
      binaryEncoder = EncoderFactory.get().binaryEncoder(baos, binaryEncoder);
      datumWriterWeather.write(weather, binaryEncoder);
      binaryEncoder.flush();
      byteData = baos.toByteArray();
    } finally {
      baos.close();
    }
    DatumReader<Weather> datumReaderWeather = new SpecificDatumReader<Weather>(Weather.getClassSchema());
    binaryDecoder = DecoderFactory.get().binaryDecoder(byteData, binaryDecoder);
    weatherRead = datumReaderWeather.read(weatherRead, binaryDecoder);
    // System.out.println("After Binary Read: " + weatherRead.toString());
  }
  System.out.println("size=" + weathers.size() + ", elapsed: " + (System.currentTimeMillis() - before));
}
/**
 * Change the schema of an Avro record.
 * @param record The Avro record whose schema is to be changed.
 * @param newSchema The target schema. It must be compatible as reader schema with record.getSchema() as writer schema.
 * @return a new Avro record with the new schema.
 * @throws IOException if conversion failed.
 */
public static GenericRecord convertRecordSchema(GenericRecord record, Schema newSchema) throws IOException {
  if (record.getSchema().equals(newSchema)) {
    return record;
  }
  if (checkReaderWriterCompatibility(newSchema, record.getSchema()).getType() != COMPATIBLE) {
    LOG.debug("New schema is not compatible as reader schema with the record's writer schema. Conversion may fail.");
  }
  try {
    BinaryDecoder decoder = new DecoderFactory().binaryDecoder(recordToByteArray(record), null);
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(record.getSchema(), newSchema);
    return reader.read(null, decoder);
  } catch (IOException e) {
    throw new IOException(
        String.format("Cannot convert Avro record to new schema. Original schema = %s, new schema = %s",
            record.getSchema(), newSchema),
        e);
  }
}
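The recordToByteArray helper is referenced here but not shown; a plausible sketch (an assumption, the original implementation may differ) serializes the record with its own schema so the decoder above can re-read it with the new one:

private static byte[] recordToByteArray(GenericRecord record) throws IOException {
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
  // Write with the record's own (writer) schema; the caller re-reads with the new (reader) schema.
  new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
  encoder.flush();
  return out.toByteArray();
}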
@SuppressWarnings("unchecked") @Override public JetstreamEvent decode(byte[] key, byte[] message) { ByteArrayInputStream stream = new ByteArrayInputStream(message); BinaryDecoder reusedDecoder = decoderHolder.get(); BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(stream, reusedDecoder); if (reusedDecoder == null) { decoderHolder.set(decoder); } Record object; try { object = reader.read(null, decoder); Map<String, Object> m = (Map<String, Object>) object.get(MAP_FIELD_NAME); return new JetstreamEvent(m); } catch (IOException e) { throw new IllegalArgumentException("Can not read the avro message", e); } }
/**
 * Change the schema of an Avro record.
 * @param record The Avro record whose schema is to be changed.
 * @param newSchema The target schema. It must be compatible as reader schema with record.getSchema() as writer schema.
 * @return a new Avro record with the new schema.
 * @throws IOException if conversion failed.
 */
public static GenericRecord convertRecordSchema(GenericRecord record, Schema newSchema) throws IOException {
  if (record.getSchema().equals(newSchema)) {
    return record;
  }
  try {
    BinaryDecoder decoder = new DecoderFactory().binaryDecoder(recordToByteArray(record), null);
    DatumReader<GenericRecord> reader = new GenericDatumReader<>(record.getSchema(), newSchema);
    return reader.read(null, decoder);
  } catch (IOException e) {
    throw new IOException(
        String.format("Cannot convert Avro record to new schema. Original schema = %s, new schema = %s",
            record.getSchema(), newSchema),
        e);
  }
}
public KafkaAvroJobMonitor(String topic, MutableJobCatalog catalog, Config config, Schema schema,
    SchemaVersionWriter<?> versionWriter) {
  super(topic, catalog, config);
  this.schema = schema;
  this.decoder = new ThreadLocal<BinaryDecoder>() {
    @Override
    protected BinaryDecoder initialValue() {
      InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]);
      return DecoderFactory.get().binaryDecoder(dummyInputStream, null);
    }
  };
  this.reader = new ThreadLocal<SpecificDatumReader<T>>() {
    @Override
    protected SpecificDatumReader<T> initialValue() {
      return new SpecificDatumReader<>(KafkaAvroJobMonitor.this.schema);
    }
  };
  this.versionWriter = versionWriter;
}
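A hedged usage sketch (the method name is hypothetical) of how the two ThreadLocal caches above would typically be combined when a message arrives:

public T parseMessage(byte[] message) throws IOException {
  // Passing the cached instance lets DecoderFactory reinitialize it in place
  // instead of allocating a new decoder per message.
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(message, this.decoder.get());
  return this.reader.get().read(null, decoder);
}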
public void handleMsg(BytesMessage msg) {
  BinaryDecoder binaryDecoder = null;
  User user = null;
  try {
    if (msg.getBodyLength() > 0) {
      byte[] byteArray = new byte[(int) msg.getBodyLength()];
      msg.readBytes(byteArray);
      ByteArrayInputStream stream = new ByteArrayInputStream(byteArray);
      binaryDecoder = avroDecoderFactory.binaryDecoder(stream, binaryDecoder);
      user = avroUserReader.read(user, binaryDecoder);
      System.out.println("firstName: " + user.getFirstName());
      System.out.println("lastName: " + user.getLastName());
      System.out.println("City: " + user.getCity());
      System.out.println("State: " + user.getState());
    }
  } catch (Exception e) {
    throw new RuntimeException("Error processing Avro message", e);
  }
}
public EventList deserialize(String project, String collection, SliceInput slice) throws IOException {
  String json = slice.readSlice(slice.readInt()).toStringUtf8();
  Schema schema = new Schema.Parser().parse(json);
  int records = slice.readInt();
  BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(slice, null);
  List<SchemaField> fields = metastore.getCollection(project, collection);
  Schema avroSchema = AvroUtil.convertAvroSchema(fields);
  GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema, avroSchema);
  List<Event> list = new ArrayList<>(records);
  for (int i = 0; i < records; i++) {
    GenericRecord record = reader.read(null, binaryDecoder);
    list.add(new Event(project, collection, null, fields, record));
  }
  return new EventList(Event.EventContext.empty(), project, list);
}
private void parseTopics() {
  if (state.getProperty(TOPIC_LIST) != null) {
    byte[] data = base64.decodeBase64(state.getProperty(TOPIC_LIST));
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    SpecificDatumReader<Topic> avroReader = new SpecificDatumReader<>(Topic.class);
    try { // NOSONAR
      Topic decodedTopic;
      while (!decoder.isEnd()) {
        decodedTopic = avroReader.read(null, decoder);
        LOG.debug("Loaded {}", decodedTopic);
        topicMap.put(decodedTopic.getId(), decodedTopic);
      }
    } catch (Exception ex) {
      LOG.error("Unexpected exception occurred while reading information from decoder", ex);
    }
  } else {
    LOG.info("No topic list found in state");
  }
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
  int size = in.readInt();
  byte[] data = new byte[size];
  // readFully guarantees the whole length-prefixed body is read; a plain read(data)
  // may return fewer bytes than requested.
  in.readFully(data);
  @SuppressWarnings("unchecked")
  AvroReader<T> reader = (AvroReader<T>) recordReaderMap.get().get(className);
  if (reader == null) {
    reader = new AvroReader<T>(
        new SpecificDatumReader<T>(clazz),
        DecoderFactory.get().binaryDecoder(data, null));
    recordReaderMap.get().put(className, reader);
  }
  BinaryDecoder recordDataDecoder = DecoderFactory.get().binaryDecoder(data, reader.getDecoder());
  avroObject = reader.getReader().read(null, recordDataDecoder);
}
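A hedged sketch of the matching writeExternal (field names assumed from the reader above): the Avro body is length-prefixed so readExternal can size its buffer before decoding:

@Override
public void writeExternal(ObjectOutput out) throws IOException {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
  new SpecificDatumWriter<T>(clazz).write(avroObject, encoder);
  encoder.flush();
  byte[] data = baos.toByteArray();
  out.writeInt(data.length); // length prefix consumed by readExternal
  out.write(data);
}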
/**
 * Decode records from a <code>ByteBuffer</code> and return them as an <code>Iterable</code>.
 *
 * @param bb the <code>ByteBuffer</code> to decode
 * @return the decoded records as an <code>Iterable</code>
 */
public Iterable<T> decodeRecords(ByteBuffer bb) throws IOException {
  byte[] recordDataBody = toByteArray(bb);
  BinaryDecoder recordDataDecoder = DecoderFactory.get().binaryDecoder(recordDataBody, null);
  RecordData recordData = recordDataReader.read(null, recordDataDecoder);
  LOG.debug("Avro event header: {}", recordData.getRecordHeader());
  LOG.debug("Avro event data.size: {}", recordData.getEventRecords().size());
  List<T> results = new ArrayList<T>();
  BinaryDecoder recordDecoder = null;
  for (ByteBuffer eventBuf : recordData.getEventRecords()) {
    byte[] recordBody = toByteArray(eventBuf);
    recordDecoder = DecoderFactory.get().binaryDecoder(recordBody, recordDecoder);
    T record = recordReader.read(null, recordDecoder);
    results.add(record);
    LOG.trace("Parsed record: {}", record);
  }
  return results;
}
/**
 * Read tokens from a file and add them to the user's credentials.
 * @param ugi user's credentials to add tokens to.
 * @throws IOException if there are errors in reading the tokens' file.
 */
void addTokensFromFile(final UserGroupInformation ugi) throws IOException {
  LOG.log(Level.FINE, "Reading security tokens from file: {0}", this.securityTokensFile);
  try (final FileInputStream stream = new FileInputStream(securityTokensFile)) {
    final BinaryDecoder decoder = decoderFactory.binaryDecoder(stream, null);
    while (!decoder.isEnd()) {
      final SecurityToken token = tokenDatumReader.read(null, decoder);
      final Token<TokenIdentifier> yarnToken = new Token<>(
          token.getKey().array(),
          token.getPassword().array(),
          new Text(token.getKind().toString()),
          new Text(token.getService().toString()));
      LOG.log(Level.FINE, "addToken for {0}", yarnToken.getKind());
      ugi.addToken(yarnToken);
    }
  }
}
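A hedged counterpart sketch (not from the source; tokenDatumWriter is an assumed SpecificDatumWriter&lt;SecurityToken&gt;): records written back-to-back with a single encoder are exactly what the isEnd() loop above expects:

void writeTokensToFile(final List<SecurityToken> tokens) throws IOException {
  try (final FileOutputStream stream = new FileOutputStream(securityTokensFile)) {
    final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
    for (final SecurityToken token : tokens) {
      tokenDatumWriter.write(token, encoder);
    }
    encoder.flush(); // binaryEncoder buffers internally; flush before the stream closes
  }
}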
/***
 * Deserialize a byte stream into an AvroSimpleFeature.
 *
 * @param avroData serialized bytes of an AvroSimpleFeature
 * @param avroObjectToReuse null or an AvroSimpleFeature instance to be re-used. If null, a new object will be allocated.
 * @return instance of AvroSimpleFeature with values parsed from avroData
 * @throws IOException
 */
private static AvroSimpleFeature deserializeASF(
    final byte[] avroData,
    AvroSimpleFeature avroObjectToReuse) throws IOException {
  final BinaryDecoder decoder = DECODER_FACTORY.binaryDecoder(avroData, null);
  if (avroObjectToReuse == null) {
    avroObjectToReuse = new AvroSimpleFeature();
  }
  DATUM_READER.setSchema(avroObjectToReuse.getSchema());
  return DATUM_READER.read(avroObjectToReuse, decoder);
}
private static <T> T deserialize(
    final T avroObject,
    final byte[] avroData,
    Class<T> avroClass,
    Schema avroSchema) throws IOException {
  BinaryDecoder decoder = df.binaryDecoder(avroData, null);
  if (!readers.containsKey(avroClass.toString())) {
    readers.put(avroClass.toString(), new SpecificDatumReader<T>(avroSchema));
  }
  SpecificDatumReader<T> reader = readers.get(avroClass.toString());
  return reader.read(avroObject, decoder);
}
@Override
public int compare(byte[] o1, int s1, int l1, byte[] o2, int s2, int l2) {
  try {
    final BinaryDecoder d1 = DECODER_FACTORY.binaryDecoder(o1, s1, l1, null);
    final ByteBuffer key1 = d1.readBytes(null);
    // re-use the decoder instance, but do not re-use the byte buffer,
    // because DecoratedKey stores a reference
    final BinaryDecoder d2 = DECODER_FACTORY.binaryDecoder(o2, s2, l2, d1);
    final ByteBuffer key2 = d2.readBytes(null);
    return compare(key1, key2);
  } catch (final IOException e) {
    throw Throwables.propagate(e);
  }
}
/**
 * Decode the message content.
 *
 * @param <I> The type of the data that is decoded.
 * @return The message data.
 * @throws DecodeMessageContentException Exception thrown if decoding the message content fails.
 */
public <I extends GenericContainer> I decode() throws DecodeMessageContentException {
  try {
    DatumReader<I> messageDatumReader = new SpecificDatumReader<I>(schema);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    return messageDatumReader.read(null, decoder);
  } catch (Exception ex) {
    throw new DecodeMessageContentException(ex);
  }
}
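A hedged mirror-image sketch for the encoding side (EncodeMessageContentException is assumed by analogy with the decode exception; not from the source):

public <I extends GenericContainer> byte[] encode(I message) throws EncodeMessageContentException {
  try {
    DatumWriter<I> messageDatumWriter = new SpecificDatumWriter<I>(schema);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    messageDatumWriter.write(message, encoder);
    encoder.flush();
    return out.toByteArray();
  } catch (Exception ex) {
    throw new EncodeMessageContentException(ex);
  }
}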
@Override
public T deserialize(String topic, byte[] data) {
  DatumReader<T> datumReader = new SpecificDatumReader<>(TopicEnum.STOCK_QUOTATION_AVRO.getType().getSchema());
  // The factory returns a decoder, so name the variable accordingly.
  BinaryDecoder binaryDecoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(data), null);
  try {
    return datumReader.read(null, binaryDecoder);
  } catch (IOException e) {
    // Keep the original exception as the cause rather than discarding the stack trace.
    throw new RuntimeException(e);
  }
}
@Override
public WrapperAppMessage deserialize(String s, byte[] bytes) {
  // Deserialize the Avro binary payload back into a WrapperAppMessage.
  DatumReader<WrapperAppMessage> datumReader = new SpecificDatumReader<WrapperAppMessage>(WrapperAppMessage.getClassSchema());
  BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(bytes, null);
  WrapperAppMessage message = new WrapperAppMessage();
  try {
    datumReader.read(message, binaryDecoder);
  } catch (IOException e) {
    e.printStackTrace();
  }
  return message;
}
public static void main(String[] args) throws Exception {
  CommandLine commandLine = parseCommandLine(args);
  String zkUrl = commandLine.getOptionValue(ZOOKEEPER);
  String statsTopic = commandLine.getOptionValue(STATS_TOPIC);
  String bootstrapBrokers = OperatorUtil.getBrokers(zkUrl);
  Properties props = new Properties();
  props.put("bootstrap.servers", bootstrapBrokers);
  props.put("group.id", "broker_statsreader_group");
  props.put("enable.auto.commit", "false");
  props.put("auto.commit.interval.ms", "1000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  Schema schema = BrokerStats.getClassSchema();
  KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList(statsTopic));
  while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
      System.out.printf("offset = %d, key.size = %d, value.size = %d%n",
          record.offset(), record.key().length, record.value().length);
      try {
        BinaryDecoder binaryDecoder = avroDecoderFactory.binaryDecoder(record.value(), null);
        SpecificDatumReader<BrokerStats> reader = new SpecificDatumReader<>(schema);
        BrokerStats result = new BrokerStats();
        reader.read(result, binaryDecoder);
        System.out.println(result);
      } catch (Exception e) {
        LOG.error("Failed to decode a message", e);
      }
    }
  }
}
public static BrokerStats deserializeBrokerStats(ConsumerRecord<byte[], byte[]> record) {
  try {
    BinaryDecoder binaryDecoder = avroDecoderFactory.binaryDecoder(record.value(), null);
    Schema schema = BrokerStats.getClassSchema();
    SpecificDatumReader<BrokerStats> reader = new SpecificDatumReader<>(schema);
    BrokerStats stats = new BrokerStats();
    reader.read(stats, binaryDecoder);
    return stats;
  } catch (Exception e) {
    LOG.debug("Failed to decode a message", e);
    return null;
  }
}
@Override
public T decode(InputStream inStream) throws IOException {
  // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
  BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
  // Save the potentially-new instance for later.
  decoder.set(decoderInstance);
  return reader.get().read(null, decoderInstance);
}
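The encode side of such a coder would plausibly mirror this pattern with a thread-local encoder (ENCODER_FACTORY, encoder, and writer are assumed fields; a sketch rather than the actual implementation):

@Override
public void encode(T value, OutputStream outStream) throws IOException {
  BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
  encoder.set(encoderInstance);
  writer.get().write(value, encoderInstance);
  encoderInstance.flush(); // even direct encoders buffer small writes
}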
public void run() {
  try {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    BinaryDecoder binaryDecoder = null;
    Weather weatherRead = null;
    DatumReader<Weather> datumReaderWeather = new SpecificDatumReader<Weather>(Weather.getClassSchema());
    while (it.hasNext()) {
      MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
      String key = new String(messageAndMetadata.key());
      binaryDecoder = DecoderFactory.get().binaryDecoder(messageAndMetadata.message(), binaryDecoder);
      weatherRead = datumReaderWeather.read(weatherRead, binaryDecoder);
      // User user = (User) recordInjection.invert(messageAndMetadata.message()).get();
      String summary = "Thread " + threadNumber
          + ", topic=" + messageAndMetadata.topic()
          + ", partition=" + messageAndMetadata.partition()
          + ", key=" + key
          + ", offset=" + messageAndMetadata.offset()
          + ", timestamp=" + messageAndMetadata.timestamp()
          + ", timestampType=" + messageAndMetadata.timestampType()
          + ", weatherRead=" + weatherRead.toString();
      System.out.println(summary);
    }
    System.out.println("Shutting down Thread: " + threadNumber);
  } catch (Exception e) {
    System.out.println("Exception in thread " + threadNumber);
    System.out.println(e);
    e.printStackTrace();
  }
}
public void run() {
  try {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    BinaryDecoder binaryDecoder = null;
    Wave waveRead = null;
    DatumReader<Wave> datumReaderWave = new SpecificDatumReader<Wave>(Wave.getClassSchema());
    while (it.hasNext()) {
      MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
      String key = new String(messageAndMetadata.key());
      binaryDecoder = DecoderFactory.get().binaryDecoder(messageAndMetadata.message(), binaryDecoder);
      waveRead = datumReaderWave.read(waveRead, binaryDecoder);
      // User user = (User) recordInjection.invert(messageAndMetadata.message()).get();
      String summary = ">>> CONSUMER: Thread " + threadNumber
          + ", topic=" + messageAndMetadata.topic()
          + ", partition=" + messageAndMetadata.partition()
          + ", key=" + key
          + ", offset=" + messageAndMetadata.offset()
          + ", timestamp=" + messageAndMetadata.timestamp()
          + ", timestampType=" + messageAndMetadata.timestampType()
          + ", waveRead=" + waveRead.toString();
      System.out.println(summary);
    }
    System.out.println("Shutting down Thread: " + threadNumber);
  } catch (Exception e) {
    System.out.println("Exception in thread " + threadNumber);
    System.out.println(e);
    e.printStackTrace();
  }
}
@Override
public Object read(Kryo kryo, Input input, Class type) {
  int length = input.readInt(true);
  byte[] bytes = input.readBytes(length);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
  try {
    return reader.read(null, decoder);
  } catch (IOException e) {
    throw new AppException(e);
  }
}
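A hedged sketch of the matching Kryo write (writer is the assumed DatumWriter counterpart of reader); the optimizePositive flag must match the readInt(true) call above:

@Override
public void write(Kryo kryo, Output output, Object object) {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
  try {
    writer.write(object, encoder);
    encoder.flush();
  } catch (IOException e) {
    throw new AppException(e);
  }
  byte[] bytes = baos.toByteArray();
  output.writeInt(bytes.length, true); // same optimizePositive flag as the reader
  output.writeBytes(bytes);
}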
@Override
public void init(StarTreeConfig starTreeConfig, ThirdEyeKafkaConfig kafkaConfig) throws Exception {
  this.starTreeConfig = starTreeConfig;
  this.decoderThreadLocal = new ThreadLocal<BinaryDecoder>();

  // Get schema
  String schemaUri = kafkaConfig.getDecoderConfig().getProperty(PROP_SCHEMA_URI);
  if (schemaUri == null) {
    throw new IllegalStateException("Must provide " + PROP_SCHEMA_URI);
  }
  InputStream schemaInputStream = URI.create(schemaUri).toURL().openStream();
  // Schema.parse(InputStream) is deprecated; Schema.Parser is the supported replacement.
  Schema schema = new Schema.Parser().parse(schemaInputStream);
  schemaInputStream.close();
  LOG.info("Init decoder for {} with schema {}", starTreeConfig.getCollection(), schema);

  // Set decoder
  this.datumReader = new GenericDatumReader<GenericRecord>(schema);

  // Set any record offset
  String recordOffsetProp = kafkaConfig.getDecoderConfig().getProperty(PROP_RECORD_OFFSET);
  if (recordOffsetProp != null) {
    this.recordOffset = Integer.valueOf(recordOffsetProp);
  }
}
@Test
public void testWriteAvroRecordsDropSchema() throws InterruptedException, StageException, IOException {
  DataGeneratorFormatConfig dataGeneratorFormatConfig = new DataGeneratorFormatConfig();
  dataGeneratorFormatConfig.avroSchema = SdcAvroTestUtil.AVRO_SCHEMA1;
  dataGeneratorFormatConfig.avroSchemaSource = INLINE;
  dataGeneratorFormatConfig.includeSchema = false;
  dataGeneratorFormatConfig.avroCompression = AvroCompression.NULL;
  FlumeTarget flumeTarget = FlumeTestUtil.createFlumeTarget(
      FlumeTestUtil.createDefaultFlumeConfig(port, false),
      DataFormat.AVRO,
      dataGeneratorFormatConfig
  );
  TargetRunner targetRunner = new TargetRunner.Builder(FlumeDTarget.class, flumeTarget).build();

  targetRunner.runInit();
  List<Record> records = SdcAvroTestUtil.getRecords1();
  targetRunner.runWrite(records);
  targetRunner.runDestroy();

  List<GenericRecord> genericRecords = new ArrayList<>();
  DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); // Reader schema argument is optional
  datumReader.setSchema(new Schema.Parser().parse(SdcAvroTestUtil.AVRO_SCHEMA1));

  Transaction transaction = ch.getTransaction();
  transaction.begin();
  Event event = ch.take();
  while (event != null) {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
    GenericRecord read = datumReader.read(null, decoder);
    genericRecords.add(read);
    event = ch.take();
  }
  transaction.commit();
  transaction.close();

  Assert.assertEquals(3, genericRecords.size());
  SdcAvroTestUtil.compare1(genericRecords);
}
public static Person desFromAvroBytes(byte[] record) throws IOException {
  DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record, null);
  GenericRecord avroValue = datumReader.read(null, decoder);
  return fromAvroRecord(avroValue);
}
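A hedged counterpart sketch for producing the bytes that desFromAvroBytes consumes (toAvroRecord is an assumed inverse of fromAvroRecord):

public static byte[] toAvroBytes(Person person) throws IOException {
  DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
  datumWriter.write(toAvroRecord(person), encoder); // toAvroRecord: hypothetical Person -> GenericRecord mapper
  encoder.flush();
  return out.toByteArray();
}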
@Test
public void singletonListTest() throws Exception {
  ByteBuffer bb = DecimalUtils.byteArray2byteBuffer("yuzhouwan".getBytes());
  List<ByteBuffer> bytes = Collections.singletonList(bb);
  ByteBufferInputStream inputStream = new ByteBufferInputStream(bytes);
  BinaryDecoder bd = DecoderFactory.get().binaryDecoder(inputStream, null);
  // System.out.println(new String(DecimalUtils.byteBuffer2byteArray(
  //     DecoderFactory.get().binaryDecoder(inputStream, null).readBytes(bb))));
}
public Object deserialize(byte[] msg) throws IOException, IllegalStateException {
  if (!inited)
    throw new IllegalStateException("Serializer must be initialized");
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(msg, null);
  return reader.read(null, decoder);
}
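A hedged serialize counterpart under the same initialization contract (writer is assumed to be the DatumWriter configured alongside reader):

public byte[] serialize(Object datum) throws IOException, IllegalStateException {
  if (!inited)
    throw new IllegalStateException("Serializer must be initialized");
  ByteArrayOutputStream out = new ByteArrayOutputStream();
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
  writer.write(datum, encoder); // writer: assumed DatumWriter counterpart of reader
  encoder.flush();
  return out.toByteArray();
}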
/**
 * Starts the consumer threads. Each consumer thread will request a
 * {@link EventHandler} from the provided {@link Supplier}.
 */
public void startConsumer() {
  ImmutableMap<String, Integer> threadsPerTopicMap =
      ImmutableMap.of(Objects.requireNonNull(topic), numThreads);
  for (final KafkaStream<byte[], byte[]> stream :
      consumer.createMessageStreams(threadsPerTopicMap).get(topic)) {
    final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(new byte[0], null);
    final SpecificDatumReader<T> reader = new SpecificDatumReader<>(schema);
    scheduleReader(stream, decoder, reader);
  }
}
@Override
public Iterable<GenericRecord> convertRecord(Schema outputSchema, byte[] inputRecord, WorkUnitState workUnit)
    throws DataConversionException {
  Preconditions.checkNotNull(recordReader, "Must have called convertSchema!");
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputRecord, decoderCache.get());
  try {
    GenericRecord parsedRecord = recordReader.read(null, decoder);
    decoderCache.set(decoder);
    return Collections.singleton(parsedRecord);
  } catch (IOException e) {
    throw new DataConversionException("Error parsing record", e);
  }
}
/**
 * Deserialize messages of type TMessage from the input decoder.
 *
 * @param decoder An Avro BinaryDecoder instance that is reading the input stream containing the message.
 * @param observer An instance of the MultiObserver class that will process the message.
 * @param sequence A long value which contains the sequence number of the message in the input stream.
 * @throws IOException Read of input stream in decoder failed.
 * @throws IllegalAccessException Target method in observer is not accessible.
 * @throws InvocationTargetException Subclass threw an exception.
 */
public void deserialize(final BinaryDecoder decoder, final MultiObserver observer, final long sequence)
    throws IOException, IllegalAccessException, InvocationTargetException {
  final TMessage message = messageReader.read(null, decoder);
  if (message != null) {
    observer.onNext(sequence, message);
  } else {
    throw new RuntimeException("Failed to deserialize message [" + msgMetaClass.getSimpleName() + "]");
  }
}
@SuppressWarnings("deprecation") protected Decoder createDecoder() throws IOException { switch(codecType) { case BINARY: return new BinaryDecoder(getOrCreateInputStream()); case JSON: return new JsonDecoder(schema, getOrCreateInputStream()); } return null; }
public static synchronized <T> T deserialize(final byte[] avroData, final Schema avroSchema) {
  try {
    final BinaryDecoder decoder = df.binaryDecoder(avroData, null);
    final String schemaName = getSchemaName(avroSchema);
    if (!readers.containsKey(schemaName)) {
      readers.put(schemaName, new SpecificDatumReader<T>(avroSchema));
    }
    final SpecificDatumReader<T> reader = readers.get(schemaName);
    return reader.read(null, decoder);
  } catch (final IOException e) {
    LOGGER.error("Unable to deserialize byte[] to Avro object: " + e.getMessage(), e);
    return null;
  }
}
private Object deserialize(byte[] data, Class<?> clazz) {
  try {
    ReflectDatumReader<?> datumReader = new ReflectDatumReader<>(clazz);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
    return datumReader.read(null, decoder);
  } catch (IOException e) {
    return null;
  }
}
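A hedged reflection-based serialize counterpart (a sketch, not the project's actual method), using ReflectDatumWriter the same way the reader uses ReflectDatumReader:

private byte[] serialize(Object datum) {
  try {
    ReflectDatumWriter<Object> datumWriter =
        new ReflectDatumWriter<>(ReflectData.get().getSchema(datum.getClass()));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    datumWriter.write(datum, encoder);
    encoder.flush();
    return out.toByteArray();
  } catch (IOException e) {
    return null;
  }
}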
@Override
public void deserializeWalletExtension(Wallet containingWallet, byte[] data) throws Exception {
  ReflectDatumReader<KeyShareWalletExtension> datumReader =
      new ReflectDatumReader<>(KeyShareWalletExtension.class);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null);
  KeyShareWalletExtension result = datumReader.read(null, decoder);
  this.privateKey = result.privateKey;
  this.otherPublicKey = result.otherPublicKey;
  this.pkpDesktop = result.pkpDesktop;
  this.pkpPhone = result.pkpPhone;
  this.desktopBCParameters = result.desktopBCParameters;
  this.phoneBCParameters = result.phoneBCParameters;
}
@Test
public void testAvroGeneric() throws IOException {
  loadProperties("flume-log4jtest-avro-generic.properties");
  PropertyConfigurator.configure(props);
  Logger logger = LogManager.getLogger(TestLog4jAppenderWithAvro.class);
  String msg = "This is log message number " + String.valueOf(0);

  Schema schema = new Schema.Parser().parse(
      getClass().getClassLoader().getResource("myrecord.avsc").openStream());
  GenericRecordBuilder builder = new GenericRecordBuilder(schema);
  GenericRecord record = builder.set("message", msg).build();

  logger.info(record);

  Transaction transaction = ch.getTransaction();
  transaction.begin();
  Event event = ch.take();
  Assert.assertNotNull(event);

  GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(event.getBody(), null);
  GenericRecord recordFromEvent = reader.read(null, decoder);
  Assert.assertEquals(msg, recordFromEvent.get("message").toString());

  Map<String, String> hdrs = event.getHeaders();
  Assert.assertNull(hdrs.get(Log4jAvroHeaders.MESSAGE_ENCODING.toString()));
  Assert.assertEquals("Schema URL should be set",
      "file:///tmp/myrecord.avsc", hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_URL.toString()));
  Assert.assertNull("Schema string should not be set",
      hdrs.get(Log4jAvroHeaders.AVRO_SCHEMA_LITERAL.toString()));

  transaction.commit();
  transaction.close();
}