CompletableFutures多线程阻塞获取结果
作者:互联网
voidCompletableFuture.get();
voidCompletableFuture.join();
两者都会阻塞当前线程,直到所有子任务都完成,然后再一起打印结果。
package com.async;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
/**
 * Demonstrates how to block the calling thread until a batch of
 * {@link CompletableFuture} tasks has finished and then read all results together.
 *
 * <p>Created 2021/8/12 20:23.
 *
 * @author Allen
 * @version 1.0
 */
public class AsyncTest {

    /**
     * Runs fourteen one-second jobs on a three-thread pool, prints every result
     * in submission order and finally prints the elapsed wall-clock milliseconds.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.out.println();
        long startMillis = System.currentTimeMillis();
        // Job list.
        List<Integer> jobList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14);
        int nThreads = 3;
        try {
            for (String result : runJobs(jobList, nThreads, 1000L)) {
                System.out.println(result);
            }
        } catch (CompletionException e) {
            // At least one job failed; report it instead of hiding it.
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - startMillis);
    }

    /**
     * Dispatches one task per job id onto a dedicated fixed-size pool, waits for
     * all of them and returns their results in the order of {@code jobList}.
     *
     * @param jobList     ids of the jobs to run
     * @param nThreads    number of worker threads (core and maximum pool size)
     * @param sleepMillis how long each job sleeps to simulate work
     * @return one result string per job, in submission order
     * @throws CompletionException if any job completed exceptionally
     */
    static List<String> runJobs(List<Integer> jobList, int nThreads, long sleepMillis) {
        // Worker pool.
        ExecutorService executorService = new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        try {
            // One future per dispatched job.
            List<CompletableFuture<String>> futures = new ArrayList<>(jobList.size());
            for (Integer jobId : jobList) {
                futures.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        // Simulate a slow job.
                        TimeUnit.MILLISECONDS.sleep(sleepMillis);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the pool instead of swallowing it.
                        Thread.currentThread().interrupt();
                    }
                    return jobId + "任务执行结果";
                }, executorService));
            }
            // Block until every job has completed.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            List<String> results = new ArrayList<>(futures.size());
            for (CompletableFuture<String> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            // Always release the worker threads, even when a job failed.
            executorService.shutdown();
        }
    }
}
异步多线程配置
在该配置类中也可以通过 @Bean 注解额外注册其他线程池;@Async 未指定线程池名称时,默认使用 getAsyncExecutor 方法返回的线程池。
/**
 * Async configuration: supplies the executor and the uncaught-exception
 * handler through the {@code AsyncConfigurer} callbacks.
 */
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {

    /**
     * Builds and initializes the thread pool returned to the async infrastructure.
     *
     * @return an initialized {@code ThreadPoolTaskExecutor}
     */
    @Override
    public Executor getAsyncExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Pool sizing: core threads, maximum threads, queue capacity, idle keep-alive (seconds).
        pool.setCorePoolSize(10);
        pool.setMaxPoolSize(20);
        pool.setQueueCapacity(1000);
        pool.setKeepAliveSeconds(300);

        // Name prefix given to newly created threads.
        pool.setThreadNamePrefix("Anno-Executor");

        // Rejection policy (one of the four provided by ThreadPoolExecutor).
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        pool.initialize();
        return pool;
    }

    /**
     * Supplies the handler for exceptions escaping async methods; a custom
     * implementation could be returned here instead of the simple one.
     *
     * @return a {@code SimpleAsyncUncaughtExceptionHandler}
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}
例如下面的配置:如果需要使用其他线程池,可以通过 @Async("taskExecutor") 指定对应的 Bean 名称;同时这里还自定义了异常处理方法。
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {
private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
log.debug("Creating Async Task Executor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); //核心线程数
executor.setMaxPoolSize(20); //最大线程数
executor.setQueueCapacity(1000); //队列大小
executor.setKeepAliveSeconds(300); //线程最大空闲时间
executor.setThreadNamePrefix("ics-Executor-"); 指定用于新创建的线程名称的前缀。
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略
return new ExceptionHandlingAsyncTaskExecutor(executor);
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new MyAsyncExceptionHandler();
}
class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
log.info("Exception message - " + throwable.getMessage());
log.info("Method name - " + method.getName());
for (Object param : objects) {
log.info("Parameter value - " + param);
}
}
}
}
开发代码例子
/**
 * Processes every file in the directory {@code fileAddressPrefix + date} in
 * batches of four, handing each batch to
 * {@code addFileBatchService.insertBatch} on the task executor, then waits for
 * all batches and prints the concatenated batch results.
 *
 * <p>The original note says this runs once every night at 22:00; no scheduling
 * is visible on this method, which is mapped as an HTTP endpoint.
 *
 * @param date name of the date directory to process
 */
