SpringBoot+Netty+WebSocket实现消息发送的示例代码

作者:阿杜同学 时间:2023-08-16 00:02:52 

一.导入Netty依赖


<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.25.Final</version>
 </dependency>

二.搭建websocket服务器


@Component
public class WebSocketServer {

/**
 * 主线程池
 */
private EventLoopGroup bossGroup;
/**
 * 工作线程池
 */
private EventLoopGroup workerGroup;
/**
 * 服务器
 */
private ServerBootstrap server;
/**
 * 回调
 */
private ChannelFuture future;

public void start() {
 future = server.bind(9001);
 System.out.println("netty server - 启动成功");
}

public WebSocketServer() {
 bossGroup = new NioEventLoopGroup();
 workerGroup = new NioEventLoopGroup();

server = new ServerBootstrap();
 server.group(bossGroup, workerGroup)
   .channel(NioServerSocketChannel.class)
   .childHandler(new WebsocketInitializer());
}
}

三.初始化Websocket


public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
 ChannelPipeline pipeline = ch.pipeline();
 // ------------------
 // 用于支持Http协议
 // ------------------
 // websocket基于http协议,需要有http的编解码器
 pipeline.addLast(new HttpServerCodec());
 // 对写大数据流的支持
 pipeline.addLast(new ChunkedWriteHandler());
 // 添加对HTTP请求和响应的聚合器:只要使用Netty进行Http编程都需要使用
 //设置单次请求的文件的大小
 pipeline.addLast(new HttpObjectAggregator(1024 * 64));
 //webSocket 服务器处理的协议,用于指定给客户端连接访问的路由 :/ws
 pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
 // 添加Netty空闲超时检查的支持
 // 1. 读空闲超时(超过一定的时间会发送对应的事件消息)
 // 2. 写空闲超时
 // 3. 读写空闲超时
 pipeline.addLast(new IdleStateHandler(4, 8, 12));
 //添加心跳处理
 pipeline.addLast(new HearBeatHandler());
 // 添加自定义的handler
 pipeline.addLast(new ChatHandler());

}
}

四.创建Netty *


@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {

@Resource
private WebSocketServer websocketServer;

@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
 if(event.getApplicationContext().getParent() == null) {
  try {
   websocketServer.start();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}
}

五.建立消息通道


public class UserChannelMap {
/**
 * 用户保存用户id与通道的Map对象
 */
// private static Map<String, Channel> userChannelMap;

/* static {
 userChannelMap = new HashMap<String, Channel>();
}*/

/**
 * 定义一个channel组,管理所有的channel
 * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
 */
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

/**
 * 存放用户与Chanel的对应信息,用于给指定用户发送消息
 */
private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();

private UserChannelMap(){}
/**
 * 添加用户id与channel的关联
 * @param userNum
 * @param channel
 */
public static void put(String userNum, Channel channel) {
 userChannelMap.put(userNum, channel);
}

/**
 * 根据用户id移除用户id与channel的关联
 * @param userNum
 */
public static void remove(String userNum) {
 userChannelMap.remove(userNum);
}

/**
 * 根据通道id移除用户与channel的关联
 * @param channelId 通道的id
 */
public static void removeByChannelId(String channelId) {
 if(!StringUtils.isNotBlank(channelId)) {
  return;
 }
 for (String s : userChannelMap.keySet()) {
  Channel channel = userChannelMap.get(s);
  if(channelId.equals(channel.id().asLongText())) {
   System.out.println("客户端连接断开,取消用户" + s + "与通道" + channelId + "的关联");
   userChannelMap.remove(s);
   UserService userService = SpringUtil.getBean(UserService.class);
   userService.logout(s);
   break;
  }
 }
}

/**
 * 打印所有的用户与通道的关联数据
 */
public static void print() {
 for (String s : userChannelMap.keySet()) {
  System.out.println("用户id:" + s + " 通道:" + userChannelMap.get(s).id());
 }
}

/**
 * 根据好友id获取对应的通道
 * @param receiverNum 接收人编号
 * @return Netty通道
 */
public static Channel get(String receiverNum) {
 return userChannelMap.get(receiverNum);
}

/**
 * 获取channel组
 * @return
 */
public static ChannelGroup getChannelGroup() {
 return channelGroup;
}

/**
 * 获取用户channel map
 * @return
 */
public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
 return userChannelMap;
}
}

六.自定义消息类型


public class Message {
/**
 * 消息类型
 */
private Integer type;
/**
 * 聊天消息
 */
private String message;
/**
 * 扩展消息字段
 */
private Object ext;
public Integer getType() {
 return type;
}

public void setType(Integer type) {
 this.type = type;
}

public MarketChatRecord getChatRecord() {
 return marketChatRecord;
}
public void setChatRecord(MarketChatRecord chatRecord) {
 this.marketChatRecord = chatRecord;
}

public Object getExt() {
 return ext;
}

public void setExt(Object ext) {
 this.ext = ext;
}

@Override
public String toString() {
 return "Message{" +
   "type=" + type +
   ", marketChatRecord=" + marketChatRecord +
   ", ext=" + ext +
   '}';
}

}

