编程语言
首页 > 编程语言> > Java 基础6 - 线程

Java 基础6 - 线程

作者:互联网

线程

实现线程的两种方式:继承 Thread,实现 Runnable 接口

有了 Thread 不就够了?通过继承Thread来实现线程虽然比较简单,但 Java 中每个类最多只能有一个父类,如果类已经有父类了,就不能再继承 Thread。

启动线程调 start 而不是 run,一个线程对象只能启动一次

线程的状态

  1. NEW:没有调用 start 的线程状态为 NEW。
  2. TERMINATED:线程运行结束后状态为 TERMINATED。
  3. RUNNABLE:调用 start 后线程在执行 run 方法且没有阻塞时状态为 RUNNABLE,不过,RUNNABLE 不代表 CPU 一定在执行该线程的代码,可能正在执行也可能在等待操作系统分配时间片,只是它没有在等待其他条件。
  4. BLOCKED、WAITING、TIMED_WAITING:都表示线程被阻塞了,在等待某些条件。

除了main线程外,至少还有一个负责垃圾回收的线程,这个线程就是 daemon 线程,在 main 线程结束的时候,垃圾回收线程也会退出。

Thread 几个静态方法:

  1. sleep 方法

    用该方法会让当前线程睡眠指定的时间,单位是毫秒。睡眠期间,该线程会让出 ** CPU(CPU可以去干其他事了),睡眠期间,线程可以被中断**,如果被中断,sleep 会抛出 InterruptedException 异常。

  2. yield 方法

    调用该方法,是告诉操作系统的调度器:现在不着急占用 CPU,可以先让其他线程运行。不过,这对调度器也仅仅是建议,调度器如何处理是不一定的,它可能完全忽略该调用。

  3. join 方法

    可以让调用join的线程等待该线程结束,join 实际上就是调用了 wait 方法。

共享内存及可能存在的问题

每个线程表示一条单独的执行流,有自己的程序计数器,有自己的栈,但线程之间可以共享内存,它们可以访问和操作相同的对象。当多条执行流执行相同的程序代码时,每条执行流都有单独的栈,方法中的参数和局部变量都有自己的一份。当多条执行流可以操作相同的变量时,可能会出现一些意料之外的结果,包括竞态条件内存可见性问题。

竞态条件

所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终执行结果与执行时序有关,可能正确也可能不正确。

10个线程同时对一个变量counter执行加一,可能每次结果都不一样,因为counter++这个操作不是原子操作,它分为三个步骤:

  1. 取counter的当前值
  2. 在当前值基础上加1
  3. 将新值重新赋值给counter。

如何解决这个问题:

  1. synchronized关键字
  2. 显式锁
  3. 原子变量

内存可见性

多个线程可以共享访问和操作相同的变量,但一个线程对一个共享变量的修改,另一个线程不一定马上就能看到,甚至永远也看不到。这就是内存可见性问题。在计算机系统中,除了内存,数据还会被缓存在 CPU 的寄存器以及各级缓存中,当访问一个变量时,可能直接从寄存器或 CPU 缓存中获取,而不一定到内存中去取,当修改一个变量时,也可能是先写到缓存中,稍后才会同步更新到内存中。

怎么解决:

  1. volatile 关键字
  2. synchronized 关键字
  3. 显式锁

Synchronized关键字

synchronized 可以用于修饰类的实例方法静态方法代码块

方法加了 synchronized 后,方法内的代码就变成了原子操作。

synchronized 实例方法实际保护的是同一个对象的方法调用,确保同时只能有一个线程执行。

synchronized 保护的是对象而非代码,只要访问的是同一个对象的 synchronized 方法,即使是不同的代码,也会被同步顺序访问。synchronized 方法不能防止非 synchronized 方法被同时执行,所以一般在保护变量时,需要在所有访问该变量的方法上加上 synchronized 。

  1. 实例方法

    synchronized 实例方法保护的是当前实例对象,即this, this 对象有一个锁和一个等待队列,锁只能被一个线程持有,其他试图获得同样锁的线程需要等待。

    执行synchronized实例方法的过程大致如下:

    1. 尝试获得锁,如果能够获得锁,继续下一步,否则加入等待队列,阻塞并等待唤醒
    2. 执行实例方法体代码
    3. 释放锁,如果等待队列上有等待的线程,从中取一个并唤醒,如果有多个等待的线程,唤醒哪一个是不一定的,不保证公平性。

    当前线程不能获得锁的时候,它会加入等待队列等待,线程的状态会变为 BLOCKED。

  2. 静态方法

    对静态方法,保护的是类对象。实际上,每个对象都有一个锁和一个等待队列,类对象也是。

  3. 代码块

    synchronized 括号里面的就是保护的对象,因为任意对象都有一个锁和等待队列,或者说,任何对象都可以作为锁对象。

