如何在Apache Kafka和Schema Registry中使用Protobuf


从Confluent Platform 5.5版开始,Avro不再是唯一的架构。现在,Protobuf和JSON模式在Confluent Universe中作为一等公民得到支持。但是在继续说明如何在Kafka中使用Protobuf之前,让我们回答一个经常问到的问题:

为什么我们需要模式? 当应用程序通过pub-sub系统进行通信时,它们交换消息,并且通信中的所有参与者都需要理解并同意这些消息。此外,您想检测并阻止对消息格式的更改,这些更改会使某些参与者无法阅读消息。

这就是模式的用处-它代表了通信参与者之间的契约,就像API代表了服务与其使用者之间的契约一样。正如可以使用OpenAPI(Swagger)描述REST API一样,可以使用Avro,Protobuf或Avro模式描述Kafka中的消息。

模式通过以下方式描述数据的结构:

指定消息中的字段 指定每个字段的数据类型以及该字段是否为必填字段 此外,模式与Schema Registry一起可防止生产者发送有害消息-消费者无法解释的格式错误的数据。Schema Registry将检测生产者是否将要引入重大更改,并且可以将其配置为拒绝此类更改。重大更改的一个示例是从架构中删除必填字段。

Protobuf简介 与Apache Avro相似,Protobuf是一种序列化结构化数据的方法。消息格式在.proto文件中定义,您可以使用多种语言从中生成代码,包括Java,Python,C ++,C#,Go和Ruby。与Avro不同,Protobuf不会使用消息序列化架构。因此,为了反序列化消息,您需要使用方中的模式。

这是一个包含一种消息类型的Protobuf模式的示例:

syntax = "proto3";
package com.codingharbour.protobuf;
message SimpleMessage {
    string content = 1;
    string date_time = 2;
}

在第一行中,我们定义我们正在使用Protobuf版本3。我们的消息类型SimpleMessage定义了两个字符串字段:content和date_time。每个字段都分配有一个所谓的字段号,该字段号在消息类型中必须唯一。当邮件序列化为Protobuf二进制格式时,这些数字标识字段。Google建议对最常用的字段使用数字1到15,因为编码需要一个字节。

Protobuf支持常见的标量类型,例如字符串,int32,int64(长整数),double,bool等。有关Protobuf中所有标量类型的完整列表,请查阅Protobuf文档。

除了标量类型,还可以使用复杂的数据类型。在下面,我们看到两种模式,即订购和产品,其中订购可以包含零个,一个或多个产品:

message Order {
    int64 order_id = 1;
    int64 date_time = 2;
    repeated Product product = 3;
}

message Product {
    int32 product_id = 1;
    string name = 2;
    string description = 3;
}

现在,让我们看看这些架构如何最终出现在架构注册表中。

架构注册表和Protobuf 架构注册表是一项服务,用于存储Kafka中使用的架构的版本历史记录。它还以不破坏生产者或消费者的方式支持模式的演变。直到最近,Schema Registry仅支持Avro模式,但是自Confluent Platform 5.5起,该支持已扩展到Protobuf和JSON模式。

如果您以前曾使用过Avro和Kafka,则本节将不会有任何惊喜。与Avro一样,Schema Registry为Protobuf提供了一个序列化器和反序列化器,分别称为KafkaProtobufSerializer和KafkaProtobufDeserializer。

该序列化程序的工作是在生产者将消息写入Kafka之前,将Java对象转换为Protobuf二进制格式。

序列化程序的另一项工作是检查模式注册表中是否存在Protobuf模式。如果不是,它将把架构写入Schema Registry,并将架构ID写入消息(在消息的开头)。然后,当Kafka记录到达使用者时,使用者将使用KafkaProtobufDeserializer根据消息中的模式ID从“模式注册表”中获取模式。提取架构后,KafkaProtobufDeserializer将使用它来反序列化消息。这样,使用者就无需预先知道架构就能使用来自Kafka的消息。

