编程语言
首页 > 编程语言> > Java多线程(三):线程组与线程池

Java多线程(三):线程组与线程池

作者:互联网

目录:

1. 线程组

在一个系统中,如果线程数量较多,且功能分配比较明确,就可以将相同功能的线程放在同一个线程组(ThreadGroup) 中。

ThreadGroup有两个比较重要的功能:

线程组使用举例:

public class TestThreadGroup {

    public static void main(String[] args) {
        ThreadGroup applesThread = new ThreadGroup("apples");
        ThreadGroup bananasThread = new ThreadGroup("bananas");
        Apple apple = new Apple();
        Banana banana = new Banana();

        for (int i = 0; i < 3; i++) {
            new Thread(applesThread, apple, "apple" + i).start();
        }

        for (int i = 0; i < 3; i++) {
            new Thread(bananasThread, banana, "banana" + i).start();
        }

        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // activeCount()方法可以获得活动线程总数,但由于是动态的,因此这是一个估计值
        System.out.println("当前活动线程总数: " + applesThread.activeCount());
        // list()方法可以打印出这个线程中所有的线程信息,对调试有一定的帮助
        System.out.println("bananas组中的所有线程信息: ");
        bananasThread.list();
    }
}

class Apple implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ": I'm an apple");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Banana implements Runnable {

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + ": I'm a banana");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

执行结果:
在这里插入图片描述

2. 线程池

线程池是Java并发编程中一项重要的工具。

为解决此问题,我们可以使用JDK提供的工具——线程池

JDK为线程池提供了一套Executor框架,以帮助开发人员有效的进行线程控制

Executor框架的核心成员:

2.1 Executors(线程池工厂)

主要的工厂方法:

工厂方法功能
ExecutorService newFixedThreadPool(int nThreads)返回一个固定线程数量的线程池,当有新任务提交时,如有空闲线程则立即执行,否则暂存在任务队列中进行等待(先进先出)
ExecutorService newSingleThreadExecutor()相当于newFixedThreadPool(1)
ExecutorService newCachedThreadPool()返回一个可根据实际情况调整的线程池,优先复用线程,线程不够时创建。
ScheduledExecutorService newSingleThreadScheduledExecutor()返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能
ScheduledExecutorService newScheduledThreadPool(int corePoolSize)newSingleThreadScheduledExecutor()类似,但此方法可以指定线程数量。

以 newFixedThreadPool(int nThreads) 为例,介绍线程池的使用:

public class TestPool {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);

        Runnable task = () -> {
            System.out.println(Thread.currentThread().getName() +  ": hello thread pool!");
        };

        for (int i = 0; i < 4; i++) {
            executorService.submit(task);
        }

        // 执行完所有任务后,关闭线程池(不再接受新任务)。如果线程池已经关闭,则调用没有其他效果。
        executorService.shutdown();
    }
}

2.2 计划任务(延时执行,周期执行)

Java线程池还提供了计划执行的功能,主要包括以下三个方法:

schedule举例:

public class ScheduledTask {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        Runnable runnable = () -> {
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法");
            // 任务需要执行1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束");
        };
        
        System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start");

        /*
        延时2s后调用任务:
        schedule(Runnable command, long delay, TimeUnit unit)
         */
        scheduledExecutorService.schedule(runnable, 2, TimeUnit.SECONDS);

        scheduledExecutorService.shutdown();
    }
}

在这里插入图片描述

scheduleAtFixedRate举例:

public class ScheduledTask {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        Runnable runnable = () -> {
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法");
            // 任务需要执行1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束");
        };

        System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start");

        /*
        每隔2s调用一次runnable任务:
        scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
         */
        scheduledExecutorService.scheduleAtFixedRate(runnable, 0, 2, TimeUnit.SECONDS);
    }
}

在这里插入图片描述

scheduleWithFixedDelay举例:

package com.ju.pool;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledTask {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        Runnable runnable = () -> {
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": 开始执行run()方法");
            // 任务需要执行1s
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": run()方法执行结束");
        };

        System.out.println(new SimpleDateFormat("HH:mm:ss").format(new Date(System.currentTimeMillis())) + ": start");

        /*
        一开始延时5s开始执行任务,从前一次任务结束到下一次任务开始间隔2s:
        scheduleAtFixedRate(Runnable command, long initialDelay, long delay, TimeUnit unit)
         */
        scheduledExecutorService.scheduleWithFixedDelay(runnable, 5, 2, TimeUnit.SECONDS);
    }
}

在这里插入图片描述

2.3 核心线程池的内部实现

除了计划执行外,其余三个工厂方法(newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPool)的内部实现均使用了ThreadPoolExcutor类。

ThreadPoolExcutor 类的一个参数较全的构造器如下所示:

