七仔的博客

七仔的博客GithubPages分博

0%

HarmonyOS实现MQTT消息监听展示

因为HarmonyOS暂时没有发现现成的MQTT的JS包,所以使用Java接收MQTT消息,再由JS定时调用Java,获取其接收到的最新消息并展示

HarmonyOS实现MQTT消息监听展示

思路

因为HarmonyOS暂时没有发现现成的MQTT的JS包,所以使用Java接收MQTT消息,再由JS定时调用Java,获取其接收到的最新消息并展示
首先是JS调用Java,JS FA(Feature Ability)调用Java PA(Particle Ability)有两种方式,Ability和Internal Ability,这里使用的是第一种Ability
然后是Java端的Mqtt消息接收,使用paho的第三方库进行消息接收,页面启动时JS端调用Java端实现Mqtt消息接收开始,使用异步挂起,接收消息并缓存,随后JS端每次调用Java端拿到的都是最新缓存的信息

具体代码

hml页面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- Page root: one status line plus two tappable labels that start/stop polling for MQTT messages. -->
<div class="container">
<div>
<!-- Bound to the page's `title` data field; the page script overwrites it with each polled result. -->
<text class="title">
{{ title }}
</text>
</div>
<div>
<!-- Tapping this invokes the page's mqttMessage() handler, which starts the polling timer. -->
<text class="title" onclick="mqttMessage">
开始mqtt
</text>
</div>
<div>
<!-- Tapping this invokes the page's stopMqtt() handler, which clears the polling timer. -->
<text class="title" onclick="stopMqtt">
停止mqtt
</text>
</div>
</div>

JS代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Parameters passed to FeatureAbility.callAbility() when the JS FA calls the Java PA.
// abilityType 0: the article uses the "Ability" calling style (not "Internal Ability").
const ABILITY_TYPE_EXTERNAL = 0;
// syncOption value used for every request in this page.
const ACTION_SYNC = 0;
// Message codes; they must match START_MQTT (1001) and MQTT_MESSAGE (1002) in the Java PlayAbility.
const ACTION_MESSAGE_CODE_START_MQTT = 1001;
const ACTION_MESSAGE_CODE_MQTT_MESSAGE = 1002;
// Bundle and fully-qualified class name of the Java ability that serves the requests.
const BUNDLE_NAME = 'com.example.mqttapplication';
const ABILITY_NAME = 'com.example.mqttapplication.PlayAbility';

/**
 * Sends one request with the given message code to the Java PlayAbility and
 * returns the reply parsed from its JSON string form.
 *
 * @param {number} messageCode one of the ACTION_MESSAGE_CODE_* constants
 * @returns {Promise<Object>} the parsed reply; rejects if the call or the parse fails
 */
async function callPlayAbility(messageCode) {
    const reply = await FeatureAbility.callAbility({
        messageCode: messageCode,
        abilityType: ABILITY_TYPE_EXTERNAL,
        syncOption: ACTION_SYNC,
        bundleName: BUNDLE_NAME,
        abilityName: ABILITY_NAME
    });
    return JSON.parse(reply);
}

/**
 * Thin JS-side client for the Java PlayAbility.
 * Both methods are async and never reject: failures are logged instead, because
 * callers invoke them fire-and-forget (module load and a 200 ms interval).
 */
export const playAbility = {
    /**
     * Asks the Java side to start receiving MQTT messages.
     * The reply is now awaited and checked so that a failed start is logged
     * instead of being silently dropped.
     */
    startMqtt: async function() {
        try {
            const ret = await callPlayAbility(ACTION_MESSAGE_CODE_START_MQTT);
            if (ret.code !== 0) {
                console.error('mqtt start error code:' + JSON.stringify(ret.code));
            }
        } catch (err) {
            console.error('mqtt start failed:' + err);
        }
    },
    /**
     * Fetches the most recently cached MQTT message from the Java side and
     * writes it into `that.title`.
     *
     * @param {Object} that the page view-model whose `title` field is updated
     */
    mqttMessage: async function(that) {
        let ret;
        try {
            ret = await callPlayAbility(ACTION_MESSAGE_CODE_MQTT_MESSAGE);
        } catch (err) {
            // Without this, every failed poll would surface as an unhandled rejection.
            console.error('mqtt call failed:' + err);
            return;
        }
        if (ret.code === 0) {
            const text = 'mqtt is:' + JSON.stringify(ret.abilityResult);
            console.info(text);
            that.title = text;
        } else {
            console.error('mqtt error code:' + JSON.stringify(ret.code));
        }
    }
}
/**
 * Page view-model: `title` is the text shown on the page, `timer` holds the id of
 * the active polling interval (null when polling is stopped).
 */
