JetLinks 学习笔记 – mqtt网络组件模块

模块架构概览

目录结构

jetlinks-components/network-component/mqtt-component/
├── src/main/java/org/jetlinks/community/network/mqtt/
│   ├── server/           # MQTT服务器实现
│   │   ├── vertx/       # 基于Vertx的服务器实现
│   │   └── ...
│   ├── client/          # MQTT客户端实现
│   │   └── ...
│   └── gateway/         # 设备网关实现
│       └── device/      # 设备网关相关

核心组件

  • MQTT服务端 (server/) - 提供MQTT服务器功能,支持设备直连
  • MQTT客户端 (client/) - 提供MQTT客户端功能,支持连接外部Broker
  • 设备网关 (gateway/device/) - 将MQTT网络转换为设备网关,实现设备管理

架构设计

设备 → MQTT网络层 → 设备网关 → 设备会话 → 设备管理

自动配置启动机制

Spring Boot自动配置

MQTT组件通过Spring Boot的自动配置机制启动,无需手动配置:

// MQTT服务器提供者
@Component
@ConfigurationProperties(prefix = "jetlinks.network.mqtt-server")
public class DefaultVertxMqttServerProvider implements NetworkProvider<VertxMqttServerProperties> {
    // 自动配置实现
}

// MQTT客户端提供者
@Component
@Slf4j
@ConfigurationProperties(prefix = "jetlinks.network.mqtt-client")
public class MqttClientProvider implements NetworkProvider<MqttClientProperties> {
    // 自动配置实现
}

网络管理器注册

NetworkConfiguration 中,所有网络提供者会被自动注册:

@Bean
public CommandLineRunner networkProviderRegister(ObjectProvider<NetworkProvider<?>> providers) {
    for (NetworkProvider<?> provider : providers) {
        NetworkProvider.supports.register(provider.getType().getId(), provider);
    }
    return ignore -> {
    };
}

启动流程

  1. Spring Boot启动 → 扫描组件
  2. 自动配置加载 → 注册MQTT提供者
  3. 网络管理器初始化 → 注册网络提供者
  4. 配置加载 → 从配置文件读取MQTT配置
  5. MQTT服务器启动 → 创建并启动MQTT服务器实例
  6. 设备网关注册 → 注册MQTT设备网关

配置系统详解

配置设计模式

双层配置绑定

// 第一层:Provider级别的模板配置
@Component
@ConfigurationProperties(prefix = "jetlinks.network.mqtt-server")
public class DefaultVertxMqttServerProvider {
    @Getter
    @Setter
    private MqttServerOptions template = new MqttServerOptions();  // 模板配置
}

// 第二层:Properties级别的实例配置
public class VertxMqttServerProperties extends AbstractServerNetworkConfig {
    private int instance = Runtime.getRuntime().availableProcessors();
    private int maxMessageSize = 8096;
}

配置合并机制

private Mono<MqttServerOptions> convert(VertxMqttServerProperties properties) {
    MqttServerOptions options = new MqttServerOptions(template);  // 基础模板
    options.setPort(properties.getPort());                        // 具体配置覆盖
    options.setHost(properties.getHost());
    options.setMaxMessageSize(properties.getMaxMessageSize());
    return Mono.just(options);
}

MQTT服务器配置

基础网络配置

// 继承自AbstractServerNetworkConfig
protected String id;           // 网络ID
protected String publicHost;   // 公网地址
protected int publicPort;      // 公网端口
protected String host;         // 本地监听地址 (默认: 0.0.0.0)
protected int port;            // 本地监听端口
protected boolean secure;      // 是否启用TLS
protected String certId;       // 证书ID

MQTT特有配置

// MQTT服务器特有配置项
private int instance = Runtime.getRuntime().availableProcessors();  // 服务实例数量
private int maxMessageSize = 8096;                                  // 最大消息长度

配置元数据

// 用于前端配置界面的元数据
.add("id", "id", "", new StringType())
.add("host", "本地地址", "", new StringType())
.add("port", "本地端口", "", new IntType())
.add("publicHost", "公网地址", "", new StringType())
.add("publicPort", "公网端口", "", new IntType())
.add("certId", "证书id", "", new StringType())
.add("secure", "开启TSL", "", new BooleanType())
.add("maxMessageSize", "最大消息长度", "", new StringType());