几个特征

  1. 可重入性

    对同一个执行线程,它在获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。

    可重入是通过记录锁的持有线程和持有数量来实现的,当调用被 synchronized 保护的代码时,检查对象是否已被锁,如果是,再检查是否被当前线程锁定,如果是,增加持有数量,如果不是被当前线程锁定,才加入等待队列,当释放锁时,减少持有数量,当数量变为0时才释放整个锁。

  2. 内存可见性

    在释放锁时,所有写入都会写回内存,而获得锁后,都会从内存中读最新数据。

  3. 死锁

    应该尽量避免在持有一个锁的同时去申请另一个锁,如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。

协作

多线程之间除了竞争访问同一个资源外,也经常需要相互协作,基本方式就是 wait/notify

Java 的根父类是 Object , Java 在 Object 类而非 Thread 类中定义了一些线程协作的基本方法,这些方法有两类,一类是 wait ,另一类是 notify 。

wait实际上做了什么?除了用于锁的等待队列,每个对象还有另一个等待队列,表示条件队列,该队列用于线程间的协作。调用wait就会把当前线程放到条件队列上并阻塞,表示当前线程执行不下去了,它需要等待一个条件,这个条件它自己改变不了,需要其他线程改变。

但调用wait时,线程会释放对象锁。

一个线程因为等待某个条件执行不下去,当这个条件改变之后就该调用 notify 方法了,notify 会从条件队列中选一个线程,将其从队列中移除并唤醒,选哪个是不确定的。而 notifyAll 会移除条件队列中所有的线程并全部唤醒。

调用notify会把在条件队列中等待的线程唤醒并从队列中移除,但它不会释放对象锁。

唤醒之后线程会重新尝试竞争获得锁:如果能够获得锁,线程状态变为RUNNABLE,并从wait调用中返回,否则,该线程加入对象锁等待队列,线程状态变为BLOCKED,只有在获得锁后才会从 wait 调用中返回。

线程从wait调用中返回后,不代表其等待的条件就一定成立,它需要重新检查其等待的条件,这也是在条件附近看到 while 而不是 if 的原因。

中断

停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,是给线程传递一个取消信号,但是由线程来决定如何以及何时退出。

每个线程都有一个标志位,表示该线程是否被中断了。

中断相关的方法:

public void interrupt(); // 中断线程
public boolean isInterrupted(); // 线程的中断标志位是否为true
public static boolean interrupted(); // 线程的中断标志位是否为true + 清空中断标志位

注意:interrupt方法不一定会真正“中断”线程,

不同状态对中断信号的反应

  1. RUNNABLE

    线程在运行或具备运行条件只是在等待操作系统调度。

  2. WAITING/TIMED_WAITING

    线程在等待某个条件或超时。线程调用join/wait/sleep方法会进入WAITING或TIMED_WAITING状态。调用interrupt()会使得该线程抛出InterruptedException。需要注意的是,抛出异常后,中断标志位会被清空,而不是被设置。InterruptedException是一个受检异常,线程必须进行处理。

  3. BLOCKED

    线程在等待锁,试图进入同步块。调用interrupt()只是会设置线程的中断标志位,线程依然会处于BLOCKED状态,也就是说,interrupt()并不能使一个在等待锁的线程真正“中断”。

    test方法在持有锁lock的情况下启动线程a,而线程a也去尝试获得锁lock,所以会进入锁等待队列,随后test调用线程a的interrupt方法并调用join等待线程线程a结束,线程a会结束吗?不会,interrupt方法只会设置线程的中断标志,而并不会使它从锁等待队列中出来。

        public static void test() throws InterruptedException {
            synchronized (lock) {
                A a = new A();
                a.start();
                Thread.sleep(1000);
                a.interrunpt();
                a.join();
            }
        }
    

    注意:在使用 synchronized 关键字获取锁的过程中不响应中断请求,这是 synchronized 的局限性。

  4. NEW/TERMINATE

    线程还未启动或已结束。调用 interrupt() 对它没有任何效果,中断标志位也不会被设置。

取消/关闭线程的正确方式

原子操作

CAS

原子操作依赖一个很重要的方法:

public final boolean compareAndSet(int expect, int update)

这个方法就被成为CAS。该方法有两个参数 expect 和 update ,以原子方式实现了如下功能:如果当前值等于 expect ,则更新为 update ,否则不更新,如果更新成功,返回 true,否则返回 false 。

