编程语言
首页 > 编程语言> > JUC并发编程

JUC并发编程

作者:互联网

JUC概念

  1. 进程和线程

​ 一个进程包括多个线程

​ 进程指的是一个正在运行的应用程序,进程是一个资源分配的最小单位

​ 线程是程序中一个单元执行流,程序执行的最小单位

  1. 用户线程:基本我们编程的线程都是用户线程

​ 主线程结束,用户线程还在,jvm存活

  1. 守护线程:如垃圾回收

​ 用户线程设置成守护线程,主线程结束,没有其他用户线程,jvm结束

  1. 管程:

​ Monitor监视器,也就是所谓的锁,同步机制,保证同一时间,只有一个线程访问被保护数据或代码

  1. wait/sleep

    wait是object的方法,任何对象都能调用,会释放锁,调用他的前提是当前线程占有锁(代码在synchronized中)

    sleep不会释放锁,他也不需要占用锁

  2. 线程状态

    NEW 新建

    RUNNABLE 准备就绪

​ BLOCKED 阻塞

​ WAITING 不见不散

​ TIMED_WAITING 过期不候

​ TERMINATED 终结

  1. 并发与并行

    并发多个任务一个一个执行,多个线程访问同一个资源

    如电商秒杀,抢票

    并行是多个任务一起执行,多个线程一起执行最后汇总

Future接口

异步任务,帮助主线程去做一些耗时的业务

三个特点:异步任务/有返回/多线程

import java.util.concurrent.*;

public class Future {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<String> futureTask=new FutureTask<String>(new MyThread());
        Thread thread=new Thread(futureTask,"thread");
        thread.start();
        System.out.println(futureTask.get());
    }
}

class MyThread implements Callable<String>{

  @Override
  public String call() throws Exception {
      System.out.println("---come in call()");
      return "hello Callable";
  }
}

输出 :
---come in call()
hello Callable

Future 优缺点

优点:
future+线程池异步多线程配合,显著提高程序的执行效率
import java.util.concurrent.*;

public class FutureThreadPoolDemo {
    public static void main(String[] args) {
        long starTime = System.currentTimeMillis();
        ExecutorService threadPool= Executors.newFixedThreadPool(3);
        FutureTask<String> futureTask1 =new FutureTask<>(()->{
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            return "task1 over";
        });
        threadPool.submit(futureTask1);
        FutureTask<String> futureTask2 =new FutureTask<>(()->{
            try {
                TimeUnit.MILLISECONDS.sleep(500);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            return "task2 over";
        });
        threadPool.submit(futureTask2);
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        futureTask1.get();
        futureTask2.get();

        long endTime = System.currentTimeMillis();
        System.out.println("---threadPool costTime: "+(endTime-starTime)+"毫秒");

        threadPool.shutdown();
        m1();
    }
    private static void m1(){
        long starTime = System.currentTimeMillis();
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            TimeUnit.MILLISECONDS.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long endTime = System.currentTimeMillis();
        System.out.println("---m1 costTime: "+(endTime-starTime)+"毫秒");

    }
}}

输出:

---threadPool costTime: 572毫秒
---m1 costTime: 1140毫秒

​ get(3,TimeUnit.SECONDS); 设置超时时长,会抛出异常,可以自动离开,一定成可以避免阻塞,但是不优雅

缺点:

  1. get()获取容易阻塞,需要等待get获取到,才会执行下面的程序

  2. isDone()轮询耗费cpu

      while (true){
            if (futureTask.isDone()){
                System.out.println(futureTask.get());
                break;
            }
            else {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                }catch (Exception e){
                    e.printStackTrace();
                }
                System.out.println("还未执行完");
            }
        }

​ isDone()可以轮询查问是否执行完,拿到一次返回值

CompletableFuture 常用方法

提供观察者模式,可以将执行任务完成后通知通知监听的一方

它实现了Future和CompletionStage接口

CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
            System.out.println(Thread.currentThread().getName());
            try {
                 TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(completableFuture.get());
    }

输出 :
ForkJoinPool.commonPool-worker-9
null

runAsync无返回值

  ExecutorService threadPool = Executors.newFixedThreadPool(3);
  CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello supplyAsync";
        },threadPool);
        System.out.println(completableFuture1.get());
        threadPool.shutdown();

输出 :
pool-1-thread-1
hello supplyAsync

supplyAsync 有返回值

