
Fixing kafka.common.OffsetOutOfRangeException when consuming again after Kafka's periodic log cleanup

Author: Internet


Environment:

kafka  0.10

spark  2.1.0

zookeeper  3.4.5-cdh5.14.0

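On our company's Alibaba Cloud test machine we stopped consuming before the October 1 (National Day) holiday. After the holiday, when we used Spark Streaming to consume Kafka again with the same consumer group, we got the following error: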

As I regularly kill the servers running Kafka and the producers feeding it (yes, just for fun), things sometimes go a bit crazy, not entirely sure why but I got the error:

kafka.common.OffsetOutOfRangeError: FetchResponse(topic='my_messages', partition=0, error=1, highwaterMark=-1, messages=)
To fix it I added the “seek” setting:

consumer.seek(0,2)
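The quoted workaround uses kafka-python's old SimpleConsumer API, where seek(offset, whence) works like a file seek: whence 0 is relative to the earliest available offset, 1 to the current offset, and 2 to the latest offset, so seek(0,2) jumps to the end of the partition. The Scala sketch below is a hypothetical model of those semantics for a single partition (SeekModel and its parameters are illustrative names, not a Kafka API); it also counts how many still-retained messages such a jump skips.

```scala
// Hypothetical model of kafka-python's SimpleConsumer.seek(offset, whence) for one partition.
// Not a Kafka API: names and parameters are illustrative only.
object SeekModel {
  // New fetch position after seek(offset, whence)
  def seek(offset: Long, whence: Int, current: Long, earliest: Long, latest: Long): Long =
    whence match {
      case 0 => earliest + offset // relative to the head (earliest available offset)
      case 1 => current + offset  // relative to the current position
      case 2 => latest + offset   // relative to the tail (latest offset)
      case other => throw new IllegalArgumentException(s"unsupported whence: $other")
    }

  // Messages still on the broker that are never read when jumping from current to newPosition
  def skipped(current: Long, newPosition: Long, earliest: Long): Long =
    math.max(0L, newPosition - math.max(current, earliest))

  def main(args: Array[String]): Unit = {
    // Stored offset 5 has expired; the broker now keeps offsets 100 up to 250
    val target = seek(0, 2, current = 5, earliest = 100, latest = 250)
    // prints: seek(0,2) -> 250, skipped 150 retained messages
    println(s"seek(0,2) -> $target, skipped ${skipped(5, target, 100)} retained messages")
  }
}
```

In the Java/Scala client the closest call is consumer.seekToEnd(partitions). Jumping to the end avoids the exception but silently drops the backlog that is still retained, which is why the solution in this post only moves an expired offset forward to the earliest offset still on the broker.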


Cause of the problem:

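Kafka deletes old log data on a schedule.

When our job starts, if the consumer group has consumed the topic before, its offsets are stored in ZooKeeper, and we normally read them back to resume from where the last run stopped. Kafka, however, periodically deletes old log segments according to its retention settings (for example log.retention.hours on the broker). Suppose cleanup runs once a day and the saved offset is from the day before yesterday: by the time you consume again, the messages at that offset have already been deleted, so the offset is no longer within the partition's valid range and the consumer throws OffsetOutOfRangeException.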
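To make the mechanism concrete, here is a toy Scala model of one partition. It is a simplified assumption, not Kafka's actual implementation: retention deletes whole closed segments from the head of the log, which raises the earliest readable offset, and a stored offset that falls below it becomes out of range. The segment layout is made up for illustration, and the 168-hour retention is the broker default for log.retention.hours.

```scala
// Toy model (simplified assumption) of one partition under time-based retention.
object RetentionModel {
  // A log segment holding offsets [baseOffset, nextOffset), last written at lastWriteHour
  final case class Segment(baseOffset: Long, nextOffset: Long, lastWriteHour: Int)

  final case class PartitionLog(segments: Vector[Segment]) {
    require(segments.nonEmpty, "a partition always has an active segment")

    def earliest: Long = segments.head.baseOffset // log start offset
    def latest: Long = segments.last.nextOffset   // log end offset

    // A consumer may only fetch from positions in [earliest, latest]
    def isInRange(offset: Long): Boolean = offset >= earliest && offset <= latest

    // Delete closed segments, oldest first, whose last write is older than the retention window;
    // the active (last) segment is never deleted
    def applyRetention(nowHour: Int, retentionHours: Int): PartitionLog =
      PartitionLog(segments.init.dropWhile(s => nowHour - s.lastWriteHour > retentionHours) :+ segments.last)
  }
}
```

With segments covering offsets 0 to 300, an offset of 150 saved before the holiday is valid; after retention removes the two old segments at hour 200, the earliest offset becomes 200 and 150 is out of range, which is exactly the situation described above.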

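Solution:

When the stored zk_offset is outside the partition's valid range (zk_offset < earliest_offset, or zk_offset > latest_offset), correct it to a valid value: earliest_offset in the first case, latest_offset in the second.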
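The correction rule fits in a small pure function. This is a sketch that mirrors the intended behavior of getCurrentOffset in this post's code; the name startOffset and its parameters are hypothetical. An expired offset moves up to earliest_offset, an offset past the end moves back to latest_offset, and a partition with no stored offset starts where auto.offset.reset says.

```scala
// Hypothetical helper (not from the original code): choose the start offset for one partition.
object OffsetCorrection {
  // stored:          offset saved for the consumer group, None if it never consumed this partition
  // earliest/latest: the partition's current log start and log end offsets
  // resetToEarliest: what to use when nothing is stored (mirrors auto.offset.reset)
  def startOffset(stored: Option[Long], earliest: Long, latest: Long, resetToEarliest: Boolean): Long =
    stored match {
      case None => if (resetToEarliest) earliest else latest
      case Some(o) if o < earliest => earliest // data already deleted by retention
      case Some(o) if o > latest => latest     // offset beyond the end of the log
      case Some(o) => o                        // still valid: resume where we stopped
    }
}
```

Moving an expired offset to earliest_offset rather than latest_offset consumes everything that is still retained; the messages that were already deleted are lost either way.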


The full code from the earlier post:

https://www.cnblogs.com/niutao/p/10547831.html


Key code after the fix:

  // TODO Handle Kafka data that expired (or was lost) before it could be consumed #########################
  // Prevents: Offsets out of range with no configured reset policy for partitions
  /**
    * Get the earliest offsets
    *
    * @param consumer   consumer the partitions are assigned to
    * @param partitions topic partitions
    * @return
    */
  def getEarliestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToBeginning(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Get the earliest offsets
    * Returns the earliest (lowest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams kafka client configuration
    * @param topics      topics to get offsets for
    */
  def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    try {
      // subscribe() leaves assignment() empty until a poll completes a rebalance,
      // so look the partitions up and assign them manually instead
      val parts = topics.flatMap(t => Option(consumer.partitionsFor(t)).map(_.toList).getOrElse(Nil))
        .map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
      consumer.assign(parts)
      getEarliestOffsets(consumer, parts)
    } finally {
      consumer.close()
    }
  }

  /**
    * Get the latest offsets
    *
    * @param consumer   consumer the partitions are assigned to
    * @param partitions topic partitions
    * @return
    */
  def getLatestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToEnd(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Get the latest offsets
    * Returns the latest (highest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams kafka client configuration
    * @param topics      topics to get offsets for
    */
  def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    try {
      // subscribe() leaves assignment() empty until a poll completes a rebalance,
      // so look the partitions up and assign them manually instead
      val parts = topics.flatMap(t => Option(consumer.partitionsFor(t)).map(_.toList).getOrElse(Nil))
        .map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
      consumer.assign(parts)
      getLatestOffsets(consumer, parts)
    } finally {
      consumer.close()
    }
  }

  /**
    * Get the consumer's current position
    *
    * @param consumer   consumer
    * @param partitions topic partitions
    * @return
    */
  def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Get the offsets to start consuming from, corrected into the valid range [earliest, latest].
    * Partitions without a committed offset start at earliest or latest according to auto.offset.reset.
    * Note: these are the offsets group.id has committed to Kafka, not the ZooKeeper offsets written by persistOffsets.
    *
    * @param kafkaParams kafka parameters
    * @param topics      topics
    * @return
    */
  def getCurrentOffset(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val offsetResetConfig = kafkaParams.getOrElse(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").toString.toLowerCase
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    // This helper consumer must never commit anything
    newKafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    try {
      // Assign manually instead of subscribe() + poll(0): a poll may fetch records and move the position
      val parts = topics.flatMap(t => Option(consumer.partitionsFor(t)).map(_.toList).getOrElse(Nil))
        .map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
      consumer.assign(parts)
      // Current (committed) offsets; committed() returns null when the group has none for a partition
      val currentOffset = parts.flatMap(tp => Option(consumer.committed(tp)).map(om => tp -> om.offset)).toMap
      val earliestOffset = getEarliestOffsets(consumer, parts)
      val latestOffset = getLatestOffsets(consumer, parts)
      parts.map { tp =>
        val earliest = earliestOffset(tp)
        val latest = latestOffset(tp)
        val offset = currentOffset.get(tp) match {
          case None =>
            log.warn(s"consumer topic partition offset not found:$tp")
            if (offsetResetConfig == "earliest") earliest else latest
          case Some(current) if current < earliest =>
            // The data between current and earliest was already deleted by retention
            log.warn(s"kafka data is lost from partition:$tp offset $current to $earliest")
            earliest
          case Some(current) if current > latest => latest
          case Some(current) => current
        }
        tp -> offset
      }.toMap
    } finally {
      consumer.close()
    }
  }

  /**
    * Wraps createDirectStream with Kafka offset support; creates the Kafka input stream for Spark Streaming
    *
    * @param ssc    Spark Streaming Context
    * @param topics Kafka topics
    * @tparam K Kafka message key type
    * @tparam V Kafka message value type
    * @return Kafka input stream
    */
  def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
    val groupId = kafkaParams("group.id").toString
    // TODO Previously the start offsets were read from ZooKeeper, where they can be out of range:
//    val storedOffsets: Map[TopicPartition, Long] = readOffsets(topics, groupId)
    val storedOffsets: Map[TopicPartition, Long] = getCurrentOffset(kafkaParams, topics)
    log.info("Kafka offset summary (format: (topic, partition, offset)): {}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
    val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
    kafkaStream
  }

The complete code, ready to use as-is:

import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.util.Try

/**
  * Utility class for Kafka connections and offset management
  *
  * @param zkHosts     ZooKeeper address
  * @param kafkaParams Kafka parameters
  */
class KafkaManager(zkHosts: String, kafkaParams: Map[String, Object]) extends Serializable {
  // Logger (slf4j API, Logback backend)
  @transient private lazy val log = LoggerFactory.getLogger(getClass)
  // ZooKeeper client and connection needed to build ZkUtils (session and connection timeout: 10000 ms)
  val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(zkHosts, 10000, 10000)
//  zkClient.setZkSerializer(new MyZkSerializer())
  // ZkUtils object used to access ZooKeeper
  val zkUtils = new ZkUtils(zkClient, zkConnection, false)
  // TODO Handle Kafka data that expired (or was lost) before it could be consumed #########################
  // Prevents: Offsets out of range with no configured reset policy for partitions
  /**
    * Get the earliest offsets
    *
    * @param consumer   consumer the partitions are assigned to
    * @param partitions topic partitions
    * @return
    */
  def getEarliestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToBeginning(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Get the earliest offsets
    * Returns the earliest (lowest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams kafka client configuration
    * @param topics      topics to get offsets for
    */
  def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    try {
      // subscribe() leaves assignment() empty until a poll completes a rebalance,
      // so look the partitions up and assign them manually instead
      val parts = topics.flatMap(t => Option(consumer.partitionsFor(t)).map(_.toList).getOrElse(Nil))
        .map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
      consumer.assign(parts)
      getEarliestOffsets(consumer, parts)
    } finally {
      consumer.close()
    }
  }

  /**
    * Get the latest offsets
    *
    * @param consumer   consumer the partitions are assigned to
    * @param partitions topic partitions
    * @return
    */
  def getLatestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToEnd(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Get the latest offsets
    * Returns the latest (highest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams kafka client configuration
    * @param topics      topics to get offsets for
    */
  def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    try {
      // subscribe() leaves assignment() empty until a poll completes a rebalance,
      // so look the partitions up and assign them manually instead
      val parts = topics.flatMap(t => Option(consumer.partitionsFor(t)).map(_.toList).getOrElse(Nil))
        .map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
      consumer.assign(parts)
      getLatestOffsets(consumer, parts)
    } finally {
      consumer.close()
    }
  }

  /**
    * Get the consumer's current position
    *
    * @param consumer   consumer
    * @param partitions topic partitions
    * @return
    */
  def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Get the offsets to start consuming from, corrected into the valid range [earliest, latest].
    * Partitions without a committed offset start at earliest or latest according to auto.offset.reset.
    * Note: these are the offsets group.id has committed to Kafka, not the ZooKeeper offsets written by persistOffsets.
    *
    * @param kafkaParams kafka parameters
    * @param topics      topics
    * @return
    */
  def getCurrentOffset(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val offsetResetConfig = kafkaParams.getOrElse(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").toString.toLowerCase
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    // This helper consumer must never commit anything
    newKafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    try {
      // Assign manually instead of subscribe() + poll(0): a poll may fetch records and move the position
      val parts = topics.flatMap(t => Option(consumer.partitionsFor(t)).map(_.toList).getOrElse(Nil))
        .map(pi => new TopicPartition(pi.topic, pi.partition)).toSet
      consumer.assign(parts)
      // Current (committed) offsets; committed() returns null when the group has none for a partition
      val currentOffset = parts.flatMap(tp => Option(consumer.committed(tp)).map(om => tp -> om.offset)).toMap
      val earliestOffset = getEarliestOffsets(consumer, parts)
      val latestOffset = getLatestOffsets(consumer, parts)
      parts.map { tp =>
        val earliest = earliestOffset(tp)
        val latest = latestOffset(tp)
        val offset = currentOffset.get(tp) match {
          case None =>
            log.warn(s"consumer topic partition offset not found:$tp")
            if (offsetResetConfig == "earliest") earliest else latest
          case Some(current) if current < earliest =>
            // The data between current and earliest was already deleted by retention
            log.warn(s"kafka data is lost from partition:$tp offset $current to $earliest")
            earliest
          case Some(current) if current > latest => latest
          case Some(current) => current
        }
        tp -> offset
      }.toMap
    } finally {
      consumer.close()
    }
  }
  //#########################################################
  /**
    * Wraps createDirectStream with Kafka offset support; creates the Kafka input stream for Spark Streaming
    *
    * @param ssc    Spark Streaming Context
    * @param topics Kafka topics
    * @tparam K Kafka message key type
    * @tparam V Kafka message value type
    * @return Kafka input stream
    */
  def createDirectStream[K: ClassTag, V: ClassTag](ssc: StreamingContext, topics: Seq[String]): InputDStream[ConsumerRecord[K, V]] = {
    val groupId = kafkaParams("group.id").toString
    // TODO Previously the start offsets were read from ZooKeeper, where they can be out of range:
//    val storedOffsets: Map[TopicPartition, Long] = readOffsets(topics, groupId)
    val storedOffsets: Map[TopicPartition, Long] = getCurrentOffset(kafkaParams, topics)
    log.info("Kafka offset summary (format: (topic, partition, offset)): {}", storedOffsets.map(off => (off._1.topic, off._1.partition(), off._2)))
    val kafkaStream = KafkaUtils.createDirectStream[K, V](ssc, PreferConsistent, ConsumerStrategies.Subscribe[K, V](topics, kafkaParams, storedOffsets))
    kafkaStream
  }
  /**
    * Read the Kafka consumer offsets from ZooKeeper
    *
    * @param topics  Kafka topics
    * @param groupId Kafka group ID
    * @return a Map[TopicPartition, Long] with the offset of every partition of every topic; a partition with no stored offset gets its earliest available offset
    */
  def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
    val topicPartOffsetMap = collection.mutable.HashMap.empty[TopicPartition, Long]
    val partitionMap = zkUtils.getPartitionsForTopics(topics)
    // /consumers/<groupId>/offsets/<topic>/
    partitionMap.foreach(topicPartitions => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, topicPartitions._1)
      topicPartitions._2.foreach(partition => {
        val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + partition
        val tryGetKafkaOffset = Try {
          val offsetStatTuple = zkUtils.readData(offsetPath)
          if (offsetStatTuple != null) {
            log.info("Kafka offset details: topic:{}, partition:{}, offset:{}, ZK node path:{}", Seq[AnyRef](topicPartitions._1, partition.toString, offsetStatTuple._1, offsetPath): _*)
            topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), offsetStatTuple._1.toLong)
          }
        }
        if(tryGetKafkaOffset.isFailure){
          //http://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
          val consumer = new KafkaConsumer[String, Object](kafkaParams)
          val partitionList = List(new TopicPartition(topicPartitions._1, partition))
          consumer.assign(partitionList)
          val minAvailableOffset = consumer.beginningOffsets(partitionList).values.head
          consumer.close()
          log.warn("Kafka offset details: no previous ZK node: {}, topic:{}, partition:{}, ZK node path:{}, using earliest available offset:{}", Seq[AnyRef](tryGetKafkaOffset.failed.get.getMessage, topicPartitions._1, partition.toString, offsetPath, minAvailableOffset): _*)
          topicPartOffsetMap.put(new TopicPartition(topicPartitions._1, Integer.valueOf(partition)), minAvailableOffset)
        }
      })
    })
    topicPartOffsetMap.toMap
  }
  /**
    * Persist the consumed Kafka offsets to ZooKeeper
    *
    * @param rdd            the Kafka RDD from Spark Streaming, RDD[ConsumerRecord[K, V]]
    * @param storeEndOffset true = save the end (until) offset, false = save the start (from) offset
    */
  def persistOffsets[K, V](rdd: RDD[ConsumerRecord[K, V]], storeEndOffset: Boolean = true): Unit = {
    val groupId = kafkaParams("group.id").toString
    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetsList.foreach(or => {
      val zkGroupTopicDirs = new ZKGroupTopicDirs(groupId, or.topic)
      val offsetPath = zkGroupTopicDirs.consumerOffsetDir + "/" + or.partition
      val offsetVal = if (storeEndOffset) or.untilOffset else or.fromOffset
      zkUtils.updatePersistentPath(offsetPath, offsetVal.toString)
      log.debug("Saved Kafka offset details: topic:{}, partition:{}, offset:{}, ZK node path:{}", Seq[AnyRef](or.topic, or.partition.toString, offsetVal.toString, offsetPath): _*)
    })
  }


}

Source: https://www.cnblogs.com/niutao/p/11654679.html