KafKa
作者:互联网
服务注册发现的过程
etcd由哪几部分构成?
etcd作为一个高可用的键值存储系统,天生就是为了集群化而设计的,一般etcd推荐奇数个节点,推荐的节点数量是 3
、5
、7
构成一个集群。
启动etcd
安装完 etcd
以后,使用 go
进行连接
连接 etcd
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 连接etcd
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second, //超时时间
})
if err != nil {
fmt.Println("connect to etcd failed ,err ", err)
return
}
defer cli.Close()
// put
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = cli.Put(ctx, "s4", "真好")
if err != nil {
fmt.Println("put to etcd faild err:", err)
return
}
cancel() //写完关闭
// get
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
gr, err := cli.Get(ctx, "s4")
if err != nil {
fmt.Println("Get from etcd faild err:", err)
return
}
for _, ev := range gr.Kvs {
fmt.Println("从etcd中读取成功")
fmt.Printf("key:%s,value :%s", ev.Key, ev.Value)
}
cancel() //写完关闭
}
watch
用来监控etcd中key的变化(创建,更改,删除)
import (
"context"
"fmt"
"time"
"go.etcd.io/etcd/clientv3"
)
func main() {
// 连接etcd
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second, //超时时间
})
if err != nil {
fmt.Println("connect to etcd failed ,err ", err)
return
}
fmt.Println("etcd 连接成功......")
defer cli.Close()
// watch
watchChan := cli.Watch(context.Background(), "s4")
for wresp := range watchChan { //如果通道里面没有值得话,就会一直在这里阻塞
for _, evt := range wresp.Events {
fmt.Printf("watch 事件,type:%s , key:%s , value: %s\n", evt.Type, evt.Kv.Key, evt.Kv.Value)
}
}
}
测试 watch
使用 etcd
的客户端进行测试
etcdctl put s4 "liudehua"
标签:clientv3,cli,err,fmt,KafKa,etcd,context 来源: https://www.cnblogs.com/rush-peng/p/15314275.html