其他分享
首页 > 其他分享> > 从NIO到netty(7) ServerBootstrap和Netty自带的Future

从NIO到netty(7) ServerBootstrap和Netty自带的Future

作者:互联网

上一篇我们分析了EventLoop相关的东西。这一篇接着分析ServerBootstrap

serverBootstrap.group(bossgroup,worker).channel(NioServerSocketChannel.class).option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000).attr(id,"123").handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer...)

找到ServerBootstrap group方法,这个方法主要是给

private volatile EventLoopGroup childGroup;以及父类中的volatile EventLoopGroup group;赋值。
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

进入channel方法:

AbstractBootstrap#channel

public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

实际这段代码的主要作用就是给AbstractBootstrap中的这个变量赋值

private volatile ChannelFactory<? extends C> channelFactory;

执行完成后,channelFactory是一个ReflectiveChannelFactory,从名字也能看出这是一个通过反射来实现的channel工厂,我们传入的clazz对象是NioServerSocketChannel

option和attr方法涉及到channelOption和AttributeKey,在引导类中就是给AbstractBootstrap中的

private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>()

赋值。

handler方法给下面变量赋值
private volatile ChannelHandler handler;

执行完这句话后的serverbootstrap

接着走

ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();

这里出现了一个ChannelFuture,我们知道JDK中也有一个Future,用于接受异步计算返回的结果

但是JDK的future有一个缺点,调用get()方法将会阻塞主线程,这就违背了异步变成的初衷,Netty的ChannelFuture继承了Future并且改进了这一缺点

public interface Future<V> extends java.util.concurrent.Future<V> {
    boolean isSuccess();//是否计算成功
    boolean isCancellable();//可以被取消
    Throwable cause();//原因
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);//添加一个监听器
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);//添加多个监听器
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);//移除一个监听器
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);//移除多个监听器
    Future<V> sync() throws InterruptedException;//等待结果返回
    Future<V> syncUninterruptibly();//等待结果返回,不能被中断
    Future<V> await() throws InterruptedException;//等待结果返回
    Future<V> awaitUninterruptibly();//等待结果返回,不能被中断
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    boolean awaitUninterruptibly(long timeoutMillis);
    V getNow();//立刻返回,没有计算完毕,返回null,需要配合isDone()方法判定是不是已经完成,因为runnable没有返回结果,
    //而callable有返回结果
    boolean cancel(boolean mayInterruptIfRunning);  //取消                                                          
}
从ChannelFuture的方法中我们可以看到很多监听器,也就是说某个计算完成时,会触发相应的事件,得到Future结果,这是观察者模式的一种实现。这就避免了jdk的get方法,如果调用早了就要等待的尴尬。JDK中的Futre还有isDone里边有2种情况,无法区分到底是正常的io完毕返回的true还是被取消之后返回的true,所以到了netty的Future里边加了一个isSuccess()方法,只有正常的io处理结束isSuccess()才返回true。

 

走一篇doc文档

