异步分组查询
作者:互联网
依赖
<!--集合工具类-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
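<!--junit单元测试(注意:下方示例代码实际导入的是 JUnit 5 的 org.junit.jupiter.api.Test 以及 AssertJ 的 org.assertj.core.util.Lists,仅有此处的 junit 4.12 无法编译,需另行引入 junit-jupiter 与 assertj-core,例如通过 spring-boot-starter-test)-->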
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
代码
package com.example.demo;
import org.apache.commons.collections4.ListUtils;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Demo comparing a sequential lookup with a batched, asynchronous lookup.
 *
 * <p>The input ids are split into batches of {@link #PAGE_SIZE}; each batch is
 * submitted to a shared thread pool through {@link CompletableFuture}, and the
 * per-batch results are merged back in input order. Every simulated database
 * read sleeps for {@link #SLEEP_MILLIS} milliseconds so the elapsed-time
 * difference between both strategies is visible on the console.
 */
public class AsyncTest {

    /**
     * Base value used to size the thread pool. It is hard-coded to 10 for this
     * demo; {@code Runtime.getRuntime().availableProcessors()} is the alternative
     * when the pool should follow the host's CPU count.
     */
    private static final int AVAILABLE_PROCESSORS = 10;

    /**
     * Executor that runs the batch queries.
     *
     * <p>Per the {@link ThreadPoolExecutor} contract, threads beyond the core
     * size are only created once the bounded queue is full, so with a capacity
     * of 10000 the maximum size is rarely reached.
     * NOTE(review): the pool is never shut down in this class; its core threads
     * stay alive until the JVM (or test runner) exits.
     */
    private static final ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(
            AVAILABLE_PROCESSORS, // core pool size
            3 * AVAILABLE_PROCESSORS, // maximum pool size
            3, TimeUnit.SECONDS, // keep-alive time for idle threads above the core size
            new LinkedBlockingDeque<>(10000)); // bounded work queue

    /** Number of ids handled by one asynchronous batch; could be made configurable. */
    private static final int PAGE_SIZE = 2;

    /** Time, in milliseconds, each simulated database read sleeps. */
    private static final long SLEEP_MILLIS = 1000L;

    /** Simulated table: ids 1..9 mapped to their {@link Dog}. Only read after class initialisation. */
    private static final Map<Integer, Dog> DOGS = new HashMap<>();

    static {
        for (int id = 1; id <= 9; id++) {
            DOGS.put(id, new Dog(id, "xxx" + id));
        }
    }

    /** Simple data object returned by the simulated queries. */
    static class Dog {
        private Integer index;
        private String name;

        public Dog() {
        }

        public Dog(Integer index, String name) {
            this.index = index;
            this.name = name;
        }

        public Integer getIndex() {
            return index;
        }

        public void setIndex(Integer index) {
            this.index = index;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "Dog{" + "index=" + index + ", name='" + name + '\'' + '}';
        }
    }

    /**
     * Simulates one database read: logs the id, sleeps {@link #SLEEP_MILLIS}
     * milliseconds and returns the matching entry.
     *
     * @param index id to look up
     * @return the stored {@link Dog}, or {@code null} when the id is unknown
     */
    private Dog selectDataBaseGetDog(Integer index) {
        System.out.println("select DataBase index :{ " + index + " }");
        try {
            Thread.sleep(SLEEP_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread/pool can still observe
            // the cancellation request; the lookup itself is still answered.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return DOGS.get(index);
    }

    /** Runs the sequential query first, then the batched asynchronous one, on the same input. */
    @Test
    public void testInitMonitor() {
        // Request parameters
        List<Integer> list = Lists.newArrayList(1, 2, 3, 4, 5);
        // Single-threaded query
        select(list);
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println();
        System.out.println("分批大小 " + PAGE_SIZE + " , 休眠时间 " + SLEEP_MILLIS + " , CPU核数 " + AVAILABLE_PROCESSORS);
        // Batched asynchronous query
        asyncSelect(list);
    }

    /**
     * Looks up every id one after another on the calling thread and prints the
     * elapsed time and the result.
     *
     * @param list ids to look up
     */
    private void select(List<Integer> list) {
        System.out.println("=====================================");
        System.out.println("==============单线程查询===============");
        System.out.println("查询数据,param:" + list);
        System.out.println("=====================================");
        final long start = System.currentTimeMillis();
        System.out.println("方法开始:" + start);
        // A sequential stream keeps encounter order, so the ids are queried in input order.
        List<Dog> dogList = list.stream()
                .map(this::selectDataBaseGetDog)
                .collect(Collectors.toList());
        final long end = System.currentTimeMillis();
        System.out.println("方法结束:" + end);
        System.out.println("时间差:" + (end - start));
        System.out.println("=====================================");
        System.out.println("查询结束,return:" + dogList);
        System.out.println("==============单线程查询===============");
        System.out.println("=====================================");
    }

    /**
     * Splits the ids into batches of {@link #PAGE_SIZE}, queries the batches
     * concurrently and prints the elapsed time and the merged result.
     *
     * @param list ids to look up
     */
    private void asyncSelect(List<Integer> list) {
        System.out.println("=====================================");
        System.out.println("=============异步分组查询==============");
        System.out.println("查询数据,param:" + list);
        System.out.println("=====================================");
        final long start = System.currentTimeMillis();
        System.out.println("方法开始" + start);
        List<List<Integer>> partition = ListUtils.partition(list, PAGE_SIZE);
        // Returns only after every batch has finished, so the timing below covers the whole query.
        List<CompletableFuture<List<Dog>>> completableFutures = getCompletableFutures(partition);
        final long end = System.currentTimeMillis();
        System.out.println("方法结束" + end);
        System.out.println("时间差" + (end - start));
        // All futures are already completed successfully at this point (a failure would
        // have propagated from getCompletableFutures), so join() just unwraps the values.
        List<Dog> dogList = completableFutures.stream()
                .map(CompletableFuture::join)
                .flatMap(List::stream)
                .collect(Collectors.toList());
        System.out.println("=====================================");
        System.out.println("查询结束,return:" + dogList);
        System.out.println("==============异步分组查询===============");
        System.out.println("=====================================");
    }

    /**
     * Submits one asynchronous task per batch to the shared pool and blocks
     * until all of them have completed.
     *
     * @param partition the batches of ids, in the order their results should be returned
     * @return one completed future per batch, in the same order as {@code partition}
     * @throws java.util.concurrent.CompletionException if any batch task fails
     */
    private List<CompletableFuture<List<Dog>>> getCompletableFutures(List<List<Integer>> partition) {
        int partitionSize = partition.size();
        List<CompletableFuture<List<Dog>>> completableFutureList = IntStream.range(0, partitionSize)
                .mapToObj(index -> CompletableFuture.supplyAsync(() -> {
                    List<Integer> batch = partition.get(index);
                    System.out.println("共 { " + partitionSize + " } 批,当前第 { " + (index + 1) + " } 批。分批查询数据 :{ " + batch + " }");
                    return getSkuBasicInfoListResultTO(batch);
                }, THREAD_POOL_EXECUTOR))
                .collect(Collectors.toList());
        // Wait for every batch to finish before handing the futures back.
        CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture<?>[0])).join();
        return completableFutureList;
    }

    /**
     * Queries one batch of ids sequentially on the current (pool) thread.
     *
     * @param request ids belonging to this batch
     * @return the looked-up entries, in the order of {@code request}
     */
    private List<Dog> getSkuBasicInfoListResultTO(List<Integer> request) {
        System.out.println("查询数据param :{ " + request + " }");
        return request.stream()
                .map(this::selectDataBaseGetDog)
                .collect(Collectors.toList());
    }
}
测试结果
/**
=====================================
==============单线程查询===============
查询数据,param:[1, 2, 3, 4, 5]
=====================================
方法开始:1662348134998
select DataBase index :{ 1 }
select DataBase index :{ 2 }
select DataBase index :{ 3 }
select DataBase index :{ 4 }
select DataBase index :{ 5 }
方法结束:1662348140040
时间差:5042
=====================================
查询结束,return:[Dog{index=1, name='xxx1'}, Dog{index=2, name='xxx2'}, Dog{index=3, name='xxx3'}, Dog{index=4, name='xxx4'}, Dog{index=5, name='xxx5'}]
==============单线程查询===============
=====================================
分批大小 2 , 休眠时间 1000 , CPU核数 10
=====================================
==============异步分组查询===============
查询数据,param:[1, 2, 3, 4, 5]
=====================================
方法开始1662348140040
共 { 3 } 批,当前第 { 1 } 批。分批查询数据 :{ [1, 2] }
查询数据param :{ [1, 2] }
共 { 3 } 批,当前第 { 2 } 批。分批查询数据 :{ [3, 4] }
查询数据param :{ [3, 4] }
共 { 3 } 批,当前第 { 3 } 批。分批查询数据 :{ [5] }
查询数据param :{ [5] }
select DataBase index :{ 1 }
select DataBase index :{ 3 }
select DataBase index :{ 5 }
select DataBase index :{ 2 }
select DataBase index :{ 4 }
方法结束1662348142062
时间差2022
=====================================
查询结束,return:[Dog{index=1, name='xxx1'}, Dog{index=2, name='xxx2'}, Dog{index=3, name='xxx3'}, Dog{index=4, name='xxx4'}, Dog{index=5, name='xxx5'}]
==============异步分组查询===============
=====================================
*/
标签:异步,index,System,查询,分组,println,Dog,out 来源: https://www.cnblogs.com/huiqing/p/16658025.html