编程语言
首页 > 编程语言> > Thingsboard MQTT权限校验源码解读

Thingsboard MQTT权限校验源码解读

作者:互联网

第一次读源码,理解不到位,请多批评

1、接收MQTT连接请求

首先找到MQTT的模块,./common/transport/mqtt,我们可以看到该模块是一个使用Netty封装的mqttServer,通过读取配置文件来初始化这个mqttServer

1. MqttTransportService

@Service("MqttTransportService")
@ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true' && '${transport.mqtt.enabled}'=='true')")
@Slf4j
public class MqttTransportService {

    @Value("${transport.mqtt.bind_address}")
    private String host;
    @Value("${transport.mqtt.bind_port}")
    private Integer port;

    @Value("${transport.mqtt.netty.leak_detector_level}")
    private String leakDetectorLevel;
    @Value("${transport.mqtt.netty.boss_group_thread_count}")
    private Integer bossGroupThreadCount;
    @Value("${transport.mqtt.netty.worker_group_thread_count}")
    private Integer workerGroupThreadCount;
    @Value("${transport.mqtt.netty.so_keep_alive}")
    private boolean keepAlive;

    @Autowired
    private MqttTransportContext context;

    private Channel serverChannel;
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @PostConstruct
    public void init() throws Exception {
        log.info("Setting resource leak detector level to {}", leakDetectorLevel);
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));

        log.info("Starting MQTT transport...");
        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new MqttTransportServerInitializer(context))
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);

        serverChannel = b.bind(host, port).sync().channel();
        log.info("Mqtt transport started!");
    }

    @PreDestroy
    public void shutdown() throws InterruptedException {
        log.info("Stopping MQTT transport!");
        try {
            serverChannel.close().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
        log.info("MQTT transport stopped!");
    }
}

代码中这里初始化了一个handler,来具体处理接收到的消息。(好像是netty的使用方式)
在这里插入图片描述

2. MqttTransportHandler

Degbug启动Thingsboard,并让MqttClinet发送连接请求。发现消息是从channelRead函数读入。
在这里插入图片描述
进入processMqttMsgh函数在这里插入图片描述
看到此时header中的消息类型是CONNECT
在这里插入图片描述
进入processConnect函数
我们这里使用的authToken的方式进行连接的,也就是在设备中定义的token。该token作为用户名传入。不需要密码。
在这里插入图片描述
进入processAuthTokenConnect函数
在本函数中,消息被封装成了ValidateBasicMqttCredRequestMsg类型的消息,封装完成之后,调用transportService的process函数,进行消息的传输,将消息传到core模块进行权限校验。
我们看到在process函数中传入了传输类型MQTT,封装好的登录信息,以及校验完成的回调函数。
在这里插入图片描述

3. 总结

至此,MQTT登录消息就处理完毕了,封装好的消息将被传输至core模块进行处理。

2、消息传输准备

1. DefaultTransportService

在process函数中,消息被进一步封装起来,封装成了一个TbProtoQueueMsg<TransportApiRequestMsg>类型的信息,并增加了一个UUID。在这里插入图片描述
消息被传输到了doProcess函数
我们先将细节屏蔽看看结构。
在这里插入图片描述
第一步是下面的代码
在这里插入图片描述
第一个参数是函数的原返回值,第二个参数是转换函数(也就是如何将原返回值进行转化的函数),第三个参数是Executor(详见Guava)
也就是第一步,将send函数的返回结果从ListenableFuture<Response>转换成了ListenableFuture<ValidateDeviceCredentialsResponse>类型的结果
第二步是
在这里插入图片描述
点进去发现实际上是为上一步转换来的response注册listener,也就是注册回调函数。
在这里插入图片描述
而这回调函数,就是之前process函数中传入的回调。
在这里插入图片描述
综上,doProcess函数将返回值转换成合适的格式,并设置回调函数。下面我们进入transportApiRequestTemplate.send(protoMsg)函数

2. DefaultTbQueueRequestTemplate的send函数

(TbQueueRequestTemplate实现类)
在这里插入图片描述

  1. 判断pending状态的请求数量是否超出限制
    在这里插入图片描述
  2. 生成requestId并组好header信息
    可以看到requestId被存放在了请求头
    在这里插入图片描述
  3. pendingRequests存储
    在这里插入图片描述
    pendingRequests以requestId为key,以一个包含了一个可手动添加future值的future句柄和一个过期时间。这个future句柄目前是没有任何设置的。这个函数被返回出去,在DefaultTransportService的doProcess函数中转换了格式并设置了回调函数。
  4. 真正的发送消息
    在这里插入图片描述

3. 总结

消息传输准备阶段,我们从里向外看,DefaultTbQueueRequestTemplate的send函数生成了一个可手动设置值的future操作句柄,并这次请求的requestId为key,包含操作句柄future和本次请求的过期时间的对象为value存储在pendingRequests中(pendingRequests可以保证requestId唯一)。而后send函数将这个future句柄返回,在上层的调用函数中被转换格式并设置了对应的回调函数。

3、消息传输

1. responseTemplate.send()

