java WebSocket客户端断线重连的实现方法

作者:剑客阿良_ALiang 时间:2023-05-26 22:45:18 

目录
  • 前言

  • Maven依赖

  • 代码

前言

在工作中,使用websocket客户端连接服务端时,是否遇到过网络波动、服务端断连的情况?这会导致客户端被动断开连接。为了解决这个问题,需要对被动断开连接的情况进行捕获,并重新创建连接。这篇文章主要是提供可以直接使用的断线重连websocket客户端代码。

Maven依赖


       <!-- Lombok: supplies the @Slf4j annotation used on the client class. -->
       <!-- No <version> is given here; presumably it is managed by a parent POM/BOM (confirm for your build). -->
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
           <optional>true</optional>
       </dependency>
       <!-- Hutool: supplies ThreadUtil (sleep) and StrUtil (blank checks) used by the client class. -->
       <dependency>
           <groupId>cn.hutool</groupId>
           <artifactId>hutool-all</artifactId>
           <version>5.5.2</version>
       </dependency>
       <!-- Java-WebSocket: supplies the WebSocketClient base class that the client wraps. -->
       <dependency>
           <groupId>org.java-websocket</groupId>
           <artifactId>Java-WebSocket</artifactId>
           <version>1.5.1</version>
       </dependency>

代码

不废话,上代码。


package ai.guiji.csdn.ws.client;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.framing.Framedata;
import org.java_websocket.handshake.ServerHandshake;

