首页 > 其他分享> > |NO.Z.00072|——————————|BigDataEnd|——|Hadoop&kafka.V57|-------------------------------------------|ka
|NO.Z.00072|——————————|BigDataEnd|——|Hadoop&kafka.V57|-------------------------------------------|ka
作者:互联网
[BigDataHadoop:Hadoop&kafka.V57] [BigDataHadoop.kafka][|章节二|Hadoop生态圈技术栈|kafka|稳定性|幂等性|]
一、稳定性:幂等性
### --- 幂等性
~~~ Kafka在引入幂等性之前,Producer向Broker发送消息,
~~~ 然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:
~~~ 生产中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。
~~~ 比如以下这种异常情况的出现:
### --- 上图这种情况,当Producer第一次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,
~~~ 但是在返回Ack信号给Producer时失败了(比如网络异常) 。
~~~ 此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,
~~~ 再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。
~~~ 这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。
### --- 幂等性
~~~ 保证在消息重发的时候,消费者不会重复处理。
~~~ 即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。
~~~ 所谓幂等性,数学概念就是: f(f(x)) = f(x) 。f函数表示对消息的处理。
~~~ 比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证最终结果一定是一致的。
### --- 幂等性实现
~~~ 添加唯一ID,类似于数据库的主键,用于唯一标记一个消息。
~~~ Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。
~~~ ProducerID:在每个新的Producer初始化时,
~~~ 会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
~~~ SequenceNumber:对于每个ProducerID,
~~~ Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
~~~ 同样,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,
~~~ 比如Broker在发送Ack信号给Producer时出现网络异常,导致发送失败。异常情况如下图所示:
~~~ # 当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。
~~~ 此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。
~~~ 对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,
~~~ 但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。
~~~ 相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,
~~~ 那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
### --- 客户端在生成Producer时,会实例化如下代码:
~~~ 实例化一个Producer对象
Producer<String, String> producer = new KafkaProducer<>(props);
### --- 在org.apache.kafka.clients.producer.internals.Sender类中,
~~~ 在run()中有一个maybeWaitForPid()方法,用来生成一个ProducerID,实现代码如下:
private void maybeWaitForPid() {
if (transactionState == null)
return;
while (!transactionState.hasPid()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) { ClientResponse response = sendAndAwaitInitPidRequest(node);
if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) { InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
} else {
log.error("Received an unexpected response type for an InitPidRequest from {}. " + "We will back off and try again.", node);
}
} else {
log.debug("Could not find an available broker to send InitPidRequest to. " + "We will back off and try again.");
}
} catch (Exception e) {
log.warn("Received an exception while trying to get a pid.Will back off and retry.", e);
}
log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
}
===============================END===============================
Walter Savage Landor:strove with none,for none was worth my strife.Nature I loved and, next to Nature, Art:I warm'd both hands before the fire of life.It sinks, and I am ready to depart ——W.S.Landor
来自为知笔记(Wiz)
标签:Z.00072,Producer,NO,ProducerID,Broker,kafka,消息,y2,### 来源: https://www.cnblogs.com/yanqivip/p/16125599.html