CompletableFuture自定义线程池

    CompletableFuture.supplyAsync(()->{
        System.out.println(Thread.currentThread().getName()+"---come in");
        int result = ThreadLocalRandom.current().nextInt(10);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("--- 1秒后输出结果" + result);
        return result;
        }).whenComplete((v,e)->{
        if (e == null){
          System.out.println("---计算完成,更新系统updateVa"+ v);
        }
        }).exceptionally((e)->{
          e.printStackTrace();
          System.out.println("异常情况"+e.getCause()+"\t"+e.getMessage());
          return null;
       });
       System.out.println(Thread.currentThread().getName()+"线程先去忙其他业务");
       //主线程不要立刻结束,否则CompletableFuture默认使用的线程会立刻关闭
       try {
         TimeUnit.SECONDS.sleep(3);
       } catch (InterruptedException e) {
          e.printStackTrace();
     }

输出:

ForkJoinPool.commonPool-worker-9---come in
main线程先去忙其他业务
--- 1秒后输出结果3
---计算完成,更新系统updateVa3

CompletionFuture 优点

异步结束时,会自动回调某个对象的方法

主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行

异步任务出错时,会自动回调某个对象的方法

CompletionFuture 函数

函数接口名称 方法名称 参数 返回值
Runnable run 无参数 无返回值
Function apply 1个参数 有返回值
Consume accept 1个参数 无返回值
Supplier get 没有参数 有返回值
BiConsumer accept 2个参数 无返回值

获取结果和触发计算

join和get对比

get 和 join 作用相同,但是get需要做抛出异常处理,jion不需要检查异常, 都要等待计算完成后的返回值

优雅的获取返回值,并防止阻塞:

getNow(T valueIfAbsent)

没有计算完成的情况下,给我一个替代结果

立刻获取结果不阻塞,计算完,返回计算完成后的结果,没计算完,返回设置的valueIfAbsent的值

complete(T value)

是否打断get方法立即返回括号值,打断会返回true ,将completeValue返回给join

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
         try {
            TimeUnit.SECONDS.sleep(5);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
             return "abc";
       });
       try {
            TimeUnit.SECONDS.sleep(1);
       } catch (InterruptedException e) {
            e.printStackTrace();
       }
    System.out.println(completableFuture.getNow("no value"));
    System.out.println(completableFuture.complete("completeValue") +"\t"+ completableFuture.join());

输出:

no value
true completeValue

对计算结果进行处理

thenApply

计算结果存在依赖关系,这是两个线程的串行化

由于存在依赖关系,当前步错,不能走下一步,会在当前步停止

ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
        try {
          TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
           e.printStackTrace();
        }
           return 1;
        },threadPool).thenApply(f -> {
           System.out.println(111);
           return f / 0;
        }).thenApply(f -> {
           System.out.println(222);
           return f + 1;
        }).whenComplete((v, e) -> {
            if (e == null) {
            System.out.println("---计算结果: " + v);
            }
         }).exceptionally(e -> {
            e.printStackTrace();
            ystem.out.println(e.getMessage());
            return null;
         });
System.out.println(Thread.currentThread().getName()+"-----主线程先忙其他业务");
threadPool.shutdown();

输出

main-----主线程先忙其他业务
111
java.lang.ArithmeticException: / by zero

Handle

有异常也可以往下一步走,根据带的异常参数可以进一步处理

ExecutorService threadPool = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
         } catch (InterruptedException e) {
            e.printStackTrace();
         }
         return 1;
        }, threadPool).handle((f, e) -> {
        System.out.println(111);
           return f / 0;
        }).handle((f, e) -> {
           System.out.println(222);
           return f + 1;
        }).whenComplete((v, e) -> {
            if (e == null) {
              System.out.println("---计算结果: " + v);
             }
        }).exceptionally(e -> {
            e.printStackTrace();
            System.out.println(e.getMessage());
            return null;
        });
System.out.println(Thread.currentThread().getName() + "-----主线程先忙其他业务");
threadPool.shutdown();

输出 :

main-----主线程先忙其他业务
111
222
java.lang.NullPointerException

对计算结果进行消费

接受任务的处理结果,并消费处理,无返回结果

thenAccept

  CompletableFuture.supplyAsync(()-> 1)
                    .thenApply(f-> f+1)
                    .thenAccept(System.out::println);

输出:

2

thenRun、 thenAccept、thenApply 区别

thenRun 任务A执行完执行B,B不需要传入值,也无返回值

thenAccept 任务A执行完执行B,B不需要传入值,有返回值

