最近项目用到了mqtt,所以记录下SpringBoot集成MQTT的步骤和注意事项,整理一下知识,方便自己和他人。

一、pom文件里引入maven依赖jar包

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.3.2.RELEASE</version>
</dependency>

二、在application.yml配置文件里加入mqtt配置信息

snake:
  server:
    mqttAddr: 162.11.22.33:1883
    mqttUser: admin
    mqttPwd: 111111

三、新建配置映射类SnakeServerProperties

@Data
@Component
@ConfigurationProperties(prefix = "snake.server")
public class SnakeServerProperties {

    private String mqttPwd;
    private String mqttAddr;
    private String mqttUser;
}

四、新建mqtt连接的工厂类MqttFactory

里面会有一个getInstance方法,用来构造MqttClient。

这边需要注意的有两点:

1.这边要考虑mqtt断开自动重连的情况。
2.如果要分布式部署,clientId不能设置成一样的,不然会导致多个实例相同mqttClient抢占连接。这会导致明明获取到mqttClient,但发送消息却报异常。

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author: 夏威夷8080
 * @Date: 2022/8/12 15:38
 */
@Slf4j
@Component
public class MqttFactory {

    private ConcurrentHashMap<String, MqttClient> clientMap = new ConcurrentHashMap<>();

    @Autowired
    private SnakeServerProperties snakeServerProperties;


    /**
     * 初始化客户端
     */
    public MqttClient getInstance(String clientId) {
        MqttClient client = null;
        String key = clientId;
        if (clientMap.get(key) == null) {
            try {
                client = new MqttClient("tcp://" + snakeServerProperties.getMqttAddr(), clientId);
                // MQTT配置对象
                MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
                // 设置自动重连, 其它具体参数可以查看MqttConnectOptions
                mqttConnectOptions.setAutomaticReconnect(true);
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
                // mqttConnectOptions.setCleanSession(true);
                // 设置超时时间 单位为秒
                mqttConnectOptions.setConnectionTimeout(10);
                mqttConnectOptions.setUserName(snakeServerProperties.getMqttUser());
                mqttConnectOptions.setPassword(snakeServerProperties.getMqttPwd().toCharArray());
                // mqttConnectOptions.setServerURIs(new String[]{url});
                // 设置会话心跳时间 单位为秒
                mqttConnectOptions.setKeepAliveInterval(10);
                // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
                // mqttConnectOptions.setWill("willTopic", "offline".getBytes(), 2, false);

                if (!client.isConnected()) {
                    client.connect(mqttConnectOptions);
                }
                log.info("MQTT创建client成功={}", JSONObject.toJSONString(client));
                clientMap.put(key, client);
            } catch (MqttException e) {
                log.error("MQTT连接消息服务器[{}]失败", key + "-" + snakeServerProperties.getMqttAddr());
            }
        } else {
            client = clientMap.get(key);
            log.info("MQTT从map里获取到client={}", JSONObject.toJSONString(client));
            if (!client.isConnected()) {
                // 如果缓存里的client已经断开,则清除该缓存,再重新创建客户端连接
                clientMap.remove(key);
                this.getInstance(clientId);
            }
        }
        return client;
    }

}

五、新建MqttTemplate模板类

里面定义了发送消息和订阅消息两个方法。

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * @Author: 夏威夷8080
 * @Date: 2022/8/12 15:47
 */
@Slf4j
@Component
public class MqttTemplate {

    @Autowired
    private MqttFactory mqttFactory;

    /**
     * 发送消息
     *
     * @param topic 主题
     * @param data  消息内容
     */
    public void send(String clientId, String topic, Object data) {
        // 获取客户端实例
        MqttClient client = mqttFactory.getInstance(clientId);
        try {
            // 转换消息为json字符串
            String json = JSONObject.toJSONString(data);
            log.info("MQTT主题[{}]发送消息...\r\n{}", topic, json);
            client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));
        } catch (MqttException e) {
            log.error("MQTT主题[{}]发送消息失败,{}", topic, Throwables.getStackTraceAsString(e));
        }
    }

    /**
     * 订阅主题
     *
     * @param topic    主题
     * @param listener 消息监听处理器
     */
    public void subscribe(String clientId, String topic, IMqttMessageListener listener) {
        MqttClient client = mqttFactory.getInstance(clientId);
        try {
            log.info("MQTT订阅主题[{}]...", topic);
            client.subscribe(topic, listener);
        } catch (MqttException e) {
            log.error("MQTT订阅主题[{}]失败,{}", topic, Throwables.getStackTraceAsString(e));
        }
    }

}

