Java多线程之Exchanger
作者:互联网
一. 概念
Exchanger能在两个线程驱动的任务之间交换对象。交换之前,A任务持有D1对象,B任务持有D2对象。交换之后,A任务持有D2对象,B任务持有D1对象。
Exchanger可以使用的场景:
1. 大数据量、多步骤执行的任务
比如把整个任务分割成了n段。第一段子任务在执行到某种程度后,执行Exchanger exchange( )操作,把已经加工好的数据传递给第二段子任务,每段任务由不同的线程来驱动,整体上加快了任务的处理速度。
2. 基因算法
二. 使用方式
1. Exchanger( )
构造函数,创建一个新的Exchanger对象
/**
* Creates a new Exchanger.
*/
public Exchanger() {
participant = new Participant();
}
2. exchange(V x)
互换对象。两个任务必须同时调用exchange( )互换才会成功。当一个任务调用了Exchanger对象的exchange( )后,它将被阻塞住,直到另一个任务也调用了同一个Exchanger对象的exchange( )方法,此时x对象的互换操作才会成功。
三. 案例
本例中创建了一个生产者和消费者,生产者生产Fat对象,消费者消费Fat对象,对象之间借助Exchanger,互换List<Fat>集合。
1. 生产者
private Generator<T> generator;
private Exchanger<List<T>> exchanger;
private List<T> holder;
public ExchangerProducer(Exchanger<List<T>> exchanger, Generator<T> gen, List<T> holder) {
this.exchanger = exchanger;
this.generator = gen;
this.holder = holder;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
for(int i = 0; i < ExchangerDemo.size; i++) {
holder.add(generator.next());
}
holder = exchanger.exchange(holder);
}
} catch (InterruptedException e) {
System.out.println("OK to terminate this way");
}
}
}
2. 消费者
class ExchangerConsumer<T> implements Runnable {
private Exchanger<List<T>> exchanger;
private List<T> holder;
private volatile T value;
public ExchangerConsumer(Exchanger<List<T>> exchanger, List<T> holder) {
this.exchanger = exchanger;
this.holder = holder;
}
@Override
public void run() {
try {
while(!Thread.interrupted()) {
holder = exchanger.exchange(holder);
for (T x : holder) {
// 此处在遍历列表时,移除了元素。为了不报错ConcurrentModificationException,
// 所以使用CopyOnWriteArrayList
value = x;
holder.remove(x);
}
}
}catch (InterruptedException e) {
System.out.println("OK to terminate this way");
}
System.out.println("Final value: " + value);
}
}
3. main方法
public class ExchangerDemo {
static int size = 10;
static int delay = 5;
public static void main(String[] args) throws Exception{
if(args.length > 0) {
size = Integer.parseInt(args[0]);
}
if(args.length > 1) {
delay = Integer.parseInt(args[1]);
}
ExecutorService exec = Executors.newCachedThreadPool();
Exchanger<List<Fat>> xc = new Exchanger<>();
List<Fat> producerList = new CopyOnWriteArrayList<>();
List<Fat> consumerList = new CopyOnWriteArrayList<>();
exec.execute(new ExchangerProducer<>(xc, BasicGenerator.create(Fat.class), producerList));
exec.execute(new ExchangerConsumer<>(xc, consumerList));
TimeUnit.SECONDS.sleep(delay);
exec.shutdownNow();
}
}
4. 执行结果
OK to terminate this way
Final value: Fat id:90189
OK to terminate this way
qq872068101 发布了45 篇原创文章 · 获赞 13 · 访问量 1万+ 私信 关注
标签:Java,Exchanger,exchange,exchanger,new,多线程,holder,public 来源: https://blog.csdn.net/miaomiao19971215/article/details/104117795