thenApply 任务A执行完执行B,B需要传入值,有返回值

System.out.println(CompletableFuture.supplyAsync(()->"resultA").thenRun(()->{}).join());
System.out.println(CompletableFuture.supplyAsync(()->"resultA")
                   .thenAccept(r->{ System.out.print(r+"\t"); })
                   .join());
System.out.println(CompletableFuture.supplyAsync(()->"resultA")
                   .thenApply(r->r+"resultB")
                   .join());

输出:

null
resultA null
resultAresultB

thenRun和thenRunAsync区别

  1. 没有传入自定义线程池,默认使用线程池ForkJoinPool

  2. 传入一个自定义的线程池

    如果你执行第一个任务的时候,传入了一个自定义线程池;

    调用thenRun方法执行第二个任务时,则第一个任务和第二个任务是共用一个线程池

    调用thenRunAsync执行第二个任务时,则第一个任务使用你传入的线程池,第二任务使用ForkJoinPool

  3. 备注

    有可能处理的太快,系统优化切换原则,直接使用main线程处理

    其他的thenAccept和 thenAcceptAsync,thenApply和thenApplyAsync等,也是同理

     ExecutorService threadPool = Executors.newFixedThreadPool(5);
           try {
              CompletableFuture completableFuture= CompletableFuture.supplyAsync(()->{
                try {
    //                  TimeUnit.MILLISECONDS.sleep(20);
                } catch (Exception e) {
                      e.printStackTrace();
                }
                  System.out.println("1号任务"+"\t"+Thread.currentThread().getName());
                  return "abc";
                },threadPool).thenRun(()->{
                   try {
                      TimeUnit.MILLISECONDS.sleep(20);
                   } catch (InterruptedException e) {
                      e.printStackTrace();
                   }
                  System.out.println("2号任务"+"\t"+Thread.currentThread().getName());
               }).thenRunAsync(()->{
                   try {
                      TimeUnit.MILLISECONDS.sleep(20);
                   } catch (InterruptedException e) {
                      e.printStackTrace();
                   }
                  System.out.println("3号任务"+"\t"+Thread.currentThread().getName());
               }).thenRun(()->{
                   try {
                       TimeUnit.MILLISECONDS.sleep(20);
                   } catch (InterruptedException e) {
                        e.printStackTrace();
                   }
                  System.out.println("4号任务"+"\t"+Thread.currentThread().getName());
                    });
                 System.out.println(completableFuture.get(2L,TimeUnit.MINUTES));
             } catch (Exception e) {
                 e.printStackTrace();
             } finally {
                 threadPool.shutdown();
             }
    
    

    输出:

    1号任务 pool-1-thread-1
    2号任务 main
    3号任务 ForkJoinPool.commonPool-worker-9
    4号任务 ForkJoinPool.commonPool-worker-9
    null

对计算速度进行选用

applyToEither

谁快用谁

 CompletableFuture playA = CompletableFuture.supplyAsync(() -> {
     System.out.println("playA come in");
     try {
         TimeUnit.SECONDS.sleep(1);
     } catch (InterruptedException e) {
          e.printStackTrace();
     }
          return "playA";
 });
 CompletableFuture playB = CompletableFuture.supplyAsync(() -> {
      System.out.println("playB come in");
      try {
        TimeUnit.SECONDS.sleep(2);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return "playB";
 });
 CompletableFuture<String> result=playA.applyToEither(playB, f -> f + " is winer");
 System.out.println(Thread.currentThread().getName()+"\t"+result.join());

输出:

playA come in
playB come in
main playA is winer

对计算结果进行合并

thenCombine

 CompletableFuture<Integer> playA = CompletableFuture.supplyAsync(() -> {
   System.out.println("playA come in");
   try {
        TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
         e.printStackTrace();
   }
         return 10;
 });
 CompletableFuture<Integer> playB = CompletableFuture.supplyAsync(() -> {
   System.out.println("playB come in");
   try {
        TimeUnit.SECONDS.sleep(2);
   } catch (InterruptedException e) {
        e.printStackTrace();
   }
        return 20;
 });
 CompletableFuture<Integer> result =playA.thenCombine(playB,(x,y) -> x+y);
     System.out.println(result.join());

输出:

playA come in
playB come in
30

标签:JUC,printStackTrace,编程,System,并发,线程,println,CompletableFuture,out
来源: https://www.cnblogs.com/cool-fun/p/16379420.html