Java 类org.apache.kafka.clients.producer.ProducerInterceptor 实例源码

项目:kafka-0.11.0.0-src-with-comment    文件:ProducerInterceptors.java   
/**
 * Called when the client hands a record to KafkaProducer, before key and value are serialized.
 * Feeds the record through {@link ProducerInterceptor#onSend(ProducerRecord)} of every interceptor,
 * in list order: the result of one interceptor becomes the input of the next, and the result of the
 * last one is what this method returns.
 *
 * Any {@link Exception} thrown by an interceptor is caught and logged at WARN level instead of being
 * propagated. The failing interceptor's contribution is simply skipped, so the following interceptor
 * receives the record returned by the most recent interceptor that did not throw.
 *
 * @param record the record from client
 * @return producer record to send to topic/partition
 */
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> current = record;
    for (ProducerInterceptor<K, V> link : interceptors) {
        try {
            current = link.onSend(current);
        } catch (Exception e) {
            // Deliberately swallowed: log and move on to the next interceptor.
            // Nothing in this handler may throw. The log line describes the record originally passed in.
            if (record == null) {
                log.warn("Error executing interceptor onSend callback", e);
            } else {
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            }
        }
    }
    return current;
}
项目:kafka    文件:ProducerInterceptors.java   
/**
 * Entry point used when a client record is submitted to KafkaProducer, ahead of key/value serialization.
 * Every interceptor's {@link ProducerInterceptor#onSend(ProducerRecord)} is invoked in list order and
 * each one is given whatever record the chain currently holds; the record held after the last
 * interceptor is returned.
 *
 * An {@link Exception} from an interceptor is caught and logged at WARN level rather than rethrown.
 * When that happens the chain keeps the record it held before the failing call, so the next interceptor
 * sees the output of the previous interceptor that completed without throwing.
 *
 * @param record the record from client
 * @return producer record to send to topic/partition
 */
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> chained = record;
    for (ProducerInterceptor<K, V> step : interceptors) {
        try {
            chained = step.onSend(chained);
        } catch (Exception e) {
            // Interceptor failures are not propagated: log, then continue with the remaining interceptors.
            // Take care that this handler itself cannot throw.
            if (record == null) {
                log.warn("Error executing interceptor onSend callback", e);
            } else {
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            }
        }
    }
    return chained;
}
项目:kafka    文件:ProducerInterceptors.java   
/**
 * Reports a failed send to every interceptor by calling
 * {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)} on each of them.
 *
 * When both {@code record} and {@code interceptTopicPartition} are null, interceptors receive null
 * metadata. Otherwise they receive a placeholder {@link RecordMetadata} carrying only a topic/partition:
 * the supplied {@code interceptTopicPartition} if present, else one derived from the record, with
 * {@link RecordMetadata#UNKNOWN_PARTITION} standing in when the record has no partition.
 *
 * Exceptions thrown by an interceptor are logged at WARN level and not propagated.
 *
 * @param record The record from client
 * @param interceptTopicPartition  The topic/partition for the record if an error occurred
 *        after partition gets assigned; the topic part of interceptTopicPartition is the same as in record.
 * @param exception The exception thrown during processing of this record.
 */
