Java Netty实现心跳机制过程解析

作者:逃离沙漠 时间:2023-05-24 21:27:10 

netty心跳机制示例,使用Netty实现心跳机制,使用netty4,IdleStateHandler 实现。Netty心跳机制,netty心跳检测,netty,心跳

本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用。我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的。Netty中自带了一个IdleStateHandler 可以用来实现心跳检测。

心跳检测的逻辑

本文中我们将要实现的心跳检测逻辑是这样的:服务端启动后,等待客户端连接,客户端连接之后,向服务端发送消息。如果客户端在“干活”那么服务端必定会收到数据,如果客户端“闲下来了”那么服务端就接收不到这个客户端的消息,既然客户端闲下来了,不干事,那么何必浪费连接资源呢?所以服务端检测到一定时间内客户端不活跃的时候,将客户端连接关闭。本文要实现的逻辑步骤为:

  • 启动服务端,启动客户端

  • 客户端向服务端发送"I am alive",并sleep随机时间,用来模拟空闲。

  • 服务端接收客户端消息,并返回"copy that",客户端空闲时 计数+1.

  • 服务端客户端继续通信

  • 服务端检测客户端空闲太多,关闭连接。客户端发现连接关闭了,就退出了。

有了这个思路,我们先来编写服务端。

心跳检测服务端代码


public class HeartBeatServer {

int port ;
 public HeartBeatServer(int port){
   this.port = port;
 }

public void start(){
   ServerBootstrap bootstrap = new ServerBootstrap();
   EventLoopGroup boss = new NioEventLoopGroup();
   EventLoopGroup worker = new NioEventLoopGroup();
   try{
     bootstrap.group(boss,worker)
         .handler(new LoggingHandler(LogLevel.INFO))
         .channel(NioServerSocketChannel.class)
         .childHandler(new HeartBeatInitializer());

ChannelFuture future = bootstrap.bind(port).sync();
     future.channel().closeFuture().sync();
   }catch(Exception e){
     e.printStackTrace();
   }finally {
     worker.shutdownGracefully();
     boss.shutdownGracefully();
   }
 }
 public static void main(String[] args) throws Exception {
   HeartBeatServer server = new HeartBeatServer(8090);
   server.start();
 }
}

熟悉netty的同志,对于上面的模板一样的代码一定是在熟悉不过了。啥都不用看,只需要看childHandler(new HeartBeatInitializer()) 这一句。HeartBeatInitializer就是一个ChannelInitializer顾名思义,他就是在初始化channel的时做一些事情。我们所需要开发的业务逻辑Handler就是在这里添加的。其代码如下:


public class HeartBeatInitializer extends ChannelInitializer<Channel> {

@Override
 protected void initChannel(Channel channel) throws Exception {
   ChannelPipeline pipeline = channel.pipeline();
   pipeline.addLast("decoder", new StringDecoder());
   pipeline.addLast("encoder", new StringEncoder());
   pipeline.addLast(new IdleStateHandler(2,2,2, TimeUnit.SECONDS));
   pipeline.addLast(new HeartBeatHandler());
 }
}

代码很简单,我们先添加了StringDecoder,和StringEncoder。这两个其实就是编解码用的,下面的IdleStateHandler才是本次心跳的核心组件。我们可以看到IdleStateHandler的构造函数中接收了4个参数,其定义如下:


public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);

三个空闲时间参数,以及时间参数的格式。我们的例子中设置的是2,2,2,意思就是客户端2秒没有读/写,这个超时时间就会被触发。超时事件触发就需要我们来处理了,这就是上的HeartBeatInitializer中最后一行的HeartBeatHandler所做的事情。代码如下:


public class HeartBeatHandler extends SimpleChannelInboundHandler<String> {

int readIdleTimes = 0;

@Override
 protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
   System.out.println(" ====== > [server] message received : " + s);
   if("I am alive".equals(s)){
     ctx.channel().writeAndFlush("copy that");
   }else {
     System.out.println(" 其他信息处理 ... ");
   }
 }

@Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
   IdleStateEvent event = (IdleStateEvent)evt;

String eventType = null;
   switch (event.state()){
     case READER_IDLE:
       eventType = "读空闲";
       readIdleTimes ++; // 读空闲的计数加1
       break;
     case WRITER_IDLE:
       eventType = "写空闲";
       // 不处理
       break;
     case ALL_IDLE:
       eventType ="读写空闲";
       // 不处理
       break;
   }
   System.out.println(ctx.channel().remoteAddress() + "超时事件:" +eventType);
   if(readIdleTimes > 3){
     System.out.println(" [server]读空闲超过3次,关闭连接");
     ctx.channel().writeAndFlush("you are out");
     ctx.channel().close();
   }
 }
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
   System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
 }

}

至此,我们的服务端写好了。

心跳检测客户端代码

