其他分享
首页 > 其他分享> > 看完,你也能用多线程让接口提升5倍速!

看完,你也能用多线程让接口提升5倍速!

作者:互联网

一、应用场景

酒店提供给各个渠道商房间价格是不一样的,我们需要轮询所有的渠道商接口,给用户返回一个最低的价格,前端会将这个价格显示给用户。

二、接口要求

实时查询去哪儿、携程、飞猪、艺龙、同程等渠道的今日房价,计算并返回当日最低价。

三、接口难点

去哪儿、携程、飞猪、艺龙、同程因为是走外网,网络会有一定的延迟。如果同步一个个获取价格数据,接口会很慢。

四、实现

4.1 简单实现

循环调用各个渠道今日房价,对比每个渠道价格,计算并返回最低价格。

package com.fourkmiles.common.thread;

import java.util.ArrayList;
import java.util.List;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class MainDemo {
    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        List<String> channelList = new ArrayList<>();
        channelList.add("去哪儿");
        channelList.add("携程");
        channelList.add("飞猪");
        channelList.add("艺龙");
        channelList.add("同程");
        // 初始值
        int minPrice = -1;
        String minPriceChannel = "";
        for (String channel : channelList) {
            CallableChannelPrice callableChannelPrice = new CallableChannelPrice(channel);
            int channelPrice = callableChannelPrice.getPrice();
            // 将第一个值赋值给minPrice或者后者价格小于当前价格
            if (minPrice == -1 || (minPrice != -1 && minPrice > channelPrice)) {
                minPrice = channelPrice;
                minPriceChannel = channel;
            }
        }
        long endTime = System.currentTimeMillis();
        System.out.println("最低的渠道为:" + minPriceChannel + ",价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
    }

    static class CallableChannelPrice {
        String channel;

        public CallableChannelPrice(String channel) {
            this.channel = channel;
        }

        public int getPrice() throws Exception {
            int price = 0;
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
            System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
            return price;
        }
    }
}

获取:去哪儿渠道的价格,价格为:150
获取:携程渠道的价格,价格为:141
获取:飞猪渠道的价格,价格为:122
获取:艺龙渠道的价格,价格为:138
获取:同程渠道的价格,价格为:195
最低的渠道为:飞猪,价格为:122,执行时间为:25003

循环调用有一个很明显的缺点,就是速度非常慢,接口响应的速度等于各个渠道接口的总和,这明显不符合我们的要求。

4.2 多线程Future/FutureTask实现

解决这类问题最有效的办法就是采用多线程并发执行,然后获取各自结果集来计算最终的价格。我们接下来看看Future和FutureTask的实现案例,以及它到底可以节省多少时间呢。