六、新建业务操作MQTT处理类MyHandler

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @Author: 夏威夷8080
 * @Date: 2022/8/12 15:59
 */
@Component
public class MyHandler {

    @Autowired
    private MqttTemplate mqttTemplate;

    public void listener(String clientId, String topic) {
        mqttTemplate.subscribe(clientId, topic, new SnakeMqttMessageListener());
    }

    public void send(String clientId, String topic, CmdRequest cmdRequest) {
        mqttTemplate.send(clientId, topic, cmdRequest);
    }

}

七、新建自己的mqtt消息监听处理类,需要实现IMqttMessageListener接口

这边无法直接注入bean,需要从applicationContext里拿

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
 * @Author: 夏威夷8080
 * @Date: 2022/8/12 15:50
 */
@Slf4j
public class SnakeMqttMessageListener implements IMqttMessageListener {

    /**
     * 处理消息
     *
     * @param topic       主题
     * @param mqttMessage 消息
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        log.info("MQTT实时订阅主题[{}]发来消息[{}]", topic, new String(mqttMessage.getPayload()));
        ApplicationContext applicationContext = SpringContextHolder.getApplicationContext();
        xxxxxxx xxx = applicationContext.getBean(xxxx.class);
    }

}

八、顺便把工具类也贴出来SpringContextHolder

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;

/**
 * @author 夏威夷8080
 * @date 2018/6/27 Spring 工具类
 */
@Slf4j
@Service
@Lazy(false)
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {

    private static ApplicationContext applicationContext = null;

    /**
     * 取得存储在静态变量中的ApplicationContext.
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 实现ApplicationContextAware接口, 注入Context到静态变量中.
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) {
        SpringContextHolder.applicationContext = applicationContext;
    }

    /**
     * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) {
        return (T) applicationContext.getBean(name);
    }

    /**
     * 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
     */
    public static <T> T getBean(Class<T> requiredType) {
        return applicationContext.getBean(requiredType);
    }

    /**
     * 清除SpringContextHolder中的ApplicationContext为Null.
     */
    public static void clearHolder() {
        if (log.isDebugEnabled()) {
            log.debug("清除SpringContextHolder中的ApplicationContext:" + applicationContext);
        }
        applicationContext = null;
    }

    /**
     * 发布事件
     * @param event
     */
    public static void publishEvent(ApplicationEvent event) {
        if (applicationContext == null) {
            return;
        }
        applicationContext.publishEvent(event);
    }

    /**
     * 实现DisposableBean接口, 在Context关闭时清理静态变量.
     */
    @Override
    public void destroy() {
        SpringContextHolder.clearHolder();
    }

}

OK,SpringBoot集成MQTT的步骤和注意事项到这边就介绍完了,希望对大家有帮助,有问题可以在下面留言。

原文地址:http://www.cnblogs.com/shamo89/p/16813785.html

1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长! 2. 分享目的仅供大家学习和交流,请务用于商业用途! 3. 如果你也有好源码或者教程,可以到用户中心发布,分享有积分奖励和额外收入! 4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解! 5. 如有链接无法下载、失效或广告,请联系管理员处理! 6. 本站资源售价只是赞助,收取费用仅维持本站的日常运营所需! 7. 如遇到加密压缩包,默认解压密码为"gltf",如遇到无法解压的请联系管理员! 8. 因为资源和程序源码均为可复制品,所以不支持任何理由的退款兑现,请斟酌后支付下载 声明:如果标题没有注明"已测试"或者"测试可用"等字样的资源源码均未经过站长测试.特别注意没有标注的源码不保证任何可用性