魔镜的环境数据存储使用HBase进行存储,使用传感器获取环境数据,然后Python使用MQTT协议发送到后端进行接收存储
【“魔镜”系列】MQTT发送与接收
发送
这里用的是python进行发送,代码很简单,如下:
1 2 3 4 5 6 7
| import paho.mqtt.publish as publish
def send_mqtt(message): publish.single(settings.TOPIC, message, hostname=settings.HOSTNAME, auth={'username': settings.USERNAME, 'password': settings.PASSWORD}) print("send message success" + message)
|
接收
接收使用Java进行接收(暂时先这样写,后面有时间再完善)
InitMQTT:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import com.pig4cloud.pigx.statis.config.MqttConfig; import com.pig4cloud.pigx.statis.task.MqttTask; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component;
@Component @Slf4j @Order(2) public class InitMQTT implements ApplicationRunner {
@Autowired private MqttTask mqttTask;
@Override public void run(ApplicationArguments var1) { log.info("初始化MQTT监听"); log.info("初始化环境MQTT监听"); mqttTask.runMQTT(MqttConfig.MQTT_TOPIC, MqttConfig.MQTT_USER_NAME, MqttConfig.MQTT_PASSWORD, MqttConfig.MQTT_CLIENT_ID); } }
|
MqttTask:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import com.pig4cloud.pigx.statis.api.entity.Sensor; import com.pig4cloud.pigx.statis.mqtt.ReceiveMessage; import com.pig4cloud.pigx.statis.service.SensorService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component;
@Component @Slf4j public class MqttTask { @Autowired private ReceiveMessage receiveMessage;
@Autowired private SensorService sensorService;
@Async public void runMQTT(String topic,String userName,String password, String clientId) { try { log.info("开始连接EMQ:{}",topic); receiveMessage.start(topic, userName,password,clientId); }catch (Exception e){ log.error("连接MQTT异常:{}",e.getMessage()); } }
@Async public void save(Sensor sensor){ sensorService.save(sensor); } }
|
ReceiveMessage:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| import com.pig4cloud.pigx.statis.config.MqttConfig; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.stereotype.Component;
@Component @Slf4j public class ReceiveMessage {
private static MqttClient sensorClient;
public MqttClient getSensorClient(){return sensorClient;}
private MqttClient connect(String host, String userName, String password, String clientId) throws MqttException { MemoryPersistence persistence = new MemoryPersistence(); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(false); options.setUserName(userName); options.setPassword(password.toCharArray()); options.setKeepAliveInterval(20); options.setConnectionTimeout(10); sensorClient = new MqttClient(host, clientId, persistence); sensorClient.setCallback(new PushSensorCallback()); sensorClient.connect(options); return sensorClient; }
public void receive(MqttClient client, String topic) throws MqttException { int[] Qos = {MqttConfig.QOS}; String[] topics = {topic}; client.subscribe(topics, Qos); }
public void start(String topic, String userName, String password, String clientId) throws MqttException { MqttClient client = connect(MqttConfig.MQTT_HOST, userName, password, clientId); if (client != null) { receive(client, topic); } log.info("连接EMQ:{}成功", MqttConfig.MQTT_HOST); } }
|
PushSensorCallback:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| import com.alibaba.fastjson.JSON; import com.pig4cloud.pigx.statis.api.entity.Sensor; import com.pig4cloud.pigx.statis.config.MqttConfig; import com.pig4cloud.pigx.statis.service.SensorService; import com.pig4cloud.pigx.statis.task.MqttTask; import com.pig4cloud.pigx.statis.util.SpringUtils; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.util.Date; import java.util.UUID;
@Component @Slf4j public class PushSensorCallback implements MqttCallback {
@Override public void connectionLost(Throwable throwable) {
System.out.println("环境MQTT连接断开,开始重连"); try{ log.info("重连环境MQTT"); MqttTask mqttTask = SpringUtils.getBean("mqttTask", MqttTask.class); mqttTask.runMQTT(MqttConfig.MQTT_TOPIC, MqttConfig.MQTT_USER_NAME, MqttConfig.MQTT_PASSWORD, MqttConfig.MQTT_CLIENT_ID); }catch (Exception e){ log.error("重连环境MQTT异常:{}",e.getMessage()); log.info("继续重连环境MQTT"); connectionLost(throwable); } }
@Override public void messageArrived(String topic, MqttMessage message) throws Exception { try { String msg = new String(message.getPayload()); log.info("接收环境MQTT消息:{}", msg); Sensor sensor = JSON.parseObject(msg, Sensor.class); sensor.setId(UUID.randomUUID().toString()); sensor.setCreateTime(new Date().getTime()); MqttTask mqttTask = SpringUtils.getBean("mqttTask", MqttTask.class); mqttTask.save(sensor); System.out.println(sensor.toString()); }catch (Exception e){ log.error("接收环境MQTT消息异常:{}",e.toString()); } }
@Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }
|
此为博主副博客,留言请去主博客,转载请注明出处:https://www.baby7blog.com/myBlog/88.html