import javax.net.ssl.*;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** @Author huyi @Date 2021/10/15 20:03 @Description: 重连websocket客户端 */
@Slf4j
public class ReConnectWebSocketClient {
 /** 字符串消息回调 */
 private Consumer<String> msgStr;
 /** 字节流消息回调 */
 private Consumer<ByteBuffer> msgByte;
 /** 异常回调 */
 private Consumer<Exception> error;
 /** 连接标识 */
 private String key;
 /** ws服务端连接 */
 private URI serverUri;
 /** 尝试重连标识 */
 private AtomicBoolean tryReconnect;
 /** 需要ping标识 */
 private AtomicBoolean needPing;
 /** websocket连接实体 */
 private WebSocketClient webSocketClient;
 /** 重连次数 */
 private AtomicInteger reConnectTimes;
 /** 连接结束标识 */
 private AtomicBoolean end;
 /** 连接后初始发送报文,这里也可以不需要,如果服务端主动断开连接,重连后可以继续推送报文的话。 */
 private String initReConnectReq;
 /** 结束回调 */
 private Consumer<String> endConsumer;

public ReConnectWebSocketClient(
     URI serverUri,
     String key,
     Consumer<String> msgStr,
     Consumer<ByteBuffer> msgByte,
     Consumer<Exception> error) {
   this.msgStr = msgStr;
   this.msgByte = msgByte;
   this.error = error;
   this.key = key;
   this.serverUri = serverUri;
   this.tryReconnect = new AtomicBoolean(false);
   this.needPing = new AtomicBoolean(true);
   this.reConnectTimes = new AtomicInteger(0);
   this.end = new AtomicBoolean(false);
   this.endConsumer = this::close;
   init();
 }

/** 初始化连接 */
 public void init() {
   // 创建连接
   createWebSocketClient();
   // ping线程
   circlePing();
 }

private void needReconnect() throws Exception {
   ThreadUtil.sleep(10, TimeUnit.SECONDS);
   int cul = reConnectTimes.incrementAndGet();
   if (cul > 3) {
     close("real stop");
     throw new Exception("服务端断连,3次重连均失败");
   }
   log.warn("[{}]第[{}]次断开重连", key, cul);
   if (tryReconnect.get()) {
     log.error("[{}]第[{}]次断开重连结果 -> 连接正在重连,本次重连请求放弃", key, cul);
     needReconnect();
     return;
   }
   try {
     tryReconnect.set(true);

if (webSocketClient.isOpen()) {
       log.warn("[{}]第[{}]次断开重连,关闭旧连接", key, cul);
       webSocketClient.closeConnection(2, "reconnect stop");
     }
     webSocketClient = null;
     createWebSocketClient();
     connect();
     if (StrUtil.hasBlank(initReConnectReq)) {
       send(initReConnectReq);
     }
   } catch (Exception exception) {
     log.error("[{}]第[{}]次断开重连结果 -> 连接正在重连,重连异常:[{}]", key, cul, exception.getMessage());
     needReconnect();
   } finally {
     tryReconnect.set(false);
   }
 }

private void createWebSocketClient() {
   webSocketClient =
       new WebSocketClient(serverUri) {
         @Override
         public void onOpen(ServerHandshake serverHandshake) {
           log.info("[{}]ReConnectWebSocketClient [onOpen]连接成功{}", key, getRemoteSocketAddress());
           tryReconnect.set(false);
         }

@Override
         public void onMessage(String text) {
           log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服务端数据:text={}", key, text);
           msgStr.accept(text);
         }

@Override
         public void onMessage(ByteBuffer bytes) {
           log.info("[{}]ReConnectWebSocketClient [onMessage]接收到服务端数据:bytes={}", key, bytes);
           msgByte.accept(bytes);
         }

@Override
         public void onWebsocketPong(WebSocket conn, Framedata f) {
           log.info(
               "[{}]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode={}",
               key,
               f.getOpcode());
         }

@Override
         public void onClose(int i, String s, boolean b) {
           log.info("[{}]ReConnectWebSocketClient [onClose]关闭,s={},b={}", key, s, b);
           if (StrUtil.hasBlank(s) || s.contains("https")) {
             if (end.get()) {
               return;
             }
             try {
               needReconnect();
             } catch (Exception exception) {
               endConsumer.accept("reconnect error");
               error.accept(exception);
             }
           }
         }

@Override
         public void onError(Exception e) {
           log.info("[{}]ReConnectWebSocketClient [onError]异常,e={}", key, e);
           endConsumer.accept("error close");
           error.accept(e);
         }
       };
   if (serverUri.toString().contains("wss://")) {
     trustAllHosts(webSocketClient);
   }
 }

public void circlePing() {
   new Thread(
           () -> {
             while (needPing.get()) {
               if (webSocketClient.isOpen()) {
                 webSocketClient.sendPing();
               }
               ThreadUtil.sleep(5, TimeUnit.SECONDS);
             }
             log.warn("[{}]Ping循环关闭", key);
           })
       .start();
 }

/**
  * 连接
  *
  * @throws Exception 异常
  */
 public void connect() throws Exception {
   webSocketClient.connectBlocking(10, TimeUnit.SECONDS);
 }

/**
  * 发送
  *
  * @param msg 消息
  * @throws Exception 异常
  */
 public void send(String msg) throws Exception {
   this.initReConnectReq = msg;
   if (webSocketClient.isOpen()) {
     webSocketClient.send(msg);
   }
 }

/**
  * 关闭
  *
  * @param msg 关闭消息
  */
 public void close(String msg) {
   needPing.set(false);
   end.set(true);
   if (webSocketClient != null) {
     webSocketClient.closeConnection(3, msg);
   }
 }

/**
  * 忽略证书
  *
  * @param client
  */
 public void trustAllHosts(WebSocketClient client) {
   TrustManager[] trustAllCerts =
       new TrustManager[] {
         new X509ExtendedTrustManager() {

@Override
           public void checkClientTrusted(
               X509Certificate[] x509Certificates, String s, Socket socket)
               throws CertificateException {}

@Override
           public void checkServerTrusted(
               X509Certificate[] x509Certificates, String s, Socket socket)
               throws CertificateException {}

@Override
           public void checkClientTrusted(
               X509Certificate[] x509Certificates, String s, SSLEngine sslEngine)
               throws CertificateException {}

@Override
           public void checkServerTrusted(
               X509Certificate[] x509Certificates, String s, SSLEngine sslEngine)
               throws CertificateException {}

@Override
           public void checkClientTrusted(X509Certificate[] x509Certificates, String s)
               throws CertificateException {}

@Override
           public void checkServerTrusted(X509Certificate[] x509Certificates, String s)
               throws CertificateException {}

@Override
           public X509Certificate[] getAcceptedIssuers() {
             return null;
           }
         }
       };

try {
     SSLContext ssl = SSLContext.getInstance("SSL");
     ssl.init(null, trustAllCerts, new java.security.SecureRandom());
     SSLSocketFactory socketFactory = ssl.getSocketFactory();
     client.setSocketFactory(socketFactory);
   } catch (Exception e) {
     log.error("ReConnectWebSocketClient trustAllHosts 异常,e={0}", e);
   }
 }
}

代码说明:

1、参数的重连次数可以配置。

2、增加异步pingpong线程,一旦结束连接会自动关闭。

3、对字符串、字节流、异常都有回调措施。

