Java 类kafka.consumer.ConsumerIterator 实例源码

项目:Kafka-Insight    文件:KafkaOffsetGetter.java   
/**
 * Consumes {@code CONSUMER_OFFSET_TOPIC} forever and stores each decoded
 * commit in {@code kafkaConsumerOffsets}, keyed by group/topic/partition.
 * <p>
 * Only records whose key starts with a version short below 2 are decoded
 * (presumably these are the offset-commit records; confirm against the
 * broker's offsets-topic key schema). Records with a missing or too-short
 * key, or with a null value, are skipped.
 *
 * @see Thread#run()
 */
@Override
public void run() {
    ConsumerConnector consumerConnector = KafkaUtils.createConsumerConnector(zkAddr, group);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(CONSUMER_OFFSET_TOPIC, Integer.valueOf(1));
    KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(CONSUMER_OFFSET_TOPIC).get(0);

    ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();
    while (true) {
        MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();
        byte[] rawKey = offsetMsg.key();
        // The version check below reads a 2-byte short. A null or shorter key
        // would throw outside the try block and terminate this thread.
        if (rawKey == null || rawKey.length < 2) {
            continue;
        }
        if (ByteBuffer.wrap(rawKey).getShort() < 2) {
            try {
                GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(rawKey));
                byte[] rawValue = offsetMsg.message();
                if (rawValue == null) {
                    // No value to decode for this key; nothing to store.
                    continue;
                }
                kafka.common.OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(rawValue));
                kafkaConsumerOffsets.put(commitKey, commitValue);
            } catch (Exception e) {
                // A single undecodable record must not stop the consumer loop.
                e.printStackTrace();
            }
        }
    }
}
项目:flume-release-1.7.0    文件:KafkaConsumer.java   
/**
 * Fetches the next message from the first stream registered for a topic.
 *
 * @param topic topic whose stream list is looked up in {@code consumerMap}
 * @return the next message, or {@code null} when the iterator reports no
 *         further message or the consumer times out while waiting
 */
public MessageAndMetadata getNextMessage(String topic) {
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
  // it has only a single stream, because there is only one consumer
  KafkaStream<byte[], byte[]> stream = streams.get(0);
  final ConsumerIterator<byte[], byte[]> it = stream.iterator();
  try {
    if (it.hasNext()) {
      return it.next();
    }
    return null;
  } catch (ConsumerTimeoutException e) {
    // A timeout is reported to the caller as "no message".
    logger.error("0 messages available to fetch for the topic " + topic);
    return null;
  }
}
项目:open-kilda    文件:Original.java   
/**
 * Round-trip check: publishes one record to {@code Original.topic} and
 * expects the consumer created beforehand to read the same value back.
 */
@Test
public void shouldWriteThenRead() throws Exception {
    // The consumer is built before anything is published.
    ConsumerIterator<String, String> consumerIterator = buildConsumer(Original.topic);

    producer = new KafkaProducer<>(producerProps());

    // get() waits for the send to complete before we start reading.
    producer.send(new ProducerRecord<>(Original.topic, "message")).get();

    MessageAndMetadata<String, String> received = consumerIterator.next();
    assertThat(received.message(), is("message"));
}
项目:open-kilda    文件:SimpleKafkaTest.java   
/**
 * Round-trip check: publishes one record to {@code SimpleKafkaTest.topic}
 * and expects the consumer created beforehand to read the same value back.
 */
@Test
public void shouldWriteThenRead() throws Exception {
    // The consumer is built before anything is published.
    ConsumerIterator<String, String> consumerIterator = buildConsumer(SimpleKafkaTest.topic);

    producer = new KafkaProducer<>(producerProps());

    // get() waits for the send to complete before we start reading.
    producer.send(new ProducerRecord<>(SimpleKafkaTest.topic, "message")).get();

    MessageAndMetadata<String, String> received = consumerIterator.next();
    assertThat(received.message(), is("message"));
}
项目:tasfe-framework    文件:KafkaConsumerThread.java   
/**
 * Reads messages from {@code stream}, parses each one as JSON and saves it
 * through the storage provider selected by the log configuration.
 * A message that fails at any step is logged with its partition and offset,
 * and the loop moves on to the next message.
 */
@Override
public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> mam = it.next();
        // Kept outside the try block so the failure log can include whatever was decoded.
        String jsonStr = "";
        try {
            // NOTE(review): decodes with the platform default charset; confirm producers agree.
            jsonStr = new String(mam.message());
            JSONObject jsonObject = JSONObject.parseObject(jsonStr);
            LogcenterConfig config = LogConfigCache.getLogConfigCache(jsonObject);
            IStorageApi iStorageApi = ServiceRegister.getInstance().getProvider(config.getStorageType());
            iStorageApi.save(jsonObject);
        } catch (Exception e) {
            // The logger already records the stack trace; no separate printStackTrace needed.
            logger.error("partition[" + mam.partition() + "]," + "offset[" + mam.offset() + "], " + jsonStr, e);
        }
    }
}
项目:wngn-jms-kafka    文件:LogbackIntegrationIT.java   
/**
 * Logs 1000 numbered messages, then reads 1000 messages from the "logs"
 * topic and expects them to arrive with the same text in the same order.
 */