@RequestMapping("/getUrlsByDate")
public void getUrlsByDate(String date) {
    log.info("指定日期获取文件");
    // The date comes from the request and becomes part of a file-system path:
    // refuse values that could escape the base directory.
    if (date == null || date.contains("..") || date.contains("/") || date.contains("\\")) {
        log.warn("Rejected date parameter: " + date);
        return;
    }
    String toDay = date;
    File file = new File(fileAddressPrefix + toDay);
    log.info("文件转换:" + (fileAddressPrefix + toDay));
    File[] tempList = file.listFiles();
    if (tempList == null) {
        return;
    }
    log.info("文件转换文件个数:" + (tempList.length));

    final int batchSize = 4;
    // Number of batches, rounded up.
    final int batchCount = (tempList.length + batchSize - 1) / batchSize;
    final CountDownLatch countDownLatch = new CountDownLatch(batchCount);
    // Worker threads append concurrently, so the list must be thread-safe.
    final java.util.List<Future<String>> futures =
            java.util.Collections.synchronizedList(new ArrayList<>());

    for (int batch = 0; batch < batchCount; batch++) {
        final int start = batch * batchSize;
        final int end = Math.min(start + batchSize, tempList.length);
        threadPoolTaskExecutor.submit(() -> {
            try {
                futures.add(addFileBatchService.insertBatch(tempList, start, end, toDay));
            } catch (Exception e) {
                log.error("insertBatch failed for files [" + start + ", " + end + ")", e);
            } finally {
                countDownLatch.countDown();
            }
        });
    }

    StringBuilder summary = new StringBuilder();
    try {
        // Wait until every batch has been handed off; after this no thread
        // touches the list any more, so it can be iterated without locking.
        countDownLatch.await();
        System.out.println("----------------全部完成");
        for (Future<String> future : futures) {
            // Blocks until the batch itself has finished.
            summary.append(future.get());
        }
    } catch (InterruptedException e) {
        // Preserve the interrupt for the caller.
        Thread.currentThread().interrupt();
        log.error("Interrupted while waiting for file batches", e);
    } catch (Exception e) {
        log.error("File batch failed", e);
    }
    System.out.println(summary);
}
这里的 countDownLatch 起不到等待任务完成的作用:insertBatch 是 @Async 异步方法,调用后立即返回,随即就执行到 finally 中的 countDown()。
可以改用下面的写法:指定对应的线程池,再阻塞获取全部结果,
即调用 voidCompletableFuture.get() 或 voidCompletableFuture.join()。
// Run the job on the given executor; the returned future completes with the job's result text.
CompletableFuture<String> integerCompletableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        // Simulate a slow job: sleep for one second.
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the executor can still observe the interruption.
        Thread.currentThread().interrupt();
    }
    return jobId + "任务执行结果";
}, executorService);
/**
 * Converts the files {@code tempList[start..end)} via {@code transFile} into the
 * {@code trans/<toDay>/} directory under {@code fileAddressPrefix} and
 * batch-saves one {@code TRuVoiceFile} record per converted file.
 *
 * @param tempList files of the date directory
 * @param start    index of the first file to handle (inclusive)
 * @param end      index after the last file to handle (exclusive)
 * @param toDay    name of the date directory
 * @return a completed future carrying a summary with the worker thread and the elapsed time
 * @throws Exception if conversion or the batch save fails
 */
@Override
@Async
@Transactional(rollbackFor = Exception.class) // transactional
public CompletableFuture<String> insertBatch(File[] tempList, int start, int end, String toDay) throws Exception {
    Date now = new Date();
    long startTime = System.currentTimeMillis();
    ArrayList<TRuVoiceFile> tRuVoiceFiles = new ArrayList<>();
    String targetDir = fileAddressPrefix + "trans" + File.separator + toDay + File.separator;
    // Copy/convert the files.
    for (int i = start; i < end; i++) {
        File source = tempList[i];
        if (!source.isFile()) {
            continue;
        }
        // Only the extension is swapped; "wav" elsewhere in the name is left alone.
        String mp3Name = toMp3FileName(source.getName());
        try {
            // Convert the current file and store the result in the target directory.
            transFile(source, targetDir + mp3Name);
            TRuVoiceFile entity = new TRuVoiceFile();
            // Columns: sid, file_name, url, create_time, update_time, version, status
            entity.setSid(SnowFlakeUtil.getId());
            // file_name holds the converted file's name (located under trans/<toDay>/).
            entity.setFileName(mp3Name);
            entity.setUrl(httpFileAddress + toDay + File.separator + source.getName());
            entity.setCreateTime(now);
            entity.setUpdateTime(now);
            entity.setVersion(1);
            entity.setStatus("initial");
            tRuVoiceFiles.add(entity);
        } catch (FileAlreadyExistsException ignored) {
            // Deliberate best effort: a file whose target already exists is skipped.
            continue;
        }
    }
    // Batch insert.
    tRuVoiceFileService.addUpateBatch(tRuVoiceFiles, now);
    long endTime = System.currentTimeMillis();
    return CompletableFuture.completedFuture("完成任务" + Thread.currentThread() + "用时:" + (endTime - startTime) + "ms");
}

/** Replaces a trailing {@code .wav} extension with {@code .mp3}; other names are returned unchanged. */
private static String toMp3FileName(String fileName) {
    final String wavSuffix = ".wav";
    if (fileName.endsWith(wavSuffix)) {
        return fileName.substring(0, fileName.length() - wavSuffix.length()) + ".mp3";
    }
    return fileName;
}
on duplicate key update 的 MyBatis 写法:file_name 设置为唯一索引,如果 file_name 有冲突则执行更新。
version = version+1 表示在数据库中已有的值上 +1;如果写成 version = values(version)+1,则是在本次 insert 传入的值上 +1。
<!--
  Batch upsert into t_ru_voice_file: emits one VALUES tuple per element of the
  passed list. When a row collides with an existing unique key, the stored
  version is incremented by one and update_time is set to now().
  update_time is not part of the inserted column list, so it is only written
  on the duplicate-key path.
  NOTE(review): which key triggers the update depends on the table's unique
  indexes, which are not visible here - confirm against the schema.
-->
<insert id="insertBatch" parameterType="java.util.List">
insert into t_ru_voice_file
(sid,file_name,url,create_time,version,status)
values
<foreach collection="list" item="item" separator=",">
(#{item.sid},#{item.fileName},#{item.url},#{item.createTime},#{item.version},#{item.status})
</foreach>
on duplicate key update
version = version+1,
update_time = now()
</insert>
标签:CompletableFuture,阻塞,CompletableFutures,线程,executor,new,多线程,public,tempList 来源: https://www.cnblogs.com/wsyphaha/p/15134839.html