export default {
    data: {
        title: "",
        timer: null
    },
    /** One polling step: ask the Java side for the latest message and show it. */
    task() {
        playAbility.mqttMessage(this);
    },
    /** Click handler for "开始mqtt": polls immediately, then every 200 ms. */
    mqttMessage() {
        // Drop any interval left from an earlier tap; otherwise each tap would
        // stack another timer and only the newest id could ever be cleared.
        this.stopMqtt();
        this.title = "开始获取MQTT消息";
        this.task();
        // Arrow function keeps `this` pointing at the view-model when the timer fires.
        this.timer = setInterval(() => this.task(), 200);
    },
    /** Click handler for "停止mqtt": stops polling if it is running. */
    stopMqtt() {
        if (this.timer !== null) {
            clearInterval(this.timer);
            this.timer = null;
        }
    },
    /** Page lifecycle hook: make sure the interval does not outlive the page. */
    onDestroy() {
        this.stopMqtt();
    }
}
// Runs once when this module is loaded: ask the Java side to start receiving MQTT messages.
playAbility.startMqtt()

Java端代码(接收Mqtt消息,异步)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

import java.util.List;


/**
 * Runnable that connects to the MQTT broker, subscribes to {@link #MQTT_TOPIC} and keeps
 * only the most recently received payload in the list supplied by the caller.
 *
 * <p>The list is written from the MQTT client's callback thread. Every write replaces the
 * content atomically while holding the list's monitor, so a reader that also locks on the
 * list never observes the intermediate empty state.
 */
public class MqttThread implements Runnable {

    /** Broker address (placeholder; replace with the real host). */
    public static final String MQTT_BROKER_HOST = "tcp://xxx.xxx.xxx.xxx:1883";
    /** Client identifier sent to the broker. */
    public static final String MQTT_CLIENT_ID = "client";
    /** Topic to subscribe to. */
    public static final String MQTT_TOPIC = "HarmonyTest";
    /** Most recently created client; shared by all instances. */
    private volatile static MqttClient mqttClient;
    /** Connect options used for the most recent connection. */
    private static MqttConnectOptions options;
    /** Holder for the latest payload; at most one element. */
    private final List<String> message;

    /**
     * @param message list that receives the latest payload as its only element
     * @throws NullPointerException if {@code message} is null
     */
    public MqttThread(List<String> message) {
        if (message == null) {
            throw new NullPointerException("message");
        }
        this.message = message;
    }

    /**
     * Connects and subscribes. Returns once the subscription is in place; messages are
     * then delivered through the callback. Failures are reported on stderr and the
     * half-initialised client is released.
     */
    @Override
    public void run() {
        // A previous run may have left a client behind; it uses the same client id,
        // so release it rather than leaking it.
        MqttClient previous = mqttClient;
        mqttClient = null;
        closeQuietly(previous);

        MqttClient client = null;
        try {
            client = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
            MqttConnectOptions connectOptions = new MqttConnectOptions();
            connectOptions.setCleanSession(true);
            connectOptions.setConnectionTimeout(20);
            connectOptions.setKeepAliveInterval(20);
            // Install the callback BEFORE connecting/subscribing so that a message
            // arriving right after subscribe() is not dropped.
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    // No automatic reconnect is configured; at least make the loss visible.
                    System.err.println("mqtt连接断开:" + throwable);
                }

                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) {
                    String payload = mqttMessage.toString();
                    // Replace the cached value atomically with respect to other
                    // threads that lock on the same list.
                    synchronized (message) {
                        message.clear();
                        message.add(payload);
                    }
                    System.out.println("接收到mqtt消息:" + payload);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { }
            });
            client.connect(connectOptions);
            client.subscribe(MQTT_TOPIC);
            options = connectOptions;
            mqttClient = client;
        } catch (Exception e) {
            System.err.println("mqtt连接或订阅失败:" + e);
            e.printStackTrace();
            closeQuietly(client);
        }
    }

    /** Disconnects (if connected) and closes the client, reporting but not propagating errors. */
    private static void closeQuietly(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            System.err.println("关闭mqtt连接失败:" + e);
        }
    }
}

