企业级工作流解决方案(八)--微服务Tcp消息传输模型之服务端处理
作者:互联网
服务端启动
服务端启动主要做以下几件事情:1. 从配置文件读取服务配置(主要是服务监听端口和编解码配置);2. 注册编解码器工厂;3. 启动DotNetty监听端口;4. 读取配置文件,解析全局消息处理模型;5. 注册服务端处理对象到容器。
JsonRpcServerModule代码如下,见备注说明
/// <summary>
/// ABP module that wires up the JSON-RPC TCP server: it loads the socket
/// configuration, registers the transport codec factory selected by that
/// configuration, registers the DotNetty message listener and service host,
/// and finally starts listening and loads the JSON-RPC service configuration.
/// </summary>
[DependsOn(typeof(AbpKernelModule))]
public class JsonRpcServerModule : AbpModule
{
    /// <summary>
    /// Loads the socket service configuration from a fixed XML file and registers
    /// it, together with the default service executor, in the IoC container.
    /// </summary>
    public override void PreInitialize()
    {
        // The configuration is always read from this XML file.
        SocketServiceConfiguration serverConfig =
            XmlConfigProvider.GetConfig<SocketServiceConfiguration>("SocketServiceConfiguration.xml");

        var configRegistration = Component
            .For<ISocketServiceConfiguration>()
            .Instance(serverConfig);
        IocManager.IocContainer.Register(configRegistration);

        IocManager.Register<IServiceExecutor, DefaultServiceExecutor>(Dependency.DependencyLifeStyle.Singleton);
    }

    /// <summary>
    /// Registers this assembly's conventional services, the codec factory chosen
    /// by the configured message code, and the default listener/host pair.
    /// </summary>
    public override void Initialize()
    {
        IocManager.RegisterAssemblyByConvention(typeof(JsonRpcServerModule).GetAssembly());

        var rpcConfig = Configuration.Modules.RpcServiceConfig();
        RegisterCodecFactory(rpcConfig.MessageCode);

        RegisterDefaultProtocol();
    }

    /// <summary>
    /// Starts the service host on all interfaces at the configured port and then
    /// loads the JSON-RPC service configuration.
    /// </summary>
    public override void PostInitialize()
    {
        var serverConfig = IocManager.Resolve<ISocketServiceConfiguration>();

        // StartAsync invokes the delegate that was passed to the service host's
        // constructor, which in turn starts the DotNetty listener.
        // NOTE(review): the returned task is not awaited, so start-up continues
        // without waiting for the bind to finish — confirm this is intended.
        var host = IocManager.Resolve<IServiceHost>();
        var listenEndPoint = new IpAddressModel("0.0.0.0", serverConfig.Port).CreateEndPoint();
        host.StartAsync(listenEndPoint);

        // Read the json-rpc service configuration and build the message handling model.
        JsonRpcRegister.LoadFromConfig(IocManager);
    }

    /// <summary>
    /// Registers, as a singleton and only if none is registered yet, the transport
    /// message codec factory that matches <paramref name="messageCode"/>.
    /// </summary>
    /// <param name="messageCode">The codec selected in the configuration.</param>
    private void RegisterCodecFactory(EMessageCode messageCode)
    {
        if (messageCode == EMessageCode.Json)
        {
            IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
        }
        else if (messageCode == EMessageCode.MessagePack)
        {
            IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
        }
        else if (messageCode == EMessageCode.ProtoBuffer)
        {
            IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
        }
        // Any other value registers nothing here.
        // NOTE(review): in that case a codec factory must already be registered
        // elsewhere, because RegisterDefaultProtocol resolves one — confirm.
    }

    /// <summary>
    /// Creates the DotNetty message listener and the default service host and
    /// registers both instances in the IoC container.
    /// </summary>
    private void RegisterDefaultProtocol()
    {
        var codecFactory = IocManager.Resolve<ITransportMessageCodecFactory>();
        var serverConfig = IocManager.Resolve<ISocketServiceConfiguration>();
        var listener = new DotNettyServerMessageListener(Logger, codecFactory, serverConfig);

        IocManager.IocContainer.Register(
            Component
                .For<IMessageListener>()
                .Instance(listener));

        var executor = IocManager.Resolve<IServiceExecutor>();

        // Build the service host and put it in the container. DotNetty is not
        // started at this point; the delegate only defines what starting does.
        var host = new DefaultServiceHost(
            async endPoint =>
            {
                // Start the DotNetty listener on the requested end point.
                await listener.StartAsync(endPoint);
                return listener;
            },
            executor);

        IocManager.IocContainer.Register(
            Component
                .For<IServiceHost>()
                .Instance(host));
    }
}
DotNetty启动监听的代码如下,参考了DotNetty提供的示例代码,其中ServerHandler为自定义的消息处理ChannelHandler
/// <summary>
/// Raises the message-received callback for an incoming message.
/// </summary>
/// <param name="sender">The sender that can be used to reply to the peer.</param>
/// <param name="message">The message that was received.</param>
/// <returns>
/// A task that completes when the subscribed callback has finished, or
/// immediately when nothing is subscribed.
/// </returns>
public async Task OnReceived(IMessageSender sender, TransportMessage message)
{
    // Snapshot the delegate so the null check and the invocation see the same value.
    var handler = Received;
    if (handler == null)
    {
        return;
    }

    await handler(sender, message);
}

