其他分享
首页 > 其他分享 > DotNetty实现高性能tcpserver

DotNetty实现高性能tcpserver

作者:互联网

使用DotNetty实现高性能tcpserver:空闲超时后断开链路,并处理垃圾包、断包和粘包。

 

初始化类

using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System;
using System.Threading;

namespace DotNettyUtil.tcpserver
{
    /// <summary>
    /// Hosts a DotNetty TCP server on a single port. Every accepted channel gets an
    /// <c>IdleStateHandler</c> followed by a <c>TcpServerHandler</c> that wraps the supplied
    /// protocol handler. Startup is queued on the thread pool from the constructor.
    /// </summary>
    public class TcpServerIntance
    {
        private int port;
        private IProtocolHandler handle;
        private const int READ_IDEL_TIME_OUT = 0; // reader-idle timeout passed to IdleStateHandler (0 here)
        private const int WRITE_IDEL_TIME_OUT = 0;// writer-idle timeout passed to IdleStateHandler (0 here)
        private const int ALL_IDEL_TIME_OUT = 90; // all-idle timeout passed to IdleStateHandler
        // Populated by RunThread; all three stay null until startup has run.
        IEventLoopGroup bossGroup;
        IEventLoopGroup workerGroup;
        IChannel boundChannel;

        /// <summary>
        /// Stores the listen port and protocol handler, then queues server startup on the thread pool.
        /// </summary>
        /// <param name="port">TCP port to bind.</param>
        /// <param name="handle">Protocol handler handed to every channel's <c>TcpServerHandler</c>.</param>
        public TcpServerIntance(int port, IProtocolHandler handle)
        {
            this.port = port;
            this.handle = handle;

            ThreadPool.QueueUserWorkItem(new WaitCallback(RunThread));
        }

        /// <summary>
        /// Creates the event-loop groups, configures the bootstrap and binds the port.
        /// The signature must stay <c>void(object)</c> to match <c>WaitCallback</c>, so every
        /// failure is handled inside the method: nothing can observe an exception that
        /// escapes an async void method.
        /// </summary>
        /// <param name="obj">Unused thread-pool state object.</param>
        async void RunThread(object obj)
        {
            try
            {
                // Created inside the try so that a constructor failure is logged instead of
                // surfacing as an unhandled exception. One boss thread is enough for a single
                // bound port.
                bossGroup = new MultithreadEventLoopGroup(1);
                workerGroup = new MultithreadEventLoopGroup();

                var bootstrap = new ServerBootstrap();
                bootstrap.Group(bossGroup, workerGroup);
                bootstrap.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                {
                    IChannelPipeline pipeline = channel.Pipeline;
                    // Clients are expected to send periodic heartbeat packets to stay under the idle limit.
                    pipeline.AddLast(new IdleStateHandler(READ_IDEL_TIME_OUT, WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT));
                    pipeline.AddLast(new TcpServerHandler(handle));
                }));
                bootstrap.Channel<TcpServerSocketChannel>()
                    .Option(ChannelOption.SoBacklog, 128)
                    .ChildOption(ChannelOption.SoKeepalive, true);

                LogHelper.AppendLog("TcpServer 监听端口:" + port);

                boundChannel = await bootstrap.BindAsync(port);
            }
            catch (Exception ex)
            {
                LogHelper.AppendLog("[Error] TcpServer_RunThread,errmsg=" + ex.Message);
                // Startup failed (for example the port is already in use): release the
                // event-loop threads instead of leaking them.
                ShutdownGroups();
            }
        }

        /// <summary>
        /// Closes the listening channel (if it was ever bound) and shuts both event-loop groups down.
        /// Safe to call when startup has not run or has failed.
        /// </summary>
        void Close()
        {
            try
            {
                if (null != boundChannel)
                {
                    boundChannel.CloseAsync();
                }
            }
            catch (Exception ex)
            {
                LogHelper.AppendLog("[Error] TcpServer_Close,errmsg=" + ex.Message);
            }
            finally
            {
                ShutdownGroups();
            }
        }

        /// <summary>
        /// Requests graceful shutdown of whichever event-loop groups have been created.
        /// </summary>
        void ShutdownGroups()
        {
            if (null != bossGroup)
            {
                bossGroup.ShutdownGracefullyAsync();
            }
            if (null != workerGroup)
            {
                workerGroup.ShutdownGracefullyAsync();
            }
        }
    }
}

 