从DefaultTbQueueRequestTemplate的send函数中的responseTemplate.send()看起。
在这里插入图片描述
可以看到有三个入参

  1. 应该是topic等相关信息
    在这里插入图片描述
    可以看到消息将会被发到一个tb_transport.api.requests的topic中。

  2. reuqest
    就是前面封装后的登录信息

  3. 队列回调
    在这里插入图片描述

应该是看是否成功发送消息到指定topic的,成功做什么事,失败做什么事。可以看到失败的时候,pendingRequests删掉了对应requestId的信息,并直接给future设置了失败信息。

2. InMemoryTbQueueProducer.send

在这里插入图片描述
可以看到信息传输可以通过很多种方式,我们这里先选择内存队列来看。
在这里插入图片描述
可以看到send函数将前面传入的request放入了topicName对应的队列中。topicName和其对应的队列存在一个单例的ConcurrentHashMap中。
在这里插入图片描述
这样就相当于将消息放到了topicName对应的队列中了。

3. 总结

responseTemplate.send()有很多中实现,其结果就是将封装好的登录信息送到对应topic的队列中(可能是内存的也可能是消息中间件或其他形式)等待校验逻辑的消费。当传送成功时,会返回成功信息。失败时将会直接给future句柄设置为失败。

4、消息的消费

前文说明了使用内存队列消息的生产过程,接下来说明一下消息的消费过程。
消息消费的入口在DefaultTbQueueResponseTemplate的init函数中。

1. DefaultTbQueueResponseTemplate的init函数

init函数中有一个while循环,当stopped不为true的时候,while循环将一直运行
在这里插入图片描述

2. 获取所有的登录request

在这里插入图片描述
requestTemplate是一个下面类型的接口。
在这里插入图片描述
调用其poll(轮询)函数。对于我们的内存队列,其有单独的实现类。
在这里插入图片描述
其中关键一步就是
在这里插入图片描述
根据topic的名称,从单例的storage中取出该topic下的所有之前消息生产中存放的请求。

3. request的处理

  1. 解析header信息,获取超时时间requestId和responseTopic
    在这里插入图片描述
  2. 对于不超时的请求进入下面的处理步骤
    在这里插入图片描述
    AsyncCallbackTemplate的withCallbackAndTimeout函数定义如下
    在这里插入图片描述
    使用Futures.withTimeout生成了一个会超时的future句柄,如果超时将自被中断或者取消。在这里插入图片描述
    之后调用了AsyncCallbackTemplate.withCallback前面用过。withCallback里给这个会超时的future添加了withTimeout传入的回调函数。
    下面我们详细看看这三个参数的来龙去脉。

4. 登录信息校验

在这里插入图片描述
这里的handler是transportApiService,在核心服务初始化的时候被设置的。
在这里插入图片描述

在DefaultTransportApiService是其实现类。其handle函数如下。
在这里插入图片描述
我们找到我们对应的类型
在这里插入图片描述
进入validateCredentials函数
在这里插入图片描述
终于我们看到了校验的逻辑,在这里使用userName也就是我们传入的auth_token,经过业务校验获取到相关设备信息。
校验成功之后,将返回一个包含设备信息的future句柄。

5. 成功回调

在这里插入图片描述
这一块是成功后的回调。回调主要是要将设备信息发送回去。
我们可以看到,response的header里设置了REQUEST_ID_HEADER,这个request_id是之前消息发送过来的request中设置的。
在这里插入图片描述
responseTopic也是requestHeader中带的,是mqtt模块接收core信息的topic。
response就是handle里面返回的信息。

6. 总结

DefaultTbQueueResponseTemplate的init函数将不停地获取对应topic的队列中的request信息,并将所有的request进行处理校验,之后,将带有requestId和设备信息的response重新发送到内存队列中responseTopic对应的队列里。等待进一步处理。

5、校验成功后设备信息的返回

responseTemplate.send()函数,同样我们使用内存队列。
在这里插入图片描述
与消息发送过来的方式相同,将对应的responseTopic的队列中,放置刚获得的设备信息。

6、获取设备信息

TbQueueRequestTemplat接口中同样有一个init方法

其中有一个while循环,不停地获取responseTopic对应队列的信息,这些response信息就是设备相关信息

在这里插入图片描述
对于每个response信息,我们获取其header中的requestId,并通过requestId获取pendingRequests中的对应的ResponseMetaData<Response> 类型的expectedResponse信息,并将其中的future句柄的内容设置为response中的设备信息。
再看看最开始的回调信息
在这里插入图片描述
当成功获取到response之后,调用onValidateDeviceResponse进行后续的工作

7、总结

总的来说大致的流程是这样的,mqtt连接发送到mqtt-server,mqtt-server通过消息队列,将连接请求相关信息发送至core进行权限校验和认证,core进行校验之后,如果成功将设备信息发送到消息队列,消息消费后被预先注册的成功回调函数处理。否则被失败回调函数处理。

参考资料

https://www.baeldung.com/guava-futures-listenablefuture
https://www.jianshu.com/p/33ac5d394f68

标签:函数,队列,mqtt,信息,MQTT,源码,send,Thingsboard,transport
来源: https://blog.csdn.net/qq_43583902/article/details/122122868