go任务调度9(op实现分布式乐观锁)
作者:互联网
package main
import (
"go.etcd.io/etcd/clientv3"
"time"
"fmt"
"context"
)
func main() {
var (
config clientv3.Config
client *clientv3.Client
err error
lease clientv3.Lease
leaseGrantResp *clientv3.LeaseGrantResponse
leaseId clientv3.LeaseID
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
keepResp *clientv3.LeaseKeepAliveResponse
ctx context.Context
cancelFunc context.CancelFunc
kv clientv3.KV
txn clientv3.Txn
txnResp *clientv3.TxnResponse
)
// 客户端配置
config = clientv3.Config{
Endpoints: []string{"0.0.0.0:2379"},
DialTimeout: 5 * time.Second,
}
// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}
// lease实现锁自动过期(上锁之后,如果节点宕机,锁会一直占用,所以要过期机制,也要续租机制):
// op操作
// txn事务: if else then
// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
lease = clientv3.NewLease(client)
// 申请一个5秒的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println(err)
return
}
// 拿到租约的ID
leaseId = leaseGrantResp.ID
// 准备一个用于取消自动续租的context
ctx, cancelFunc = context.WithCancel(context.TODO())
// 确保函数退出后, 自动续租会停止
defer cancelFunc() //终止自动续租协程(goroutine)
defer lease.Revoke(context.TODO(), leaseId) //告诉etcd把租约直接释放掉,更直接,立即删除,锁就释放了
// 5秒后会取消自动续租
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
fmt.Println(err)
return
}
// 处理续约应答的协程
go func() {
for {
select {
case keepResp = <- keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()
// if 不存在key, then 设置它, else 抢锁失败
kv = clientv3.NewKV(client)
// 创建事务
txn = kv.Txn(context.TODO())
// 定义事务
// 如果key不存在(创建版本是0说明没有被创建)
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败
// 提交事务
if txnResp, err = txn.Commit(); err != nil {
fmt.Println(err)
return // 没有问题
}
// 判断是否抢到了锁
if !txnResp.Succeeded {
fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}
// 2, 处理业务
fmt.Println("处理任务")
time.Sleep(5 * time.Second)
// 3, 释放锁(取消自动续租, 释放租约)
// 上面的defer 会把租约释放掉, 关联的KV就被删除了
}
(右边的先执行,左边的后执行,左边会提示锁已被占用)
标签:clientv3,main,左边,etcd,go,任务调度,op 来源: https://blog.51cto.com/5660061/2382042