其他分享
首页 > 其他分享> > 并发学习记录11:原子类型

并发学习记录11:原子类型

作者:互联网

JUC并发包提供了AtomicBoolean,AtomicInteger类,AtomicInteger有下面的方法

AtomicInteger i = new AtomicInteger(0);
// 获取并自增(i = 0, 结果 i = 1, 返回 0),类似于 i++
System.out.println(i.getAndIncrement());
// 自增并获取(i = 1, 结果 i = 2, 返回 2),类似于 ++i
System.out.println(i.incrementAndGet());
// 自减并获取(i = 2, 结果 i = 1, 返回 1),类似于 --i
System.out.println(i.decrementAndGet());
// 获取并自减(i = 1, 结果 i = 0, 返回 1),类似于 i--
System.out.println(i.getAndDecrement());
// 获取并加值(i = 0, 结果 i = 5, 返回 0)
System.out.println(i.getAndAdd(5));
// 加值并获取(i = 5, 结果 i = 0, 返回 0)
System.out.println(i.addAndGet(-5));
// 获取并更新(i = 0, p 为 i 的当前值, 结果 i = -2, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.getAndUpdate(p -> p - 2));
// 更新并获取(i = -2, p 为 i 的当前值, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.updateAndGet(p -> p + 2));
// 获取并计算(i = 0, p 为 i 的当前值, x 为参数1, 结果 i = 10, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
// getAndUpdate 如果在 lambda 中引用了外部的局部变量,要保证该局部变量是 final 的
// getAndAccumulate 可以通过 参数1 来引用外部的局部变量,但因为其不在 lambda 中因此不必是 final
System.out.println(i.getAndAccumulate(10, (p, x) -> p + x));
// 计算并获取(i = 10, p 为 i 的当前值, x 为参数1, 结果 i = 0, 返回 0)
// 其中函数中的操作能保证原子,但函数需要无副作用
System.out.println(i.accumulateAndGet(-10, (p, x) -> p + x));

#原子引用
两种线程安全的实现和一种线程不安全的实现

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

interface DecimalAccount {
BigDecimal getBalance();

void withdraw(BigDecimal amount);

static void demo(DecimalAccount account) {
    List<Thread> ts = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
        ts.add(new Thread(() -> {
            account.withdraw(BigDecimal.TEN);
        }));
    }
    ts.forEach(Thread::start);
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    System.out.println(account.getBalance());
}

}

class DecimalAccountUnsafe implements DecimalAccount {
BigDecimal balance;

public DecimalAccountUnsafe(BigDecimal balance) {
    this.balance = balance;
}

@Override
public BigDecimal getBalance() {
    return balance;
}

@Override
public void withdraw(BigDecimal amount) {
    BigDecimal balance = this.getBalance();
    this.balance = balance.subtract(amount);
}

}

class DecimalAccountSafeLock implements DecimalAccount {
private final Object lock = new Object();
BigDecimal balance;

public DecimalAccountSafeLock(BigDecimal balance) {
    this.balance = balance;
}

@Override
public BigDecimal getBalance() {
    return balance;
}

@Override
public void withdraw(BigDecimal amount) {
    synchronized (lock) {
        BigDecimal balance = this.getBalance();
        this.balance = balance.subtract(amount);
    }
}

}

class DecimalAccountSafeCas implements DecimalAccount {
AtomicReference ref;

public DecimalAccountSafeCas(BigDecimal balance) {
    this.ref = new AtomicReference<>(balance);
}

@Override
public BigDecimal getBalance() {
    return ref.get();
}

@Override
public void withdraw(BigDecimal amount) {
    while (true) {
        BigDecimal prev = ref.get();
        BigDecimal next = prev.subtract(amount);
        if (ref.compareAndSet(prev, next)) {
            break;
        }
    }
}

}

public class TestAccount01 {
public static void main(String[] args) {
DecimalAccount.demo(new DecimalAccountUnsafe(new BigDecimal(10000)));
DecimalAccount.demo(new DecimalAccountSafeLock(new BigDecimal(10000)));
DecimalAccount.demo(new DecimalAccountSafeCas(new BigDecimal(10000)));
}
}


#ABA问题
说的就是主线程仅能判断出共享变量的值和最初值A是否相同,不能感知A变为B又变为A的情况,如果主线程希望只要有其他线程动过了共享变量,那么自己的CAS就算失败,这样的话,关是比较值是不够的,还需要增加一个版本号。