以 AtomicInteger 为例,AtomicInteger 可以在程序中用作一个计数器,多个线程并发更新,也总能实现正确性。它的主要内部成员是:

public volatile int value; // 这个变量天生保证内存可见性

AtomicInteger 有个方法 incrementAndGet:

public final int incrementAndGet() {
    for(;;) {
        int current = get(); // 获取当前值value
        int next = current + 1; // 计算期望的值next
        // 调CAS方法进行更新,如果更新没有成功,说明value被别的线程改了,则再去取最新值并尝试更新直到成功为止。
        if(compareAndSet(current, next)) {
            return next;
        }
    }
}

与 synchronized 锁相比,这种原子更新方式代表一种不同的思维方式。synchronized 是悲观的,它假定更新很可能冲突,所以先获取锁,得到锁后才更新。原子变量的更新逻辑是乐观的,它假定冲突比较少,但使用 CAS 更新,也就是进行冲突检测,如果确实冲突了,那也没关系,继续尝试就好了。

AQS

AQS是一个抽象类AbstractQueuedSynchronizer。

AQS封装了一个状态,给子类提供了查询和设置状态的方法:

public volatile int state;
protected final int getState();
protected final void setState(int newState);
protected final boolean compareAndSetState(int expect, int update);

用于实现锁时,AQS 可以保存锁的当前持有线程,提供了方法进行查询和设置:

private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t);
protected final Thread getExclusiveOwnerThread();

AQS内部维护了一个等待队列,借助 CAS 方法实现了无阻塞算法进行更新。

BUG

使用 CAS 方式更新有一个 ABA 问题。该问题是指,假设当前值为A,如果另一个线程先将 A 修改成 B ,再修改回成 A ,当前线程的 CAS 操作无法分辨当前值发生过变化。

ABA 是不是一个问题与程序的逻辑有关,一般不是问题。而如果确实有问题,解决方法是使用 AtomicStampedReference

显示锁

简介

显式锁接口和类主要有:

  1. 锁接口Lock,主要实现类是 ReentrantLock
  2. 读写锁接口 ReadWriteLock,主要实现类是 ReentrantReadWriteLock

Lock 接口定义为:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    // 可以避免死锁。在持有一个锁获取另一个锁而获取不到的时候,可以释放已持有的锁,给其他线程获取锁的机会,然后重试获取所有锁。
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
  1. lock()/unlock():就是普通的获取锁和释放锁方法,lock()会阻塞直到成功。
  2. lockInterruptibly():与lock()的不同是,它可以响应中断。
  3. tryLock():只是尝试获取锁,立即返回,不阻塞
  4. tryLock(long time, TimeUnit unit):先尝试获取锁,如果能成功则立即返回true,否则阻塞等待
  5. newCondition:新建一个条件,一个Lock可以关联多个条件(见下文显式条件)

Lock接口的主要实现类是ReentrantLock,底层依赖CAS,AQS,ReentrantLock,它的基本用法lock/unlock实现了与synchronized一样的语义,包括:

  1. 可重入,一个线程在持有一个锁的前提下,可以继续获得该锁
  2. 可以解决竞态条件问题
  3. 可以保证内存可见性

相较于synchronized

ReentrantLock 和 synchronized 都是默认不保证公平。使用显式锁,一定要记得调用 unlock。

相比 synchronized , ReentrantLock 可以实现与 synchronized 相同的语义,而且支持以非阻塞方式获取锁,可以响应中断,可以限时,更为灵活。

synchronized代表一种声明式编程思维,程序员更多的是表达一种同步声明,由 Java 系统负责具体实现,程序员不知道其实现细节;显式锁代表一种命令式编程思维,程序员实现所有细节。

简单总结下,能用 synchronized 就用 synchronized,不满足要求时再考虑 ReentrantLock。

显式条件

显式锁与 synchronized 相对应,而显式条件与 wait/notify 相对应。wait/notify与synchronized配合使用,显式条件与显式锁配合使用。

Condition 表示条件变量,是一个接口,其中有 await、signal、signalAll 方法。

await 对应于 Object 的 wait , signal 对应于 notify, signalAll 对应于 notifyAll,语义也是一样的。

一般的 await 相关方法都是响应中断的,如果发生了中断,会抛出 InterruptedException,但中断标志位会被清空。awaitUnInterruptibly() 方法不会响应中断,它不会由于中断结束,但当它返回时,如果等待过程中发生了中断,中断标志位会被设置。

