首页 > TAG信息列表 > reactive-programming

java-为什么当响应具有HTTP错误状态代码时,为什么只允许一个连接允许订阅者?

在休息请求中,我使用Spring WebClient将另一个请求发送到Web服务,并想将结果返回给调用者: return webClient.post() .uri(url) .body(...) .retrieve() .bodyToMono(String::class.java) .map { ResponseEntity.ok(it) } 现在,Web服务返回了HTTP错误状态代码,我收到以

c#-如何在观察未完成的情况下或在长时间运行的可观察的热状态下在IObservable上使用组函数

我有以下可观察的 IObservable<int> observable = Observable.Range(0, 10); 我有以下订阅 observable.Subscribe(v => Console.WriteLine("Received value: " + v)); observable.Sum().Subscribe(s => Console.WriteLine("Sum so far: " + s)); 我想要的输出

C#-ReactiveList问题

我们对ReactiveUI相对较新,因此这可以解释为什么我们在使视图模型正常工作时遇到一些问题. 在我们的视图模型中,我们有一个类的ReactiveList,在该类中有一个“ selected”的. 在视图模型中,我们希望有一个“ AnySelected”属性,这样,如果列表中至少有1个项目标记为选中,则AnySelect

javascript-RxJs处理异常而不终止

我不确定我对此有多正确,因此,如果有任何专家能够对此进行纠正,也将不胜感激.我目前的理解是,可观察变量是惰性的,直到订阅后才产生值.如果发生错误,则可观察对象将不再发送任何值.在很多情况下,这不是想要的. 在下面的代码示例中,我正在获取珀斯和伦敦的天气情况,如果发生错误,则

RxJava Backpressure(快速生产者缓慢的消费者)

