/**
 * Consumes {@code CONSUMER_OFFSET_TOPIC} and stores the latest decoded commit
 * for each group/topic/partition in {@code kafkaConsumerOffsets}.
 * <p>
 * Only records whose key begins with a short value below 2 are decoded
 * (presumably the offset-commit key versions; confirm against the key schema
 * in use). Records with no value are skipped. Decoding failures are printed
 * and the loop continues; the loop only ends when {@code it.next()} throws.
 *
 * @see Thread#run()
 */
@Override
public void run() {
    ConsumerConnector consumerConnector = KafkaUtils.createConsumerConnector(zkAddr, group);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(CONSUMER_OFFSET_TOPIC, 1);
    KafkaStream<byte[], byte[]> offsetMsgStream =
            consumerConnector.createMessageStreams(topicCountMap).get(CONSUMER_OFFSET_TOPIC).get(0);
    ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();
    while (true) {
        MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();
        byte[] rawKey = offsetMsg.key();
        // getShort() needs two bytes; a missing or truncated key must not
        // terminate the consumer thread, so such records are skipped.
        if (rawKey == null || rawKey.length < 2) {
            continue;
        }
        try {
            if (ByteBuffer.wrap(rawKey).getShort() >= 2) {
                continue;
            }
            GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(rawKey));
            if (offsetMsg.message() == null) {
                continue;
            }
            kafka.common.OffsetAndMetadata commitValue =
                    readMessageValue(ByteBuffer.wrap(offsetMsg.message()));
            kafkaConsumerOffsets.put(commitKey, commitValue);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public MessageAndMetadata getNextMessage(String topic) { List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); // it has only a single stream, because there is only one consumer KafkaStream stream = streams.get(0); final ConsumerIterator<byte[], byte[]> it = stream.iterator(); int counter = 0; try { if (it.hasNext()) { return it.next(); } else { return null; } } catch (ConsumerTimeoutException e) { logger.error("0 messages available to fetch for the topic " + topic); return null; } }
@Test public void shouldWriteThenRead() throws Exception { //Create a consumer ConsumerIterator<String, String> it = buildConsumer(Original.topic); //Create a producer producer = new KafkaProducer<>(producerProps()); //send a message producer.send(new ProducerRecord<>(Original.topic, "message")).get(); //read it back MessageAndMetadata<String, String> messageAndMetadata = it.next(); String value = messageAndMetadata.message(); assertThat(value, is("message")); }
@Test public void shouldWriteThenRead() throws Exception { //Create a consumer ConsumerIterator<String, String> it = buildConsumer(SimpleKafkaTest.topic); //Create a producer producer = new KafkaProducer<>(producerProps()); //send a message producer.send(new ProducerRecord<>(SimpleKafkaTest.topic, "message")).get(); //read it back MessageAndMetadata<String, String> messageAndMetadata = it.next(); String value = messageAndMetadata.message(); assertThat(value, is("message")); }
/**
 * Reads JSON messages from the stream and saves each one through the storage
 * provider selected by its log configuration.
 * <p>
 * A message that cannot be decoded, parsed or stored is logged together with
 * its partition and offset, and consumption continues with the next message.
 */
@Override
public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> mam = it.next();
        String jsonStr = "";
        try {
            // Decode explicitly as UTF-8 (the JSON interchange encoding) instead of
            // relying on whatever the platform default charset happens to be.
            jsonStr = new String(mam.message(), java.nio.charset.StandardCharsets.UTF_8);
            JSONObject jsonObject = JSONObject.parseObject(jsonStr);
            LogcenterConfig config = LogConfigCache.getLogConfigCache(jsonObject);
            IStorageApi iStorageApi = ServiceRegister.getInstance().getProvider(config.getStorageType());
            iStorageApi.save(jsonObject);
        } catch (Exception e) {
            // The logger already records the stack trace; no separate printStackTrace.
            logger.error("partition[" + mam.partition() + "]," + "offset[" + mam.offset() + "], " + jsonStr, e);
        }
    }
}
/**
 * Logs 1000 numbered messages and expects to read them back, in the same
 * order, from the first stream matching the "logs" whitelist.
 */
@Test
public void testLogging() throws InterruptedException {
    int sent = 0;
    while (sent < 1000) {
        logger.info("message" + sent);
        ++sent;
    }

    final ConsumerIterator<byte[], byte[]> received =
            kafka.createClient()
                 .createMessageStreamsByFilter(new Whitelist("logs"), 1)
                 .get(0)
                 .iterator();

    int seen = 0;
    while (seen < 1000) {
        final byte[] payload = received.next().message();
        assertThat(new String(payload, UTF8), Matchers.equalTo("message" + seen));
        ++seen;
    }
}
/**
 * Opens one stream on {@code transducer_topic} and passes every message to
 * {@code transducerDataProcessor}, pausing one second after each message.
 * <p>
 * The loop ends when the iterator is exhausted, when {@code bStartConsume}
 * is false, or when the thread is interrupted during the pause.
 */
@Override
public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(transducer_topic, 1);
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get(transducer_topic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext() && bStartConsume) {
        transducerDataProcessor.newData(it.next().message());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Interruption is a request to stop: keep the flag set for the
            // owner of this thread and leave the loop instead of carrying on.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
public void run() { ConsumerIterator<String, String> it = stream.iterator(); while (it.hasNext()) { MessageAndMetadata<String, String> consumerIterator = it.next(); String uploadMessage = consumerIterator.message(); System.out.println(Thread.currentThread().getName() + " from partiton[" + consumerIterator.partition() + "]: " + uploadMessage); try { sendDataToIotdb.writeData(uploadMessage); // upload data to the IoTDB database } catch (Exception ex) { System.out.println("SQLException: " + ex.getMessage()); } } }
public void nextTuple() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(TopologyConfig.kafkaTopic, 1);//one excutor - one thread Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = conn.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaTopic); ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator(); while(true){ while(iter.hasNext()){ String s = new String(iter.next().message()); collector.emit(new Values(s)); UUID msgId = UUID.randomUUID(); this.pending.put(msgId, new Values(s)); } try { Thread.sleep(1000L); } catch (InterruptedException e) { logger.error("Spout : sleep wrong \n", e); } } }
private void processStreamsByTopic(String topicKeys, List<KafkaStream<byte[], byte[]>> streamList) { // init stream thread pool ExecutorService streamPool = Executors.newFixedThreadPool(partitions); String[] topics = StringUtils.split(topicKeys, ","); if (log.isDebugEnabled()) log.debug("准备处理消息流集合 KafkaStreamList,topic count={},topics={}, partitions/topic={}", topics.length, topicKeys, partitions); //遍历stream AtomicInteger index = new AtomicInteger(0); for (KafkaStream<byte[], byte[]> stream : streamList) { Thread streamThread = new Thread() { @Override public void run() { int i = index.getAndAdd(1); if (log.isDebugEnabled()) log.debug("处理消息流KafkaStream -- No.={}, partitions={}", i, partitions + ":" + i); ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator(); processStreamByConsumer(topicKeys, consumerIterator); } }; streamPool.execute(streamThread); } }
public void collectMq(){ Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(Constants.kfTopic, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get(Constants.kfTopic).get(0); ConsumerIterator<String, String> it = stream.iterator(); MessageAndMetadata<String, String> msgMeta; while (it.hasNext()){ msgMeta = it.next(); super.mqTimer.parseMqText(msgMeta.key(), msgMeta.message()); //System.out.println(msgMeta.key()+"\t"+msgMeta.message()); } }
/**
 * Opens one String-decoded stream on {@code TOPIC} and starts a thread named
 * "kafkaMessageReceiverThread" that prints each message and appends it to
 * {@code messagesReceived}; a null payload is recorded as {@code "<null>"}.
 */
private void consumeMessages() {
    final Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(TOPIC, 1);
    final StringDecoder stringDecoder = new StringDecoder(new VerifiableProperties());
    final ConsumerIterator<String, String> messages =
            consumer.createMessageStreams(streamsPerTopic, stringDecoder, stringDecoder)
                    .get(TOPIC)
                    .get(0)
                    .iterator();

    final Runnable receiveLoop = () -> {
        while (messages.hasNext()) {
            String text = messages.next().message();
            if (text == null) {
                text = "<null>";
            }
            System.out.println("got message: " + text);
            messagesReceived.add(text);
        }
    };

    new Thread(receiveLoop, "kafkaMessageReceiverThread").start();
}
/**
 * Decodes every message of {@code m_stream} and either processes it locally
 * (when {@code zkDistributed} is null) or routes it through
 * {@code zkDistributed}.
 * <p>
 * A failure on a single message is logged and consumption continues. The
 * method returns once the iterator reports no further message, or after an
 * exception raised while fetching has been logged.
 */
public void run() {
    try {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        // Iterate once only: re-entering this loop after hasNext() has
        // returned false would spin without ever blocking again.
        while (it.hasNext()) {
            String m = null;
            try {
                m = new String(it.next().message(), this.kafkaInput.encoding);
                Map<String, Object> event = this.decoder.decode(m);
                if (zkDistributed == null) {
                    this.kafkaInput.process(event);
                } else {
                    zkDistributed.route(event);
                }
            } catch (Exception e) {
                logger.error("process event:{} failed:{}", m, ExceptionUtil.getErrorMessage(e));
            }
        }
    } catch (Exception t) {
        logger.error("kakfa Consumer fetch is error:{}", ExceptionUtil.getErrorMessage(t));
    }
}
/**
 * Decodes every message of {@code m_stream} and passes each non-empty event
 * to {@code kafkaInput.process}.
 * <p>
 * A failure on a single message is logged and consumption continues. The
 * method returns once the iterator reports no further message, or after an
 * exception raised while fetching has been logged.
 */
public void run() {
    try {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        // Iterate once only: re-entering this loop after hasNext() has
        // returned false would spin without ever blocking again.
        while (it.hasNext()) {
            String m = null;
            try {
                m = new String(it.next().message(), this.kafkaInput.encoding);
                Map<String, Object> event = this.decoder.decode(m);
                if (event != null && event.size() > 0) {
                    this.kafkaInput.process(event);
                }
            } catch (Exception e) {
                // getCause() is null for most exceptions; log the exception
                // itself and pass it along so the stack trace is kept.
                logger.error("process event:{} failed:{}", m, e.toString(), e);
            }
        }
    } catch (Exception t) {
        logger.error("kakfa Consumer fetch is error:{}", t.toString(), t);
    }
}
/**
 * Logs a one-line summary (topic, partition, key, offset, timestamp and the
 * decoded Iote2eRequest) for every message of {@code kafkaStream}.
 * <p>
 * A message that cannot be summarised is logged as an error and skipped. A
 * message without a key is reported with {@code key=null}.
 */
public void run() {
    Iote2eRequestReuseItem iote2eRequestReuseItem = new Iote2eRequestReuseItem();
    ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        try {
            // Decoded inside the try and null-checked so that a keyless
            // message cannot terminate the consumer thread.
            byte[] rawKey = messageAndMetadata.key();
            String key = rawKey == null ? null : new String(rawKey);
            String summary = "Thread " + threadNumber
                    + ", topic=" + messageAndMetadata.topic()
                    + ", partition=" + messageAndMetadata.partition()
                    + ", key=" + key
                    + ", offset=" + messageAndMetadata.offset()
                    + ", timestamp=" + messageAndMetadata.timestamp()
                    + ", timestampType=" + messageAndMetadata.timestampType()
                    + ", iote2eRequest=" + iote2eRequestReuseItem.fromByteArray(messageAndMetadata.message()).toString();
            logger.info(">>> Consumed: " + summary);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
    logger.info(">>> Shutting down Thread: " + threadNumber);
}
/**
 * Logs a one-line summary (topic, partition, key, payload, offset and
 * timestamp) for every message of {@code kafkaStream}, then logs that the
 * thread is shutting down once the iterator reports no further message.
 * <p>
 * A missing key or payload is reported as {@code null} in the summary.
 */
public void run() {
    ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        // Key and payload are null-checked before decoding so that such a
        // record cannot terminate the consumer thread.
        byte[] rawKey = messageAndMetadata.key();
        byte[] rawMessage = messageAndMetadata.message();
        String key = rawKey == null ? null : new String(rawKey);
        String message = rawMessage == null ? null : new String(rawMessage);
        String summary = "Thread " + threadNumber
                + ", topic=" + messageAndMetadata.topic()
                + ", partition=" + messageAndMetadata.partition()
                + ", key=" + key
                + ", message=" + message
                + ", offset=" + messageAndMetadata.offset()
                + ", timestamp=" + messageAndMetadata.timestamp()
                + ", timestampType=" + messageAndMetadata.timestampType();
        logger.info(">>> Consumed: " + summary);
    }
    logger.info(">>> Shutting down Thread: " + threadNumber);
}
public void run() { try { ConsumerIterator<byte[], byte[]> it = m_stream.iterator(); Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(User.getClassSchema()); while (it.hasNext()) { MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next(); String key = new String(messageAndMetadata.key()); User user = genericRecordToUser(recordInjection.invert(messageAndMetadata.message()).get()); // User user = (User) // recordInjection.invert(messageAndMetadata.message()).get(); String summary = "Thread " + m_threadNumber + ", topic=" + messageAndMetadata.topic() + ", partition=" + messageAndMetadata.partition() + ", key=" + key + ", user=" + user.toString() + ", offset=" + messageAndMetadata.offset() + ", timestamp=" + messageAndMetadata.timestamp() + ", timestampType=" + messageAndMetadata.timestampType(); System.out.println(summary); } System.out.println("Shutting down Thread: " + m_threadNumber); } catch (Exception e) { System.out.println("Exception in thread "+m_threadNumber); System.out.println(e); e.printStackTrace(); } }
/**
 * Prints a one-line summary (topic, partition, key, payload, offset and
 * timestamp) for every message of {@code kafkaStream}, then prints that the
 * thread is shutting down once the iterator reports no further message.
 * <p>
 * A missing key or payload is reported as {@code null} in the summary.
 */
public void run() {
    ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        // Key and payload are null-checked before decoding so that such a
        // record cannot terminate the consumer thread.
        byte[] rawKey = messageAndMetadata.key();
        byte[] rawMessage = messageAndMetadata.message();
        String key = rawKey == null ? null : new String(rawKey);
        String message = rawMessage == null ? null : new String(rawMessage);
        String summary = "Thread " + threadNumber
                + ", topic=" + messageAndMetadata.topic()
                + ", partition=" + messageAndMetadata.partition()
                + ", key=" + key
                + ", message=" + message
                + ", offset=" + messageAndMetadata.offset()
                + ", timestamp=" + messageAndMetadata.timestamp()
                + ", timestampType=" + messageAndMetadata.timestampType();
        System.out.println(summary);
    }
    System.out.println("Shutting down Thread: " + threadNumber);
}
/**
 * Creates the consumer connector, opens one String-decoded stream on
 * {@code topic} and prints the key and value of every message received.
 * Returns when the iterator reports no further message.
 */
public void recv() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    // Autoboxing replaces the deprecated Integer constructor.
    topicMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> streamMap =
            consumer.createMessageStreams(topicMap, new StringDecoder(null), new StringDecoder(null));
    KafkaStream<String, String> stream = streamMap.get(topic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<String, String> mm = it.next();
        System.out.println("<<< Got new message");
        System.out.println("<<< key:" + mm.key());
        System.out.println("<<< m: " + mm.message());
    }
}
/**
 * Consumes the stream while {@code running} is true, passing each payload to
 * {@code consumeMessage}.
 * <p>
 * A consumer timeout is logged and reported to {@code KafkaHealthCheck};
 * the loop then continues.
 */
public void run() {
    logger.debug("KafkaChannel {} has stream", this.threadNumber);
    final ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();
    running = true;
    while (running) {
        try {
            if (streamIterator.hasNext()) {
                final byte[] message = streamIterator.next().message();
                // byte[].toString() only yields an identity hash and throws on
                // a null payload; log the payload size instead.
                logger.debug("Thread {}: {}", threadNumber,
                        message == null ? "null" : message.length + " bytes");
                consumeMessage(message);
            }
        } catch (ConsumerTimeoutException cte) {
            logger.debug("Timed out when consuming from Kafka", cte);
            KafkaHealthCheck.getInstance().heartAttack(cte.getMessage());
        }
    }
}
/**
 * Consumes the stream while {@code running} is true, passing the key and
 * payload of each message to {@code consumeMessage}. A consumer timeout is
 * logged and reported to {@code KafkaHealthCheck}; the loop then continues.
 */
public void run() {
    logger.info("KafkaChannel {} has stream", this.threadNumber);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    running = true;
    while (running) {
        try {
            if (!messages.hasNext()) {
                continue;
            }
            MessageAndMetadata<byte[], byte[]> record = messages.next();
            consumeMessage(record.key(), record.message());
        } catch (ConsumerTimeoutException timeout) {
            logger.debug("Timed out when consuming from Kafka", timeout);
            KafkaHealthCheck.getInstance().heartAttack(timeout.getMessage());
        }
    }
}
/** * Modified example from kafka site with some defensive checks added. */ private ConsumerIterator<String, String> getStreamIterator() { Map<String, Integer> topicCountMap = ImmutableMap.of(topic, TOPIC_COUNT); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, msgDecoder); List<KafkaStream<String, String>> streams = consumerMap.get(topic); Preconditions.checkNotNull(streams, "There is no topic named : " + topic); //copy in case of live list returned. Needed for index check below. ImmutableList<KafkaStream<String, String>> streamsCopy = ImmutableList.copyOf(streams); Preconditions.checkElementIndex(FIRST_ELEMENT_INDEX, streamsCopy.size(), "Failed to find any KafkaStreams related to topic : " + topic); KafkaStream<String, String> stream = streamsCopy.get(FIRST_ELEMENT_INDEX); Preconditions.checkNotNull(stream, "Returned kafka stream is null"); ConsumerIterator<String, String> iterator = stream.iterator(); Preconditions.checkNotNull(iterator, "Returned kafka iterator is null"); return iterator; }
/**
 * Parses every message of {@code m_stream} as a CSV line and inserts it,
 * together with its offset, through {@code m_loader}. Any parse or insert
 * failure is logged and terminates the JVM with exit status 1.
 */
@Override
public void run() {
    for (ConsumerIterator<byte[], byte[]> records = m_stream.iterator(); records.hasNext(); ) {
        MessageAndMetadata<byte[], byte[]> record = records.next();
        byte[] payload = record.message();
        long offset = record.offset();
        String line = new String(payload);
        try {
            m_loader.insertRow(new RowWithMetaData(line, offset), m_csvParser.parseLine(line));
        } catch (Exception ex) {
            m_log.error("Consumer stopped", ex);
            System.exit(1);
        }
    }
}
/** * To avoid too many try-catches this is separate.. */ public void runrun() { log.info("Waiting to consume data"); ConsumerIterator<byte[], byte[]> it = stream.iterator(); //loop through all messages in the stream while (it.hasNext()) { byte[] msg = it.next().message(); if (msg.length < 2) { //valid messages are longer than 2 bytes as the first one is schema id //once upon time some libraries (pypro) would start with a short message to try if the kafka topic was alive. this is what topic polling refers to. log.info("ignoring short msg, assuming topic polling"); continue; } // log.trace("Thread " + id + ":: " + Arrays.toString(msg)); process(msg); } log.info("Shutting down consumer Thread: " + id); }
/** * Starts the consumer thread. */ @Override public void run() { log.debug("Starting consumer for topic {}", topic); ConsumerIterator<byte[], byte[]> it = stream.iterator(); // For each message present on the partition... while (it.hasNext()) { Map<String, Object> event = null; // Parse it with the parser associated with the topic try { event = parser.parse(new String(it.next().message(), "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } // Send it to the source if (event != null) { source.send(topic.getName(), event); } } log.debug("Finished consumer for topic {}", topic); }
/**消费消息 [指定Topic] * * @param topicName 队列名称 * @param groupId Group Name * @return */ static MsgIterator consume(String topicName, String groupId) { ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //(topic, #stream) pair topicCountMap.put(topicName, new Integer(1)); //TODO: 可消费多个topic Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); //Using default decoder List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName); //The number of items in the list is #streams, Each Stream supoorts an iterator over message/metadata pair KafkaStream<byte[], byte[]> stream = streamList.get(0); //KafkaStream[K,V] K代表partitio Key的类型,V代表Message Value的类型 ConsumerIterator<byte[], byte[]> it = stream.iterator(); MsgIterator iter = new MsgIterator(it); return iter; }
/**消费消息 [指定Topic] 指定线程 * * @param topicName 队列名称 * @param numStreams Number of streams to return * @return A list of MsgIterator each of which provides an iterator over message over allowed topics */ static List<MsgIterator> consume(String topicName, int numStreams, String groupId) { ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId); Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); //(topic, #stream) pair topicCountMap.put(topicName, numStreams); //TODO: 可消费多个topic Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); //Using default decoder List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName); //The number of items in the list is #streams, Each Stream supoorts an iterator over message/metadata pair List<MsgIterator> iterList = new ArrayList<MsgIterator>(); for (KafkaStream<byte[], byte[]> stream : streamList) { ConsumerIterator<byte[], byte[]> it = stream.iterator(); MsgIterator iter = new MsgIterator(it); iterList.add(iter); } //KafkaStream[K,V] K代表partitio Key的类型,V代表Message Value的类型 return iterList; }
@Override protected int getRecordsInTarget() { int expectedRecordsInTarget = 0; for(KafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) { ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator(); try { while (it.hasNext()) { expectedRecordsInTarget++; it.next(); } } catch (kafka.consumer.ConsumerTimeoutException e) { //no-op } } return expectedRecordsInTarget; }
/**
 * Sends one keyed message through a {@code DataKafkaBroker} and expects the
 * first consumed message to equal {@code TEST_MESSAGE}. The channel is
 * disconnected even when sending or the assertion fails.
 */
@Test
public void test_sendMessage() throws Exception {
    createTopic(topic);
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    DataKafkaBroker dataChannel =
            new DataKafkaBroker("localhost", START_PORT, 0, topic, new NioEventLoopGroup(), properties);
    dataChannel.connect().sync();
    try {
        dataChannel.send(freeLaterBuffer("1".getBytes()), 0, freeLaterBuffer(TEST_MESSAGE.getBytes()));
        final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
    } finally {
        // Release the connection regardless of the test outcome.
        dataChannel.disconnect();
    }
}
/**
 * Sends one message with acknowledgments disabled and expects the first
 * consumed message to equal {@code TEST_MESSAGE}. The producer is
 * disconnected even when sending or the assertion fails.
 */
@Test
public void test_no_acks_send_message() throws Exception {
    String topic = "test_no_acks_send_message";
    createTopic(topic, 1);
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    properties.override(ProducerProperties.DATA_ACK, Acknowledgment.WAIT_FOR_NO_ONE);
    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    try {
        KafkaTopic kafkaTopic = producer.topic();
        kafkaTopic.send(null, freeLaterBuffer(TEST_MESSAGE.getBytes()));
        final List<KafkaStream<byte[], byte[]>> consume = consume(topic);
        final KafkaStream<byte[], byte[]> stream = consume.get(0);
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
        // Actual value first, expected matcher second, so that a failure
        // message reports the two the right way round.
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
    } finally {
        // Release the connection regardless of the test outcome.
        producer.disconnect().sync();
    }
}
/**
 * Sends three numbered messages and expects to consume them in the same
 * order. The producer is disconnected even when sending or an assertion
 * fails.
 */
@Test
public void test_producer() throws Exception {
    String topic = "test";
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    createTopic(topic);
    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    try {
        KafkaTopic kafkaTopic = producer.topic();
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "01").getBytes()));
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "02").getBytes()));
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "03").getBytes()));
        final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "01"));
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "02"));
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "03"));
    } finally {
        // Release the connection regardless of the test outcome.
        producer.disconnect().sync();
    }
}
/**
 * Logs every message of {@code kafkaStream} with its partition and offset,
 * keeping running totals of successes and failures. After each message the
 * totals and the elapsed time in seconds are logged.
 */
@Override
public void run() {
    final long startedAt = System.currentTimeMillis();
    int failed = 0;
    int succeeded = 0;
    int total = 0;

    for (ConsumerIterator<byte[], byte[]> messages = kafkaStream.iterator(); messages.hasNext(); ) {
        try {
            MessageAndMetadata<byte[], byte[]> record = messages.next();
            _log.info("Thread {}: {}", threadNum, new String(record.message(), "utf-8"));
            _log.info("partition: {}, offset: {}", record.partition(), record.offset());
            succeeded++;
        } catch (Exception e) {
            _log.error("{}", e);
            failed++;
        }
        total++;
        long elapsedSeconds = (System.currentTimeMillis() - startedAt) / 1000;
        _log.info("Count [fail/success/total]: [{}/{}/{}], Time: {}s",
                failed, succeeded, total, elapsedSeconds);
    }
}
/**
 * Logs one line through the Kafka log appender and checks the Avro record
 * that arrives on the topic: line text, log type id, presence of a source,
 * one timing entry and two tag entries.
 */
@Test
public void testKafkaLogAppender() {
    final String testMessage = "I am a test message";

    Properties props = new Properties();
    props.put("zookeeper.connect", zookeeper);
    props.put("group.id", "kafka-log-appender-test");
    props.put("auto.offset.reset", "smallest");
    props.put("schema.registry.url", schemaRegistry);

    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, 1);

    // The consumer is attached before the line is logged.
    ConsumerIterator<String, Object> records =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props))
                    .createMessageStreams(
                            streamsPerTopic,
                            new StringDecoder(null),
                            new KafkaAvroDecoder(new VerifiableProperties(props)))
                    .get(topic)
                    .get(0)
                    .iterator();

    logger.info(testMessage);

    MessageAndMetadata<String, Object> first = records.next();
    GenericRecord logLine = (GenericRecord) first.message();

    assertEquals(logLine.get("line").toString(), testMessage);
    assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
    assertNotNull(logLine.get("source"));

    Map<CharSequence, Object> timings = (Map<CharSequence, Object>) logLine.get("timings");
    assertEquals(timings.size(), 1);
    Map<CharSequence, Object> tags = (Map<CharSequence, Object>) logLine.get("tag");
    assertEquals(tags.size(), 2);
}
/**
 * Creates the consumer connector, opens one stream on {@code topic} and
 * emits every message as a tuple of (value, 1, receive time), using the
 * value itself as the message id.
 * <p>
 * NOTE(review): this method only returns once the iterator reports no
 * further message — confirm that blocking here is intended.
 */