MQTT客户端配置

基础网络配置

// 继承自AbstractClientNetworkConfig
protected String id;           // 网络ID
protected String remoteHost;   // 远程服务器地址
protected int remotePort;      // 远程服务器端口
protected boolean secure;      // 是否启用TLS
protected String certId;       // 证书ID

MQTT特有配置

// MQTT客户端特有配置项
private String clientId;       // 客户端ID
private String username;       // 用户名
private String password;       // 密码
private int maxMessageSize = 0XFFFFFF;  // 最大消息长度 (默认16MB)
private String topicPrefix;    // 共享订阅前缀

配置文件示例

jetlinks:
  network:
    mqtt-server:
      # 基础配置
      id: mqtt-server-1
      host: 0.0.0.0
      port: 1883
      publicHost: your-public-ip
      publicPort: 1883

      # MQTT服务器特有配置
      instance: 4                    # 服务实例数量
      maxMessageSize: 8192           # 最大消息长度

      # 安全配置
      secure: false                  # 是否启用TLS
      certId: mqtt-server-cert       # 证书ID

    mqtt-client:
      # 基础配置
      id: mqtt-client-1
      remoteHost: broker.example.com
      remotePort: 1883

      # MQTT客户端特有配置
      clientId: jetlinks-client-001
      username: your-username
      password: your-password
      maxMessageSize: 16777215       # 16MB
      topicPrefix: $share/group1/    # 共享订阅前缀

      # 安全配置
      secure: false                  # 是否启用TLS
      certId: mqtt-client-cert       # 证书ID

MQTT服务器实现

服务器启动流程

服务器创建

public Mono<Network> createNetwork(@Nonnull VertxMqttServerProperties properties) {
    return initServer(new VertxMqttServer(properties.getId()), properties);
}

服务器初始化

private Mono<Network> initServer(VertxMqttServer server, VertxMqttServerProperties properties) {
    int numberOfInstance = Math.max(1, properties.getInstance());
    return convert(properties)
        .map(options -> {
            List<MqttServer> instances = new ArrayList<>(numberOfInstance);
            for (int i = 0; i < numberOfInstance; i++) {
                MqttServer mqttServer = MqttServer.create(vertx, options);
                instances.add(mqttServer);
            }
            server.setBind(new InetSocketAddress(options.getHost(), options.getPort()));
            server.setMqttServer(instances);
            for (MqttServer instance : instances) {
               vertx.nettyEventLoopGroup()
                   .execute(()->{
                       instance.listen(result -> {
                           if (result.succeeded()) {
                               log.debug("startup mqtt server [{}] on port :{} ", 
                                   properties.getId(), result.result().actualPort());
                           }
                       });
                   });
            }
            return server;
        });
}

连接处理机制

连接监听

public Flux<MqttConnection> handleConnection(String holder) {
    return Flux.fromIterable(mqttServers)
        .flatMap(server -> server.connectHandler(connection -> {
            connection.holder(holder);
            sink.next(connection);
        }));
}

连接认证

