并发编程-同步模式之保护性暂停
作者:互联网
定义
即 Guarded Suspension
,用在一个线程等待另一个线程的执行结果
要点
- 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
- 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
- JDK 中,join 的实现、Future 的实现,采用的就是此模式
- 因为要等待另一方的结果,因此归类到同步模式
实现V1版本
资源类
public class GuardedObject {
private Object response;
/**
* 获取结果
*/
public Object get(){
synchronized (this){
while (response == null) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
/**
* 产生结果
*/
public void complete(Object response){
synchronized (this) {
this.response = response;
this.notifyAll();
}
}
}
测试
public static void main(String[] args) {
GuardedObject obj = new GuardedObject();
new Thread(() -> {
log.debug("等待结果。。。");
List<String> download = (List<String>) obj.get();
log.debug("结果大小:" + download.size());
}, "t1").start();
new Thread(() -> {
log.debug("执行下载。。。");
try {
List<String> download = Downloader.download();
obj.complete(download);
} catch (IOException e) {
e.printStackTrace();
}
}, "t2").start();
}
优点:
- 使用wait/join必须等待线程结束 但是使用保护性暂停模式,生产资源的线程完成后就可以做其他事情
- join方式 变量必须设置为全局的,使用保护性暂停模式后资源变量都是局部的
实现V2版本
V1版本 如果生产资源的速度非常慢 获取资源时就必须一直等待。V2版本给get方法加上超时时间
public Object get(long timeout){
synchronized (this){
//开始时间
long start = System.currentTimeMillis();
//经历的时间
long passedTime = 0;
while (response == null) {
//经历的时间超过了最大时间 退出循环
if(passedTime >= timeout) {
log.debug("超时{}", timeout);
break;
}
try {
// 避免虚假唤醒导致 timeout重新等待
this.wait(timeout - passedTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
passedTime = System.currentTimeMillis() - start;
}
return response;
}
}
测试:
public static void main(String[] args) {
GuardedObjectV2 obj = new GuardedObjectV2();
new Thread(() -> {
log.debug("begin...");
Object response = obj.get(2000);
log.debug("结果是:" + response);
}, "t1").start();
new Thread(() -> {
try {
log.debug("begin...");
// TimeUnit.SECONDS.sleep(1); // 测试正常获取
// obj.complete(new Object());
// TimeUnit.SECONDS.sleep(3); // 测试超时
// obj.complete(new Object());
TimeUnit.SECONDS.sleep(1); // 测试虚假唤醒
obj.complete(null);
} catch (InterruptedException e) {
log.error("被打断");
}
}, "t2").start();
}
join原理
查看join源码 :
轮询检查线程 alive 状态,join 体现的正是【保护性暂停】
模式,
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
多任务版 GuardedObject
图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员
如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理
资源类加上唯一标识
//用来唯一标记 GuardedObject
private int id;
public int getId() {
return id;
}
public GuardedObjectV3(int id) {
this.id = id;
}
邮箱类
class MailBoxes{
private static Map<Integer, GuardedObjectV3> boxes = new Hashtable<>();
private static int id = 1;
//产生唯一id
private static synchronized int generateId(){
return id++;
}
public static GuardedObjectV3 createGuardedObjectV3(){
GuardedObjectV3 objectV3 = new GuardedObjectV3(generateId());
boxes.put(objectV3.getId(), objectV3);
return objectV3;
}
public static GuardedObjectV3 getGuardedObject(int id){
//需要清除不需要的信件 防止内存溢出
return boxes.remove(id);
}
//Hashtable是线程安全的 不需要自己加上锁
public static Set<Integer> getIds(){
return boxes.keySet();
}
}
邮件员
class Postman extends Thread{
private int id;
private String mail;
public Postman(int id, String mail){
this.id = id;
this.mail = mail;
}
@Override
public void run() {
GuardedObjectV3 obj = MailBoxes.getGuardedObject(id);
log.debug("开始送信:id = {} 内容 = {} ", obj.getId(), mail);
obj.complete(mail);
}
}
居民
class People extends Thread{
@Override
public void run() {
GuardedObjectV3 obj = MailBoxes.createGuardedObjectV3();
log.debug("开始收信:id = {}", obj.getId());
Object mail = obj.get(5000);
log.debug("收到信: id = {}, 内容:{}", obj.getId(), mail);
}
}
测试类
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new People().start();
}
TimeUnit.SECONDS.sleep(2);
//同步模式缺点: 必须一个生产者对应一个消费者
for (int id : MailBoxes.getIds()) {
new Postman(id, "内容 " + id).start();
}
}
标签:obj,log,编程,保护性,并发,线程,id,new,public 来源: https://www.cnblogs.com/mpyidudu/p/15733385.html