Exchanger类
作者:互联网
1.简述
Exchanger是适用在两个线程之间数据交换的并发工具类,它的作用是找到一个同步点,当两个线程都执行到了同步点(exchange方法)之后(有一个没有执行到就一直等待,也可以设置等待超时时间),就将自身线程的数据与对方交换。
Exchanger使用场景:
- 线程间交互数据。
2.Exchanger的常用方法
/**构造方法 */ //创建一个新的Exchanger Exchanger() /**常用方法 */ //exchange方法用于交互数据V V exchange(V x) //延迟一定时间交换数据 V exchange(V x, long timeout, TimeUnit unit)View Code
3.Exchanger的源码分析
Exchanger的算法核心是通过一个可以交换数据的slot和一个可以带有数据item的参与者。
Exchanger的主要属性:
/** The number of CPUs, for sizing and spin control */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); //arena(Slot数组)的容量。设置这个值用来避免竞争。 private static final int CAPACITY = 32; //arena最大不会超过FULL,避免空间浪费。如果单核或者双核CPU,FULL=0,只有一个SLot可以用。 private static final int FULL = Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1); //自旋等待次数。单核情况下,自旋次数为0;多核情况下为大多数系统线程上下文切换的平均值。该值设置太大会消耗CPU。 private static final int SPINS = (NCPU == 1) ? 0 : 2000; //若在超时机制下,自旋次数更少,因为多个检测超时的时间,这是一个经验值。 private static final int TIMED_SPINS = SPINS / 20; private static final class Node extends AtomicReference<Object> { //创建这个节点的线程提供的用于交换的数据。 public final Object item; //等待唤醒的线程 public volatile Thread waiter; /** * Creates node with given item and empty hole. * @param item the item */ public Node(Object item) { this.item = item; } } //一个Slot就是一对线程交换数据的地方。这里对Slot做了缓存行填充,能够避免伪共享问题。虽然填充导致浪费了一些空间,但Slot是按需创建,一般没什么问题。 private static final class Slot extends AtomicReference<Object> { // Improve likelihood of isolation on <= 64 byte cache lines long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe; } //Slot数组,在需要时才进行初始化,用volatile修饰,因为这样可以安全的使用双重锁检测方式构建。 private volatile Slot[] arena = new Slot[CAPACITY]; //正在使用的slot下标的最大值。当一个线程经历了多次CAS竞争后,这个值会递增。当一个线程自旋等待超时后,这个值会递减。 private final AtomicInteger max = new AtomicInteger();View Code
Exchanger的exchange方法:
/** * 等待其他线程到达交换点,然后与其进行数据交换。 * 如果其他线程到来,那么交换数据,返回。 * 如果其他线程未到来,那么当前线程等待,直到如下情况发生: * 1.有其他线程来进行数据交换。 * 2.当前线程被中断。 */ public V exchange(V x) throws InterruptedException { //检测当前线程是否被中断。 if (!Thread.interrupted()) { //进行数据交换。 Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0); //检测结果是否为null。 if (v == NULL_ITEM) return null; //检测是否被取消。 if (v != CANCEL) return (V)v; //清除中断标记。 Thread.interrupted(); // Clear interrupt status on IE throw } throw new InterruptedException(); } /** * 等待其他线程到达交换点,然后与其进行数据交换。 * 如果其他线程到来,那么交换数据,返回。 * 如果其他线程未到来,那么当前线程等待,直到如下情况发生: * 1.有其他线程来进行数据交换。 * 2.当前线程被中断。 * 3.超时。 */ public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { //检测当前线程是否被中断。 if (!Thread.interrupted()) { //进行数据交换。 Object v = doExchange((x == null) ? NULL_ITEM : x, true, unit.toNanos(timeout)); //检测结果是否为null。 if (v == NULL_ITEM) return null; //检测是否被取消。 if (v != CANCEL) return (V)v; if (!Thread.interrupted()) throw new TimeoutException(); } throw new InterruptedException(); } /**doExchange方法,进行数据交换 */ private Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); //根据thread id计算出自己要去的那个交易位置(slot) int index = hashIndex(); int fails = 0; for (;;) { Object y; Slot slot = arena[index]; //slot = null,创建一个slot,然后会回到for循环,再次开始 if (slot == null) createSlot(index); else if ((y = slot.get()) != null &&//slot里面有人等着(有Node),则尝试和其交换 slot.compareAndSet(y, null)) {//关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点 Node you = (Node)y; //把Node里面的东西,换成自己的 if (you.compareAndSet(null, item)) { //唤醒对方 LockSupport.unpark(you.waiter); //自己把对方的东西拿走 return you.item; }//关键点2:如果运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始 } else if (y == null &&//slot里面为空(没有Node),则自己把位置占住 slot.compareAndSet(null, me)) { //如果是0这个位置,自己阻塞,等待别人来交换 if (index == 0) return timed ? awaitNanos(me, slot, nanos) : await(me, slot); //不是0这个位置,自旋等待 Object v = spinWait(me, slot); //自旋等待的时候,运气好,有人来交换了,返回 if (v != CANCEL) return v; //自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环 me = new Node(item); int m = max.get(); if (m > (index >>>= 1)) max.compareAndSet(m, m - 1); } else if (++fails > 1) {//失败 case1: slot有人,要交互,但被人家抢了 case2: slot没人,自己要占位置,又被人家抢了 int m = max.get(); //3次匹配失败,把index扩大,再次开始for循环 if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; else if (--index < 0) index = m; } } } /** * 在下标为0的Slot上等待获取其他线程填充的值。 * 如果在Slot被填充之前超时或者被中断,那么操作失败。 */ private Object awaitNanos(Node node, Slot slot, long nanos) { int spins = TIMED_SPINS; long lastTime = 0; Thread w = null; for (;;) { Object v = node.get(); if (v != null) //如果已经被其他线程填充了值,那么返回这个值。 return v; long now = System.nanoTime(); if (w == null) w = Thread.currentThread(); else nanos -= now - lastTime; lastTime = now; if (nanos > 0) { if (spins > 0) --spins; //先自旋几次。 else if (node.waiter == null) node.waiter = w; //自旋阶段完毕后,将当前线程设置到node的waiter域。 else if (w.isInterrupted()) tryCancel(node, slot); //如果当前线程被中断,尝试取消node。 else LockSupport.parkNanos(node, nanos); //阻塞给定的时间。 } else if (tryCancel(node, slot) && !w.isInterrupted()) //超时后,如果当前线程没有被中断,那么从Slot数组的其他位置看看有没有等待交换数据的节点 return scanOnTimeout(node); } }View Code
4.Exchanger的使用示例
public class Test { public static void main(String[] args) throws Exception { final Exchanger<String> exgr = new Exchanger<String>(); new Thread((new Runnable() { @Override public void run() { try { String A = Thread.currentThread().getName()+"的数据"; System.out.println(Thread.currentThread().getName()+"交互前:"+ A); A = exgr.exchange(A); System.out.println(Thread.currentThread().getName()+"交互后:"+ A); } catch (InterruptedException e) { } } }), "线程1").start(); new Thread((new Runnable() { @Override public void run() { try { String B = Thread.currentThread().getName()+"的数据"; System.out.println(Thread.currentThread().getName()+"交互前:"+ B); B = exgr.exchange(B); System.out.println(Thread.currentThread().getName()+"交互后:"+ B); } catch (InterruptedException e) { } } }), "线程2").start(); } }View Code
5.总结
Exchange和SynchronousQueue类似,都是通过两个线程操作同一个对象实现数据交换,只不过就像我们开始说的,SynchronousQueue使用的是同一个属性,通过不同的isData来区分,多线程并发时,使用了队列进行排队。Exchange使用了一个对象里的两个属性,item和match,不需要isData 属性了,因为在Exchange里面,没有isData这个语义。而多线程并发时,使用数组来控制,每个线程访问数组中不同的槽。
标签:slot,Node,Thread,item,线程,Exchanger,null 来源: https://www.cnblogs.com/bl123/p/14189113.html