SynchronousQueue源码解析(非公平模式)
作者:互联网
上篇文章,介绍了 SynchronousQueue
的公平模式(源码分析)。
这篇文章,从源码入手,解析 非公平模式。
如果你对SynchronousQueue不熟悉,可以先看我的这篇文章(图解SynchronousQueue)。
一、初始化
SynchronousQueue<Integer> queue = new SynchronousQueue<>();
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
也就是,不传参数,或传一个false时,就会实现非公平模式。
内部类 TransferStack
没有带参构造器,看下它的几个属性
static final int REQUEST = 0; // 取元素标识
static final int DATA = 1; // 放元素标识
static final int FULFILLING = 2; // 已匹配标识
// TransferStack的内部类SNode 的几个属性
volatile SNode next; // 栈中下一个元素
volatile SNode match; // 与其匹配的元素
volatile Thread waiter; // 对应的线程
Object item; // 元素的值
int mode;
二、调用 put() 后阻塞
用 图解 SynchronousQueue 那篇文章中的例子。 t1, t2, t3 先后往队列中放 10, 20, 30
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
也就是说 put(e) 方法,会调用 transferer.transfer(e, false, 0) 。
顺便说下,取元素与放元素都会调用这个 transfer 方法,只是参数不同而已。
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // empty or same-mode
if (timed && nanos <= 0) { // can't wait
if (h != null && h.isCancelled())
casHead(h, h.next); // pop cancelled node
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m is s's match
if (m == null) { // all waiters are gone
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
SNode mn = m.next;
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // lost match
s.casNext(m, mn); // help unlink
}
}
} else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
t1、 t2、t3 三个线程,调用 transferer.transfer(e, false, 0),
第三行代码 int mode = (e == null) ? REQUEST : DATA;
,所以 mode 都是 1。
第六行代码 if (h == null || h.mode == mode) {}
这个条件,都是返回 true。
所以 for () {} 这个循环中,t1, t2, t3 会走同一分支的代码,精简如下。
E transfer(E e, boolean timed, long nanos) {
SNode s = null;
int mode = (e == null) ? REQUEST : DATA; // t1, t2, t3 对应都是1
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // t1 - head ==null, t2和t3,符合 h.mode == mode
if (timed && nanos <= 0) { // timed 是 false
// ...
} else if (casHead(h, s = snode(s, e, h, mode))) {
SNode m = awaitFulfill(s, timed, nanos); // 阻塞在这个方法里
if (m == s) { // wait was cancelled
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // try to fulfill
// ...
} else { // help a fulfiller
// ...
}
}
}
整个代码不是很难理解,
casHead(h, s = snode(s, e, h, mode))
这句,是先生成 SNode,然后入栈。
如果入栈失败,则会重头开始,再次执行(for()
这是无限循环)
调用 awaitFulfill(s, false, 0)
时,会阻塞,即 t1, t2, t3 这三个线程,都阻塞在这儿了。
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L; // timed是false
Thread w = Thread.currentThread(); // 当前线程
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();
SNode m = s.match; // 此次调用,s.match都是null
if (m != null)
return m;
if (timed) { // 不会进
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
if (spins > 0) // spins > 0 ,从头再执行直至spins 减小到 0
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // s 节点标属于哪个线程
else if (!timed)
LockSupport.park(this); // 阻塞
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
这个方法,现在调用最终都会执行 LockSupport.park(this);
即阻塞。
之后 还会再说这方法,那是有取元素的线程,修改了s.match,并唤醒线程,此方法可以跳出。
好的,放元素的三个线程全部阻塞,等待取元素线程工作。结合上面的那张图,整理好思路。
三、调用 take() 取元素
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
t4 线程取元素,调用 transfer(null, false, 0)
方法,和之前讲的,只是一个参数不同而已。
第三行代码 int mode = (e == null) ? REQUEST : DATA;
,所以 mode 是 0。
h == null || h.mode == mode
,这个判断返回false,看下一个判断
else if (!isFulfilling(h.mode))
这个判断返回true
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
// 即 1 & 2, 位运算结果是0
所以,此次调用 transfer(null, false, 0)
方法,会进入第二个分支,精简如下
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) {
// ...
} else if (!isFulfilling(h.mode)) {
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // loop until matched or waiters disappear
SNode m = s.next; // m 是 s 的匹配结点
if (m == null) { // 此次不会进,忽略这段
casHead(s, null);
s = null;
break;
}
SNode mn = m.next;
if (m.tryMatch(s)) { // 本例中,m是t3线程那个节点,s是本次新生成的节点。
casHead(s, mn); // head 移位,即 s和m都出栈
return (E) ((mode == REQUEST) ? m.item : s.item); // 返回
} else // 出现了并发,退出人头再来
s.casNext(m, mn); // help unlink
}
}
} else {
// ...
}
}
}
casHead(h, s=snode(s, e, h, FULFILLING|mode))
这行代码,是生成新SNode入栈。效果如图。
看下for () {}
是匹配出栈的过程。
m.tryMatch(s)
m是t3线程那个节点,s是本次新生节点。
boolean tryMatch(SNode s) {
// 将 t3线程那个节点的match,从null,改为 s
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // 只唤醒一次
waiter = null;
LockSupport.unpark(w); // 唤醒t3线程
}
return true;
}
return match == s;
}
最终 t4 线程,把 t3线程放的元素 30 给取走了,t4 返回了。
同时,t4 线程,将 t3线程的match属性,由null设置为s节点,并唤醒 t3 线程,最终 t3 也返回了。
也就是 取元素的线程,放元素的线程,相互作用,同时成功。
四、put() 方法被唤醒
前面说了,t1 线程的那个节点,match属性,由null设置为s节点。
往上翻看下 awaitFulfill
方法,被唤醒的 t1 线程,从头执行,
// awaitFulfill 方法由此退出
SNode m = s.match;
if (m != null)
return m;
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // 此次不相等
clean(s);
return null;
}
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item); // 返回s.item,即返回 10,
最终返回s.item,即返回 10,不等于null,外层put() 方法结束。
至此,非公平模式的源码解析完毕。
由于 put()
方法和 take()
方法调用的是同一个 transfer()
方法,代码理解起来就不容易。
多看几遍,多画图,大概的流程是可以弄明白的。
另外,本篇示例是先 put(),后 take(),来说明。其实也可以先take(),后 put() 来说明,
有兴趣的话,可以这么试试!
相关文章:
标签:解析,SynchronousQueue,SNode,else,casHead,源码,线程,mode,null 来源: https://blog.csdn.net/every__day/article/details/112547308