协议解析类

using DotNetty.Buffers;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
using DotNettyUtil.tcpserver;
using System;

namespace TcpServer
{
    /// <summary>
    /// Parses the text framing used by clients:
    /// <c>HTEMP=</c> (6-byte header) + 4-character length + payload, where the payload looks like
    /// <c>#uid#taskid#cmd#content#</c>. Bytes received for a connection are accumulated in the
    /// per-connection buffer held in <c>TcpServerMgr.dicSn2Buffer</c>, so frames split across
    /// reads and several frames arriving in one read are both handled.
    /// </summary>
    public class ClientProtocol : IProtocolHandler
    {
        const int HEAD1 = 0x48;// H
        const int HEAD2 = 0x54;// T
        const int HEAD3 = 0x45;// E
        const int HEAD4 = 0x4D;// M
        const int HEAD5 = 0x50;// P
        const int HEAD6 = 0x3D;// =
        public const char SPLIT1 = '#';
        const char SPLIT2 = '@';
        const char SPLIT3 = '=';
        const char SPLIT4 = '+';
        const char SPLIT5 = ',';

        const int HEAD_SIZE = 6; // number of bytes in "HTEMP="
        const int LEN_SIZE = 4;  // number of bytes in the textual length field

        /// <summary>
        /// Appends the received bytes to the connection's accumulation buffer and processes every
        /// complete frame currently available. Incomplete trailing data is kept for the next call.
        /// </summary>
        /// <param name="ctx">Channel context the data arrived on.</param>
        /// <param name="msg">Received <c>IByteBuffer</c>, or null to re-process buffered data only.
        /// The buffer is always released before returning.</param>
        public void ChannelRead(IChannelHandlerContext ctx, object msg)
        {
            string data_content = "";

            try
            {
                try
                {
                    if (!ctx.Channel.Active) return;

                    string sn = TcpServerMgr.GetSN(ctx);

                    // NOTE(review): if dicSn2Buffer is a plain Dictionary and channels run on
                    // different event-loop threads, this access needs synchronization - confirm.
                    if (!TcpServerMgr.dicSn2Buffer.ContainsKey(sn)) TcpServerMgr.dicSn2Buffer.Add(sn, Unpooled.Buffer(1024));
                    IByteBuffer oldBuffer = TcpServerMgr.dicSn2Buffer[sn];

                    if (null != msg)
                    {
                        IByteBuffer recvBuffer = (IByteBuffer)msg;
                        if (recvBuffer.ReadableBytes > 0)
                        {
                            oldBuffer.WriteBytes(recvBuffer);
                        }
                    }

                    // One iteration per frame attempt. This replaces the former recursive call
                    // for sticky packets, which could exhaust the stack on a long burst.
                    do
                    {
                        string knownUid = TouchUid(sn);
                        string origin = string.IsNullOrEmpty(knownUid) ? sn : knownUid;

                        // Drop garbage in front of the next header; stop if none is complete yet.
                        if (!SeekHeader(oldBuffer, origin)) break;

                        int frameStart = oldBuffer.ReaderIndex;

                        // Header + length + at least one payload byte must be present before
                        // anything is consumed; otherwise wait for more data.
                        if (oldBuffer.ReadableBytes < HEAD_SIZE + LEN_SIZE + 1) break;

                        oldBuffer.SkipBytes(HEAD_SIZE);
                        byte[] arrlen = new byte[LEN_SIZE];
                        oldBuffer.ReadBytes(arrlen);
                        int len = Common.String2Int(Common.ByteToString(arrlen));
                        if (len < 0) continue; // unusable length: header is discarded, keep scanning

                        if (TcpServerMgr.GetWaitRecvRemain(oldBuffer, len) || oldBuffer.ReadableBytes < len)
                        {
                            // Payload not fully received: rewind to the header and wait.
                            oldBuffer.SetReaderIndex(frameStart);
                            break;
                        }

                        byte[] payload = new byte[len];
                        oldBuffer.ReadBytes(payload);

                        string data = Common.ByteToString(payload);
                        if (null != data) data_content = data; // make the frame visible in the error log

                        // The payload must end with '#', and be long enough to strip the
                        // delimiter at each end.
                        if (string.IsNullOrEmpty(data) || data.Length < 2 || data.Length - 1 != data.LastIndexOf(SPLIT1))
                        {
                            continue;
                        }

                        // Fields after stripping the outer delimiters: uid # taskid # cmd # content
                        string[] item = data.Substring(1, data.Length - 2).Split(SPLIT1);
                        if (null == item || 4 != item.Length)
                        {
                            continue;
                        }

                        string uid = item[0];
                        int cmd = Common.String2Int(item[2]);

                        switch (cmd)
                        {
                            case 1:
                                // Heartbeat: register the uid for this connection.
                                TcpServerMgr.AddUid(sn, uid, ctx);
                                LogHelper.AppendLog("心跳反馈,uid=" + uid);
                                break;
                            case 2:
                                // No action implemented.
                                break;
                            case 3:
                                // Data report, e.g.
                                // HTEMP=0263#WaterMeter-001#1520557004#03#buildid=44@edmid=37@meterid=1228@senddate=2018-02-05 17:36:22@[{132,0.0000}+...]#
                                // No action implemented.
                                break;
                        }
                    } while (ctx.Channel.Active && oldBuffer.IsReadable());

                    // Reclaim consumed space so the per-connection buffer cannot grow without bound.
                    if (oldBuffer.IsReadable())
                    {
                        oldBuffer.DiscardReadBytes();
                    }
                    else
                    {
                        oldBuffer.Clear();
                    }
                }
                finally
                {
                    // Release on every path (empty reads, inactive channel and failures included).
                    if (null != msg)
                    {
                        ReferenceCountUtil.Release(msg);
                    }
                }
            }
            catch (Exception ex)
            {
                LogHelper.AppendLog("[Error] ClientProtocol_ChannelRead,data_content=" + data_content + ",errmsg=" + ex.Message);
            }
        }