@Test
public void testLogging() throws InterruptedException {
    final int messageCount = 1000;

    int sent = 0;
    while (sent < messageCount) {
        logger.info("message"+sent);
        ++sent;
    }

    final ConsumerIterator<byte[], byte[]> iterator =
            kafka.createClient().createMessageStreamsByFilter(new Whitelist("logs"),1).get(0).iterator();

    int received = 0;
    while (received < messageCount) {
        final byte[] payload = iterator.next().message();
        assertThat(new String(payload, UTF8), Matchers.equalTo("message"+received));
        ++received;
    }
}
项目:DataProcessPlatformKafkaJavaSDK    文件:KafkaConsumerTransducer.java   
/**
 * Consumes {@code transducer_topic} with string decoders and hands every
 * message to {@code transducerDataProcessor}, pausing one second between
 * messages. The loop ends when the iterator has no more messages,
 * {@code bStartConsume} becomes false, or the thread is interrupted.
 */
@Override
public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(transducer_topic, Integer.valueOf(1));

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get(transducer_topic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext() && bStartConsume){
        transducerDataProcessor.newData(it.next().message());

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Swallowing the interrupt would clear the flag and keep consuming.
            // Restore it and stop so whoever interrupted us gets the shutdown it asked for.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
项目:iotdb-jdbc    文件:KafkaConsumer.java   
/**
 * Prints every message received on {@code stream} together with the thread
 * name and partition, then forwards the payload to {@code sendDataToIotdb}.
 * A failed write is reported on standard output and the loop continues.
 */
public void run() {
    ConsumerIterator<String, String> messages = stream.iterator();
    while (messages.hasNext()) {
        MessageAndMetadata<String, String> entry = messages.next();
        String payload = entry.message();

        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " from partiton[" + entry.partition() + "]: " + payload);

        try {
            // upload data to the IoTDB database
            sendDataToIotdb.writeData(payload);
        } catch (Exception writeFailure) {
            System.out.println("SQLException: " + writeFailure.getMessage());
        }
    }
}
项目:storm-demos    文件:KafkaDataSpout.java   
/**
 * Opens a single stream on the configured Kafka topic and emits every
 * message as a one-field tuple.
 * <p>
 * Each tuple is emitted with the same message id under which it is stored
 * in {@code pending}, so that acks and failures reported for the tuple can
 * be matched to the stored entry.
 * <p>
 * NOTE(review): this method only returns when the thread is interrupted,
 * and it creates the message streams on every call. Spout implementations
 * are normally expected to return from nextTuple promptly; consider moving
 * the stream setup to initialization and emitting one tuple per call.
 */
public void nextTuple() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TopologyConfig.kafkaTopic, 1);//one executor - one thread
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = conn.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaTopic);
    ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
    while(true){
        while(iter.hasNext()){

            String s = new String(iter.next().message());

            // Register the tuple before emitting so a callback for this id
            // can never arrive before the entry exists.
            UUID msgId = UUID.randomUUID();
            this.pending.put(msgId, new Values(s));
            collector.emit(new Values(s), msgId);
        }
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            logger.error("Spout : sleep wrong \n", e);
            // Restore the flag and leave; otherwise every later sleep would
            // fail immediately and this loop would spin.
            Thread.currentThread().interrupt();
            return;
        }
    }
}
项目:koper    文件:KafkaReceiver.java   
/**
 * Submits one consuming task per Kafka stream to a fixed-size pool of
 * {@code partitions} threads. Each task takes the next sequence number,
 * obtains the stream's iterator and hands it to
 * {@code processStreamByConsumer}.
 *
 * @param topicKeys  comma-separated topic names the streams belong to
 * @param streamList streams to consume, one task each
 */
private void processStreamsByTopic(String topicKeys, List<KafkaStream<byte[], byte[]>> streamList) {
    // init stream thread pool
    ExecutorService streamPool = Executors.newFixedThreadPool(partitions);
    String[] topics = StringUtils.split(topicKeys, ",");
    if (log.isDebugEnabled()) {
        log.debug("准备处理消息流集合 KafkaStreamList,topic count={},topics={}, partitions/topic={}", topics.length, topicKeys, partitions);
    }

    // Iterate over the streams. The counter is read when a task starts
    // running, so numbers follow execution order, not submission order.
    AtomicInteger index = new AtomicInteger(0);
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        // The pool supplies the thread; the task only needs to be a Runnable,
        // so no Thread object is created just to carry the run() body.
        streamPool.execute(() -> {
            int i = index.getAndAdd(1);
            if (log.isDebugEnabled()) {
                log.debug("处理消息流KafkaStream -- No.={}, partitions={}", i, partitions + ":" + i);
            }

            ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();

            processStreamByConsumer(topicKeys, consumerIterator);
        });
    }
}
项目:light_drtc    文件:KafkaMqCollect.java   
/**
 * Consumes {@code Constants.kfTopic} through a single string-decoded stream
 * and passes the key and body of every message to the inherited
 * {@code mqTimer} for parsing.
 */
