深入netty之Pipeline 设计原理
上一篇文章中我们已经知道了在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:
通过上图我们可以看到,一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表。这个链表的头是 HeadContext,链表的尾是 TailContext,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler
接下来我们从源码角度去看一下netty中是如何设计Pipeline 的,从AbstractChannel的构造器方法开始
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
AbstractChannel 有一个 pipeline 字段,在构造器中会初始化它为 DefaultChannelPipeline 的实例
接着我们跟踪一下 DefaultChannelPipeline 的初始化过程,首先进入到 DefaultChannelPipeline 构造器中:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在 DefaultChannelPipeline 构造器中, 首先将与之关联的 Channel 保存到字段 channel 中。然后实例化两个ChannelHandlerContext:一个是 HeadContext 实例 head,另一个是 TailContext 实例 tail。接着将 head 和 tail 互相指向, 构成一个双向链表
TailContext 类层次结构:
HeadContext 类层次结构:
从类层次结构图中可以很清楚地看到,head 实现了 ChannelInboundHandler 接口,而 tail 实现了 ChannelOutboundHandler 接口,并且它们都实现了 ChannelHandlerContext 接口, 因此可以说 head 和 tail 即是一个 ChannelHandler,又是一个 ChannelHandlerContext
ChannelInitializer 的添加
到目前为止,这个 Pipeline 还并不能实现什么特殊的功能,因为我们还 没有给它添加自定义的 ChannelHandler
通常来说,我们在初始化 Bootstrap,会添加我们自定义的 ChannelHandler, 以客户端启动代码片段来举例:
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChatClientHandler(nickName));
}
});
上面代码的初始化过程,相信大家都不陌生。在调用 handler 时,传入了一个 ChannelInitializer 对象,它提供了一个 initChannel()方法给我我们初始化 ChannelHandler,这个初始化过程是怎样的呢?又是在什么时候添加到 ChannelPipeline 中的呢?
Bootstrap 的 init()方法中添加到 ChannelPipeline 中的,其代码如下:
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
...
}
从上面的代码可见,Bootstrap 将 config.handler()返回的 ChannelHandler 添加到 Pipeline 中,而 handler()返回的其实就是我们在初始化 Bootstrap 时通过 handler()方法设置的 ChannelInitializer 实例(ChannelInitializer 实现了 ChannelInboundHandlerAdapter),因此这里其实就是将 ChannelInitializer 插入到了 Pipeline 的末端。此时 Pipeline 的结构如下图所示
可是我们在客户端代码中插入的是一个 ChannelInitializer 实例,为什么在 ChannelPipeline 中的双向链表中的元素却是一个 ChannelHandlerContext 呢?我们需要继续去源码中寻找答案
刚才,我们提到,在 Bootstrap 的 init()方法中会调用 p.addLast()方法,将 ChannelInitializer 插入到链表的末端:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
}
return this;
}
addLast()方法最终落在DefaultChannelPipeline中,在addLast()方法中首先检查 ChannelHandler 的名字是否是重复,如果不重复,则调用 newContex()方法为 Handler 创建一个对应的 DefaultChannelHandlerContext 实例,并将两者关联起来(Context 中有一个 handler 属性保存着对应的 Handler 实例)。
为了添加一个 handler 到 pipeline 中,必须把此 handler 包装成 ChannelHandlerContext。
newContext()方法会构建一个DefaultChannelHandlerContext 对象,构造器如下:
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
自定义 ChannelHandler 的添加过程
前面我们已经分析了 ChannelInitializer 是如何插入到 Pipeline 中的,接下来就来探讨 ChannelInitializer 在哪里被调用,ChannelInitializer 的作用,以及我们自定义的 ChannelHandler 是如何插入到 Pipeline 中的
先简单复习一下 Channel 的注册过程:
1、首先在 AbstractBootstrap 的 initAndRegister()中,通过 group().register(channel),调用 MultithreadEventLoopGroup 的 register()方法。
2、在 MultithreadEventLoopGroup 的 register()中调用 next()获取一个可用的 SingleThreadEventLoop,然后调用 它的 register()方法。
3、在 SingleThreadEventLoop 的 register()方法中,通过 channel.unsafe().register(this, promise)方法获取 channel 的 unsafe()底层 IO 操作对象,然后调用它的 register()。
4、在 AbstractUnsafe 的 register()方法中,调用 register0()方法注册 Channel 对象。
5、在 register0()方法中,调用 AbstractNioChannel 的 doRegister()方法。
6、doRegister()方法将 Channel 对应的 Java NIO 的 SockerChannel 对象注册到一个 eventLoop 的 Selector 中,并且将当前 Channel 作为 attachment
自定义 ChannelHandler 的添加过程,就发生在 步骤四 AbstractUnsafe 的 register0()方法中,在这个方法中调用了 pipeline.fireChannelRegistered()方法,其代码实现如下:
private void register0(ChannelPromise promise) {
...
pipeline.fireChannelRegistered();
...
}
fireChannelRegistered()方法代码实现如下:
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
再看 AbstractChannelHandlerContext 的 invokeChannelRegistered()方法:
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
显然,这个代码会从 head 开始遍历 Pipeline 的双向链表,然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例。而ChannelInitializer 实现的是 ChannelInboudHandler,因此它所对应 的 ChannelHandlerContext 的 inbound 属性就是 true,因此这里返回就是 ChannelInitializer 实例所对应的 ChannelHandlerContext 对象,如下图所示:
当获取到 inbound(ChannelInitializer) 的 Context 后,就调用它的 invokeChannelRegistered()方法:
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
我们知道每个 ChannelHandler 都和一个 ChannelHandlerContext 关联,很明显这里 handler()返回的对象就是我们实例化的 ChannelInitializer 对象,接着调用了 ChannelInitializer 的 channelRegistered()方法,继续看代码:
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
if (initChannel(ctx)) {
ctx.pipeline().fireChannelRegistered();
} else {
ctx.fireChannelRegistered();
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
注意initChannel()方法中调用了initChannel(© ctx.channel()),这个initChannel(C ch)方法我们也很熟悉,它就是我们在初始化 Bootstrap 时,调用 handler 方法传入的匿 名内部类所实现的方法:
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ChatClientHandler(nickName));
}
});
因此,当调用这个方法之后, 我们自定义的 ChannelHandler 就插入到了 Pipeline,此时 Pipeline 的状态如下图所示:
当添加完成自定义的 ChannelHandler 后,在 finally 代码块会删除自定义的 ChannelInitializer,也就是 remove(ctx)最 终调用 ctx.pipeline().remove(this),因此最后的 Pipeline 的状态如下:
Pipeline 的事件传播机制
我们已经知道 AbstractChannelHandlerContext 中有 inbound 和 outbound 两个 boolean 变量,分别用 于标识 Context 所对应的 handler 的类型,即:
1、inbound 为 true 是,表示其对应的 ChannelHandler 是 ChannelInboundHandler 的子类。
2、outbound 为 true 时,表示对应的 ChannelHandler 是 ChannelOutboundHandler 的子类。
这两个字段到底有什么作用呢? 这还要从 ChannelPipeline 的事件传播类型说起。 Netty 中的传播事件可以分为两种:Inbound 事件和 Outbound 事件
从上图可以看出,inbound 事件和 outbound 事件的流向是不一样的,inbound 事件的流行是从下至上,而 outbound 刚好相反,是从上到下
inbound 类似于是事件回调(响应请求的事件),而 outbound 类似于主动触发(发起请求的 事件)
Outbound 事件传播方式
Outbound 事件都是请求事件(request event),即请求某件事情的发生,然后通过 Outbound 事件进行通知。 Outbound 事件的传播方向是 tail -> customContext -> head。
我们接下来以 connect 事件为例,分析一下 Outbound 事件的传播机制。 首先,当用户调用了 Bootstrap 的 connect()方法时,就会触发一个 Connect 请求事件,此调用会触发如下调用链:
继续跟踪,我们就发现 AbstractChannel 的 connect()其实由调用了 DefaultChannelPipeline 的 connect()方法:
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
可以看到,当 outbound 类型的事件(这里是 connect 事件)传递到 Pipeline 后,它其实是以 tail 为起点开始传播的。
而 tail.connect()会调用到 AbstractChannelHandlerContext 的 connect()方法:
public ChannelFuture connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
......
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
next.invokeConnect(remoteAddress, localAddress, promise);
.....
return promise;
}
findContextOutbound()方法是以当前 Context 为起点,沿着Pipeline 中的 Context 双向链表寻找第一个 outbound 属性为 true 的 Context(即关联 ChannelOutboundHandler 的 Context),然后返回
当找到了一个 outbound 类型的 Context 后,就调用它的 invokeConnect()方法,这个方法中会调用 Context 关联的 ChannelHandler 的 connect()方法:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
于是这个调用回到了: Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect 这样的循环中
直到 connect 事件传递到 DefaultChannelPipeline 的双向链表的头节点,即 head 中。为什么会传递 到 head 中呢?因为,head 实现了 ChannelOutboundHandler
因为 head 本身既是一个 ChannelHandlerContext,又实现了 ChannelOutboundHandler 接口,因此最终 connect()事件是在 head 中被处理。head 的 connect()事件处理逻辑如下:
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
connect事件传播流程总结:
1.Bootstrap调用connect方法,触发事件事件被Channel.connect接收处理 Bootstrap.connect() -->Channel.connect()
2.Channel会调用其关联的Pipeline的connect()方法, Channel.connect()–>Pipeline.connect()
3.Pipeline会从双向链表的tail节点开始处理,调用tail节点的connect()方法Pipeline.connect() -->TailContext.connect()
4.TailContext会寻找下一个Outbound的ChannelHandlerContext,然后调用其invokeConnect方法TailContext.connect() -->ChannelHandlerContext.invokeConnect()
5.ChannelHandlerContext会调用其相关联的ChannelOutboundHandler的connect()方法,ChannelHandlerContext.invokeConnect()–>ChannelOutboundHandler.connect()
6.这样一直向下传递直到HeadContext,HeadContext同时也实现了ChannelOutboundHandler接口,因此最终connect事件被HeadContext的connect()方法处理