
java - Kafka Streams: read from all partitions in each instance of an application


When using a KTable, Kafka Streams does not allow an instance to read from multiple partitions of a given topic when the number of instances/consumers equals the number of partitions. I tried to achieve this with a GlobalKTable, but the problem is that the data gets overwritten (a GlobalKTable keeps only the latest value per key), and no aggregation can be applied to it.

Suppose I have a topic named "data_in" with 3 partitions (P1, P2, P3). When I run 3 instances (I1, I2, I3) of my Kafka Streams application, I want each instance to read data from all partitions of "data_in": I1 reads from P1, P2 and P3, I2 reads from P1, P2 and P3, and so on.

EDIT: Keep in mind that the producer can publish two similar IDs into two different partitions of "data_in". So when two different instances are running, the GlobalKTable gets overwritten.

Please, how can I achieve this? Here is part of my code:

private KTable<String, theDataList> globalStream() {

    // KStream of records from the "data_in" topic, using the String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}

Solution:

Either change the number of partitions of the input topic "data_in" to 1, or use a GlobalKTable to get the data from all partitions of the topic; you can then join a stream against it. That way, your app instances no longer have to be in different consumer groups (a sketch of that consumer-group alternative follows below).
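As an aside on the consumer-group point: a Kafka Streams application's consumer group is derived from its application.id, so giving every instance a distinct id would also make each instance read all partitions. A minimal sketch of that alternative, using the standard StreamsConfig keys (the id suffix and broker address are illustrative, not from the original post):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Each instance gets a distinct application.id, hence its own consumer
// group, so each instance is assigned ALL partitions of "data_in".
// "instance-1" is an illustrative per-instance suffix.
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "data-in-app-instance-1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams = new KafkaStreams(getBuilder().build(), props);
streams.start();

Note that with distinct application.ids each instance maintains its own independent state store, which is why the answer instead keeps all instances in one group and uses a GlobalKTable.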

The GlobalKTable version of the code would look like this:

private GlobalKTable<String, theDataList> globalStream() {

    // KStream of records from the "data_in" topic, using the String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Write to another topic to force a repartition, so that records with the same ID end up in the same partition
    trashStream.to("new_data_in", Produced.with(Serdes.String(), SerDes.theDataSerde));

    KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the repartitioned KStream, using an intermediate representation (KGroupedStream)
    KGroupedStream<String, Data> KGS = newTrashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Aggregate, then write the result out; the KTable must be converted back to a KStream before .to()
    KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized)
    .toStream()
    .to("agg_data_in", Produced.with(Serdes.String(), SerDes.theDataDataListSerde));

    // Materialize all partitions of "agg_data_in" in every instance as a GlobalKTable
    return getBuilder().globalTable("agg_data_in", Consumed.with(Serdes.String(), SerDes.theDataDataListSerde));
}

EDIT: I edited the code above to force a repartition through a topic named "new_data_in".
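The solution mentions that the resulting GlobalKTable can then be joined against a stream. A minimal sketch of such a KStream-GlobalKTable join, assuming a hypothetical "events" topic keyed by the same String IDs (the topic names and the pass-through joiner are illustrative, not from the original post):

// Hypothetical: enrich an event stream with the aggregated lists.
// "events" and "enriched_out" are illustrative topic names.
GlobalKTable<String, theDataList> aggTable = globalStream();

KStream<String, Data> events = getBuilder().stream("events",
        Consumed.with(Serdes.String(), SerDes.theDataSerde));

events.join(aggTable,
        (eventKey, eventValue) -> eventKey,  // map each event to the table key
        (eventValue, list) -> list)          // ValueJoiner: here simply keep the aggregated list
      .to("enriched_out", Produced.with(Serdes.String(), SerDes.theDataDataListSerde));

Because the right side is a GlobalKTable, this join works regardless of how the two topics are partitioned: every instance has the full table locally.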

Tags: apache-kafka-streams, java, apache-kafka, partitioning
Source: https://codeday.me/bug/20191009/1877602.html