public void collectMq(){
    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(Constants.kfTopic, Integer.valueOf(1));

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder);

    // Only one stream was requested, so take the first entry.
    ConsumerIterator<String, String> messages = streamsByTopic.get(Constants.kfTopic).get(0).iterator();
    while (messages.hasNext()) {
        MessageAndMetadata<String, String> current = messages.next();
        super.mqTimer.parseMqText(current.key(), current.message());
    }
}
项目:vertx-kafka-service    文件:KafkaProducerServiceIntegrationTest.java   
/**
 * Opens one string-decoded stream on {@code TOPIC} and starts a thread
 * named "kafkaMessageReceiverThread" that prints each message and appends
 * it to {@code messagesReceived}; a null message is recorded as "&lt;null&gt;".
 */
private void consumeMessages() {
    final Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(TOPIC, 1);
    final StringDecoder decoder = new StringDecoder(new VerifiableProperties());

    // The same decoder instance serves both keys and values.
    final ConsumerIterator<String, String> iterator =
            consumer.createMessageStreams(streamsPerTopic, decoder, decoder)
                    .get(TOPIC)
                    .get(0)
                    .iterator();

    final Runnable receiver = new Runnable() {
        @Override
        public void run() {
            while (iterator.hasNext()) {
                String received = iterator.next().message();
                if (received == null) {
                    received = "<null>";
                }
                System.out.println("got message: " + received);
                messagesReceived.add(received);
            }
        }
    };

    new Thread(receiver, "kafkaMessageReceiverThread").start();
}
项目:jlogstash-input-plugin    文件:KafkaDistributed.java   
/**
 * Decodes every message from {@code m_stream} into an event and either
 * processes it locally or, when {@code zkDistributed} is set, routes it
 * through that component. A failure on one message is logged and the next
 * message is read; a failure of the iterator itself ends the method.
 */
public void run() {
    try {
        // The iterator is consumed exactly once. Re-entering an outer
        // endless loop after hasNext() has returned false would only
        // re-poll a finished iterator and spin the CPU.
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            String m = null;
            try {
                m = new String(it.next().message(),
                        this.kafkaInput.encoding);
                Map<String, Object> event = this.decoder
                        .decode(m);
                if(zkDistributed==null){
                    this.kafkaInput.process(event);
                }else{
                    zkDistributed.route(event);
                }
            } catch (Exception e) {
                logger.error("process event:{} failed:{}",m,ExceptionUtil.getErrorMessage(e));
            }
        }
    } catch (Exception t) {
        logger.error("kakfa Consumer fetch is error:{}",ExceptionUtil.getErrorMessage(t));
    }
}
项目:jlogstash-input-plugin    文件:Kafka.java   
/**
 * Decodes every message from {@code m_stream} into an event and passes
 * non-empty events to {@code kafkaInput}. A failure on one message is
 * logged and the next message is read; a failure of the iterator itself
 * ends the method.
 */
public void run() {
    try {
        // The iterator is consumed exactly once. Re-entering an outer
        // endless loop after hasNext() has returned false would only
        // re-poll a finished iterator and spin the CPU.
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext()) {
            String m = null;
            try {
                m = new String(it.next().message(),
                        this.kafkaInput.encoding);
                Map<String, Object> event = this.decoder
                        .decode(m);
                if (event!=null&&event.size()>0){
                    this.kafkaInput.process(event);
                }
            } catch (Exception e) {
                // Log the exception itself: getCause() is null for most
                // exceptions and would hide what actually went wrong.
                logger.error("process event:{} failed:{}",m,e.toString(),e);
            }
        }
    } catch (Exception t) {
        logger.error("kakfa Consumer fetch is error:{}",t.toString(),t);
    }
}
项目:iote2e    文件:KafkaAvroDemo.java   
/**
 * Logs a one-line summary (thread, topic, partition, key, offset,
 * timestamp and decoded request) for every message on {@code kafkaStream}.
 * A message that cannot be summarized is logged as an error and skipped.
 */
public void run() {
    Iote2eRequestReuseItem iote2eRequestReuseItem = new Iote2eRequestReuseItem();
    ConsumerIterator<byte[], byte[]> it = kafkaStream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
        try {
            // Decode the key inside the try block and tolerate a missing key,
            // so one keyless message cannot terminate the consumer thread.
            byte[] rawKey = messageAndMetadata.key();
            String key = rawKey == null ? null : new String(rawKey);
            String summary =
                    "Thread " + threadNumber +
                    ", topic=" + messageAndMetadata.topic() +
                    ", partition=" + messageAndMetadata.partition() +
                    ", key=" + key +
                    ", offset=" + messageAndMetadata.offset() +
                    ", timestamp=" + messageAndMetadata.timestamp() +
                    ", timestampType=" + messageAndMetadata.timestampType() +
                    ", iote2eRequest=" + iote2eRequestReuseItem.fromByteArray(messageAndMetadata.message()).toString();
            logger.info(">>> Consumed: " + summary);
        } catch( Exception e ) {
            logger.error(e.getMessage(), e);
        }
    }
    logger.info(">>> Shutting down Thread: " + threadNumber);
}
项目:iote2e    文件:KafkaStringDemo.java   
/**
 * Logs a one-line summary (thread, topic, partition, key, message, offset,
 * timestamp) for every message on {@code kafkaStream}, then logs that the
 * thread is shutting down once the iterator is exhausted.
 */
