七仔的博客

七仔的博客GithubPages分博

0%

【“魔镜”系列】MQTT发送与接收

魔镜的环境数据存储使用HBase进行存储,使用传感器获取环境数据,然后Python使用MQTT协议发送到后端进行接收存储

【“魔镜”系列】MQTT发送与接收

发送

这里用的是python进行发送,代码很简单,如下:

1
2
3
4
5
6
7
import paho.mqtt.publish as publish

# 发送mqtt消息到服务器
def send_mqtt(message):
publish.single(settings.TOPIC, message, hostname=settings.HOSTNAME,
auth={'username': settings.USERNAME, 'password': settings.PASSWORD})
print("send message success" + message)

接收

接收使用Java进行接收(暂时先这样写,后面有时间再完善)
InitMQTT:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import com.pig4cloud.pigx.statis.config.MqttConfig;
import com.pig4cloud.pigx.statis.task.MqttTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@Order(2)
public class InitMQTT implements ApplicationRunner {

@Autowired
private MqttTask mqttTask;

@Override
public void run(ApplicationArguments var1) {
log.info("初始化MQTT监听");
log.info("初始化环境MQTT监听");
mqttTask.runMQTT(MqttConfig.MQTT_TOPIC, MqttConfig.MQTT_USER_NAME, MqttConfig.MQTT_PASSWORD, MqttConfig.MQTT_CLIENT_ID);
}
}

MqttTask:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import com.pig4cloud.pigx.statis.api.entity.Sensor;
import com.pig4cloud.pigx.statis.mqtt.ReceiveMessage;
import com.pig4cloud.pigx.statis.service.SensorService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
* 线程任务
*/
@Component
@Slf4j
public class MqttTask {
@Autowired
private ReceiveMessage receiveMessage;

@Autowired
private SensorService sensorService;

@Async
public void runMQTT(String topic,String userName,String password, String clientId) {
try {
log.info("开始连接EMQ:{}",topic);
receiveMessage.start(topic, userName,password,clientId);
}catch (Exception e){
log.error("连接MQTT异常:{}",e.getMessage());
}
}

@Async
public void save(Sensor sensor){
sensorService.save(sensor);
}
}

ReceiveMessage:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import com.pig4cloud.pigx.statis.config.MqttConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReceiveMessage {

private static MqttClient sensorClient; //环境

public MqttClient getSensorClient(){return sensorClient;}

private MqttClient connect(String host, String userName, String password, String clientId) throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(password.toCharArray());
options.setKeepAliveInterval(20);
options.setConnectionTimeout(10);
//设置环境回调
sensorClient = new MqttClient(host, clientId, persistence);
sensorClient.setCallback(new PushSensorCallback());
sensorClient.connect(options);
return sensorClient;
}

public void receive(MqttClient client, String topic) throws MqttException {
int[] Qos = {MqttConfig.QOS};
String[] topics = {topic};
client.subscribe(topics, Qos);
}

public void start(String topic, String userName, String password, String clientId) throws MqttException {
MqttClient client = connect(MqttConfig.MQTT_HOST, userName, password, clientId);
if (client != null) {
receive(client, topic);
}
log.info("连接EMQ:{}成功", MqttConfig.MQTT_HOST);
}
}

PushSensorCallback:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import com.alibaba.fastjson.JSON;
import com.pig4cloud.pigx.statis.api.entity.Sensor;
import com.pig4cloud.pigx.statis.config.MqttConfig;
import com.pig4cloud.pigx.statis.service.SensorService;
import com.pig4cloud.pigx.statis.task.MqttTask;
import com.pig4cloud.pigx.statis.util.SpringUtils;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.UUID;

/**
* 环境接收消息
*/
@Component
@Slf4j
public class PushSensorCallback implements MqttCallback {

@Override
public void connectionLost(Throwable throwable) {
/**
* 连接丢失后,一般在这里面进行重连
*/
System.out.println("环境MQTT连接断开,开始重连");
try{
log.info("重连环境MQTT");
MqttTask mqttTask = SpringUtils.getBean("mqttTask", MqttTask.class);
mqttTask.runMQTT(MqttConfig.MQTT_TOPIC, MqttConfig.MQTT_USER_NAME, MqttConfig.MQTT_PASSWORD, MqttConfig.MQTT_CLIENT_ID);
}catch (Exception e){
log.error("重连环境MQTT异常:{}",e.getMessage());
log.info("继续重连环境MQTT");
connectionLost(throwable);
}
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
try {
String msg = new String(message.getPayload());
log.info("接收环境MQTT消息:{}", msg);
Sensor sensor = JSON.parseObject(msg, Sensor.class);
sensor.setId(UUID.randomUUID().toString());
sensor.setCreateTime(new Date().getTime());
MqttTask mqttTask = SpringUtils.getBean("mqttTask", MqttTask.class);
mqttTask.save(sensor);
System.out.println(sensor.toString());
}catch (Exception e){
log.error("接收环境MQTT消息异常:{}",e.toString());
}
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}

此为博主副博客,留言请去主博客,转载请注明出处:https://www.baby7blog.com/myBlog/88.html

欢迎关注我的其它发布渠道