public void activate() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, Integer> topickMap = new HashMap<String, Integer>();
    topickMap.put(topic, 1);
    System.out.println("*********Results********topic:" + topic);
    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
    KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    // The pattern never changes and the formatter is used by this thread
    // only, so it is built once instead of once per message.
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd日 HH:mm:ss SSS");
    while (it.hasNext()) {
        String value = new String(it.next().message());
        String str = formatter.format(new Date());
        System.out.println("storm接收到来自kafka的消息--->" + value);
        collector.emit(new Values(value, 1, str), value);
    }
}
/**
 * Passes every message of {@code _stream} to {@code _handler} and reports
 * the handling time per message to StatsD under
 * {@code "message_received." + _topic}.
 * <p>
 * Any exception ends the loop. An {@code InterruptedException} is logged as
 * a shutdown and the interrupt flag is restored; every other exception is
 * logged as an error.
 */
@Override
public void run() {
    LOG.info( "Consuming thread started" );
    try {
        ConsumerIterator<byte[], byte[]> it = _stream.iterator();
        while ( it.hasNext() ) {
            long start = System.currentTimeMillis();
            byte[] message = it.next().message();
            // Only decode the payload when it will actually be logged.
            if ( LOG.isDebugEnabled() ) {
                LOG.debug( "message received: {}", ( new String( message ) ) );
            }
            _handler.onMessage( message );
            long time = System.currentTimeMillis() - start;
            KruxStdLib.STATSD.time( "message_received." + _topic, time );
        }
    } catch ( Exception e ) {
        if ( e instanceof InterruptedException ) {
            LOG.warn( "Consumer group threads interrupted, shutting down" );
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } else {
            LOG.error( "no longer fetching messages", e );
        }
    }
}
/**
 * Consumes every stream of {@code topic}, printing each UTF-8 decoded
 * message and appending it as one line to {@code logX.txt}.
 * <p>
 * The consumer is shut down and the file is closed even when consumption
 * fails part-way.
 *
 * @throws Exception if creating the streams or writing the file fails
 */
