什么是虚拟线程?java19下的虚拟线程编程教程
作者:互联网
虚拟线程作为 Java 19 的一部分于 2022 年 9 月作为预览功能发布。它们是平台线程的轻量级版本。与遗留平台线程不同,虚拟线程的内存占用很小。有关虚拟线程的深入介绍,请查看以下 文章。
虚拟线程支持为每个工作单元模型创建一个线程,无论我们要处理多少任务。我们可以为每个任务创建一个虚拟线程,而不是通过线程池重用遗留平台线程。在每个线程的基础上遵循每个任务的编程流程可以简化编码并简化维护。特别是,虚拟线程与 I/O 请求一起大放异彩,以支持并发高吞吐量 I/O 编程。
CPU 绑定计算通过平台线程和线程池获得最佳 CPU 利用率。我们可以执行 CPU 密集型任务并 利用线程池 来支持并行化,使我们能够使用 CPU 多核架构来最大化速度和资源利用率。
因此,我们可以随意使用虚拟线程来支持 I/O 绑定请求和线程池及其关联的平台线程来处理 CPU 绑定任务。在本文中,我们将提供不同的策略,当我们请求工作单元同时需要 I/O 任务 和 资源昂贵的 CPU 密集型计算时,我们可以遵循这些策略。我们将探索一些可用的选项,并评估不同编程选择的权衡。
问题概述
让我们考虑一个关键任务应用程序的简单示例,该应用程序监视发电厂中的不同机器并快速确定它们是否正常运行。我们假设机器有连接到网络的传感器,这些传感器提供温度、压力等读数。问题的要点是我们必须连接到这些设备,获取它们的数据读数,并快速进行计算密集型数据分析以报告系统状态。
假设我们有一个连续运行的批处理进程来检查不同机器的状态。为了简化问题,我们假设进程从文件中连续读取数据,从中获取位置和必须监视的时间间隔。数据示例可能如下所示:
网址 | ID | 开始时间 | 时间结束 |
https://pw1/section/传感器 | 756a9c | 12/21/2022 11:00 | 12/21/2022 12:00 |
https://pw1/section2/传感器 | 647d5米 | 12/21/2022 11:00 | 12/21/2022 12:00 |
https://pw2/section/sensors | 948k4l | 12/21/2022 8:00 | 12/21/2022 12:00 |
https://pw2/section2/传感器 | 938r8c | 12/21/2022 9:00 | 12/21/2022 12:00 |
这个例子展示了一个大大简化的通用问题类型,但无需太多努力,它就可以很容易地泛化。我们的任务是展示我们将如何使用虚拟线程编写这个批处理过程,并展示我们将以编程方式解决这个问题的新方法及其相对于传统方法的优势。
使用虚拟线程编程
要开始我们的批处理,我们必须读取表的条目并将它们放在条目列表 ( List<InputEntry>
) 中,其中InputEntry
定义为 java 记录:
1、record InputEntry ( String url , String id , String startTime , String endTime ) {}
接下来,我们必须查看列表并分析每台机器的传感器读数:
爪哇1 public static void processData ( List < InputEntry > inputEntries ) {
2 系统。出来。println ( "processSensorReadings()" );
3 ExecutorService 执行器 = 执行器。newVirtualThreadPerTaskExecutor ();
4 CompletionService < String > cService = new ExecutorCompletionService <> (执行器);
5 对于(InputEntry inputEntry:inputEntries){
6 c服务。提交(() -> processSensorData ( inputEntry ));
7 }
8 的
9 int 处理 = 0 ;
10 while ( processed < inputEntries . size ()) {
11 处理过的++;
12 尝试{
13 未来<字符串> resultFuture = cService。取();
14 系统。出来。println ( "处理状态:" + resultFuture.get ( )) ;
15 }赶上(ExecutionException | InterruptedException e){
16 系统。出来。println ( "处理失败:" + e.getMessage ()) ;
17 }
18 }
19 }
在第 3 行中,我们使用一个新的执行器为每个任务启动一个虚拟线程。
在第 4 行中,我们将此执行器包装在 aCompletionService
中,这允许我们按照虚拟线程完成的顺序执行任务,如第 10-18 行所示。
在第 6 行中,我们将 提交processSensorData()
给CompletionService
,正如我们将看到的,它将执行 I/O 请求以及可能的数据分析。
由于虚拟线程是预览功能,我们需要使用--enable-preview
VM 参数运行 Java 19。
在这个阶段,我们注意到对于每个InputEntry
,我们都在创建一个虚拟线程。这个过程与我们使用遗留线程编程的方式形成对比,使用线程池来重用这种昂贵的资源。这种方法的优势是显而易见的,每个工作单元有一个虚拟线程。它促进了代码逻辑,而且更容易维护或解决问题——例如,通过线程转储或堆栈跟踪。
获取和处理数据
我们需要获取传感器数据来对其进行分析,因此我们执行以下操作:
爪哇1 public static String processSensorData ( InputEntry inputEntry )抛出 IOException , InterruptedException {
2 的
3 双流数据 = fetchSensorData ( inputEntry );
4 的
5 返回 “ID:” + inputEntry。id () + ": " + analyzeSensorData (数据);
6 }
7
8 个私有 静态 DoubleStream fetchSensorData ( InputEntry inputEntry )抛出 MalformedURLException , InterruptedException {
9 URL pwUrl = new URL ( inputEntry .url ( ) + " /startTime/endTime" );
10 // 在真实的应用程序中打开一个安全的 url 流并获取数据
11 // 对于这个例子,我们返回一些随机数据并模拟网络延迟
12 线程。睡眠((长)(数学。随机()* 100));
13 双流 数据 = 双流。生成(() -> new Random (). nextDouble ())。限制(100);
14 返回 数据;
15 }
在这个简单的示例中,我们只从一个传感器获取数据,但在更实际的应用程序中,我们可能需要在每个虚拟线程上处理多个具有多个 I/O 操作的传感器。
至此,流程就很清楚了——我们的虚拟线程正在获取数据并并行运行。接下来的问题是我们将如何分析数据。让我们看看我们可以遵循的不同策略。
分析数据
我们假设我们必须执行的数据分析是 CPU 密集型的。理想情况下,我们希望在同一个线程中执行此计算,以维护线程每单位工作范例。但是,如果分析对时间敏感,则可能有更好的方法。我们知道,对于受 CPU 限制的工作负载,最好使用线程池来获得最佳性能。
在这种情况下,虚拟线程最好将工作委托给线程池:
爪哇1 public static int analyzeSensorData(双流 数据){
2 的
3 双 resultMean = 0 ;
4 // 使用 ForkJoinPool 提高性能
5 // 在实际应用中做适当的数据分析
6 结果均值 = 数据。并行()。平均值()。getAsDouble ();
7 返回 determineStatusCode ( resultMean );
8 }
9
10 private static int determineStatusCode(双 结果){
11 // 在实际应用中根据结果参数确定代码
12 如果(结果 > 0.49){
13 返回 719 ; // 编写错误代码
14 }否则{
15 返回 0;
16 }
17 }
对于此示例,我们使用 Java Streams,它在幕后利用ForkJoinPool
进行计算。
在第 6 行中,我们向 Stream 框架发出信号以并行化计算。我们找到平均值,但分析将更多地涉及实际应用程序。
我们使用 Java 提供的 common ForkJoinPool
,但我们将在更现实的场景中定义我们自己的。我们还必须确定池大小以最大化 CPU 利用率。对于公共池,您可以设置系统属性:
1 系统。setProperty ( "java.util.concurrent.ForkJoinPool.common.parallelism" , "10" );
我们假设 10 是优化系统的并发线程数。
我们可以看到,按照这个路径,我们通过从虚拟线程调用线程池来优化数据分析处理。当虚拟线程完成它们的 I/O 任务时,它们将利用 Java Stream 平台来执行(或排队)ForkJoinPool
.
这是一个例子;我们本可以使用其他线程池来进行计算。在提供的示例代码中,我们展示了更多示例。
或者,如果我们需要限制虚拟线程对稀缺资源的利用,我们可以使用信号量,如以下代码所示:
爪哇1 public static Semaphore semaphoreService = new Semaphore ( NUM_PERMITS );
2 的
3 public static String performScarceResourceRequest ()抛出 InterruptedException , IOException {
4
5 信号量服务。获得();
6 //执行昂贵的请求
7 // 对于这个例子,我们返回一些随机数据并模拟网络延迟
8 线程。睡眠((长)数学。随机());
9 信号量服务。释放();
10 返回 “请求的数据”;
11 }
我们微调NUM_PERMITS
我们在第 1 行的信号量中设置的数量,以优化对资源的访问。虚拟线程必须获得信号量的许可才能运行。
最后,我们介绍了虚拟线程执行需要其他子任务才能完成的任务的情况。Java 19 版本也有一个支持 结构化并发的框架。该项目处于孵化器阶段。它为使用虚拟线程运行相关任务提供了强大的支持。我们将介绍一个简单的案例,在分析来自传感器的数据之前,我们必须执行一些验证:
爪哇1 public static int validateAndAnalyzeSensorData(双流 数据)抛出 IOException,InterruptedException,ExecutionException {
2 尝试(var scope = new StructuredTaskScope。ShutdownOnFailure ( )){
3 Future < String > validatedData = 范围。fork (() -> validateData (数据));
4 未来<字符串> checkedEnvironment = 范围。fork (() -> checkEnvironment (数据));
5 范围。加入();
6 范围。throwIfFailed ( e -> new IOException ( e ));
7 if ( validatedData.get ( ). equals ( " ok" ) && checkedEnvironment.get ( ). equals ( " ok" ))
8 返回 分析传感器数据(数据);
9 别的
10 返回 - 1 ;
11 }
12 }
13
14 public static String validateData ( DoubleStream data ) throws IOException , InterruptedException {
15 // 在真实的应用程序中打开一个安全的 url 流并获取数据
16 线程。睡眠((长)数学。随机());
17 返回 “确定”;
18 }
19
20 public static String checkEnvironment ( DoubleStream data ) throws IOException , InterruptedException {
21 // 在真实的应用程序中打开一个安全的 url 流并获取数据
22 线程。睡眠((长)数学。随机());
23 返回 “确定”;
24 }
在第 2 行中,我们使用StructuredTaskScope
协调并发运行任务。StructuredTaskScope
支持 try-with-resources 以确保一切都在语句结束时关闭。
我们在第 3 行和第 4 行运行验证并检查环境调用scope.fork
() 将 a 作为参数Callable
,生成一个新线程,并返回 a Future
。父任务范围ThreadFactory
创建新线程,因此我们将拥有虚拟线程。
在第 4 行,我们加入了两个任务,第 6 行调用scope.throwIfFailed()
以传播任何错误。最后,在第 7 行,我们知道这两个任务已经成功完成,所以我们可以继续。
由于结构并发处于孵化器模式,我们需要使用参数运行 VM --add-modules=jdk.incubator.concurrent
:。
另一种方法是在父线程的虚拟线程中连续执行这两个子任务。如果任务密切相关并且我们希望为每个工作单元范例保留一个线程,那么这可能是可取的。但是,这不会像以前的方法那样高效。
结论
虚拟线程有望彻底改变我们完成并发 I/O 编程的方式。本教程涵盖了当我们的任务需要支持 I/O 和 CPU 密集型工作负载时处理每个工作单元线程的不同方法。