用户登录
用户注册

分享至

SpringBoot整合MQTT

  • 作者: 你宾哥
  • 来源: 51数据库
  • 2021-10-19

使用java代码通过MQTT完成发布订阅操作

pom 导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + MQTT demo: inherits the Spring Boot parent and
     pulls in the web, integration and MQTT artifacts used by the sample classes. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hzx</groupId>
    <artifactId>mqtt_test_02</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Test support only; keep it off the runtime classpath. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <!-- NOTE(review): the explicit versions below override whatever the
             2.1.9.RELEASE parent would manage for these artifacts; confirm they are
             compatible with the Spring Integration / Spring Framework versions that
             the parent selects for the starters above. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <version>5.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.2.8.RELEASE</version>
        </dependency>

        <!-- Declared once: the original listed this identical dependency twice. -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.12</version>
        </dependency>
    </dependencies>

</project>

yml文件中配置

# MQTT connection settings; MqttConfig binds them via @ConfigurationProperties("spring.mqtt").
spring:
  mqtt:
    # Broker credentials; MqttPushClient.connect sets them on MqttConnectOptions.
    username: admin
    password: public
    # Broker URI; passed as the first argument of the MqttClient constructor.
    host-url: tcp://127.0.0.1:1883
    # Client identifier; passed as the second argument of the MqttClient constructor.
    client-id: test
    # Bound to MqttConfig.defaultTopic; the classes shown here do not read it.
    default-topic: test
    # Passed to MqttConnectOptions.setConnectionTimeout (presumably seconds - confirm against the Paho docs).
    timeout: 100
    # Passed to MqttConnectOptions.setKeepAliveInterval (presumably seconds - confirm against the Paho docs).
    keepalive: 100

获取配置

package com.mqtt.common;