await在进入等待队列后,会释放锁,释放CPU,当其他线程将它唤醒后,或等待超时后,或发生中断异常后,它都需要重新获取锁,获取锁后,才会从 await 方法中退出。

示例:

    static class MyBlockQueue<E> {
        private Queue<E> queue = null;
        private int limit;
        private Lock lock = new ReentrantLock();

        private Condition notFull = lock.newCondition();
        private Condition notEmpty = lock.newCondition();

        public  MyBlockQueue(int limit) {
            this.limit = limit;
            // ArrayDeque是线程不安全的
            queue = new ArrayDeque<>();
        }

        private void put(E e) throws InterruptedException {
            lock.lockInterruptibly();
            try {
                // 队列满,在notFull等待,不让放
                while (queue.size() == limit) {
                    notFull.await();
                }
                queue.add(e);
                // 唤醒一下,现在不空了
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }

        public E take() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                // 队列空,在notEmpty等待,不让取
                while (queue.isEmpty()) {
                    notEmpty.await();
                }
                E e = queue.poll();
                // 唤醒一下,现在不满了
                notFull.signal();
                return e;
            } finally {
                lock.unlock();
            }
        }
    }

上述代码定义了两个等待条件:不满(notFull)、不空(notEmpty)。在put方法中,如果队列满,则在notFull上等待;在take方法中,如果队列空,则在notEmpty上等待。put操作后通知 notEmpty, take 操作后通知 notFull。这样,代码更清晰易读。

异步任务

基本接口:

  1. Runnable 和 Callable:表示要执行的异步任务。
  2. Executor 和 ExecutorService:表示执行服务。
  3. Future 表示异步任务的结果。

Runnable 没有返回结果,而 Callable 有,Runnable 不会抛出异常,而 Callable 会。

Executor 表示最简单的执行服务,可以执行一个 Runnable,没有返回结果。

ExecutorService 扩展了 Executor,其中的 submit 方法表示提交一个任务,返回值类型都是 Future,返回后,只是表示任务已提交,不代表已执行,通过Future可以查询异步任务的状态、获取最终结果、取消任务等。

Future中的 get 用于返回异步任务最终的结果,如果任务还未执行完成,会阻塞等待;cancel 用于取消异步任务,如果任务已完成、或已经取消、或由于某种原因不能取消, cancel 返回 false,否则返回true。isDone 和 isCancelled 用于查询任务状态。isCancelled 表示任务是否被取消,只要 cancel 方法返回了 true,随后的isCancelled 方法都会返回 true,即使执行任务的线程还未真正结束。isDone 表示任务是否结束,不管什么原因都算。

Future 是一个重要的概念,是实现“任务的提交”与“任务的执行”相分离的关键,任务提交者和任务执行服务通过它隔离各自的关注点,同时进行协作。

基本使用:

package AsyncTask;

import java.util.Random;
import java.util.concurrent.*;

public class AsyncTaskDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        Future<Integer> future = executorService.submit(new Task());
        System.out.println("这是主线程");
        Thread.sleep(100);
        try {
            System.out.println("任务结果" + future.get());
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        executorService.shutdown();
    }

    static class Task implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            int sleepSeconds = new Random().nextInt(1000);
            System.out.println("子线程开始休眠");
            Thread.sleep(sleepSeconds);
            System.out.println("子线程休眠结束");
            return sleepSeconds;
        }
    }
}

示例中的大致步骤就是:

  1. 定义一个任务描述要做的事
  2. 创建ExecutorService实例
  3. ExecutorService实例提交任务
  4. 在别处获取任务结果
  5. 关闭ExecutorService

其中 ExecutorService 有两个关闭方法:ExecutorServicshutdown 和 shutdownNow。区别是,shutdown表示不再接受新任务,shutdownNow不仅不接受新任务,而且会终止已提交但尚未执行的任务,对于正在执行的任务,一般会调用线程的interrupt方法尝试中断,不过,线程可能不响应中断,shutdownNow会返回已提交但尚未执行的任务列表。shutdown 和 shutdownNow 不会阻塞等待,它们返回后不代表所有任务都已结束,调用者可以通过awaitTermination等待所有任务结束。

ExecutorService 有两组批量提交任务的方法:invokeAll 和 invokeAny。invokeAll 等待所有任务完成,返回的 Future 列表中,每个 Future 的 isDone 方法都返回true,不过 isDone 为true不代表任务就执行成功了,可能是被取消了。而对于 invokeAny,只要有一个任务在限时内成功返回了,它就会返回该任务的结果,其他任务会被取消

原理

好累啊不想写了

