Inbound事件传播
作者:互联网
ChannelHandler继承体系
ChannelHandler:所有逻辑处理器的抽象。
public interface ChannelHandler {
// handler添加完成回调
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
// handler删除完成回调
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
// 异常回调
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
// 指定handler为共享handler,可重复添加到pipeline
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
ChannelInboundHandler:基于ChannelHandler,扩展了一些Inbound事件。
public interface ChannelInboundHandler extends ChannelHandler {
// channel注册回调,当channel注册到NioEventLoop对应的Selector是触发该回调
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
// channel注销回调
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
// channel激活回调
void channelActive(ChannelHandlerContext ctx) throws Exception;
// channel失效回调
void channelInactive(ChannelHandlerContext ctx) throws Exception;
// 读到数据时执行该方法
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
// 数据读完的回调
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
// 用户自定义的触发事件
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
// 改变channel的可写状态
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
// 异常捕获
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
事件传播过程
定义三个自定义的InBoundHandler类:
InBoundHandlerA
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
ctx.fireChannelRead(msg);
}
}
InBoundHandlerB
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
ctx.fireChannelRead(msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("InBoundHandlerB: channelActive");
ctx.channel().pipeline().fireChannelRead("hello world");
}
}
InBoundHandlerC
public class InBoundHandlerC extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerC: " + msg);
ctx.fireChannelRead(msg);
}
}
在启动类中添加这三个handler
// 配置业务处理链 handler pipeline
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});
启动服务端,通过“telnet 127.0.0.1 8888”测试一下调用结果:
因为InBoundHandlerB实现了channelActive方法,所以在channel被激活之后会首先调用该方法。
在该方法中,我们通过 ctx.channel().pipeline().fireChannelRead("hello world") 进行事件传播,传播顺序为A->B->C,与添加时的顺序是一样的。
fireChannelRead是从head节点或当前节点开始传播,先找到下一个inbound节点,然后再执行其channelRead方法,最后由tail节点做一些收尾工作并释放。
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// findContextInbound方法遍历链表,寻找下一个inbound节点,传播事件
invokeChannelRead(findContextInbound(), msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 调用channelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
SimpleChannelInboundHandler
在上面了解到的事件传播机制中,资源的释放都是由tail节点完成的。如果由于某种原因,资源没有被传递到tail节点,这时就需要节点自己释放资源。
SimpleChannelInboundHandler提供了对channelRead方法的增强,封装了资源释放的逻辑。
当我们自定义的handler节点需要自己释放资源时,可以通过继承SimpleChannelInboundHandler来实现。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg);
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg);
}
}
}
KeepMoving++ 发布了151 篇原创文章 · 获赞 101 · 访问量 19万+ 私信 关注
标签:Exception,Inbound,ChannelHandlerContext,void,ctx,传播,事件,msg,throws 来源: https://blog.csdn.net/u011212394/article/details/103950445