其他分享
首页 > 其他分享> > |NO.Z.00072|——————————|BigDataEnd|——|Hadoop&kafka.V57|-------------------------------------------|ka

|NO.Z.00072|——————————|BigDataEnd|——|Hadoop&kafka.V57|-------------------------------------------|ka

作者:互联网



[BigDataHadoop:Hadoop&kafka.V57]                                                                          [BigDataHadoop.kafka][|章节二|Hadoop生态圈技术栈|kafka|稳定性|幂等性|]








一、稳定性:幂等性
### --- 幂等性

~~~     Kafka在引入幂等性之前,Producer向Broker发送消息,
~~~     然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:
~~~     生产中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。
~~~     比如以下这种异常情况的出现:
### --- 上图这种情况,当Producer第一次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,

~~~     但是在返回Ack信号给Producer时失败了(比如网络异常) 。
~~~     此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,
~~~     再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。
~~~     这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。
### --- 幂等性

~~~     保证在消息重发的时候,消费者不会重复处理。
~~~     即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。
~~~     所谓幂等性,数学概念就是: f(f(x)) = f(x) 。f函数表示对消息的处理。
~~~     比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证最终结果一定是一致的。
### --- 幂等性实现

~~~     添加唯一ID,类似于数据库的主键,用于唯一标记一个消息。
~~~     Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。
~~~     ProducerID:在每个新的Producer初始化时,
~~~     会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
~~~     SequenceNumber:对于每个ProducerID,
~~~     Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
~~~     同样,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,
~~~     比如Broker在发送Ack信号给Producer时出现网络异常,导致发送失败。异常情况如下图所示:

~~~     # 当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。
~~~     此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。
~~~     对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,
~~~     但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。
~~~     相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,
~~~     那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
### --- 客户端在生成Producer时,会实例化如下代码:
~~~     实例化一个Producer对象

Producer<String, String> producer = new KafkaProducer<>(props);
### --- 在org.apache.kafka.clients.producer.internals.Sender类中,
~~~     在run()中有一个maybeWaitForPid()方法,用来生成一个ProducerID,实现代码如下:

private void maybeWaitForPid() {
    if (transactionState == null)
        return;
    while (!transactionState.hasPid()) {
        try {
            Node node = awaitLeastLoadedNodeReady(requestTimeout);
            if (node != null) { ClientResponse response = sendAndAwaitInitPidRequest(node);
                if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) { InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
                    transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                } else {
                    log.error("Received an unexpected response type for an InitPidRequest from {}. " + "We will back off and try again.", node);
                }
        } else {
            log.debug("Could not find an available broker to send InitPidRequest to. " + "We will back off and try again.");
        }
        } catch (Exception e) {
            log.warn("Received an exception while trying to get a pid.Will back off and retry.", e);
        }
        log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
        time.sleep(retryBackoffMs);
        metadata.requestUpdate();
    }
}








===============================END===============================


Walter Savage Landor:strove with none,for none was worth my strife.Nature I loved and, next to Nature, Art:I warm'd both hands before the fire of life.It sinks, and I am ready to depart                                                                                                                                                   ——W.S.Landor



来自为知笔记(Wiz)

标签:Z.00072,Producer,NO,ProducerID,Broker,kafka,消息,y2,###
来源: https://www.cnblogs.com/yanqivip/p/16125599.html