线程池

线程池主要由两个概念组成:一个是任务队列;另一个是工作者线程。工作者线程主体就是一个循环,循环从队列中接受任务并执行,任务队列保存待执行的任务。( JavaScript 中的异步实现也是类似的套路哦)

  1. 它可以重用线程,避免线程创建的开销。
  2. 任务过多时,通过排队避免创建过多线程,减少系统资源消耗和竞争,确保任务有序完成。

线程池的实现类是 ThreadPoolExecutor,它继承自 AbstractExecutorService ,实现了 ExecutorService ,基本用法与上节异步任务介绍的类似。

构造方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);

第二个构造方法多了两个参数 threadFactory 和 handler,这两个参数一般不需要,第一个构造方法会设置默认值。参数 corePoolSize、maximumPoolSize、keepAliveTime、unit 用于控制线程池中线程的个数,workQueue 表示任务队列,threadFactory 用于对创建的线程进行一些配置,handler表示任务拒绝策略。

  1. 线程池大小

    1. corePoolSize:核心线程个数
    2. maximumPoolSize:最大线程个数
    3. keepAliveTime和unit:空闲线程存活时间

    一般情况下,有新任务到来的时候,如果当前线程个数小于 corePoolSize,就会创建一个新线程来执行该任务,需要说明的是,即使其他线程现在也是空闲的,也会创建新线程。不过,如果线程个数大于等于 corePoolSize,那就不会立即创建新线程了,它会先尝试排队,需要强调的是,它是“尝试”排队,而不是“阻塞等待”入队,如果队列满了或其他原因不能立即入队,它就不会排队,而是检查线程个数是否达到了 maximumPoolSize,如果没有,就会继续创建线程,直到线程数达到 maximumPoolSize。

    keepAliveTime 的目的是为了释放多余的线程资源,它表示,当线程池中的线程个数大于 corePoolSize 时额外空闲线程的存活时间。如果该值为 0 ,则表示所有线程都不会超时终止。

  2. 队列

    这里要求队列类型是阻塞队列 BlockingQueue。

    • LinkedBlockingQueue:基于链表的阻塞队列,可以指定最大长度,但默认是无界的。
    • ArrayBlockingQueue:基于数组的有界阻塞队列。
    • PriorityBlockingQueue:基于堆的无界阻塞优先级队列。
    • SynchronousQueue:没有实际存储空间的同步阻塞队列。

    注意:如果用的是无界队列,需要强调的是,线程个数最多只能达到 corePoolSize,到达 corePoolSize 后,新的任务总会排队,参数 maximumPoolSize 也就没有意义了。对于 SynchronousQueue,它没有实际存储元素的空间,当尝试排队时,只有正好有空闲线程在等待接受任务时,才会入队成功,否则,总是会创建新线程,直到达到 maximumPoolSize。

  3. 任务拒绝策略

    如果队列有界,且 maximumPoolSize 有限,则当队列排满,线程个数也达到了 maximumPoolSize,这时,新任务会触发线程池的任务拒绝策略。

    ThreadPoolExecuto r实现了4种处理方式。

    1. ThreadPoolExecutor.AbortPolicy:这就是默认的方式,抛出异常。
    2. ThreadPoolExecutor.DiscardPolicy:静默处理,忽略新任务,不抛出异常,也不执行。
    3. ThreadPoolExecutor.DiscardOldestPolicy:将等待时间最长的任务扔掉,然后自己排队。
    4. ThreadPoolExecutor.CallerRunsPolicy:在任务提交者线程中执行任务,而不是交给线程池中的线程执行。

    拒绝策略可以在构造方法中进行指定,也可以通过 set 方法进行指定

  4. 工厂

    线程池还可以接受一个参数:ThreadFactory。它是一个接口,由这个接口l来定义如何创建一个 Thread。

  5. 核心线程

    线程个数小于等于 corePoolSiz e时,我们称这些线程为核心线程,默认情况下:

    • 核心线程不会预先创建,只有当有任务时才会创建。
    • 核心线程不会因为空闲而被终止,keepAliveTime 参数不适用。不过,ThreadPoolExecutor 可以调用方法可以改变这个默认行为。
  6. 死锁

    提交给线程池的任务之间有如果依赖,这种情况可能会导致出现死锁。这个死锁不是说共享资源竞争的死锁,而是单纯的等待,比如任务A,在它的执行过程中,它给同样的任务执行服务提交了一个任务B,但需要等待任务B结束。

    解决办法:可以使用 newCachedThreadPool 创建线程池,让线程数不受限制。另一个解决方法是使用 SynchronousQueue,它可以避免死锁,怎么做到的呢?对于普通队列,入队只是把任务放到了队列中,而对于 SynchronousQueue 来说,入队成功就意味着已有线程接受处理,如果入队失败,可以创建更多线程直到 maximumPoolSize,如果达到了 maximumPoolSize,会触发拒绝机制,不管怎么样,都不会死锁。