public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
    for (ProducerInterceptor<K, V> target : interceptors) {
        try {
            if (record != null || interceptTopicPartition != null) {
                if (interceptTopicPartition == null) {
                    // No partition was assigned yet: fall back to what the record itself says.
                    // Once set here, the value is reused for the remaining interceptors.
                    interceptTopicPartition = new TopicPartition(record.topic(),
                            record.partition() != null ? record.partition() : RecordMetadata.UNKNOWN_PARTITION);
                }
                RecordMetadata placeholder =
                        new RecordMetadata(interceptTopicPartition, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1);
                target.onAcknowledgement(placeholder, exception);
            } else {
                // Nothing is known about the destination, so there is no metadata to pass along.
                target.onAcknowledgement(null, exception);
            }
        } catch (Exception e) {
            // interceptor failures are logged only, never rethrown
            log.warn("Error executing interceptor onAcknowledgement callback", e);
        }
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:ProducerInterceptors.java   
/**
 * Calls {@code close()} on each interceptor held by this container, in list order.
 * A failure while closing one interceptor is logged at ERROR level and does not stop
 * the remaining interceptors from being closed.
 */
@Override
public void close() {
    for (ProducerInterceptor<K, V> closing : interceptors) {
        try {
            closing.close();
        } catch (Exception e) {
            // log and carry on so the rest of the list is still closed
            log.error("Failed to close producer interceptor ", e);
        }
    }
}
项目:kafka-0.11.0.0-src-with-comment    文件:ProducerInterceptorsTest.java   
/**
 * Exercises {@code ProducerInterceptors.onSend} with a chain of two interceptors: checks that both are
 * invoked, that their changes to the record value are applied in order, and that an interceptor which
 * throws is skipped without breaking the rest of the chain.
 */
@Test
public void testOnSendChain() {
    List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
    // we are testing two different interceptors by configuring the same interceptor differently, which is not
    // how it would be done in KafkaProducer, but ok for testing interceptor callbacks
    AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
    AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
    interceptorList.add(interceptor1);
    interceptorList.add(interceptor2);
    ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);

    // verify that onSend() mutates the record as expected
    ProducerRecord<Integer, String> interceptedRecord = interceptors.onSend(producerRecord);
    assertEquals(2, onSendCount);
    assertEquals(producerRecord.topic(), interceptedRecord.topic());
    assertEquals(producerRecord.partition(), interceptedRecord.partition());
    assertEquals(producerRecord.key(), interceptedRecord.key());
    // expected value first, actual second, so a failure message labels them correctly
    assertEquals(producerRecord.value().concat("One").concat("Two"), interceptedRecord.value());

    // onSend() mutates the same record the same way
    ProducerRecord<Integer, String> anotherRecord = interceptors.onSend(producerRecord);
    assertEquals(4, onSendCount);
    assertEquals(interceptedRecord, anotherRecord);

    // verify that if one of the interceptors throws an exception, other interceptors' callbacks are still called
    interceptor1.injectOnSendError(true);
    ProducerRecord<Integer, String> partInterceptRecord = interceptors.onSend(producerRecord);
    assertEquals(6, onSendCount);
    // only the second interceptor's change is expected, since the first one now throws
    assertEquals(producerRecord.value().concat("Two"), partInterceptRecord.value());

    // verify the record remains valid if all onSend throws an exception
    interceptor2.injectOnSendError(true);
    ProducerRecord<Integer, String> noInterceptRecord = interceptors.onSend(producerRecord);
    assertEquals(producerRecord, noInterceptRecord);

    interceptors.close();
}
项目:kafka-0.11.0.0-src-with-comment    文件:ProducerInterceptorsTest.java   
/**
 * Checks that {@code onAcknowledgement} reaches every interceptor in the chain, and that it keeps
 * reaching all of them when one or both interceptors are set to fail inside the callback.
 */
@Test
public void testOnAcknowledgementChain() {
    // Two instances of the same interceptor class, configured differently, stand in for two distinct
    // interceptors. KafkaProducer would not be set up like this, but it is enough to test the callbacks.
    AppendProducerInterceptor first = new AppendProducerInterceptor("One");
    AppendProducerInterceptor second = new AppendProducerInterceptor("Two");
    List<ProducerInterceptor<Integer, String>> chain = new ArrayList<>();
    chain.add(first);
    chain.add(second);
    ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(chain);

    RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, Long.valueOf(0L), 0, 0);

    // healthy chain: both interceptors see the acknowledgement
    interceptors.onAcknowledgement(meta, null);
    assertEquals(2, onAckCount);

    // first interceptor fails: the call still returns normally and both callbacks are counted
    first.injectOnAcknowledgementError(true);
    interceptors.onAcknowledgement(meta, null);
    assertEquals(4, onAckCount);

    // both interceptors fail: same expectation
    second.injectOnAcknowledgementError(true);
    interceptors.onAcknowledgement(meta, null);
    assertEquals(6, onAckCount);

    interceptors.close();
}
项目:kafka    文件:ProducerInterceptors.java   
/**
 * Closes all interceptors in this container, one after another in list order.
 * If an interceptor throws while closing, the error is logged at ERROR level and
 * closing continues with the next interceptor.
 */
