Java Class kafka.consumer.KafkaStream Example Source Code

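KafkaStream is the per-thread message stream of Kafka's old high-level consumer API (the kafka.consumer package, deprecated in Kafka 0.11 and removed in Kafka 2.0). The examples below all follow the same pattern: build a ConsumerConnector from ZooKeeper-based properties, request one or more KafkaStreams per topic, and drain each stream through its ConsumerIterator. Before the project examples, here is a minimal, self-contained sketch of that pattern; the ZooKeeper address, group id, and topic name are placeholders:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MinimalHighLevelConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "example-group");           // placeholder group id
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Request a single stream (one consumer thread) for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("example-topic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                connector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get("example-topic").get(0);

        // hasNext() blocks until a message arrives (or consumer.timeout.ms elapses).
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}
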
Project: Kafka-Insight    File: KafkaOffsetGetter.java
/**
 * When an object implementing interface <code>Runnable</code> is used
 * to create a thread, starting the thread causes the object's
 * <code>run</code> method to be called in that separately executing
 * thread.
 * <p>
 * The general contract of the method <code>run</code> is that it may
 * take any action whatsoever.
 *
 * @see Thread#run()
 */
@Override
public void run() {
    ConsumerConnector consumerConnector = KafkaUtils.createConsumerConnector(zkAddr, group);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(CONSUMER_OFFSET_TOPIC, 1);
    KafkaStream<byte[], byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(CONSUMER_OFFSET_TOPIC).get(0);

    ConsumerIterator<byte[], byte[]> it = offsetMsgStream.iterator();
    while (true) {

        MessageAndMetadata<byte[], byte[]> offsetMsg = it.next();
        if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) {
            try {
                GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key()));
                if (offsetMsg.message() == null) {
                    continue;
                }
                kafka.common.OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message()));
                kafkaConsumerOffsets.put(commitKey, commitValue);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
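Note: in the __consumer_offsets topic, key schema versions 0 and 1 identify offset-commit entries while version 2 and above identify group-metadata entries, so the getShort() < 2 guard above processes only offset commits.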
Project: flume-release-1.7.0    File: KafkaConsumer.java
public MessageAndMetadata getNextMessage(String topic) {
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
  // it has only a single stream, because there is only one consumer
  KafkaStream<byte[], byte[]> stream = streams.get(0);
  final ConsumerIterator<byte[], byte[]> it = stream.iterator();
  try {
    if (it.hasNext()) {
      return it.next();
    } else {
      return null;
    }
  } catch (ConsumerTimeoutException e) {
    logger.error("0 messages available to fetch for the topic " + topic);
    return null;
  }
}
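Catching ConsumerTimeoutException as above only has an effect when the consumer is configured with a finite timeout: by default consumer.timeout.ms is -1 and hasNext() blocks indefinitely. A sketch of the relevant configuration (the timeout value is illustrative):

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // placeholder address
props.put("group.id", "flume-group");             // placeholder group id
props.put("consumer.timeout.ms", "100");          // hasNext() throws ConsumerTimeoutException after 100 ms idle
ConsumerConfig config = new ConsumerConfig(props);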
Project: wngn-jms-kafka    File: LogbackIntegrationIT.java
@Test
public void testLogging() throws InterruptedException {

    for (int i = 0; i<1000; ++i) {
        logger.info("message"+i);
    }

    final KafkaStream<byte[], byte[]> log = kafka.createClient().createMessageStreamsByFilter(new Whitelist("logs"),1).get(0);
    final ConsumerIterator<byte[], byte[]> iterator = log.iterator();

    for (int i=0; i<1000; ++i) {
        final String messageFromKafka = new String(iterator.next().message(), UTF8);
        assertThat(messageFromKafka, Matchers.equalTo("message"+i));
    }

}
Project: DataProcessPlatformKafkaJavaSDK    File: KafkaConsumerTransducer.java
@Override
public void run() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(transducer_topic, 1);

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    KafkaStream<String, String> stream = consumerMap.get(transducer_topic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext() && bStartConsume){
        transducerDataProcessor.newData(it.next().message());

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Project: iotdb-jdbc    File: KafkaConsumer.java
void consume() throws Exception {
    // specify the number of consumer threads
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(KafkaProducer.TOPIC, threadsNum);

    // specify data decoder
    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer
            .createMessageStreams(topicCountMap, keyDecoder, valueDecoder); // the three String type parameters are the topic name, message key, and message value

    // acquire data
    List<KafkaStream<String, String>> streams = consumerMap.get(KafkaProducer.TOPIC);

    // multi-threaded consume
    executor = Executors.newFixedThreadPool(threadsNum);    //create a thread pool
    for (final KafkaStream<String, String> stream : streams) {
        executor.submit(new ConsumerThread(stream));        // run thread
    }
}
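Like most examples on this page, consume() never releases its resources; the usual companion is a shutdown method that closes the connector and drains the thread pool. A sketch of that pattern, assuming the consumer and executor fields from this snippet (the wait time is arbitrary, and java.util.concurrent.TimeUnit must be imported):

void shutdown() {
    if (consumer != null) consumer.shutdown();   // release ZK registration and fetcher threads
    if (executor != null) executor.shutdown();   // stop accepting new tasks
    try {
        if (executor != null && !executor.awaitTermination(5, TimeUnit.SECONDS)) {
            System.err.println("Timed out waiting for consumer threads to exit");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();      // restore the interrupt flag
    }
}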
Project: linden    File: KafkaDataProvider.java
public KafkaDataProvider(String zookeeper, String topic, String groupId) {
  super(MessageAndMetadata.class);
  Properties props = new Properties();
  props.put("zookeeper.connect", zookeeper);
  props.put("group.id", groupId);
  props.put("zookeeper.session.timeout.ms", "30000");
  props.put("auto.commit.interval.ms", "1000");
  props.put("fetch.message.max.bytes", "4194304");
  consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
  Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  topicCountMap.put(topic, 1);
  Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

  iter = stream.iterator();
}
Project: storm-demos    File: KafkaDataSpout.java
public void nextTuple() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TopologyConfig.kafkaTopic, 1); // one executor - one thread
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = conn.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TopologyConfig.kafkaTopic);
    ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
    while(true){
        while(iter.hasNext()){

            String s = new String(iter.next().message());
            collector.emit(new Values(s));

            UUID msgId = UUID.randomUUID();
            this.pending.put(msgId, new Values(s));
        }
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            logger.error("Spout : sleep wrong \n", e);
        }
    }
}
Project: koper    File: KafkaReceiver.java
/**
 * Start the MessageReceiver and begin listening for topic messages.
 */
@Override
public void start() {

    if (consumer == null) {
        //sync init
        synchronized (lock) {
            init();
        }
    }

    String topicString = buildTopicsString();

    Whitelist topicFilter = new Whitelist(topicString);
    List<KafkaStream<byte[], byte[]>> streamList = consumer.createMessageStreamsByFilter(topicFilter, partitions);

    if (org.apache.commons.collections.CollectionUtils.isEmpty(streamList)) {
        try {
            TimeUnit.MILLISECONDS.sleep(1);
        } catch (InterruptedException e) {
            log.warn(e.getMessage(), e);
        }
    }
    processStreamsByTopic(topicString, streamList);

}
Project: koper    File: KafkaReceiver.java
private void processStreamsByTopic(String topicKeys, List<KafkaStream<byte[], byte[]>> streamList) {
    // init stream thread pool
    ExecutorService streamPool = Executors.newFixedThreadPool(partitions);
    String[] topics = StringUtils.split(topicKeys, ",");
    if (log.isDebugEnabled())
        log.debug("准备处理消息流集合 KafkaStreamList,topic count={},topics={}, partitions/topic={}", topics.length, topicKeys, partitions);

    //遍历stream
    AtomicInteger index = new AtomicInteger(0);
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        Thread streamThread = new Thread() {

            @Override
            public void run() {
                int i = index.getAndAdd(1);
                if (log.isDebugEnabled())
                    log.debug("处理消息流KafkaStream -- No.={}, partitions={}", i, partitions + ":" + i);

                ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator();

                processStreamByConsumer(topicKeys, consumerIterator);
            }
        };
        streamPool.execute(streamThread);
    }
}
Project: dmaap-framework    File: KafkaConsumer.java
/**
 * Constructor. It takes the following four parameters:
 * @param topic
 * @param group
 * @param id
 * @param cc
 */

public KafkaConsumer(String topic, String group, String id, ConsumerConnector cc) {
    fTopic = topic;
    fGroup = group;
    fId = id;
    fConnector = cc;

    fCreateTimeMs = System.currentTimeMillis();
    fLastTouch = fCreateTimeMs;

    fLogTag = fGroup + "(" + fId + ")/" + fTopic;
    offset = 0;

    state = KafkaConsumer.State.OPENED;

    final Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(fTopic, 1);
    final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = fConnector
            .createMessageStreams(topicCountMap);
    final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(fTopic);
    fStream = streams.iterator().next();
}
Project: light_drtc    File: KafkaMqCollect.java
public void collectMq(){
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(Constants.kfTopic, 1);

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream<String, String> stream = consumerMap.get(Constants.kfTopic).get(0);
    ConsumerIterator<String, String> it = stream.iterator();
    MessageAndMetadata<String, String> msgMeta;
    while (it.hasNext()) {
        msgMeta = it.next();
        super.mqTimer.parseMqText(msgMeta.key(), msgMeta.message());
        //System.out.println(msgMeta.key() + "\t" + msgMeta.message());
    }
}
Project: javacode-demo    File: ConsumerGroupExample.java
public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, a_numThreads);

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerThread(consumer, stream, threadNumber));
        threadNumber++;
    }
}
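The ConsumerThread submitted above is not shown in this snippet. In the canonical consumer-group example the worker is a small Runnable that drains one stream; a sketch along those lines (class name and constructor shape are illustrative, the real worker here also receives the connector):

public class ConsumerThread implements Runnable {
    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNumber;

    public ConsumerThread(KafkaStream<byte[], byte[]> stream, int threadNumber) {
        this.stream = stream;
        this.threadNumber = threadNumber;
    }

    @Override
    public void run() {
        // hasNext() blocks until a message arrives or the connector shuts down.
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
        }
        System.out.println("Shutting down thread: " + threadNumber);
    }
}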
Project: EasyMessage    File: ConsumerEngine.java
public void run(Decoder<K> keyDecoder, Decoder<V> valueDecoder){
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, threadNum);
    Map<String, List<KafkaStream<K, V>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    List<KafkaStream<K, V>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(threadNum);

    int threadNo = 0;
    for (final KafkaStream<K, V> stream : streams) {
        ConsumerWorker<K, V> worker = new ConsumerWorker<K, V>(stream, threadNo);
        executor.submit(worker);
        threadNo++;
    }
}
Project: vertx-kafka-service    File: KafkaProducerServiceIntegrationTest.java
private void consumeMessages() {
    final Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(TOPIC, 1);
    final StringDecoder decoder =
            new StringDecoder(new VerifiableProperties());
    final Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, decoder, decoder);
    final KafkaStream<String, String> stream =
            consumerMap.get(TOPIC).get(0);
    final ConsumerIterator<String, String> iterator = stream.iterator();

    Thread kafkaMessageReceiverThread = new Thread(
            () -> {
                while (iterator.hasNext()) {
                    String msg = iterator.next().message();
                    msg = msg == null ? "<null>" : msg;
                    System.out.println("got message: " + msg);
                    messagesReceived.add(msg);
                }
            },
            "kafkaMessageReceiverThread"
    );
    kafkaMessageReceiverThread.start();

}
Project: kclient    File: KafkaConsumer.java
public void startup() {
    if (status != Status.INIT) {
        log.error("The client has been started.");
        throw new IllegalStateException("The client has been started.");
    }

    status = Status.RUNNING;

    log.info("Streams num: " + streams.size());
    tasks = new ArrayList<AbstractMessageTask>();
    for (KafkaStream<String, String> stream : streams) {
        AbstractMessageTask abstractMessageTask = (fixedThreadNum == 0 ? new SequentialMessageTask(
                stream, handler) : new ConcurrentMessageTask(stream,
                handler, fixedThreadNum));
        tasks.add(abstractMessageTask);
        streamThreadPool.execute(abstractMessageTask);
    }
}
Project: custom-java-kafka-producer-consumer    File: ConsumerGroupExample.java
public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<>();
    // Note: be careful when setting the consumer thread count for this Kafka version; otherwise no data is received.
    topicCountMap.put(topic, a_numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // launch all the threads
    executor = Executors.newFixedThreadPool(a_numThreads);

    // create an object to consume the messages
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}
Project: geomesa-tutorials    File: OSMKafkaSpout.java
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    _collector = spoutOutputCollector;
    Properties props = new Properties();
    props.put("zookeeper.connect", conf.get(OSMIngest.ZOOKEEPERS));
    props.put("group.id", groupId);
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap, new StringDecoder(new VerifiableProperties()), new StringDecoder(new VerifiableProperties()));
    List<KafkaStream<String, String>> streams = consumerMap.get(topic);
    if (streams.size() != 1) {
        log.error("Streams should be of size 1");
        throw new IllegalStateException("Expected exactly one KafkaStream for topic " + topic + ", got " + streams.size());
    }
    KafkaStream<String, String> stream = streams.get(0);
    kafkaIterator = stream.iterator();
}
Project: jlogstash-input-plugin    File: KafkaDistributed.java
public void addNewConsumer(String topic, Integer threads){
    ConsumerConnector consumer = consumerConnMap.get(topic);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, threads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    ExecutorService executor = Executors.newFixedThreadPool(threads);

    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new Consumer(stream, this));
    }

    executorMap.put(topic, executor);
}
Project: jlogstash-input-plugin    File: Kafka.java
public void addNewConsumer(String topic, Integer threads){
    ConsumerConnector consumer = consumerConnMap.get(topic);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, threads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    ExecutorService executor = Executors.newFixedThreadPool(threads);

    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new Consumer(stream, this));
    }

    executorMap.put(topic, executor);
}
Project: iote2e    File: KafkaAvroDemo.java
/**
 * Launches the consumer threads.
 *
 * @param numThreads the number of consumer threads
 */
public void run(int numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerDemoThread(stream, threadNumber, this));
        threadNumber++;
    }
}
Project: iote2e    File: KafkaStringDemo.java
/**
 * Launches the consumer threads.
 *
 * @param numThreads the number of consumer threads
 */
public void run(int numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerDemoThread(stream, threadNumber, this));
        threadNumber++;
    }
}
Project: iote2e    File: AvroConsumerMaster.java
/**
 * Launches the consumer threads.
 *
 * @param a_numThreads the number of consumer threads
 */
public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, a_numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new AvroConsumerThread(stream, threadNumber));
        threadNumber++;
    }
}
Project: iote2e    File: AvroConsumerWeatherMaster.java
/**
 * Launches the consumer threads.
 *
 * @param numThreads the number of consumer threads
 */
public void run(int numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new AvroConsumerWeatherThread(stream, threadNumber));
        threadNumber++;
    }
}
Project: iote2e    File: AvroConsumerWaveMaster.java
/**
 * Launches the consumer threads.
 *
 * @param numThreads the number of consumer threads
 */
public void run(int numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new AvroConsumerWaveThread(stream, threadNumber));
        threadNumber++;
    }
}
Project: iote2e    File: ConsumerDemoMaster.java
/**
 * Launches the consumer threads.
 *
 * @param a_numThreads the number of consumer threads
 */
public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, a_numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerDemoThread(stream, threadNumber, this));
        threadNumber++;
    }
}
Project: cloudinsight-platform-docker    File: CollectorTest.java
public void recv() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put(topic, 1);
    Map<String, List<KafkaStream<String, String>>> streamMap = consumer.createMessageStreams(topicMap, new StringDecoder(null), new StringDecoder(null));

    KafkaStream<String, String> stream = streamMap.get(topic).get(0);

    ConsumerIterator<String, String> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<String, String> mm = it.next();
        System.out.println("<<< Got new message");
        System.out.println("<<< key:" + mm.key());
        System.out.println("<<< m: " + mm.message());

    }
}
Project: sqoop-on-spark    File: KafkaConsumer.java
public MessageAndMetadata getNextMessage(String topic) {
  List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
  // it has only a single stream, because there is only one consumer
  KafkaStream<byte[], byte[]> stream = streams.get(0);
  final ConsumerIterator<byte[], byte[]> it = stream.iterator();
  try {
    if (it.hasNext()) {
      return it.next();
    } else {
      return null;
    }
  } catch (ConsumerTimeoutException e) {
    logger.error("0 messages available to fetch for the topic " + topic);
    return null;
  }
}
Project: LogRTA    File: KafkaSpoutTest.java
public void activate() {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);

    System.out.println("*********Results********topic:" + topic);

    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        String value = new String(it.next().message());
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
        Date curDate = new Date(System.currentTimeMillis());
        String str = formatter.format(curDate);
        System.out.println("Storm received a message from Kafka ---> " + value);
        collector.emit(new Values(value, 1, str), value);
    }
}
Project: netty-kafka-producer    File: KafkaTopicSingleBrokerTest.java
@Test
public void test_producer() throws Exception {

    String topic = "test";
    ProducerProperties properties = new ProducerProperties();
    properties.override(ProducerProperties.NETTY_DEBUG_PIPELINE, true);
    createTopic(topic);

    KafkaProducer producer = new KafkaProducer("localhost", START_PORT, topic, properties);
    producer.connect().sync();
    KafkaTopic kafkaTopic = producer.topic();

    kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "01").getBytes()));
    kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "02").getBytes()));
    kafkaTopic.send(null, freeLaterBuffer((TEST_MESSAGE + "03").getBytes()));

    final KafkaStream<byte[], byte[]> stream = consume(topic).get(0);
    final ConsumerIterator<byte[], byte[]> messages = stream.iterator();

    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "01"));
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "02"));
    Assert.assertThat(new String(messages.next().message()), is(TEST_MESSAGE + "03"));
    producer.disconnect().sync();
}
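The consume(topic) helper used above is not part of the snippet; presumably it builds a high-level consumer against the embedded broker and returns its streams. A hypothetical sketch, assuming a consumerConnector field configured for the test environment:

private List<KafkaStream<byte[], byte[]>> consume(String topic) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, 1);
    return consumerConnector.createMessageStreams(topicCountMap).get(topic);
}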
Project: ingestion-service    File: KafkaKinesisIntegrationTest.java
public String getDataFromKafka(int a_numThreads) throws Exception {
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(TestConstants.topic, a_numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(TestConstants.topic);


    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    Future<?> fut = null;
    int threadNumber = 0;
    ConsumerTest consumerTest = null;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        consumerTest = new ConsumerTest(stream, threadNumber);
        fut = executor.submit(consumerTest);
        threadNumber++;
    }

    fut.get(); // block until the last submitted consumer finishes
    return consumerTest.getResult();
}
Project: data-acquisition    File: KafkaRequestIdQueue.java
/**
 * Modified example from kafka site with some defensive checks added.
 */
private ConsumerIterator<String, String> getStreamIterator() {
    Map<String, Integer> topicCountMap = ImmutableMap.of(topic, TOPIC_COUNT);
    Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, msgDecoder);
    List<KafkaStream<String, String>> streams = consumerMap.get(topic);
    Preconditions.checkNotNull(streams, "There is no topic named : " + topic);
    //copy in case of live list returned. Needed for index check below.
    ImmutableList<KafkaStream<String, String>> streamsCopy = ImmutableList.copyOf(streams);

    Preconditions.checkElementIndex(FIRST_ELEMENT_INDEX, streamsCopy.size(),
            "Failed to find any KafkaStreams related to topic : " + topic);
    KafkaStream<String, String> stream = streamsCopy.get(FIRST_ELEMENT_INDEX);

    Preconditions.checkNotNull(stream, "Returned kafka stream is null");

    ConsumerIterator<String, String> iterator = stream.iterator();
    Preconditions.checkNotNull(iterator, "Returned kafka iterator is null");
    return iterator;
}
Project: yuzhouwan    File: ConsumerGroup.java
private void run(int threadNum) {
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put(topic, threadNum);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    executor = Executors.newFixedThreadPool(threadNum);

    int threadNumber = 0;
    _log.info("the streams size is {}", streams.size());
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerWorker(stream, threadNumber));
        consumer.commitOffsets();
        threadNumber++;
    }
}
Project: kafka-consumer    File: CassandaAvroConsumer.java
public CassandaAvroConsumer(SchemaRepository repo, KafkaStream stream) {
  this.repo = repo;
  this.stream = stream;
  this.id = nextId++; // note: not thread-safe if multiple consumers are constructed concurrently

  this.cluster = Cluster.builder()
          .addContactPoint(Config.cassandraUrl)
          .build();
  Metadata metadata = cluster.getMetadata();
  log.debug("Connected to cluster: %s\n", metadata.getClusterName());
  for (Host host : metadata.getAllHosts()) {
    log.debug("Datatacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
  }
  this.session = cluster.connect();
  createSchemas();
  createInserts();
}
Project: benchmarkio    File: BlockingKafkaMessageConsumerCoordinator.java
@Override
public CompletionService<Histogram> startConsumers() {
    final ConsumerConfig consumerConfig = new ConsumerConfig(props);

    consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    // Create message streams
    final Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put(topic, numThreads);

    final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicMap);
    final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // Pass each stream to a consumer that will read from the stream in its own thread.
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executorCompletionService.submit(new BlockingKafkaMessageConsumer(stream));
    }

    return executorCompletionService;
}
Project: Simple-Kafka    File: ConsumerGroupExample.java
public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, a_numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}
Project: laughing-octo-sansa    File: HighLevelExample.java
public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, a_numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    //
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    //
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}
Project: easyframe-msg    File: KafkaHelper.java
/** Consume messages from the given topic.
 *
 * @param topicName the topic (queue) name
 * @param groupId the consumer group name
 * @return a MsgIterator over the consumed messages
 */
static MsgIterator consume(String topicName, String groupId) {
    ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();    //(topic, #stream) pair
    topicCountMap.put(topicName, 1);

    // TODO: support consuming multiple topics
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); // using the default decoder
    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName);  // the list holds #streams items; each stream supports an iterator over message/metadata pairs
    KafkaStream<byte[], byte[]> stream = streamList.get(0);

    // KafkaStream[K, V]: K is the type of the partition key, V is the type of the message value
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    MsgIterator iter = new MsgIterator(it);
    return iter;
}
Project: easyframe-msg    File: KafkaHelper.java
/** Consume messages from the given topic using the given number of streams.
 *
 * @param topicName the topic (queue) name
 * @param numStreams number of streams to return
 * @param groupId the consumer group name
 * @return a list of MsgIterators, each providing an iterator over messages of the allowed topics
 */
static List<MsgIterator> consume(String topicName, int numStreams, String groupId) {
    ConsumerConnector consumerConnector = KafkaHelper.getConsumer(groupId);

    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();    //(topic, #stream) pair
    topicCountMap.put(topicName, numStreams);

    // TODO: support consuming multiple topics
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); // using the default decoder
    List<KafkaStream<byte[], byte[]>> streamList = consumerMap.get(topicName);  // the list holds #streams items; each stream supports an iterator over message/metadata pairs

    List<MsgIterator> iterList = new ArrayList<MsgIterator>();
    for (KafkaStream<byte[], byte[]> stream : streamList) {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        MsgIterator iter = new MsgIterator(it);
        iterList.add(iter);
    }

    // KafkaStream[K, V]: K is the type of the partition key, V is the type of the message value
    return iterList;
}
Project: performance-test-harness-for-geoevent    File: KafkaConsumerGroup.java
public void run(int numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, numThreads);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    executor = Executors.newFixedThreadPool(numThreads);

    // now create an object to consume the messages
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerThread(stream, receiveGeoEvent, threadNumber));
        threadNumber++;
    }
}