定时任务

TimerTask

示例

package AsyncTask;

import java.util.Timer;
import java.util.TimerTask;

public class TimerDemo {
    public static void main(String[] args) throws InterruptedException {
        Timer timer = new Timer();
        timer.schedule(new DelayTask(), 10);
        // 延迟指定时间后以固定时延执行
        timer.schedule(new DelayTask2(), 100, 1000);
        Thread.sleep(4000);
        timer.cancel();
    }

    static class DelayTask extends TimerTask {
        @Override
        public void run() {
            System.out.println("延迟1任务执行");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
    static class DelayTask2 extends TimerTask {
        @Override
        public void run() {
            System.out.println("延迟2任务执行");
        }
    }
}

创建一个 Timer 对象,先运行 DelayTask,再固定周期运行 DelayTask2,最后调用 Timer 的 cancel 方法取消所有定时任务。

这里会发现 DelayTask2 总是等 DelayTas k执行之后才开始输出,因为一个 Timer 对象只有一个 Timer 线程在执行,所以 DelayTask2 被 DelayTask 给强行延迟了。

注意:任务的延迟执行分为固定延时(fixed-delay)与固定频率(fixed-rate),二者都是重复执行,但后一次任务执行相对的时间是不一样的,对于固定延时,它是基于上次任务的“实际”执行时间来算的,如果由于某种原因,上次任务延时了,则本次任务也会延时,而固定频率会尽量补够运行次数。

基本原理

Timer 内部主要由任务队列Timer 线程两部分组成,一个 Timer 对象只有一个 Timer 线程。任务队列是一个基于堆实现的优先级队列,按照下次执行的时间排优先级。Timer 线程主体是一个循环,从队列中获取任务,如果队列中有任务且计划执行时间小于等于当前时间,就执行它,如果队列中没有任务或第一个任务延时还没到,就睡眠。

在执行任何一个任务的 run 方法时,一旦 run 抛出异常,Timer 线程就会退出,从而所有定时任务都会被取消。

如果希望各个定时任务不互相干扰,一定要在 run 方法内捕获所有异常

总之需要注意:

  1. 后台只有一个线程在运行
  2. 固定频率的任务被延迟后,可能会立即执行多次,将次数补够
  3. 固定延时任务的延时相对的是任务执行前的时间
  4. 不要在定时任务中使用无限循环
  5. 一个定时任务的未处理异常会导致所有定时任务被取消

ScheduledExecutorService

由于 Timer/TimerTask 的一些问题,Java 并发包引入了 ScheduledExecutorService。ScheduledExecutorService 的主要实现类是ScheduledThreadPoolExecutor,它是线程池 ThreadPoolExecutor 的子类,是基于线程池实现的。它的任务队列是一个无界的优先级队列,所以最大线程数对它没有作用,即使 corePoolSize 设为 0,它也会至少运行一个线程。

与 Timer 不同,它不支持以绝对时间作为首次运行的时间。另外,单个定时任务的异常不会再导致全部定时任务被取消,即使后台只有一个线程执行任务。不过,需要强调的是,任务发生异常不会在任何地方体现,也就是说在 run 方法里 throw 了之后什么也看不见。所以,与 Timer 中的任务类似,应该捕获所有异常。

package AsyncTask;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(10);
        timer.schedule(new LongRunTask(), 10, TimeUnit.MILLISECONDS);
        timer.scheduleWithFixedDelay(new FixedDelayTask(), 100, 1000, TimeUnit.MILLISECONDS);
        Thread.sleep(4000);
        timer.shutdown();
    }

    static class LongRunTask implements Runnable {

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("LongRunTask");

            throw new RuntimeException();
        }
    }

    static class FixedDelayTask implements Runnable {

        @Override
        public void run() {
            System.out.println("FixedDelayTask");
        }
    }
}

原理

ScheduledThreadPoolExecutor 的实现思路与 Timer 基本是类似的,都有一个基于堆的优先级队列,保存待执行的定时任务,它的主要不同是:

工具类

读写锁ReentrantReadWriteLock

synchronized 和显式锁 ReentrantLock,对于同一受保护对象的访问,无论是读还是写,它们都要求获得相同的锁。在一些场景中,这是没有必要的,多个线程的读操作完全可以并行,在读多写少的场景中,让读操作并行可以明显提高性能。

