Java Class org.apache.kafka.common.serialization.ByteArraySerializer Example Source Code
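
ByteArraySerializer is Kafka's identity serializer: serialize(topic, data) returns the given byte array unchanged, so producers can ship raw bytes without any conversion. The examples below are collected from open-source projects. As a quick orientation, here is a minimal standalone sketch (not taken from any of the listed projects; the topic name "demo-topic" is arbitrary):

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ByteArraySerializerSketch {
    public static void main(String[] args) {
        ByteArraySerializer serializer = new ByteArraySerializer();
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        // serialize() is a pass-through: it returns the input array without copying
        byte[] wireBytes = serializer.serialize("demo-topic", payload);
        System.out.println(wireBytes == payload); // prints: true
        serializer.close();
    }
}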

Project: samza    File: TestStreamProcessor.java
private void initProducer(String bootstrapServer) {
  producer = TestUtils.createNewProducer(
      bootstrapServer,
      1,
      60 * 1000L,
      1024L * 1024L,
      0,
      0L,
      5 * 1000L,
      SecurityProtocol.PLAINTEXT,
      null,
      Option$.MODULE$.apply(new Properties()),
      new StringSerializer(),
      new ByteArraySerializer(),
      Option$.MODULE$.apply(new Properties()));
}
Project: talk-kafka-messaging-logs    File: ProduceConsumeLongByteArrayRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Long, byte[]> producer = new KafkaProducer<>(properties);

    LongStream.rangeClosed(1, 100).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC, //topic
                            number, //key
                            String.format("record-%s", number.toString()).getBytes())) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
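
A consuming counterpart to the snippet above would mirror the serializers with LongDeserializer and ByteArrayDeserializer. The following is a minimal sketch, not taken from the project; the group id, offset reset, and poll timeout are assumptions:

private static void consumeRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group"); // hypothetical group id
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    try (Consumer<Long, byte[]> consumer = new KafkaConsumer<>(properties)) {
        consumer.subscribe(Collections.singletonList(TOPIC));
        // a single poll for illustration; a real consumer would loop
        for (ConsumerRecord<Long, byte[]> record : consumer.poll(1000)) {
            System.out.printf("key=%d value=%s%n", record.key(), new String(record.value()));
        }
    }
}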
Project: talk-kafka-messaging-logs    File: ProduceConsumeStringAvroRecord.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<String, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 100).boxed()
            .map(number -> new ProducerRecord<>(
                    TOPIC, //topic
                    number.toString(), //key
                    UserAvroSerdes.serialize(new User(String.format("user-%s", number.toString()))))) //value
            .forEach(record -> producer.send(record));
    producer.close();
}
Project: talk-kafka-messaging-logs    File: KafkaProducer.java
public static void main(String[] args) {
    final ActorSystem system = ActorSystem.create("KafkaProducerSystem");

    final Materializer materializer = ActorMaterializer.create(system);

    final ProducerSettings<byte[], String> producerSettings =
            ProducerSettings
                    .create(system, new ByteArraySerializer(), new StringSerializer())
                    .withBootstrapServers("localhost:9092");

    CompletionStage<Done> done =
            Source.range(1, 100)
                    .map(n -> n.toString())
                    .map(elem ->
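                            // Five-arg constructor: ProducerRecord(topic, partition, timestamp, key, value).
                            // Here: fixed partition 0, a seconds-since-epoch timestamp, and a null key.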
                            new ProducerRecord<byte[], String>(
                                    "topic1-ts",
                                    0,
                                    Instant.now().getEpochSecond(),
                                    null,
                                    elem))
                    .runWith(Producer.plainSink(producerSettings), materializer);

    done.whenComplete((d, ex) -> System.out.println("sent"));
}
Project: talk-kafka-messaging-logs    File: Compaction.java
private static void produceRecords() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Integer, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            KafkaProducerUtil.createMessage(1000))) //Value
            .forEach(record -> {
                producer.send(record);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    producer.close();
}
Project: talk-kafka-messaging-logs    File: Retention.java
private static void produceRecords(String bootstrapServers) {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Producer<Integer, byte[]> producer = new KafkaProducer<>(properties);

    IntStream.rangeClosed(1, 10000).boxed()
            .map(number ->
                    new ProducerRecord<>(
                            TOPIC,
                            1, //Key
                            KafkaProducerUtil.createMessage(1000))) //Value
            .forEach(record -> {
                producer.send(record);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaConfigBackingStore.java
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(1).
            replicationFactor(config.getShort(DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminProps);
}
Project: kafka-0.11.0.0-src-with-comment    File: WindowedStreamPartitionerTest.java
@Test
public void testWindowedSerializerNoArgConstructors() {
    Map<String, String> props = new HashMap<>();
    // test key[value].serializer.inner.class takes precedence over serializer.inner.class
    WindowedSerializer<StringSerializer> windowedSerializer = new WindowedSerializer<>();
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1");
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
    props.put("key.serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("serializer.inner.class", "org.apache.kafka.common.serialization.StringSerializer");
    windowedSerializer.configure(props, true);
    Serializer<?> inner = windowedSerializer.innerSerializer();
    assertNotNull("Inner serializer should not be null", inner);
    assertTrue("Inner serializer type should be StringSerializer", inner instanceof StringSerializer);
    // test serializer.inner.class
    props.put("serializer.inner.class", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.remove("key.serializer.inner.class");
    props.remove("value.serializer.inner.class");
    WindowedSerializer<?> windowedSerializer1 = new WindowedSerializer<>();
    windowedSerializer1.configure(props, false);
    Serializer<?> inner1 = windowedSerializer1.innerSerializer();
    assertNotNull("Inner serializer should not be null", inner1);
    assertTrue("Inner serializer type should be ByteArraySerializer", inner1 instanceof ByteArraySerializer);
}
Project: kafka-0.11.0.0-src-with-comment    File: SimpleBenchmark.java
private Properties setProduceConsumeProperties(final String clientId) {
    Properties props = new Properties();
    props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    // the socket buffer needs to be large, especially when running in AWS with
    // high latency. if running locally the default is fine.
    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS);
    return props;
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaProducerTest.java
@Test
public void testConstructorFailureCloseResource() {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar.local:9999");
    props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

    final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
    final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
    try {
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
                props, new ByteArraySerializer(), new ByteArraySerializer());
    } catch (KafkaException e) {
        assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
        assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
        assertEquals("Failed to construct kafka producer", e.getMessage());
        return;
    }
    fail("should have caught an exception and returned");
}
Project: SkyEye    File: KafkaManager.java
public KafkaManager(final LoggerContext loggerContext, final String name, final String topic, final String zkServers, final String mail, final  String rpc,
                    final String app, final String host, final Property[] properties) {
    super(loggerContext, name);
    this.topic = topic;
    this.zkServers = zkServers;
    this.mail = mail;
    this.rpc = rpc;
    this.app = app;
    this.orginApp = app;
    this.host = host;
    this.checkAndSetConfig(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    this.checkAndSetConfig(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Set the partitioner class: the custom KeyModPartitioner routes records with the same key to the same partition
    this.checkAndSetConfig(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyModPartitioner.class.getName());
    // Parameters from the XML configuration
    for (final Property property : properties) {
        this.config.put(property.getName(), property.getValue());
    }
    // The host is obtained externally because of containerized deployment
    this.config.put(ProducerConfig.CLIENT_ID_CONFIG, this.app + Constants.MIDDLE_LINE + this.host + Constants.MIDDLE_LINE + "log4j2");
}
Project: kafka    File: KafkaProducerTest.java
@Test
public void testConstructorFailureCloseResource() {
    Properties props = new Properties();
    props.setProperty(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose");
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "some.invalid.hostname.foo.bar:9999");
    props.setProperty(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());

    final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
    final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
    try {
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
                props, new ByteArraySerializer(), new ByteArraySerializer());
    } catch (KafkaException e) {
        Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
        Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
        Assert.assertEquals("Failed to construct kafka producer", e.getMessage());
        return;
    }
    Assert.fail("should have caught an exception and returned");
}
Project: li-apache-kafka-clients    File: LiKafkaProducerIntegrationTest.java
@Test
public void testZeroLengthValue() throws Exception {
  Properties producerPropertyOverrides = new Properties();
  producerPropertyOverrides.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

  try (LiKafkaProducer producer =  createProducer(producerPropertyOverrides)) {
    producer.send(new ProducerRecord<>("testZeroLengthValue", "key", new byte[0])).get();
  }
  Properties consumerProps = new Properties();
  consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

  try (LiKafkaConsumer consumer = createConsumer(consumerProps)) {
    consumer.subscribe(Collections.singleton("testZeroLengthValue"));
    long startMs = System.currentTimeMillis();
    ConsumerRecords records = ConsumerRecords.empty();
    while (records.isEmpty() && System.currentTimeMillis() < startMs + 30000) {
      records = consumer.poll(100);
    }
    assertEquals(1, records.count());
    ConsumerRecord record = (ConsumerRecord) records.iterator().next();
    assertEquals("key", record.key());
    assertEquals(((byte[]) record.value()).length, 0);
  }
}
Project: beam    File: BeamKafkaTable.java
@Override
public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
  checkArgument(topics != null && topics.size() == 1,
      "Only one topic can be acceptable as output.");

  return new PTransform<PCollection<BeamRecord>, PDone>() {
    @Override
    public PDone expand(PCollection<BeamRecord> input) {
      return input.apply("out_reformat", getPTransformForOutput()).apply("persistent",
          KafkaIO.<byte[], byte[]>write()
              .withBootstrapServers(bootstrapServers)
              .withTopic(topics.get(0))
              .withKeySerializer(ByteArraySerializer.class)
              .withValueSerializer(ByteArraySerializer.class));
    }
  };
}
Project: ingestion-service    File: KafkaVerticle.java
private KafkaProducer<String, byte[]> createKafkaProducer() {
    Properties props = new Properties();

    String kafkaServers = System.getenv().get("KAFKA_SERVERS");
    if (null == kafkaServers) {
        kafkaServers = Constants.KAFKA_SERVERS;
    }

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    return producer;
}
Project: ingestion-service    File: RecordProcessor.java
/**
 * Configures and creates a Kafka producer.
 *
 * @return a new KafkaProducer instance
 */
public KafkaProducer<String, byte[]> createProducer() {
    logger.info("Creating Kafka Producer");

    Properties props = new Properties();

    String kafkaServers = System.getenv().get("KAFKA_SERVERS");
    if (null == kafkaServers) {
        kafkaServers = Constants.KAFKA_SERVERS;
    }

    logger.info("Using Kafka servers: " + kafkaServers);

    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.ACKS_CONFIG, "1");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);

    return new KafkaProducer<>(props);
}
Project: ameliant-tools    File: ConsumerDriverTest.java
public ProducerDriver createProducerDriver(CountDownLatch latch, String topic, int messageCount) {
    Map<String, Object> producerConfigs = new ProducerConfigsBuilder()
            .bootstrapServers("127.0.0.1:" + broker.getPort())
            .requestRequiredAcks(ProducerConfigsBuilder.RequestRequiredAcks.ackFromLeader)
            .producerType(ProducerConfigsBuilder.ProducerType.sync)
            .keySerializer(ByteArraySerializer.class)
            .valueSerializer(ByteArraySerializer.class)
            .batchSize(0)
            .build();

    ProducerDefinition producerDefinition = new ProducerDefinition();
    producerDefinition.setConfig(producerConfigs);
    producerDefinition.setTopic(topic);
    producerDefinition.setMessageSize(100 * 1024);
    producerDefinition.setMessagesToSend(messageCount);
    producerDefinition.setSendBlocking(true);

    return new ProducerDriver(producerDefinition, latch);
}
Project: datacollector    File: KafkaProducer09IT.java
@Test
public void testKafkaProducer09WriteFailsRecordTooLarge() throws IOException, StageException {

  HashMap<String, Object> kafkaProducerConfigs = new HashMap<>();
  kafkaProducerConfigs.put("retries", 0);
  kafkaProducerConfigs.put("batch.size", 100);
  kafkaProducerConfigs.put("linger.ms", 0);
  kafkaProducerConfigs.put(KafkaConstants.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  kafkaProducerConfigs.put(KafkaConstants.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
  // Set the message size to 510 as "message.max.bytes" is set to 500
  final String message = StringUtils.leftPad("a", 510, "b");
  SdcKafkaProducer sdcKafkaProducer = createSdcKafkaProducer(port, kafkaProducerConfigs);
  sdcKafkaProducer.init();
  String topic = getNextTopic();
  sdcKafkaProducer.enqueueMessage(topic, message.getBytes(), "0");
  try {
    sdcKafkaProducer.write();
    fail("Expected exception but didn't get any");
  } catch (StageException se) {
    assertEquals(KafkaErrors.KAFKA_69, se.getErrorCode());
  } catch (Exception e) {
    fail("Expected Stage Exception but got " + e);
  }
}
Project: Aletheia    File: KafkaBinarySender.java
protected Properties getProducerConfig() {

    final Properties producerProperties = (Properties) kafkaTopicDeliveryEndPoint.getProperties().clone();

    if (producerProperties.getProperty("value.serializer") != null
        || producerProperties.getProperty("key.serializer") != null) {
      logger.warn("serializer cannot be provided as producer properties. "
          + "Overriding manually to be the correct serialization type.");
    }

    producerProperties.setProperty("key.serializer", StringSerializer.class.getName());
    producerProperties.setProperty("value.serializer", ByteArraySerializer.class.getName());

    producerProperties.setProperty("client.id",
        kafkaTopicDeliveryEndPoint.getProperties().getProperty("client.id", UUID.randomUUID().toString()));

    producerProperties.setProperty("bootstrap.servers", kafkaTopicDeliveryEndPoint.getBrokerList());
    producerProperties.setProperty("metric.reporters", "com.outbrain.aletheia.datum.metrics.kafka.KafkaMetrics");

    logger.warn("Using producer config: {}", producerProperties);

    return producerProperties;
}
Project: otj-logging    File: KafkaAppender.java
@Override
public void start()
{
    super.start();

    // Important to not initialize this until we are started, because ServerInfo itself logs...
    if (clientId == null) {
        clientId = OptionalServerInfo.getDefaultClientName(this::addError);
    }

    final Properties config = new Properties();
    config.put("bootstrap.servers", brokerList);
    config.put("acks", "1");
    config.put("compression.type", compressionCodec);
    config.put("client.id", clientId);
    producer = new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
}
Project: incubator-gobblin    File: KafkaProducerPusher.java
public KafkaProducerPusher(String brokers, String topic, Optional<Config> kafkaConfig) {
  this.closer = Closer.create();

  this.topic = topic;

  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  props.put(ProducerConfig.ACKS_CONFIG, "1");

  // add the kafka scoped config. if any of the above are specified then they are overridden
  if (kafkaConfig.isPresent()) {
    props.putAll(ConfigUtils.configToProperties(kafkaConfig.get()));
  }

  this.producer = createProducer(props);
}
Project: cruise-control    File: KafkaSampleStore.java
@Override
public void configure(Map<String, ?> config) {
  _partitionMetricSampleStoreTopic = (String) config.get(PARTITION_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
  _brokerMetricSampleStoreTopic = (String) config.get(BROKER_METRIC_SAMPLE_STORE_TOPIC_CONFIG);
  if (_partitionMetricSampleStoreTopic == null
      || _brokerMetricSampleStoreTopic == null
      || _partitionMetricSampleStoreTopic.isEmpty()
      || _brokerMetricSampleStoreTopic.isEmpty()) {
    throw new IllegalArgumentException("The sample store topic names must be configured.");
  }
  String numProcessingThreadsString = (String) config.get(NUM_SAMPLE_LOADING_THREADS);
  int numProcessingThreads = numProcessingThreadsString == null || numProcessingThreadsString.isEmpty() ?
      8 : Integer.parseInt(numProcessingThreadsString);
  _metricProcessorExecutor = Executors.newFixedThreadPool(numProcessingThreads);
  _consumers = new ArrayList<>(numProcessingThreads);
  for (int i = 0; i < numProcessingThreads; i++) {
    _consumers.add(createConsumers(config));
  }
  Properties producerProps = new Properties();
  producerProps.putAll(config);
  producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                            (String) config.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG));
  producerProps.setProperty(ProducerConfig.CLIENT_ID_CONFIG, PRODUCER_CLIENT_ID);
  // Set batch.size and linger.ms to a big number to have better batching.
  producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, "30000");
  producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "800000");
  producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
  producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
  producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
  _producer = new KafkaProducer<>(producerProps);

  _loadingProgress = -1.0;

  ensureTopicCreated(config);
}
Project: AthenaX    File: KafkaJsonConnectorITest.java
private static KafkaProducer<byte[], byte[]> getProducer(String brokerList) {
  Properties prop = new Properties();
  prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  return new KafkaProducer<>(prop);
}
Project: AthenaX    File: ITestUtil.java
private static KafkaProducer<byte[], byte[]> getProducer(String brokerList) {
  Properties prop = new Properties();
  prop.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
  prop.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  prop.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  return new KafkaProducer<>(prop);
}
Project: tasfe-framework    File: KafkaAppender.java
public KafkaAppender() {
    // setting these as config values sidesteps an unnecessary warning (minor bug in KafkaProducer)
    addProducerConfigValue(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    addProducerConfigValue(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

    Properties properties = PropertyUtils.getProperties();
    if (properties != null && !properties.isEmpty()) {
        addProducerConfigValue(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty("logcenter.bootstrap.servers", bootstrapServers));
        addProducerConfigValue(ProducerConfig.ACKS_CONFIG, properties.getProperty("logcenter.acks", "0"));
        addProducerConfigValue(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG, properties.getProperty("logcenter.metadata.fetch.timeout.ms", "1000"));

        batchNum = Integer.parseInt(properties.getProperty("logcenter.batchNum", "100"));
        maxCommitInterval = Integer.parseInt(properties.getProperty("logcenter.maxCommitInterval", "10000"));
    }
}
Project: ja-micro    File: Producer.java
Producer(Properties kafkaProducerConfig) {

    // Mandatory settings, not changeable.
    kafkaProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    kafkaProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    kafkaProducerConfig.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SixtPartitioner.class.getName());

    kafka = new org.apache.kafka.clients.producer.KafkaProducer<>(kafkaProducerConfig);
    logger.info("Created producer.");
}
Project: flume-release-1.7.0    File: KafkaSourceEmbeddedKafka.java
private void initProducer() {
  Properties props = new Properties();
  props.put("bootstrap.servers", HOST + ":" + serverPort);
  props.put("acks", "1");
  producer = new KafkaProducer<String,byte[]>(props,
          new StringSerializer(), new ByteArraySerializer());
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaStatusBackingStore.java
@Override
public void configure(final WorkerConfig config) {
    this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
    if (topic.equals(""))
        throw new ConfigException("Must specify topic for connector status.");

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(config.getInt(DistributedConfig.STATUS_STORAGE_PARTITIONS_CONFIG)).
            replicationFactor(config.getShort(DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
        @Override
        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
            read(record);
        }
    };
    this.kafkaLog = createKafkaBasedLog(topic, producerProps, consumerProps, readCallback, topicDescription, adminProps);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaOffsetBackingStore.java
@Override
public void configure(final WorkerConfig config) {
    String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
    if (topic.equals(""))
        throw new ConfigException("Offset storage topic must be specified");

    data = new HashMap<>();

    Map<String, Object> producerProps = new HashMap<>();
    producerProps.putAll(config.originals());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

    Map<String, Object> consumerProps = new HashMap<>();
    consumerProps.putAll(config.originals());
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    Map<String, Object> adminProps = new HashMap<>(config.originals());
    NewTopic topicDescription = TopicAdmin.defineTopic(topic).
            compacted().
            partitions(config.getInt(DistributedConfig.OFFSET_STORAGE_PARTITIONS_CONFIG)).
            replicationFactor(config.getShort(DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG)).
            build();

    offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback, topicDescription, adminProps);
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaProducerTest.java
@Test
public void testOsDefaultSocketBufferSizes() throws Exception {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    config.put(ProducerConfig.SEND_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
    config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, Selectable.USE_DEFAULT_BUFFER_SIZE);
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(
            config, new ByteArraySerializer(), new ByteArraySerializer());
    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaProducerTest.java
@Test(expected = KafkaException.class)
public void testInvalidSocketSendBufferSize() throws Exception {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    config.put(ProducerConfig.SEND_BUFFER_CONFIG, -2);
    new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaProducerTest.java
@Test(expected = KafkaException.class)
public void testInvalidSocketReceiveBufferSize() throws Exception {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    config.put(ProducerConfig.RECEIVE_BUFFER_CONFIG, -2);
    new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaProducerTest.java
@Test
public void closeShouldBeIdempotent() {
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
    Producer producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
    producer.close();
    producer.close();
}
Project: kafka-0.11.0.0-src-with-comment    File: KafkaProducerTest.java
@Test
public void testMetricConfigRecordingLevel() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
    try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
        assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel());
    }

    props.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
    try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
        assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
    }
}
Project: kafka-0.11.0.0-src-with-comment    File: ClientCompatibilityTest.java
public void testProduce() throws Exception {
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, testConfig.bootstrapServer);
    ByteArraySerializer serializer = new ByteArraySerializer();
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps, serializer, serializer);
    ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(testConfig.topic, message1);
    Future<RecordMetadata> future1 = producer.send(record1);
    ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(testConfig.topic, message2);
    Future<RecordMetadata> future2 = producer.send(record2);
    producer.flush();
    future1.get();
    future2.get();
    producer.close();
}
Project: logback-kafka-appender    File: KafkaAppender.java
/**
 * Prepares the final configuration map used to initialize the Kafka producer instance.
 * Configuration values are picked up from {@link ModuleAware}.
 *
 * @return the producer configuration map
 */
private static Map<String, Object> prepareConfiguration() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(BOOTSTRAP_SERVERS_CONFIG, ModuleAware.CONTEXT.getBrokers());
    properties.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    properties.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    properties.put(BATCH_SIZE_CONFIG, ModuleAware.CONTEXT.getBatchSize());
    properties.put(ACKS_CONFIG, ModuleAware.CONTEXT.getAcks());
    properties.put(BUFFER_MEMORY_CONFIG, ModuleAware.CONTEXT.getBufferMemory());
    return properties;
}
Project: SkyEye    File: KafkaAppender.java
/**
 * Constructor
 */
public KafkaAppender() {
    this.checkAndSetConfig(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    this.checkAndSetConfig(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Set the partitioner class: the custom KeyModPartitioner routes records with the same key to the same partition
    this.checkAndSetConfig(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyModPartitioner.class.getName());

    // Register a shutdown hook
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            close();
        }
    });
}
Project: SkyEye    File: KafkaAppender.java
/**
 * Constructor
 */
public KafkaAppender() {
    this.checkAndSetConfig(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    this.checkAndSetConfig(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    // Set the partitioner class: the custom KeyModPartitioner routes records with the same key to the same partition
    this.checkAndSetConfig(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyModPartitioner.class.getName());

    // The host is obtained externally because of containerized deployment
    this.checkAndSetConfig(ProducerConfig.CLIENT_ID_CONFIG, this.app + Constants.MIDDLE_LINE + this.host + Constants.MIDDLE_LINE + "logback");

    shutdownHook = new DelayingShutdownHook();
}