zookeeper分布式锁,解决了羊群效应,
作者:互联网
zookeeper 实现分布式锁,监听前一个节点来避免羊群效应,
思路:很简单,但是实现起来要麻烦一些, 而且我也是看了很多帖子,发现很多帖子的代码,下载下来逐步调试之后发现,看起来是对的,但在并发情况下运行,就会出现问题. 有的在代码里其实并没有真正实现(监听前一个节点),
接下来分享一个: 真正的zookeeper 分布式锁: 这个也是别人的代码,自己只是搬运工,很遗憾,我自己写的分布式锁,看起来是对的,但是出现了并发问题,
package tech.codestory.zookeeper.aalvcai.base_I0Itec_ZK_lock;
import lombok.extern.slf4j.Slf4j;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author 邱润泽
**/
@Slf4j
public class ZookeeperLock {
private String server = "127.0.0.1:2181";
private ZkClient zkClient;
private static final String rootPath = "/qiurunze-lock1";
public ZookeeperLock() {
zkClient = new ZkClient(server, 5000, 20000);
buildRoot();
}
// 构建根节点
public void buildRoot() {
if (!zkClient.exists(rootPath)) {
zkClient.createPersistent(rootPath);
}
}
// 获取锁
public Lock lock(String lockId, long timeout) {
// 创建临时节点
Lock lockNode = createLockNode(lockId);
lockNode = tryActiveLock(lockNode);// 尝试激活锁
if (!lockNode.isActive()) {
try {
synchronized (lockNode) {
lockNode.wait(timeout); // 线程锁住
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (!lockNode.isActive()) {
throw new RuntimeException(" lock timeout");
}
return lockNode;
}
// 释放锁
public void unlock(Lock lock) {
if (lock.isActive()) {
zkClient.delete(lock.getPath());
}
}
// 尝试激活锁
private Lock tryActiveLock(Lock lockNode) {
// 获取根节点下面所有的子节点
List<String> list = zkClient.getChildren(rootPath)
.stream()
.sorted()
.map(p -> rootPath + "/" + p)
.collect(Collectors.toList()); // 判断当前是否为最小节点
log.info("Thread: {}, list : {}",Thread.currentThread().getName(),list);
String firstNodePath = list.get(0);
log.info("Thread: {}, firstNodePath: {}",Thread.currentThread().getName(),firstNodePath);
// 最小节点是不是当前节点
if (firstNodePath.equals(lockNode.getPath())) {
lockNode.setActive(true);
} else {
String upNodePath = list.get(list.indexOf(lockNode.getPath()) - 1);
log.info("Thread: {},监听的节点是: {}",Thread.currentThread().getName(),upNodePath);
zkClient.subscribeDataChanges(upNodePath, new IZkDataListener() {
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// 事件处理 与心跳 在同一个线程,如果Debug时占用太多时间,将导致本节点被删除,从而影响锁逻辑。
System.out.println("节点删除:" + dataPath);
Lock lock = tryActiveLock(lockNode);
synchronized (lockNode) {
if (lock.isActive()) {
lockNode.notify(); // 释放了
}
}
zkClient.unsubscribeDataChanges(upNodePath, this);
}
});
}
return lockNode;
}
public Lock createLockNode(String lockId) {
String nodePath = zkClient.createEphemeralSequential(rootPath + "/" + lockId, "w");
return new Lock(lockId, nodePath);
}
}
class Test01{
static volatile int num = 0;
static ZookeeperLock zookeeperLock = new ZookeeperLock();
public static void main(String[] args){
for (int i = 0; i < 10; i++) {
new Thread(()->{
try {
Lock zkLock = zookeeperLock.lock("lvcai", 5000 );
TimeUnit.MILLISECONDS.sleep(100);
for (int j = 0; j < 10; j++) {
num++;
}
System.out.println( "num的值是 : "+ num );
zookeeperLock.unlock(zkLock);
} catch (Exception e) {
e.printStackTrace();
}
},"线程"+i).start();
}
}
}
class Lock {
private String lockId;
private String path;
private boolean active;
public Lock(String lockId, String path) {
this.lockId = lockId;
this.path = path;
}
public Lock() {
}
public String getLockId() {
return lockId;
}
public void setLockId(String lockId) {
this.lockId = lockId;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
public boolean isActive() {
return active;
}
public void setActive(boolean active) {
this.active = active;
}
}
标签:lockNode,lockId,Lock,zookeeper,节点,羊群,分布式,public,String 来源: https://www.cnblogs.com/lvcai/p/13974803.html