springboot集成mq,spring boot mqtt

  springboot集成mq,spring boot mqtt

  首先在pom文件里引入mqtt的依赖配置

  <!--mqtt-->
  <dependency>
      <groupId>org.eclipse.paho</groupId>
      <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
      <version>1.2.4</version>
  </dependency>

  其次在springboot的配置yml文件,配置mqtt的服务配置

  spring:
    mqtt:
      url: tcp://127.0.0.1:1883
      client-id: nibility-tiger
      username:
      password:
      topic: [/unify/test]

  创建MqttProperties配置参数类

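  import lombok.Data;
  import org.springframework.boot.context.properties.ConfigurationProperties;

  @Data
  @ConfigurationProperties("spring.mqtt")
  public class MqttProperties {
      private String url;
      private String clientId;
      private String username;
      private String password;
      private String[] topic;
  }

  创建MqttConfiguration配置类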

  导入org。月食。泛美卫生组织。客户。mqtt v3。imqttclient导入org。月食。泛美卫生组织。客户。mqttv 3。mqttclient导入org。月食。泛美卫生组织。客户。mqtt v3。mqttconnectoptions导入org。月食。泛美卫生组织。客户。mqtt v3。MQT例外;导入org。slf4j。记录者;导入org。SLF 4j。伐木工厂;导入org。弹簧刀。核心。工具。utils。func导入org。弹簧刀。UBW。听众。mqttsubscribelistener导入org。spring框架。豆子。工厂。注释。自动连线;导入组织。spring框架。靴子。语境。属性。enableconfigurationproperties导入org。spring框架。语境。注释。豆;导入org。spring框架。语境。注释。配置;@ Configuration @ EnableConfigurationProperties({ mqttproperties。class })公共类MqttConfiguration {私有静态最终记录器日志=记录器工厂。获取记录器(MqttConfiguration。类);@ auto wired private MqttProperties MqttProperties;public MqttConfiguration(){ } @ Bean public MqttConnectOptions MqttConnectOptions(){ MqttConnectOptions connect options=new MqttConnectOptions();连接选项。setserveris(新字符串[]{ this。mqttproperties。geturl()});if(func。isnotblank(这个。mqttproperties。geturl()){连接选项。设置用户名(这。mqttproperties。getusername())。} if(func。isnotblank(这个。mqttproperties。获取密码

  ())) { connectOptions.setPassword(this.mqttProperties.getPassword().toCharArray()); } connectOptions.setKeepAliveInterval(60); return connectOptions; } @Bean public IMqttClient mqttClient(MqttConnectOptions options) throws MqttException { IMqttClient mqttClient = new MqttClient(this.mqttProperties.getUrl(), this.mqttProperties.getClientId()); mqttClient.connect(options); for(int i = 0; i< this.mqttProperties.getTopic().length; ++i) { mqttClient.subscribe(this.mqttProperties.getTopic()[i], new MqttSubscribeListener()); } return mqttClient; }}创建 订阅事件类

  

import org.springframework.context.ApplicationEvent; public class UWBMqttSubscribeEvent extends ApplicationEvent { private String topic; public UWBMqttSubscribeEvent(String topic, Object source) { super(source); this.topic = topic; } public String getTopic() { return this.topic; }}

创建订阅事件监听器

 

  

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.springblade.core.tool.utils.SpringUtil;import org.springblade.ubw.event.UWBMqttSubscribeEvent; public class MqttSubscribeListener implements IMqttMessageListener { @Override public void messageArrived(String s, MqttMessage mqttMessage) { String content = new String(mqttMessage.getPayload()); UWBMqttSubscribeEvent event = new UWBMqttSubscribeEvent(s, content); SpringUtil.publishEvent(event); }}

创建mqtt消息事件异步处理监听器

 

  

import com.baomidou.mybatisplus.core.toolkit.StringPool;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springblade.core.tool.utils.Func;import org.springblade.ubw.config.MqttProperties;import org.springblade.ubw.event.UWBMqttSubscribeEvent;import org.springblade.ubw.service.MqttService;import org.springframework.context.annotation.Configuration;import org.springframework.context.event.EventListener;import org.springframework.scheduling.annotation.Async; import javax.annotation.Resource;import java.util.Arrays;import java.util.List; @Configurationpublic class MqttEventListener { private static final Logger log = LoggerFactory.getLogger(MqttEventListener.class); @Resource private MqttProperties mqttProperties; @Resource private MqttService mqttService; private String processTopic (String topic) { List<String> topics = Arrays.asList(mqttProperties.getTopic()); for (String wild : topics) { wild = wild.replace(StringPool.HASH, StringPool.EMPTY); if (topic.startsWith(wild)) { return topic.replace(wild, StringPool.EMPTY); } } return StringPool.EMPTY; } @Async @EventListener(UWBMqttSubscribeEvent.class) public void listen (UWBMqttSubscribeEvent event) { String topic = processTopic(event.getTopic()); Object source = event.getSource(); if (Func.isEmpty(source)) { return; } mqttService.issue(topic,source);// log.info("mqtt接收到 通道 {} 的信息为:{}",topic,source); }}

创建MqttService 数据处理服务类

 

  

import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springblade.core.tool.utils.Func;import org.springblade.ubw.area.entity.WorkArea;import org.springblade.ubw.area.entity.WorkSite;import org.springblade.ubw.area.entity.WorkSiteNeighbourInfo;import org.springblade.ubw.area.entity.WorkSitePassInfo;import org.springblade.ubw.area.service.WorkAreaService;import org.springblade.ubw.area.service.WorkSiteNeighbourInfoService;import org.springblade.ubw.area.service.WorkSitePassInfoService;import org.springblade.ubw.area.service.WorkSiteService;import org.springblade.ubw.constant.UbwConstant;import org.springblade.ubw.history.entity.HistoryLocusInfo;import org.springblade.ubw.history.entity.HistoryOverTimeSosAlarmInfo;import org.springblade.ubw.history.service.HistoryLocusInfoService;import org.springblade.ubw.history.service.HistoryOverTimeSosAlarmInfoService;import org.springblade.ubw.loc.entity.LocStatusInfo;import org.springblade.ubw.loc.entity.LocStatusInfoHistory;import org.springblade.ubw.loc.service.LocStatusInfoHistoryService;import org.springblade.ubw.loc.service.LocStatusInfoService;import org.springblade.ubw.msg.entity.*;import org.springblade.ubw.msg.service.*;import org.springblade.ubw.system.entity.*;import org.springblade.ubw.system.service.*;import org.springblade.ubw.system.wrapper.MqttWrapper;import org.springframework.stereotype.Service; import javax.annotation.Resource;import java.util.List;import java.util.stream.Collectors; @Servicepublic class MqttService { private static final Logger log = LoggerFactory.getLogger(MqttService.class); @Resource private EmployeeAndDepartmentService employeeAndDepartmentService; @Resource private VehicleInfoService vehicleInfoService; @Resource private WorkSiteService workSiteService; @Resource private LocStatusInfoService locStatusInfoService; @Resource private LocStatusInfoHistoryService locStatusInfoHistoryService; @Resource private LocOverTimeSosAlarminfoService locOverTimeSosAlarminfoService; 
@Resource private LocAreaOverSosAlarminfoService locAreaOverSosAlarmInfoService; @Resource private LocSosAlarminfoService locSosAlarminfoService; @Resource private AttendanceInfoService attendanceInfoService; @Resource private HistoryLocusInfoService historyLocusInfoService; @Resource private WorkSitePassInfoService workSitePassInfoService; @Resource private EnvironmentalMonitorInfoService environmentalMonitorInfoService; @Resource private TrAlertService trAlertService; @Resource private AddEvacuateInfoService addEvacuateInfoService; @Resource private CancelEvacuateInfoService cancelEvacuateInfoService; @Resource private WorkSiteNeighbourInfoService workSiteNeighbourInfoService; @Resource private LinkMsgAlarmInfoService linkMsgAlarmInfoService; @Resource private LeaderEmployeeInfoService leaderEmployeeInfoService; @Resource private ElectricMsgInfoService electricMsgInfoService; @Resource private WorkAreaService workAreaService; @Resource private HistoryOverTimeSosAlarmInfoService historyOverTimeSosAlarmInfoService; @Resource private SpecialWorksService specialWorksService; @Resource private AttendanceLocusInfoService attendanceLocusInfoService; @Resource private WorkTypeService workTypeService; @Resource private OfficePositionService officePositionService; @Resource private ClassTeamService classTeamService; /** * 方法描述: 消息分发 * * @param topic * @param source * @author liwenbin * @date 2021年12月14日 14:14:09 */ public void issue(String topic,Object source){ switch(topic){ case UbwConstant.TOPIC_EMP : //人员和部门信息 employeeAndDepartmentService.saveBatch(source); break; case UbwConstant.TOPIC_VEHICLE : //车辆信息 List<VehicleInfo> vehicleInfos = MqttWrapper.build().toEntityList(source,new VehicleInfo()); vehicleInfoService.deleteAll(); vehicleInfoService.saveBatch(vehicleInfos); break; case UbwConstant.TOPIC_WORK_SITE : //基站信息 List<WorkSite> workSites = MqttWrapper.build().toEntityList(source,new WorkSite()); workSiteService.deleteAll(); workSiteService.saveBatch(workSites); 
break; case UbwConstant.TOPIC_LOC_STATUS: //井下车辆人员实时 List<LocStatusInfo> locStatusInfos = MqttWrapper.build().toEntityList(source,new LocStatusInfo()); if (Func.isEmpty(locStatusInfos)){ break; } locStatusInfoService.deleteAll(); //筛选入井人员列表 List<LocStatusInfo> inWellList = locStatusInfos.stream().filter(s -> s.getIsInWell() == 1).collect(Collectors.toList()); locStatusInfoService.saveBatch(inWellList); //人员历史数据入库 List<LocStatusInfoHistory> locStatusInfoHistorys = MqttWrapper.build().toEntityList(source,new LocStatusInfoHistory()); locStatusInfoHistoryService.saveBatch(locStatusInfoHistorys); break; case UbwConstant.TOPIC_LOC_OVER_TIME: //超时报警信息 List<LocOverTimeSosAlarminfo> locOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocOverTimeSosAlarminfo()); locOverTimeSosAlarminfoService.saveBatch(locOverTimeSosAlarmInfos); break; case UbwConstant.TOPIC_LOC_OVER_AREA: //超员报警信息 List<LocAreaOverSosAlarminfo> locAreaOverSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocAreaOverSosAlarminfo()); locAreaOverSosAlarmInfoService.saveBatch(locAreaOverSosAlarmInfos); break; case UbwConstant.TOPIC_LOC_SOS: //求救报警信息 List<LocSosAlarminfo> locSosAlarmInfos = MqttWrapper.build().toEntityList(source,new LocSosAlarminfo()); locSosAlarminfoService.saveBatch(locSosAlarmInfos); break; case UbwConstant.TOPIC_ATTEND: //考勤信息 List<AttendanceInfo> attendanceInfos = MqttWrapper.build().toEntityList(source,new AttendanceInfo()); attendanceInfoService.saveBatch(attendanceInfos); break; case UbwConstant.TOPIC_HISTORY_LOCUS: //精确轨迹信息 List<HistoryLocusInfo> historyLocusInfos = MqttWrapper.build().toEntityList(source,new HistoryLocusInfo()); historyLocusInfoService.saveBatch(historyLocusInfos); break; case UbwConstant.TOPIC_WORK_SITE_PASS: //基站经过信息 List<WorkSitePassInfo> workSitePassInfos = MqttWrapper.build().toEntityList(source,new WorkSitePassInfo()); workSitePassInfoService.saveBatch(workSitePassInfos); break; case UbwConstant.TOPIC_ENV_MON: //环境监测信息 
List<EnvironmentalMonitorInfo> environmentalMonitorInfos = MqttWrapper.build().toEntityList(source,new EnvironmentalMonitorInfo()); environmentalMonitorInfoService.saveBatch(environmentalMonitorInfos); break; case UbwConstant.TOPIC_TR_ALERT: //环境监测报警信息 List<TrAlert> trAlerts = MqttWrapper.build().toEntityList(source,new TrAlert()); trAlertService.saveBatch(trAlerts); break; case UbwConstant.TOPIC_ADD_EVA: //下发撤离信息 List<AddEvacuateInfo> addEvacuateInfos = MqttWrapper.build().toEntityList(source,new AddEvacuateInfo()); addEvacuateInfoService.saveBatch(addEvacuateInfos); break; case UbwConstant.TOPIC_CANCEL_EVA: //取消撤离信息 List<CancelEvacuateInfo> cancelEvacuateInfos = MqttWrapper.build().toEntityList(source,new CancelEvacuateInfo()); cancelEvacuateInfoService.saveBatch(cancelEvacuateInfos); break; case UbwConstant.TOPIC_WORK_SITE_NEI: //相邻基站关系信息 workSiteNeighbourInfoService.deleteAll(); List<WorkSiteNeighbourInfo> workSiteNeighbourInfos = MqttWrapper.build().toEntityList(source,new WorkSiteNeighbourInfo()); workSiteNeighbourInfoService.saveBatch(workSiteNeighbourInfos); break; case UbwConstant.TOPIC_LINK_MSG: //基站链路信息 linkMsgAlarmInfoService.deleteAll(); List<LinkMsgAlarmInfo> linkMsgAlarmInfos = MqttWrapper.build().toEntityList(source,new LinkMsgAlarmInfo()); linkMsgAlarmInfoService.saveBatch(linkMsgAlarmInfos); break; case UbwConstant.TOPIC_LEADER_EMP: //带班领导信息 leaderEmployeeInfoService.deleteAll(); List<LeaderEmployeeInfo> leaderEmployeeInfos = MqttWrapper.build().toEntityList(source,new LeaderEmployeeInfo()); leaderEmployeeInfoService.saveBatch(leaderEmployeeInfos); break; case UbwConstant.TOPIC_ELE_MSG: //低电报警信息 List<ElectricMsgInfo> electricMsgInfos = MqttWrapper.build().toEntityList(source,new ElectricMsgInfo()); electricMsgInfoService.saveBatch(electricMsgInfos); break; case UbwConstant.TOPIC_WORK_AREA: //区域信息 workAreaService.deleteAll(); List<WorkArea> workAreas = MqttWrapper.build().toEntityList(source,new WorkArea()); workAreaService.saveBatch(workAreas); 
break; case UbwConstant.TOPIC_HIS_OVER_TIME_SOS: //历史超时报警信息 List<HistoryOverTimeSosAlarmInfo> historyOverTimeSosAlarmInfos = MqttWrapper.build().toEntityList(source,new HistoryOverTimeSosAlarmInfo()); historyOverTimeSosAlarmInfoService.saveBatch(historyOverTimeSosAlarmInfos); break; case UbwConstant.TOPIC_SPECIAL_WORK: //特种人员预设线路信息 specialWorksService.deleteAll(); List<SpecialWorks> specialWorks = MqttWrapper.build().toEntityList(source,new SpecialWorks()); specialWorksService.saveBatch(specialWorks); break; case UbwConstant.TOPIC_ATTEND_LOC: //历史考勤轨迹信息 List<AttendanceLocusInfo> attendanceLocusInfos = MqttWrapper.build().toEntityList(source,new AttendanceLocusInfo()); attendanceLocusInfoService.saveBatch(attendanceLocusInfos); break; case UbwConstant.TOPIC_WORK_TYPE: //工种信息 workTypeService.deleteAll(); List<WorkType> workTypes = MqttWrapper.build().toEntityList(source,new WorkType()); workTypeService.saveBatch(workTypes); break; case UbwConstant.TOPIC_OFFICE_POS: //职务信息 officePositionService.deleteAll(); List<OfficePosition> officePositions = MqttWrapper.build().toEntityList(source,new OfficePosition()); officePositionService.saveBatch(officePositions); break; case UbwConstant.TOPIC_CLASS_TEAM: //班组信息 classTeamService.deleteAll(); List<ClassTeam> classTeams = MqttWrapper.build().toEntityList(source,new ClassTeam()); classTeamService.saveBatch(classTeams); break; default : //可选 break; } }}

完结,小伙伴们,可以根据这个demo 改造自己的mqtt服务处理!!!

 

  以上就是Springboot整合mqtt服务的示例代码的详细内容,更多关于Springboot整合mqtt的资料请关注盛行IT其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: