java-在Kafka连接器中设置分区策略
作者:互联网
我正在使用自定义的Kafka连接器(使用Kafka Connect的Java API用Java编写)从外部源提取数据并存储在主题中.我需要设置自定义分区策略.我了解可以通过设置partitioner.class
property在Kafka Producer中设置自定义partitioner.但是,此属性对于Kafka连接器似乎没有任何作用.如何配置Kafka Connect(我使用独立连接脚本来运行连接器)以使用编写的自定义分区程序?
解决方法:
源连接器可以通过SourceRecord的partition字段控制将每个源记录写入的分区.如果这是您自己的连接器,则这是最直接的.
但是,如果要更改源连接器对每个记录进行分区的方式,则可以使用覆盖源记录的分区字段的单消息转换(SMT).您可能必须通过实现org.apache.kafka.connect.transforms.Transformation并使用自己的分区逻辑来编写自定义SMT,但这实际上比编写自定义Kafka分区程序要容易一些.
例如,这是一个概念上的自定义转换,该转换显示了如何使用配置属性以及如何使用所需的分区号创建新的SourceRecord实例.该示例是不完整的,因为它实际上没有任何真正的分区逻辑,但这应该是一个很好的起点.
package io.acme.example; import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.transforms.Transformation; import java.util.Map; public class CustomPartitioner implements Transformation { private static final String MAX_PARTITIONS_CONFIG = "max.partitions"; private static final String MAX_PARTITIONS_DOC = "The maximum number of partitions"; private static final int MAX_PARTITIONS_DEFAULT = 1; /** * The definition of the configurations. We just define a single configuration property here, * but you can chain multiple "define" methods together. Complex configurations may warrant * pulling all the config-related things into a separate class that extends {@link AbstractConfig} * and adds helper methods (e.g., "getMaxPartitions()"), and you'd use this class to parse the * parameters in {@link #configure(Map)} rather than {@link AbstractConfig}. */ private static final ConfigDef CONFIG_DEF = new ConfigDef().define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT, Importance.HIGH, MAX_PARTITIONS_DOC); private int maxPartitions; @Override public void configure(Map configs) { // store any configuration parameters as fields ... AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs); maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG); } @Override public SourceRecord apply(SourceRecord record) { // Compute the desired partition here int actualPartition = record.kafkaPartition(); int desiredPartition = ... // Then create the new record with all of the existing fields except with the new partition ... return record.newRecord(record.topic(), desiredPartition, record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp()); } @Override public ConfigDef config() { return CONFIG_DEF; } @Override public void close() { // do nothing } }
ConfigDef和AbstractConfig功能非常有用,并且可以做更多有趣的事情,包括使用自定义验证程序和推荐程序,以及具有依赖于其他属性的配置属性.如果您想了解更多有关此的信息,请查看一些使用相同框架的现有Kafka Connect连接器.
最后一件事.当运行Kafka Connect独立或分布式工作程序时,但请确保将CLASSPATH环境变量设置为指向包含自定义SMT的JAR文件以及您的SMT所依赖的JAR文件,但Kafka提供的文件除外. connect-standalone.sh和connect-distributed.sh命令将自动将Kafka JAR添加到类路径中.
标签:apache-kafka-connect,apache-kafka,java 来源: https://codeday.me/bug/20191025/1932152.html