Creating a topic in Kafka
Author: Internet
Run the Windows script:
kafka-topics.bat --create --zookeeper localhost:2181/kafka-zhang --replication-factor 1 --partitions 1 --topic zhang
The command-line client writes the topic metadata directly to ZooKeeper (zk):
// TopicCommand
adminZkClient.createTopic(topic, partitions, replicas, configs, rackAwareMode)
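To make "topic metadata" more concrete, here is a minimal sketch of the kind of data that ends up in the topic's znode: a map from partition id to replica list, serialized as JSON. The object `TopicAssignmentSketch`, its helper names, and the plain round-robin placement are assumptions made for illustration. Kafka's own assignment logic (randomized start, rack awareness) is more involved, and the JSON layout shown is only meant to resemble the version 1 payload used by ZooKeeper-based Kafka releases of this era.

```scala
object TopicAssignmentSketch {
  // Hypothetical helper: place replicas round-robin over the broker list.
  // This is NOT Kafka's real algorithm, which also randomizes the start
  // position and can take racks into account.
  def assignReplicas(brokers: Seq[Int], partitions: Int, replicationFactor: Int): Map[Int, Seq[Int]] = {
    require(brokers.nonEmpty, "need at least one broker")
    require(replicationFactor >= 1 && replicationFactor <= brokers.size,
      "replication factor must be between 1 and the number of brokers")
    (0 until partitions).map { p =>
      p -> (0 until replicationFactor).map(r => brokers((p + r) % brokers.size))
    }.toMap
  }

  // Render the assignment in a JSON shape resembling the topic znode payload.
  def toJson(assignment: Map[Int, Seq[Int]]): String = {
    val parts = assignment.toSeq.sortBy(_._1).map { case (p, replicas) =>
      "\"" + p + "\":[" + replicas.mkString(",") + "]"
    }
    "{\"version\":1,\"partitions\":{" + parts.mkString(",") + "}}"
  }
}
```

For the command above (1 partition, replication factor 1), and assuming a single broker with id 0, `toJson(assignReplicas(Seq(0), 1, 1))` yields `{"version":1,"partitions":{"0":[0]}}`.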
KafkaController watches the zk nodes for changes and generates a TopicChange event:
case object TopicChange extends ControllerEvent {
  override def state: ControllerState = ControllerState.TopicChange

  override def process(): Unit = {
    if (!isActive) return
    val topics = zkClient.getAllTopicsInCluster.toSet
    val newTopics = topics -- controllerContext.allTopics
    val deletedTopics = controllerContext.allTopics -- topics
    controllerContext.allTopics = topics

    registerPartitionModificationsHandlers(newTopics.toSeq)
    val addedPartitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(newTopics)
    controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
      !deletedTopics.contains(p._1.topic))
    controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
    info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
      s"[$addedPartitionReplicaAssignment]")
    if (addedPartitionReplicaAssignment.nonEmpty)
      onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
  }
}
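The core of this handler is a set difference between the topics currently in zk and the topics the controller already knows about. The following self-contained sketch models just that bookkeeping. `Context`, `Result`, and the `fetchAssignment` function are hypothetical stand-ins for `controllerContext` and `zkClient.getReplicaAssignmentForTopics`, and partitions are keyed by a plain `(topic, partition)` tuple rather than Kafka's `TopicPartition`.

```scala
object TopicChangeSketch {
  // Hypothetical, simplified stand-in for the controller's cached view.
  final case class Context(allTopics: Set[String], assignment: Map[(String, Int), Seq[Int]])

  // Outcome of processing one change notification.
  final case class Result(context: Context, newTopics: Set[String], deletedTopics: Set[String])

  // zkTopics: the topics currently present in ZooKeeper.
  // fetchAssignment: stands in for reading the replica assignment of the given topics.
  def process(ctx: Context, zkTopics: Set[String],
              fetchAssignment: Set[String] => Map[(String, Int), Seq[Int]]): Result = {
    val newTopics = zkTopics -- ctx.allTopics       // in zk, not yet cached
    val deletedTopics = ctx.allTopics -- zkTopics   // cached, no longer in zk
    val added = fetchAssignment(newTopics)
    // Drop cached assignments of deleted topics, then merge in the new ones.
    val kept = ctx.assignment.filter { case ((topic, _), _) => !deletedTopics.contains(topic) }
    Result(Context(zkTopics, kept ++ added), newTopics, deletedTopics)
  }
}
```

In the real handler, a non-empty set of added assignments is what leads to the `onNewPartitionCreation` call; the sketch stops at returning the updated state.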
KafkaController.TopicChange#process triggers KafkaController#onNewPartitionCreation,
which creates the partitions, transitions their state, and elects a leader for each.
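The state change mentioned here can be pictured as a small state machine. The sketch below is an assumption-laden model, not the controller's code: the state names and the table of allowed previous states are modeled on the controller's partition state machine as found in ZooKeeper-based Kafka releases, while `transition` and `createPartition` are hypothetical helpers introduced for illustration.

```scala
object PartitionStateSketch {
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // For each target state, the states it may be entered from
  // (assumed, modeled on the controller's partition state machine).
  val validPrevious: Map[PartitionState, Set[PartitionState]] = Map(
    NonExistentPartition -> Set[PartitionState](OfflinePartition),
    NewPartition -> Set[PartitionState](NonExistentPartition),
    OnlinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition)
  )

  // Hypothetical helper: move to the target state only if the move is allowed.
  def transition(current: PartitionState, target: PartitionState): Either[String, PartitionState] =
    if (validPrevious(target).contains(current)) Right(target)
    else Left(s"illegal transition $current -> $target")

  // Path of a freshly created partition: NonExistent -> New -> Online.
  // Leader election happens as part of the move to OnlinePartition.
  def createPartition(): Either[String, PartitionState] =
    transition(NonExistentPartition, NewPartition).flatMap(transition(_, OnlinePartition))
}
```

A partition cannot jump straight from `NonExistentPartition` to `OnlinePartition` in this model, which mirrors why partition creation is a two-step transition.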
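The leader selection strategy is also simple: take the first replica in the ISR. For a newly created partition, the initial ISR is the list of live replicas from the assignment, so the leader is the first live replica.
The leader election strategies can be studied through the unit test: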
// kafka.controller.PartitionLeaderElectionAlgorithmsTest
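As a rough illustration of the "first in the ISR" rule, here is a minimal sketch under stated assumptions. `LeaderElectionSketch` and its method names are hypothetical; the local `LeaderAndIsr` case class is a simplified stand-in for Kafka's class of the same name; unclean leader election and the other election strategies covered by the unit test are left out.

```scala
object LeaderElectionSketch {
  // Simplified stand-in for Kafka's LeaderAndIsr (no epochs or versions).
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // New partition: the ISR starts as the live replicas (in assignment order)
  // and the leader is the first of them. None if no replica is alive.
  def initialLeaderAndIsr(assignment: Seq[Int], liveBrokers: Set[Int]): Option[LeaderAndIsr] = {
    val live = assignment.filter(liveBrokers.contains).toList
    live.headOption.map(LeaderAndIsr(_, live))
  }

  // Existing partition whose leader is gone: pick the first assigned replica
  // that is both alive and in the ISR (clean election only).
  def electFromIsr(assignment: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int]): Option[Int] =
    assignment.find(id => liveBrokers.contains(id) && isr.contains(id))
}
```

For example, with assignment `Seq(2, 0, 1)` and all three brokers alive, `initialLeaderAndIsr` picks broker 2 as leader with ISR `List(2, 0, 1)`; if broker 2 is down, the leader becomes broker 0.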
Source: https://www.cnblogs.com/allenwas3/p/12864532.html