netty的api设计使得编码的模式非常具有通用性,所以客户端代码和服务端的代码几乎一样:启动client端的代码几乎一样,也需要一个ChannelInitializer,也需要Handler。改动的地方很少,因此本文不对客户端代码进行详细解释。下面给出client端的完整代码:


public class HeartBeatClient {

int port;
 Channel channel;
 Random random ;

public HeartBeatClient(int port){
   this.port = port;
   random = new Random();
 }
 public static void main(String[] args) throws Exception{
   HeartBeatClient client = new HeartBeatClient(8090);
   client.start();
 }

public void start() {
   EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
   try{
     Bootstrap bootstrap = new Bootstrap();
     bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
         .handler(new HeartBeatClientInitializer());

connect(bootstrap,port);
     String text = "I am alive";
     while (channel.isActive()){
       sendMsg(text);
     }
   }catch(Exception e){
     // do something
   }finally {
     eventLoopGroup.shutdownGracefully();
   }
 }

public void connect(Bootstrap bootstrap,int port) throws Exception{
   channel = bootstrap.connect("localhost",8090).sync().channel();
 }

public void sendMsg(String text) throws Exception{
   int num = random.nextInt(10);
   Thread.sleep(num * 1000);
   channel.writeAndFlush(text);
 }

static class HeartBeatClientInitializer extends ChannelInitializer<Channel> {

@Override
   protected void initChannel(Channel ch) throws Exception {
     ChannelPipeline pipeline = ch.pipeline();
     pipeline.addLast("decoder", new StringDecoder());
     pipeline.addLast("encoder", new StringEncoder());
     pipeline.addLast(new HeartBeatClientHandler());
   }
 }

static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
     System.out.println(" client received :" +msg);
     if(msg!= null && msg.equals("you are out")) {
       System.out.println(" server closed connection , so client will close too");
       ctx.channel().closeFuture();
     }
   }
 }
}

运行代码

在上面的代码写好之后,我们先启动服务端,然后在启动客户端。运行日志如下:

server端:


=== /127.0.0.1:57700 is active ===
====== > [server] message received : I am alive
====== > [server] message received : I am alive
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:写空闲
====== > [server] message received : I am alive
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:读空闲
[server]读空闲超过3次,关闭连接

client端:


client sent msg and sleep 2
client received :copy that
client received :copy that
client sent msg and sleep 6
client sent msg and sleep 6
client received :copy that
client received :you are out
server closed connection , so client will close too
Process finished with exit code 0

通过上面的运行日志,我们可以看到:

1.客户端在与服务器成功建立之后,发送了3次'I am alive',服务端也回应了3次:'copy that'

2.由于客户端消极怠工,超时了多次,服务端关闭了链接。

3.客户端知道服务端抛弃自己之后,也关闭了连接,程序退出。

以上简单了演示了一下,netty的心跳机制,其实主要就是使用了IdleStateHandler。源码下载

来源:https://www.cnblogs.com/demingblog/p/9957143.html

标签:Java,Netty,心跳,机制
0
投稿

猜你喜欢

  • C++实现优先队列的示例详解

    2022-04-06 22:14:20
  • Java实现窗体程序显示日历

    2022-09-14 11:01:59
  • C# 遍历枚举类型的所有元素

    2023-02-06 00:27:46
  • 解决IDEA中maven导入jar包一直报错问题

    2021-12-26 11:52:22
  • Java实现输出数字三角形实例代码

    2023-08-25 02:09:51
  • java实现仿射密码加密解密

    2022-10-09 04:04:49
  • 详解Spring中的Environment外部化配置管理

    2023-11-23 05:24:24
  • 如何利用java实现生成PDF文件

    2023-03-31 16:25:36
  • android 下载时文件名是中文和空格会报错解决方案

    2023-05-25 04:45:19
  • Java实现简单的日历界面

    2021-10-08 03:13:01
  • 关于Mybatis-Plus Wrapper是否应该出现在Servcie类中

    2023-11-28 22:04:56
  • C#遍历List并删除某个元素的方法

    2023-03-22 10:22:08
  • Android nativePollOnce函数解析

    2022-05-29 01:51:26
  • SpringMVC 参数绑定相关知识总结

    2022-06-05 12:50:54
  • C#利用性能计数器监控网络状态

    2022-01-05 00:13:53
  • Java和C#输入输出流的方法(详解)

    2022-06-24 09:21:02
  • C# 设计模式系列教程-简单工厂模式

    2023-10-31 13:49:29
  • 在SpringBoot项目中的使用Swagger的方法示例

    2022-01-04 15:14:39
  • Spring Boot集成MyBatis的方法

    2021-11-03 23:11:05
  • 深入理解Android手势识别

    2021-08-12 11:05:16
  • asp之家 软件编程 m.aspxhome.com