看完,你也能用多线程让接口提升5倍速!
作者:互联网
一、应用场景
酒店提供给各个渠道商房间价格是不一样的,我们需要轮询所有的渠道商接口,给用户返回一个最低的价格,前端会将这个价格显示给用户。
二、接口要求
实时查询去哪儿、携程、飞猪、艺龙、同程等渠道的今日房价,计算并返回当日最低价。
三、接口难点
去哪儿、携程、飞猪、艺龙、同程因为是走外网,网络会有一定的延迟。如果同步一个个获取价格数据,接口会很慢。
四、实现
4.1 简单实现
循环调用各个渠道今日房价,对比每个渠道价格,计算并返回最低价格。
package com.fourkmiles.common.thread; import java.util.ArrayList; import java.util.List; /** * @author 林志强 * @date 2021/3/31 */ public class MainDemo { public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); List<String> channelList = new ArrayList<>(); channelList.add("去哪儿"); channelList.add("携程"); channelList.add("飞猪"); channelList.add("艺龙"); channelList.add("同程"); // 初始值 int minPrice = -1; String minPriceChannel = ""; for (String channel : channelList) { CallableChannelPrice callableChannelPrice = new CallableChannelPrice(channel); int channelPrice = callableChannelPrice.getPrice(); // 将第一个值赋值给minPrice或者后者价格小于当前价格 if (minPrice == -1 || (minPrice != -1 && minPrice > channelPrice)) { minPrice = channelPrice; minPriceChannel = channel; } } long endTime = System.currentTimeMillis(); System.out.println("最低的渠道为:" + minPriceChannel + ",价格为:" + minPrice + ",执行时间为:" + (endTime - startTime)); } static class CallableChannelPrice { String channel; public CallableChannelPrice(String channel) { this.channel = channel; } public int getPrice() throws Exception { int price = 0; // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程 if ("去哪儿".equals(channel)) { // 模拟请求去哪儿接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 150; } else if ("携程".equals(channel)) { // 模拟请求携程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 141; } else if ("飞猪".equals(channel)) { // 模拟请求飞猪接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 122; } else if ("艺龙".equals(channel)) { // 模拟请求艺龙接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 138; } else if ("同程".equals(channel)) { // 模拟请求同程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 195; } System.out.println("获取:" + channel + "渠道的价格,价格为:" + price); return price; } } }
获取:去哪儿渠道的价格,价格为:150 获取:携程渠道的价格,价格为:141 获取:飞猪渠道的价格,价格为:122 获取:艺龙渠道的价格,价格为:138 获取:同程渠道的价格,价格为:195 最低的渠道为:飞猪,价格为:122,执行时间为:25003
循环调用有一个很明显的缺点,就是速度非常慢,接口响应的速度等于各个渠道接口的总和,这明显不符合我们的要求。
4.2 多线程Future/FutureTask实现
解决这类问题最有效的办法就是采用多线程并发执行,然后获取各自结果集来计算最终的价格。我们接下来看看Future和FutureTask的实现案例,以及它到底可以节省多少时间呢。
package com.fourkmiles.common.thread; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author 林志强 * @date 2021/3/31 */ public class FutureDemo { public static void main(String[] args) { //开启多线程 ExecutorService exs = Executors.newFixedThreadPool(5); long startTime = System.currentTimeMillis(); List<String> channelList = new ArrayList<>(); channelList.add("去哪儿"); channelList.add("携程"); channelList.add("飞猪"); channelList.add("艺龙"); channelList.add("同程"); // 初始值 int minPrice = -1; try { //结果集 List<Integer> list = new ArrayList<>(); List<Future<Integer>> futureList = new ArrayList<>(); //1.高速提交5个任务,每个任务返回一个Future入list for (String channel : channelList) { futureList.add(exs.submit(new CallableChannelPrice(channel))); } //2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除 while (futureList.size() > 0) { Iterator<Future<Integer>> iterable = futureList.iterator(); //遍历一遍 while (iterable.hasNext()) { Future<Integer> future = iterable.next(); //如果任务完成取结果,否则判断下一个任务是否完成 if (future.isDone() && !future.isCancelled()) { //获取结果 Integer i = future.get(); list.add(i); //任务完成移除任务 iterable.remove(); } } } for (Integer price : list) { // 将第一个值赋值给minPrice或者后者价格小于当前价格 if (minPrice == -1 || (minPrice != -1 && minPrice > price)) { minPrice = price; } } long endTime = System.currentTimeMillis(); System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } static class CallableChannelPrice implements Callable<Integer> { String channel; public CallableChannelPrice(String channel) { this.channel = channel; } public int getPrice() throws Exception { int price = 0; // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程 if ("去哪儿".equals(channel)) { // 模拟请求去哪儿接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 150; } else if ("携程".equals(channel)) { // 模拟请求携程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 141; } else if ("飞猪".equals(channel)) { // 模拟请求飞猪接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 122; } else if ("艺龙".equals(channel)) { // 模拟请求艺龙接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 138; } else if ("同程".equals(channel)) { // 模拟请求同程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 195; } System.out.println("获取:" + channel + "渠道的价格,价格为:" + price); return price; } @Override public Integer call() throws Exception { return getPrice(); } } }
获取:同程渠道的价格,价格为:195 获取:飞猪渠道的价格,价格为:122 获取:去哪儿渠道的价格,价格为:150 获取:携程渠道的价格,价格为:141 获取:艺龙渠道的价格,价格为:138 最低渠道的价格为:122,执行时间为:5004
FutureTask实现的版本如下所示:
package com.fourkmiles.common.thread; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.concurrent.*; /** * @author 林志强 * @date 2021/3/31 */ public class FutureTaskDemo { public static void main(String[] args) { //开启多线程 ExecutorService exs = Executors.newFixedThreadPool(5); long startTime = System.currentTimeMillis(); List<String> channelList = new ArrayList<>(); channelList.add("去哪儿"); channelList.add("携程"); channelList.add("飞猪"); channelList.add("艺龙"); channelList.add("同程"); // 初始值 int minPrice = -1; try { //结果集 List<Integer> list = new ArrayList<>(); List<FutureTask<Integer>> futureList = new ArrayList<>(); //1.高速提交5个任务,每个任务返回一个Future入list for (String channel : channelList) { FutureTask<Integer> futureTask = new FutureTask<>((new CallableChannelPrice(channel))); //提交任务,添加返回,Runnable特性 exs.submit(futureTask); //Future特性 futureList.add(futureTask); } //2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除 while (futureList.size() > 0) { Iterator<FutureTask<Integer>> iterable = futureList.iterator(); //遍历一遍 while (iterable.hasNext()) { Future<Integer> future = iterable.next(); //如果任务完成取结果,否则判断下一个任务是否完成 if (future.isDone() && !future.isCancelled()) { //获取结果 Integer i = future.get(); list.add(i); //任务完成移除任务 iterable.remove(); } } } for (Integer price : list) { // 将第一个值赋值给minPrice或者后者价格小于当前价格 if (minPrice == -1 || (minPrice != -1 && minPrice > price)) { minPrice = price; } } long endTime = System.currentTimeMillis(); System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } static class CallableChannelPrice implements Callable<Integer> { String channel; public CallableChannelPrice(String channel) { this.channel = channel; } public int getPrice() throws Exception { int price = 0; // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程 if ("去哪儿".equals(channel)) { // 模拟请求去哪儿接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 150; } else if ("携程".equals(channel)) { // 模拟请求携程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 141; } else if ("飞猪".equals(channel)) { // 模拟请求飞猪接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 122; } else if ("艺龙".equals(channel)) { // 模拟请求艺龙接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 138; } else if ("同程".equals(channel)) { // 模拟请求同程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 195; } System.out.println("获取:" + channel + "渠道的价格,价格为:" + price); return price; } @Override public Integer call() throws Exception { return getPrice(); } } }
从执行结果上面我们可以看到,采用多线程并发执行方式,执行时间取决于最长的渠道接口。从接口响应的时间来看,效果还是非常显著的,直接从25s降低为5s。
多线程虽然解决网络问题,速度大幅度提升,但是CPU高速轮询,耗资源,代码也不够简洁。我们接着来看看CompletionService的实现。
4.3 多线程CompletionService实现
package com.fourkmiles.common.thread; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.*; /** * @author 林志强 * @date 2021/3/31 */ public class CompletionServiceDemo { public static void main(String[] args) { //开启3个线程 ExecutorService exs = Executors.newFixedThreadPool(5); try { long startTime = System.currentTimeMillis(); List<String> channelList = new ArrayList<>(); channelList.add("去哪儿"); channelList.add("携程"); channelList.add("飞猪"); channelList.add("艺龙"); channelList.add("同程"); // 初始值 int minPrice = -1; //结果集 List<Integer> list = new ArrayList<>(); //1.定义CompletionService CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs); List<Future<Integer>> futureList = new ArrayList<>(); //2.添加任务 for (String channel : channelList) { futureList.add(completionService.submit(new CallableChannelPrice(channel))); } //==================结果归集=================== //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果 for (Future<Integer> future : futureList) { //线程在这里阻塞等待该任务执行完毕,按照 Integer result = future.get(); list.add(result); } //方法2.使用内部阻塞队列的take() // for (int i = 0; i < channelList.size(); i++) { // //采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到 // Integer result = completionService.take().get(); // list.add(result); // } for (Integer price : list) { // 将第一个值赋值给minPrice或者后者价格大于当前价格 if (minPrice == -1 || (minPrice != -1 && minPrice > price)) { minPrice = price; } } long endTime = System.currentTimeMillis(); System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime)); } catch (Exception e) { e.printStackTrace(); } finally { //关闭线程池 exs.shutdown(); } } static class CallableChannelPrice implements Callable<Integer> { String channel; public CallableChannelPrice(String channel) { this.channel = channel; } public int getPrice() throws Exception { int price = 0; // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程 if ("去哪儿".equals(channel)) { // 模拟请求去哪儿接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 150; } else if ("携程".equals(channel)) { // 模拟请求携程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 141; } else if ("飞猪".equals(channel)) { // 模拟请求飞猪接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 122; } else if ("艺龙".equals(channel)) { // 模拟请求艺龙接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 138; } else if ("同程".equals(channel)) { // 模拟请求同程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 195; } System.out.println("获取:" + channel + "渠道的价格,价格为:" + price); return price; } @Override public Integer call() throws Exception { return getPrice(); } } }
获取:飞猪渠道的价格,价格为:122 获取:去哪儿渠道的价格,价格为:150 获取:携程渠道的价格,价格为:141 获取:艺龙渠道的价格,价格为:138 获取:同程渠道的价格,价格为:195 最低渠道的价格为:122,执行时间为:5003
CompletableFuture解决了Future/FutureTask版本中CPU高速轮询,耗资源问题,在JDK1.8之前推荐使用,但是提供API不够丰富,没办法框架自己捕捉异常。
4.4 多线程CompletableFuture实现
package com.fourkmiles.common.thread; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.Stream; /** * @author 林志强 * @date 2021/3/31 */ public class CompletableFutureDemo { public static void main(String[] args) { // 添加顺序结果集 List<Integer> addSequenceList; // 结果顺序结果集 List<Integer> executionSequenceList = new ArrayList<>(); //定长5线程池 ExecutorService exs = Executors.newFixedThreadPool(5); long startTime = System.currentTimeMillis(); List<String> channelList = new ArrayList<>(); channelList.add("去哪儿"); channelList.add("携程"); channelList.add("飞猪"); channelList.add("艺龙"); channelList.add("同程"); // 初始值 int minPrice = -1; List<CompletableFuture<Integer>> futureList = new ArrayList<>(); try { //方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取 for (int i = 0; i < channelList.size(); i++) { final int j = i; //异步执行 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getPrice(channelList.get(j)), exs) //Integer转换字符串 thenAccept只接受不返回不影响结果 .thenApply(e -> Integer.valueOf(e)) //如需获取任务完成先后顺序,此处代码即可 .whenComplete((v, e) -> { executionSequenceList.add(v); }); futureList.add(future); } //流式获取结果:此处是根据任务添加顺序获取的结果 addSequenceList = sequence(futureList).get(); // 方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取 CompletableFuture[] cfs = channelList.stream().map(object -> CompletableFuture.supplyAsync(() -> getPrice(object), exs) .thenApply(h -> Integer.valueOf(h)) //如需获取任务完成先后顺序,此处代码即可 .whenComplete((v, e) -> { executionSequenceList.add(v); })).toArray(CompletableFuture[]::new); // 等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取 CompletableFuture.allOf(cfs).join(); for (Integer price : addSequenceList) { // 将第一个值赋值给minPrice或者后者价格小于当前价格 if (minPrice == -1 || (minPrice != -1 && minPrice > price)) { minPrice = price; } } long endTime = System.currentTimeMillis(); System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime)); } catch (Exception e) { e.printStackTrace(); } finally { exs.shutdown(); } } public static int getPrice(String channel) { int price = 0; try { // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程 if ("去哪儿".equals(channel)) { // 模拟请求去哪儿接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 150; } else if ("携程".equals(channel)) { // 模拟请求携程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 141; } else if ("飞猪".equals(channel)) { // 模拟请求飞猪接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 122; } else if ("艺龙".equals(channel)) { // 模拟请求艺龙接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 138; } else if ("同程".equals(channel)) { // 模拟请求同程接口的网络延迟,休眠5秒 Thread.sleep(5000); price = 195; } } catch (Exception e) { e.printStackTrace(); } System.out.println("获取:" + channel + "渠道的价格,价格为:" + price); return price; } /** * @param futures List * @return * @Description 组合多个CompletableFuture为一个CompletableFuture, 所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get. * @author diandian.zhang * @date 2017年6月19日下午3:01:09 * @since JDK1.8 */ public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) { //1.构造一个空CompletableFuture,子任务数为入参任务list size CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); //2.流式(总任务完成后,每个子任务join取结果,后转换为list) return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())); } /** * @param futures Stream * @return * @Description Stream流式类型futures转换成一个CompletableFuture, 所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get. * @author diandian.zhang * @date 2017年6月19日下午6:23:40 * @since JDK1.8 */ public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) { List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList()); return sequence(futureList); } }
获取:去哪儿渠道的价格,价格为:150 获取:飞猪渠道的价格,价格为:122 获取:同程渠道的价格,价格为:195 获取:艺龙渠道的价格,价格为:138 获取:携程渠道的价格,价格为:141 最低渠道的价格为:122,执行时间为:5049
Java8流式编程,提供丰富的API接口,可以捕获异常,JDK8必选方案,相对于上面几种方案优势明显,强烈推荐使用。
CompletableFuture上面demo只介绍了一部分的内容,如果大家想要了解更多关于CompletableFuture的用法,可以查看JDK中java.util.concurrent.CompletableFuture源码。
五、总结:
从上面的实验我们可以得出,采用CompletableFuture进行多线程并发获取结果集是最好的方案。大家可能会很好奇,既然有了CompletableFuture,为啥还需要前面几种方案呢。那是因为CompletableFuture是在JDK8才出现的,以前还没有这个工具。
标签:minPrice,price,接口,add,CompletableFuture,倍速,多线程,channelList,channel 来源: https://blog.51cto.com/u_8865295/2984022