public void run() {
    for (ConsumerIterator<byte[], byte[]> messages = kafkaStream.iterator(); messages.hasNext(); ) {
        MessageAndMetadata<byte[], byte[]> current = messages.next();
        String key = new String(current.key());
        String message = new String(current.message());

        StringBuilder summary = new StringBuilder();
        summary.append("Thread ").append(threadNumber);
        summary.append(", topic=").append(current.topic());
        summary.append(", partition=").append(current.partition());
        summary.append(", key=").append(key);
        summary.append(", message=").append(message);
        summary.append(", offset=").append(current.offset());
        summary.append(", timestamp=").append(current.timestamp());
        summary.append(", timestampType=").append(current.timestampType());

        logger.info(">>> Consumed: " + summary);
    }
    logger.info(">>> Shutting down Thread: " + threadNumber);
}
项目:iote2e    文件:AvroConsumerThread.java   
/**
 * Decodes every message on {@code m_stream} into a {@code User} using a
 * binary Avro injection built from the User schema, and prints a summary
 * line per message. Any exception ends the loop and is printed to
 * standard output together with its stack trace.
 */
public void run() {
    try {
        ConsumerIterator<byte[], byte[]> messages = m_stream.iterator();
        Injection<GenericRecord, byte[]> avroCodec = GenericAvroCodecs.toBinary(User.getClassSchema());

        while (messages.hasNext()) {
            MessageAndMetadata<byte[], byte[]> current = messages.next();
            String key = new String(current.key());

            // bytes -> generic Avro record -> User
            GenericRecord decoded = avroCodec.invert(current.message()).get();
            User user = genericRecordToUser(decoded);

            StringBuilder summary = new StringBuilder();
            summary.append("Thread ").append(m_threadNumber);
            summary.append(", topic=").append(current.topic());
            summary.append(", partition=").append(current.partition());
            summary.append(", key=").append(key);
            summary.append(", user=").append(user.toString());
            summary.append(", offset=").append(current.offset());
            summary.append(", timestamp=").append(current.timestamp());
            summary.append(", timestampType=").append(current.timestampType());
            System.out.println(summary.toString());
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
    } catch (Exception failure) {
        System.out.println("Exception in thread "+m_threadNumber);
        System.out.println(failure);
        failure.printStackTrace();
    }
}
项目:iote2e    文件:ConsumerDemoThread.java   
/**
 * Prints a one-line summary (thread, topic, partition, key, message,
 * offset, timestamp) for every message on {@code kafkaStream}, then prints
 * that the thread is shutting down once the iterator is exhausted.
 */
public void run() {
    for (ConsumerIterator<byte[], byte[]> messages = kafkaStream.iterator(); messages.hasNext(); ) {
        MessageAndMetadata<byte[], byte[]> current = messages.next();
        String key = new String(current.key());
        String message = new String(current.message());

        StringBuilder summary = new StringBuilder();
        summary.append("Thread ").append(threadNumber);
        summary.append(", topic=").append(current.topic());
        summary.append(", partition=").append(current.partition());
        summary.append(", key=").append(key);
        summary.append(", message=").append(message);
        summary.append(", offset=").append(current.offset());
        summary.append(", timestamp=").append(current.timestamp());
        summary.append(", timestampType=").append(current.timestampType());

        System.out.println(summary.toString());
    }
    System.out.println("Shutting down Thread: " + threadNumber);
}
项目:cloudinsight-platform-docker    文件:CollectorTest.java   
/**
 * Creates a consumer connector, opens one string-decoded stream on
 * {@code topic} and prints the key and body of every message received.
 */
public void recv() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, Integer.valueOf(1));

    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);
    Map<String, List<KafkaStream<String, String>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder);

    // One stream was requested, so the first entry is the only one.
    ConsumerIterator<String, String> messages = streamsByTopic.get(topic).get(0).iterator();
    while (messages.hasNext()) {
        MessageAndMetadata<String, String> received = messages.next();
        System.out.println("<<< Got new message");
        System.out.println("<<< key:" + received.key());
        System.out.println("<<< m: " + received.message());
    }
}
项目:cloudinsight-platform-docker    文件:CollectorTest.java   
/**
 * Creates a consumer connector, opens one string-decoded stream on
 * {@code topic} and prints the key and body of every message received.
 */
public void recv() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, Integer.valueOf(1));

    StringDecoder keyDecoder = new StringDecoder(null);
    StringDecoder valueDecoder = new StringDecoder(null);
    Map<String, List<KafkaStream<String, String>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder);

    // One stream was requested, so the first entry is the only one.
    ConsumerIterator<String, String> messages = streamsByTopic.get(topic).get(0).iterator();
    while (messages.hasNext()) {
        MessageAndMetadata<String, String> received = messages.next();
        System.out.println("<<< Got new message");
        System.out.println("<<< key:" + received.key());
        System.out.println("<<< m: " + received.message());
    }
}
项目:watchtower-automation    文件:KafkaConsumerRunnable.java   
/**
 * Polls {@code stream} while {@code running} is true and passes each
 * message body to {@code consumeMessage}. A consumer timeout is logged at
 * debug level and reported to {@code KafkaHealthCheck}; polling then
 * continues.
 */