private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
    return Mono
        .justOrEmpty(connection.getAuth())
        .flatMap(auth -> {
            // 创建认证请求
            MqttAuthenticationRequest request = new MqttAuthenticationRequest(
                connection.getClientId(),
                auth.getUsername(),
                auth.getPassword(),
                getTransport());

            return supportMono
                .map(support -> support.authenticate(request, registry))  // 自定义协议认证
                .defaultIfEmpty(Mono.defer(() -> registry
                    .getDevice(connection.getClientId())                 // 使用clientId作为设备ID
                    .flatMap(device -> device.authenticate(request))))   // 设备认证
                .flatMap(Function.identity())
                .switchIfEmpty(Mono.fromRunnable(() -> 
                    connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)));
        })
        .flatMap(resp -> {
            String deviceId = StringUtils.hasText(resp.getDeviceId()) ? 
                resp.getDeviceId() : connection.getClientId();
            return registry
                .getDevice(deviceId)
                .map(operator -> Tuples.of(operator, resp, connection))
                .switchIfEmpty(Mono.fromRunnable(() -> 
                    connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
        });
}

MQTT客户端实现

客户端创建流程

客户端提供者

@Component
@Slf4j
@ConfigurationProperties(prefix = "jetlinks.network.mqtt-client")
public class MqttClientProvider implements NetworkProvider<MqttClientProperties> {

    @Override
    public Mono<Network> createNetwork(@Nonnull MqttClientProperties properties) {
        return initClient(new MqttClient(properties.getId()), properties);
    }
}

客户端初始化

private Mono<Network> initClient(MqttClient client, MqttClientProperties properties) {
    return convert(properties)
        .map(options -> {
            MqttClient mqttClient = MqttClient.create(vertx, options);
            client.setBind(new InetSocketAddress(options.getHost(), options.getPort()));
            client.setMqttClient(mqttClient);

            mqttClient.connect(result -> {
                if (result.succeeded()) {
                    log.debug("connected to mqtt broker [{}] at {}:{}", 
                        properties.getId(), options.getHost(), options.getPort());
                } else {
                    log.error("failed to connect to mqtt broker [{}]", properties.getId(), result.cause());
                }
            });

            return client;
        });
}

消息订阅与发布

消息订阅

public Mono<Void> subscribe(String topic, int qos) {
    return Mono.fromRunnable(() -> {
        mqttClient.subscribe(topic, qos, result -> {
            if (result.succeeded()) {
                log.debug("subscribed to topic: {} with qos: {}", topic, qos);
            } else {
                log.error("failed to subscribe to topic: {}", topic, result.cause());
            }
        });
    });
}

消息发布

public Mono<Void> publish(String topic, Buffer payload, int qos, boolean retain) {
    return Mono.fromRunnable(() -> {
        mqttClient.publish(topic, payload, MqttQoS.valueOf(qos), false, retain, result -> {
            if (result.succeeded()) {
                log.debug("published message to topic: {}", topic);
            } else {
                log.error("failed to publish message to topic: {}", topic, result.cause());
            }
        });
    });
}

设备网关集成

设备网关架构

两种网关模式

  • MQTT服务器设备网关 - 设备直接连接到JetLinks内置MQTT服务器
  • MQTT客户端设备网关 - JetLinks作为客户端连接到外部MQTT Broker

网关提供者

// MQTT服务器设备网关提供者
@Component
public class MqttServerDeviceGatewayProvider implements DeviceGatewayProvider {

    @Override
    public String getId() {
        return "mqtt-server-gateway";
    }

    @Override
    public String getName() {
        return "MQTT直连接入";
    }

    @Override
    public Mono<DeviceGateway> createDeviceGateway(DeviceGatewayProperties properties) {
        return networkManager
            .<MqttServer>getNetwork(getNetworkType(), properties.getChannelId())
            .map(mqttServer -> new MqttServerDeviceGateway(
                properties.getId(),
                registry,
                sessionManager,
                mqttServer,
                messageHandler,
                Mono.empty()
            ));
    }
}

// MQTT客户端设备网关提供者
@Component
public class MqttClientDeviceGatewayProvider implements DeviceGatewayProvider {

    @Override
    public String getId() {
        return "mqtt-client-gateway";
    }

    @Override
    public String getName() {
        return "MQTT Broker接入";
    }
}

网关启动流程

连接监听

private void doStart() {
    disposable = mqttServer
        .handleConnection("device-gateway")  // 监听连接,指定holder避免重复接收
        .filter(conn -> {
            if (!isStarted()) {
                conn.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                return false;
            }
            return true;
        })
        .flatMap(connection -> this
            .handleConnection(connection)           // 处理连接认证
            .flatMap(tuple3 -> handleAuthResponse(tuple3.getT1(), tuple3.getT2(), tuple3.getT3()))
            .flatMap(tp -> handleAcceptedMqttConnection(tp.getT1(), tp.getT2(), tp.getT3()))
        )
        .subscribe();
}

会话创建

private Mono<Tuple3<MqttConnection, DeviceOperator, DeviceSession>> handleAuthResponse(DeviceOperator device,
                                                                                       AuthenticationResponse resp,
                                                                                       MqttConnection connection) {
    return Mono.defer(() -> {
        String deviceId = device.getDeviceId();

        // 认证通过,创建会话
        return sessionManager
            .compute(deviceId, id -> new MqttConnectionSession(
                deviceId,
                device,
                getTransport(),
                connection,
                monitor
            ))
            .map(session -> Tuples.of(connection, device, session));
    });
}

消息处理流程

消息接收与解码

private Mono<Void> decodeAndHandleMessage(DeviceOperator operator,
                                          DeviceSession session,
                                          MqttMessage message,
                                          MqttConnection connection) {
    monitor.receivedMessage();

    return operator
        .getProtocol()
        .flatMap(protocol -> protocol.getMessageCodec(getTransport()))
        // 使用协议解码器解码消息
        .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(
            session, message, registry, msg -> handleMessage(operator, msg, connection).then())))
        .cast(DeviceMessage.class)
        .concatMap(msg -> {
            // 回填deviceId
            if (!StringUtils.hasText(msg.getDeviceId())) {
                msg.thingId(DeviceThingType.device, operator.getDeviceId());
            }
            return this.handleMessage(operator, msg, connection);
        })
        .doOnComplete(() -> {
            if (message instanceof MqttPublishing) {
                ((MqttPublishing) message).acknowledge();  // 确认消息
            }
        })
        .then();
}

