Java Class org.apache.camel.component.kafka.KafkaComponent Example Source Code

Project: camelinaction2    File: WordRoute.java
@Override
public void configure() throws Exception {
    // let's shut down quicker
    getContext().getShutdownStrategy().setTimeout(10);

    // configure the kafka component to use the broker
    KafkaComponent kafka = new KafkaComponent();

    // you can specify more brokers separated by commas
    kafka.setBrokers("localhost:9092");

    // add component to CamelContext
    getContext().addComponent("kafka", kafka);

    // use a timer to trigger every 100 milliseconds and generate a random word
    // which is sent to kafka
    from("timer:foo?period=100")
        .bean(new WordBean())
        .to("kafka:words")
        .to("log:words?groupInterval=1000");
}
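
The WordBean used in this route is not shown in the excerpt. A minimal sketch of such a bean (the word list is assumed; the real camelinaction2 class may differ) could look like the following: Camel invokes its single public method on every timer tick and sends the returned word to the kafka:words endpoint.

import java.util.Random;

public class WordBean {

    // assumed sample vocabulary for illustration only
    private static final String[] WORDS = {"camel", "kafka", "timer", "word"};

    private final Random random = new Random();

    public String nextWord() {
        return WORDS[random.nextInt(WORDS.length)];
    }
}
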
Project: wildfly-camel    File: KafkaProducerIntegrationTest.java
@Test
public void testCustomKafkaSerializer() throws Exception {

    String serializer = "&serializerClass=" + SimpleKafkaSerializer.class.getName();
    String epuri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1" + serializer;

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").to(epuri);
        }
    });

    KafkaComponent kafka = new KafkaComponent();
    kafka.setBrokers("localhost:" + KAFKA_PORT);
    camelctx.addComponent("kafka", kafka);

    camelctx.start();
    try {
        Assert.assertEquals(ServiceStatus.Started, camelctx.getStatus());
    } finally {
        camelctx.stop();
    }
}
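
The SimpleKafkaSerializer referenced through the serializerClass option is not part of this snippet. A hedged sketch of a custom serializer of this kind, implementing Kafka's org.apache.kafka.common.serialization.Serializer interface (the actual wildfly-camel class may differ), could look like this:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class SimpleKafkaSerializer implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed for this sketch
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        // turn the message body into UTF-8 bytes
        return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void close() {
        // nothing to release
    }
}
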
Project: wildfly-camel    File: KafkaProducerIntegrationTest.java
@Test
public void testCustomKafkaPartitioner() throws Exception {

    String partitioner = "&partitioner=" + SimpleKafkaPartitioner.class.getName();
    String epuri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1" + partitioner;

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").to(epuri);
        }
    });

    KafkaComponent kafka = new KafkaComponent();
    kafka.setBrokers("localhost:" + KAFKA_PORT);
    camelctx.addComponent("kafka", kafka);

    camelctx.start();
    try {
        Assert.assertEquals(ServiceStatus.Started, camelctx.getStatus());
    } finally {
        camelctx.stop();
    }
}
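
Likewise, SimpleKafkaPartitioner is only referenced by name. A custom partitioner plugged in via the partitioner option implements Kafka's org.apache.kafka.clients.producer.Partitioner interface; the following is a hedged sketch, not the actual wildfly-camel implementation:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class SimpleKafkaPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // always route to partition 0 in this sketch
        return 0;
    }

    @Override
    public void close() {
        // nothing to release
    }
}
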
Project: Camel    File: KafkaComponentAutoConfiguration.java
@Bean
@ConditionalOnClass(CamelContext.class)
@ConditionalOnMissingBean(KafkaComponent.class)
public KafkaComponent configureKafkaComponent(CamelContext camelContext,
        KafkaComponentConfiguration configuration) throws Exception {
    KafkaComponent component = new KafkaComponent();
    component.setCamelContext(camelContext);
    Map<String, Object> parameters = new HashMap<>();
    IntrospectionSupport.getProperties(configuration, parameters, null,
            false);
    IntrospectionSupport.setProperties(camelContext,
            camelContext.getTypeConverter(), component, parameters);
    return component;
}
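
This auto-configuration creates a KafkaComponent bean only when none is already defined, and copies all KafkaComponentConfiguration properties onto it via IntrospectionSupport: getProperties reads the configuration's getters into a map, and setProperties writes the map onto the component through its setters, using the CamelContext's type converter where needed. A small self-contained sketch of that mechanism follows; the Config class and its broker value are hypothetical stand-ins for KafkaComponentConfiguration.

import java.util.HashMap;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.IntrospectionSupport;

public class IntrospectionSketch {

    // hypothetical configuration bean standing in for KafkaComponentConfiguration
    public static class Config {
        public String getBrokers() {
            return "localhost:9092"; // assumed broker address
        }
    }

    public static void main(String[] args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        KafkaComponent component = new KafkaComponent();
        component.setCamelContext(context);

        // read getter values into a map, then apply them to the component's setters
        Map<String, Object> parameters = new HashMap<>();
        IntrospectionSupport.getProperties(new Config(), parameters, null, false);
        IntrospectionSupport.setProperties(context, context.getTypeConverter(), component, parameters);
        // the component is now configured with brokers=localhost:9092
    }
}
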
Project: wildfly-camel    File: KafkaProducerIntegrationTest.java
@Test
public void producedStringMessageIsReceivedByKafka() throws Exception {

    String epuri = "kafka:" + TOPIC_STRINGS + "?requestRequiredAcks=-1";

    CamelContext camelctx = new DefaultCamelContext();
    camelctx.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").to(epuri);
        }
    });

    KafkaComponent kafka = new KafkaComponent();
    kafka.setBrokers("localhost:" + KAFKA_PORT);
    camelctx.addComponent("kafka", kafka);

    camelctx.start();
    try {
        ProducerTemplate template = camelctx.createProducerTemplate();

        sendMessagesInRoute(10, template, "IT test message", KafkaConstants.PARTITION_KEY, "1");
        sendMessagesInRoute(5, template, "IT test message in other topic", KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);

        CountDownLatch latch = new CountDownLatch(15);

        boolean allReceived;
        try (KafkaConsumer<String, String> consumer = createKafkaConsumer()) {
            consumeKafkaMessages(consumer, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, latch);
            allReceived = latch.await(2, TimeUnit.SECONDS);
        }

        Assert.assertTrue("Messages published to the kafka topics were received: " + latch.getCount(), allReceived);
    } finally {
        camelctx.stop();
    }
}
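
The helper methods sendMessagesInRoute, createKafkaConsumer and consumeKafkaMessages belong to the test class and are not included in this excerpt. A plausible sketch of the two consumer-side helpers, with assumed property values and structure (assumed imports: java.util.Arrays, java.util.Properties, org.apache.kafka.clients.consumer.KafkaConsumer, org.apache.kafka.clients.consumer.ConsumerRecord), is shown below:

private KafkaConsumer<String, String> createKafkaConsumer() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:" + KAFKA_PORT);
    props.put("group.id", "kafka-producer-it");   // assumed group id
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return new KafkaConsumer<>(props);
}

private void consumeKafkaMessages(KafkaConsumer<String, String> consumer,
        String topic, String topicInHeader, CountDownLatch latch) {
    consumer.subscribe(Arrays.asList(topic, topicInHeader));
    // poll until every expected message has been seen or roughly two seconds have passed
    long deadline = System.currentTimeMillis() + 2000;
    while (latch.getCount() > 0 && System.currentTimeMillis() < deadline) {
        for (ConsumerRecord<String, String> record : consumer.poll(100)) {
            latch.countDown();
        }
    }
}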