Springboot整合mqtt服务的示例代码

作者:李泰山 时间:2022-07-20 02:58:01 

首先在pom文件里引入mqtt的依赖配置

<!--mqtt-->
       <dependency>
           <groupId>org.eclipse.paho</groupId>
           <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
           <version>1.2.4</version>
       </dependency>

其次在springboot 的配置yml文件,配置mqtt的服务配置

spring:  
 mqtt:
   url: tcp://127.0.0.1:1883
   client-id: niubility-tiger
   username:
   password:
   topic: [/unify/test]

创建 MqttProperties配置参数类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties("spring.mqtt")
public class MqttProperties {
   private String url;
   private String clientId;
   private String username;
   private String password;
   private String[] topic;
}

创建 MqttConfiguration 配置类

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.listener.MqttSubscribeListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableConfigurationProperties({MqttProperties.class})
public class MqttConfiguration {
   private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);
   @Autowired
   private MqttProperties mqttProperties;

public MqttConfiguration() {
   }

@Bean
   public MqttConnectOptions mqttConnectOptions() {
       MqttConnectOptions connectOptions = new MqttConnectOptions();
       connectOptions.setServerURIs(new String[]{this.mqttProperties.getUrl()});
       if (Func.isNotBlank(this.mqttProperties.getUrl())) {
           connectOptions.setUserName(this.mqttProperties.getUsername());
       }

if (Func.isNotBlank(this.mqttProperties.getPassword())) {
           connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray());
       }

connectOptions.setKeepAliveInterval(60);
       return connectOptions;
   }

@Bean
   public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException {
       IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId());
       mqttClient.connect(options);
       for(int i = 0; i< this.mqttProperties.getTopic().length; ++i) {
           mqttClient.subscribe(this.mqttProperties.getTopic()[i], new MqttSubscribeListener());
       }
       return mqttClient;
   }
}

创建 订阅事件类

import org.springframework.context.ApplicationEvent;

public class UWBMqttSubscribeEvent extends ApplicationEvent {
   private String topic;

public UWBMqttSubscribeEvent(String topic, Object source) {
       super(source);
       this.topic = topic;
   }

public String getTopic() {
       return this.topic;
   }
}

创建订阅事件 *

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;

public class MqttSubscribeListener implements IMqttMessageListener {

@Override
   public void messageArrived(String s, MqttMessage mqttMessage) {
       String content = new String(mqttMessage.getPayload());
       UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content);
       SpringUtil.publishEvent(event);
   }
}

创建mqtt消息事件异步处理 *

import com.baomidou.mybatisplus.core.toolkit.StringPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.config.MqttProperties;
import org.springblade.ubw.event.UWBMqttSubscribeEvent;
import org.springblade.ubw.service.MqttService;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;

@Configuration
public class MqttEventListener {

private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class);

@Resource
   private MqttProperties mqttProperties;

@Resource
   private MqttService mqttService;

private String processTopic (String topic) {
       List<String> topics = Arrays.asList(mqttProperties.getTopic());
       for (String wild : topics) {
           wild = wild.replace(StringPool.HASH, StringPool.EMPTY);
           if (topic.startsWith(wild)) {
               return topic.replace(wild, StringPool.EMPTY);
           }
       }
       return StringPool.EMPTY;
   }

@Async
   @EventListener(UWBMqttSubscribeEvent.class)
   public void listen (UWBMqttSubscribeEvent event) {
       String topic = processTopic(event.getTopic());
       Object source = event.getSource();
       if (Func.isEmpty(source)) {
           return;
       }
       mqttService.issue(topic,source);
//        log.info("mqtt接收到 通道 {} 的信息为:{}",topic,source);
   }
}

创建MqttService 数据处理服务类

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springblade.core.tool.utils.Func;
import org.springblade.ubw.area.entity.WorkArea;
import org.springblade.ubw.area.entity.WorkSite;
import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo;
import org.springblade.ubw.area.entity.WorkSitePassInfo;
import org.springblade.ubw.area.service.WorkAreaService;
import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService;
import org.springblade.ubw.area.service.WorkSitePassInfoService;
import org.springblade.ubw.area.service.WorkSiteService;
import org.springblade.ubw.constant.UbwConstant;
import org.springblade.ubw.history.entity.HistoryLocusInfo;
import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo;
import org.springblade.ubw.history.service.HistoryLocusInfoService;
import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService;
import org.springblade.ubw.loc.entity.LocStatusInfo;
import org.springblade.ubw.loc.entity.LocStatusInfoHistory;
import org.springblade.ubw.loc.service.LocStatusInfoHistoryService;
import org.springblade.ubw.loc.service.LocStatusInfoService;
import org.springblade.ubw.msg.entity.*;
import org.springblade.ubw.msg.service.*;
import org.springblade.ubw.system.entity.*;
import org.springblade.ubw.system.service.*;
import org.springblade.ubw.system.wrapper.MqttWrapper;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;