import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * Holds the {@code spring.mqtt.*} settings and wires up the MQTT client.
 *
 * <p>The Lombok {@code @Setter}/{@code @Getter} annotations supply the accessors for
 * the fields below. {@link #getMqttPushClient()} uses the bound values to connect the
 * injected {@link MqttPushClient} and subscribe it to {@code "test/#"}.
 *
 * <p>{@code PushCallback.connectionLost} calls {@link #getMqttPushClient()} directly to
 * reconnect. NOTE(review): because this class is a {@code @Component} rather than a
 * {@code @Configuration}, such a direct call presumably re-runs the method body instead
 * of returning a cached bean - confirm before changing the stereotype.
 */
@Component
@ConfigurationProperties("spring.mqtt")
@Setter
@Getter
public class MqttConfig {

    // Injected client wrapper that getMqttPushClient() connects and returns.
    @Autowired
    private MqttPushClient mqttPushClient;

    // Values bound from spring.mqtt.username / password / host-url / client-id /
    // default-topic / timeout / keepalive.
    private String username;
    private String password;
    private String hostUrl;
    private String clientId;
    // Bound from configuration but not used below: the subscription is the literal "test/#".
    private String defaultTopic;
    private int timeout;
    private int keepalive;

    /**
     * Connects the injected client with the bound settings, subscribes it to
     * {@code "test/#"} at QoS 0, and returns it.
     *
     * @return the same {@link MqttPushClient} instance that was injected into this class
     */
    @Bean
    public MqttPushClient getMqttPushClient(){
        mqttPushClient.connect(hostUrl,clientId,username,password,timeout,keepalive);
        mqttPushClient.subscribe("test/#",0);
        return mqttPushClient;
    }
}

MQTT推送客户端

package com.mqtt.common;


import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Wrapper around one shared Paho {@link MqttClient} that offers connect, publish and
 * subscribe operations.
 *
 * <p>All operations are best-effort: failures are logged and not rethrown, so callers
 * cannot tell from the return value whether an operation succeeded.
 */
@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    /**
     * The client created by the most recent {@link #connect} call.
     *
     * <p>{@code volatile} because {@link #connect} may replace it (it is also reached
     * from {@code PushCallback.connectionLost}, presumably on a Paho-owned thread)
     * while {@link #publish} and {@link #subscribe} read it from other threads.
     */
    private static volatile MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * Creates a new client with in-memory persistence, registers the callback and
     * connects to the broker with a clean session.
     *
     * <p>The new client replaces the shared one before the connection attempt, as in
     * the original code. NOTE(review): a previously stored client is overwritten
     * without being closed - confirm whether it should be released here.
     *
     * @param host      broker URI handed to the {@link MqttClient} constructor
     * @param clientId  client identifier handed to the {@link MqttClient} constructor
     * @param username  user name for the connection; skipped when {@code null}
     * @param password  password for the connection; skipped when {@code null}
     * @param timeout   value for {@code MqttConnectOptions.setConnectionTimeout}
     * @param keepalive value for {@code MqttConnectOptions.setKeepAliveInterval}
     */
    public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
        final MqttClient newClient;
        try {
            newClient = new MqttClient(host, clientId, new MemoryPersistence());
        } catch (MqttException e) {
            logger.error("Failed to create MQTT client {} for {}", clientId, host, e);
            return;
        }

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        // Credentials are optional; guarding avoids a NullPointerException when they are not configured.
        if (username != null) {
            options.setUserName(username);
        }
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);

        MqttPushClient.setClient(newClient);
        try {
            newClient.setCallback(pushCallback);
            newClient.connect(options);
        } catch (Exception e) {
            logger.error("Failed to connect MQTT client {} to {}", clientId, host, e);
        }
    }

    /**
     * Publishes a text message and blocks until the delivery token completes.
     *
     * <p>The message is dropped, with an error log, when no client has been created
     * yet or the topic cannot be resolved.
     *
     * @param qos         quality-of-service level set on the message
     * @param retained    whether the message is flagged as retained
     * @param topic       topic to publish to
     * @param pushMessage message text; encoded as UTF-8
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // Explicit charset: the no-argument getBytes() depends on the platform default.
        message.setPayload(pushMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        MqttClient current = MqttPushClient.getClient();
        if (current == null) {
            logger.error("MQTT client has not been created; dropping message for topic {}", topic);
            return;
        }
        MqttTopic mqttTopic = current.getTopic(topic);
        if (mqttTopic == null) {
            logger.error("topic not exist");
            // Stop here: continuing would dereference the null topic.
            return;
        }
        try {
            MqttDeliveryToken token = mqttTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttException e) {
            // Also covers MqttPersistenceException, which is a subclass.
            logger.error("Failed to publish to topic {}", topic, e);
        }
    }

    /**
     * Subscribes the shared client to a topic filter.
     *
     * @param topic topic filter to subscribe to
     * @param qos   quality-of-service level requested for the subscription
     */
    public void subscribe(String topic,int qos){
        logger.info("开始订阅主题{}", topic);
        MqttClient current = MqttPushClient.getClient();
        if (current == null) {
            logger.error("MQTT client has not been created; cannot subscribe to {}", topic);
            return;
        }
        try {
            current.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("Failed to subscribe to topic {}", topic, e);
        }
    }
}

监听类

package com.mqtt.common;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Paho callback that logs incoming messages and delivery completions and triggers a
 * reconnect when the connection is lost.
 */
@Component
public class PushCallback implements MqttCallback {

    // Use this class's own logger category (the original used MqttPushClient.class).
    private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);

    @Autowired
    private MqttConfig mqttConfig;

    /**
     * Logs the disconnect and asks {@link MqttConfig#getMqttPushClient()} to connect
     * and subscribe again.
     *
     * <p>The original guarded this call with a private static client field that was
     * never assigned, so the guard was always true; the call is therefore
     * unconditional here. Only one attempt is made: the connect path handles its own
     * failures without rethrowing, so a failed attempt is not retried from here.
     *
     * @param throwable the reason the connection was lost, as reported by Paho
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("连接断开,可以重连", throwable);
        mqttConfig.getMqttPushClient();
    }

    /**
     * Logs topic, QoS and payload of a message received on a subscribed topic.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage the received message; its payload is decoded as UTF-8
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("接收消息主题:{}", topic);
        logger.info("接收消息QoS:{}", mqttMessage.getQos());
        // Explicit charset: new String(byte[]) depends on the platform default.
        logger.info("接收消息内容:{}",
                new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Logs whether the delivery token reports completion.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("deliveryCompplete-------------{}", iMqttDeliveryToken.isComplete());
    }
}

Controller返回值格式

package com.mqtt.common;

import org.springframework.http.HttpStatus;

import java.util.HashMap;
import java.util.Map;

/**
 * Map-based response body for controllers.
 *
 * <p>A new instance starts with {@code "code" = 200} and {@code "msg" = "success"};
 * the static factories overwrite or extend those entries. The {@code "code"} entry is
 * always stored as an {@code int}, so success and error responses have the same shape.
 */