@Override
public void close() {
    for (ProducerInterceptor<K, V> member : interceptors) {
        try {
            member.close();
        } catch (Exception e) {
            // not rethrown: the remaining interceptors still have to be closed
            log.error("Failed to close producer interceptor ", e);
        }
    }
}
项目:kafka    文件:ProducerInterceptorsTest.java   
/**
 * Exercises {@code ProducerInterceptors.onSend} with a chain of two interceptors: checks that both are
 * invoked, that their changes to the record value are applied in order, and that an interceptor which
 * throws is skipped without breaking the rest of the chain.
 */
@Test
public void testOnSendChain() {
    List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
    // we are testing two different interceptors by configuring the same interceptor differently, which is not
    // how it would be done in KafkaProducer, but ok for testing interceptor callbacks
    AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
    AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
    interceptorList.add(interceptor1);
    interceptorList.add(interceptor2);
    ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);

    // verify that onSend() mutates the record as expected
    ProducerRecord<Integer, String> interceptedRecord = interceptors.onSend(producerRecord);
    assertEquals(2, onSendCount);
    assertEquals(producerRecord.topic(), interceptedRecord.topic());
    assertEquals(producerRecord.partition(), interceptedRecord.partition());
    assertEquals(producerRecord.key(), interceptedRecord.key());
    // expected value first, actual second, so a failure message labels them correctly
    assertEquals(producerRecord.value().concat("One").concat("Two"), interceptedRecord.value());

    // onSend() mutates the same record the same way
    ProducerRecord<Integer, String> anotherRecord = interceptors.onSend(producerRecord);
    assertEquals(4, onSendCount);
    assertEquals(interceptedRecord, anotherRecord);

    // verify that if one of the interceptors throws an exception, other interceptors' callbacks are still called
    interceptor1.injectOnSendError(true);
    ProducerRecord<Integer, String> partInterceptRecord = interceptors.onSend(producerRecord);
    assertEquals(6, onSendCount);
    // only the second interceptor's change is expected, since the first one now throws
    assertEquals(producerRecord.value().concat("Two"), partInterceptRecord.value());

    // verify the record remains valid if all onSend throws an exception
    interceptor2.injectOnSendError(true);
    ProducerRecord<Integer, String> noInterceptRecord = interceptors.onSend(producerRecord);
    assertEquals(producerRecord, noInterceptRecord);

    interceptors.close();
}
项目:kafka    文件:ProducerInterceptorsTest.java   
/**
 * Checks that {@code onAcknowledgement} reaches every interceptor in the chain, and that it keeps
 * reaching all of them when one or both interceptors are set to fail inside the callback.
 */
@Test
public void testOnAcknowledgementChain() {
    // Two instances of the same interceptor class, configured differently, stand in for two distinct
    // interceptors. KafkaProducer would not be set up like this, but it is enough to test the callbacks.
    AppendProducerInterceptor first = new AppendProducerInterceptor("One");
    AppendProducerInterceptor second = new AppendProducerInterceptor("Two");
    List<ProducerInterceptor<Integer, String>> chain = new ArrayList<>();
    chain.add(first);
    chain.add(second);
    ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(chain);

    RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0, 0);

    // healthy chain: both interceptors see the acknowledgement
    interceptors.onAcknowledgement(meta, null);
    assertEquals(2, onAckCount);

    // first interceptor fails: the call still returns normally and both callbacks are counted
    first.injectOnAcknowledgementError(true);
    interceptors.onAcknowledgement(meta, null);
    assertEquals(4, onAckCount);

    // both interceptors fail: same expectation
    second.injectOnAcknowledgementError(true);
    interceptors.onAcknowledgement(meta, null);
    assertEquals(6, onAckCount);

    interceptors.close();
}
项目:kafka-0.11.0.0-src-with-comment    文件:ProducerInterceptors.java   
/**
 * Creates a container around the given list of interceptors.
 *
 * The list reference is stored as-is; no copy is made here.
 *
 * @param interceptors the interceptors this container will hold
 */
