package com.newfiber.termite.service; import com.alibaba.fastjson2.JSONObject; import com.newfiber.termite.domain.TermiteData; import com.newfiber.termite.util.MqttConfig; import com.newfiber.termite.util.MqttUtils; import javax.annotation.PostConstruct; import javax.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; @Slf4j @Component public class DataReceiveService { // @Value("${mqtt.url}") // String url; // // @Value("${mqtt.port}") // Integer port; // // @Value("${mqtt.userName}") // String userName; // // @Value("${mqtt.userPassword}") // String userPassword; // // @Value("${mqtt.subscribeTopics}") // List<String> subscribeTopics; @Resource private MqttConfig mqttConfig; @Resource private TermiteDataService termiteDataService; @PostConstruct public void initMqttConnect(){ try{ // 创建Mqtt连接 MqttClient client = MqttUtils.getClient(parseHost(), mqttConfig.getUserName(), mqttConfig.getUserPassword()); // 订阅消息 client.subscribe(mqttConfig.getSubscribeTopics().toArray(new String[0])); // 设置回调函数 client.setCallback(new TermiteDataReceiveCallback()); log.info("【数据接收】MQTT订阅成功,订阅主题{},客户端编号{}", String.join(",", mqttConfig.getSubscribeTopics()), client.getClientId()); }catch (Exception e){ log.info("【数据接收】MQTT订阅失败,订阅主题{}", String.join(",", mqttConfig.getSubscribeTopics())); e.printStackTrace(); } } public String parseHost(){ return "tcp://".concat(mqttConfig.getUrl()).concat(":").concat(mqttConfig.getPort().toString()); } class TermiteDataReceiveCallback implements MqttCallback { @Override public void connectionLost(Throwable cause) { // ignore } @Override public void messageArrived(String topic, MqttMessage mqttMessage){ String message = new String(mqttMessage.getPayload()); if(StringUtils.isBlank(message)){ return; } log.info("【数据接收】主题[{}]接收到数据:{}", topic, message); TermiteData termiteData = JSONObject.parseObject(message, TermiteData.class); termiteDataService.save(termiteData); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { // ignore } } }