##ABA问题示例

@Slf4j(topic = "ch.TestABA")
public class TestABA {
static AtomicReference ref = new AtomicReference<>("A");

public static void main(String[] args) throws InterruptedException {
    log.debug("main start");
    String prev = ref.get();
    other();
    Thread.sleep(10);
    log.debug("change A->C {}", ref.compareAndSet(prev, "C"));
}

private static void other() throws InterruptedException {
    new Thread(() -> {
        log.debug("change A-> B{}", ref.compareAndSet(ref.get(), "B"));
    }).start();
    Thread.sleep(10);
    new Thread(() -> {
        log.debug("change B->A {}", ref.compareAndSet(ref.get(), "A"));
    }, "t2").start();
}

}


##引入解决方案:AtomicStampedReference

@Slf4j(topic = "ch.TestABA01")
public class TestABA01 {
//第一个参数是引用变量,第二个参数是版本号
static AtomicStampedReference ref = new AtomicStampedReference<>("A", 0);

public static void main(String[] args) throws InterruptedException {
    log.debug("main start");
    String prev = ref.getReference();
    int stamp = ref.getStamp();
    log.debug("版本是{}", stamp);//这里的版本号应该是0
    other();
    Thread.sleep(10);
    //实际上这里的两个版本号参数应该分别是2,3
    log.debug("change A-C  {}", ref.compareAndSet(prev, "C", stamp, stamp + 1));
}

public static void other() {
    new Thread(() -> {
        log.debug("change A-B {}", ref.compareAndSet(ref.getReference(), "B",
                ref.getStamp(), ref.getStamp() + 1));
    }, "t1").start();
    log.debug("更新版本为 {}", ref.getStamp());
    new Thread(() -> {
        log.debug("change B-A {}", ref.compareAndSet(ref.getReference(), "A",
                ref.getStamp(), ref.getStamp() + 1));
    }, "t2").start();
    log.debug("更新的版本是{}", ref.getStamp());
}

}

AtomicStampedReference可以给原子引用加上版本号,追踪原子引用的整个变化的过程,如:A-B-A-C,通过原子引用携带的版本号,我们可以知道引用变量中途被更改了几次。
但是有时候,我们并不关心引用变量更改了几次,只是单纯关心是否更改过,所以就有了AtomicMarkableReference

AtomicMarkableReference用例,主要是注意一些参数的变化:

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.atomic.AtomicMarkableReference;

@Slf4j(topic = "ch.AtomicMarkableReferenceTest01")
public class AtomicMarkableReferenceTest01 {
public static void main(String[] args) throws InterruptedException {
GarbageBage bag = new GarbageBage("装满垃圾的袋子");
//参数一是引用,参数二记录的是一个状态值
AtomicMarkableReference ref = new AtomicMarkableReference<>(bag, true);
log.debug("主线程开始");
GarbageBage prev = ref.getReference();
log.debug(prev.toString());
new Thread(() -> {
log.debug("清洁线程");
bag.setDesc("空垃圾袋");
while (!ref.compareAndSet(bag, bag, true, false)) {
}
log.debug(ref.getReference().toString());

    }).start();
    Thread.sleep(1000);
    log.debug("主线程想更换一个新的垃圾袋");
    //prev指的是之前的引用,cur是新替换的引用,第三个参数是预期的状态,第四个参数是新设的状态
    GarbageBage cur = new GarbageBage("空垃圾袋");
    boolean isChange = ref.compareAndSet(prev, cur, false, true);
    log.debug("更换了吗?" + isChange);
    log.debug(ref.getReference().toString());
}

}



#原子数组
##目的是保护数组内部元素的线程安全
包括AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
一个小测试:

//测试原子数组
public class AtomicArrayTest02 {
public static void main(String[] args) throws InterruptedException {
test02();
test03();
}

//测试多线程操作普通数组,其实每个位置的最后值预期是1000的,但是因为在多线程环境下,出现了错误
public static void test02() throws InterruptedException {
    List<Thread> ts = new ArrayList<>();
    int[] nums = new int[5];
    for (int i = 0; i < 5; i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                nums[j % 5]++;
            }
        }, "t" + i));
    }
    ts.forEach(Thread::start);
    for (Thread t : ts) {
        t.join();
    }
    System.out.print("普通数组情况:");
    for (int i = 0; i < nums.length; i++) {
        System.out.print(nums[i]);
        if (i != nums.length - 1) {
            System.out.print(" , ");
        }
    }
    System.out.println();
}

