深入理解Zookeeper(二)如何通过zookeeper实现分布式锁
作者:互联网
二、如何通过zookeeper实现分布式锁
(1)发现的问题
(2)解决方法
(3)代码实现
(1)用户支付订单代码
package com.yyds.quartzstudy.zk;
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class OrderPay {
private final static String timeOutOrderLockPrefix = "/timeOutOrderLock";
public void pay(Long orderId){
try {
String zkAddr = "192.168.42.101:2181,192.168.42.102:2181,192.168.42.103:2181";
// 只要执行一次countDown(),等待的线程就会继续执行
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(zkAddr, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {// 异步创建
countDownLatch.countDown();
}
});
countDownLatch.await();
System.out.println("orderId-" + orderId + " 连接zk成功...");
String status = startPay(zooKeeper, orderId);
System.out.println("orderId-" + orderId+ ",支付状态为:" + status);
}catch (Exception e){
e.printStackTrace();
}
}
private String startPay(ZooKeeper zooKeeper, Long orderId) {
/**
* 支付前,需要先加锁 /timeOutOrderLock/{orderId}
* 如果创建成功,就进行付款操作
* 如果创建失败,就稍后再次付款
*/
String timeOutOrderLockPath = timeOutOrderLockPrefix + "/" + orderId;
try {
zooKeeper.create(timeOutOrderLockPath,
"order".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
// 模拟去支付订单
System.out.println("订单-" + orderId + " 准备支付...");
TimeUnit.SECONDS.sleep(3);
zooKeeper.delete(timeOutOrderLockPath,-1);
System.out.println("订单-" + orderId + " 支付成功...");
return "success";
}catch (KeeperException.NodeExistsException e){
System.out.println("订单-" + orderId + " 创建失败...请稍后重试");
return "failed";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
}
(2)超时处理逻辑
package com.yyds.quartzstudy.zk;
import org.apache.zookeeper.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class OrderTimedOut {
private final static String timeOutOrderLockPrefix = "/timeOutOrderLock";
public void timedOut(){
try {
String zkAddr = "192.168.42.101:2181,192.168.42.102:2181,192.168.42.103:2181";
// 只要执行一次countDown(),等待的线程就会继续执行
CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper = new ZooKeeper(zkAddr, 5000, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {// 异步创建
countDownLatch.countDown();
}
});
countDownLatch.await();
System.out.println("OrderTimedOut-" + " 连接zk成功...");
startUpdateOrderTimedOut(zooKeeper);
}catch (Exception e){
e.printStackTrace();
}
}
private void startUpdateOrderTimedOut(ZooKeeper zooKeeper) {
/**
* 1、首先找出30min没有付款的订单
* 2、处理每条订单,加上分布式锁,尝试创建/timeOutOrderLock/{orderId}
* 如果创建成功,进行业务处理
* 如果创建失败,不作任何处理
* 3、修改需要修改状态的订单
*/
ArrayList<Order> orders = new ArrayList<>();
orders.add(new Order(1L,"NOT_PAY"));
orders.add(new Order(2L,"NOT_PAY"));
Iterator<Order> iterator = orders.iterator();
while (iterator.hasNext()){
Order order = iterator.next();
String timeOutOrderLockPath = timeOutOrderLockPrefix + "/" + order.id;
zooKeeper.create(timeOutOrderLockPath,
"".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL, new AsyncCallback.StringCallback() {
@Override
public void processResult(int i, String s, Object o, String s1) {
if(i == KeeperException.Code.OK.intValue()){
System.out.println("OrderTimedOut开始执行业务处理order.id-" + order.id);
// 模拟处理
try {
TimeUnit.SECONDS.sleep(3);
// 删除锁
zooKeeper.delete(timeOutOrderLockPath,-1);
System.out.println("OrderTimedOut执行业务处理完毕order.id-" + order.id);
} catch (Exception e) {
e.printStackTrace();
}
}else if(i == KeeperException.Code.NODEEXISTS.intValue()){
System.out.println("OrderTimedOut不执行order.id-" + order.id);
iterator.remove();
}else {
System.out.println("OrderTimedOut异常,order.id-" + order.id);
}
}
},"call_back");
}
}
private class Order{
private Long id;
private String status;
public Order(Long id, String status) {
this.id = id;
this.status = status;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
}
}
(3)测试
package com.yyds.quartzstudy.zk;
import java.util.concurrent.TimeUnit;
public class OrderTest {
public static void main(String[] args) throws InterruptedException {
Thread pay = new Thread(() -> new OrderPay().pay(1L));
Thread timeOut = new Thread(() -> new OrderTimedOut().timedOut());
//用户先拿到锁
pay.start();
TimeUnit.SECONDS.sleep(1);
timeOut.start();
TimeUnit.SECONDS.sleep(6);
}
}
(4)测试结果
OrderTimedOut- 连接zk成功...
orderId-1 连接zk成功...
订单-1 准备支付...
OrderTimedOut不执行order.id-1
OrderTimedOut开始执行业务处理order.id-2
订单-1 支付成功...
orderId-1,支付状态为:success
(4)分布式读写锁的逻辑
标签:orderId,zookeeper,String,Zookeeper,public,new,order,id,分布式 来源: https://blog.csdn.net/qq_44665283/article/details/122155072