七.创建处理消息的handler


public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

/**
 * 用来保存所有的客户端连接
 */
private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

/**
 *当Channel中有新的事件消息会自动调用
 */
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
 // 当接收到数据后会自动调用
 // 获取客户端发送过来的文本消息
 Gson gson = new Gson();
 log.info("服务器收到消息:{}",msg.text());
 System.out.println("接收到消息数据为:" + msg.text());
 Message message = gson.fromJson(msg.text(), Message.class);
//根据业务要求进行消息处理
 switch (message.getType()) {
  // 处理客户端连接的消息
  case 0:
   // 建立用户与通道的关联
  // 处理客户端发送好友消息
   break;
  case 1:
  // 处理客户端的签收消息
   break;
  case 2:
   // 将消息记录设置为已读
   break;
  case 3:
   // 接收心跳消息
   break;
  default:
   break;
 }

}

// 当有新的客户端连接服务器之后,会自动调用这个方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
 log.info("handlerAdded 被调用"+ctx.channel().id().asLongText());
 // 添加到channelGroup 通道组
 UserChannelMap.getChannelGroup().add(ctx.channel());
//  clients.add(ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 log.info("{异常:}"+cause.getMessage());
 // 删除通道
 UserChannelMap.getChannelGroup().remove(ctx.channel());
 UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
 ctx.channel().close();
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
 log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText());
 //删除通道
 UserChannelMap.getChannelGroup().remove(ctx.channel());
 UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
 UserChannelMap.print();
}

}

八.处理心跳


public class HearBeatHandler extends ChannelInboundHandlerAdapter {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
 if(evt instanceof IdleStateEvent) {
  IdleStateEvent idleStateEvent = (IdleStateEvent)evt;

if(idleStateEvent.state() == IdleState.READER_IDLE) {
   System.out.println("读空闲事件触发...");
  }
  else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
   System.out.println("写空闲事件触发...");
  }
  else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
   System.out.println("---------------");
   System.out.println("读写空闲事件触发");
   System.out.println("关闭通道资源");
   ctx.channel().close();
  }
 }
}
}

搭建完成后调用测试

1.页面访问http://localhost:9001/ws
 2.端口号9001和访问路径ws都是我们在上边配置的,然后传入我们自定义的消息message类型。
3.大概流程:消息发送 :用户1先连接通道,然后发送消息给用户2,用户2若是在线直接可以发送给用户,若没在线可以将消息暂存在redis或者通道里,用户2链接通道的话,两者可以直接通讯。
消息推送 :用户1连接通道,根据通道id查询要推送的人是否在线,或者推送给所有人,这里我只推送给指定的人。

来源:https://blog.csdn.net/qq_35142561/article/details/108664780

标签:SpringBoot,Netty,WebSocket,消息发送
0
投稿

猜你喜欢

  • Android开发之实现GridView支付宝九宫格

    2023-01-26 15:26:16
  • Android编程实现滑动按钮功能详解

    2022-04-14 04:58:39
  • Java同步锁Synchronized底层源码和原理剖析(推荐)

    2023-09-25 08:36:22
  • java将一个目录下的所有数据复制到另一个目录下

    2023-01-08 15:11:44
  • javaweb学习总结——使用JDBC处理MySQL大数据

    2022-10-19 22:45:32
  • 浅析Java中对象的创建与对象的数据类型转换

    2023-11-26 09:20:07
  • java语法糖之jdk迭代的新特性汇总

    2022-07-09 10:05:19
  • Android开发环境安装和配置图文教程

    2023-08-04 16:58:26
  • C#控件picturebox实现图像拖拽和缩放

    2023-08-09 08:23:05
  • RestTemplate自定义请求失败异常处理示例解析

    2021-12-03 22:13:17
  • Javafx实现国际象棋游戏

    2021-07-21 18:39:47
  • 用c#实现简易的计算器功能实例代码

    2022-05-09 19:28:51
  • SpringbootJPA分页 PageRequest过时的替代方法

    2022-03-10 11:53:13
  • SpringBoot整合Dozer映射框架流程详解

    2023-03-08 02:23:48
  • Android Application级别自定义Toast

    2022-01-02 03:41:45
  • Android画板开发之添加背景和保存画板内容为图片

    2022-05-30 04:01:37
  • Java中Builder模式的实现详解

    2022-08-06 15:37:24
  • Android RecyclerView基本使用详解

    2023-07-24 21:13:30
  • 基于C# winform实现图片上传功能的方法

    2022-09-12 18:18:14
  • 使用C#调用系统API实现内存注入的代码

    2021-12-01 00:25:59
  • asp之家 软件编程 m.aspxhome.com