package com.newfiber.termite.service; import com.alibaba.fastjson2.JSONObject; import com.newfiber.termite.domain.TermiteMqttData; import com.newfiber.termite.util.MqttUtils; import javax.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.stereotype.Component; @Slf4j @Component public class DataReceiveService { String url = "139.155.49.237"; Integer port = 8383; String userName = "termite_huangpi"; String userPassword = "Huangpi@2024"; String subscribeTopic = "wulinshuiku/+/antResult"; @PostConstruct public void initMqttConnect(){ try{ // 创建Mqtt连接 MqttClient client = MqttUtils.getClient(parseHost(), userName, userPassword); // 订阅消息 client.subscribe(subscribeTopic, 1); // 设置回调函数 client.setCallback(new TermiteDataReceiveCallback()); log.info("【数据接收】MQTT订阅成功,订阅主题{},客户端编号{}", subscribeTopic, client.getClientId()); }catch (Exception e){ log.info("【数据接收】MQTT订阅失败,订阅主题{}", subscribeTopic); e.printStackTrace(); } } public String parseHost(){ return "tcp://".concat(url).concat(":").concat(port.toString()); } static class TermiteDataReceiveCallback implements MqttCallback { @Override public void connectionLost(Throwable cause) { // ignore } @Override public void messageArrived(String topic, MqttMessage mqttMessage){ String message = new String(mqttMessage.getPayload()); if(StringUtils.isBlank(message)){ return; } log.info("【数据接收】主题[{}]接收到数据:{}", topic, message); TermiteMqttData termiteMqttData = JSONObject.parseObject(message, TermiteMqttData.class); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { // ignore } } }