编程语言
首页 > 编程语言> > java-Kafka-如何同时使用filter和filternot?

java-Kafka-如何同时使用filter和filternot?

作者:互联网

我有一个Kafka流,它从一个主题获取数据,并且需要将该信息过滤到两个不同的主题.

KStream<String, Model> stream = builder.stream(Serdes.String(), specificAvroSerde, "not-filtered-topic");
stream.filter((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "good-topic");
stream.filterNot((key, value) -> new Processor().test(key, value)).to(Serdes.String(), specificAvroSerde, "bad-topic");

但是,当我这样做时,它将两次从主题读取数据-不确定随着数据变大,这是否会对性能产生影响.有没有办法只过滤一次并将其推送到两个主题?

解决方法:

您的方法是正确的,并且不会从该主题两次读取数据,并且也没有进行内部数据复制.这种方法的唯一缺点是,每个记录都会对两个过滤谓词进行评估-但是,这非常便宜,并且不应该成为性能问题.

但是,您仍然可以通过使用KStream#branch()来提高性能,该方法确实接受多个谓词并彼此求值,然后为每个谓词返回一个输入流.如果一条记录与谓词匹配,则将其放入相应的输出流中,并停止求值(即,不对该单个记录进行进一步的谓词求值;这确保将每条记录添加到最大一个输出流中;否则,将其删除)没有谓词匹配).

因此,您可以为branch()提供两个谓词:第一个与您的原始filter()谓词相同,第二个谓词始终返回true.

KStream<String, Model> stream = builder.stream(
    Serdes.String(),
    specificAvroSerde,
    "not-filtered-topic"
);
KStream[] splitStreams = stream.branch(
    (key, value) -> new Processor().test(key,value),
    (key, value) -> true
);
splitStreams[0].to(Serdes.String(), specificAvroSerde, "good-topic");
splitStreams[1].to(Serdes.String(), specificAvroSerde, "bad-topic");

但不确定此代码是否比原始版本可读性更好.我想这是一个问题,我个人更喜欢您的原始代码,因为它确实更好地表达了语义.

我添加的版本应该稍微提高CPU效率,因为对于所有确实满足该谓词的记录,它只会被评估一次.对于所有不满足该结果的记录,将返回一个简单的true(即,没有第二个谓词求值).

如果您知道大多数记录将以splitStream [1]结尾,那么您还可以反转谓词(并将splitStream [0]用作“不良流”)以减少对第二个真返回谓词的调用次数.但是,这些只是微优化,不应有问题.

标签:apache-kafka-streams,apache-kafka,java
来源: https://codeday.me/bug/20191026/1936740.html