其他分享
首页 > 其他分享> > kafka 创建 topic

kafka 创建 topic

作者:互联网

执行 windows 脚本

kafka-topics.bat --create --zookeeper localhost:2181/kafka-zhang --replication-factor 1 --partitions 1 --topic zhang

命令行客户端直接把 topic 元数据写入 zk

// TopicCommand
adminZkClient.createTopic(topic, partitions, replicas, configs, rackAwareMode)

KafkaController 监听 zk 节点的变化,并产生 TopicChange 事件 

case object TopicChange extends ControllerEvent {
  override def state: ControllerState = ControllerState.TopicChange

  override def process(): Unit = {
    if (!isActive) return
    val topics = zkClient.getAllTopicsInCluster.toSet
    val newTopics = topics -- controllerContext.allTopics
    val deletedTopics = controllerContext.allTopics -- topics
    controllerContext.allTopics = topics

    registerPartitionModificationsHandlers(newTopics.toSeq)
    val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
    controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
      !deletedTopics.contains(p._1.topic))
    controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
    info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
      s"[$addedPartitionReplicaAssignment]")
    if (addedPartitionReplicaAssignment.nonEmpty)
      onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
  }
}

在 KafkaController.TopicChange#process 中触发 KafkaController#onNewPartitionCreation
创建 partition,并改变 partition 状态,选出 leader


选择 leader 的策略也很简单,取 isr 的第一个。
选 leader 的策略可以参考单测:

// kafka.controller.PartitionLeaderElectionAlgorithmsTest

 

标签:val,addedPartitionReplicaAssignment,创建,topics,kafka,topic,controllerContext
来源: https://www.cnblogs.com/allenwas3/p/12864532.html