@Service
public class MqttService {

private static final Logger log = LoggerFactory.getLogger(MqttService.class);

@Resource
   private EmployeeAndDepartmentService employeeAndDepartmentService;

@Resource
   private VehicleInfoService vehicleInfoService;

@Resource
   private WorkSiteService workSiteService;

@Resource
   private LocStatusInfoService locStatusInfoService;

@Resource
   private LocStatusInfoHistoryService locStatusInfoHistoryService;

@Resource
   private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService;

@Resource
   private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService;

@Resource
   private LocSosAlarminfoService locSosAlarminfoService;

@Resource
   private AttendanceInfoService attendanceInfoService;

@Resource
   private HistoryLocusInfoService historyLocusInfoService;

@Resource
   private WorkSitePassInfoService workSitePassInfoService;

@Resource
   private EnvironmentalMonitorInfoService environmentalMonitorInfoService;

@Resource
   private TrAlertService trAlertService;

@Resource
   private AddEvacuateInfoService addEvacuateInfoService;

@Resource
   private CancelEvacuateInfoService cancelEvacuateInfoService;

@Resource
   private WorkSiteNeighbourInfoService workSiteNeighbourInfoService;

@Resource
   private LinkMsgAlarmInfoService linkMsgAlarmInfoService;

@Resource
   private LeaderEmployeeInfoService leaderEmployeeInfoService;

@Resource
   private ElectricMsgInfoService electricMsgInfoService;

@Resource
   private WorkAreaService workAreaService;

@Resource
   private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService;

@Resource
   private SpecialWorksService specialWorksService;

@Resource
   private AttendanceLocusInfoService attendanceLocusInfoService;

@Resource
   private WorkTypeService workTypeService;

@Resource
   private OfficePositionService officePositionService;

@Resource
   private ClassTeamService classTeamService;

/**
    * 方法描述: 消息分发
    *
    * @param topic
    * @param source
    * @author liwenbin
    * @date 2021年12月14日 14:14:09
    */
   public void issue(String topic,Object source){
       switch(topic){
           case UbwConstant.TOPIC_EMP :
               //人员和部门信息
               employeeAndDepartmentService.saveBatch(source);
               break;
           case UbwConstant.TOPIC_VEHICLE :
               //车辆信息
               List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source,new VehicleInfo());
               vehicleInfoService.deleteAll();
               vehicleInfoService.saveBatch(vehicleInfos);
               break;
           case UbwConstant.TOPIC_WORK_SITE :
               //基站信息
               List<WorkSite> workSites = MqttWrapper.build().toEntityList(source,new WorkSite());
               workSiteService.deleteAll();
               workSiteService.saveBatch(workSites);
               break;
           case UbwConstant.TOPIC_LOC_STATUS:
               //井下车辆人员实时
               List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source,new LocStatusInfo());
               if (Func.isEmpty(locStatusInfos)){
                   break;
               }
               locStatusInfoService.deleteAll();
               //筛选入井人员列表
               List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1).collect(Collectors.toList());
               locStatusInfoService.saveBatch(inWellList);
               //人员历史数据入库
               List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source,new LocStatusInfoHistory());
               locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys);
               break;
           case UbwConstant.TOPIC_LOC_OVER_TIME:
               //超时报警信息
               List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocOverTimeSosAlarminfo());
               locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos);
               break;
           case UbwConstant.TOPIC_LOC_OVER_AREA:
               //超员报警信息
               List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocAreaOverSosAlarminfo());
               locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos);
               break;
           case UbwConstant.TOPIC_LOC_SOS:
               //求救报警信息
               List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocSosAlarminfo());
               locSosAlarminfoService.saveBatch(locSosAlarmInfos);
               break;
           case UbwConstant.TOPIC_ATTEND:
               //考勤信息
               List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source,new AttendanceInfo());
               attendanceInfoService.saveBatch(attendanceInfos);
               break;
           case UbwConstant.TOPIC_HISTORY_LOCUS:
               //精确轨迹信息
               List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source,new HistoryLocusInfo());
               historyLocusInfoService.saveBatch(historyLocusInfos);
               break;
           case UbwConstant.TOPIC_WORK_SITE_PASS:
               //基站经过信息
               List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source,new WorkSitePassInfo());
               workSitePassInfoService.saveBatch(workSitePassInfos);
               break;
           case UbwConstant.TOPIC_ENV_MON:
               //环境监测信息
               List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source,new EnvironmentalMonitorInfo());
               environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos);
               break;
           case UbwConstant.TOPIC_TR_ALERT:
               //环境监测报警信息
               List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source,new TrAlert());
               trAlertService.saveBatch(trAlerts);
               break;
           case UbwConstant.TOPIC_ADD_EVA:
               //下发撤离信息
               List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source,new AddEvacuateInfo());
               addEvacuateInfoService.saveBatch(addEvacuateInfos);
               break;
           case UbwConstant.TOPIC_CANCEL_EVA:
               //取消撤离信息
               List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source,new CancelEvacuateInfo());
               cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos);
               break;
           case UbwConstant.TOPIC_WORK_SITE_NEI:
               //相邻基站关系信息
               workSiteNeighbourInfoService.deleteAll();
               List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source,new WorkSiteNeighbourInfo());
               workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos);
               break;
           case UbwConstant.TOPIC_LINK_MSG:
               //基站链路信息
               linkMsgAlarmInfoService.deleteAll();
               List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source,new LinkMsgAlarmInfo());
               linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos);
               break;
           case UbwConstant.TOPIC_LEADER_EMP:
               //带班领导信息
               leaderEmployeeInfoService.deleteAll();
               List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source,new LeaderEmployeeInfo());
               leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos);
               break;
           case UbwConstant.TOPIC_ELE_MSG:
               //低电报警信息
               List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source,new ElectricMsgInfo());
               electricMsgInfoService.saveBatch(electricMsgInfos);
               break;
           case UbwConstant.TOPIC_WORK_AREA:
               //区域信息
               workAreaService.deleteAll();
               List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source,new WorkArea());
               workAreaService.saveBatch(workAreas);
               break;
           case UbwConstant.TOPIC_HIS_OVER_TIME_SOS:
               //历史超时报警信息
               List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new HistoryOverTimeSosAlarmInfo());
               historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos);
               break;
           case UbwConstant.TOPIC_SPECIAL_WORK:
               //特种人员预设线路信息
               specialWorksService.deleteAll();
               List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source,new SpecialWorks());
               specialWorksService.saveBatch(specialWorks);
               break;
           case UbwConstant.TOPIC_ATTEND_LOC:
               //历史考勤轨迹信息
               List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source,new AttendanceLocusInfo());
               attendanceLocusInfoService.saveBatch(attendanceLocusInfos);
               break;
           case UbwConstant.TOPIC_WORK_TYPE:
               //工种信息
               workTypeService.deleteAll();
               List<WorkType> workTypes = MqttWrapper.build().toEntityList(source,new WorkType());
               workTypeService.saveBatch(workTypes);
               break;
           case UbwConstant.TOPIC_OFFICE_POS:
               //职务信息
               officePositionService.deleteAll();
               List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source,new OfficePosition());
               officePositionService.saveBatch(officePositions);
               break;
           case UbwConstant.TOPIC_CLASS_TEAM:
               //班组信息
               classTeamService.deleteAll();
               List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source,new ClassTeam());
               classTeamService.saveBatch(classTeams);
               break;
           default : //可选
               break;
       }
   }
}

