java - How to execute an array of CompletableFutures and combine their results

Author: Internet

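I am studying Java 8 CompletableFutures and have read (and seen) that I should use thenCompose rather than thenApply.

I have converted my code to use thenCompose, but it does not feel right.

Here is my controlling code: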

final CompletableFuture<List<String>> extractor = get(htmlPageSource);
@SuppressWarnings("unchecked")
final CompletableFuture<List<Documentable>>[] completableFutures =
   new CompletableFuture[ENDPOINT.EXTRACTABLES.size()];
int index = 0;
for( ENDPOINT endpoint : ENDPOINT.EXTRACTABLES ) {
   final CompletableFuture<List<Documentable>> metaData =
      extractor.thenComposeAsync(
         s -> endpoint.contactEndpoit(s), executorService );
   completableFutures[index++] = metaData.exceptionally(x -> failedList(x));
}
CompletableFuture
   .allOf( completableFutures )
   .thenComposeAsync( dummy -> combineDocuments( completableFutures ))
   .thenAccept   ( x -> finish( x ))
   .exceptionally( x -> failed( x ));

private List<Documentable> failedList(final Throwable x) {
   LOGGER.error("failedList", x);
   final List<Documentable> metaData = new ArrayList<>();
   return metaData;
}

private Void failed(final Throwable x) {
   LOGGER.error("failed", x);
   return null;
}

I think that part is acceptable.

The code that makes me uneasy, however, is this:

WWW_SITE_ONE("https://example.site.one/") {
   @Override
   public <T extends Documentable> CompletionStage<List<T>> contactEndpoit( final List<String> elements) {
      LOGGER.info("WWW_SITE_ONE " + Thread.currentThread().getName());
      final List<T> SITE_ONEs = new ArrayList<>();
      for (final String element : elements) {
         try {
            final String json = Jsoup.connect(ENDPOINT.WWW_SITE_ONE.getBaseUrl() + element).ignoreContentType(true).ignoreHttpErrors(true).maxBodySize(0).timeout(60000).execute().body();
            if (json.contains("errors")) {
               continue;
            }
            final T SITE_ONE = OBJECT_READER_SITE_ONE.readValue(json);
            SITE_ONEs.add(SITE_ONE);
         }
         catch( final Throwable e ) {
            LOGGER.error("WWW_SITE_ONE failed", e);
            throw new RuntimeException(e);
         }
      }
      return CompletableFuture.supplyAsync(() -> SITE_ONEs);  
   }
},
WWW_SITE_TWO("https://example.site.two/") {
   @Override
   public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
      LOGGER.info("WWW_SITE_TWO " + Thread.currentThread().getName());      
      final List<T> SITE_TWOs = new ArrayList<>();
      for (final String element : elements) {
         try {
            final String json = Jsoup.connect(ENDPOINT.WWW_SITE_TWO.getBaseUrl() + element).ignoreContentType(true).ignoreHttpErrors(true).maxBodySize(0).timeout(60000).execute().body();
            if (json.equals("Resource not found.")) {
               continue;
            }
            final T SITE_TWO = OBJECT_READER_SITE_TWO.readValue(json);
            SITE_TWOs.add(SITE_TWO);
         }
         catch (final Throwable e) {
            LOGGER.error("WWW_SITE_TWO failed", e);
            throw new RuntimeException(e);
         }
      }
      return CompletableFuture.supplyAsync(() -> SITE_TWOs);  
   }
},
WWW_SITE_THREE("https://example.site.three/") {
   @Override
   public <T extends Documentable> CompletionStage<List<T>> contactEndpoit(final List<String> elements) {
      LOGGER.info("WWW_SITE_THREE " + Thread.currentThread().getName());        
      final List<T> SITE_THREEs = new ArrayList<>();
      for (final String element : elements) {
         try {
            final String SITE_THREEJsonString = Jsoup
               .connect( ENDPOINT.WWW_SITE_THREE.getBaseUrl() + element)
               .ignoreContentType(true)
               .ignoreHttpErrors(true)
               .maxBodySize(0)
               .timeout(60000)
               .execute()
               .body();
            final SITE_THREE SITE_THREE_Json = OBJECT_READER_SITE_THREE.readValue(SITE_THREEJsonString);
            final T SITE_THREE = (T) SITE_THREE_Json;
            if (SITE_THREE_Json.getHitCount() > 0) {
               SITE_THREEs.add(SITE_THREE);
            }
         }
         catch (final Throwable e) {
            LOGGER.error("WWW_SITE_THREE failed", e);
            throw new RuntimeException(e);
         }
      }
      return CompletableFuture.supplyAsync(() -> SITE_THREEs);  
   }
};

It is the place where I return CompletableFuture.supplyAsync(() -> SITE_THREEs);

Is this the right approach?

Or does it start another asynchronous thread just to return my List<>?

Solution:

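As the name suggests, supplyAsync performs an asynchronous operation: it runs the Supplier's get() method, and therefore the body of the lambda expression, in a background thread, no matter how trivial that body is. The implementation of supplyAsync has no way to check how trivial the code wrapped by the Supplier is, so it has to work this way.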
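To make the point concrete, here is a minimal sketch that reports which thread ran the Supplier. The class and method names are made up for illustration, and the sketch assumes the default executor is used (no explicit Executor argument), as in the question.

```java
import java.util.concurrent.CompletableFuture;

public class SupplyAsyncThreadDemo {

    // Returns the name of the thread that actually executed the Supplier body.
    // Even though the body is trivial, supplyAsync hands it to the default
    // asynchronous executor instead of running it on the calling thread.
    static String supplierThreadName() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("supplier: " + supplierThreadName());
    }
}
```

When run from an ordinary thread such as main, the two printed names should differ, which shows that a task was dispatched to another thread just to hand back a value.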

Instead of CompletableFuture.supplyAsync(() -> SITE_THREEs), you should use CompletableFuture.completedFuture(SITE_THREEs). It returns a future that is already completed with the given result, so no further action is required.
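A minimal sketch of that replacement might look like the following. The `wrap` method is a hypothetical stand-in for the end of `contactEndpoit`, where the list has already been filled by the blocking loop, and `List<String>` stands in for the question's `List<T>`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class CompletedFutureDemo {

    // Hypothetical stand-in for the return statement of contactEndpoit:
    // the result already exists, so it is wrapped without scheduling a task.
    static CompletionStage<List<String>> wrap(List<String> alreadyComputed) {
        return CompletableFuture.completedFuture(alreadyComputed);
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> future =
                wrap(Arrays.asList("a", "b")).toCompletableFuture();
        System.out.println(future.isDone()); // already complete, nothing is pending
        System.out.println(future.join());   // returns the wrapped list immediately
    }
}
```

Because the stage is complete on creation, a dependent thenCompose stage can proceed without waiting for an extra hop through the executor.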

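If the method only ever returns a completed stage or throws an exception, you can also change it to return the result value directly instead of a CompletionStage, and use thenApply instead of thenCompose. That simplifies the code, unless you want to keep the option of introducing asynchronous operations in a future version of the method.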
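The two variants can be compared side by side in a small sketch. Everything here is illustrative: `contactSync` is a hypothetical synchronous version of `contactEndpoit`, and `List<String>` replaces the question's `List<Documentable>` to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ThenApplyDemo {

    // Hypothetical synchronous variant of contactEndpoit: it returns the
    // value itself rather than a CompletionStage wrapping it.
    static List<String> contactSync(List<String> elements) {
        List<String> documents = new ArrayList<>();
        for (String element : elements) {
            documents.add("doc:" + element);
        }
        return documents;
    }

    // thenCompose needs a function that returns a stage, so the value has to
    // be wrapped in an already completed future first.
    static CompletableFuture<List<String>> viaCompose(CompletableFuture<List<String>> extractor) {
        return extractor.thenCompose(s -> CompletableFuture.completedFuture(contactSync(s)));
    }

    // thenApply takes a plain value-returning function, so no wrapping is needed.
    static CompletableFuture<List<String>> viaApply(CompletableFuture<List<String>> extractor) {
        return extractor.thenApply(ThenApplyDemo::contactSync);
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> extractor =
                CompletableFuture.completedFuture(Arrays.asList("a", "b"));
        System.out.println(viaCompose(extractor).join());
        System.out.println(viaApply(extractor).join());
    }
}
```

Both methods produce the same list. The thenApply form is shorter, while the thenCompose form leaves room for `contactSync` to become genuinely asynchronous later without changing the calling code.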

Tags: completable-future, java-8, java
Source: https://codeday.me/bug/20191111/2017664.html