public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
    this.interceptors = interceptors;
}
项目:kafka-0.11.0.0-src-with-comment    文件:ProducerInterceptorsTest.java   
/**
 * Drives {@code onSendError} with different combinations of record and topic/partition and checks,
 * through the test's counters, what metadata the interceptor was given in each case.
 */
@Test
public void testOnAcknowledgementWithErrorChain() {
    AppendProducerInterceptor appender = new AppendProducerInterceptor("One");
    List<ProducerInterceptor<Integer, String>> chain = new ArrayList<>();
    chain.add(appender);
    ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(chain);

    // record and explicit topic/partition both supplied: metadata carries topic and partition
    TopicPartition recordTp = new TopicPartition(producerRecord.topic(), producerRecord.partition());
    interceptors.onSendError(producerRecord, recordTp, new KafkaException("Test"));
    assertEquals(1, onErrorAckCount);
    assertEquals(1, onErrorAckWithTopicPartitionSetCount);

    // no explicit topic/partition, but the record already has a partition: still topic and partition
    interceptors.onSendError(producerRecord, null, new KafkaException("Test"));
    assertEquals(2, onErrorAckCount);
    assertEquals(2, onErrorAckWithTopicPartitionSetCount);

    // record without a partition and no explicit topic/partition: interceptor should get partition == -1
    ProducerRecord<Integer, String> partitionlessRecord = new ProducerRecord<>("test2", null, 1, "value");
    interceptors.onSendError(partitionlessRecord, null, new KafkaException("Test"));
    assertEquals(3, onErrorAckCount);
    assertEquals(3, onErrorAckWithTopicSetCount);
    assertEquals(2, onErrorAckWithTopicPartitionSetCount);

    // record without a partition, but a topic/partition is passed to onSendError:
    // the interceptor should get that valid partition
    int reassignedPartition = producerRecord.partition() + 1;
    TopicPartition reassignedTp = new TopicPartition(partitionlessRecord.topic(), reassignedPartition);
    interceptors.onSendError(partitionlessRecord, reassignedTp, new KafkaException("Test"));
    assertEquals(4, onErrorAckCount);
    assertEquals(4, onErrorAckWithTopicSetCount);
    assertEquals(3, onErrorAckWithTopicPartitionSetCount);

    // neither record nor topic/partition: interceptor should not receive metadata
    interceptors.onSendError(null, null, new KafkaException("Test"));
    assertEquals(5, onErrorAckCount);
    assertEquals(4, onErrorAckWithTopicSetCount);
    assertEquals(3, onErrorAckWithTopicPartitionSetCount);

    interceptors.close();
}
项目:kafka    文件:ProducerInterceptors.java   
/**
 * Creates a container around the given list of interceptors.
 *
 * The list reference is stored as-is; no copy is made here.
 *
 * @param interceptors the interceptors this container will hold
 */
public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
    this.interceptors = interceptors;
}
项目:kafka    文件:ProducerInterceptorsTest.java   
/**
 * Drives {@code onSendError} with different combinations of record and topic/partition and checks,
 * through the test's counters, what metadata the interceptor was given in each case.
 */
