其他分享
首页 > 其他分享> > 2022.8.21 Forkjoin与异步回调

2022.8.21 Forkjoin与异步回调

作者:互联网

14、Forkjoin(分支合并)

什么是 ForkJoin

ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。在大数据量中!

大数据:Map Reduce (把大任务拆分为小任务)

Forkjoin 特点:工作窃取,这里面维护的是双端队列

接口

 

 

 

通过forkjoinPool来执行forkjoin

构造方法

 

 

使用forkjoin

 package com.xing.forkjoin;
 ​
 import java.util.concurrent.RecursiveTask;
 ​
 /**
  * 求和计算的任务
  * 3000 6000(frokjoin) 9000(Stream并行流)
  * 如何使用frokjoin
  * 1.forkjoinPool通过它来执行
  * 2.计算任务forkjoinPool.execute(ForkJoinTask task)
  * 3.计算类要继承RecursiveTask(递归任务有返回值)
  */
 //                                       重写方法的返回值类型
 public class ForkJoinDemo extends RecursiveTask<Long> {
     private Long start;
     private Long end;
     //临界值
     private Long temp = 10000L;
 ​
     public ForkJoinDemo(Long start, Long end) {
         this.start = start;
         this.end = end;
    }
 ​
     //重写的方法
     @Override
     protected Long compute() {
         //正常计算
         if((end-start) < temp){
             Long sum = 0L;
             for (long i = start; i <= end; i++) {
 ​
                 sum += i;
            }
             return sum;
        }else {
             //forkjoin 递归
             long middle = (start + end) / 2; //中间值
 ​
             // 将一个任务拆分成两个任务
             ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
             task1.fork();//拆分任务,把任务压入线程队列
 ​
             ForkJoinDemo task2 = new ForkJoinDemo(middle,end);
             task2.fork();
 ​
             //返回结果
             return task1.join() + task2.join();
 ​
        }
    }
 }
 ​

不同方法的执行速度 package com.xing.forkjoin;

 ​
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
 import java.util.stream.LongStream;
 ​
 public class Test {
     public static void main(String[] args) {
         test1();
         try {
             test2();
        } catch (ExecutionException e) {
             e.printStackTrace();
        } catch (InterruptedException e) {
             e.printStackTrace();
        }
         test3();
    }
     //普通程序员
     public static void test1(){
         Long sum = 0L;
         long start = System.currentTimeMillis();
         for (long i = 1L; i <= 10_0000_0000; i++) {
             sum += i;
        }
         long end = System.currentTimeMillis();
         System.out.println("sum = " + sum + "时间:" + (end - start));
    }
     //使用forkjoin
     public static void test2() throws ExecutionException, InterruptedException {
         long start = System.currentTimeMillis();
 ​
         ForkJoinPool forkJoinPool = new ForkJoinPool();
         ForkJoinTask<Long> task = new ForkJoinDemo(0L,10_0000_0000L); //向下转型
 ​
         ForkJoinTask<Long> submit = forkJoinPool.submit(task);//提交任务
         Long sum = submit.get();//获得结果
 ​
 ​
         long end = System.currentTimeMillis();
         System.out.println("sum=" + sum +"时间:" + (end - start));
    }
     //用Stream并行流
     public static void test3(){
         long start = System.currentTimeMillis();
 ​
         //stream并行流                         包含10_0000_0000
         Long sum = LongStream.rangeClosed(0L, 10_0000_0000L)
                .parallel()//并行计算
                .reduce(0,Long::sum);//调用Long下面的sum方法 输出结果
 ​
         
         long end = System.currentTimeMillis();
         System.out.println("sum =" + sum + "时间:" + (end - start));
    }
 }
 ​
 ​

15、异步回调(Future)

Future 设计的初衷: 对将来的某个事件的结果进行建模

同步回调

我们常用的一些请求都是同步回调的,同步回调是阻塞的,单个的线程需要等待结果的返回才能继续执行。

 

 

 

异步回调

有的时候,我们不希望程序在某个执行方法上一直阻塞,需要先执行后续的方法,那就是这里的异步回调。我们在调用一个方法时,如果执行时间比较长,我们可以传入一个回调的方法,当方法执行完时,让被调用者执行给定的回调方法。

 

 

 

 

 

 

 package com.xing.future;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * 异步调用:CompletableFuture
  * 异步执行
  * 成功回调
  * 失败回调
  */
 public class Demo01 {
     public static void main(String[] args) throws ExecutionException, InterruptedException {
      /*
         //发起一个请求
         //异步回调 没有返回值的异步回调
         CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
             try {
                 TimeUnit.SECONDS.sleep(2);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println(Thread.currentThread().getName() + "runAsync=>void");
         });
         System.out.println("11111");
         completableFuture.get();//获取执行结果
         */
 ​
         //有返回值的异步回调
         //Ajax 成功和失败的回调
         //返回的是错误信息
         CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
             System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
             //int i = 10/0;
             return 1024;
        });
         System.out.println(completableFuture.whenComplete((t,u)->{//结果编译成功的时候返回  
             //成功的时候t为1024 u为null
             System.out.println("t=>" + t);//错的是时候t为null,
             System.out.println("u=>" + u);//错的时候u打印错误信息 java.util.concurrent.CompletionException:java.lang.ArithmeticException: / by zero
        }).exceptionally((e)->{//编译失败的时候返回
             System.out.println(e.getMessage());//打印异常信息 java.lang.ArithmeticException: / by zero
             return 2333;//可以获取错误的返回结果
        }).get());
    }
 ​
 }
 ​

 

标签:end,21,sum,System,Long,start,2022.8,println,Forkjoin
来源: https://www.cnblogs.com/shanzha/p/16611047.html