编程语言
首页 > 编程语言> > java – 默认情况下如何使用Kafka Spring Cloud Stream并使用汇合API生成的Kafka消息?

java – 默认情况下如何使用Kafka Spring Cloud Stream并使用汇合API生成的Kafka消息?

作者:互联网

我正在构建一个微服务组件,它将默认使用由其他(SCS)组件生成的Spring Cloud Stream(SCS)Kafka消息.

但我还要求使用来自使用汇合API的其他组件的Kafka消息.

我有一个示例存储库,显示我正在尝试做的事情.

https://github.com/donalthurley/KafkaConsumeScsAndConfluent

这是下面的应用程序配置,带有SCS输入绑定和汇合输入绑定.

spring:
  application:
    name: kafka
  kafka:
    consumer:
      properties.schema.registry.url: http://192.168.99.100:8081
  cloud:
    stream:
      kafka:
        binder:
          brokers: PLAINTEXT://192.168.99.100:9092
#          configuration:
#            specific:
#              avro:
#                reader: true
#            key:
#              deserializer: org.apache.kafka.common.serialization.StringDeserializer
#            value:
#              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer

      bindings:
        inputConfluent:
          contentType: application/*+avro
          destination: confluent-destination
          group: input-confluent-group
        inputScs:
          contentType: application/*+avro
          destination: scs-destination
          group: input-scs-group

通过上面的配置,我可以使用SCS默认配置创建两个使用者
例如,类org.apache.kafka.common.serialization.ByteArrayDeserializer是两个输入绑定的值反序列化器.

如果我删除上述配置中的注释,我会让两个消费者从Confluent客户端发送配置
例如,类io.confluent.kafka.serializers.KafkaAvroDeserializer是两个输入绑定的值反序列化器.

我理解,因为配置在Kafka活页夹上,它将应用于使用该活页夹定义的所有消费者.

有没有什么方法可以定义这些特定的属性,以便它们只应用于汇合的特定使用者绑定,所有其他输入绑定可以使用默认的SCS配置?

解决方法:

您可以通过配置属性设置特定于绑定的使用者和生产者属性.

the reference manual.

spring.cloud.stream.kafka.bindings.<channelName>.consumer.configuration.foo.bar=baz

使用非标准序列化器/反序列化器时,必须分别为生产者和使用者设置useNativeEncoding和useNativeDecoding.再次参见参考手册.

标签:java,spring,apache-kafka,spring-cloud-stream
来源: https://codeday.me/bug/20190627/1303827.html