public class RUtils extends HashMap<String,Object> {
    private static final long serialVersionUID = 1L;

    /** Creates a success response with code 200 and message {@code "success"}. */
    public RUtils(){
        put("code",200);
        put("msg","success");
    }

    /**
     * Builds an error response from an HTTP status.
     *
     * @param code status whose numeric value is stored under {@code "code"}
     * @param msg  text stored under {@code "msg"}
     * @return the error response
     */
    public static RUtils error(HttpStatus code, String msg){
        RUtils r = new RUtils();
        // Store the numeric value, not the enum object, to match the int used everywhere else.
        r.put("code",code.value());
        r.put("msg",msg);
        return r;
    }

    /**
     * Builds an error response with an explicit numeric code and extra data.
     *
     * @param code value stored under {@code "code"}
     * @param msg  text stored under {@code "msg"}
     * @param data value stored under {@code "data"}
     * @return the error response
     */
    public static RUtils error(int code,String msg,String data){
        RUtils r = new RUtils();
        r.put("code",code);
        r.put("msg",msg);
        r.put("data",data);
        return r;
    }

    /**
     * Builds a success response with a custom message.
     *
     * @param msg text stored under {@code "msg"}
     * @return the success response
     */
    public static RUtils ok(String msg){
        RUtils r = new RUtils();
        r.put("msg",msg);
        return r;
    }

    /**
     * Builds a success response and copies the given entries into it.
     *
     * @param map entries to add; may overwrite {@code "code"}/{@code "msg"};
     *            {@code null} is treated as empty
     * @return the success response
     */
    public static RUtils ok(Map<String,Object> map){
        RUtils r = new RUtils();
        if (map != null) {
            r.putAll(map);
        }
        return r;
    }

    /**
     * Builds an error response for {@link HttpStatus#INTERNAL_SERVER_ERROR} with the
     * default message.
     *
     * @return the error response
     */
    public static RUtils error(){
        return error(HttpStatus.INTERNAL_SERVER_ERROR,"未知异常");
    }

    /**
     * Builds an error response for {@link HttpStatus#INTERNAL_SERVER_ERROR} with a
     * custom message.
     *
     * @param msg text stored under {@code "msg"}
     * @return the error response
     */
    public static RUtils error(String msg){
        return error(HttpStatus.INTERNAL_SERVER_ERROR,msg);
    }
}

controller测试

/**
 * Demo REST controller that publishes a fixed test message through
 * {@link MqttPushClient}.
 */
@RestController
@RequestMapping("/")
public class TestController {

    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * Publishes a fixed text to {@code test/test} at QoS 0, not retained.
     *
     * <p>{@code MqttPushClient.publish} returns nothing, so the response is the same
     * success message whatever the outcome of the publish.
     *
     * @return a success response carrying the message {@code "发布成功"}
     */
    @GetMapping("/publishTopic")
    public RUtils publishTopic() {
        final int qos = 0;
        final boolean retained = false;
        final String topic = "test/test";
        final String payload = "测试发布消息";

        mqttPushClient.publish(qos, retained, topic, payload);

        final RUtils response = RUtils.ok("发布成功");
        return response;
    }
}

测试结果

访问http://localhost:18083/#/clients
有新的客户端连接

在MQTTX中,在主题test/test中发布消息

在java控制台接收到消息

在MQTTX中,在主题test/te中发布消息

在java控制台接收消息

软件
前端设计
程序设计
Java相关