通过一个 ReadWriteLock 产生两个锁:一个读锁,一个写锁。读操作使用读锁,写操作使用写锁。需要注意的是,只有“读-读”操作是可以并行的,“读-写”和“写-写”都不可以。

内部,它们使用同一个整数变量表示锁的状态,16 位给读锁用,16 位给写锁用,使用一个变量便于进行 CAS 操作,锁的等待队列其实也只有一个。写锁的获取,就是确保当前没有其他线程持有任何锁,否则就等待。写锁释放后,也就是将等待队列中的第一个线程唤醒,唤醒的可能是等待读锁的,也可能是等待写锁的。读锁的获取不太一样,首先,只要写锁没有被持有,就可以获取到读锁,此外,在获取到读锁后,它会检查等待队列,逐个唤醒最前面的等待读锁的线程,直到第一个等待写锁的线程。如果有其他线程持有写锁,获取读锁会等待。读锁释放后,检查读锁和写锁数是否都变为了 0,如果是,唤醒等待队列中的下一个线程。

package AsyncTask;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MyCache {
    private Map<String, Object> map = new HashMap<>();
    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock readLock = readWriteLock.readLock();
    private Lock writeLock = readWriteLock.writeLock();

    public Object get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Object put(String key, Object value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public void clear() {
        writeLock.lock();
        try {
            map.clear();
        } finally {
            writeLock.unlock();
        }
    }
}

信号量Semaphore

有的单个资源即使可以被并发访问,但并发访问数多了可能影响性能,所以希望限制并发访问的线程数。

一般锁只能由持有锁的线程释放,而 Semaphore 表示的只是一个许可数,任意线程都可以调用其 release 方法。主要的锁实现类 ReentrantLock 是可重入的,而 Semaphore 不是,每一次的 acquire 调用都会消耗一个许可,acquire 是会阻塞的。

package AsyncTask;

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static class ConcurrentLimitException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    private static final int MAX_PERMITS = 10;

    private Semaphore permits = new Semaphore(MAX_PERMITS);

    public boolean login(String name, String pwd) {
        if(!permits.tryAcquire()) {
            throw new ConcurrentLimitException();
        }
        // TODO 校验密码
        return true;
    }

    public void logout(String name) {
        // TODO 登出操作
        permits.release();
    }
}

倒计时门栓CountDownLatch

门栓的两种应用场景:一种是同时开始,另一种是主从协作。

同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为1,运动员线程调用 await,主线程调用 countDown

主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用 countDown,主线程调用 await 进行等待。

package AsyncTask;

import java.util.concurrent.CountDownLatch;

// 同时开始场景
public class RacerWithCountDwnLatch {
    static class Racer extends Thread {
        CountDownLatch latch;
        public Racer(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                latch.await(); // 没有countDown信号就会卡在这
                System.out.println(getName() + "开始正式运行" + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        CountDownLatch latch = new CountDownLatch(1);

        Thread[] racers = new Thread[num];

        for(int i = 0; i < 10; i++) {
            racers[i] = new Racer(latch);
            racers[i].start();
        }

        Thread.sleep(1000);
        // 发信号让线程一起开始动作
        latch.countDown();
    }
}

package AsyncTask;


import java.util.concurrent.CountDownLatch;

// 主从协作场景
public class MasterWorkerDemo {
    static class Worker extends Thread {
        CountDownLatch latch;
        public Worker(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                int sleepTime = (int) (Math.random() * 10);
                System.out.println(sleepTime);
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                latch.countDown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        CountDownLatch latch = new CountDownLatch(num);

        Worker[] workers = new Worker[num];
        for(int i = 0; i < num; i++) {
            workers[i] = new Worker(latch);
            workers[i].start();
        }

        latch.await();
        System.out.println("全部结束");
    }
}

循环栅栏CyclicBarrier

CyclicBarrier 特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

与 CountDownLatch 类似,它也有一个数字,但表示的是参与的线程个数。

它有一个构造方法,接受一个 Runnable 参数,这个参数表示栅栏动作,当所有线程到达栅栏后,在所有线程执行下一步动作前,运行参数中的动作,这个动作由最后一个到达栅栏的线程执行。

CyclicBarrier 的主要方法就是 await,await 在等待其他线程到达栅栏,调用 await 后,表示自己已经到达,如果自己是最后一个到达的,就执行可选的命令,执行后,唤醒所有等待的线程,然后重置内部的同步计数,以循环使用。

package AsyncTask;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierDemo {
    static class Tourist extends Thread {
        CyclicBarrier barrier;
        public Tourist(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                Thread.sleep((int) (Math.random() * 10));

                // 第一次集合
                barrier.await();
                System.out.println(getName() + "继续" + System.currentTimeMillis());
                Thread.sleep((int) (Math.random() * 10));

                // 第二次集合
                barrier.await();
                System.out.println(getName() + "继续" + System.currentTimeMillis());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        int num = 3;
        Tourist[] tourists = new Tourist[num];
        CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() {
            @Override
            public void run() {
                System.out.println("全部集合了" + System.currentTimeMillis() + " 最后执行者:" + Thread.currentThread().getName());
            }
        });

        for (int p = 0; p < num; p++) {
            tourists[p] = new Tourist(barrier);
            tourists[p].start();
        }

    }
}

ThreadLocal

线程本地变量是说,每个线程都有同一个变量的独有拷贝。

多个线程访问的虽然是同一个变量,但每个线程都有自己的独立的值,这就是线程本地变量的含义。

使用场景:日期处理、随机数和上下文信息。

