回想一下我们在 NIO 中是如何处理我们关心的事件的?在一个 while 循环中 select 出事件,然后依次处理每种事件。我们可以把它称为事件循环,这就是 EventLoop。interface io.netty.channel.EventLoop 定义了 Netty 的核心抽象,用于处理网络连接的生命周期中所发生的事件。
io.netty.util.concurrent
包构建在 JDK 的
java.util.concurrent
包上。而
io.netty.channel
包中的类,为了与 Channel 的事件进行交互,扩展了这些接口/类。一个 EventLoop 将由一个永远都不会改变的 Thread 驱动,同时任务(Runnable 或者 Callable)可以直接提交给EventLoop 实现,以立即执行或者调度执行。
线程的分配:
服务于 Channel 的 I/O 和事件的 EventLoop 包含在 EventLoopGroup 中。
异步传输实现只使用了少量的 EventLoop(以及和它们相关联的 Thread),而且在当前的线程模型中,它们可能会被多个 Channel 所共享。这使得可以通过尽可能少量的 Thread 来支撑大量的 Channel,而不是每个 Channel 分配一个 Thread。EventLoopGroup 负责为每个新创建的 Channel 分配一个 EventLoop。在当前实现中,使用顺序循环(round-robin)的方式进行分配以获取一个均衡的分布,并且相同的 EventLoop 可能会被分配给多个 Channel。
一旦一个 Channel 被分配给一个 EventLoop,它将在它的整个生命周期中都使用这个EventLoop(以及相关联的 Thread)。
需要注意,EventLoop 的分配方式对 ThreadLocal 的使用的影响。因为一个 EventLoop 通常会被用于支撑多个 Channel,所以对于所有相关联的 Channel 来说,ThreadLocal 都将是一样的。这使得它对于实现状态追踪等功能来说是个糟糕的选择。然而,在一些无状态的上下文中,它仍然可以被用于在多个 Channel 之间共享一些重度的或者代价昂贵的对象,甚至是事件。
线程管理:
在内部,当提交任务到如果(当前)调用线程正是支撑 EventLoop 的线程,那么所提交的代码块将会被(直接)执行。否则,EventLoop 将调度该任务以便稍后执行,并将它放入到内部队列中。当 EventLoop 下次处理它的事件时,它会执行队列中的那些任务/事件。
Netty 网络抽象的代表:
Channel:Socket;
EventLoop:控制流、多线程处理、并发;
ChannelFuture:异步通知。
Channel 和 EventLoop 关系如图:
从图上我们可以看出 Channel 需要被注册到某个 EventLoop 上,在 Channel 整个生命周期内都由这个EventLoop 处理 IO 事件,也就是说一个 Channel 和一个 EventLoop 进行了绑定,但是一个EventLoop 可以同时被多个 Channel 绑定。
基本的 I/O 操作(bind()、connect()、read()和 write())依赖于底层网络传输所提供的原语。在基于 Java 的网络编程中,其基本的构造是类 Socket。Netty 的 Channel 接口所提供的 API,被用于所有的 I/O 操作。大大地降低了直接使用 Socket 类的复杂性。此外,Channel 也是拥有许多预定义的、专门化实现的广泛类层次结构的根。
由于 Channel 是独一无二的,所以为了保证顺序将 Channel 声明为
java.lang.Comparable
的一个子接口。因此,如果两个不同的 Channel 实例都返回了相同的散列码,那么 AbstractChannel 中的 compareTo() 方法的实现将会抛出一个 Error。
Channel 的生命周期状态:
ChannelUnregistered:Channel 已经被创建,但还未注册到 EventLoop
ChannelRegistered:Channel 已经被注册到了 EventLoop
ChannelActive:Channel 处于活动状态(已经连接到它的远程节点)。它现在可以接收和发送数据了
ChannelInactive:Channel 没有连接到远程节点
当这些状态发生改变时,将会生成对应的事件。这些事件将会被转发给 ChannelPipeline 中的 ChannelHandler,其可以随后对它们做出响应。在我们的编程中,关注 ChannelActive 和 ChannelInactive 会更多一些。
重要 Channel 的方法:
eventLoop:返回分配给 Channel 的 EventLoop。
pipeline:返回 Channel 的 ChannelPipeline,也就是说每个 Channel 都有自己的 ChannelPipeline。
isActive:如果 Channel 是活动的,则返回 true。活动的意义可能依赖于底层的传输。例如,一个 Socket 传输一旦连接到了远程节点便是活动的,而一个 Datagram 传输一旦被打开便是活动的。
localAddress:返回本地的 SokcetAddress。
remoteAddress:返回远程的 SocketAddress。
write:将数据写到远程节点,注意,这个写只是写往 Netty 内部的缓存,还没有真正写往 socket。
flush:将之前已写的数据冲刷到底层 socket 进行传输。
writeAndFlush:一个简便的方法,等同于调用 write() 并接着调用 flush()。
(1)ChannelPipeline 接口
当 Channel 被创建时,它将会被自动地分配一个新的 ChannelPipeline,每个 Channel 都有自己的 ChannelPipeline。这项关联是永久性的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。
ChannelPipeline 提供了 ChannelHandler 链的容器,并定义了用于在该链上传播入站(也就是从网络到业务处理)和 出站(也就是从业务处理到网络),各种事件流的 API,我们代码中的 ChannelHandler 都是放在 ChannelPipeline 中的。
使得事件流经 ChannelPipeline 是 ChannelHandler 的工作,它们是在应用程序的初始化或者引导阶段被安装的。这些 ChannelHandler 对象接收事件、执行它们所实现的处理逻辑,并将数据传递给链中的下一个 ChannelHandler,而且 ChannelHandler 对象也完全可以拦截事件不让事件继续传递。它们的执行顺序是由它们被添加的顺序所决定的。
ChannelHandler 的生命周期:
在 ChannelHandler 被添加到 ChannelPipeline 中或者被从 ChannelPipeline 中移除时会调用下面这些方法。这些方法中的每一个都接受一个 ChannelHandlerContext 参数。
handlerAdded:当把 ChannelHandler 添加到 ChannelPipeline 中时被调用;
handlerRemoved:当从 ChannelPipeline 中移除 ChannelHandler 时被调用;
exceptionCaught:当处理过程中在 ChannelPipeline 中有错误产生时被调用;
ChannelPipeline 中的 ChannelHandler:
入站和出站 ChannelHandler 被安装到同一个 ChannelPipeline 中,ChannelPipeline 以双向链表的形式进行维护管理。比如下图,我们在网络上传递的数据,要求加密,但是加密后密文比较大,需要压缩后再传输,而且按照业务要求,需要检查报文中携带的用户信息是否合法,于是我们实现了 5 个 Handler:解压(入)Handler、压缩(出)handler、解密(入)Handler、加密(出) Handler、授权(入) Handler。
如果一个消息或者任何其他的入站事件被读取,那么它会从 ChannelPipeline 的头部开始流动,但是只被处理入站事件的 Handler 处理,也就是解压(入)Handler、解密(入)Handler、授权(入) Handler,最终,数据将会到达 ChannelPipeline 的尾端,届时,所有处理就都结束了。
数据的出站运动(即正在被写的数据)在概念上也是一样的。在这种情况下,数据将从链的尾端开始流动,但是只被处理出站事件的 Handler 处理,也就是加密(出) Handler、压缩(出)handler,直到它到达链的头部为止。在这之后,出站数据将会到达网络传输层,也就是我们的 Socket。
Netty 能区分入站事件的 Handler 和出站事件的 Handler,并确保数据只会在具有相同定向类型的两个 ChannelHandler 之间传递。
所以在我们编写 Netty 应用程序时要注意,分属出站和入站不同的 Handler ,在业务没特殊要求的情况下是无所谓顺序的,正如我们下面的图所示,比如“压缩(出)handler”可以放在“解压(入)handler”和“解密(入) Handler”中间,也可以放在“解密(入) Handler”和“授权(入) Handler”之间。
而同属一个方向的 Handler 则是有顺序的,因为上一个 Handler 处理的结果往往是下一个 Handler 的要求的输入。比如入站处理,对于收到的数据,只有先解压才能得到密文,才能解密,只有解密后才能拿到明文中的用户信息进行授权检查,所以解压->解密->授权这个三个入站 Handler 的顺序就不能乱。
ChannelPipeline 上的方法:
既然 ChannelPipeline 以双向链表的形式进行维护管理 Handler,自然也提供了对应的方法在 ChannelPipeline 中增加或者删除、替换 Handler。
addFirst、addBefore、addAfter、addLast:将一个 ChannelHandler 添加到 ChannelPipeline 中;
remove:将一个 ChannelHandler 从 ChannelPipeline 中移除;
replace:将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler;
get:通过类型或者名称返回 ChannelHandler;
context:返回和 ChannelHandler 绑定的 ChannelHandlerContext;
names:返回 ChannelPipeline 中所有 ChannelHandler 的名称;
ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。
(2)ChannelHandlerContext
ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandlerContext,为什么需要这个 ChannelHandlerContext ?前面我们已经说过,ChannelPipeline 以双向链表的形式进行维护管理 Handler,毫无疑问,Handler 在放入 ChannelPipeline 的时候必须要有两个指针 pre 和 next 来说明它的前一个元素和后一个元素,但是 Handler 本身来维护这两个指针合适吗?想想我们在使用 JDK 的 LinkedList 的时候,我们放入 LinkedList 的数据是不会带这两个指针的,LinkedList 内部会用类 Node 对我们的数据进行包装,而类 Node 则带有两个指针 pre 和 next。
所以,ChannelHandlerContext 的主要作用就和 LinkedList 内部的类 Node 类似。
不过 ChannelHandlerContext 不仅仅只是个包装类,它还提供了很多的方法,比如让事件从当前 ChannelHandler 传递给链中的下一个 ChannelHandler,还可以被用于获取底层的Channel,还可以用于写出站数据。
Channel、ChannelPipeline 和 ChannelHandlerContext 上的事件传播:
ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。而调用位于 ChannelHandlerContext 上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个(入站下一个,出站上一个)能够处理该事件的 ChannelHandler。
我们用一个实际例子来说明,比如服务器收到对端发过来的报文,解压后需要进行解密,结果解密失败,要给对端一个应答。
如果发现解密失败原因是服务器和对端的加密算法不一致,应答报文只能以明文的压缩格式发送,就可以在解密 handler 中直接使用 ctx.write 给对端应答,这样应答报文就只经过压缩 Handler 就发往了对端;
其他情况下,应答报文要以加密和压缩格式发送,就可以在解密 handler 中使用 channel.write() 或者 channelpipeline.write() 给对端应答,这样应答报文就会流经整个出站处理过程。
ChannelHandlerContext 的 API:
alloc:返回和这个实例相关联的 Channel 所配置的 ByteBufAllocator
bind:绑定到给定的 SocketAddress,并返回 ChannelFuture
channel:返回绑定到这个实例的 Channel
close:关闭 Channel,并返回 ChannelFuture
connect:连接给定的 SocketAddress,并返回 ChannelFuture
deregister:从之前分配的 EventExecutor 注销,并返回 ChannelFuture
disconnect:从远程节点断开,并返回 ChannelFuture
executor:返回调度事件的 EventExecutor
fireChannelActive:触发对下一个 ChannelInboundHandler 上的 channelActive()方法(已连接)的调用
fireChannelInactive:触发对下一个 ChannelInboundHandler 上的 channelInactive()方法(已关闭)的调用
fireChannelRead:触发对下一个 ChannelInboundHandler 上的 channelRead()方法(已接收的消息)的调用
fireChannelReadComplete:触发对下一个 ChannelInboundHandler 上的 channelReadComplete()方法的调用
fireChannelRegistered:触发对下一个 ChannelInboundHandler 上的 fireChannelRegistered()方法的调用
fireChannelUnregistered:触发对下一个 ChannelInboundHandler 上的 fireChannelUnregistered()方法的调用
fireChannelWritabilityChanged:触发对下一个 ChannelInboundHandler 上的 fireChannelWritabilityChanged()方法的调用
fireExceptionCaught:触发对下一个 ChannelInboundHandler 上的fireExceptionCaught(Throwable)方法的调用
fireUserEventTriggered:触发对下一个 ChannelInboundHandler 上的fireUserEventTriggered(Object evt)方法的调用
handler:返回绑定到这个实例的 ChannelHandler
isRemoved:如果所关联的 ChannelHandler 已经被从 ChannelPipeline 中移除则返回 true
name:返回这个实例的唯一名称
pipeline:返回这个实例所关联的 ChannelPipeline
read:将数据从 Channel 读取到第一个入站缓冲区;如果读取成功则触发一个 channelRead 事件,并(在最后一个消息被读取完成后)通知 ChannelInboundHandler 的channelReadComplete(ctx)方法
write:通过这个实例写入消息并经过 ChannelPipeline
writeAndFlush:通过这个实例写入并冲刷消息并经过 ChannelPipeline
当使用 ChannelHandlerContext 的 API 的时候,有以下两点:
-
ChannelHandlerContext 和 ChannelHandler 之间的关联(绑定)是永远不会改变的,所以缓存对它的引用是安全的;
-
相对于其他类的同名方法,ChannelHandlerContext 的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能;
(1)ChannelHandler 接口
从应用程序开发人员的角度来看,Netty 的主要组件是 ChannelHandler,它充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler 的方法是由网络事件触发的。
事实上,ChannelHandler 可专门用于几乎任何类型的动作,例如将数据从一种格式转换为另外一种格式,例如各种编解码,或者处理转换过程中所抛出的异常。
举例来说,ChannelInboundHandler 是一个你将会经常实现的子接口。这种类型的 ChannelHandler 接收入站事件和数据,这些数据随后将会被你的应用程序的业务逻辑所处理。
当你要给连接的客户端发送响应时,也可以从 ChannelInboundHandler 直接冲刷数据然后输出到对端。应用程序的业务逻辑通常实现在一个或者多个 ChannelInboundHandler 中。
这种类型的 ChannelHandler 接收入站事件和数据,这些数据随后将会被应用程序的业务逻辑所处理。
Netty 定义了下面两个重要的 ChannelHandler 子接口:
ChannelInboundHandler,处理入站数据以及各种状态变化;
ChannelOutboundHandler,处理出站数据并且允许拦截所有的操作。
(2)ChannelInboundHandler 接口
下面列出了接口 ChannelInboundHandler 的生命周期方法。这些方法将会在数据被接收时或者与其对应的 Channel 状态发生改变时被调用。正如我们前面所提到的,这些方法和 Channel 的生命周期密切相关。
channelRegistered:当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
channelUnregistered:当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调用
channelActive:当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
channelInactive:当 Channel 离开活动状态并且不再连接它的远程节点时被调用
channelReadComplete:当 Channel 上的一个读操作完成时被调用
channelRead:当从 Channel 读取数据时被调用
ChannelWritabilityChanged:当 Channel 的可写状态发生改变时被调用。可以通过调用 Channel 的 isWritable()方法来检测 Channel 的可写性。与可写性相关的阈值可以通过
Channel.config().setWriteHighWaterMark()
和
Channel.config().setWriteLowWaterMark()
方法来设置
userEventTriggered:当 ChannelnboundHandler.fireUserEventTriggered()方法被调用时被调用。
注意:channelReadComplete 和 channelRead 这两个方法非常让人搞不清两者的区别是什么,我们先放下这个疑问,后面会有解释。
(3)ChannelOutboundHandler 接口
出站操作和数据将由 ChannelOutboundHandler 处理。它的方法将被 Channel、ChannelPipeline 以及 ChannelHandlerContext 调用。
所有由 ChannelOutboundHandler 本身所定义的方法:
bind(ChannelHandlerContext,SocketAddress,ChannelPromise):当请求将 Channel 绑定到本地地址时被调用
connect(ChannelHandlerContext,SocketAddress,SocketAddress,ChannelPromise):当请求将 Channel 连接到远程节点时被调用
disconnect(ChannelHandlerContext,ChannelPromise):当请求将 Channel 从远程节点断开时被调用
close(ChannelHandlerContext,ChannelPromise):当请求关闭 Channel 时被调用
deregister(ChannelHandlerContext,ChannelPromise):当请求将 Channel 从它的 EventLoop 注销时被调用
read(ChannelHandlerContext):当请求从 Channel 读取更多的数据时被调用
flush(ChannelHandlerContext):当请求通过 Channel 将入队数据冲刷到远程节点时被调用
write(ChannelHandlerContext,Object,ChannelPromise):当请求通过 Channel 将数据写到远程节点时被调用
(4)ChannelHandler 的适配器
有一些适配器类可以将编写自定义的 ChannelHandler 所需要的工作降到最低限度,因为它们提供了定义在对应接口中的所有方法的默认实现。因为你有时会忽略那些不感兴趣的事件,所以 Netty 提供了抽象基类 ChannelInboundHandlerAdapter(处理入站) 和 ChannelOutboundHandlerAdapter(处理出站)。
我们可以使用 ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter 类作为自己的 ChannelHandler 的起始点。这两个适配器分别提供了 ChannelInboundHandler 和
ChannelOutboundHandler 的基本实现。通过扩展抽象类 ChannelHandlerAdapter,它们获得了它们共同的超接口 ChannelHandler 的方法。
不过 ChannelOutboundHandler 有个非常让人迷惑的 read 方法,ChannelOutboundHandler 不是处理出站事件的吗?怎么会有 read 方法呢?其实这个 read 方法不是表示读数据,而是表示业务发出了读(read)数据的要求,这个要求也会封装为一个事件进行传播,这个事件因为是业务发出到网络的,自然就是个出站事件,而且这个事件触发的就是 ChannelOutboundHandler 中 read 方法。
如果我们的 Handler 既要处理入站又要处理出站怎么办呢?这个时候就可以使用类 ChannelDuplexHandler,当然也可以同时实现 ChannelOutboundHandler, ChannelInboundHandler 这两个接口,自然就要麻烦很多了。
(5)Handler 的共享和并发安全性
ChannelHandlerAdapter 还提供了实用方法 isSharable()。如果其对应的实现被标注为 Sharable,那么这个方法将返回 true,表示它可以被添加到多个 ChannelPipeline。
这就牵涉到了我们实现的 Handler 的共享性和线程安全性。回顾我们的 Netty 代码,在往 pipeline 安装 Handler 的时候,我们基本上是 new 出 Handler 的实例
因为每个 socketChannel 有自己的 pipeline 而且每个 socketChannel 又是和线程绑定的,所以这些 Handler 的实例之间完全独立的,只要 Handler 的实例之间不是共享了全局变量,Handler 的实例是线程安全的。
但是如果业务需要我们在多个 socketChannel 之间共享一个 Handler 的实例怎么办呢?比如统计服务器接受到和发出的业务报文总数,我们就需要用一个 Handler 的实例来横跨所有的 socketChannel 来统计所有 socketChannel 业务报文数。
为了实现这一点,我们可以实现一个 MessageCountHandler,并且在 MessageCountHandler 上使用 Netty 的@Sharable 注解,然后在安装 MessageCountHandler 实例到 pipeline 时,共用一个即可。当然,因为 MessageCountHandler 实例是共享的,所以在实现 MessageCountHandler 的统计功能时,请务必注意线程安全,我们在具体实现时就使用了 Java 并发编程里的 Atomic 类来保证这一点。
(6)资源管理和 SimpleChannelInboundHandler
回想一下我们在 NIO 中是如何接收和发送网络数据的?都是首先创建了一个 Buffer,应用程序中的业务部分和 Channel 之间通过 Buffer 进行数据的交换:
Netty 在处理网络数据时,同样也需要 Buffer,在 Read 网络数据时由 Netty 创建 Buffer,Write 网络数据时 Buffer 往往是由业务方创建的。不管是读和写,Buffer 用完后都必须进行释放,否则可能会造成内存泄露。
在 Write 网络数据时,可以确保数据被写往网络了,Netty 会自动进行 Buffer 的释放,但是如果 Write 网络数据时,我们有 outBoundHandler 处理了 write()操作并丢弃了数据,没有继续往下写,要由我们负责释放这个 Buffer,就必须调用
ReferenceCountUtil.release
方法,否则就可能会造成内存泄露。
在 Read 网络数据时,如果我们可以确保每个 InboundHandler 都把数据往后传递了,也就是调用了相关的 fireChannelRead 方法,Netty 也会帮我们释放,同样的,如果我们有 InboundHandler 处理了数据,又不继续往后传递,又不调用负责释放的
ReferenceCountUtil.release
方法,就可能会造成内存泄露。
但是由于消费入站数据是一项常规任务,所以 Netty 提供了一个特殊的被称为 SimpleChannelInboundHandler 的 ChannelInboundHandler 实现。这个实现会在数据被 channelRead0()方法消费之后自动释放数据。
同时系统为我们提供的各种预定义 Handler 实现,都实现了数据的正确处理,所以我们自行在编写业务 Handler 时,也需要注意这一点:要么继续传递,要么自行释放。
ServerBootstrap 将绑定到一个端口,因为服务器必须要监听连接,而 Bootstrap 则是由想要连接到远程节点的客户端应用程序所使用的。
第二个区别可能更加明显。引导一个客户端只需要一个 EventLoopGroup,但是一个 ServerBootstrap 则需要两个(也可以是同一个实例)。
因为服务器需要两组不同的 Channel。第一组将只包含一个 ServerChannel,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。而第二组将包含所有已创建的用来处理传入客户端连接(对于每个服务器已经接受的连接都有一个)的 Channel。
与 ServerChannel 相关联的 EventLoopGroup 将分配一个负责为传入连接请求创建 Channel 的 EventLoop。一旦连接被接受,第二个 EventLoopGroup 就会给它的 Channel 分配一个 EventLoop。