其他分享
首页 > 其他分享> > 【无标题】

【无标题】

作者:互联网

kafka消息队列的方式

LD is tigger forever,CG are not brothers forever, throw the pot and shine forever.
Modesty is not false, solid is not naive, treacherous but not deceitful, stay with good people, and stay away from poor people.
talk is cheap, show others the code,Keep progress,make a better result.
Survive during the day and develop at night。

目录

概述

应用程序使用KafkaConsumer向Kafka订阅主题,并从订阅的主题上接收消息。Kafka消费者从属于消费者群组,一个群组里的消费者订阅的是同一个主题,每个消费者接收主题的一部分分区的消息。

一个分区不能被一个消费者群组里的多个消费者消费,因此如果消费者超过主题的分区数量,那么就有一部分消费者被闲置。

分区的所有权从一个消费者转移到另一个消费者,这样的行为叫做在均衡,不过在均衡期间消费者无法读取消息,造成整个群组一小段时间不可用。
消费者通过被指派为群组协调器的broker发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。
在读取消息之前,首先创建一个KafkaConsumer对象,有三个必选属性:bootstrap.servers,key.deserializer,value.deserializer,第四个属性group.id不是必须的,它指定了消费者属于哪个消费者群组。

订阅主题consumer.subscribe()方法,可以指定特定主题,或使用正则表达式。消息轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有细节,包括群组协调,分区再均衡,发送心跳和获取数据。

返回的每条数据都包含记录所属主题信息,记录所做分区信息,记录在分区的偏移量,以及记录键值对。

在退出之前使用consume.close()关闭消费者,网络连接和socket也会随之关闭。

我们无法让一个线程运行多个消费者,也无法让多个线程安全共享一个消费者。按照规则,一个消费者使用一个线程。
在退出之前使用consume.close()关闭消费者,网络连接和socket也会随之关闭。

我们无法让一个线程运行多个消费者,也无法让多个线程安全共享一个消费者。按照规则,一个消费者使用一个线程。
6.enable.auto.commit

该属性指定了消费者是否自动提交偏移量,默认是true,为了尽量避免重复数据和数据丢失,可以把它设为false,由自己控制何时提交偏移量。

7.partition.assignment.strategy

分区会被分配给群组的消费者,partitionAssignor根据给定消费者和主题,决定哪些分区应该被分配给哪个消费者,有两个默认分配策略:

Range:若干连续分区分配

RoundRobin:逐个分配给消费者

默认是org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了Range策略,也可以改为org.apache.kafka.clients.consumer.RoundRobinAssignor

8.client.id

可以是任意字符串,broker用它标记从客户端发送过来的消息,通常被用在日志,度量指标和配额里。

9.max.poll.records

该属性用于控制单次调用call方法能够返回的记录数量

10.receive.buffer.bytes和send.buffer.bytes

socket读写数据时tcp缓冲区的大小,如果=-1,就使用操作系统默认值

小结:

参考资料和推荐阅读

1.链接: 参考资料.

标签:消费者,一个,群组,主题,无标题,线程,分区
来源: https://blog.csdn.net/xiamaocheng/article/details/122548599