  1. 日期处理

    package Threads.ThreadLocal;
    
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * 每个线程使用自己的DateFormat,就不存在安全问题了,在线程的整个使用过程中,只需要创建一次,又避免了频繁创建的开销
     */
    public class ThreadLocalDateFormat {
        static ThreadLocal<SimpleDateFormat> sdf = new ThreadLocal<SimpleDateFormat>() {
            @Override
            protected SimpleDateFormat initialValue() {
                return new SimpleDateFormat("yyyy-MM-dd");
            }
        };
    
        public static String date2String(Date date) {
            return sdf.get().format(date);
        }
    
        public static Date string2Date(String str) throws ParseException {
            return sdf.get().parse(str);
        }
    
    }
    
  2. 随机数

    即使对象是线程安全的,使用 ThreadLocal 也可以减少竞争,它是 Random 的子类,利用了 ThreadLocal,它没有 public 的构造方法,通过静态方法current 获取对象,这个对象就是个就是一个 ThreadLocal 变量。

  3. 上下文信息

    package Threads.ThreadLocal;
    
    public class ReqContext {
        public static class Req {};
    
        private static ThreadLocal<String> localUserId = new ThreadLocal<>();
        private static ThreadLocal<Req> localReq = new ThreadLocal<>();
    
        public static String getCurrentUserId() {
            return localUserId.get();
        }
    
        public static void setCurrentUserId(String userId) {
            localUserId.set(userId);
        }
    
        public static Req getCurrentReq() {
            return localReq.get();
        }
    
        public static void setCurrentReq(Req req) {
            localReq.set(req);
        }
    }
    

    在一个 Web 服务器中,一个线程执行用户的请求,在执行过程中,很多代码都会访问一些共同的信息,比如请求信息、用户身份信息,它们是线程执行过程中的全局信息,在首次获取到信息时,调用 set 方法如 setCurrentRequest/setCurrentUserId 进行设置,然后就可以在代码的任意其他地方调用 get 相关方法进行获取了。

原理

每个线程都有一个 Map,类型为 ThreadLocalMap ,调用set实际上是在线程自己的Map里设置了一个条目,键为当前的 ThreadLocal 对象,值为 value。

每个线程都有一个 Map,对于每个 ThreadLocal 对象,调用其get/set实际上就是以 ThreadLocal 对象为键读写当前线程的 Map,这样,就实现了每个线程都有自己的独立副本的效果。

小结:

本章介绍了 Java 一些同步协作工具:

  1. 在读多写少的场景中使用 ReentrantReadWriteLock 替代 ReentrantLock,以提高性能。
  2. 使用 Semaphore 限制对资源的并发访问数。
  3. 使用 CountDownLatch 实现不同角色线程间的同步。
  4. 使用 CyclicBarrier 实现同一角色线程间的协调一致。
  5. CyclicBarrier 与 CountDownLatch 可能容易混淆,强调下它们的区别:
    • CountDownLatch 的参与线程是有不同角色的,有的负责倒计时,有的在等待倒计时变为 0,负责倒计时和等待倒计时的线程都可以有多个,用于不同角色线程间的同步。
    • CyclicBarrier 的参与线程角色是一样的,用于同一角色线程间的协调一致。
    • CountDownLatch 是一次性的,而 CyclicBarrier 是可以重复利用的。

标签:Java,队列,基础,任务,线程,new,等待,public
来源: https://www.cnblogs.com/nyfblog/p/16523010.html