//利用AtomicIntegerArray做测试
public static void test03() throws InterruptedException {
    List<Thread> ts = new ArrayList<>();
    AtomicIntegerArray array = new AtomicIntegerArray(5);
    for (int i = 0; i < array.length(); i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                array.incrementAndGet(j % 5);
            }
        }, "t" + i));
    }
    ts.forEach(Thread::start);
    for (Thread t : ts) {
        t.join();
    }
    System.out.print("线程安全数组:");
    for (int i = 0; i < array.length(); i++) {
        System.out.print(array.get(i));
        if (i != array.length() - 1) {
            System.out.print(" , ");
        }
    }
    System.out.println();
}

}


总结:Atomic数组类型保护了多线程访问数组元素的安全性


#字段更新器
AtomicReferenceFieldUpdater // 域 字段
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater

一个小的测试demo

//测试原子数组
public class AtomicArrayTest02 {
public static void main(String[] args) throws InterruptedException {
test02();
test03();
}

//测试多线程操作普通数组,其实每个位置的最后值预期是1000的,但是因为在多线程环境下,出现了错误
public static void test02() throws InterruptedException {
    List<Thread> ts = new ArrayList<>();
    int[] nums = new int[5];
    for (int i = 0; i < 5; i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                nums[j % 5]++;
            }
        }, "t" + i));
    }
    ts.forEach(Thread::start);
    for (Thread t : ts) {
        t.join();
    }
    System.out.print("普通数组情况:");
    for (int i = 0; i < nums.length; i++) {
        System.out.print(nums[i]);
        if (i != nums.length - 1) {
            System.out.print(" , ");
        }
    }
    System.out.println();
}

//利用AtomicIntegerArray做测试
public static void test03() throws InterruptedException {
    List<Thread> ts = new ArrayList<>();
    AtomicIntegerArray array = new AtomicIntegerArray(5);
    for (int i = 0; i < array.length(); i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                array.incrementAndGet(j % 5);
            }
        }, "t" + i));
    }
    ts.forEach(Thread::start);
    for (Thread t : ts) {
        t.join();
    }
    System.out.print("线程安全数组:");
    for (int i = 0; i < array.length(); i++) {
        System.out.print(array.get(i));
        if (i != array.length() - 1) {
            System.out.print(" , ");
        }
    }
    System.out.println();
}

}


结果:

普通数组情况:7851 , 7746 , 7842 , 7812 , 7754
线程安全数组:10000 , 10000 , 10000 , 10000 , 10000

从结果可以看到,普通数组在多线程的操作下会出现线程安全问题,而Atomic数组则很好的保持了线程安全性


#原子累加器
##累加器性能比较
样例:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class LongAdderTest02 {
public static void main(String[] args) {
//分别利用longAdder和atomicLong做多线程的累加,明显atomicLong的效率更高
for (int i = 0; i < 5; i++) {
test01();
}
for (int i = 0; i < 5; i++) {
test02();
}
}

//利用longAdder做多线程的累加
public static void test01() {
    long start = System.nanoTime();
    LongAdder longAdder = new LongAdder();
    List<Thread> ts = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 100000; j++) {
                longAdder.increment();
            }
        }, "t" + i));
    }
    for (Thread t : ts) {
        t.start();
    }
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    long end = System.nanoTime();
    System.out.println("longAdder : " + longAdder + " , " + (end - start) / 1000);
}

