13.6 线程通信
作者:互联网
目录
一、传统的线程通信——synchronized同步的线程
假设系统中有两个线程,这两个线程分别代表存款者和取钱者——先假设有一种特殊的要求,系统要求存款者和取款者不断存钱、取钱的动作,而且要求存款者将钱存入指定账户后,取钱者就立即取出钱。不允许存款者两次存钱,也不允许取款者两次取钱。
为了实现这种功能,可以借助于Object类提供的wait()、notify()和notifyAll()三个方法,这三个方法并不属于Thread类,而是属于Object类。但这三个方法必须同步监视器对象调用,这可以分为以下两种情况:
1、对于使用synchronized修饰的同步方法,因为该类的默认实例(this)就是同步监视器,所以可以在同步方法中直接调用这三个方法。
2、对于synchronized修饰的同步代码块,同步监视器是synchronized后括号里的对象,所以必须使用该对象调用这三个方法
关于这三个方法的解释如下:
1、wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。该wait()方法有三种形式:无时间参数的wait(一直等待,直到其他线程通知),带毫秒参数的wait和带毫秒、微秒参数的wait(这两种方法都是等待指定时间后自动苏醒)。调用wait()方法的当前线程会释放对该同步监视器的锁定。
2、notify():唤醒在此同步监视器上等待的单个线程。如果所有线程都在此同步监视器上等待,则会选择唤醒其中一个线程。选择是任意性的。只有当前线程放弃对该同步监视器的锁定后(使用wait()方法),才可以执行被唤醒的线程。
3、notifyAll():唤醒在此同步监视器上等待的所有线程。只有当前线程放弃对该同步监视器的锁定后,才可以执行被唤醒的线程。
程序通过一个旗帜来标识账户中是否已存款,当旗帜为false时,表示账户中没有存款,存款线程可以向下执行,当存款者把钱存入账户后,将旗帜设为true,并调用notify()或notifyAll()方法来唤醒其他线程;当存款者线程进入线程体后,如果旗帜为true就调用wait()方法让线程等待。
当旗标为true时,表示账户已经存入存款,则取钱者线程可以向下执行,当取钱者把钱从账户中取出来后,将旗帜设为false,并调用notify()或nitiftAll()方法来唤醒其他线程;当取钱者进入线程体后,如果旗帜为false就调用wait()方法让该线程等待。
本程序为Account类提供了draw()和deposit()两个方法,分别对应该账户的取钱、存款等操作,因为这两个方法可能需要并发修改Account类的balance成员的值,所以这两种方法都使用synchronized修饰成同步方法。除此之外还使用了wait()、notifyAll()来控制线程的协作:
package section6;
public class Account
{
//封装账户编号、账户余额的两个成员变量
private String accountNo;
private double balance;
//标识账户中是否已有存款
private boolean flag=false;
public Account(){}
//有参数构造器
public Account(String accountNo,double balance)
{
this.accountNo=accountNo;
this.balance=balance;
}
public String getAccountNo() {
return accountNo;
}
public void setAccountNo(String accountNo) {
this.accountNo = accountNo;
}
//因为账户余额不允许随便修改,所以只为balance提供getter方法
public double getBalance()
{
return this.balance;
}
public synchronized void draw(double drawAmount)
{
try{
//如果flag为假,表明账户中还没有人存钱进去,取钱方法阻塞
if(!flag)
{
wait();
}
else
{
//执行取钱操作
System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
this.balance=this.balance-drawAmount;
System.out.println("账户余额为:"+this.balance);
//将旗帜标识为false,表明取钱完成,等待存钱
this.flag=false;
//唤醒其他线程
notifyAll();
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
public synchronized void deposit(double depositAmount)
{
try{
//如果flag为真,表明账户中已有人存钱进去,存钱方法阻塞
if(flag)//①
{
wait();
}
else
{
//执行存钱行为
System.out.println(Thread.currentThread().getName()+"存款:"+depositAmount);
this.balance=this.balance+depositAmount;
System.out.println("账户余额为"+this.balance);
//将表明账户已有存款旗帜标为true
flag=true;
//唤醒其他线程
notifyAll();
}
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
}
}
上面程序使用wait()和notifyAll()进行控制,对存款者线程而言,当程序进入deposit()方法,如果flag为true,表明账户已有存款,程序调用wait()方法阻塞;否则程序向下执行存款操作,当存款操作执行完成后,系统将flag设为true,然后调用notifyAll()来唤醒其他被阻塞的线程——如果系统中由存款者线程,存款者线程也会被唤醒,但是该存款者线程执行到①好代码处时再次进入阻塞状态,只有执行draw()方法的取钱线程才可以向下执行。同理,取钱者线程也是如此。
程序中存款者线程循环100次重复存款,而取钱者线程循环100次重复取钱,存款者线程和取钱者线程分别调用Account对象的额deposit()、draw()方法来实现:
package section6;
public class DrawThread extends Thread
{
//模拟用户账户
private Account account;
//当前线程所希望的取钱钱数
private double drawAmount;
public DrawThread(String name,Account account,double drawAmount)
{
super(name);
this.account=account;
this.drawAmount=drawAmount;
}
//重复100次取钱操作
public void run()
{
for(var i=0;i<100;i++)
{
account.draw(drawAmount);
}
}
}
存钱线程类程序
package section6;
public class DepositThread extends Thread
{
//模拟用户账户
private Account account;
//当前存款线程所需要存的钱数
private double depositAmount;
public DepositThread(String name,Account account,double depositAmount)
{
super(name);
this.account=account;
this.depositAmount=depositAmount;
}
//重复100次执行存款操作
public void run()
{
for(var i=0;i<100;i++)
{
account.deposit(depositAmount);
}
}
}
主程序可以启动任意多个存款和取款线程,可以看到所有的取款过程必须等到存款线程存钱后才能向下执行,而存款线程也必须等到取钱线程取钱后才可以向下执行。主程序代码:
package section6;
public class DrawTest
{
public static void main(String[] args)
{
//创建一个账户
var account=new Account("1234567",1);
new DrawThread("取钱者线程",account,800).start();
new DepositThread("存钱者甲",account,1000).start();
new DepositThread("存钱者乙",account,1000).start();
new DepositThread("存钱者丙",account,1000).start();
}
}
可以看到存款者线程和取款者线程交替执行,每当有存款者想账户中存入1000元,取钱者线程马上从账户中取出800。因为3个存款者线程尝试存款300次操作,但只有一个存款者线程执行取款100次,所以最后程序将堵塞。
二、使用Condition控制线程通信——Lock对象同步线程
如果程序不是使用synchronized关键字来保证同步,而是直接使用Lock对象来保证同步。则系统中不存在隐式的同步监视器,也就不能使用wait()、notify()、notifyAll()方法来进行线程通知了。
当使用Lock对象来保证同步时,Java提供了一个Condition类保持协调,使用Condition可以让那些得到已经得到Lock对象却无法继续执行的线程释放Lock对象,Condition对象也可以唤醒其他处于等待的线程。
Condition 将同步监视锁方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与Lock对象组合使用,为每个对象提供多个等待集(wait-set)。在这种情况下,Lock 替代了同步方法或同步代码块,Condition替代了同步监视锁的功能。
Condition实例实质上被绑定在一个Lock对象上。要获得特定Lock实例的Condition实例,调用Lock对象newCondition()方法即可。Condtion类提供了如下三个方法:
1、await():类似于隐式同步监视器上的wait()方法,导致当前线程等待,直到其他线程调用该Condtion的signal ()方法或signalAll ()方法来唤醒该线程。该await方法有更多变体:long awaitNanos(long nanosTimeout)、void awaitUninterruptibly()、awaitUntil(Date deadline)等,可以完成更丰富的等待操作。
2、signal ():唤醒在此Lock对象上等待的单个线程。如果所有线程都在该Lock对象上等待,则会选择唤醒其中一个线程。选择是任意性的。只有当前线程放弃对该Lock对象的锁定后(使用await()方法),才可以执行被唤醒的线程。
3、signalAll():唤醒在此Lock对象上等待的所有线程。只有当前线程放弃对该该Lock对象的锁定后,才可以执行被唤醒的线程。
下面程序Acount使用了Lock对象来控制同步,时使用Condition对象来控制线程的协作运行。
package section6.condition;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Account
{
//显示定义Lock对象
private final Lock lock=new ReentrantLock();
//获取指定Lock对象的Condition
private final Condition cond=lock.newCondition();
//标识账户中是否已有存款的旗帜
private boolean flag=false;
//封装账户编号、账户余额的两个成员变量
private String accountNo;
private double balance;
public Account(){}
public Account(String accountNo,double balance)
{
this.accountNo=accountNo;
this.balance=balance;
}
public String getAccountNo() {
return accountNo;
}
public void setAccountNo(String accountNo) {
this.accountNo = accountNo;
}
//因为账户余额不能随意修改,所以只为balance提供getter方法
public double getBalance() {
return balance;
}
public void draw(double drawAmount) {
lock.lock();
try {
//如果flag为假,表明账户中还没有人存钱进去
if (!flag)
{
cond.await();
}
else
{
//执行取钱操作
System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
this.balance=this.balance-drawAmount;
System.out.println("账户余额为:"+this.balance);
//将旗帜设为false
this.flag=false;
//唤醒其他线程
cond.signalAll();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
对比上一小节中的Account.java,不难发现者两个程序的逻辑基本相似,只是现在显式地调用Lock对象来充当同步监视器,则需要使用Condition对象来暂停、唤醒其他线程。
三、使用阻塞队列(BlockingQueue)控制线程通信
java 5提供了一个BlockingQueue接口(Queue的子接口),但它的主要作用不是作为容器,而是作为线程同步的工具。BlockingQueue有一个特征:当生产者试图向BlockingQueue中放入元素时,如果该队列已满,则该线程阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。
3.1 BlockingQueue的方法介绍
程序中两个线程通过交替向BlockingQueue中放入元素、取出元素,既可以很好地控制线程通信。
BlockingQueue提供了如下两个支持阻塞的方法:
1、put(E e):尝试把E元素放入BlockingQueue中,如果该队列元素已满,则阻塞该线程。
2、take():尝试从BlockingQueue的头部元素取出元素,如果该队列已空,则阻塞该线程。
BlockingQueue继承Queue接口,当然也可以使用Queue接口中的方法,将这些方法归纳起来分为3组:
1、在队列尾部插入元素。包括add(E e)、offer(E e)和put(E e)方法,当队列已满时,这三个方法分别会抛出异常、返回false、阻塞队列。
2、在队列头部删除并返回删除的元素。包括remove()、poll()、take()方法。当队列已空时,这三个方法分别会抛出异常、返回false、阻塞队列。
3、在队列头部取出元素但不删除。包括element()和peek()方法,当队列已空时,这两个方法分别抛出异常、返回false。如下表所示:
BlockingQueue包含的方法之间的对应关系
抛出异常 | 不同返回值 | 阻塞线程 | 指定超时时长 | |
---|---|---|---|---|
队尾插入元素 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
在对头删除元素 | remove() | poll() | take() | poll(time,unit) |
获取但不删除对头元素 | element() | peek() | 无 | 无 |
3.2 BlockingQueue与其实现类之间的类图
从上图可以看出,BlockingQueue包含了如下5个实现类
1、ArrayBlockingQueue:基于数组实现的BlockingQueue队列。
2、LinkedBlockingQueue:基于链表实现的BlockingQueue队列。
3、PriorityBlockingQueue:它并不是标准的阻塞队列。与前面介绍的Priority类似,该队列调用remove()、poll()、take()等方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素。PriorityBlockingQueue判断元素的大小即可根据元素(实现Comparable接口)的本身大小来自然排序,也可以使用Comparator进行定制排序。
4、SynchronousQueue:同步队列/对该队列的存、取操作必须交替进行。
5、DelayQueue:它是一个特殊的BlockingQueue,,底层基于PriorityBlockingQueue实现。不过,DelayQueue要求集合元素都实现Delay接口(该接口里只有一个long getDelay()方法),DelayDeque根据集合元素的getDelay()方法的返回值进行排序。
3.3 BlockingQueue应用举例
下面以ArrayBlockingQueue为例介绍阻塞队列的功能和用法。下面介绍一个最简单的程序来测试BlockingQueue的put()用法:
package section6;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockQueueTest
{
public static void main(String[] args) throws InterruptedException {
//定义一个长度为2的阻塞队列
BlockingQueue<String> bq=new ArrayBlockingQueue<>(2);
bq.put("java");//与bp.add("java")、bq.offer("java")相同
bq.put("java");
bq.put("java");//①阻塞线程
}
}
上面程序先定义了一个大小为2的BlockingQueue,程序先向该队列中放入两个元素,此时队列还没有满,两个元素都可以放入,因此使用bp.add("java")、bq.offer("java")相同。当程序使用put()尝试放入第三个元素时将会阻塞线程,如上面代码①处所示;如果使用add()将触发异常;使用offer()将返回false。
与此类似的是,BlockingQueue已空的情况下,程序使用take()方法尝试取出元素将会阻塞线程;使用remove()方法将会触发异常;使用poll()方法将会返回false,元素不会被删除。
掌握了BlockingQueue阻塞队列的特性后,下面程序就可以利用BlockingQueue实现线程通信了。
生成者类Producer:
package section6;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest1
{
public static void main(String[] args)
{
//创建一个容量为1的BlockingQueue
BlockingQueue<String> bq=new ArrayBlockingQueue<>(1);
//启动三个生产者线程
new Producer(bq).start();
new Producer(bq).start();
new Producer(bq).start();
//启动一个消费者线程
new Consumer(bq).start();
}
}![](https://www.icode9.com/i/l/?n=20&i=blog/1764014/202005/1764014-20200512214824736-92709312.png)
定义一个消费者线程类Comsumer
package section6;
import java.util.concurrent.BlockingQueue;
public class Consumer extends Thread
{
private BlockingQueue<String> bq;
public Consumer(BlockingQueue<String> bq)
{
this.bq=bq;
}
public void run()
{
while(true)
{
System.out.println(getName()+"消费者准备消费集合元素!");
try
{
Thread.sleep(2);
//尝试取出元素,如果队列已空,则线程被阻塞
bq.take();
}
catch (InterruptedException ex)
{
ex.printStackTrace();
}
System.out.println(getName()+"消费完成:"+bq);
}
}
}
主程序入口
package section6;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueTest1
{
public static void main(String[] args)
{
//创建一个容量为1的BlockingQueue
BlockingQueue<String> bq=new ArrayBlockingQueue<>(1);
//启动三个生产者线程
new Producer(bq).start();
new Producer(bq).start();
new Producer(bq).start();
//启动一个消费者线程
new Consumer(bq).start();
}
}
上面启动了3个生成者线程向BlockingQueue集合中放入元素,启动一个消费者线程从BlockingQueue中取出元素。本程序中BlockingQueue的集合容量为1,因此3个生产者线程无法连续放入元素,必须等待消费者从线程中取出一个元素后,3个生产者线程的其一才能放入一个元素,运行该程序可以看到如下输出结果:
标签:队列,Lock,通信,线程,13.6,方法,public,BlockingQueue 来源: https://www.cnblogs.com/weststar/p/12875280.html