public void run() {
  logger.debug("KafkaChannel {} has stream", this.threadNumber);
  final ConsumerIterator<byte[], byte[]> streamIterator = stream.iterator();

  running = true;

  while (running) {
    try {
      if (streamIterator.hasNext()) {
        final byte[] message = streamIterator.next().message();

        // byte[].toString() prints only the array's identity, not the payload.
        // Decode for the log, and only when debug output is actually enabled.
        // NOTE(review): UTF-8 is assumed for display; confirm the payload encoding.
        if (logger.isDebugEnabled()) {
          logger.debug("Thread {}: {}", threadNumber,
              message == null ? "null" : new String(message, java.nio.charset.StandardCharsets.UTF_8));
        }

        consumeMessage(message);
      }
    } catch (ConsumerTimeoutException cte) {
      logger.debug("Timed out when consuming from Kafka", cte);

      KafkaHealthCheck.getInstance().heartAttack(cte.getMessage());
    }
  }
}
项目:sqoop-on-spark    文件:KafkaConsumer.java   
/**
 * Fetches the next message from the first stream registered for a topic.
 *
 * @param topic topic whose stream list is looked up in {@code consumerMap}
 * @return the next message, or {@code null} when the iterator reports no
 *         further message or the consumer times out while waiting
 */
public MessageAndMetadata getNextMessage(String topic) {
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
  // it has only a single stream, because there is only one consumer
  KafkaStream<byte[], byte[]> stream = streams.get(0);
  final ConsumerIterator<byte[], byte[]> it = stream.iterator();
  try {
    if (it.hasNext()) {
      return it.next();
    }
    return null;
  } catch (ConsumerTimeoutException e) {
    // A timeout is reported to the caller as "no message".
    logger.error("0 messages available to fetch for the topic " + topic);
    return null;
  }
}
项目:watchtower-workflow    文件:KafkaConsumerRunnable.java   
/**
 * Polls {@code stream} while {@code running} is true and passes the key and
 * body of each message to {@code consumeMessage}. A consumer timeout is
 * logged at debug level and reported to {@code KafkaHealthCheck}; polling
 * then continues.
 */
public void run() {
  logger.info("KafkaChannel {} has stream", this.threadNumber);

  final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

  running = true;

  while (running) {
    try {
      if (!messages.hasNext()) {
        continue;
      }
      MessageAndMetadata<byte[], byte[]> current = messages.next();
      consumeMessage(current.key(), current.message());
    } catch (ConsumerTimeoutException timeout) {
      logger.debug("Timed out when consuming from Kafka", timeout);

      KafkaHealthCheck.getInstance().heartAttack(timeout.getMessage());
    }
  }
}
项目:data-acquisition    文件:KafkaRequestIdQueue.java   
/**
 * Creates the message streams for {@code topic} and returns the iterator of
 * the first one, failing fast with a descriptive message if the topic has
 * no stream list, the list is empty, or the stream or iterator is null.
 * Based on the example from the Kafka site.
 */
private ConsumerIterator<String, String> getStreamIterator() {
    Map<String, Integer> streamsPerTopic = ImmutableMap.of(topic, TOPIC_COUNT);
    Map<String, List<KafkaStream<String, String>>> streamsByTopic =
            consumer.createMessageStreams(streamsPerTopic, keyDecoder, msgDecoder);

    List<KafkaStream<String, String>> topicStreams = streamsByTopic.get(topic);
    Preconditions.checkNotNull(topicStreams, "There is no topic named : " + topic);

    // Snapshot the list in case it is a live view; the index check below
    // must see the same size the get() call will.
    ImmutableList<KafkaStream<String, String>> snapshot = ImmutableList.copyOf(topicStreams);
    Preconditions.checkElementIndex(FIRST_ELEMENT_INDEX, snapshot.size(),
            "Failed to find any KafkaStreams related to topic : " + topic);

    KafkaStream<String, String> firstStream = snapshot.get(FIRST_ELEMENT_INDEX);
    Preconditions.checkNotNull(firstStream, "Returned kafka stream is null");

    ConsumerIterator<String, String> result = firstStream.iterator();
    Preconditions.checkNotNull(result, "Returned kafka iterator is null");
    return result;
}
项目:VoltDB    文件:KafkaLoader.java   
/**
 * Reads every message from {@code m_stream}, parses it as a CSV line and
 * inserts the row through {@code m_loader}, carrying the original text and
 * offset as metadata. Any failure during parsing or insertion is logged
 * and terminates the JVM with exit status 1.
 */
