异步排队导出csv文件
作者:互联网
一、简介。
多人同时导出时进行排队等候,可以自定义排队人数,在大数据情况下可以分批处理数据、导出为异步,导出过程中可获取导出进度,导出完成之后调用下载接口。
可以自定义导出的列。
二、代码实现
1、创建队列,队列是使用的java的Queue
首先先创建队列的类
@Slf4j
@Component
public class Queue {
/**
* 缓存导出的队列。数量就是你最大几个请求排队
*/
public static BlockingQueue<ColumnForm> queue = new ArrayBlockingQueue<ColumnForm>(5);
/**
* 导出的线程状态
*/
private static boolean isRunning = false;
/**
* 超时时间间隔
*/
public static long TIMEOUT_INTERVAL = 3 * 60 * 1000;
public static boolean export(CmsNewsColumnForm cmsNewsColumnForm) {
boolean rsp = false;
try {
//阻塞3秒
rsp = queue.offer(cmsNewsColumnForm, 3, TimeUnit.SECONDS);
if (!rsp) {
throw new Exception(异常信息);
}
if (!isRunning) {
startup(cmsNewsColumnForm);
isRunning = true;
}
} catch (Exception e) {
throw new Exception(异常信息);
}
return rsp;
}
public static void startup(CmsNewsColumnForm cmsNewsColumnForm) {
if (isRunning) {
return;
}
Thread th = new Thread(() -> {
long timestamp = System.currentTimeMillis();
//获取service
Service service = SpringContextUtils.getBean(Service.class);
while (isRunning) {
long timeout = timestamp + TIMEOUT_INTERVAL;
if (System.currentTimeMillis() > timeout) {
//空跑一段时间(3分钟)后线程退出
break;
}
try {
if (queue.size() == 0) {
Thread.sleep(1000);
continue;
}
//这里是你导出业务的方法
Service.export(columnForm);
queue.poll();
} catch (Exception e) {
log.error("导出出错", e);
}
}
isRunning = false;
log.debug("导出线程停止。");
});;
th.start();
}
}
2、编写实现类service。
@Autowired
private ExportThread exportThread;
public void export(ColumnForm ColumnForm) {
List<Future> futures = new ArrayList<>();
//定义文件存放的地址
File file = FileUtil.mkdir(System.getProperty("user.dir") + File.separator + "upload" + File.separator + ColumnForm.getUserId());
String filePath = file.getPath() + File.separator;
//这里是你预先设置可能会需要导出的列
LinkedHashMap<String, String> linkedHashMap = ExportDict.newsMap;
//把map变成线程安全
Map<String, String> newsMap = Collections.synchronizedMap(new LinkedHashMap<>());
//需要导出的列,前端传过来的列是驼峰的,这里我们做一个转换,把他转为下划线,方便数据库查询使用。
Map<String, String> map = new HashMap<>();
ColumnForm.getColumn().forEach(news -> {
//驼峰转下划线命名
String chang = StringUtils.toUnderScoreCase(news);
map.put(news, chang);
//构建excel表头
linkedHashMap.forEach((k, v) -> {
if (k.equals(news)) {
newsMap.put(k, v);
}
});
});
把转换好的列放进map里面
ColumnForm.setChang(map);
//下面是分批次导出
//查询总共需要导出的总数
int totalCount = this.count();
//单个excel最大行数
int pageSize = CommonConstants.POINTS_DATA_LIMIT;
//总页数
int pageCount = (totalCount + pageSize - 1) / pageSize;
int pageBegin = 0;
//开始和结束条数
int pageEnd = pageSize;
int percentage = pageCount;
//把开始的坐标放入redis,用于获取导出进度使用。这里我用userId作为每个人进度的唯一标识,避免进度串。
redisTemplate.opsForValue().set(redisKeyConfig.getKey() + CommonRedisKey.EXPORT_NEWS_COUNT + cmsNewsColumnForm.getUserId(), Double.valueOf(pageCount));
//这里把总页数放进redis,用于后面计算百分比时使用。
redisTemplate.opsForValue().set(redisKeyConfig.getKey() + CommonRedisKey.PERCENTAGE + cmsNewsColumnForm.getUserId(), percentage);
//线程池
ThreadPoolTaskExecutor threadPoolTaskExecutor = threadPoolConfig.threadPoolTaskExecutor();
//分批多线程查询数据库数据并且写入到本地路径
for (int i = 0; i < pageCount; i++) {
final int begin = pageBegin;
//设置文件名称(name+i下标防止文件名重复)
final String fileUrl = filePath + "导出信息" + (i + 1) + ".csv";
//开启线程
Future<?> submit = threadPoolTaskExecutor.submit(new Runnable() {
@Override
public void run() {
List<exportVO> exportVOS = Collections.synchronizedList(new ArrayList<>());
exportVOS =Mapper.export(cmsNewsColumnForm, begin, pageEnd);
exportThread.export(exportVOS, newsMap, fileUrl);
//每次完成之后递减次数,用于计算进度。
redisTemplate.getConnectionFactory().getConnection().decr( redisTemplate.getKeySerializer().serialize(redisKeyConfig.getKey() + "percentage" + cmsNewsColumnForm.getUserId())
);
}
});
//返回的结果集存储
futures.add(submit);
//分批的页数
pageBegin = pageEnd + pageBegin;
}
//这里必须要阻塞,否则的话当前主线程不会等待所有子线程执行完后在执行,会直接完成,导致获取不到进度。
futures.forEach(f -> {
try {
f.get();
} catch (InterruptedException e) {
e.printStackTrace();
throw new ServiceException(ResponseCode.THREAD_INTERRUPTION_ERROR.msg(), ResponseCode.THREAD_INTERRUPTION_ERROR.code());
} catch (ExecutionException e) {
e.printStackTrace();
throw new ServiceException();
}
});
}
这是sql、这里使用的是mybatis-plus
<select id="export" resultType="com.puyiyun.cms.entity.vo.ExportVO">
SELECT
<foreach collection="ns.chang.entrySet()" index="key" item="value" open="" separator="," close="">
${value} as ${key}
</foreach>
FROM users
LIMIT ${pageBegin},${pageEnd}
</select>
3、写入的工具类
@Component
public class ExportThread {
/**
* 新闻导出异步线程
*
* @param newsMap (需要导出的列)
* @param fileUrl
*/
public <T> String exportNews(List<T> data, Map<String, String> newsMap, String fileUrl) {
try {
ExcelWriter writer = new ExcelWriter();
//创建一个不重名文件
writer = ExcelUtil.getWriter(fileUrl);
//设置列名
for (Map.Entry<String, String> entry : newsMap.entrySet()) {
writer.addHeaderAlias(entry.getKey(), entry.getValue());
}
//设置是否只显示设置了别名的字段
writer.setOnlyAlias(true);
writer.write(data, true);
writer.close();
return fileUrl;
} catch (Exception e) {
e.printStackTrace();
throw new ServiceException("导出异常");
}
}
}
4、预设置的列。
/**
* 导出对应字典类
*/
public class ExportDict {
public static final LinkedHashMap<String, String> newsMap = new LinkedHashMap<>();
static{
newsMap.put("userId","用户id");
newsMap.put("name","用户名称");
newsMap.put("address","地址");
}
}
5、获取导出进度的方法(这里是每请求一次获取一次最新进度,需要轮询调用,这里不友好,建议改成用socket推送的方式推送给前端)
public String exportSchedule() {
//上下文获取userId
Long userId = SecurityUtils.getUserId();
//返回保留两位小数
NumberFormat nf = new DecimalFormat("0.00 ");
String outcome = "";
Double onePercentLimit = 0.0;
//总次数
Object exportNewsCount = redisTemplate.opsForValue().get(redisKeyConfig.getKey() + CommonRedisKey.EXPORT_NEWS_COUNT + userId);
//当前第几次
Object percentage = redisTemplate.opsForValue().get(redisKeyConfig.getKey() + CommonRedisKey.PERCENTAGE + userId);
if (ObjectUtil.isNotNull(exportCount) && ObjectUtil.isNotNull(percentage)) {
int perChang = (Integer) percentage;
double arraySize = (Double) exportNewsCount;
double Count = Double.valueOf(perChang);
//计算进度百分比
onePercentLimit = (1.00 - Count / arraySize) * 100.00;
//保留两位小数输出
outcome = nf.format(onePercentLimit);
}
return outcome;
}
6、导出完成后点击下载
public void downloadNews(HttpServletResponse response) {
try {
Long userId = SecurityUtils.getUserId();
File file = FileUtil.mkdir(System.getProperty("user.dir") + File.separator + "upload" + File.separator + userId);
String filePath = file.getPath() + File.separator;
File[] fileUrl = FileUtil.ls(filePath);
response.setContentType("application/zip");
response.reset();
response.setCharacterEncoding("utf-8");
String fileNameCode = URLEncoder.encode("名称", "UTF-8");
OutputStream outputStream = response.getOutputStream();
//文件路径集合
List<File> fileList = new ArrayList<>();
//多个文件打成压缩包
if (!ArrayUtil.isEmpty(fileUrl)) {
if (fileUrl.length > 1) {
response.setHeader("Content-disposition", "attachment;filename=" + fileNameCode + ".zip");
for (int i = 0; i < fileUrl.length; i++) {
String path = fileUrl[i].getPath();
fileList.add(new File(path));
}
ZipUtils.toZip(fileList, outputStream);
} else {
response.setHeader("Content-disposition", "attachment;filename=" + fileNameCode + ".csv");
//单个文件不打包
FileInputStream fileInputStream = new FileInputStream(fileUrl[0].getPath());
byte[] buff = new byte[1024];
int i;
while ((i = fileInputStream.read(buff)) != -1) {
outputStream.write(buff, 0, i);
outputStream.flush();
}
fileInputStream.close();
}
//完成后删除进度
redisTemplate.delete(redisKeyConfig.getKey() + CommonRedisKey.EXPORT_NEWS_COUNT + userId);
redisTemplate.delete(redisKeyConfig.getKey() + CommonRedisKey.PERCENTAGE + userId);
//完成后删除文件
String parent = fileUrl[0].getParent();
FileUtil.del(parent);
}
} catch (IOException e) {
log.error(e.getMessage());
throw new ServiceException("导出失败,请联系管理员", ResponseCode.UNKNOWN.code());
}
}
压缩zip的工具类
public class ZipUtils {
public static void compressionFile(String path, String zipFilePath) {
File file = new File(path);
if (file == null || !file.exists() || !file.isDirectory()) {
return;
}
File zipFile = new File(zipFilePath);
File[] srcFile = file.listFiles();
byte[] buffer = new byte[1024];
try {
ZipOutputStream out = new ZipOutputStream(new FileOutputStream(zipFile));
for (int i = 0; i < srcFile.length; i++) {
FileInputStream fileInputStream = new FileInputStream(srcFile[i]);
out.putNextEntry(new ZipEntry(srcFile[i].getName()));
int length;
while ((length = fileInputStream.read(buffer)) > 0) {
out.write(buffer, 0, length);
}
out.closeEntry();
fileInputStream.close();
}
out.close();
} catch (Exception e) {
e.printStackTrace();
}
}
private static final int BUFFER_SIZE = 2 * 1024;
/**
* 压缩成ZIP 方法1
*
* @param srcDir 压缩文件夹路径
* @param out 压缩文件输出流
* @param KeepDirStructure 是否保留原来的目录结构,true:保留目录结构;
* false:所有文件跑到压缩包根目录下(注意:不保留目录结构可能会出现同名文件,会压缩失败)
* @throws RuntimeException 压缩失败会抛出运行时异常
*/
public static void toZip(String srcDir, OutputStream out, boolean KeepDirStructure)
throws RuntimeException {
long start = System.currentTimeMillis();
ZipOutputStream zos = null;
try {
zos = new ZipOutputStream(out);
File sourceFile = new File(srcDir);
compress(sourceFile, zos, sourceFile.getName(), KeepDirStructure);
long end = System.currentTimeMillis();
System.out.println("压缩完成,耗时:" + (end - start) + " ms");
} catch (Exception e) {
throw new RuntimeException("zip error from ZipUtils", e);
} finally {
if (zos != null) {
try {
zos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 压缩成ZIP 方法2
*
* @param srcFiles 需要压缩的文件列表
* @param out 压缩文件输出流
* @throws RuntimeException 压缩失败会抛出运行时异常
*/
public static void toZip(List<File> srcFiles, OutputStream out) throws RuntimeException {
long start = System.currentTimeMillis();
ZipOutputStream zos = null;
try {
zos = new ZipOutputStream(out);
for (File srcFile : srcFiles) {
byte[] buf = new byte[BUFFER_SIZE];
zos.putNextEntry(new ZipEntry(srcFile.getName()));
int len;
FileInputStream in = new FileInputStream(srcFile);
while ((len = in.read(buf)) != -1) {
zos.write(buf, 0, len);
}
zos.closeEntry();
in.close();
}
long end = System.currentTimeMillis();
System.out.println("压缩完成,耗时:" + (end - start) + " ms");
} catch (Exception e) {
throw new RuntimeException("zip error from ZipUtils", e);
} finally {
if (zos != null) {
try {
zos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 递归压缩方法
*
* @param sourceFile 源文件
* @param zos zip输出流
* @param name 压缩后的名称
* @param KeepDirStructure 是否保留原来的目录结构,true:保留目录结构;
* false:所有文件跑到压缩包根目录下(注意:不保留目录结构可能会出现同名文件,会压缩失败)
* @throws Exception
*/
private static void compress(File sourceFile, ZipOutputStream zos, String name,
boolean KeepDirStructure) throws Exception {
byte[] buf = new byte[BUFFER_SIZE];
if (sourceFile.isFile()) {
// 向zip输出流中添加一个zip实体,构造器中name为zip实体的文件的名字
zos.putNextEntry(new ZipEntry(name));
// copy文件到zip输出流中
int len;
FileInputStream in = new FileInputStream(sourceFile);
while ((len = in.read(buf)) != -1) {
zos.write(buf, 0, len);
}
// Complete the entry
zos.closeEntry();
in.close();
} else {
File[] listFiles = sourceFile.listFiles();
if (listFiles == null || listFiles.length == 0) {
// 需要保留原来的文件结构时,需要对空文件夹进行处理
if (KeepDirStructure) {
// 空文件夹的处理
zos.putNextEntry(new ZipEntry(name + "/"));
// 没有文件,不需要文件的copy
zos.closeEntry();
}
} else {
for (File file : listFiles) {
// 判断是否需要保留原来的文件结构
if (KeepDirStructure) {
// 注意:file.getName()前面需要带上父文件夹的名字加一斜杠,
// 不然最后压缩包中就不能保留原来的文件结构,即:所有文件都跑到压缩包根目录下了
compress(file, zos, name + "/" + file.getName(), KeepDirStructure);
} else {
compress(file, zos, file.getName(), KeepDirStructure);
}
}
}
}
}
三、总结
总的步骤来说就是
1、创建队列
2、导出(导出是先导出到本地磁盘)
3、获取导出进度
4、导出完成后进行下载,下载完成后把进度和文件全部删除
这是第一次写博客,写的不好的地方大家见谅。
标签:异步,int,导出,zos,File,new,csv,public 来源: https://blog.csdn.net/Norman1250/article/details/121266317