Usage examples for the Java class org.apache.kafka.clients.producer.ProducerRecord

Project: java-kafka-client    File: TracingKafkaProducer.java
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
  /*
  // Create wrappedRecord because the record's headers become read-only after a send (relevant if the same record is sent a second time)
  ProducerRecord<K, V> wrappedRecord = new ProducerRecord<>(record.topic(),
      record.partition(),
      record.timestamp(),
      record.key(),
      record.value(),
      record.headers());
  */

  try (Scope scope = buildAndInjectSpan(record)) {
    Callback wrappedCallback = new TracingCallback(callback, scope);
    return producer.send(record, wrappedCallback);
  }
}
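For reference, a minimal sketch (not taken from any project above) of the ProducerRecord constructor overloads that the snippets on this page rely on; the topic name "demo" and the key/value literals are hypothetical:

// Sketch only: constructor overloads of the 0.11+ client; "demo" is a hypothetical topic.
ProducerRecord<String, String> byValue     = new ProducerRecord<>("demo", "v");                                      // (topic, value)
ProducerRecord<String, String> byKey       = new ProducerRecord<>("demo", "k", "v");                                 // (topic, key, value)
ProducerRecord<String, String> byPartition = new ProducerRecord<>("demo", 0, "k", "v");                              // (topic, partition, key, value)
ProducerRecord<String, String> byTimestamp = new ProducerRecord<>("demo", 0, System.currentTimeMillis(), "k", "v");  // (topic, partition, timestamp, key, value)
// Two further overloads additionally accept Iterable<Header> headers, as in the commented-out wrappedRecord above.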
Project: reactive-components    File: TestKafka.java
@Ignore
@Test
public void testWithString() throws Exception {
    KafkaComponent kafka = createComponent();

    Mono<List<String>> receive = Flux.from(kafka.from(TOPIC2, String.class))
        .take(2)
        .collectList();

    Subscriber<ProducerRecord> toTopic = kafka.to(TOPIC2, ProducerRecord.class);
    Flux.just(new ProducerRecord<String, String>(TOPIC2, "1", "test"),
              new ProducerRecord<String, String>(TOPIC2, "1", "test2"))
        .subscribe(toTopic);

    List<String> received = receive.block(Duration.ofSeconds(10));
    Assert.assertEquals(2, received.size());
    Assert.assertEquals("test", received.get(0));
    Assert.assertEquals("test2", received.get(1));
}
Project: reactive-components    File: TestKafka.java
@Ignore
@Test
public void testWithConsumerRecord() throws Exception {
    KafkaComponent kafka = createComponent();

    Mono<List<Object>> receive = Flux.from(kafka.from(TOPIC1, ConsumerRecord.class))
        .map(record -> record.value())
        .take(2)
        .collectList();

    Subscriber<ProducerRecord> toTopic = kafka.to(TOPIC1, ProducerRecord.class);
    Flux.just(new ProducerRecord<String, String>(TOPIC1, "1", "test"),
              new ProducerRecord<String, String>(TOPIC1, "1", "test2"))
        .subscribe(toTopic);

    List<Object> received = receive.block(Duration.ofSeconds(10));
    Assert.assertEquals(2, received.size());
    Assert.assertEquals("test", received.get(0));
}
Project: kafka-0.11.0.0-src-with-comment    File: Producer.java
public void run() {
    int messageNo = 1;
    while (true) {
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        if (isAsync) { // Send asynchronously
            producer.send(new ProducerRecord<>(topic,
                messageNo,
                messageStr), new DemoCallBack(startTime, messageNo, messageStr));
        } else { // Send synchronously
            try {
                producer.send(new ProducerRecord<>(topic,
                    messageNo,
                    messageStr)).get();
                System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        ++messageNo;
    }
}
Project: wngn-jms-kafka    File: AsynchronousDeliveryStrategy.java
@Override
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event,
                              final FailedDeliveryCallback<E> failedDeliveryCallback) {
    try {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    failedDeliveryCallback.onFailedDelivery(event, exception);
                }
            }
        });
        return true;
    } catch (BufferExhaustedException e) {
        failedDeliveryCallback.onFailedDelivery(event, e);
        return false;
    }
}
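For comparison, a hedged sketch of what a synchronous counterpart of this strategy could look like; the generic signature and FailedDeliveryCallback are taken from the snippet above, and blocking on the returned Future surfaces the send error in the calling thread:

@Override
public <K, V, E> boolean send(Producer<K, V> producer, ProducerRecord<K, V> record, final E event,
                              final FailedDeliveryCallback<E> failedDeliveryCallback) {
    try {
        // Block until the broker acknowledges the write; failures arrive here as exceptions.
        producer.send(record).get();
        return true;
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
        }
        failedDeliveryCallback.onFailedDelivery(event, e);
        return false;
    }
}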
Project: oryx2    File: ProduceData.java
public void start() throws InterruptedException {
  RandomGenerator random = RandomManager.getRandom();

  Properties props = ConfigUtils.keyValueToProperties(
      "bootstrap.servers", "localhost:" + kafkaPort,
      "key.serializer", "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer", "org.apache.kafka.common.serialization.StringSerializer",
      "compression.type", "gzip",
      "batch.size", 0,
      "acks", 1,
      "max.request.size", 1 << 26 // TODO
  );
  try (Producer<String,String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < howMany; i++) {
      Pair<String,String> datum = datumGenerator.generate(i, random);
      ProducerRecord<String,String> record =
          new ProducerRecord<>(topic, datum.getFirst(), datum.getSecond());
      producer.send(record);
      log.debug("Sent datum {} = {}", record.key(), record.value());
      if (intervalMsec > 0) {
        Thread.sleep(intervalMsec);
      }
    }
  }
}
Project: Lagerta    File: KafkaFactoryForTests.java
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    switch (method.getName()) {
        case SEND_METHOD_NAME: {
            ProducerRecord record = (ProducerRecord) args[0];

            args[0] = new ProducerRecord<>(
                    BaseIntegrationTest.adjustTopicNameForTest(record.topic()),
                    record.partition(),
                    record.timestamp(),
                    record.key(),
                    record.value()
            );
            break;
        }
        case PARTITIONS_FOR_METHOD_NAME: {
            args[0] = BaseIntegrationTest.adjustTopicNameForTest((String) args[0]);
            break;
        }
    }
    return method.invoke(producer, args);
}
Project: wngn-jms-kafka    File: LatestProducer.java
public static void main(String[] args) {

        Map<String, Object> config = new HashMap<String, Object>();
        config.put("partitioner.class", "com.wngn.kafka.SimpleKeyPartition");
        LatestProducer producer = LatestProducer.getInstance(ProducerConstants.TOPIC_KAFKA_TEST, config);
        ProducerRecord<String, String> record = null;
        long index = 0L;
        boolean controller = true;
        while (controller) {
            controller = false;
            index++;
            System.out.println(index + "------------");
            try {
                String message = "message_" + index;
                RecordMetadata recordMetadata = producer.sendWithSync("1", message);
                System.out.format("PARTITION: %d OFFSET: %d\n", recordMetadata.partition(), recordMetadata.offset());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBKeyValueStoreSupplierTest.java
@Test
public void shouldNotBeLoggingEnabledStoreWhenLoggingNotEnabled() throws Exception {
    store = createStore(false, false);
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    };
    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  collector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);
    store.put("a", "b");
    assertTrue(logged.isEmpty());
}
Project: kmq    File: KmqClient.java
public ConsumerRecords<K, V> nextBatch() {
    List<Future<RecordMetadata>> markerSends = new ArrayList<>();

    // 1. Get messages from topic, in batches
    ConsumerRecords<K, V> records = msgConsumer.poll(msgPollTimeout);
    for (ConsumerRecord<K, V> record : records) {
        // 2. Write a "start" marker. Collecting the future responses.
        markerSends.add(markerProducer.send(
                new ProducerRecord<>(config.getMarkerTopic(),
                        MarkerKey.fromRecord(record),
                        new StartMarker(config.getMsgTimeoutMs()))));
    }

    // Waiting for a confirmation that each start marker has been sent
    markerSends.forEach(f -> {
        try { f.get(); } catch (Exception e) { throw new RuntimeException(e); }
    });

    // 3. after all start markers are sent, commit offsets. This needs to be done as close to writing the
    // start marker as possible, to minimize the number of double re-processed messages in case of failure.
    msgConsumer.commitSync();

    return records;
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeLongByteArrayRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Long, byte[]> producer = new KafkaProducer<>(properties);

    LongStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC, //topic
                            number, //key
                            String.format("record-%s", number.toString()).getBytes())) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: doctorkafka    File: KafkaAvroPublisher.java
public void publish(BrokerStats brokerStats) throws IOException {
  try {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);

    avroEventWriter.write(brokerStats, binaryEncoder);
    binaryEncoder.flush();
    IOUtils.closeQuietly(stream);

    String key = brokerStats.getName() + "_" + System.currentTimeMillis();
    int numPartitions = kafkaProducer.partitionsFor(destTopic).size();
    int partition = brokerStats.getId() % numPartitions;

    Future<RecordMetadata> future = kafkaProducer.send(
        new ProducerRecord<>(destTopic, partition, key.getBytes(), stream.toByteArray()));
    future.get();

    OpenTsdbMetricConverter.incr("kafka.stats.collector.success", 1, "host=" + HOSTNAME);
  } catch (Exception e) {
    LOG.error("Failure in publish stats", e);
    OpenTsdbMetricConverter.incr("kafka.stats.collector.failure", 1, "host=" + HOSTNAME);
    throw new RuntimeException("Avro serialization failure", e);
  }
}
Project: spark2.0    File: KafkaSendMessage.java
public static void  sendStringMessage() throws Exception{
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

    // No partition specified, so the topic defaults to a single partition; send the messages
    int i = 0;
    while (i < 1000) {
        Thread.sleep(1000L);
        String message = "zhangsan"+i;
        producer.send(new ProducerRecord<>("NL_U_APP_ALARM_APP_STRING",message));
        i++;
        producer.flush();
    }
    producer.close();
}
Project: fluid    File: KafkaSourceTest.java
private void checkMulticast(KafkaUsage usage, String topic, KafkaSource<Integer> source) {
  List<Integer> resultsA = new ArrayList<>();
  List<Integer> resultsB = new ArrayList<>();
  source
    .transformPayload(i -> i + 1)
    .to(Sink.forEachPayload(resultsB::add));

  source
    .transformPayload(i -> i + 1)
    .to(Sink.forEachPayload(resultsA::add));

  AtomicInteger counter = new AtomicInteger();
  usage.produceIntegers(10, null,
    () -> new ProducerRecord<>(topic, counter.getAndIncrement()));

  await().atMost(1, TimeUnit.MINUTES).until(() -> resultsA.size() >= 10);
  await().atMost(1, TimeUnit.MINUTES).until(() -> resultsB.size() >= 10);
  assertThat(resultsA).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  assertThat(resultsB).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Project: fluid    File: KafkaSourceTest.java
@Test
public void testSource() throws InterruptedException {
  KafkaUsage usage = new KafkaUsage();
  String topic = UUID.randomUUID().toString();
  List<Integer> results = new ArrayList<>();
  KafkaSource<Integer> source = new KafkaSource<>(vertx,
    getKafkaConfig()
      .put("topic", topic)
      .put("value.serializer", IntegerSerializer.class.getName())
      .put("value.deserializer", IntegerDeserializer.class.getName())
  );
  source
    .transformPayload(i -> i + 1)
    .to(Sink.forEachPayload(results::add));

  AtomicInteger counter = new AtomicInteger();
  usage.produceIntegers(10, null,
    () -> new ProducerRecord<>(topic, counter.getAndIncrement()));

  await().atMost(1, TimeUnit.MINUTES).until(() -> results.size() >= 10);
  assertThat(results).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBWindowStoreSupplierTest.java
@Test
public void shouldCreateLoggingEnabledStoreWhenWindowStoreLogged() throws Exception {
    store = createStore(true, false);
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    };
    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  collector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);
    store.put("a", "b");
    assertFalse(logged.isEmpty());
}
Project: wechat-mall    File: OrderProducer.java
@Override
public void send(Long k, byte[] v) {
    KafkaProducer<Long, byte[]> p = getWorker();
    p.initTransactions();
    p.beginTransaction();
    Future<RecordMetadata> res = p.send(new ProducerRecord<Long, byte[]>(topic, k, v));
    RecordMetadata record;
    try {
        record = res.get();
        offsets.clear();
        offsets.put(new TopicPartition(topic, record.partition()), new OffsetAndMetadata(record.offset()));
        p.sendOffsetsToTransaction(offsets, MallConstants.ORDER_GROUP);
        p.commitTransaction();
    } catch (InterruptedException | ExecutionException e) {
        p.abortTransaction();
    }
}
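Note that the snippet above calls initTransactions() on every send; in the standard transactions API (Kafka 0.11+), initTransactions() is invoked once per producer instance, before the first transaction. A minimal sketch of the documented pattern follows; the transactional.id, topic name, and payload are hypothetical:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Sketch only: the standard transactional-send pattern; config values are hypothetical.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "order-producer-1");
props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);
producer.initTransactions(); // once per producer instance
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", 1L, new byte[0]));
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    producer.close(); // fenced by another producer with the same transactional.id; cannot continue
} catch (KafkaException e) {
    producer.abortTransaction(); // roll back everything sent in this transaction
}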
Project: open-kilda    File: Original.java
@Test
public void shouldWriteThenRead() throws Exception {

    //Create a consumer
    ConsumerIterator<String, String> it = buildConsumer(Original.topic);

    //Create a producer
    producer = new KafkaProducer<>(producerProps());

    //send a message
    producer.send(new ProducerRecord<>(Original.topic, "message")).get();

    //read it back
    MessageAndMetadata<String, String> messageAndMetadata = it.next();
    String value = messageAndMetadata.message();
    assertThat(value, is("message"));
}
Project: open-kilda    File: SimpleKafkaTest.java
@Test
public void shouldWriteThenRead() throws Exception {

    //Create a consumer
    ConsumerIterator<String, String> it = buildConsumer(SimpleKafkaTest.topic);

    //Create a producer
    producer = new KafkaProducer<>(producerProps());

    //send a message
    producer.send(new ProducerRecord<>(SimpleKafkaTest.topic, "message")).get();

    //read it back
    MessageAndMetadata<String, String> messageAndMetadata = it.next();
    String value = messageAndMetadata.message();
    assertThat(value, is("message"));
}
Project: kiqr    File: TestDriver.java
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.LongSerializer");
    props.put("linger.ms", 0);

    KafkaProducer<String, Long> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 10000; i++) {
        String ip = "127.0.0." + i % 10;
        System.out.println(ip);
        producer.send(new ProducerRecord<>("visits", ip, System.currentTimeMillis() + i));
    }

    producer.close();

}
Project: Building-Data-Streaming-Applications-with-Apache-Kafka    File: IPLogProducer.java
@Override
public void run() {
    PropertyReader propertyReader = new PropertyReader();

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", propertyReader.getPropertyValue("broker.list"));
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("auto.create.topics.enable", "true");

    KafkaProducer<String, String> ipProducer = new KafkaProducer<String, String>(producerProps);

    BufferedReader br = readFile();
    String oldLine = "";
    try {
        while ((oldLine = br.readLine()) != null) {
            String line = getNewRecordWithRandomIP(oldLine).replace("[", "").replace("]", "");
            ProducerRecord<String, String> ipData = new ProducerRecord<>(propertyReader.getPropertyValue("topic"), line);
            Future<RecordMetadata> recordMetadata = ipProducer.send(ipData);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    ipProducer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: RocksDBKeyValueStoreSupplierTest.java
@Test
public void shouldCreateLoggingEnabledStoreWhenStoreLogged() throws Exception {
    store = createStore(true, false);
    final List<ProducerRecord> logged = new ArrayList<>();
    final NoOpRecordCollector collector = new NoOpRecordCollector() {
        @Override
        public <K, V> void send(final String topic,
                                K key,
                                V value,
                                Integer partition,
                                Long timestamp,
                                Serializer<K> keySerializer,
                                Serializer<V> valueSerializer) {
            logged.add(new ProducerRecord<K, V>(topic, partition, timestamp, key, value));
        }
    };
    final MockProcessorContext context = new MockProcessorContext(TestUtils.tempDirectory(),
                                                                  Serdes.String(),
                                                                  Serdes.String(),
                                                                  collector,
                                                                  cache);
    context.setTime(1);
    store.init(context, store);
    store.put("a", "b");
    assertFalse(logged.isEmpty());
}
Project: kafka-0.11.0.0-src-with-comment    File: QueryableStateIntegrationTest.java
@Override
public void run() {
    final Properties producerConfig = new Properties();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
    producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
    producerConfig.put(ProducerConfig.RETRIES_CONFIG, 10);
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    try (final KafkaProducer<String, String> producer =
                 new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {

        while (getCurrIteration() < numIterations && !shutdown) {
            for (final String value : inputValues) {
                producer.send(new ProducerRecord<String, String>(topic, value));
            }
            incrementInteration();
        }
    }
}
Project: wngn-jms-kafka    File: QuotationThreadProducer.java
public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(3);
    StockQuotationInfo quotationInfo = null;
    ProducerRecord<String, String> record = null;
    try {
        int num = 0;
        for (int i = 0; i < MSG_SIZE; i++) {
            quotationInfo = createQuotationInfo();
            record = new ProducerRecord<String, String>(TOPIC, null, quotationInfo.getTradeTime(),
                    quotationInfo.getStockCode(), quotationInfo.toString());
            executor.execute(new KafkaProducerThread(producer, record));
        }
        Thread.sleep(2000L);
    } catch (Exception e) {
        logger.error("Send message occurs exception", e);
    } finally {
        producer.close();
        executor.shutdown();
    }
}
Project: trellis-rosid    File: EventProducer.java
private Optional<ProducerRecord<String, String>> buildMembershipMessage(final String topic, final IRI resource,
        final Resource parent, final Dataset dataset) throws Exception {
    try (final Dataset data = rdf.createDataset()) {
        if (DirectContainer.equals(parent.getInteractionModel())) {
            parent.getMembershipResource().ifPresent(member -> {
                parent.getMemberRelation().ifPresent(relation ->
                    data.add(rdf.createQuad(PreferMembership, member, relation, resource)));
                parent.getMemberOfRelation().ifPresent(relation ->
                    data.add(rdf.createQuad(PreferMembership, resource, relation, member)));
            });
        } else if (IndirectContainer.equals(parent.getInteractionModel())) {
            parent.getMembershipResource().ifPresent(member ->
                parent.getMemberRelation().ifPresent(relation ->
                    parent.getInsertedContentRelation().ifPresent(inserted ->
                        dataset.stream(of(PreferUserManaged), null, inserted, null).sequential().forEachOrdered(q ->
                            data.add(rdf.createQuad(PreferMembership, member, relation, q.getObject()))))));
        }
        final Optional<String> key = data.stream(of(PreferMembership), null, null, null).map(Quad::getSubject)
            .filter(x -> x instanceof IRI).map(x -> (IRI) x).map(IRI::getIRIString).findFirst();
        if (key.isPresent()) {
            dataset.stream(of(PreferAudit), null, null, null).map(auditTypeMapper).forEachOrdered(data::add);
            return of(new ProducerRecord<>(topic, key.get(), serialize(data)));
        }
        return empty();
    }
}
Project: spring-tutorial    File: SynchronousProducer.java
/**
 * @param args
 */
public static void main(String[] args) {

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092,localhost:9093");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> sampleProducer = new KafkaProducer<String, String>(props);
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, key, value);

    try {

        RecordMetadata metaData = sampleProducer.send(record).get();
        System.out.printf("Message sent to Partition No. %d and offset %d", metaData.partition(), metaData.offset());
    } catch (Exception e) {
        LOGGER.error("exception occured while sending message to broker", e);
    } finally {
        sampleProducer.close();

    }
}
Project: spring-tutorial    File: SampleProducer.java
/**
     * @param args
     */
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> sampleProducer = new KafkaProducer<String, String>(props);

//      ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, value);       
//      sampleProducer.send(record);
        for (int i = 0; i < 10; i++)
            sampleProducer.send(new ProducerRecord<String, String>("demo-topic1","Data:"+ Integer.toString(i)));
        sampleProducer.close();
    }
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSourceTaskTest.java
@Test(expected = InvalidRecordException.class)
public void testSendRecordsCorruptTimestamp() throws Exception {
    final Long timestamp = -3L;
    createWorkerTask();

    List<SourceRecord> records = Collections.singletonList(
            new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp)
    );

    Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecordAnyTimes();

    PowerMock.replayAll();

    Whitebox.setInternalState(workerTask, "toSend", records);
    Whitebox.invokeMethod(workerTask, "sendRecords");
    assertEquals(null, sent.getValue().timestamp());

    PowerMock.verifyAll();
}
Project: kafka-webview    File: WebKafkaConsumerTest.java
public void publishDummyData() {
    final String topic = "TestTopic";

    // Create publisher
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final KafkaProducer<String, String> producer = new KafkaProducer<>(config);
    for (int charCode = 65; charCode < 91; charCode++) {
        final char[] key = new char[1];
        key[0] = (char) charCode;

        producer.send(new ProducerRecord<>(topic, new String(key), new String(key)));
    }
    producer.flush();
    producer.close();
}
Project: kafka-webview    File: WebKafkaConsumerTest.java
public void publishDummyDataNumbers() {
    final String topic = "NumbersTopic";

    // Create publisher
    final Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    final KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(config);
    for (int value = 0; value < 10000; value++) {
        producer.send(new ProducerRecord<>(topic, value, value));
    }
    producer.flush();
    producer.close();
}
Project: wngn-jms-kafka    File: ArvoQuotationProducer.java
public static void main(String[] args) {

        logger.info("---------");
        ProducerRecord<String, AvroStockQuotation> record = null;
        AvroStockQuotation quotation = null;
        try {
            int num = 0;
            for (int i = 0; i < MSG_SIZE; i++) {
                quotation = createQuotationInfo();
                record = new ProducerRecord<String, AvroStockQuotation>(KafkaConstants.TOPIC_AVRO_STOCK_QUOTATION,
                        (String) quotation.getStockCode(), quotation);
                QuotationCallback callback = new QuotationCallback();
                System.out.println("--------" + num);
                producer.send(record, callback);
                if (num++ % 10 == 0) {
                    Thread.sleep(2000L);
                }
            }
        } catch (InterruptedException e) {
            logger.error("Send message occurs exception", e);
        } finally {
            producer.close();
        }
    }
Project: Lagerta    File: ReconcilerImpl.java
private void processSingleRecord(List<Long> txIds, ConsumerRecord<ByteBuffer, ByteBuffer> record) {
    long txId = record.timestamp();
    boolean found = txIds.remove(txId);
    if (found) {
        ProducerRecord<ByteBuffer, ByteBuffer> producerRecord =
                new ProducerRecord<>(clusterConfig.getGapTopic(), record.key(), record.value());
        producer.send(producerRecord);
    }
}
Project: kafka-streams-machine-learning-examples    File: IntegrationTestUtils.java
/**
 * @param topic          Kafka topic to write the data records to
 * @param records        Data records to write to Kafka
 * @param producerConfig Kafka producer configuration
 * @param <K>            Key type of the data records
 * @param <V>            Value type of the data records
 */
public static <K, V> void produceKeyValuesSynchronously(
    String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
    throws ExecutionException, InterruptedException {
  Producer<K, V> producer = new KafkaProducer<>(producerConfig);
  for (KeyValue<K, V> record : records) {
    Future<RecordMetadata> f = producer.send(
        new ProducerRecord<>(topic, record.key, record.value));
    f.get();
  }
  producer.flush();
  producer.close();
}
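A hedged usage sketch for the helper above; the topic name, keys, and values are hypothetical, and IntegrationTestUtils refers to the class this method lives in:

// Sketch only: producing two string key-value pairs via the helper above.
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

List<KeyValue<String, String>> records = Arrays.asList(
        new KeyValue<>("alice", "1"),
        new KeyValue<>("bob", "2"));
IntegrationTestUtils.produceKeyValuesSynchronously("input-topic", records, producerConfig);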
Project: kafka-0.11.0.0-src-with-comment    File: WorkerSourceTask.java
private synchronized void finishSuccessfulFlush() {
    // If we were successful, we can just swap instead of replacing items back into the original map
    IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> temp = outstandingMessages;
    outstandingMessages = outstandingMessagesBacklog;
    outstandingMessagesBacklog = temp;
    flushing = false;
}
Project: kafka-0.11.0.0-src-with-comment    File: ProcessorTopologyTestDriver.java
/**
 * Read the next record from the given topic. These records were output by the topology during the previous calls to
 * {@link #process(String, byte[], byte[])}.
 *
 * @param topic the name of the topic
 * @param keyDeserializer the deserializer for the key type
 * @param valueDeserializer the deserializer for the value type
 * @return the next record on that topic, or null if there is no record available
 */
public <K, V> ProducerRecord<K, V> readOutput(final String topic,
                                              final Deserializer<K> keyDeserializer,
                                              final Deserializer<V> valueDeserializer) {
    final ProducerRecord<byte[], byte[]> record = readOutput(topic);
    if (record == null) {
        return null;
    }
    final K key = keyDeserializer.deserialize(record.topic(), record.key());
    final V value = valueDeserializer.deserialize(record.topic(), record.value());
    return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
}
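A hedged usage sketch for readOutput; the driver instance and the topic name "output-topic" are assumed to exist in the surrounding test:

// Sketch only: drain all available output records for one topic.
ProducerRecord<String, String> output;
while ((output = driver.readOutput("output-topic", new StringDeserializer(), new StringDeserializer())) != null) {
    System.out.println(output.key() + " -> " + output.value());
}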
Project: kafka-0.11.0.0-src-with-comment    File: VerifiableProducer.java
/** Produce a message with given key and value. */
public void send(String key, String value) {
    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, key, value);
    numSent++;
    try {
        producer.send(record, new PrintInfoCallback(key, value));
    } catch (Exception e) {

        synchronized (System.out) {
            System.out.println(errorString(e, key, value, System.currentTimeMillis()));
        }
    }
}
Project: Lagerta    File: LocalKafkaWriter.java
public void writeTransactions(TransactionWrapper txWrapper) {
    long transactionId = txWrapper.id();
    int partition = TransactionMessageUtil.partitionFor(txWrapper.deserializedMetadata(), partitions);

    producer.send(new ProducerRecord<>(topic, partition, transactionId, txWrapper.metadata(), txWrapper.data()),
            new LoggingErrorHandler());
}
Project: Practical-Real-time-Processing-and-Analytics    File: DataGenerator.java
public static void main(String args[]) {
    Properties properties = new Properties();

    properties.put("bootstrap.servers", "localhost:9092");
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("acks", "1");

    KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);
    int counter = 0;
    int nbrOfEventsRequired = Integer.parseInt(args[0]);
    while (counter < nbrOfEventsRequired) {
        StringBuffer stream = new StringBuffer();

        long phoneNumber = ThreadLocalRandom.current().nextLong(9999999950L,
                9999999960L);
        int bin = ThreadLocalRandom.current().nextInt(1000, 9999);
        int bout = ThreadLocalRandom.current().nextInt(1000, 9999);

        stream.append(phoneNumber);
        stream.append(",");
        stream.append(bin);
        stream.append(",");
        stream.append(bout);
        stream.append(",");
        stream.append(new Date(ThreadLocalRandom.current().nextLong()));

        System.out.println(stream.toString());
        ProducerRecord<Integer, String> data = new ProducerRecord<Integer, String>(
                "storm-trident-diy", stream.toString());
        producer.send(data);
        counter++;
    }

    producer.close();
}
Project: SkyEye    File: KafkaAppender.java
    /**
     * Start the heartbeat check.
     */
    public void heartbeatStart() {
        // initialize the heartbeat timer
        this.timer = new Timer();
        this.timer.schedule(new TimerTask() {
            @Override
            public void run() {
                byte[] key = ByteBuffer.allocate(4).putInt(Constants.HEARTBEAT_KEY.hashCode()).array();
                final ProducerRecord<byte[], String> record = new ProducerRecord<>(topic, key, Constants.HEARTBEAT_VALUE);

                // java 8 lambda
//                LazySingletonProducer.getInstance(config).send(record, (RecordMetadata recordMetadata, Exception e) -> {
                // logic code
//                });

                LazySingletonProducer.getInstance(config).send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (null == e) {
                            // update the flag state
                            flag.compareAndSet(false, true);
                            // no exception means Kafka has recovered from the failed state, so mark the appender as started
                            started = true;
                            addStatus(new InfoStatus("kafka send normal in appender", this, e));
                            // stop the heartbeat check
                            KafkaAppender.this.heartbeatStop();
                            zkRegister.write(Constants.SLASH + app + Constants.SLASH + host, NodeMode.EPHEMERAL,
                                    String.valueOf(Constants.APP_APPENDER_RESTART_KEY + Constants.SEMICOLON + System.currentTimeMillis()) + Constants.SEMICOLON + SysUtil.userDir);
                        }
                    }
                });
            }
        }, 10000, 60000);
    }
Project: apache-kafka-demos    File: PerformanceProducer.java
public static void main(String[] args) throws InterruptedException {

        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ACKS_CONFIG, "all");
        props.put(RETRIES_CONFIG, 0);
        props.put(BATCH_SIZE_CONFIG, 0);
        props.put(LINGER_MS_CONFIG, 0);
        props.put(BUFFER_MEMORY_CONFIG, 33554432);
        props.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");


        Producer<String, String> producer = new KafkaProducer<>(props);

        JsonObject json = Json.createObjectBuilder()
                .add("windrad", 6)
                .add("kw/h", 33)
                .build();

        String msg = json.toString();

        long t1 = System.currentTimeMillis();

        for (int i = 1; i <= 10; i++) {

            String key = String.valueOf(round(random() * 1000));
            double value = new Double(round(random() * 10000000L)).intValue() / 1000.0;

            producer.send(new ProducerRecord<>("produktion", key, msg));
        }
        System.out.println("Zeit: " + (System.currentTimeMillis() - t1)/1000 + "s");

        producer.close();
    }