@Override
public void run() {
    for (ConsumerIterator<byte[], byte[]> messages = m_stream.iterator(); messages.hasNext(); ) {
        MessageAndMetadata<byte[], byte[]> current = messages.next();
        byte[] payload = current.message();
        long offset = current.offset();
        String line = new String(payload);
        try {
            RowWithMetaData metaData = new RowWithMetaData(line, offset);
            m_loader.insertRow(metaData, m_csvParser.parseLine(line));
        } catch (Exception failure) {
            m_log.error("Consumer stopped", failure);
            System.exit(1);
        }
    }
}
项目:kafka-consumer    文件:InFluxAvroConsumer.java   
/**
   * Consumes {@code stream} until it is exhausted, passing every message of
   * at least two bytes to {@code process}. Kept separate from the caller to
   * avoid nesting too many try-catch blocks.
   */
  public void runrun() {
    log.info("Waiting to consume data");
    ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    while (messages.hasNext()) {
      byte[] payload = messages.next().message();
      if (payload.length >= 2) {
        process(payload);
      } else {
        // The first byte is the schema id, so anything shorter than two
        // bytes carries no data. Some libraries (pypro) used to send such a
        // short message to probe whether the topic was alive ("topic polling").
        log.info("ignoring short msg, assuming topic polling");
      }
    }
    log.info("Shutting down consumer Thread: " + id);
  }
项目:cep    文件:Consumer.java   
/**
 * Consumer thread body: reads every message from {@code stream}, parses it
 * with the parser associated with the topic and forwards each non-null
 * event to {@code source} under the topic's name.
 */
@Override
public void run() {
    log.debug("Starting consumer for topic {}", topic);

    for (ConsumerIterator<byte[], byte[]> messages = stream.iterator(); messages.hasNext(); ) {
        Map<String, Object> parsed = null;

        try {
            String text = new String(messages.next().message(), "UTF-8");
            parsed = parser.parse(text);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }

        // Nothing is sent when decoding failed or the parser returned null.
        if (parsed == null) {
            continue;
        }
        source.send(topic.getName(), parsed);
    }

    log.debug("Finished consumer for topic {}", topic);
}
项目:easyframe-msg    文件:KafkaHelper.java   
/**
 * Consumes messages from the given topic through a single stream.
 *
 * @param topicName name of the topic (queue) to consume
 * @param groupId   consumer group name
 * @return an iterator over the messages of the topic's first stream
 */
static MsgIterator consume(String topicName, String groupId) {
    ConsumerConnector connector = KafkaHelper.getConsumer(groupId);

    // (topic, #streams) pair: exactly one stream is requested.
    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topicName, Integer.valueOf(1));

    // TODO: support consuming several topics.
    // The default decoder is used, so keys and values are raw bytes.
    Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic = connector.createMessageStreams(streamsPerTopic);

    // The list holds one entry per requested stream; each stream exposes an
    // iterator over message/metadata pairs.
    KafkaStream<byte[], byte[]> firstStream = streamsByTopic.get(topicName).get(0);

    // KafkaStream[K,V]: K is the partition key type, V the message value type.
    ConsumerIterator<byte[], byte[]> rawIterator = firstStream.iterator();
    return new MsgIterator(rawIterator);
}
项目:easyframe-msg    文件:KafkaHelper.java   
/**
 * Consumes messages from the given topic through a chosen number of streams.
 *
 * @param topicName  name of the topic (queue) to consume
 * @param numStreams number of streams to request
 * @param groupId    consumer group name
 * @return one MsgIterator per stream returned for the topic
 */
static List<MsgIterator> consume(String topicName, int numStreams, String groupId) {
    ConsumerConnector connector = KafkaHelper.getConsumer(groupId);

    // (topic, #streams) pair
    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topicName, numStreams);

    // TODO: support consuming several topics.
    // The default decoder is used, so keys and values are raw bytes.
    Map<String, List<KafkaStream<byte[], byte[]>>> streamsByTopic = connector.createMessageStreams(streamsPerTopic);

    // KafkaStream[K,V]: K is the partition key type, V the message value type.
    // Each stream exposes an iterator over message/metadata pairs.
    List<MsgIterator> wrapped = new ArrayList<MsgIterator>();
    for (KafkaStream<byte[], byte[]> stream : streamsByTopic.get(topicName)) {
        wrapped.add(new MsgIterator(stream.iterator()));
    }
    return wrapped;
}
项目:datacollector    文件:KafkaDestinationSinglePartitionPipelineRunIT.java   
/**
 * Counts the messages available across all of {@code kafkaStreams}.
 * Each stream is drained until its iterator ends or the consumer times
 * out; a timeout simply ends the count for that stream.
 *
 * @return total number of messages read from all streams
 */
@Override
protected int getRecordsInTarget() {
  int recordCount = 0;
  for (KafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
    ConsumerIterator<byte[], byte[]> messages = kafkaStream.iterator();
    try {
      // Count first, then advance, once per available message.
      for (; messages.hasNext(); messages.next()) {
        recordCount++;
      }
    } catch (kafka.consumer.ConsumerTimeoutException ignored) {
      // The timeout marks the end of this stream's data; move to the next stream.
    }
  }
  return recordCount;
}
项目:datacollector    文件:KafkaDestinationMultiPartitionPipelineRunIT.java   
/**
 * Counts the messages available across all of {@code kafkaStreams}.
 * Each stream is drained until its iterator ends or the consumer times
 * out; a timeout simply ends the count for that stream.
 *
 * @return total number of messages read from all streams
 */
