其他分享
首页 > 其他分享> > spark rpc

spark rpc

作者:互联网

Spark2 rpc

一、前言

Spark在1.6之前的通信使用的是akka框架,在1.6可选用akka还是netty,2.0之后摒弃akka。Akka也是一个优秀的框架,为什么摒弃它官方给出的原因如下:

1、很多Spark用户自己也是使用Akka,但是由于Akka版本之间无法互相通信,这就要求用户必须使用跟Spark完全相同的版本,导致用户无法升级Akka。

2、Spark用的Akka配置是针对Spark自身来调优的,可能跟用户自己代码中的Akka配置冲突。

3、Spark用的Akka特性很少,这部分特性很容易自己实现,同时这部分代码量相比Akka来说少很多,debug比较容易。如果遇到什么bug,也可以自己马上fix,不需要等到Akka上游发布新版本。而且Spark审计Akka本身因为第一点会强制要求用户升级他们的Akka版本,对某些用户来说这代价是比较昂贵的或者是不太现实的。

二、通信组件概览

 

 

 

RpcEnv:RPC环境,每个Rpc端点运行时依赖的环境

RPCEndpoint:RPC端点,Spark将每个通信实体都称之为一个Rpc端点,且都实现RpcEndpoint接口,内部根据不同端点的需求,设计不同的信息和不同的业务处理。

RpcEndpointRef:一个对RpcEndpoint的远程应用对象,通过它可以想远程端点发送信息一进行通信。

如果想要与一个RpcEndpoint端进行通信,一定需要获取到该RpcEndpoint一个RpcEndpointRef,通过RpcEndpointRef与RpcEndpoint进行通信,只能通过一个RpcEnv环境对象来获取RpcEndpoint对应的RpcEndpointRef。

客户端通过RpcEndpointRef发消息,首先通过RpcEnv来处理这个消息,找到这个消息具体发给谁,然后路由给RpcEndpoint实体。

Dispatcher:消息分发器,负责将RpcMessage分发至对应的RpcEndpoint。其包含一个Meseageloop,它读取LinkedBlockingQueue中投递的的RpcMessage,根据客户端指定的Endpoint表示,找到Endpoint的Inbox,然后投递进去,由于是阻塞队列,当没有消息的时候自然阻塞,一旦有消息,就开始工作。Dispatcher的ThreadPool负责消费这些message。

Inbox:一个本地端点对应一个收件箱,Inbox里面有一个InboxMessage的链表,InboxMessage有很多子类,可以是远程调用过来的RpcMessage,可以是远程调用过来的fire-and-forget的单向消息OneWayMessage,还可以是各种服务启动、链路建立断开等Message,这些message都会在Inbox内部的方法内做模式匹配,调用相应的RpcEndpoint的函数。

Outbox:一个远程端点对应一个收件箱,NettyRpcEnv中包含一个ConcurrentHashMap[RpcAddress,Outbox]。当消息放入Outbox后,紧接着将消息通过TransportClient发送出去

TranSportClient:Netty通信客户端,一个Outbox对应一个TranSportClient,根据Outbox消息的receiver消息,请求对应远程TranSportClient。

TranSportServer:Netty通信服务端,一个Rpc端点对应一个TranSportServer,接远程消息后调用Dispatcher分发消息至对应的收发件箱。

三、通信

 

 

 

 

EndPoint启动后,默认会向Inbox中添加Onstart消息,不同的端点消费Onstart指令时,进行相关端点的启动处理流程。

Endpoint启动时,会默认启动TranSportServer,且启动结束后会进行一次同步测试可用性。

Dispatcher作为一个分发器,内部存放了Inbox句柄和状态数据(outboxes在NettyRpcEnv中)

在构建Dispatcher时会创建一个线程池,该线程池循环从receivers阻塞队列中取出EndpointData处理,如果receivers中没有EndpointData,就阻塞。有EndpointData就从该EndpointData的Inbox中取出消息消费。

 

 

标签:RpcEndpoint,rpc,Inbox,端点,spark,Akka,Spark,消息
来源: https://www.cnblogs.com/lsbigdata/p/11475352.html