其他分享
首页 > 其他分享> > Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

作者:互联网

第二种猜想是能否直接将 Kafka 协议直接接入到 Pulsar broker 里,也就是目前 KoP 的成型。

那么关于第一种 proxy 做法,如果实现起来大概是什么样呢?OVHcloud 就有过一次尝试。

之前 OVHcloud 一直采用 Apache Kafka。尽管他们有在 Kafka 上运行多个集群且每秒处理数百万条消息的经验,但仍面临艰巨的运营挑战。所以,OVHcloud 放弃 Kafka,决定将其服务的产品转移到 Pulsar,并在 Pulsar 上构建其产品。

但是为了照顾到依旧使用 Kafka 系统的用户,所以他们想在 Pulsar 里添加一个 proxy 去支持 Kafka 协议。他们最初的做法就是将 Kafka 协议的一帧转换成 Pulsar 协议。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

Proxy 收到来自 Kafka 客户端的任何一帧,通过自由状态机将其转换为 Pulsar 相应的接口。

这个状态机一种是用于接收 Kafka 请求,第二种是用于处理 Pulsar response。然后在其中间再添加一个状态机进行同步。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

因为在 TCP 层进行这些操作,所以它的表现还是不错的。借由 Rust 的特性,整体运行流畅。但是这个情况下,代码仍需要一行行去写,同时 Kafka 协议里有一些是没有办法通过 proxy 方式实

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

现。比如:group coordinator 和 offsets management。

还有一个比较关键的点是,因为用 Rust 去构写,所以比较难开源。即便是开源出来也很难作为一个组件去插入到 Pulsar 系统中。

刚好去年 StreamNative 的一条推特引起了 OVHcloud 的注意。这是 StreamNative 第一次举行线下 Pulsar meetup 时翟佳老师分享的 KoP demo。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

经过几次双方经验互谈交流后,双方合力推出了更完善的「KoP」。利用 Pulsar 和 BookKeeper 的事件流存储架构和 Pulsar 的可插拔协议处理插件框架来提供一种精简而全面的解决方案。

KoP 组件与 Broker 协作

====================================================================================

所以当我们倒回去重看 Pulsar 架构,下方模块图中最核心的:Broker、BookKeeper、ZooKeeper。Pulsar 就是基于 Managed ledger 实现的一套分布式流式存储,包括如何存数据、如何防止数据丢失、流如何从本地机房复制到另一机房等。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

Pulsar 协议本身是一个很轻量级的东西,即上图中的 Pulsar protocol handler。它主要是处理 TCP 过来的请求格式,然后将请求转化和读取的操作。所以 Pulsar 协议最核心部分在存储层面、分布式均衡层面等。

将 Pulsar protocol handler 抽象出来,变成一个框架/接口。利用这个框架,可以直接访问 Pulsar 已经构建好的存储系统,剩下要做的只是协议的解析和转换。

所以依据这个构想,将 Kafka 协议带入去实践。在 Pulsar 2.5 版本时新加了一个「Pluggable protocol handler」的概念(PIP-41),将接口单独抽离了出来。

Pulsar protocol handler 的使用是类似 Pulsar function/connector,只需将其插入到 Pulsar broker 中,就可以让 Pulsar 具有读取和解析其他协议的能力。这个机制只需要调整两个配置:

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

配置完成后,重启集群即可支持「其他类型协议」的处理能力。当然这个特性只在 Pulsar 2.5 版本后才支持,所以如需尝试,可以先将 Pulsar 系统升级到 2.5 版本。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

所以在此机制下过程就会变得更加明了简单。只需在 Pulsar 里实现 Kafka protocol handler 即可,剩下的上图实线绿色部分是 Kafka 原生客户端。只需将数据接入到 Pulsar 集群,就可以处理 Kafka 请求。

为什么选取 Kafka 作为实践对象?

======================================================================================

应为 Pulsar 和 Kafka 在一些层面有很多相似之处。比如日志层,Pulsar 和 Kafka 都采用非常相似的数据模型,用于发布/订阅消息和事件流,Pulsar 和 Kafka 都采用分布式日志。

通过对比 Pulsar 和 Kafka,我们发现这两种系统有很多相似之处。这两种系统都包括以下操作:

实现方式

1. Topic

============================================================================

Kafka 将所有 Topic 存储在扁平的命名空间。但是,Pulsar 将 Topic 存储在层次化、多租户的命名空间。我们在 broker 配置中添加了 kafkaNamespace 配置,这样管理员就可以将 Kafka Topic 映射到 Pulsar Topic。

为了方便 Kafka 用户使用 Apache Pulsar 的多租户特性,当 Kafka 用户使用 SASL 验证机制来验证 Kafka 客户端的时候,可以指定一个 Pulsar 租户和命名空间作为其 SASL 用户名。

2. 消息 ID 和偏移量

=================================================================================

Kafka 为每条被成功发布到 Topic 分区的消息都指定了一个偏移量。Pulsar 为每条消息指定了一个 MessageID。消息 ID 由 ledger-identry-idbatch-index 组成。我们在 Pulsar-Kafka wrapper 中使用相同的方法将 Pulsar 的消息 ID 转换为偏移量,反之亦然。

3. 消息

=========================================================================

Kafka 和 Pulsar 的消息都包含键、值、时间戳和 header(在 Pulsar 中被称作 ‘properties’)。我们自动在 Kafka 消息和 Pulsar 消息之间转换这些字段。

4. Topic 查找

===============================================================================

我们为 Kafka 和 Pulsar 的请求处理插件提供相同的 Topic 查找方法。请求处理插件发现 Topic,查找所请求的 Topic 分区的全部所有权,然后将包含所有权信息的 Kafka TopicMetadata 返回给 Kafka 客户端。

5. 发布消息

===========================================================================

当收到 Kafka 客户端发布的消息后,Kafka 请求处理插件逐一将多个字段(例如键、值、时间戳和 headers)进行映射,从而将 Kafka 消息转换为 Pulsar 消息。

同时,Kafka 请求处理插件利用 ManagedLedger append API 将这些已转化的 Pulsar 消息存储在 BookKeeper。Kafka 请求处理插件将 Kafka 消息转换为 Pulsar 消息后,现有的 Pulsar 应用程序就可以接收 Kafka 客户端发布的消息。

6. 消费消息

===========================================================================

当收到 Kafka 客户端的 consumer 请求时,Kafka 请求处理插件打开一个非持久 cursor,然后从请求的偏移量开始读取 entries。

Kafka 请求处理插件将 Pulsar 消息转换回 Kafka 消息后,现有的 Kafka 应用程序就可以接收 Pulsar 客户端发布的消息。

7. Group coordinator & 偏移量管理

================================================================================================

最大的挑战是实现 group coordinator 和偏移量管理。Pulsar 不支持集中的 group coordinator,无法为消费组里的 consumer 分配分区,也无法管理每个消费组的偏移量。

Pulsar broker 基于分区来管理分区分配,而分区的 owner broker 通过将确认信息存储在 cursors 来管理偏移量。

我们很难让 Pulsar 模型与 Kafka 模型保持一致。因此,为了完全兼容 Kafka 客户端,我们将 coordinator group 的更改和偏移量存储在 Pulsar 名为 public/kafka/__offsets 系统 Topic 中,从而实现 Kafka coordinator group。

这样,我们能够在 Pulsar 和 Kafka 之间建立桥梁,并允许用户使用现有的 Pulsar 工具和策略来管理订阅并监控 Kafka consumer。我们在已实现的 coordinator group 中添加一个后台线程,定期将偏移量更新从系统 Topic 同步到 Pulsar cursor。

因此,实际上 Kafka 消费组被认为是 Pulsar 订阅。所有现有的 Pulsar 工具也可以用于管理 Kafka 消费组。

KoP 生产化

==========================================================================

如果将 KoP 应用到实际场景中,就需要考虑以下多个方面:

Q & A

========================================================================

1. Pulsar 有多种扩展,这些扩展有统一的管理方式吗?

目前在做一个项目:Pulsar Registry,类似于 DocHub。也可以看作一个应用商店,会集中一些组件/插件合集,可以期待一下。

标签:偏移量,Kafka,Topic,新秀,消息,Pulsar,客户端
来源: https://blog.csdn.net/m0_64867688/article/details/122024181