Java端代码(Particle Ability)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import com.example.mqttapplication.mqtt.MqttThread;
import ohos.aafwk.ability.Ability;
import ohos.aafwk.content.Intent;
import ohos.hiviewdfx.HiLog;
import ohos.hiviewdfx.HiLogLabel;
import ohos.rpc.*;
import ohos.utils.zson.ZSONObject;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PlayAbility extends Ability {

static final HiLogLabel label = new HiLogLabel(HiLog.LOG_APP, 1, "MY_TAG");

private static final int ERROR = -1;
private static final int SUCCESS = 0;
private static final int START_MQTT = 1001;
private static final int MQTT_MESSAGE = 1002;

@Override
protected void onStart(Intent intent) {
super.onStart(intent);
}

@Override
protected IRemoteObject onConnect(Intent intent) {
super.onConnect(intent);
PlayRemote remote = new PlayRemote();
return remote.asObject();
}

static class PlayRemote extends RemoteObject implements IRemoteBroker {

private List<String> message;

private Thread thread;

public PlayRemote() {
super("PlayRemote");
}

@Override
public boolean onRemoteRequest(int code, MessageParcel data, MessageParcel reply, MessageOption option) {
// 开始mqtt
else if (code == START_MQTT) {
Map<String, Object> result = new HashMap<>();
result.put("code", SUCCESS);
result.put("abilityResult", "成功开始mqtt");
try {
message = new ArrayList<>();
MqttThread mqttThread = new MqttThread(message);
thread = new Thread(mqttThread);
thread.start();
System.out.println("mqtt启动成功");
}
catch (Exception e) {
result.put("code", ERROR);
result.put("abilityResult", "启动失败");
}
reply.writeString(ZSONObject.toZSONString(result));
}
// 获取mqtt消息
else if (code == MQTT_MESSAGE) {
Map<String, Object> result = new HashMap<>();
result.put("code", SUCCESS);
if (message.isEmpty()) {
result.put("abilityResult", "未接收到MQTT消息");
}
else {
ZSONObject zsonObject = ZSONObject.stringToZSON(message.get(0));
result.put("abilityResult", zsonObject.getString("message"));
}
reply.writeString(ZSONObject.toZSONString(result));
}
else {
Map<String, Object> result = new HashMap<>();
result.put("abilityError", ERROR);
reply.writeString(ZSONObject.toZSONString(result));
return false;
}
return true;
}

@Override
public IRemoteObject asObject() {
return this;
}
}
}

另外启动网络连接还需要往config.json里加点东西获取权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
{
...
"module": {
...
"reqPermissions": [
{
"name": "ohos.permission.GET_NETWORK_INFO"
},
{
"name": "ohos.permission.INTERNET"
},
{
"name": "ohos.permission.SET_NETWORK_INFO"
},
{
"name": "ohos.permission.MANAGE_WIFI_CONNECTION"
},
{
"name": "ohos.permission.SET_WIFI_INFO"
},
{
"name": "ohos.permission.GET_WIFI_INFO"
}
]
}
}

最后写了个python的脚本用来发送mqtt消息,很简单,只有两行(一行导入,一行发送)

1
2
# Publish a single JSON test message to the topic the HarmonyOS app subscribes to.
from paho.mqtt import publish

publish.single(
    topic='HarmonyTest',
    payload='{"message":"BongShakalaka"}',
    hostname='xxx.xxx.xxx.xxx',
)

附:mqtt消息是要有mqtt服务器的,这个就自己搭或者买吧

应鸿蒙开发者论坛读者要求,这里贴上源码地址

github: https://github.com/baby7/MqttApplication

gitee(码云): https://gitee.com/baby7/MqttApplication

此为博主副博客,留言请去主博客,转载请注明出处:https://www.baby7blog.com/myBlog/101.html

欢迎关注我的其它发布渠道