/// <summary>
/// Configures a DotNetty TCP server bootstrap and binds it to <paramref name="endPoint"/>.
/// </summary>
/// <param name="endPoint">The local end point to listen on.</param>
/// <returns>A task that completes once the bind attempt has finished.</returns>
/// <remarks>
/// A failed bind is logged (including the exception) and the event loop groups
/// are shut down; the failure is not rethrown to the caller.
/// </remarks>
public async Task StartAsync(EndPoint endPoint)
{
    _logger.Debug($"准备启动服务主机,监听地址:{endPoint}。");

    // Each group is created exactly once; creating them twice would leave the
    // first pair unused and never shut down.
    IEventLoopGroup bossGroup = new MultithreadEventLoopGroup(1);
    IEventLoopGroup workerGroup = new MultithreadEventLoopGroup(); // default loop count is chosen by DotNetty

    var bootstrap = new ServerBootstrap();
    bootstrap.Channel<TcpServerSocketChannel>();
    bootstrap
        .Option(ChannelOption.SoBacklog, _socketServiceConfiguration.Backlog)
        .ChildOption(ChannelOption.Allocator, PooledByteBufferAllocator.Default)
        .Group(bossGroup, workerGroup)
        .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
        {
            var pipeline = channel.Pipeline;

            // Outbound frames get a 4-byte length prefix; inbound frames are
            // split on that prefix and the prefix is stripped.
            pipeline.AddLast(new LengthFieldPrepender(4));
            pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
            pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder));
            pipeline.AddLast(new ServerHandler(async (context, message) =>
            {
                var sender = new DotNettyServerMessageSender(_transportMessageEncoder, context);
                await OnReceived(sender, message);
            }, _logger));
        }));

    try
    {
        _channel = await bootstrap.BindAsync(endPoint);
        _logger.Debug($"服务主机启动成功,监听地址:{endPoint}。");
    }
    catch (System.Exception exception)
    {
        // Keep the cause in the log instead of discarding it.
        _logger.Error($"服务主机启动失败,监听地址:{endPoint}。 ", exception);

        // Nothing is bound, so release the event loop threads created above.
        await Task.WhenAll(
            bossGroup.ShutdownGracefullyAsync(),
            workerGroup.ShutdownGracefullyAsync());
    }
}
消息最终经过解码处理之后,会落到DefaultServiceExecutor类进行处理,在这里调用JsonRpcProcessor静态类的Process方法,处理Json-Rpc请求,并构造答复消息,答复客户端。
/// <summary>
/// Default <see cref="IServiceExecutor"/>: turns an incoming invoke message into
/// a JSON-RPC request, runs it through <c>JsonRpcProcessor</c>, and sends the
/// resulting response back through the supplied sender.
/// </summary>
public class DefaultServiceExecutor : IServiceExecutor
{
    private readonly ILogger _logger;

    /// <summary>
    /// Creates the executor.
    /// </summary>
    /// <param name="logger">Logger used for diagnostic and error output.</param>
    public DefaultServiceExecutor(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Handles one received transport message.
    /// </summary>
    /// <param name="sender">Sender used to deliver the response.</param>
    /// <param name="message">The received transport message.</param>
    /// <returns>A task that completes when handling has finished.</returns>
    public async Task ExecuteAsync(IMessageSender sender, TransportMessage message)
    {
        _logger.Debug("服务提供者接收到消息");

        // Only invoke messages are handled; anything else is ignored.
        if (!message.IsInvokeMessage())
        {
            return;
        }

        JsonRequest request;
        try
        {
            request = message.GetContent<JsonRequest>();
        }
        catch (Exception error)
        {
            // The content could not be read as a JsonRequest: log it and drop the message.
            _logger.Error("将接收到的消息反序列化成 TransportMessage<JsonRequest> 时发送了错误。", error);
            return;
        }

        _logger.Debug("准备执行本地逻辑。");
        string responseJson = await LocalExecuteAsync(request, message.Headers);

        // Send the invocation result back to the client.
        JsonResponse response = JsonConvert.DeserializeObject<JsonResponse>(responseJson);
        await SendRemoteInvokeResult(sender, message.Id, response);
    }

    /// <summary>
    /// Serializes the request to JSON and passes it, with the message headers,
    /// to <c>JsonRpcProcessor.Process</c>.
    /// </summary>
    /// <param name="jsonRequest">The request to execute.</param>
    /// <param name="headers">Headers of the originating message, forwarded unchanged.</param>
    /// <returns>The string returned by the processor.</returns>
    private async Task<string> LocalExecuteAsync(JsonRequest jsonRequest, object headers)
    {
        string requestJson = JsonConvert.SerializeObject(jsonRequest);
        string responseJson = await JsonRpcProcessor.Process(requestJson, headers);
        return responseJson;
    }

    /// <summary>
    /// Wraps the response in an invoke-result message and sends it. Any
    /// exception raised while doing so is logged and not rethrown.
    /// </summary>
    /// <param name="sender">Sender used to deliver the response.</param>
    /// <param name="messageId">Id of the message being answered.</param>
    /// <param name="resultMessage">The response payload.</param>
    /// <returns>A task that completes when the send attempt has finished.</returns>
    private async Task SendRemoteInvokeResult(IMessageSender sender, string messageId, JsonResponse resultMessage)
    {
        try
        {
            _logger.Debug("准备发送响应消息。");

            var reply = TransportMessage.CreateInvokeResultMessage(messageId, resultMessage, new NameValueCollection());
            await sender.SendAndFlushAsync(reply);

            _logger.Debug("响应消息发送成功。");
        }
        catch (Exception error)
        {
            _logger.Error("发送响应消息时候发生了异常。", error);
        }
    }
}
这部分内容没有太多的说明,参见surging
标签:IocManager,message,Tcp,企业级,var,new,logger,public,服务端 来源: https://www.cnblogs.com/spritekuang/p/10805768.html