首页 > TAG信息列表 > project-reactor

java-为什么当响应具有HTTP错误状态代码时,为什么只允许一个连接允许订阅者?

在休息请求中,我使用Spring WebClient将另一个请求发送到Web服务,并想将结果返回给调用者: return webClient.post() .uri(url) .body(...) .retrieve() .bodyToMono(String::class.java) .map { ResponseEntity.ok(it) } 现在,Web服务返回了HTTP错误状态代码,我收到以

java-Spring MVC与反应流集成

我在Spring MVC上构建了一个RESTful API应用程序. 最近,我在spring mvc和反应式流之间进行了一些集成(例如rxjava和project-reactor),并尝试使应用程序更具反应性. 我刚刚在下面建立了一些演示: 1.对于rxjava,我使用PublishSubject private SerializedSubject<StreamResult, Stream

java-Project Reactor文档

在哪里可以找到Reactor Core的文档(不是API javadoc),类似于Reactor Kafka Docs,可以阅读.是否存在? Reactor文档的结构如何,其dedicated page似乎不包含所有模块的参考文档,只有Reactor Kafka,只有API.这是设计使然吗? 解决方法:它本身不是设计使然,但是Reactor 3内核的参考指南尚不

使用IntelliJ IDEA调试Spring WebFlux / Reactor应用程序

我正在使用IntelliJ IDEA创建一个Spring WebFlux / Reactor应用程序. IDEA的调试器显示了许多无用的行,例如MonoDefer,MonoFlatMap等.有没有一种方法可以轻松地跟踪堆栈? 解决方法:在异步世界中,不幸的是堆栈跟踪失去了很多意义.在这里,您将看到一个堆栈,其中显示了构成整个反应链的

java-项目反应堆通量的并行处理

