Netty源码分析——Channel初始化
作者:互联网
前面的章节相对来说比较容易理解,从本节开始,真正的挑战开始了,加油!
本节从这段代码开始。
ServerBootstrap serverBootstrap = new ServerBootstrap() .group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(6666).sync(); //绑定端口
ServerBootstrap是netty服务端应用程序的引导器,Bootstrap是socket客户端程序的的引导器,相对来说比服务端简单。
这部分采用链式编程,每次方法的调用返回本身ServerBootstrap的引用。
进入ServerBootstrap类中的group方法,在ServerBootstrap类中保存了boss和worker事件循环组(EventLoopGroup)的引用,为了后续使用。
再看channel方法,传入NioServerSocketChannel.class作为参数。执行channelFactory(new ReflectiveChannelFactory<C>(channelClass));
这里会实例化一个Channel工厂ReflectiveChannelFactory,当下面初始化channel时就会通过调用newChannel()来反射出对应channel的实例
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
ReflectiveChannelFactory类的其他方法省略
@Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }
再继续执行进入这个方法,会在当前对象中保存channel工厂的实例,为后续调用。
@Deprecated @SuppressWarnings("unchecked") public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return (B) this; }
接下来执行ChannelFuture channelFuture = serverBootstrap.bind(6666).sync();这里完成的工作很多,一步一步往下看。
经过几步的追踪回进入以下方法
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); //初始化和注册通道,这里完成两个主要的操作,本节的初始化就在这里开始。 final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
进入initAndRegister方法
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); //这里就时上文提到的调用channel工厂的newChannel方法发生出一个NioServerSocketChannel,这里有一个细节需要提醒一下,实例化NioServerSocketChannel时,会实例化一个ChannelPipline,
ChannelPipline是一个很重要的概念,这里提一下,记住每一个Channel都对应一个ChannelPipline,就是在实例化Channel的时候同时实例化的。 init(channel); //这里就是初始化这个通道 } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); //这里时下节要说的注册这个通道的方法入口,预告一下 if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } // If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread. return regFuture; }
才进入主题init(channel)方法,因为抽象方法,程序会进入到ServerBootstrap类中重写的方法
@Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); //将配置的选项设置到通道,本例中没有设置通道选项 } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { //将属性设置到通道 for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); //获取pipline final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { //在pipline中加入一个ChannelInitializer处理器。 @Override public void initChannel(Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); } // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler. // In this case the initChannel(...) method will only be called after this method returns. Because // of this we need to ensure we add our handler in a delayed fashion so all the users handler are // placed in front of the ServerBootstrapAcceptor. ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
要理解上面代码的意图,需要后面channelpipline的知识,这里先简单介绍一下。简单来说一个channel就管理一个channelpipline,两者相互关联,都保存着对方的引用。
要解释channelpipline就要提到channelhandler,channelhandler就是真正处理socket事件的处理器,而channelpipline中保存着一个channelhandler的链表(实际上是channelhandlercontext链表,这里简单理解,后续详细说明),这里的p.addLast(new ChannelInitializer<Channel>()方法就是相当于在channelhandler链表中中加入了一个节点,用于处理下一节要说的注册的。这里的ChannelInitializer实际上就是一个入站处理器,这里重写的方法还会再下一节再次说明。我还会再回来的!
至此通道的初始化完成,主要初始化通道的一些基本信息,未后续的处理做好准备。
标签:Netty,channelFactory,regFuture,源码,Channel,new,final,channel 来源: https://www.cnblogs.com/chirsli/p/12540216.html