Kafka - 03 Operations

Author: Internet

I. Reading and Writing Data

1.1 console

[root@my-node51 ~]# kafka-console-producer.sh --bootstrap-server 192.168.6.51:9092 --topic ttopic
>t1------
>t2------
>t3------
>

[root@my-node52 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.6.51:9092 --topic ttopic --from-beginning
t1------
t3------
t2------
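
Note that the consumer above prints t1, t3, t2 rather than the order in which the lines were typed. The likely reason is that ttopic has three partitions (see the describe output in section 2.1) and Kafka only preserves order within a single partition. A minimal sketch of one way to keep related messages in order is to give them the same key, so that the default partitioner places them in the same partition. The helper names `keyed_lines` and `produce_keyed` and the key `k1` are made up for illustration; `parse.key` and `key.separator` are console-producer properties.

```shell
#!/bin/sh
# Prefix every line read from stdin with "<key>:" so that the console
# producer can split it into key and value. The ":" separator is an
# illustrative choice and must not occur in the key.
keyed_lines() {
  key=$1
  while IFS= read -r line; do
    printf '%s:%s\n' "$key" "$line"
  done
}

# Send stdin to ttopic with a fixed key (hypothetical wrapper, needs a
# running cluster; it is only defined here, not called).
produce_keyed() {
  keyed_lines "$1" | kafka-console-producer.sh \
    --bootstrap-server 192.168.6.51:9092 --topic ttopic \
    --property parse.key=true --property key.separator=:
}

# Usage:
#   printf 't1------\nt2------\nt3------\n' | produce_keyed k1
```

On the consumer side, adding `--property print.key=true` to kafka-console-consumer.sh should show the key next to each value.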

1.2 Importing/exporting data with Kafka Connect

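Writing data from the console and reading it back on the console is a convenient starting point, but you will often want to bring in data from other sources or export data from Kafka to other systems. Kafka Connect can import and export that data, so you do not have to write custom integration code.

Kafka Connect is a tool that ships with Kafka for importing data into Kafka and exporting data out of it. It is extensible: it runs connectors, which implement the custom logic for interacting with an external system.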

connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

[root@my-node51 config]# grep -v "^#" connect-standalone.properties

bootstrap.servers=192.168.6.51:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

### Source connector: reads the data source
[root@my-node51 config]# cat connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

### Sink connector: writes to the destination
[root@my-node51 config]# cat connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

### Start the Connect process
[root@my-node51 config]# connect-standalone.sh ./connect-standalone.properties ./connect-file-source.properties ./connect-file-sink.properties

### Append data to the source file
[root@my-node51 config]# echo test-file4444 >> test.txt
[root@my-node51 config]# echo test-file5555 >> test.txt
[root@my-node51 config]# echo test-file6666 >> test.txt

### Check the destination file
[root@my-node51 config]# cat test.sink.txt
test-file1111
test-file2222
test-file3333
test-file4444
test-file5555
test-file6666

### Read the topic with the console consumer; the records are JSON
[root@my-node52 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.6.51:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"test-file1111"}
{"schema":{"type":"string","optional":false},"payload":"test-file2222"}
{"schema":{"type":"string","optional":false},"payload":"test-file3333"}
{"schema":{"type":"string","optional":false},"payload":"test-file4444"}
{"schema":{"type":"string","optional":false},"payload":"test-file5555"}
{"schema":{"type":"string","optional":false},"payload":"test-file6666"}
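
Each record in connect-test is wrapped in a schema/payload envelope because connect-standalone.properties sets value.converter.schemas.enable=true for the JsonConverter. If only the original line is wanted, a small filter can strip the envelope. This is a sketch, not part of Kafka: the function name `payload_only` is made up, and it assumes string payloads without escaped quotes.

```shell
#!/bin/sh
# Print only the "payload" string of each envelope line read from stdin.
# Assumption: the payload is a plain string with no escaped quotes and is
# the last field of the JSON object, as in the consumer output above.
payload_only() {
  sed -n 's/^.*"payload":"\(.*\)"}$/\1/p'
}

# Usage (needs a running cluster):
#   kafka-console-consumer.sh --bootstrap-server 192.168.6.51:9092 \
#     --topic connect-test --from-beginning | payload_only
```

Alternatively, setting value.converter.schemas.enable=false should make the converter write the bare JSON value without the schema part.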

 

II. Topic Management

2.1 Changing the number of partitions of a topic

[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 3       ReplicationFactor: 1    Configs: segment.bytes=1073741824

### Run the alter operation
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --alter --partitions 5 --topic ttopic
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 5       ReplicationFactor: 1    Configs: segment.bytes=1073741824

### Reducing the number of partitions is rejected with an error
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --alter --partitions 1 --topic ttopic
Error while executing topic command : org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 5 partitions, which is higher than the requested 1.
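
Since the partition count can only grow, a script can check the current count before calling --alter instead of running into InvalidPartitionsException. The following is a sketch under that assumption; `partition_count`, `can_alter_to` and `alter_partitions` are hypothetical helper names, and the parsing relies on the describe output format shown above.

```shell
#!/bin/sh
# Extract the PartitionCount value from `kafka-topics.sh --describe`
# output read from stdin.
partition_count() {
  sed -n 's/.*PartitionCount:[[:space:]]*\([0-9][0-9]*\).*/\1/p' | head -n 1
}

# Succeed only if the requested count is larger than the current one.
can_alter_to() {
  current=$1
  requested=$2
  [ "$requested" -gt "$current" ]
}

# Hypothetical wrapper (needs a running cluster; defined but not called).
alter_partitions() {
  topic=$1
  requested=$2
  current=$(kafka-topics.sh --bootstrap-server 192.168.6.51:9092 \
              --describe --topic "$topic" | partition_count)
  if can_alter_to "$current" "$requested"; then
    kafka-topics.sh --bootstrap-server 192.168.6.51:9092 \
      --alter --partitions "$requested" --topic "$topic"
  else
    echo "refusing: $topic already has $current partitions" >&2
    return 1
  fi
}
```

Keep in mind that adding partitions changes which partition a given key maps to, so keyed data written before and after the change may end up in different partitions.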

  

2.2 Increasing the replication factor

### Increase the replication factor: the topic currently has 1 replica per partition, raise it to 3
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 5       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: ttopic   Partition: 0    Leader: 3       Replicas: 3     Isr: 3
        Topic: ttopic   Partition: 1    Leader: 1       Replicas: 1     Isr: 1
        Topic: ttopic   Partition: 2    Leader: 2       Replicas: 2     Isr: 2
        Topic: ttopic   Partition: 3    Leader: 3       Replicas: 3     Isr: 3
        Topic: ttopic   Partition: 4    Leader: 1       Replicas: 1     Isr: 1

### Write the reassignment file (JSON format)
[root@my-node53 kafka-2.6.0]# cat add-topic-replica.json
{
  "version": 1,
  "partitions":[
     { "topic": "ttopic", "partition": 0, "replicas": [1,2,3] },
     { "topic": "ttopic", "partition": 1, "replicas": [1,2,3] },
     { "topic": "ttopic", "partition": 2, "replicas": [1,2,3] },
     { "topic": "ttopic", "partition": 3, "replicas": [1,2,3] },
     { "topic": "ttopic", "partition": 4, "replicas": [1,2,3] }
  ]
}

### Run kafka-reassign-partitions.sh
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092  --reassignment-json-file ./add-topic-replica.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"ttopic","partition":0,"replicas":[3],"log_dirs":["any"]},{"topic":"ttopic","partition":1,"replicas":[1],"log_dirs":["any"]},
                           {"topic":"ttopic","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"ttopic","partition":3,"replicas":[3],"log_dirs":["any"]},
                           {"topic":"ttopic","partition":4,"replicas":[1],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for ttopic-0,ttopic-1,ttopic-2,ttopic-3,ttopic-4

### Check the result
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 5       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: ttopic   Partition: 0    Leader: 3       Replicas: 1,2,3 Isr: 3,1,2
        Topic: ttopic   Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: ttopic   Partition: 2    Leader: 2       Replicas: 1,2,3 Isr: 2,3,1
        Topic: ttopic   Partition: 3    Leader: 3       Replicas: 1,2,3 Isr: 3,1,2
        Topic: ttopic   Partition: 4    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
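
For topics with many partitions, writing the reassignment file by hand gets tedious. The sketch below generates a file in the same format as add-topic-replica.json. The function name `gen_reassignment` is made up, and one thing differs from the hand-written file on purpose: the replica list is rotated for each partition, so that the first replica (the preferred leader) is not always broker 1. Remove the rotation lines to reproduce the file above exactly.

```shell
#!/bin/sh
# Usage: gen_reassignment <topic> <partition-count> <broker-id>...
# Prints a reassignment JSON document to stdout.
gen_reassignment() {
  topic=$1
  count=$2
  shift 2                      # the remaining arguments are broker ids
  printf '{\n  "version": 1,\n  "partitions":[\n'
  p=0
  while [ "$p" -lt "$count" ]; do
    replicas=$(printf '%s' "$*" | tr ' ' ',')
    if [ "$p" -eq $((count - 1)) ]; then sep=''; else sep=','; fi
    printf '     { "topic": "%s", "partition": %d, "replicas": [%s] }%s\n' \
      "$topic" "$p" "$replicas" "$sep"
    # Rotate: move the first broker id to the end of the list.
    first=$1
    shift
    set -- "$@" "$first"
    p=$((p + 1))
  done
  printf '  ]\n}\n'
}

# Usage:
#   gen_reassignment ttopic 5 1 2 3 > add-topic-replica.json
```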

 

2.3 Migrating a topic

### The number of target brokers must be greater than or equal to the replication factor; otherwise the command fails
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092  --topics-to-move-json-file ./move-topic-partition.json --broker-list "4" --generate
Error: Replication factor: 3 larger than available brokers: 1.
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.

### Create ttopic3 with 2 partitions and 1 replica
[root@my-node53 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --create --topic ttopic3 --partitions 2 --replication-factor 1
Created topic ttopic3.

[root@my-node52 kafka-2.6.0]#  ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic3
Topic: ttopic3  PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: ttopic3  Partition: 0    Leader: 2       Replicas: 2     Isr: 2
        Topic: ttopic3  Partition: 1    Leader: 3       Replicas: 3     Isr: 3

### Write the file that lists the topics to move
[root@my-node53 kafka-2.6.0]# cat move-topic-partition.json
{"topics": [{"topic": "ttopic3"}], "version": 1}

### Generate a reassignment plan from the topic list and the broker list
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092  --topics-to-move-json-file ./move-topic-partition.json --broker-list "4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[3],"log_dirs":["any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[4],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[4],"log_dirs":["any"]}]}

### The reassignment plan (the proposed configuration above, saved to a file)
[root@my-node53 kafka-2.6.0]# cat expand-cluster-reassignment.json
{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[4],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[4],"log_dirs":["any"]}]}
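
Instead of copying the proposed plan by hand, the --generate output can be split into two files: the proposed plan to execute and the current assignment to keep for rollback. This is a sketch that assumes the output layout shown above, with each JSON document on the first non-empty line after its heading. The function name `json_after` and the file name rollback.json are made up.

```shell
#!/bin/sh
# Print the first non-empty line that follows a line starting with the
# given heading, reading the --generate output from stdin.
json_after() {
  awk -v header="$1" '
    found && NF { print; exit }
    index($0, header) == 1 { found = 1 }
  '
}

# Usage (needs a running cluster):
#   ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092 \
#     --topics-to-move-json-file ./move-topic-partition.json \
#     --broker-list "4" --generate > generate.out
#   json_after 'Proposed partition reassignment configuration' \
#     < generate.out > expand-cluster-reassignment.json
#   json_after 'Current partition replica assignment' \
#     < generate.out > rollback.json
```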

### Execute the plan to migrate the topic (--execute)
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092 --reassignment-json-file ./expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"ttopic3","partition":0,"replicas":[2],"log_dirs":["any"]},{"topic":"ttopic3","partition":1,"replicas":[3],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for ttopic3-0,ttopic3-1

### Check whether the reassignment has finished (--verify)
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092 --reassignment-json-file ./expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition ttopic3-0 is complete.
Reassignment of partition ttopic3-1 is complete.

Clearing broker-level throttles on brokers 1,2,3,4
Clearing topic-level throttles on topic ttopic3
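
With larger topics the reassignment does not finish immediately, so --verify may have to be repeated. A sketch of a polling loop follows. It treats every "Reassignment of partition" line that does not end in "is complete." as unfinished, which avoids relying on the exact wording of the other states. The names `all_complete` and `wait_for_reassignment` and the 5-second interval are illustrative choices.

```shell
#!/bin/sh
# Read --verify output from stdin; succeed only if there is at least one
# "Reassignment of partition ..." line and all of them report completion.
all_complete() {
  awk '
    /^Reassignment of partition/ {
      total++
      if ($0 ~ /is complete\.$/) ok++
    }
    END { exit !(total > 0 && total == ok) }
  '
}

# Poll until the plan in the given file is fully applied (hypothetical
# wrapper, needs a running cluster; defined but not called).
wait_for_reassignment() {
  plan=$1
  until kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092 \
          --reassignment-json-file "$plan" --verify | all_complete; do
    sleep 5
  done
}

# Usage:
#   wait_for_reassignment ./expand-cluster-reassignment.json
```

As the output above shows, --verify also clears any throttles once the reassignment is complete, so running it at least once after a throttled migration is worthwhile.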

### Partition information after the migration
[root@my-node52 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic3
Topic: ttopic3  PartitionCount: 2       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: ttopic3  Partition: 0    Leader: 4       Replicas: 4     Isr: 4
        Topic: ttopic3  Partition: 1    Leader: 4       Replicas: 4     Isr: 4

 

### Migrate ttopic and ttopic2 to the four brokers 1,2,3,4
[root@my-node53 kafka-2.6.0]# ./bin/kafka-reassign-partitions.sh --bootstrap-server 192.168.6.51:9092 --topics-to-move-json-file ./move-topic-partition.json --broker-list "1,2,3,4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"ttopic","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"ttopic","partition":1,"replicas":[1,2,3],"log_dirs":["any","any","any"]},
                           {"topic":"ttopic","partition":2,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"ttopic","partition":3,"replicas":[1,2,3],"log_dirs":["any","any","any"]},
                           {"topic":"ttopic","partition":4,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"ttopic2","partition":0,"replicas":[3,1],"log_dirs":["any","any"]},
                           {"topic":"ttopic2","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"ttopic2","partition":2,"replicas":[2,3],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"ttopic","partition":0,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"ttopic","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},
                           {"topic":"ttopic","partition":2,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"ttopic","partition":3,"replicas":[1,3,4],"log_dirs":["any","any","any"]},
                           {"topic":"ttopic","partition":4,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"ttopic2","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},
                           {"topic":"ttopic2","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"ttopic2","partition":2,"replicas":[1,4],"log_dirs":["any","any"]}]}

### Partition information after the plan has been executed (the --execute step is not shown)
[root@my-node52 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic
Topic: ttopic   PartitionCount: 5       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: ttopic   Partition: 0    Leader: 1       Replicas: 2,4,1 Isr: 1,2,4
        Topic: ttopic   Partition: 1    Leader: 1       Replicas: 3,1,2 Isr: 1,2,3
        Topic: ttopic   Partition: 2    Leader: 4       Replicas: 4,2,3 Isr: 2,3,4
        Topic: ttopic   Partition: 3    Leader: 1       Replicas: 1,3,4 Isr: 3,1,4
        Topic: ttopic   Partition: 4    Leader: 1       Replicas: 2,1,3 Isr: 1,2,3

[root@my-node52 kafka-2.6.0]# ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 --describe --topic ttopic2
Topic: ttopic2  PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: ttopic2  Partition: 0    Leader: 3       Replicas: 3,2   Isr: 3,2
        Topic: ttopic2  Partition: 1    Leader: 4       Replicas: 4,3   Isr: 3,4
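
In the ttopic output after the migration, broker 1 leads four of the five partitions, even though the new replica lists start with different brokers. The first broker in Replicas is the preferred leader, so a quick check is to list the partitions whose current leader differs from it. This is a sketch that parses the describe format shown above; `non_preferred_leaders` and `elect_preferred` are made-up names, and kafka-leader-election.sh is assumed to be available (it ships with Kafka 2.4 and later).

```shell
#!/bin/sh
# Read `kafka-topics.sh --describe` output from stdin and print every
# partition whose leader is not the first broker in its Replicas list.
non_preferred_leaders() {
  awk '{
    part = leader = replicas = ""
    for (i = 1; i < NF; i++) {
      if ($i == "Partition:") part = $(i + 1)
      if ($i == "Leader:")    leader = $(i + 1)
      if ($i == "Replicas:")  replicas = $(i + 1)
    }
    if (part == "") next          # skip the per-topic summary line
    split(replicas, r, ",")
    if (r[1] != leader)
      print $2 "-" part, "leader=" leader, "preferred=" r[1]
  }'
}

# Trigger a preferred leader election for all partitions (hypothetical
# wrapper, needs a running cluster; defined but not called).
elect_preferred() {
  kafka-leader-election.sh --bootstrap-server 192.168.6.51:9092 \
    --election-type preferred --all-topic-partitions
}

# Usage:
#   ./bin/kafka-topics.sh --bootstrap-server 192.168.6.51:9092 \
#     --describe --topic ttopic | non_preferred_leaders
```

If auto.leader.rebalance.enable is left at its default, the brokers should also move leadership back to the preferred replicas on their own after a while, so the manual election is optional.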

 

  

 

 

Source: https://www.cnblogs.com/kingdomer/p/16462539.html