public void testConsumer() throws Exception {
    String fileName = "logX.txt";
    BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
    try {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                String msg = new String(it.next().message(), Charset.forName("UTF-8"));
                System.out.println("Message from Single Topic: " + msg);
                out.write(msg, 0, msg.length());
                out.write('\n');
                // Flushed per message so the file is current at any time.
                out.flush();
            }
        }
    } finally {
        // Nested so that the file is closed even if the shutdown throws.
        try {
            if (consumer != null) {
                consumer.shutdown();
            }
        } finally {
            out.close();
        }
    }
}
/**
 * Reader loop: while {@code shouldContinue} is set, takes the iterator of
 * the first stream and, when it has a message, publishes the payload in
 * {@code this.message} under this object's monitor. It then waits on the
 * monitor until {@code this.message} is null again or {@code shouldContinue}
 * is cleared. On exit it logs the reader thread's name and clears
 * {@code readerThread}.
 */
@Override
public void run() {
    while (this.shouldContinue) {
        final ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
        if (it.hasNext()) {
            final byte[] message = it.next().message();
            synchronized (this) {
                // Publish the payload for the other side of the hand-off.
                this.message = message;
                // Wake up getMessage() if it is waiting
                if (this.waiting) {
                    notify();
                }
                // Hold this message until the field has been reset to null
                // or the loop has been asked to stop. An interrupt is only
                // logged; the condition is then checked again.
                while (this.message != null && this.shouldContinue)
                    try {
                        wait();
                    } catch (InterruptedException e) {
                        logger.info("Wait interrupted", e);
                    }
            }
        }
    }
    logger.info("readerThread {} exited", this.readerThread.getName());
    this.readerThread = null;
}