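Listening for and Displaying MQTT Messages on HarmonyOS
Approach
I have not yet found a ready-made MQTT package for JS on HarmonyOS, so Java receives the MQTT messages and JS polls Java on a timer to fetch the latest received message and display it.
First, calling Java from JS. A JS FA (Feature Ability) can call a Java PA (Particle Ability) in two ways, Ability and Internal Ability; this example uses the first, Ability.
Next, receiving MQTT messages on the Java side, which uses the third-party Paho library. When the page starts, the JS side calls the Java side to begin receiving MQTT messages. Reception runs asynchronously on a background thread that caches each message as it arrives, so every later call from JS returns the most recently cached message.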
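The "cache the newest message, hand it out on every poll" idea can be sketched with the standard library alone. This is only an illustration, not the code used in this post: the class name `LatestMessageCache` and its methods are hypothetical, and it swaps the shared `List<String>` used later for an `AtomicReference`, so the writer thread and the polling thread never see a half-updated value.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: holds only the most recent payload and is safe to share between threads. */
final class LatestMessageCache {
    private final AtomicReference<String> latest = new AtomicReference<>();

    /** Intended to be called from the MQTT callback thread whenever a message arrives. */
    void put(String payload) {
        latest.set(payload); // overwrites the previous message atomically
    }

    /** Intended to be called from the ability's request handler on each JS poll. */
    Optional<String> peek() {
        return Optional.ofNullable(latest.get()); // empty until the first message arrives
    }
}
```

In a setup like the one described here, `messageArrived` would call `put(mqttMessage.toString())` and the request handler would call `peek()` each time the JS timer fires.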
Code
HML page:

    <div class="container">
        <div>
            <text class="title">
                {{ title }}
            </text>
        </div>
        <div>
            <text class="title" onclick="mqttMessage">
                Start MQTT
            </text>
        </div>
        <div>
            <text class="title" onclick="stopMqtt">
                Stop MQTT
            </text>
        </div>
    </div>

JS code:

    const ABILITY_TYPE_EXTERNAL = 0;  // call through Ability rather than Internal Ability
    const ACTION_SYNC = 0;            // synchronous call option
    const ACTION_MESSAGE_CODE_START_MQTT = 1001;
    const ACTION_MESSAGE_CODE_MQTT_MESSAGE = 1002;
    const BUNDLE_NAME = 'com.example.mqttapplication';
    const ABILITY_NAME = 'com.example.mqttapplication.PlayAbility';

    export const playAbility = {
        // Ask the Java side to start receiving MQTT messages.
        startMqtt: async function() {
            FeatureAbility.callAbility({
                messageCode: ACTION_MESSAGE_CODE_START_MQTT,
                abilityType: ABILITY_TYPE_EXTERNAL,
                syncOption: ACTION_SYNC,
                bundleName: BUNDLE_NAME,
                abilityName: ABILITY_NAME
            });
        },
        // Fetch the latest cached message from the Java side and show it on the page.
        mqttMessage: async function(that) {
            var result = await FeatureAbility.callAbility({
                messageCode: ACTION_MESSAGE_CODE_MQTT_MESSAGE,
                abilityType: ABILITY_TYPE_EXTERNAL,
                syncOption: ACTION_SYNC,
                bundleName: BUNDLE_NAME,
                abilityName: ABILITY_NAME
            });
            var ret = JSON.parse(result);
            if (ret.code === 0) {
                console.info('mqtt is:' + JSON.stringify(ret.abilityResult));
                that.title = 'mqtt is:' + JSON.stringify(ret.abilityResult);
            } else {
                console.error('mqtt error code:' + JSON.stringify(ret.code));
            }
        }
    }

    export default {
        data: {
            title: "",
            timer: null
        },
        task() {
            playAbility.mqttMessage(this);
        },
        mqttMessage() {
            this.title = "Fetching MQTT messages";
            this.task()
            // Clear any earlier timer so repeated taps do not stack up pollers.
            clearInterval(this.timer)
            this.timer = setInterval(this.task, 200)
        },
        stopMqtt() {
            clearInterval(this.timer)
        }
    }

    // Start the Java-side MQTT receiver as soon as the page script loads.
    playAbility.startMqtt()

Java code (receives MQTT messages asynchronously)

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

    import java.util.List;

    public class MqttThread implements Runnable {

        public static final String MQTT_BROKER_HOST = "tcp://xxx.xxx.xxx.xxx:1883";
        public static final String MQTT_CLIENT_ID = "client";
        public static final String MQTT_TOPIC = "HarmonyTest";
        private volatile static MqttClient mqttClient;
        private static MqttConnectOptions options;
        // Shared with the Particle Ability; holds only the latest message.
        private final List<String> message;

        public MqttThread(List<String> message) {
            this.message = message;
        }

        public void run() {
            try {
                mqttClient = new MqttClient(MQTT_BROKER_HOST, MQTT_CLIENT_ID, new MemoryPersistence());
                options = new MqttConnectOptions();
                options.setCleanSession(true);
                options.setConnectionTimeout(20);
                options.setKeepAliveInterval(20);
                // Register the callback before connecting so no early message is missed.
                mqttClient.setCallback(new MqttCallback() {
                    @Override
                    public void connectionLost(Throwable throwable) { }
                    @Override
                    public void messageArrived(String s, MqttMessage mqttMessage) {
                        // Replace the cached message atomically with respect to readers.
                        synchronized (message) {
                            message.clear();
                            message.add(mqttMessage.toString());
                        }
                        System.out.println("Received MQTT message: " + mqttMessage.toString());
                    }
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { }
                });
                mqttClient.connect(options);
                mqttClient.subscribe(MQTT_TOPIC);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

Java code (Particle Ability)

    import com.example.mqttapplication.mqtt.MqttThread;
    import ohos.aafwk.ability.Ability;
    import ohos.aafwk.content.Intent;
    import ohos.hiviewdfx.HiLog;
    import ohos.hiviewdfx.HiLogLabel;
    import ohos.rpc.*;
    import ohos.utils.zson.ZSONObject;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class PlayAbility extends Ability {

        static final HiLogLabel label = new HiLogLabel(HiLog.LOG_APP, 1, "MY_TAG");

        private static final int ERROR = -1;
        private static final int SUCCESS = 0;
        private static final int START_MQTT = 1001;
        private static final int MQTT_MESSAGE = 1002;

        @Override
        protected void onStart(Intent intent) {
            super.onStart(intent);
        }

        @Override
        protected IRemoteObject onConnect(Intent intent) {
            super.onConnect(intent);
            PlayRemote remote = new PlayRemote();
            return remote.asObject();
        }

        static class PlayRemote extends RemoteObject implements IRemoteBroker {

            // Cache shared with MqttThread; null until START_MQTT has been handled.
            private List<String> message;

            private Thread thread;

            public PlayRemote() {
                super("PlayRemote");
            }

            @Override
            public boolean onRemoteRequest(int code, MessageParcel data, MessageParcel reply, MessageOption option) {
                if (code == START_MQTT) {
                    // Start the background thread that receives and caches MQTT messages.
                    Map<String, Object> result = new HashMap<>();
                    result.put("code", SUCCESS);
                    result.put("abilityResult", "MQTT started");
                    try {
                        message = new ArrayList<>();
                        MqttThread mqttThread = new MqttThread(message);
                        thread = new Thread(mqttThread);
                        thread.start();
                        System.out.println("MQTT started");
                    } catch (Exception e) {
                        result.put("code", ERROR);
                        result.put("abilityResult", "Failed to start MQTT");
                    }
                    reply.writeString(ZSONObject.toZSONString(result));
                } else if (code == MQTT_MESSAGE) {
                    // Return the most recently cached message, if any.
                    Map<String, Object> result = new HashMap<>();
                    result.put("code", SUCCESS);
                    List<String> cache = message;
                    String latest = null;
                    if (cache != null) {
                        synchronized (cache) {
                            if (!cache.isEmpty()) {
                                latest = cache.get(0);
                            }
                        }
                    }
                    if (latest == null) {
                        result.put("abilityResult", "No MQTT message received yet");
                    } else {
                        ZSONObject zsonObject = ZSONObject.stringToZSON(latest);
                        result.put("abilityResult", zsonObject.getString("message"));
                    }
                    reply.writeString(ZSONObject.toZSONString(result));
                } else {
                    // Unknown message code.
                    Map<String, Object> result = new HashMap<>();
                    result.put("abilityError", ERROR);
                    reply.writeString(ZSONObject.toZSONString(result));
                    return false;
                }
                return true;
            }

            @Override
            public IRemoteObject asObject() {
                return this;
            }
        }
    }

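The branching on message codes can also be tried off-device if it is separated from the HarmonyOS RPC types. The sketch below is an assumption-laden illustration, not part of the project: `MqttRequestHandler` and `handle` are hypothetical names, the reply is a plain `Map` instead of a ZSON string written to a `MessageParcel`, the cached text is passed in as a parameter, and an unknown code reports `code: -1` rather than the `abilityError` key used in the ability.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, framework-free version of the message-code dispatch. */
final class MqttRequestHandler {
    static final int ERROR = -1;
    static final int SUCCESS = 0;
    static final int START_MQTT = 1001;
    static final int MQTT_MESSAGE = 1002;

    /**
     * Builds the reply for one request.
     * latestText is the cached message text, or null if nothing has arrived yet.
     */
    static Map<String, Object> handle(int code, String latestText) {
        Map<String, Object> result = new HashMap<>();
        switch (code) {
            case START_MQTT:
                result.put("code", SUCCESS);
                result.put("abilityResult", "MQTT started");
                break;
            case MQTT_MESSAGE:
                result.put("code", SUCCESS);
                result.put("abilityResult",
                        latestText == null ? "No MQTT message received yet" : latestText);
                break;
            default:
                // Assumption: report unknown codes through the same "code" field.
                result.put("code", ERROR);
                break;
        }
        return result;
    }
}
```

Keeping this logic in a plain class would let the JS-facing contract (`code` plus `abilityResult`) be checked with ordinary unit tests before it is wired into `onRemoteRequest`.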
In addition, opening a network connection requires permissions, which are requested by adding the following to config.json:

    {
      ...
      "module": {
        ...
        "reqPermissions": [
          {
            "name": "ohos.permission.GET_NETWORK_INFO"
          },
          {
            "name": "ohos.permission.INTERNET"
          },
          {
            "name": "ohos.permission.SET_NETWORK_INFO"
          },
          {
            "name": "ohos.permission.MANAGE_WIFI_CONNECTION"
          },
          {
            "name": "ohos.permission.SET_WIFI_INFO"
          },
          {
            "name": "ohos.permission.GET_WIFI_INFO"
          }
        ]
      }
    }

Finally, I wrote a small Python script to publish MQTT messages. It is very short, just an import and a single call:

    import paho.mqtt.publish as publish
    publish.single('HarmonyTest', '{"message":"BongShakalaka"}', hostname='xxx.xxx.xxx.xxx')

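Because the Particle Ability reads the `message` field from the payload, whatever publishes to the topic has to send a JSON object of that shape. As a minimal sketch, assuming the payload only ever carries one string field, a publisher written in Java could build it as follows; `MqttPayloads` and `messagePayload` are hypothetical names, and a real project would more likely use a JSON library.

```java
/** Hypothetical helper that builds the {"message": "..."} payload expected by the ability. */
final class MqttPayloads {
    /** Wraps text in a JSON object, escaping characters that JSON strings do not allow raw. */
    static String messagePayload(String text) {
        StringBuilder sb = new StringBuilder("{\"message\":\"");
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become four-digit hex escapes.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append("\"}").toString();
    }
}
```

For the text `BongShakalaka` this yields the same payload string that the Python script sends.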
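Note: MQTT messaging requires an MQTT broker, so you will need to set one up yourself or pay for a hosted one.
At the request of readers on the HarmonyOS developer forum, here are the source code links:
GitHub: https://github.com/baby7/MqttApplication
Gitee: https://gitee.com/baby7/MqttApplication
This is the author's secondary blog; please leave comments on the main blog. If you repost this article, credit the source: https://www.baby7blog.com/myBlog/101.html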