//利用atomicLong做多线程的累加
public static void test02() {
    long start = System.nanoTime();
    AtomicLong atomicLong = new AtomicLong();
    List<Thread> ts = new ArrayList<>();
    for (int i = 0; i < 5; i++) {
        ts.add(new Thread(() -> {
            for (int j = 0; j < 100000; j++) {
                atomicLong.getAndIncrement();
            }
        }, "t" + i));
    }
    ts.forEach(Thread::start);
    ts.forEach(t -> {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    long end = System.nanoTime();
    System.out.println("atomicLong : " + atomicLong + " , " + (end - start) / 1000);
}

}


输出:

longAdder : 500000 , 104070
longAdder : 500000 , 8519
longAdder : 500000 , 2658
longAdder : 500000 , 2403
longAdder : 500000 , 2354
atomicLong : 500000 , 23483
atomicLong : 500000 , 20091
atomicLong : 500000 , 13030
atomicLong : 500000 , 12249
atomicLong : 500000 , 12655

总体来看,明显longAdder的效率更高

性能提升的原因非常简单,就是在有竞争时,设置多个累加单元,就是设置多个cell,线程0累加cell0,线程1累加cell1,最后将结果汇总。这样它们在累加时操作的就是不一样的cell变量,因此就减少了CAS重试失败,从而提高了性能。简单来说,就是CAS在多个进程修改同一变量时很可能会遇到频繁重试(因为自己这个线程在修改变量的过程中,别的线程会改变这个变量,从而自己这个线程就会重新读取再进行修改,这样效率会降低),所以longAdder通过增加共享的cell就可以减少重试次数从而提高效率。

#LongAdder之源码
LongAdder的关键属性:

// 累加单元数组, 懒惰初始化
transient volatile Cell[] cells;
// 基础值, 如果没有竞争, 则用 cas 累加这个域
transient volatile long base;
// 在 cells 创建或扩容时, 置为 1, 表示加锁
transient volatile int cellsBusy;

##LongAdder的关键属性之一:Cell为累加单元

//防止缓存行伪共享
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
// 用cas方式来进行累加,cmp为旧值,val为新的值
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
@sun.misc.Contended用来防止缓存行伪共享
缓存是用来缓解CPU和内存之间由于运算速度差异过大而导致效率变低的内部存储。
计算机内部存储结构如下:其中越靠近CPU,存储操作数据的速度是越快的,而容量是越低的

因为CPU和内存的速度差异非常大,需要靠预读取数据到缓存来提升效率
而缓存以缓存行为单位,每个缓存行对应着一块内存,一般是64个byte
缓存的加入会导致数据副本的产生,即同一份数据会缓存到不同CPU的缓存行中
CPU需要保证数据的一致性,如果某个CPU核心修改了数据,其他CPU核心对应的整个缓存行必须失效


![](https://www.icode9.com/i/l/?n=22&i=blog/1272072/202209/1272072-20220908172744698-1908050326.png)
举一个例子,由于cell是数组在内存中是连续存储的,一个cell的大小是24byte,因此缓存行可以存下两个cell对象。
假设CPU1需要更改cell[0],CPU2需要修改cell[1]。假设CPU1先修改成功,把cell[0]改成6001,这是CPU2中的cell[0]仍为6000,这时候根据数据的一致性,所以CPU2中的缓存行失效。这就是缓存行失效的概念。

@sun.misc.Contended注解的解决方案:这个注解通过在注解对象的前后增加padding,就是可以让本来应该在同一个缓存行的两个相邻的数组元素变成身处两个缓存行中。这样CPU1读取的就是缓存行1,CPU2读取的就是缓存行2,更改并不会造成缓存行失效,最后导致效率的下降。
![](https://www.icode9.com/i/l/?n=22&i=blog/1272072/202209/1272072-20220908173515721-1531911809.png)


LongAdder实现累加操作主要调用了两个方法,add和longAccumulate方法
add方法源码:

public void add(long x) {
//as是累加单元数组,b是累加基础值,x是累加的递增值
Cell[] as; long b, v; int m; Cell a;
//进入if的条件有两个,第一个:as不为空,表示已经发生过竞争,第二个:给bas做cas累加的时候返回false,代表base发生了竞争
if ((as = cells) != null || !casBase(b = base, b + x)) {
//uncontended 表示cell没有竞争
boolean uncontended = true;

        if (//cell还没创建
            as == null || (m = as.length - 1) < 0 
            //当前的线程还没cell,getProbe是根据当前线程生成一个int值
            ||(a = as[getProbe() & m]) == null 
            //cas给当前线程的cell累加失败,代表有竞争
            ||!(uncontended = a.cas(v = a.value, v + x)))
            //进入cell数组创建,cell创建的流程
            longAccumulate(x, null, uncontended);
    }
}
流程如下:
![](https://www.icode9.com/i/l/?n=22&i=blog/1272072/202209/1272072-20220911090835718-846361612.png)

longAccumulate方法源码:

final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;

标签:11,int,System,原子,并发,new,ref,public,out
来源: https://www.cnblogs.com/wbstudy/p/16656388.html