测试代码方法


 public static void main(String[] args) throws Exception {
   ReConnectWebSocketClient client =
       new ReConnectWebSocketClient(
           new URI(String.format("wss://192.168.1.77:24009")),
           "test",
           // 字符串消息处理
           msg -> {
             // todo 字符串消息处理
             System.out.println("字符串消息:" + msg);
           },
           null,
           // 异常回调
           error -> {
             // todo 字符串消息处理
             System.out.println("异常:" + error.getMessage());
           });
   client.connect();
   client.send("haha");
 }

验证结果

16:08:54.468 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]连接成功/192.168.1.77:24009
16:08:54.475 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服务端数据:text=connect success from tcp4:192.168.6.63:11018!
字符串消息:connect success from tcp4:192.168.6.63:11018!
16:08:56.080 [WebSocketConnectReadThread-12] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]关闭,s=,b=true
16:09:06.097 [WebSocketConnectReadThread-12] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[1]次断开重连
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onOpen]连接成功/192.168.1.77:24009
16:09:06.150 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onMessage]接收到服务端数据:text=connect success from tcp4:192.168.6.63:11038!
字符串消息:connect success from tcp4:192.168.6.63:11038!
16:09:09.369 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:14.370 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:19.371 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:24.379 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:29.382 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:34.398 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:39.402 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:44.404 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:49.415 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:54.429 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:09:59.437 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:04.449 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:06.154 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:09.455 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:14.462 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:19.468 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onWebsocketPong]接收到服务端数据:opcode=PONG
16:10:19.644 [WebSocketConnectReadThread-16] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]关闭,s=,b=true
16:10:29.654 [WebSocketConnectReadThread-16] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]第[2]次断开重连
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onError]异常,e={}
java.net.ConnectException: Connection refused: connect
 at java.net.DualStackPlainSocketImpl.connect0(Native Method)
 at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:79)
 at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
 at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
 at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
 at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
 at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
 at java.net.Socket.connect(Socket.java:589)
 at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:673)
 at org.java_websocket.client.WebSocketClient.run(WebSocketClient.java:461)
 at java.lang.Thread.run(Thread.java:748)
16:10:31.710 [WebSocketConnectReadThread-19] INFO ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]ReConnectWebSocketClient [onClose]关闭,s=error close,b=false
异常:Connection refused: connect
16:10:34.473 [Thread-0] WARN ai.guiji.csdn.ws.client.ReConnectWebSocketClient - [test]Ping循环关闭

这里我采用的是手动关闭服务端的方式,来触发客户端被动断连的情况。共重连两次,第二次重连时服务端还未启动,因此触发了异常。

来源:https://blog.csdn.net/zhiweihongyan1/article/details/120956762

标签:java,WebSocket,客户端,断线重连
0
投稿

猜你喜欢

  • IDEA无法使用Git Pull的问题

    2023-05-04 10:55:39
  • C#的泛型方法解析

    2022-10-17 10:20:54
  • Java实体类不要使用基本类型的知识点总结

    2023-02-21 10:04:49
  • Javassist之一秒理解java动态编程

    2023-11-09 09:36:33
  • C语言预处理预编译命令及宏定义详解

    2023-06-18 16:28:06
  • JVM中的flag设置详解

    2022-08-11 01:37:20
  • java合并多个文件的实例代码

    2023-07-28 12:56:02
  • 如何利用泛型封装通用的service层

    2023-05-15 04:55:43
  • springboot编程式事务TransactionTemplate的使用说明

    2022-03-01 15:19:37
  • Android实现拍照截图功能

    2023-06-21 04:43:23
  • C# 判断字符串为空的几种办法

    2023-05-21 16:06:44
  • SpringBoot多线程进行异步请求的处理方式

    2021-11-10 10:48:30
  • Java 并发编程学习笔记之Synchronized简介

    2023-09-01 03:24:05
  • c#解析jobject的数据结构

    2023-09-28 00:25:33
  • Android实现自由拖动并显示文字的悬浮框

    2023-06-08 06:30:39
  • 解决idea web项目中out目录更新不同步问题

    2023-01-30 01:32:28
  • Java基础题新手练习(二)

    2022-03-10 00:11:57
  • 使用idea+gradle编译spring5.x.x源码分析

    2022-05-13 15:13:31
  • Eclipse中改变默认的workspace的方法及说明详解

    2022-07-31 12:07:21
  • Android入门之画图详解

    2023-11-09 11:47:47
  • asp之家 软件编程 m.aspxhome.com