添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

对netty感兴趣的小伙伴可以点击这里哦,我的 netty专栏

在网络通信中,网络链路是不稳定的,会经常发生异常,而异常的表现就是请求超时或者响应超时。这类异常对系统的可靠性产生重大影响。那么怎么监测通信异常呢?监测异常后又怎么处理呢?这本就来聊聊超时处理这个问题。

Netty 的超时类型 IdleState 主要分为以下3类:

  • ALL_IDLE : 一段时间内没有数据接收或者发送。
  • READER_IDLE : 一段时间内没有数据接收。
  • WRITER_IDLE : 一段时间内没有数据发送。
  • 针对上面的 3 类超时异常,Netty 提供了 3 类 ChannelHandler 来进行监测。

  • IdleStateHandler : 当 Channel 一段时间未执行读取、写入或者两者都未执行时,触发 IdleStateEvent 事件。
  • ReadTimeoutHandler :在一定时间内未读取任何数据时,引发 ReadTimeoutEvent 事件。
  • WriteTimeoutHandler :当写操作在一定时间内无法完成时,引发 WriteTimeoutEvent 事件。
  • IdleStateHandler类

    IdleStateHandler 包括了读\写超时状态处理,观察以下 IdleStateHandler 类的构造函数源码。

    public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
        this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
    public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
        this.writeListener = new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                IdleStateHandler.this.lastWriteTime = IdleStateHandler.this.ticksInNanos();
                IdleStateHandler.this.firstWriterIdleEvent = IdleStateHandler.this.firstAllIdleEvent = true;
        this.firstReaderIdleEvent = true;
        this.firstWriterIdleEvent = true;
        this.firstAllIdleEvent = true;
        ObjectUtil.checkNotNull(unit, "unit");
        this.observeOutput = observeOutput;
        if (readerIdleTime <= 0L) {
            this.readerIdleTimeNanos = 0L;
        } else {
            this.readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        if (writerIdleTime <= 0L) {
            this.writerIdleTimeNanos = 0L;
        } else {
            this.writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        if (allIdleTime <= 0L) {
            this.allIdleTimeNanos = 0L;
        } else {
            this.allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
    

    在上述源码中,构造函数可以接收以下参数:

    readerIdleTimeSecond:指定读超时时间,指定 0 表明为禁用。

    writerIdleTimeSecond:指定写超时时间,指定 0 表明为禁用。

    allIdleTimeSecond:在指定读写超时时间,指定 0 表明为禁用。

    IdleStateHandler 使用示例:

    public class MyChannelInitializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast("idleStateHandler",new IdleStateHandler(60,30,0));
            channel.pipeline().addLast("myHandler",new MyHandler());
    public class MyHandler extends ChannelDuplexHandler {
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if(evt instanceof IdleStateEvent){
                IdleStateEvent e = (IdleStateEvent) evt;
                if(e.state() == IdleState.READER_IDLE){
                    ctx.close();
                }else if(e.state() == IdleState.WRITER_IDLE){
                    ctx.writeAndFlush(new PingMessage());
    

    在上述示例中,IdleStateHandler 设置了读超时时间为 60 秒,写超时时间为 30 秒。MyHandler 是针对超时事件 IdleStateEvent 的处理。

  • 如果 30 秒内没有出站流量(写超时)时发送 ping 消息的示例。
  • 如果 60 秒内没有入站流量(读超时)时,连接关闭。
  • ReadTimeoutHandler类

    ReadTimeoutHandler 类包括了读超时状态处理。ReadTimeoutHandler 类的源码如下:

    public class ReadTimeoutHandler extends IdleStateHandler {
        private boolean closed;
        public ReadTimeoutHandler(int timeoutSeconds) {
            this((long)timeoutSeconds, TimeUnit.SECONDS);
        public ReadTimeoutHandler(long timeout, TimeUnit unit) {
            super(timeout, 0L, 0L, unit);//禁用了写超时、读写超时
        protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
            assert evt.state() == IdleState.READER_IDLE;//只处理读超时
            this.readTimedOut(ctx);
        protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
            if (!this.closed) {
                ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);//引发异常
                ctx.close();
                this.closed = true;
    

    从上述源码可以看出,ReadTimeoutHandler 继承自 IdleStateHandler,并在构造函数中禁用了写超时、读写超时,而且在处理超时时,只会针对 READER_IDLE状态进行处理,并引发 ReadTimeoutException 异常。

    ReadTimeoutHandler 的使用示例如下:

    public class MyChannelInitializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast("readTimeoutHandler",new ReadTimeoutHandler(30));
            channel.pipeline().addLast("myHandler",new MyHandler());
    //处理器处理ReadTimeoutException 
    public class MyHandler extends ChannelDuplexHandler {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if(cause instanceof ReadTimeoutException){
                //...
            }else {
                super.exceptionCaught(ctx,cause);
    

    在上述示例中,ReadTimeoutHandler 设置了读超时时间是 30 秒。

    WriteTimeoutHandler类

    WriteTimeoutHandler 类包括了写超时状态处理。WriteTimeoutHandler 类的源码如下:

    public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {
        private static final long MIN_TIMEOUT_NANOS;
        private final long timeoutNanos;
        private WriteTimeoutHandler.WriteTimeoutTask lastTask;
        private boolean closed;
        public WriteTimeoutHandler(int timeoutSeconds) {
            this((long)timeoutSeconds, TimeUnit.SECONDS);
        public WriteTimeoutHandler(long timeout, TimeUnit unit) {
            ObjectUtil.checkNotNull(unit, "unit");
            if (timeout <= 0L) {
                this.timeoutNanos = 0L;
            } else {
                this.timeoutNanos = Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            if (this.timeoutNanos > 0L) {
                promise = promise.unvoid();
                this.scheduleTimeout(ctx, promise);
            ctx.write(msg, promise);
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            WriteTimeoutHandler.WriteTimeoutTask task = this.lastTask;
            WriteTimeoutHandler.WriteTimeoutTask prev;
            for(this.lastTask = null; task != null; task = prev) {
                task.scheduledFuture.cancel(false);
                prev = task.prev;
                task.prev = null;
                task.next = null;
        private void scheduleTimeout(ChannelHandlerContext ctx, ChannelPromise promise) {
            WriteTimeoutHandler.WriteTimeoutTask task = new WriteTimeoutHandler.WriteTimeoutTask(ctx, promise);
            task.scheduledFuture = ctx.executor().schedule(task, this.timeoutNanos, TimeUnit.NANOSECONDS);
            if (!task.scheduledFuture.isDone()) {
                this.addWriteTimeoutTask(task);
                promise.addListener(task);
        private void addWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
            if (this.lastTask != null) {
                this.lastTask.next = task;
                task.prev = this.lastTask;
            this.lastTask = task;
        private void removeWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {
            if (task == this.lastTask) {
                assert task.next == null;
                this.lastTask = this.lastTask.prev;
                if (this.lastTask != null) {
                    this.lastTask.next = null;
            } else {
                if (task.prev == null && task.next == null) {
                    return;
                if (task.prev == null) {
                    task.next.prev = null;
                } else {
                    task.prev.next = task.next;
                    task.next.prev = task.prev;
            task.prev = null;
            task.next = null;
        protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
            if (!this.closed) {
                ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
                ctx.close();
                this.closed = true;
      //...
    

    从上述源码可以看出,WriteTimeoutHandler 在处理超时时,引发了 WriteTimeoutException 异常。

    WriteTimeoutHandler 的使用示例如下:

    public class MyChannelInitializer extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel channel) throws Exception {
            channel.pipeline().addLast("writeTimeoutHandler",new WriteTimeoutHandler(30));
            channel.pipeline().addLast("myHandler",new MyHandler());
    //处理器处理ReadTimeoutException 
    public class MyHandler extends ChannelDuplexHandler {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if(cause instanceof WriteTimeoutException ){
                //...
            }else {
                super.exceptionCaught(ctx,cause);
    

    在上述示例中,WriteTimeoutHandler 设置了写超时时间是 30 秒。

    实现心跳机制

    针对超时的解决方案——心跳机制。

    在程序开发中,心跳机制是非常常见的。其原理是,当连接闲置时可以发送一个心跳来维持连接。一般而言,心跳就是一段小的通信。

    1.定义心跳处理器

    public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
    	// (1)心跳内容
    	private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
    			.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
    					CharsetUtil.UTF_8));  
    	@Override
    	public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
    			throws Exception {
    		// (2)判断超时类型
    		if (evt instanceof IdleStateEvent) {
    			IdleStateEvent event = (IdleStateEvent) evt;
    			String type = "";
    			if (event.state() == IdleState.READER_IDLE) {
    				type = "read idle";
    			} else if (event.state() == IdleState.WRITER_IDLE) {
    				type = "write idle";
    			} else if (event.state() == IdleState.ALL_IDLE) {
    				type = "all idle";
    			// (3)发送心跳
    			ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
    					ChannelFutureListener.CLOSE_ON_FAILURE);
    			System.out.println( ctx.channel().remoteAddress()+"超时类型:" + type);
    		} else {
    			super.userEventTriggered(ctx, evt);
    

    对上述代码说明:

    定义了心跳时,要发送的内容。

    判断是不是 IdleStateEvent 事件,是则处理。

    将心跳内容发送给客户端。

    2.定义 ChannelInitializer

    HeartbeatHandlerInitializer用于封装各类ChannelHandler,代码如下:

    public class HeartbeatHandlerInitializer extends ChannelInitializer<Channel> {
    	private static final int READ_IDEL_TIME_OUT = 4; // 读超时
    	private static final int WRITE_IDEL_TIME_OUT = 5;// 写超时
    	private static final int ALL_IDEL_TIME_OUT = 7; // 所有超时
    	@Override
    	protected void initChannel(Channel ch) throws Exception {
    		ChannelPipeline pipeline = ch.pipeline();
    		pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,
    				WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // (1)
    		pipeline.addLast(new HeartbeatServerHandler()); // (2)
    

    对上述代码说明如下:

  • 添加了一个IdleStateHandler到 ChannelPipeline,并分别设置了读、写超时的时间。为了方便演示,将超时时间设置的比较短。
  • 添加了HeartbeatServerHandler,用来处理超时时,发送心跳。
  • 3.编写服务器

    服务器代码比较简单,启动后侦听 8083 端口。

    public final class HeartbeatServer {
        static final int PORT = 8083;
        public static void main(String[] args) throws Exception {
            // 配置服务器
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new HeartbeatHandlerInitializer());
                // 启动
                ChannelFuture f = b.bind(PORT).sync();
                f.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
    

    首先启动 HeartbeatServer,客户端用操作系统自带的 Telnet 程序即可:

    telnet 127.0.0.1 8083
    

    可以看到客户端与服务器的交互效果如下图。

    文章如果对你有帮助,看完记得点赞、关注、收藏哟。