@Override
protected int getRecordsInTarget() {
  int recordCount = 0;
  for (KafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
    ConsumerIterator<byte[], byte[]> messages = kafkaStream.iterator();
    try {
      // Count first, then advance, once per available message.
      for (; messages.hasNext(); messages.next()) {
        recordCount++;
      }
    } catch (kafka.consumer.ConsumerTimeoutException ignored) {
      // The timeout marks the end of this stream's data; move to the next stream.
    }
  }
  return recordCount;
}
项目:netty-kafka-producer    文件:DataBrokerTest.java   
/**
 * Sends one keyed message through a {@code DataKafkaBroker} connected to
 * the local broker and verifies that the first message consumed from the
 * topic carries the same payload.
 */
@Test
public void test_sendMessage() throws Exception {
    createTopic(topic);

    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);

    DataKafkaBroker dataChannel = new DataKafkaBroker("localhost", START_PORT, 0, topic,new NioEventLoopGroup(), properties);
    dataChannel.connect().sync();

    try {
        dataChannel.send(freeLaterBuffer("1".getBytes()), 0, freeLaterBuffer(TEST_MESSAGE.getBytes()));

        final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
        final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
    } finally {
        // Disconnect even when the assertion fails, so a failing test does
        // not leave the channel open for the tests that follow.
        dataChannel.disconnect();
    }
}
项目:netty-kafka-producer    文件:ListenerTest.java   
/**
 * Sends one message with acknowledgments disabled
 * ({@code Acknowledgment.WAIT_FOR_NO_ONE}) and verifies that the first
 * message consumed from the topic carries the same payload.
 */
@Test
public void test_no_acks_send_message() throws Exception {

    String topic = "test_no_acks_send_message";

    createTopic(topic, 1);
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    properties.override(ProducerProperties.DATA_ACK, Acknowledgment.WAIT_FOR_NO_ONE);
    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    KafkaTopic kafkaTopic = producer.topic();

    kafkaTopic.send(null, freeLaterBuffer(TEST_MESSAGE.getBytes()));

    final List<KafkaStream<byte[], byte[]>> consume = consume(topic);
    final KafkaStream<byte[], byte[]> stream = consume.get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();
    // Actual value first, expected value in the matcher, so a failure
    // message reports the consumed payload as what was "got".
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE));
    producer.disconnect().sync();
}
项目:netty-kafka-producer    文件:KafkaTopicSingleBrokerTest.java   
/**
 * Sends three messages suffixed "01", "02" and "03" to the "test" topic
 * and verifies they are consumed with the same payloads in the same order.
 */
@Test
public void test_producer() throws Exception {

    final String topic = "test";
    final String[] suffixes = {"01", "02", "03"};

    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    createTopic(topic);

    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    KafkaTopic kafkaTopic = producer.topic();

    // Publish everything first ...
    for (String suffix : suffixes) {
        kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + suffix).getBytes()));
    }

    // ... then read it back in the order it was sent.
    final ConsumerIterator<byte[], byte[]> messages = consume(topic).get(0).iterator();
    for (String suffix : suffixes) {
        Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + suffix));
    }

    producer.disconnect().sync();
}
项目:yuzhouwan    文件:ConsumerWorker.java   
/**
 * Logs every message on {@code kafkaStream} with its partition and offset,
 * and after each message logs running fail/success/total counters together
 * with the elapsed time in whole seconds.
 */
@Override
public void run() {
    final long startedAt = System.currentTimeMillis();
    int total = 0;
    int fail = 0;
    int success = 0;

    ConsumerIterator<byte[], byte[]> messages = kafkaStream.iterator();
    while (messages.hasNext()) {
        try {
            MessageAndMetadata<byte[], byte[]> current = messages.next();
            _log.info("Thread {}: {}", threadNum, new String(current.message(), "utf-8"));
            _log.info("partition: {}, offset: {}", current.partition(), current.offset());
            success++;
        } catch (Exception e) {
            _log.error("{}", e);
            fail++;
        }
        // The total advances once per iteration, whether it succeeded or failed.
        total++;
        long elapsedSeconds = (System.currentTimeMillis() - startedAt) / 1000;
        _log.info("Count [fail/success/total]: [{}/{}/{}], Time: {}s", fail, success, total,
                elapsedSeconds);
    }
}
项目:java-kafka    文件:KafkaLogAppenderTest.java   
/**
 * Logs one line through {@code logger}, reads the first Avro record from
 * {@code topic} and checks its "line", "logtypeid", "source", "timings"
 * and "tag" fields.
 */
@Test
public void testKafkaLogAppender() {
    Properties consumerProps = new Properties();
    consumerProps.put("zookeeper.connect", zookeeper);
    consumerProps.put("group.id", "kafka-log-appender-test");
    consumerProps.put("auto.offset.reset", "smallest");
    consumerProps.put("schema.registry.url", schemaRegistry);

    Map<String, Integer> streamsPerTopic = new HashMap<String, Integer>();
    streamsPerTopic.put(topic, 1);

    // String keys, Avro-decoded values; one stream on the topic.
    StringDecoder keyDecoder = new StringDecoder(null);
    KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(new VerifiableProperties(consumerProps));
    ConsumerIterator<String, Object> iterator = Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProps))
            .createMessageStreams(streamsPerTopic, keyDecoder, valueDecoder)
            .get(topic).get(0).iterator();

    String testMessage = "I am a test message";
    logger.info(testMessage);

    GenericRecord logLine = (GenericRecord) iterator.next().message();

    assertEquals(logLine.get("line").toString(), testMessage);
    assertEquals(logLine.get("logtypeid"), KafkaLogAppender.InfoLogTypeId);
    assertNotNull(logLine.get("source"));

    Map<CharSequence, Object> timings = (Map<CharSequence, Object>) logLine.get("timings");
    assertEquals(timings.size(), 1);
    Map<CharSequence, Object> tags = (Map<CharSequence, Object>) logLine.get("tag");
    assertEquals(tags.size(), 2);
}
项目:LogRTA    文件:KafkaSpoutTest.java   
/**
 * Creates a consumer connector, opens one stream on {@code topic} and
 * emits every message as a tuple of (value, 1, formatted receive time),
 * using the message value as the tuple's message id.
 * <p>
 * NOTE(review): the loop runs for as long as the iterator yields messages,
 * so this method does not return while the stream is open.
 */