schema-registry-devto.jpg

这就是为什么在生产者或使用者中使用KafkaProtobuf(De)Serializer时,我们需要提供Schema Registry的URL。

Java代码生成 好的,现在我们知道Protobuf架构的外观,并且知道它最终在Schema Registry中的状态。现在让我们看看如何使用Java中的Protobuf模式。

您需要的第一件事是一个protobuf-java库。在这些示例中,我正在使用maven,因此让我们添加maven依赖项:

<dependencies>
    <dependency>
        <groupId>com.google.protobuf</groupId>
        <artifactId>protobuf-java</artifactId>
        <version>3.12.2</version>
    </dependency>
</dependencies>

接下来要做的是使用protoc编译器从中生成Java代码。原始文件。但是我们不会手动邀请编译器,我们将使用一个名为protoc-jar-maven-plugin的maven插件:

<plugin>
    <groupId>com.github.os72</groupId>
    <artifactId>protoc-jar-maven-plugin</artifactId>
    <version>3.11.4</version>
    <executions>
        <execution>
            <phase>generate-sources</phase>
            <goals>
                <goal>run</goal>
            </goals>
            <configuration>
               <inputDirectories>
                    <include>${project.basedir}/src/main/Protobuf</include>
                </inputDirectories>
                <outputTargets>
                    <outputTarget>
                        <type>java</type>
                        <addSources>main</addSources>
                        <outputDirectory>
                          ${project.basedir}/target/generated-sources/protobuf
                      </outputDirectory>
                    </outputTarget>
                </outputTargets>
            </configuration>
        </execution>
    </executions>
</plugin>

Protobuf类将在generate-sources阶段生成。该插件将在src / main / protobuf 文件夹中查找proto文件,并且所生成的代码将在target / generated-sources / protobuf文件夹中创建。

要在目标文件夹中生成类,请运行 mvn clean generate-sources

注意:此博客文章中的所有代码示例都可以在Coding Harbour的GitHub上找到。

好的,现在我们已经生成了我们的类,让我们使用新的Protobuf序列化器将其发送给Kafka。

运行本地Kafka群集 在开始之前,让我们使用Schema Registry启动本地Kafka集群,以便我们可以立即尝试我们的代码。我们将使用docker-compose运行集群。

没有docker-compose吗?检查: 如何安装docker-compose

我已经用一个Zookeeper,一个Kafka代理和Schema Registry准备了一个docker-compose文件。您可以从https://github.com/codingharbour/kafka-docker-compose上获取它

导航到single-node-avro-kafka文件夹并运行docker-compose up -d

现在可以使用本地Kafka群集了。通过运行docker-compose ps,我们可以看到Kafka代理在端口9092上可用,而模式注册表在端口8081上运行。请注意这一点,因为我们很快将需要它。

Writing a Protobuf Producer 随着Kafka集群的建立和运行,现在是时候创建一个Java生产者,它将我们的SimpleMessage发送到Kafka。首先,让我们为生产者准备配置:

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
Producer<String, SimpleMessage> producer = new KafkaProducer<>(properties);

注意,我们使用KafkaProtobufSerializer作为值序列化器类。自5.5版以来,这是Confluent Platform中可用的新序列化程序。它的工作方式与KafkaAvroSerializer相似:发布消息时,它将使用Schema Registry检查模式是否可用。如果该架构尚未注册,它将把它写入Schema Registry,然后将消息发布到Kafka。为此,序列化程序需要Schema Registry的URL,在本例中为http:// localhost:8081。

接下来,我们使用从Protobuf模式生成的SimpleMessage类来准备KafkaRecord:

SimpleMessage simpleMessage = SimpleMessage.newBuilder()
        .setContent("Hello world")
        .setDateTime(Instant.now().toString())
        .build();