设备会话管理

会话接口设计

设备会话接口

public interface DeviceSession {
    String getId();                    // 会话ID
    DeviceOperator getOperator();      // 设备操作器
    Transport getTransport();          // 传输协议
    Mono<Boolean> send(EncodedMessage encodedMessage);  // 发送消息
    boolean isAlive();                 // 会话是否活跃
    void close();                      // 关闭会话
}

MQTT连接会话实现

public class MqttConnectionSession implements DeviceSession, ReplaceableDeviceSession {

    @Getter
    private final String id;

    @Getter
    private final DeviceOperator operator;

    @Getter
    private final Transport transport;

    @Getter
    private MqttConnection connection;

    @Override
    public Mono<Boolean> send(EncodedMessage encodedMessage) {
        return Mono.defer(() -> connection.publish(((MqttMessage) encodedMessage)))
                   .doOnSuccess(nil -> monitor.sentMessage())
                   .thenReturn(true);
    }

    @Override
    public boolean isAlive() {
        return connection.isAlive();
    }

    @Override
    public void close() {
        connection.close();
    }

    // 支持会话替换
    @Override
    public void replaceWith(DeviceSession session) {
        if (session instanceof MqttConnectionSession) {
            this.connection = ((MqttConnectionSession) session).getConnection();
        }
    }
}

会话管理器

会话存储

public interface DeviceSessionManager {
    Mono<DeviceSession> getSession(String deviceId);           // 获取会话
    Mono<DeviceSession> compute(String deviceId, Function<String, DeviceSession> factory);  // 计算会话
    Mono<Void> removeSession(String deviceId);                 // 移除会话
    Flux<DeviceSession> getAllSessions();                      // 获取所有会话
}

会话生命周期管理

// 会话创建
sessionManager.compute(deviceId, id -> new MqttConnectionSession(...))

// 会话获取
sessionManager.getSession(deviceId)

// 会话移除
sessionManager.removeSession(deviceId)

其他问题

关于mqtt客户端连接ssl-mqtt

证书获取

连接ssl需要提供证书,我也是郁闷,客户端去哪搞证书。

后来查了下,通过以下方式获得了。

# 命令行用 openssl 工具执行
openssl s_client -connect mqt.bsgoal.net.cn:3883 -showcerts

上面命令会输出证书相关信息,其中这部分是证书,如果有多组 CERTIFICATE , 都复制出来。

-----BEGIN CERTIFICATE-----
...(中间省略)
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
...(中间省略)
-----END CERTIFICATE-----

证书加载异常

然后就报错了,报错信息是
io.vertx.core.impl.NoStackTraceThrowable: Missing hostname verification algorithm: you must set TCP client options host name verification algorithm

意思是要指定算法,解决方法就是指定算法。

// 在 MqttClientProvider 的 convert 方法里,做配置转换时指定算法
if (config.isSecure()) {
    options.setSsl(true);
    options.setHostnameVerificationAlgorithm("HTTPS"); // 就是加了这句话
    return certificateManager
        .getCertificate(config.getCertId())
        .map(VertxKeyCertTrustOptions::new)
        .doOnNext(options::setKeyCertOptions)
        .doOnNext(options::setTrustOptions)
        .thenReturn(options);
}
文末附加内容
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