        /// <summary>
        /// Looks up the uid entity registered for the connection and refreshes its last-activity time.
        /// </summary>
        /// <param name="sn">Connection key returned by <c>TcpServerMgr.GetSN</c>.</param>
        /// <returns>The registered uid, or an empty string when none is registered.</returns>
        private static string TouchUid(string sn)
        {
            UidEntity uidEntity = TcpServerMgr.GetUid(sn);
            if (null == uidEntity) return "";

            uidEntity.LastTime = Common.GetNowTimestamp();
            return uidEntity.Uid;
        }

        /// <summary>
        /// Advances the reader index past garbage bytes until it rests on a complete
        /// <c>HTEMP=</c> header. Nothing belonging to a (possibly partial) header is consumed.
        /// </summary>
        /// <param name="buffer">Accumulation buffer to scan.</param>
        /// <param name="origin">Identifier written to the log for discarded bytes.</param>
        /// <returns>True when the reader index is positioned on a full header; false when the
        /// buffer is exhausted or ends in a partial header.</returns>
        private static bool SeekHeader(IByteBuffer buffer, string origin)
        {
            while (buffer.IsReadable())
            {
                int start = buffer.ReaderIndex;
                byte first = buffer.GetByte(start);

                if (HEAD1 == first)
                {
                    // Possible header, but not all of it has arrived yet.
                    if (buffer.ReadableBytes < HEAD_SIZE) return false;

                    if (HEAD2 == buffer.GetByte(start + 1)
                        && HEAD3 == buffer.GetByte(start + 2)
                        && HEAD4 == buffer.GetByte(start + 3)
                        && HEAD5 == buffer.GetByte(start + 4)
                        && HEAD6 == buffer.GetByte(start + 5))
                    {
                        return true;
                    }
                }

                // Not the start of a header: discard this single byte and keep scanning.
                buffer.SkipBytes(1);
                LogHelper.AppendLog("Error,Unable to parse the data:" + first + " source:" + origin);
            }

            return false;
        }
    }
}

 

标签:oldBuffer,int,tcpserver,132,高性能,0.0000,byte,DotNetty,data
来源: https://www.cnblogs.com/chen1880/p/14934027.html