编程语言
首页 > 编程语言> > c#-根据输入序列的元素将IObservable转换为另一个故障或完成的序列

c#-根据输入序列的元素将IObservable转换为另一个故障或完成的序列

作者:互联网

我正在实现一个进程间消息传递系统,例如客户端可以从服务器请求某些数据.服务器需要能够以部分答复的形式发送回其答复,并且还需要能够通知客户以防万一发生异常.

目前,我正在通过3种消息类型执行此操作:

class PartialResponse : ResponseMessage { ... }
class ResponseError : ResponseMessage { ... }
class ResponseComplete : ResponseMessage { ... ]

所以客户端请求数据,服务器发回0-N PartialResponse消息,后跟ResponseError或ResponseComplete.

我正在使用的库(以NetMQ作为其传输层的Obvs)会将所有可能的消息流公开为

IObservable<ResponseMessage>

尽管这个可观察到的流永远不会完成,并且我相信也不会出错(除非对于某些Obvs / NetMQ内部异常或类似情况).

我想将其转换为IObservable< PartialResponse&gt ;,它在原始流推送ResponseComplete消息时完成,并在遇到ResponseError消息或输入流中的实际错误时出错.例如.就像是:

IObservable<PartialResponse> transform(IObservable<ResponseMessage> input)
{
  var subject = new Subject<PartialResponse>();
  input.Subscribe(
    x =>
    {
      if(x is PartialResponse r)
        subject.OnNext(r);
      else if(x is ResponseComplete)
        subject.OnCompleted();
      else if(x is ResponseError err)
        subject.OnError(new Exception(err?.ToString()));
      else
        throw new InvalidOperationException();
    },
    ex =>
    {
      subject.OnError(ex);
    }
  );

  return subject;
}

该代码实际上应该可以工作,但是可能很糟糕-尤其是因为它直接订阅输入的可观察序列.

有没有更好/更干净的方法来转换可观察序列?

解决方法:

@Enigmativity的答案充实了:

var input = new Subject<ResponseMessage>();

var partialResponseObservable = input
    .Select(msg => 
        (msg is PartialResponse r) 
        ? Notification.CreateOnNext(r)
        : (msg is ResponseComplete) 
            ? Notification.CreateOnCompleted<PartialResponse>()
            : (msg is ResponseError err)
                ? Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()))
                : throw new InvalidOperationException()
    )
    .Dematerialize();

或使用类型匹配(可能更好看):

var partialResponseObservable = input
    .Select(msg =>
    {
        switch(msg)
        {
            case PartialResponse r:
                return Notification.CreateOnNext(r);
            case ResponseComplete rc:
                return Notification.CreateOnCompleted<PartialResponse>();
            case ResponseError err:
                return Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()));
            default:
                throw new InvalidOperationException();
        }
    })
    .Dematerialize();

标签:observable,system-reactive,rx-net,c
来源: https://codeday.me/bug/20191110/2013639.html