package com.fourkmiles.common.thread;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class FutureDemo {

    public static void main(String[] args) {
        //开启多线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        long startTime = System.currentTimeMillis();
        List<String> channelList = new ArrayList<>();
        channelList.add("去哪儿");
        channelList.add("携程");
        channelList.add("飞猪");
        channelList.add("艺龙");
        channelList.add("同程");
        // 初始值
        int minPrice = -1;
        try {
            //结果集
            List<Integer> list = new ArrayList<>();
            List<Future<Integer>> futureList = new ArrayList<>();
            //1.高速提交5个任务,每个任务返回一个Future入list
            for (String channel : channelList) {
                futureList.add(exs.submit(new CallableChannelPrice(channel)));
            }
            //2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除
            while (futureList.size() > 0) {
                Iterator<Future<Integer>> iterable = futureList.iterator();
                //遍历一遍
                while (iterable.hasNext()) {
                    Future<Integer> future = iterable.next();
                    //如果任务完成取结果,否则判断下一个任务是否完成
                    if (future.isDone() && !future.isCancelled()) {
                        //获取结果
                        Integer i = future.get();
                        list.add(i);
                        //任务完成移除任务
                        iterable.remove();
                    }
                }
            }
            for (Integer price : list) {
                // 将第一个值赋值给minPrice或者后者价格小于当前价格
                if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
                    minPrice = price;
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    static class CallableChannelPrice implements Callable<Integer> {
        String channel;

        public CallableChannelPrice(String channel) {
            this.channel = channel;
        }

        public int getPrice() throws Exception {
            int price = 0;
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
            System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
            return price;
        }

        @Override
        public Integer call() throws Exception {
            return getPrice();
        }
    }
}

获取:同程渠道的价格,价格为:195
获取:飞猪渠道的价格,价格为:122
获取:去哪儿渠道的价格,价格为:150
获取:携程渠道的价格,价格为:141
获取:艺龙渠道的价格,价格为:138
最低渠道的价格为:122,执行时间为:5004

FutureTask实现的版本如下所示:

package com.fourkmiles.common.thread;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class FutureTaskDemo {

    public static void main(String[] args) {
        //开启多线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        long startTime = System.currentTimeMillis();
        List<String> channelList = new ArrayList<>();
        channelList.add("去哪儿");
        channelList.add("携程");
        channelList.add("飞猪");
        channelList.add("艺龙");
        channelList.add("同程");
        // 初始值
        int minPrice = -1;
        try {
            //结果集
            List<Integer> list = new ArrayList<>();
            List<FutureTask<Integer>> futureList = new ArrayList<>();
            //1.高速提交5个任务,每个任务返回一个Future入list
            for (String channel : channelList) {
                FutureTask<Integer> futureTask = new FutureTask<>((new CallableChannelPrice(channel)));
                //提交任务,添加返回,Runnable特性
                exs.submit(futureTask);
                //Future特性
                futureList.add(futureTask);
            }
            //2.结果归集,用迭代器遍历futureList,高速轮询(模拟实现了并发),任务完成就移除
            while (futureList.size() > 0) {
                Iterator<FutureTask<Integer>> iterable = futureList.iterator();
                //遍历一遍
                while (iterable.hasNext()) {
                    Future<Integer> future = iterable.next();
                    //如果任务完成取结果,否则判断下一个任务是否完成
                    if (future.isDone() && !future.isCancelled()) {
                        //获取结果
                        Integer i = future.get();
                        list.add(i);
                        //任务完成移除任务
                        iterable.remove();
                    }
                }
            }
            for (Integer price : list) {
                // 将第一个值赋值给minPrice或者后者价格小于当前价格
                if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
                    minPrice = price;
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    static class CallableChannelPrice implements Callable<Integer> {
        String channel;

        public CallableChannelPrice(String channel) {
            this.channel = channel;
        }

        public int getPrice() throws Exception {
            int price = 0;
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
            System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
            return price;
        }

        @Override
        public Integer call() throws Exception {
            return getPrice();
        }
    }
}

从执行结果上面我们可以看到,采用多线程并发执行方式,执行时间取决于最长的渠道接口。从接口响应的时间来看,效果还是非常显著的,直接从25s降低为5s。

多线程虽然解决网络问题,速度大幅度提升,但是CPU高速轮询,耗资源,代码也不够简洁。我们接着来看看CompletionService的实现。

4.3 多线程CompletionService实现

package com.fourkmiles.common.thread;


import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class CompletionServiceDemo {

    public static void main(String[] args) {
        //开启3个线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            long startTime = System.currentTimeMillis();
            List<String> channelList = new ArrayList<>();
            channelList.add("去哪儿");
            channelList.add("携程");
            channelList.add("飞猪");
            channelList.add("艺龙");
            channelList.add("同程");
            // 初始值
            int minPrice = -1;
            //结果集
            List<Integer> list = new ArrayList<>();
            //1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            List<Future<Integer>> futureList = new ArrayList<>();
            //2.添加任务
            for (String channel : channelList) {
                futureList.add(completionService.submit(new CallableChannelPrice(channel)));
            }
            //==================结果归集===================
            //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
            for (Future<Integer> future : futureList) {
                //线程在这里阻塞等待该任务执行完毕,按照
                Integer result = future.get();
                list.add(result);
            }
           //方法2.使用内部阻塞队列的take()
//            for (int i = 0; i < channelList.size(); i++) {
//                //采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
//                Integer result = completionService.take().get();
//                list.add(result);
//            }
            for (Integer price : list) {
                // 将第一个值赋值给minPrice或者后者价格大于当前价格
                if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
                    minPrice = price;
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            exs.shutdown();
        }
    }

    static class CallableChannelPrice implements Callable<Integer> {
        String channel;

        public CallableChannelPrice(String channel) {
            this.channel = channel;
        }

        public int getPrice() throws Exception {
            int price = 0;
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
            System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
            return price;
        }

        @Override
        public Integer call() throws Exception {
            return getPrice();
        }
    }
}
获取:飞猪渠道的价格,价格为:122
获取:去哪儿渠道的价格,价格为:150
获取:携程渠道的价格,价格为:141
获取:艺龙渠道的价格,价格为:138
获取:同程渠道的价格,价格为:195
最低渠道的价格为:122,执行时间为:5003

CompletableFuture解决了Future/FutureTask版本中CPU高速轮询,耗资源问题,在JDK1.8之前推荐使用,但是提供API不够丰富,没办法框架自己捕捉异常。

4.4 多线程CompletableFuture实现

package com.fourkmiles.common.thread;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * @author 林志强
 * @date 2021/3/31
 */
public class CompletableFutureDemo {
    public static void main(String[] args) {
        // 添加顺序结果集
        List<Integer> addSequenceList;
        // 结果顺序结果集
        List<Integer> executionSequenceList = new ArrayList<>();
        //定长5线程池
        ExecutorService exs = Executors.newFixedThreadPool(5);
        long startTime = System.currentTimeMillis();
        List<String> channelList = new ArrayList<>();
        channelList.add("去哪儿");
        channelList.add("携程");
        channelList.add("飞猪");
        channelList.add("艺龙");
        channelList.add("同程");
        // 初始值
        int minPrice = -1;
        List<CompletableFuture<Integer>> futureList = new ArrayList<>();
        try {
            //方式一:循环创建CompletableFuture list,调用sequence()组装返回一个有返回值的CompletableFuture,返回结果get()获取
            for (int i = 0; i < channelList.size(); i++) {
                final int j = i;
                //异步执行
                CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> getPrice(channelList.get(j)), exs)
                        //Integer转换字符串    thenAccept只接受不返回不影响结果
                        .thenApply(e -> Integer.valueOf(e))
                        //如需获取任务完成先后顺序,此处代码即可
                        .whenComplete((v, e) -> {
                            executionSequenceList.add(v);
                        });
                futureList.add(future);
            }
            //流式获取结果:此处是根据任务添加顺序获取的结果
            addSequenceList = sequence(futureList).get();

//            方式二:全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
            CompletableFuture[] cfs = channelList.stream().map(object -> CompletableFuture.supplyAsync(() -> getPrice(object), exs)
                    .thenApply(h -> Integer.valueOf(h))
                    //如需获取任务完成先后顺序,此处代码即可
                    .whenComplete((v, e) -> {
                        executionSequenceList.add(v);
                    })).toArray(CompletableFuture[]::new);
//            等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取
            CompletableFuture.allOf(cfs).join();
            for (Integer price : addSequenceList) {
                // 将第一个值赋值给minPrice或者后者价格小于当前价格
                if (minPrice == -1 || (minPrice != -1 && minPrice > price)) {
                    minPrice = price;
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("最低渠道的价格为:" + minPrice + ",执行时间为:" + (endTime - startTime));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();
        }
    }

    public static int getPrice(String channel) {
        int price = 0;
        try {
            // 1=去哪儿、2=携程、3=飞猪、4=艺龙、5=同程
            if ("去哪儿".equals(channel)) {
                // 模拟请求去哪儿接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 150;
            } else if ("携程".equals(channel)) {
                // 模拟请求携程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 141;
            } else if ("飞猪".equals(channel)) {
                // 模拟请求飞猪接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 122;
            } else if ("艺龙".equals(channel)) {
                // 模拟请求艺龙接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 138;
            } else if ("同程".equals(channel)) {
                // 模拟请求同程接口的网络延迟,休眠5秒
                Thread.sleep(5000);
                price = 195;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("获取:" + channel + "渠道的价格,价格为:" + price);
        return price;
    }

    /**
     * @param futures List
     * @return
     * @Description 组合多个CompletableFuture为一个CompletableFuture, 所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
     * @author diandian.zhang
     * @date 2017年6月19日下午3:01:09
     * @since JDK1.8
     */
    public static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        //1.构造一个空CompletableFuture,子任务数为入参任务list size
        CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        //2.流式(总任务完成后,每个子任务join取结果,后转换为list)
        return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    /**
     * @param futures Stream
     * @return
     * @Description Stream流式类型futures转换成一个CompletableFuture, 所有子任务全部完成,组合后的任务才会完成。带返回值,可直接get.
     * @author diandian.zhang
     * @date 2017年6月19日下午6:23:40
     * @since JDK1.8
     */
    public static <T> CompletableFuture<List<T>> sequence(Stream<CompletableFuture<T>> futures) {
        List<CompletableFuture<T>> futureList = futures.filter(f -> f != null).collect(Collectors.toList());
        return sequence(futureList);
    }
}
获取:去哪儿渠道的价格,价格为:150
获取:飞猪渠道的价格,价格为:122
获取:同程渠道的价格,价格为:195
获取:艺龙渠道的价格,价格为:138
获取:携程渠道的价格,价格为:141
最低渠道的价格为:122,执行时间为:5049

Java8流式编程,提供丰富的API接口,可以捕获异常,JDK8必选方案,相对于上面几种方案优势明显,强烈推荐使用。

CompletableFuture上面demo只介绍了一部分的内容,如果大家想要了解更多关于CompletableFuture的用法,可以查看JDK中java.util.concurrent.CompletableFuture源码。

五、总结:

从上面的实验我们可以得出,采用CompletableFuture进行多线程并发获取结果集是最好的方案。大家可能会很好奇,既然有了CompletableFuture,为啥还需要前面几种方案呢。那是因为CompletableFuture是在JDK8才出现的,以前还没有这个工具。

 

标签:minPrice,price,接口,add,CompletableFuture,倍速,多线程,channelList,channel
来源: https://blog.51cto.com/u_8865295/2984022