从NIO到netty(7) ServerBootstrap和Netty自带的Future
作者:互联网
上一篇我们分析了EventLoop相关的东西。这一篇接着分析ServerBootstrap
serverBootstrap.group(bossgroup,worker).channel(NioServerSocketChannel.class).option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000).attr(id,"123").handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer...)
找到ServerBootstrap group方法,这个方法主要是给
private volatile EventLoopGroup childGroup;以及父类中的volatile EventLoopGroup group;赋值。
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
进入channel方法:
AbstractBootstrap#channel
public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); }
实际这段代码的主要作用就是给AbstractBootstrap中的这个变量赋值
private volatile ChannelFactory<? extends C> channelFactory;
执行完成后,channelFactory是一个ReflectiveChannelFactory,从名字也能看出这是一个通过反射来实现的channel工厂,我们传入的clazz对象是NioServerSocketChannel
option和attr方法涉及到channelOption和AttributeKey,在引导类中就是给AbstractBootstrap中的
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>(); private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>()
赋值。
handler方法给下面变量赋值 private volatile ChannelHandler handler;
执行完这句话后的serverbootstrap
接着走
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
这里出现了一个ChannelFuture,我们知道JDK中也有一个Future,用于接受异步计算返回的结果
但是JDK的future有一个缺点,调用get()方法将会阻塞主线程,这就违背了异步变成的初衷,Netty的ChannelFuture继承了Future并且改进了这一缺点
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess();//是否计算成功
boolean isCancellable();//可以被取消
Throwable cause();//原因
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);//添加一个监听器
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);//添加多个监听器
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);//移除一个监听器
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);//移除多个监听器
Future<V> sync() throws InterruptedException;//等待结果返回
Future<V> syncUninterruptibly();//等待结果返回,不能被中断
Future<V> await() throws InterruptedException;//等待结果返回
Future<V> awaitUninterruptibly();//等待结果返回,不能被中断
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
V getNow();//立刻返回,没有计算完毕,返回null,需要配合isDone()方法判定是不是已经完成,因为runnable没有返回结果,
//而callable有返回结果
boolean cancel(boolean mayInterruptIfRunning); //取消
}
从ChannelFuture的方法中我们可以看到很多监听器,也就是说某个计算完成时,会触发相应的事件,得到Future结果,这是观察者模式的一种实现。这就避免了jdk的get方法,如果调用早了就要等待的尴尬。JDK中的Futre还有isDone里边有2种情况,无法区分到底是正常的io完毕返回的true还是被取消之后返回的true,所以到了netty的Future里边加了一个isSuccess()方法,只有正常的io处理结束isSuccess()才返回true。
走一篇doc文档
/**
* The result of an asynchronous {@link Channel} I/O operation.
* Channel的异步io操作的结果。
* <p>
* All I/O operations in Netty are asynchronous. It means any I/O calls will
* return immediately with no guarantee that the requested I/O operation has
* been completed at the end of the call. Instead, you will be returned with
* a {@link ChannelFuture} instance which gives you the information about the
* result or status of the I/O operation.
* netty中所有的i/o都是异步的,意味着很多i/o操作被调用过后会立刻返回,并且不能保证i/o请求操作被调用后计算已经完毕,
* 替代它的是返回一个当前i/o操作状态和结果信息的ChannelFuture实例。
* <p>
* A {@link ChannelFuture} is either <em>uncompleted</em> or <em>completed</em>.
* When an I/O operation begins, a new future object is created. The new future
* is uncompleted initially - it is neither succeeded, failed, nor cancelled
* because the I/O operation is not finished yet. If the I/O operation is
* finished either successfully, with failure, or by cancellation, the future is
* marked as completed with more specific information, such as the cause of the
* failure. Please note that even failure and cancellation belong to the
* completed state.
* 一个ChannelFuture要么是完成的,要么是未完成的。当一个i/o操作开始的时候,会创建一个future 对象,future 初始化的时候是为完成的状态,
* 既不是是成功的,或者失败的,也不是取消的,因为i/o操作还没有完成,如果一个i/o不管是成功,还是失败,或者被取消,future 会被标记一些特殊
* 的信息,比如失败的原因,请注意即使是失败和取消也属于完成状态。
*
+---------------------------+ * | Completed successfully | * +---------------------------+ * +----> isDone() = true | * +--------------------------+ | | isSuccess() = true | * | Uncompleted | | +===========================+ * +--------------------------+ | | Completed with failure | * | isDone() = false | | +---------------------------+ * | isSuccess() = false |----+----> isDone() = true | * | isCancelled() = false | | | cause() = non-null | * | cause() = null | | +===========================+ * +--------------------------+ | | Completed by cancellation | * | +---------------------------+ * +----> isDone() = true | * | isCancelled() = true | * +---------------------------+
*
* Various methods are provided to let you check if the I/O operation has been
* completed, wait for the completion, and retrieve the result of the I/O
* operation. It also allows you to add {@link ChannelFutureListener}s so you
* can get notified when the I/O operation is completed.
* ChannelFuture提供了很多方法让你检查i/o操作是否完成、等待完成、获取i/o操作的结果,他也允许你添加ChannelFutureListener
* 因此可以在i/o操作完成的时候被通知。
* <h3>Prefer {@link #addListener(GenericFutureListener)} to {@link #await()}</h3>
* 建议使用addListener(GenericFutureListener),而不使用await()
* It is recommended to prefer {@link #addListener(GenericFutureListener)} to
* {@link #await()} wherever possible to get notified when an I/O operation is
* done and to do any follow-up tasks.
* 推荐优先使用addListener(GenericFutureListener),不是await()在可能的情况下,这样就能在i/o操作完成的时候收到通知,并且可以去做
* 后续的任务处理。
* <p>
* {@link #addListener(GenericFutureListener)} is non-blocking. It simply adds
* the specified {@link ChannelFutureListener} to the {@link ChannelFuture}, and
* I/O thread will notify the listeners when the I/O operation associated with
* the future is done. {@link ChannelFutureListener} yields the best
* performance and resource utilization because it does not block at all, but
* it could be tricky to implement a sequential logic if you are not used to
* event-driven programming.
* addListener(GenericFutureListener)本身是非阻塞的,他会添加一个指定的ChannelFutureListener到ChannelFuture
* 并且i/o线程在完成对应的操作将会通知监听器,ChannelFutureListener也会提供最好的性能和资源利用率,因为他永远不会阻塞,但是如果
* 不是基于事件编程,他可能在顺序逻辑存在棘手的问题。
* <p>
* By contrast, {@link #await()} is a blocking operation. Once called, the
* caller thread blocks until the operation is done. It is easier to implement
* a sequential logic with {@link #await()}, but the caller thread blocks
* unnecessarily until the I/O operation is done and there's relatively
* expensive cost of inter-thread notification. Moreover, there's a chance of
* dead lock in a particular circumstance, which is described below.
*相反的,await()是一个阻塞的操作,一旦被调用,调用者线程在操作完成之前是阻塞的,实现顺序的逻辑比较容易,但是他让调用者线程等待是没有必要
* 的,会造成资源的消耗,更多可能性会造成死锁,接下来会介绍。
* <h3>Do not call {@link #await()} inside {@link ChannelHandler}</h3>
* 不要再ChannelHandler里边调用await()方法
* <p>
* The event handler methods in {@link ChannelHandler} are usually called by
* an I/O thread. If {@link #await()} is called by an event handler
* method, which is called by the I/O thread, the I/O operation it is waiting
* for might never complete because {@link #await()} can block the I/O
* operation it is waiting for, which is a dead lock.
* ChannelHandler里边的时间处理器通常会被i/o线程调用,如果await()被一个时间处理方法调用,并且是一个i/o线程,那么这个i/o操作将永远不会
* 完成,因为await()是会阻塞i/o操作,这是一个死锁。
* <pre>
* // BAD - NEVER DO THIS 不推荐的使用方式
* {@code @Override}
* public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
* {@link ChannelFuture} future = ctx.channel().close();
* future.awaitUninterruptibly();//不要使用await的 方式
* // Perform post-closure operation
* // ...
* }
*
* // GOOD
* {@code @Override} //推荐使用的方式
* public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
* {@link ChannelFuture} future = ctx.channel().close();
* future.addListener(new {@link ChannelFutureListener}() {//使用时间的方式
* public void operationComplete({@link ChannelFuture} future) {
* // Perform post-closure operation
* // ...
* }
* });
* }
* </pre>
* <p>
* In spite of the disadvantages mentioned above, there are certainly the cases
* where it is more convenient to call {@link #await()}. In such a case, please
* make sure you do not call {@link #await()} in an I/O thread. Otherwise,
* {@link BlockingOperationException} will be raised to prevent a dead lock.
* 尽管出现了上面提到的这些缺陷,但是在某些情况下更方便,在这种情况下,请确保不要再i/o线程里边调用await()方法,
* 否则会出现BlockingOperationException异常,导致死锁。
* <h3>Do not confuse I/O timeout and await timeout</h3>
*不要将i/o超时和等待超时混淆。
* The timeout value you specify with {@link #await(long)},
* {@link #await(long, TimeUnit)}, {@link #awaitUninterruptibly(long)}, or
* {@link #awaitUninterruptibly(long, TimeUnit)} are not related with I/O
* timeout at all. If an I/O operation times out, the future will be marked as
* 'completed with failure,' as depicted in the diagram above. For example,
* connect timeout should be configured via a transport-specific option:
* 使用await(long)、await(long, TimeUnit)、awaitUninterruptibly(long)、awaitUninterruptibly(long, TimeUnit)设置的超时时间
* 和i/o超时没有任何关系,如果一个i/o操作超时,future 将被标记为失败的完成状态,比如连接超时通过一些选项来配置:
* <pre>
* // BAD - NEVER DO THIS //不推荐的方式
* {@link Bootstrap} b = ...;
* {@link ChannelFuture} f = b.connect(...);
* f.awaitUninterruptibly(10, TimeUnit.SECONDS);//不真正确的等待超时
* if (f.isCancelled()) {
* // Connection attempt cancelled by user
* } else if (!f.isSuccess()) {
* // You might get a NullPointerException here because the future//不能确保future 的完成。
* // might not be completed yet.
* f.cause().printStackTrace();
* } else {
* // Connection established successfully
* }
*
* // GOOD//推荐的方式
* {@link Bootstrap} b = ...;
* // Configure the connect timeout option.
* <b>b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b>//配置连接超时
* {@link ChannelFuture} f = b.connect(...);
* f.awaitUninterruptibly();
*
* // Now we are sure the future is completed.//确保future 一定是完成了。
* assert f.isDone();
*
* if (f.isCancelled()) {
* // Connection attempt cancelled by user
* } else if (!f.isSuccess()) {
* f.cause().printStackTrace();
* } else {
* // Connection established successfully
* }
* </pre>
*/
标签:netty,NIO,Netty,ChannelFuture,await,future,link,operation,Future 来源: https://blog.csdn.net/m0_37139189/article/details/89874088