public ThreadPoolExecutor(int corePoolSize, // 指定线程池中固定存在的线程数量
                              int maximumPoolSize, // 指定了线程池的最大线程数量
                              long keepAliveTime, // 指定了线程池中超过corePoolSize的空闲线程,在多长时间内会被销毁
                              TimeUnit unit, // keepAliveTime的单位
                              BlockingQueue<Runnable> workQueue, // 任务队列,由于存储已被提交但未被执行的任务
                              ThreadFactory threadFactory, // 线程工厂,由于创建线程,一般用默认的即可
                              RejectedExecutionHandler handler) // 拒绝策略:任务太多来不及处理时,如何拒绝任务

其中大部分的参数均比较容易理解,需要解释类型的有三个:

2.3.1 任务队列(BlockingQueue)

2.3.2 线程工厂(ThreadFactory)

2.3.3 拒绝策略(RejectedExecutionHandler)

2.4 Executors中线程池的实现

对于newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPool方法,其方法内部均调用了下面的构造器。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

2.4.1 newFixedThreadPool 的实现

public static ExecutorService newFixedThreadPool(int nThreads) { // 方法参数 nThreads,线程池中固定存在的线程数量
    return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize 和 maximumPoolSize 均为 nThreads
                                  0L, TimeUnit.MILLISECONDS, // 存活时间为0ms
                                  new LinkedBlockingQueue<Runnable>()); // 使用无界的任务队列(LinkedBlockingQueue)
}

2.4.2 newSingleThreadExecutor 的实现

// 基本与 newFixedThreadPool 相同,只是限定了线程池中固定存在的线程数量为1
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

2.4.3 newCachedThreadPool 的实现

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // corePoolSize = 0; maximumPoolSize = Integer.MAX_VALUE;
                                  60L, TimeUnit.SECONDS, // keepAliveTime = 60s;
                                  new SynchronousQueue<Runnable>()); // 直接提交的队列(SynchronousQueue)
}

2.5 扩展线程池

在默认的ThreadPoolExecutor 实现中,提供了空的beforeExecute()afterExecute()terminated()方法。在实际应用中,可以对其进行扩展来实现对线程池运行状态的跟踪,输出一些有用的调试信息,以帮助系统故障诊断,这对于多线程程序错误排查是很有帮助的。

public class MyPool {

    public static void main(String[] args) {
        ExecutorService myPool = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println(t.getName() + ": 任务执行前");
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("任务执行结束");
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        myPool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "任务执行中...");
        });

        myPool.shutdown();
    }
}

在这里插入图片描述

2.6 分而治之(Fork & Join)

“分而治之”是一个非常有效地处理大量数据的方法。

利用JDK提供的 Fork & Join 框架,我们可以十分方便的将一个大任务分解为多个子任务。当所有的子任务都执行完成后,在收集它们各自的结果,从而得到最终的结果。

使用示例:

// 利用fork&join,实现start到end的累加
public class TestForkJoinPool extends RecursiveTask<Long> {
    private static final int THRESHOLD = 30000; // 划分门槛
    private static final int GROUP_SIZE = 10; // 每次划分的组数
    private long start; // 计数起始值
    private long end; // 计数终止值

    public TestForkJoinPool(long start, long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        long sum = 0;

        if (end - start < THRESHOLD) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 将任务划分位多个子任务
            long step = (end - start + 1) / GROUP_SIZE;
            ArrayList<TestForkJoinPool> subTasks = new ArrayList<>();
            long head = start;
            long tail;
            for (int i = 0; i < GROUP_SIZE; i++) {
                tail = head + step;
                if (tail > end) {
                    tail = end;
                }
                TestForkJoinPool subTask = new TestForkJoinPool(head, tail);
                head = tail + 1;
                subTasks.add(subTask);
                subTask.fork();
            }

            // 收集子任务的计算结果
            for (TestForkJoinPool subTask : subTasks) {
                sum += subTask.join();
            }
        }

        return sum;
    }

    public static void main(String[] args) {
        long startTime = 0;
        long endTime = 0;
        long result = 0;

        ForkJoinPool forkJoinPool = new ForkJoinPool();

        startTime = System.currentTimeMillis();
        ForkJoinTask<Long> taskResult = forkJoinPool.submit(new TestForkJoinPool(0, 2000000000L));
        try {
            result = taskResult.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        endTime = System.currentTimeMillis();
        System.out.println("fork&join计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms");

        startTime = System.currentTimeMillis();
        result = 0;
        for (int i = 0; i <= 2000000000L; i++) {
            result += i;
        }
        endTime = System.currentTimeMillis();
        System.out.println("普通计算: sum = " + result + " 花费了" + (endTime - startTime) + "ms");
    }
}

在这里插入图片描述

参考书籍: Java高并发程序设计(第二版)

标签:组与,System,long,任务,线程,new,多线程,public
来源: https://blog.csdn.net/weixin_42041195/article/details/117606743