我有执行方法,它会在io线程上进行一些耗时的网络调用 例 /** * network call * @param value * @return */ private Observable<Integer> execute(final int value) { return Observable.create(new Observable.OnSubscribe<Integer>() { @Override publi

带有vararg observables的RxJava zip

当我们确切知道我们的确切类型有多少个可观察对象并且我们想压缩时,我们就这样做 Observable<String> data1 = Observable.just("one", "two", "three", "four", "five"); Observable<String> data2 = Observable.just("one", "two&

在rxjava中使用Observable序列进行一对多映射

给定一系列源对象的Observable,如何使用rxjava从每个输入对象映射多个输出对象? (一对多映射) 我有一些菜式清单,它们代表组成餐厅订单的物品.我需要将每个菜转变为一个或多个OrderLine. 每张菜品地图都会为其名称价格创建一个OrderLine,为每个浇头创建一个OrderLine,如果有注释则创

的CombineLatest保留可观察的顺序吗?

我对以下重载感兴趣: public static IObservable<IList<TSource>> CombineLatest<TSource>(this params IObservable<TSource>[] sources); public static IObservable<IList<TSource>> CombineLatest<TSource>(this IEnumerable<IObser

响应式流规范1.0发布后,jdbc规范也将变为响应式吗?

我正在学习并使用akka流进行反应式流编程,试图使用async-jdbc-driver或react-jdbc-driver的任何库已有2年了,我发现slick 3.0或rxjava-jdbc-driver提供了异步jdbc api,但我知道slick在JDBC api之上构建了令人惊叹的api,这正在阻塞(如果我错了,请纠正我),因此我认为从系统角度来看,

C#-SelectMany使用ReactiveExtensions占用大量内存

我想创建一个接收图像并返回一些派生对象的管道. 我正在使用一系列位图,对于每个位图我都执行任务(即异步).如此简单.但是,我发现内存消耗确实很高.为了说明问题,我创建了可以运行的测试. 请查看一下内存,因为它将占用多达400 MB的RAM. 我该怎么做才能避免占用太多内存?这里发生了什

java-Spring MVC与反应流集成

我在Spring MVC上构建了一个RESTful API应用程序. 最近,我在spring mvc和反应式流之间进行了一些集成(例如rxjava和project-reactor),并尝试使应用程序更具反应性. 我刚刚在下面建立了一些演示: 1.对于rxjava,我使用PublishSubject private SerializedSubject<StreamResult, Stream

结合多个可观察对象的RxJava弹性方式

我有多个返回Observable的模块:O1,O2,O3 … 所有模块的结果应合并为一个可观察到的Ocomb,以使单个任务可能失败,但是合并不会因单个问题而终止或影响.使用当前的解决方案,我遇到了以下示例中的各种问题: 这段代码结合了我的模块的输出: public Observable<Data> getModuleData(){

java-可以重用运算符执行

给出以下示例(kotlin代码) val subject = PublishSubject.create<Int>() val stream = subject.map { println("mapping") it * 2 } stream.forEach { println("A: $it") } stream.forEach { println("B: $it") } subject.onNext(1)

javascript-错误后继续订阅

我有一个代码,其中我为每个id发出ajax请求并在结果出现时对其进行处理.这是我的实际代码和jsfiddle的简单副本: var ids = [1,2,3,4,5,6]; var ids$= (id) => { return Observable.of(id); }; var loadIds$= (id) => { if(id == 4) return xhr('/echo/jsoneee/', {id: id})

c#-使用观察值测量响应时间

给定Observable A代表传入的请求… interface IRequest { string Id { get; } } IObservable<IRequest> requests; 和代表响应的可观察B … IObservable<string> responses; 我需要测量产生响应所需的时间.我当前的解决方案如下所示.这行得通,但我不确定是否可以简化或更简

Flutter Bloc Pattren Stream Transformer对象导致语法错误

我是Flutter和Dart语言的新手.在继续学习本教程的同时,我创建了一个具有2个StreamTransformers的验证器类,这是在尝试对batt pattren进行的,在该类中,用户将在2个TextField中键入电子邮件和密码,因此每次文本更改时都会进行验证. 如果我使用传入的电子邮件或密码,则会识别大量错误,

IObservable等待获取所有元素错误

我有这个课: public class TestService { public IObservable<int> GetObservable(int max) { var subject = new Subject<int>(); Task.Factory.StartNew(() => {

javascript-如何根据培根中的某些EventStream更改切换流

考虑从http://baconjs.github.io/开始的这个例子 var up = $('#up').asEventStream('click'); var down = $('#down').asEventStream('click'); var counter = // map up to 1, down to -1 up.map(1).merge(down.map(-1)) // accumul

javascript-检测对象是否为Stream类的实例的最佳方法是什么?

有没有一种方法可以检测对象是否是流类的实例?例如RxJS或Bacon.js流. 我正在寻找的是像 function isStream(obj) { // if obj is RxJS or Bacon Stream return true, otherwise false } 最可靠的方法是什么?解决方法:Observable是EventStream和Property对象都继承的基类.因此,如

使用多线程RxJava进行反应式拉取

我正在尝试在RxJava中构建反应式拉式观察器. 我的观察者是这样的: Observable<Command> myObs = Observable.create(s -> { Command command; int i = 0; do { command = NetworkOperation1.call(i); logger.info("Init command " + i); s.onNext(comma

javascript-如何使用Neo4j Reactivity驱动程序发布/订阅Meteor

我正在使用Meteor应用程序,您可以在其中创建Neo4j“ Room”节点和“ DOOR”关系.每个房间只能有一个通往其他房间的门. 我正在使用Dmitriy Aristarkhovich的Neo4j Reactivity驱动程序. 我想创建一个实时的主从系统,其中用户做出的选择会影响其他可能的选择. 为了说明这一点,在端口7

java-按需执行热的Observable

举一个冷酷的例子: Observable<Integer> cold = Observable.create(subscriber -> { try { for (int i = 0; i <= 42; i++) { // avoid doing unnecessary work if (!subscriber.isUnsubscribed()) { break; } subscriber.onNex

javascript-似乎找不到有关测试Cycle.js应用程序的资源

我一直在尝试搜索有关测试Cycle.js应用程序的指南,但似乎找不到.有人可以向我指出指南或提供一些示例吗?解决方法:从Cycle.js.org开始: Sources and sinks can be easily used as 07001. This also means testing is mostly a matter of feeding inputs and inspecting the output.

java-vert.x:如何正确发送发布请求?

我有vert.x个具有最简单授权的应用服务器,其路由如下: router.post("/user/sign").handler(this::userSignIn); 当我从GUI开始时,它工作正常. 但是我想为此动作创建Unit-Test. 我做的代码: @Test public void testServerUserRegister(TestContext context) { HttpClient client

java-Project Reactor:如何延迟(节流)每个元素的发射?

考虑以下通量 Flux.range(1, 5) .parallel(10) .runOn(Schedulers.parallel()) .map(i -> "https://www.google.com") .flatMap(uri -> Mono.fromCallable(new HttpGetTask(httpClient, uri))) HttpGetTask是一个Callable,在这种情况下其实际实现是无关紧要的,它对给定的