完结,小伙伴们,可以根据这个demo 改造自己的mqtt服务处理!!!

来源:https://blog.csdn.net/weixin_40986713/article/details/123572101

标签:Springboot,mqtt
0
投稿

猜你喜欢

  • Java下载文件时文件名乱码问题解决办法

    2023-08-23 17:37:03
  • Java进阶:Struts多模块的技巧

    2023-06-18 09:40:47
  • 详解Java注解的实现与使用方法

    2023-10-31 12:33:20
  • Java加载property文件配置过程解析

    2023-10-07 07:53:03
  • Springboot 内部服务调用方式

    2023-08-24 00:32:20
  • JAVA递归生成树形菜单的实现过程

    2023-07-15 08:57:22
  • Flutter 队列任务的实现

    2023-07-07 17:25:14
  • 值得Java开发者关注的7款新工具

    2023-11-02 23:05:31
  • SpringCloud可视化链路追踪系统Zipkin部署过程

    2023-11-27 04:34:06
  • C++实现的O(n)复杂度内查找第K大数算法示例

    2023-06-30 15:51:13
  • java实现学生信息管理系统

    2023-11-11 08:22:35
  • 用java WebSocket做一个聊天室

    2021-11-30 00:39:55
  • android 获取视频,图片缩略图的具体实现

    2023-07-28 00:20:39
  • netty pipeline中的inbound和outbound事件传播分析

    2023-08-27 06:57:00
  • JPA中EntityListeners注解的使用详解

    2023-08-04 21:39:18
  • Java实现拖拽列表项的排序功能

    2023-11-28 23:39:00
  • IDEA解决springboot热部署失效问题(推荐)

    2023-08-12 10:40:49
  • Java内存模型之happens-before概念详解

    2023-11-23 03:11:50
  • SpringBoot路径映射实现过程图解

    2023-11-13 04:01:11
  • MybatisPlus如何自定义TypeHandler映射JSON类型为List

    2023-08-08 14:05:38
  • asp之家 软件编程 m.aspxhome.com