ProducerRecord<String, SimpleMessage> record
                = new ProducerRecord<>("protobuf-topic", null, simpleMessage);

该记录将被写入名为protobuf-topic的主题。最后要做的是将记录写到Kafka:

producer.send(record);
producer.flush();
producer.close();

通常,您不会调用flush()方法,但是由于此后我们的应用程序将停止,因此我们需要确保在此之前将消息写入Kafka。

编写Protobuf消费者 我们说过,由于有了Schema Registry,使用者不需要预先知道该架构就可以反序列化消息。但是,预先拥有可用的架构将使我们能够从中生成Java类,并在代码中使用该类。这有助于提高代码的可读性,并使代码强类型化。

这是操作方法。首先,您将生成一个Java类,如Java代码生成部分中所述。接下来,我们为Kafka使用者准备配置:

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer-group");      
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

在这里,我们定义了代理URL,消费者的消费者组,并告诉消费者我们将处理偏移量提交。 接下来,我们为消息定义反序列化器:

properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, SimpleMessage.class.getName());

我们使用字符串反序列化器作为键,但是对于值,我们使用新的KafkaProtobufDeserializer。对于Protobuf反序列化器,我们需要提供Schema Registry URL,就像上面对序列化器所做的一样。

最后一行是最重要的。它告诉反序列化器将记录值反序列化到哪个类。在我们的例子中,它是SimpleMessage类(我们使用Protobuf maven插件从Protobuf模式生成的类)。

现在我们准备创建我们的消费者并将其订阅protobuf-topic:

KafkaConsumer<String, SimpleMessage> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("protobuf-topic"));

然后我们轮询Kafka以获取记录并将其打印到控制台:

while (true) {
    ConsumerRecords<String, SimpleMessage> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, SimpleMessage> record : records) {
        System.out.println("Message content: " + record.value().getContent());
        System.out.println("Message time: " + record.value().getDateTime());
    }
    consumer.commitAsync();
}

在这里,我们正在使用一批记录,只是将内容打印到控制台。

还记得我们将使用者配置为通过将ENABLE_AUTO_COMMIT_CONFIG设置为false来处理提交偏移量的情况吗?这就是我们在最后一行中所做的事情:只有在完全处理完当前记录组之后,我们才提交消费者偏移量。

这就是编写一个简单的Protobuf消费者所需的全部内容。现在让我们检查另一个变体。

原始Protobuf消费者 如果要在使用者中以通用方式处理消息而不从Protobuf模式生成Java类,该怎么办?好了,您可以使用Protobuf库中的DynamicMessage类的实例。DynamicMessage具有反射API,因此您可以浏览消息字段并读取其值。这是您的操作方法...

首先,让我们配置使用者。它的配置与前面的示例非常相似:

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "generic-protobuf-consumer-group");      
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
properties.put(KafkaProtobufDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");

唯一缺少的是SPECIFIC_PROTOBUF_VALUE_TYPE配置。由于我们希望以通用方式处理消息,因此我们不需要此配置。

现在,我们准备创建我们的使用者并将其订阅protobuf-topic主题,如上例所示:

KafkaConsumer<String, SimpleMessage> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("protobuf-topic"));

然后我们轮询Kafka以获取记录并将其打印到控制台:

while (true) {
    ConsumerRecords<String, DynamicMessage> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, DynamicMessage> record : records) {
        for (FieldDescriptor field : record.value().getAllFields().keySet()) {
            System.out.println(field.getName() + ": " + record.value().getField(field));
        }
    }
    consumer.commitAsync();
}

在我们的使用者中未配置SPECIFIC_PROTOBUF_VALUE_TYPE的情况下,使用者将始终以记录的值返回DynamicMessage的实例。然后,我们使用DynamicMessage.getAllFields()方法获取FieldDescriptor的列表。一旦有了所有的描述符,我们就可以简单地遍历它们并打印每个字段的值。


原文链接:http://codingdict.com