其他分享
首页 > 其他分享> > kafka:(2) 生产者

kafka:(2) 生产者

作者:互联网

一、向Kafka发送消息的主要步骤

  

二、整体架构

  消息在通过 send()方法发往 broker 的过程中,有可能需要经历拦截器(lnterceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往broker。

  

  那么在此之后又会发生什么呢?

  整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。RecordAccumulator主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

三、发送消息的3种方式

ProducerRecord<String,String> record = new ProducerRecord<String, String>("CustomerCountry","West","France");
try{
  RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
  e.printStackTrace();
}

  调用 Future对象的get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常,如果没有发生错误,我们会得到 RecordMetadata 对象,可以用它获取消息的偏移量。

ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
producer.send(producerRecord,new DemoProducerCallBack());

class DemoProducerCallBack implements Callback {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if(exception != null){
      exception.printStackTrace();;
    }
  }
}

  为了使用回调,需要一个实现了org.apache.kafka.clients.producer.Callback接口的类,这个接口只有一个 onCompletion方法。如果 kafka 返回一个错误,onCompletion 方法会抛出一个非空(non null)异常。

  生产者是可以使用多线程来发送消息的,刚开始的时候可以使用单个消费者和单个线程。如果需要更高的吞吐量,可以在生产者数量不变的前提下增加线程数量。如果这样做还不够,可以增加生产者数量。

  KafkaProducer一般会发生两类错误。其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,"无主"错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试来解决,比如消息过大异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。

四、生产者的配置

  如果 acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。如果当中出现了问题,导致服务器没有收到消息,生产者就无从得知,消息也就丢失了。可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。
  如果 acks=1 只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没选举出来)生产者收到一个错误响应,为了避免数据丢失,生产者会重发消息。
  如果 ack=all 只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。不过,它的延迟比 acks=1 时更高,因为我们要等待不止一个服务器节点接收消息。

五、顺序保证

  Kafka可以保证同一个分区里的消息是有序的。即如果生产者按照一定的顺序发送消息,broker就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序去读取它们。
  如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把retires设为0 。可以把max.in.flight.request.per.connection设为1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。

六、分区

  ProducerRecord 对象包含了目标主题、键和值。Kafka 的消息是一个个键值对,ProducerRecord对象可以只包含目标主题和值,键可以设置为默认的null,不过大多数的应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。如果一个进程只从一个主题的分区读取数据,那么具有相同键的所有记录都会被该进程读取。
  如果键为null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。
  如果键不为空,并且使用了默认分区器,那么 Kafka 会对键进行散列(使用Kafka 自己的散列算法)然后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。意味着,如果写入数据的分区时不可用的,那么就会发生错误。
  只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。一旦主题增加了新的分区,这些就无法保证了,旧数据仍然留在旧分区,但新记录可能被写到其他分区上。如果要使用键来映射分区,最好在创建主题的时候就把分区规划好,而且永远不要增加新分区。

  自定义分区: 

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
  public void configure(Map<String, ?> configs) {} 
    
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String))) ➋
    throw new InvalidRecordException("We expect all messages to have customer name as key")

    if (((String) key).equals("Banana"))
    return numPartitions; // Banana总是被分配到最后一个分区
    
   // 其他记录被散列到其他分区
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }    

  public void close() {}
}

 

标签:重试,生产者,分区,kafka,发送,消息,ProducerRecord
来源: https://www.cnblogs.com/zjxiang/p/15383949.html