/**
 * The result of an asynchronous {@link Channel} I/O operation.
 * Channel的异步io操作的结果。
 * <p>
 * All I/O operations in Netty are asynchronous.  It means any I/O calls will
 * return immediately with no guarantee that the requested I/O operation has
 * been completed at the end of the call.  Instead, you will be returned with
 * a {@link ChannelFuture} instance which gives you the information about the
 * result or status of the I/O operation.
 * netty中所有的i/o都是异步的,意味着很多i/o操作被调用过后会立刻返回,并且不能保证i/o请求操作被调用后计算已经完毕,
 * 替代它的是返回一个当前i/o操作状态和结果信息的ChannelFuture实例。
 * <p>
 * A {@link ChannelFuture} is either <em>uncompleted</em> or <em>completed</em>.
 * When an I/O operation begins, a new future object is created.  The new future
 * is uncompleted initially - it is neither succeeded, failed, nor cancelled
 * because the I/O operation is not finished yet.  If the I/O operation is
 * finished either successfully, with failure, or by cancellation, the future is
 * marked as completed with more specific information, such as the cause of the
 * failure.  Please note that even failure and cancellation belong to the
 * completed state.
 * 一个ChannelFuture要么是完成的,要么是未完成的。当一个i/o操作开始的时候,会创建一个future 对象,future 初始化的时候是为完成的状态,
 * 既不是是成功的,或者失败的,也不是取消的,因为i/o操作还没有完成,如果一个i/o不管是成功,还是失败,或者被取消,future 会被标记一些特殊
 * 的信息,比如失败的原因,请注意即使是失败和取消也属于完成状态。
 *

                                    +---------------------------+
*                                      | Completed successfully    |
*                                      +---------------------------+
*                                 +---->      isDone() = true      |
* +--------------------------+    |    |   isSuccess() = true      |
* |        Uncompleted       |    |    +===========================+
* +--------------------------+    |    | Completed with failure    |
* |      isDone() = false    |    |    +---------------------------+
* |   isSuccess() = false    |----+---->      isDone() = true      |
* | isCancelled() = false    |    |    |       cause() = non-null  |
* |       cause() = null     |    |    +===========================+
* +--------------------------+    |    | Completed by cancellation |
*                                 |    +---------------------------+
*                                 +---->      isDone() = true      |
*                                      | isCancelled() = true      |
*                                      +---------------------------+


 *
 * Various methods are provided to let you check if the I/O operation has been
 * completed, wait for the completion, and retrieve the result of the I/O
 * operation. It also allows you to add {@link ChannelFutureListener}s so you
 * can get notified when the I/O operation is completed.
 * ChannelFuture提供了很多方法让你检查i/o操作是否完成、等待完成、获取i/o操作的结果,他也允许你添加ChannelFutureListener
 * 因此可以在i/o操作完成的时候被通知。
 * <h3>Prefer {@link #addListener(GenericFutureListener)} to {@link #await()}</h3>
 * 建议使用addListener(GenericFutureListener),而不使用await()
 * It is recommended to prefer {@link #addListener(GenericFutureListener)} to
 * {@link #await()} wherever possible to get notified when an I/O operation is
 * done and to do any follow-up tasks.
 * 推荐优先使用addListener(GenericFutureListener),不是await()在可能的情况下,这样就能在i/o操作完成的时候收到通知,并且可以去做 
 * 后续的任务处理。
 * <p>
 * {@link #addListener(GenericFutureListener)} is non-blocking.  It simply adds
 * the specified {@link ChannelFutureListener} to the {@link ChannelFuture}, and
 * I/O thread will notify the listeners when the I/O operation associated with
 * the future is done.  {@link ChannelFutureListener} yields the best
 * performance and resource utilization because it does not block at all, but
 * it could be tricky to implement a sequential logic if you are not used to
 * event-driven programming.
 * addListener(GenericFutureListener)本身是非阻塞的,他会添加一个指定的ChannelFutureListener到ChannelFuture
 * 并且i/o线程在完成对应的操作将会通知监听器,ChannelFutureListener也会提供最好的性能和资源利用率,因为他永远不会阻塞,但是如果
 * 不是基于事件编程,他可能在顺序逻辑存在棘手的问题。
 * <p>
 * By contrast, {@link #await()} is a blocking operation.  Once called, the
 * caller thread blocks until the operation is done.  It is easier to implement
 * a sequential logic with {@link #await()}, but the caller thread blocks
 * unnecessarily until the I/O operation is done and there's relatively
 * expensive cost of inter-thread notification.  Moreover, there's a chance of
 * dead lock in a particular circumstance, which is described below.
 *相反的,await()是一个阻塞的操作,一旦被调用,调用者线程在操作完成之前是阻塞的,实现顺序的逻辑比较容易,但是他让调用者线程等待是没有必要
 * 的,会造成资源的消耗,更多可能性会造成死锁,接下来会介绍。
 * <h3>Do not call {@link #await()} inside {@link ChannelHandler}</h3>
 * 不要再ChannelHandler里边调用await()方法
 * <p>
 * The event handler methods in {@link ChannelHandler} are usually called by
 * an I/O thread.  If {@link #await()} is called by an event handler
 * method, which is called by the I/O thread, the I/O operation it is waiting
 * for might never complete because {@link #await()} can block the I/O
 * operation it is waiting for, which is a dead lock.
 * ChannelHandler里边的时间处理器通常会被i/o线程调用,如果await()被一个时间处理方法调用,并且是一个i/o线程,那么这个i/o操作将永远不会 
 * 完成,因为await()是会阻塞i/o操作,这是一个死锁。
 * <pre>
 * // BAD - NEVER DO THIS 不推荐的使用方式
 * {@code @Override}
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.awaitUninterruptibly();//不要使用await的 方式
 *     // Perform post-closure operation
 *     // ...
 * }
 *
 * // GOOD
 * {@code @Override} //推荐使用的方式
 * public void channelRead({@link ChannelHandlerContext} ctx, Object msg) {
 *     {@link ChannelFuture} future = ctx.channel().close();
 *     future.addListener(new {@link ChannelFutureListener}() {//使用时间的方式
 *         public void operationComplete({@link ChannelFuture} future) {
 *             // Perform post-closure operation
 *             // ...
 *         }
 *     });
 * }
 * </pre>
 * <p>
 * In spite of the disadvantages mentioned above, there are certainly the cases
 * where it is more convenient to call {@link #await()}. In such a case, please
 * make sure you do not call {@link #await()} in an I/O thread.  Otherwise,
 * {@link BlockingOperationException} will be raised to prevent a dead lock.
 * 尽管出现了上面提到的这些缺陷,但是在某些情况下更方便,在这种情况下,请确保不要再i/o线程里边调用await()方法,
 * 否则会出现BlockingOperationException异常,导致死锁。
 * <h3>Do not confuse I/O timeout and await timeout</h3>
 *不要将i/o超时和等待超时混淆。
 * The timeout value you specify with {@link #await(long)},
 * {@link #await(long, TimeUnit)}, {@link #awaitUninterruptibly(long)}, or
 * {@link #awaitUninterruptibly(long, TimeUnit)} are not related with I/O
 * timeout at all.  If an I/O operation times out, the future will be marked as
 * 'completed with failure,' as depicted in the diagram above.  For example,
 * connect timeout should be configured via a transport-specific option:
 * 使用await(long)、await(long, TimeUnit)、awaitUninterruptibly(long)、awaitUninterruptibly(long, TimeUnit)设置的超时时间
 * 和i/o超时没有任何关系,如果一个i/o操作超时,future 将被标记为失败的完成状态,比如连接超时通过一些选项来配置:
 * <pre>
 * // BAD - NEVER DO THIS //不推荐的方式
 * {@link Bootstrap} b = ...;
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly(10, TimeUnit.SECONDS);//不真正确的等待超时
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     // You might get a NullPointerException here because the future//不能确保future 的完成。
 *     // might not be completed yet.
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }
 *
 * // GOOD//推荐的方式
 * {@link Bootstrap} b = ...;
 * // Configure the connect timeout option.
 * <b>b.option({@link ChannelOption}.CONNECT_TIMEOUT_MILLIS, 10000);</b>//配置连接超时
 * {@link ChannelFuture} f = b.connect(...);
 * f.awaitUninterruptibly();
 *
 * // Now we are sure the future is completed.//确保future 一定是完成了。
 * assert f.isDone();
 *
 * if (f.isCancelled()) {
 *     // Connection attempt cancelled by user
 * } else if (!f.isSuccess()) {
 *     f.cause().printStackTrace();
 * } else {
 *     // Connection established successfully
 * }
 * </pre>
 */

标签:netty,NIO,Netty,ChannelFuture,await,future,link,operation,Future
来源: https://blog.csdn.net/m0_37139189/article/details/89874088