@Test
public void testOnAcknowledgementWithErrorChain() {
    AppendProducerInterceptor appender = new AppendProducerInterceptor("One");
    List<ProducerInterceptor<Integer, String>> chain = new ArrayList<>();
    chain.add(appender);
    ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(chain);

    // record and explicit topic/partition both supplied: metadata carries topic and partition
    TopicPartition recordTp = new TopicPartition(producerRecord.topic(), producerRecord.partition());
    interceptors.onSendError(producerRecord, recordTp, new KafkaException("Test"));
    assertEquals(1, onErrorAckCount);
    assertEquals(1, onErrorAckWithTopicPartitionSetCount);

    // no explicit topic/partition, but the record already has a partition: still topic and partition
    interceptors.onSendError(producerRecord, null, new KafkaException("Test"));
    assertEquals(2, onErrorAckCount);
    assertEquals(2, onErrorAckWithTopicPartitionSetCount);

    // record without a partition and no explicit topic/partition: interceptor should get partition == -1
    ProducerRecord<Integer, String> partitionlessRecord = new ProducerRecord<>("test2", null, 1, "value");
    interceptors.onSendError(partitionlessRecord, null, new KafkaException("Test"));
    assertEquals(3, onErrorAckCount);
    assertEquals(3, onErrorAckWithTopicSetCount);
    assertEquals(2, onErrorAckWithTopicPartitionSetCount);

    // record without a partition, but a topic/partition is passed to onSendError:
    // the interceptor should get that valid partition
    int reassignedPartition = producerRecord.partition() + 1;
    TopicPartition reassignedTp = new TopicPartition(partitionlessRecord.topic(), reassignedPartition);
    interceptors.onSendError(partitionlessRecord, reassignedTp, new KafkaException("Test"));
    assertEquals(4, onErrorAckCount);
    assertEquals(4, onErrorAckWithTopicSetCount);
    assertEquals(3, onErrorAckWithTopicPartitionSetCount);

    // neither record nor topic/partition: interceptor should not receive metadata
    interceptors.onSendError(null, null, new KafkaException("Test"));
    assertEquals(5, onErrorAckCount);
    assertEquals(4, onErrorAckWithTopicSetCount);
    assertEquals(3, onErrorAckWithTopicPartitionSetCount);

    interceptors.close();
}
项目:kafka-0.11.0.0-src-with-comment    文件:ProducerInterceptors.java   
/**
 * Called once the server has acknowledged a sent record, or when sending fails before the record
 * reaches the server. Forwards the call to
 * {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)} of every interceptor, in list order.
 *
 * Any {@link Exception} thrown by an interceptor is caught and logged at WARN level; it is not
 * propagated and does not stop the remaining interceptors from being called.
 *
 * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
 *                 If an error occurred, metadata will only contain valid topic and maybe partition.
 * @param exception The exception thrown during processing of this record. Null if no error occurred.
 */
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    for (ProducerInterceptor<K, V> listener : interceptors) {
        try {
            listener.onAcknowledgement(metadata, exception);
        } catch (Exception e) {
            // interceptor failures are logged only, never rethrown
            log.warn("Error executing interceptor onAcknowledgement callback", e);
        }
    }
}
项目:kafka    文件:ProducerInterceptors.java   
/**
 * Relays an acknowledgement to all interceptors. Used both when the server has acknowledged a record
 * and when sending fails before the record gets to the server; in either case each interceptor's
 * {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)} is invoked in list order.
 *
 * An {@link Exception} raised by an interceptor is caught and logged at WARN level, so it neither
 * escapes this method nor prevents later interceptors from being notified.
 *
 * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
 *                 If an error occurred, metadata will only contain valid topic and maybe partition.
 * @param exception The exception thrown during processing of this record. Null if no error occurred.
 */
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    for (ProducerInterceptor<K, V> recipient : interceptors) {
        try {
            recipient.onAcknowledgement(metadata, exception);
        } catch (Exception e) {
            // swallow after logging; the remaining interceptors must still be notified
            log.warn("Error executing interceptor onAcknowledgement callback", e);
        }
    }
}