我刚开始从事反应堆或反应式编程的工作,所以我可能做错了什么.我正在努力建立一个执行以下任务的流程: 给定一个类Entity: Entity { private Map<String, String> items; public Map<String, String> getItems() { return items; } } >从数据库读取实体(Listen

java-无法通过spring web reacting显示背压

我正在尝试使用spring-web-active来显示背压,就像在akka中显示它的方式-https://www.youtube.com/watch?v=oS9w3VenDW0 (在28:20至29:20之间观看). 要尝试一下,我使用了以下来自github https://github.com/bclozel/spring-boot-web-reactive的示例项目 设置项目后,我在HomeControll

java-Project Reactor:如何延迟(节流)每个元素的发射?

考虑以下通量 Flux.range(1, 5) .parallel(10) .runOn(Schedulers.parallel()) .map(i -> "https://www.google.com") .flatMap(uri -> Mono.fromCallable(new HttpGetTask(httpClient, uri))) HttpGetTask是一个Callable,在这种情况下其实际实现是无关紧要的,它对给定的

如何使用Spring Boot WebClient收集分页的API响应?

我有一个来自URL的分页响应,我想继续点击从上一个响应中获得的下一个页面URL,并继续收集项目,直到我的响应中没有“ nextPage” URL.如何使用WebFlux的Spring Boot WebClient以无阻碍的方式以反应方式实现此目标? Request1: GET /items response: { items:

java-如何从Http集成流程创建Spring Reactor Flux?

我有一个非常类似于这个How to create a Spring Reactor Flux from a ActiveMQ queue?的问题 区别在于消息来自Http端点而不是JMS队列.问题是由于某些原因而无法填充消息通道,或者Flux.from()不会拾取它.日志条目显示GenericMessage是从Http Integration流中创建的,并带有有效负载

为什么Spring ReactiveMongoRepository没有Mono的保存方法?

我有一个扩展ReactiveMongoRepository的MovieRepository.我想以反应方式保存单个POJO.但ReactiveMongoRepository不为Mono或Publisher提供保存方法.我必须使用block()方法或在ReactiveMongoRepository中使用saveAll方法. public Mono<ServerResponse> create(ServerRequest reques

spring – 如何使用WebClient限制请求/秒?

我正在使用WebClient对象将Http Post请求发送到服务器. 它正在快速发送大量请求(QueueChannel中有大约4000条消息).问题是……似乎服务器响应速度不够快……所以我得到了很多服务器错误500和connexion过早关闭. 有没有办法限制每秒的请求数量?或者限制它使用的线程数量? 编辑: QueueCh

如何将Mono>转换为Flux

我正在将用RxJava 1.x编写的小项目转换为Reactor 3.x.一切都很好,除了我找不到如何用适当的对应物替换flatMap(Observable :: from).我有Mono< List< String>>我需要将其转换为Flux< String>. 谢谢解决方法:在Reactor 3中,from运算符已经专门用于一些变体,具体取决于原始源(数组,可

如何等待Spring 5 WebClient完成所有请求?

我有一个简单的Java程序,它使用Spring WebClient发送多个请求.每个都返回一个单声道,我使用response.subscribe()来检查结果. 但是,我的主要执行线程在处理所有请求之前完成,除非我添加一个长的Thread.sleep(). 使用CompletableFutures,您可以使用:CompletableFuture.allOf(期货).jo

你如何使用Spring 5的反应式编程实际“管理”最大网络线程数?

使用经典的Tomcat方法时,您可以为服务器提供可用于处理来自用户的Web请求的最大线程数.使用Reactive Programming范例和Spring 5中的Reactor,我们能够更好地垂直扩展,确保我们最小化. 在我看来,这使得它比传统的Tomcat方法更易于管理,在经典的Tomcat方法中,您只需定义最大并发请求

java – Flux没有在Spring 5反应堆中订阅

我可能错过了一些东西,但我无法弄清楚它是什么. 以下代码什么都不做: webClient.get().uri("/some/path/here").retrieve() .bodyToMono(GetLocationsResponse.class) .doOnNext(System.out::println) .subscribe(); 如果我试图阻止呼叫它工作正常: webClient.get(

Java Reactor:如何从stdin生成Flux?

我想异步读取用户从stdin生成的消息. 就像是: Flux.from(stdinPublisher()) .subscribe(msg -> System.out.println("Received: " + msg)); 那么如何在这里实现这样的stdin发布者呢?解决方法:很容易.对不起打扰:) import java.util.Scanner; import lombok.extern.slf4j.Slf4

java – 返回Mono后的方法调用

我想在之前返回Mono< Void>时调用该方法: @Override public Mono<Void> sendEmail(EmailDto emailDto) { return mailReactiveClient.sendEmail(message -> createMessage(emailDto, emailDto.getBody(), message)) .doOnNext(saveNotificationLog(emailDto

reactor-core – java.lang.IllegalStateException:队列已满?!在热发布者(ConnectableFlux)上

到目前为止我一直在使用RxJava,但我开始使用来自projectreactor.io的reactor-core,因为它遵循反应流规范. 在下面的测试中,我创建了一个生成随机数的热Flux(ConnectableFlux).我立即连接()它预取256个值(我可以在日志中看到它们实际上有258个).我等待5秒钟来模拟订阅者直到一段时间

如何取消正在进行的Spring Flux?

我正在使用spring flux向服务发送并行请求,这是非常简化的版本: Flux.fromIterable(customers) .flatMap { customer -> client.call(customer) } ... 我想知道如何取消这种通量,就像在某种程度上获取对通量的引用并告诉它关闭一样.解决方法:您可能知道,对于反应对象,all

java – Reactor compose vs flatMap

我继续玩Reactor,现在我看到compose运算符的行为与flatMap完全一样,我想知道是否有任何我不理解的差异. @Test public void compose() throws InterruptedException { Scheduler mainThread = Schedulers.single(); Flux.just(("old element")) .compose(

然后是什么,然后空,然后很多和平面地图很多在春天的webflux?

我不明白使用和之间的区别,然后是空,然后很多和flatMapMany在春天webflux上的Flux或Mono.解决方法:> flatMap vs flatMapMany 在函数式编程中,flatMap返回的类型与承载该方法的类型相同,因此对于Mono< T>,flatMap返回Mono.这意味着内部发布者只能发出一个元素(或者它被截断).我们通

如何使用Spring WebFlux返回404

我有一个像这样的控制器(在Kotlin): @RestController @RequestMapping("/") class CustomerController (private val service: CustomerService) { @GetMapping("/{id}") fun findById(@PathVariable id: String, @RequestHeader(value = I

spring – 使用Reactive MongoDB和取消操作的流程正在取消

问题出在Project Reactor和Reactive MongoDB(Spring Data)之间. 执行包含(按以下顺序)的流时: >对Reactive MongoDB进行操作的方法非常快 >超过30秒的方法 流被取消(查看下面的代码和日志) @GetMapping("/test/{msg}") public Mono<SomeObject> test(@PathVariable String msg) {

如何在Java Reactor中设置完全背压驱动的通量?

我有一个需要多个工人的情况(比方说2). 工人必须执行消耗上游事件的任务. 手头的任务会消耗一系列事件,并且具有与列表大小无关的恒定时间. 因此,我希望上游只在请求时提供包含所有缓冲事件的列表,一次列出1个列表. 遗憾的是,大多数方法都实现了预取.即使使用,会发生什么limitRate(

如何在Spring Webflux控制器中结合Flux和ResponseEntity

我在我的Webflux控制器中使用Monos和ResponseEntitys来操纵标头和其他响应信息.例如: @GetMapping("/{userId}") fun getOneUser(@PathVariable userId: UserId): Mono<ResponseEntity<UserDto>> { return repository.findById(userId) .map(User::asDto) .