public void activate() {
    consumer =kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String,Integer> topickMap = new HashMap<String, Integer>();
    topickMap.put(topic, 1);

    System.out.println("*********Results********topic:"+topic);

    Map<String, List<KafkaStream<byte[],byte[]>>>  streamMap=consumer.createMessageStreams(topickMap);
    KafkaStream<byte[],byte[]>stream = streamMap.get(topic).get(0);
    ConsumerIterator<byte[],byte[]> it =stream.iterator();

    // The pattern never changes, so build the formatter once instead of
    // once per message. It is confined to this method, so sharing it
    // across iterations is safe.
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd日 HH:mm:ss SSS");
    while(it.hasNext()){
         String value =new String(it.next().message());
         String str = formatter.format(new Date());
         System.out.println("storm接收到来自kafka的消息--->" + value);
         collector.emit(new Values(value,1,str), value);
    }
}
项目:java-kafka-client-libs    文件:ConsumerThread.java   
/**
 * Passes every message on {@code _stream} to {@code _handler} and reports
 * the per-message handling time to StatsD under
 * "message_received.&lt;topic&gt;". Any exception ends the loop: an
 * interruption is logged as a warning, anything else as an error.
 */
@Override
public void run() {
    LOG.info( "Consuming thread started" );

    try {
        for ( ConsumerIterator<byte[], byte[]> messages = _stream.iterator(); messages.hasNext(); ) {
            // The clock starts before the message is taken from the iterator.
            final long startedAt = System.currentTimeMillis();
            final byte[] payload = messages.next().message();
            LOG.debug( "message received: {}", ( new String( payload ) ) );

            _handler.onMessage( payload );

            final long elapsed = System.currentTimeMillis() - startedAt;
            KruxStdLib.STATSD.time( "message_received." + _topic, elapsed );
        }
    } catch ( Exception failure ) {
        if ( !( failure instanceof InterruptedException ) ) {
            LOG.error( "no longer fetching messages", failure );
        } else {
            LOG.warn( "Consumer group threads interrupted, shutting down" );
        }
    }
}
项目:java.study    文件:SimpleHLConsumer.java   
/**
 * Consumes {@code topic} through a single stream, printing each message
 * and appending it as one line to "logX.txt". The consumer is shut down
 * and the file closed when consumption ends, whether normally or through
 * an exception.
 *
 * @throws Exception if the file cannot be opened or written, or consuming fails
 */
public void testConsumer() throws Exception {
    String fileName = "logX.txt";
    BufferedWriter out = new BufferedWriter(new FileWriter(fileName));
    try {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                String msg = new String(it.next().message(), Charset.forName("UTF-8"));
                System.out.println("Message from Single Topic: " + msg);
                out.write(msg, 0, msg.length());
                out.write('\n');
                // Flush per message so the file is current while the loop is still running.
                out.flush();
            }
        }
    } finally {
        // Release both resources even when consuming or writing fails.
        // The nested finally guarantees the file is closed even if shutdown throws.
        try {
            if (consumer != null) {
                consumer.shutdown();
            }
        } finally {
            out.close();
        }
    }
}
项目:monasca-thresh    文件:KafkaSpout.java   
/**
 * Reader loop: while {@code shouldContinue} is true, takes the next message
 * from the first stream, publishes it in {@code this.message} and then
 * waits until that field has been reset to null (or the loop is told to
 * stop) before reading another message. On exit it logs the reader
 * thread's name and clears {@code readerThread}.
 */
@Override
public void run() {
  while (this.shouldContinue) {
    // The iterator is obtained again on every pass of the outer loop.
    final ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();
    if (it.hasNext()) {
      final byte[] message = it.next().message();
      // Hand-off of the message happens under this object's monitor.
      synchronized (this) {
        this.message = message;
        // Wake up getMessage() if it is waiting
        if (this.waiting) {
          notify();
        }
        // Hold here until the published message has been cleared or a stop
        // was requested. The condition is re-checked after every wake-up,
        // including one caused by an interrupt.
        while (this.message != null && this.shouldContinue)
          try {
            wait();
          } catch (InterruptedException e) {
            // The interrupt is only logged; the loop condition decides whether to keep waiting.
            logger.info("Wait interrupted", e);
          }
      }
    }
  }
  logger.info("readerThread {} exited", this.readerThread.getName());
  this.readerThread = null;
}