From a2f73d053c403f5c72b1260eeda0b35dfc6089a5 Mon Sep 17 00:00:00 2001 From: peri4 Date: Thu, 26 Mar 2026 13:04:26 +0300 Subject: [PATCH] add file --- libs/mqtt_client/pimqttclient.cpp | 259 ++++++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) create mode 100644 libs/mqtt_client/pimqttclient.cpp diff --git a/libs/mqtt_client/pimqttclient.cpp b/libs/mqtt_client/pimqttclient.cpp new file mode 100644 index 00000000..6a787dbb --- /dev/null +++ b/libs/mqtt_client/pimqttclient.cpp @@ -0,0 +1,259 @@ +/* + PIP - Platform Independent Primitives + MQTT Client + Ivan Pelipenko peri4ko@yandex.ru + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "pimqttclient.h" + +#include "MQTTClient.h" + + +STATIC_INITIALIZER_BEGIN + MQTTClient_init_options opts = MQTTClient_init_options_initializer; + MQTTClient_global_init(&opts); +STATIC_INITIALIZER_END + + +PRIVATE_DEFINITION_START(PIMQTT::Client) + MQTTClient client = nullptr; + bool connected = false; + + static void connectionLost_callback(void * context, char *) { + ((PIMQTT::Client *)context)->mqtt_connectionLost(); + } + + static void deliveryComplete_callback(void * context, MQTTClient_deliveryToken dt) { + ((PIMQTT::Client *)context)->mqtt_deliveryComplete(dt); + } + + static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) { + PIMQTT::Message msg; + msg.topic = PIString::fromUTF8(topicName); + msg.payload = PIByteArray(message->payload, message->payloadlen); + msg.msg_id = message->msgid; + msg.is_duplicate = message->dup != 0; + msg.qos = static_cast(message->qos); + MQTTClient_freeMessage(&message); + MQTTClient_free(topicName); + ((PIMQTT::Client *)context)->mqtt_messageArrived(msg); + return 1; + } + +PRIVATE_DEFINITION_END(PIMQTT::Client) + + +PIMQTT::Client::Client() { + thread = new PIThread([this] { run(); }); + thread->start(); + // worker = new PIThreadPoolWorker(1); + // worker->start(); +} + + +PIMQTT::Client::~Client() { + // worker->stopAndWait(); + // piDeleteSafety(worker); + thread->stop(); + disconnect(); + thread->waitForFinish(); + piDeleteSafety(thread); +} + + +void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) { + auto ci = connect_info.getRef(); + ci->address = address; + ci->clientID = client; + ci->username = username; + ci->password = password; + changeStatus(Connecting); +} + + +void PIMQTT::Client::disconnect() { + changeStatus(Disconnecting); +} + + +void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) { + sub_queue.getRef()->enqueue({topic, qos}); + notifier.notify(); +} + + +void PIMQTT::Client::unsubscribe(const PIString & topic) { + unsub_queue.getRef()->enqueue(topic); + notifier.notify(); +} + + +void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) { + Message m; + m.topic = topic; + m.payload = msg; + m.qos = qos; + pub_queue.getRef()->enqueue(m); + notifier.notify(); +} + + +void PIMQTT::Client::mqtt_connectionLost() { + piCoutObj << "mqtt_connectionLost"; + PRIVATE->connected = false; + changeStatus(Idle); + disconnected(Error::ServerUnavailable); +} + + +void PIMQTT::Client::mqtt_deliveryComplete(int token) {} + + +void PIMQTT::Client::mqtt_messageArrived(const Message & msg) { + piCoutObj << "mqtt_messageArrived"; + received(msg); +} + + +void PIMQTT::Client::connectInternal() { + destroy(); + PRIVATE->connected = false; + auto ci = connect_info.get(); + PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address); + if (net_addr.port() == 0) net_addr.setPort(1883); + PIString server_uri = "tcp://" + net_addr.toString(); + MQTTClient_create(&PRIVATE->client, server_uri.dataAscii(), ci.clientID.dataUTF8(), MQTTCLIENT_PERSISTENCE_NONE, nullptr); + MQTTClient_setCallbacks(PRIVATE->client, + this, + PRIVATE->connectionLost_callback, + PRIVATE->messageArrived_callback, + PRIVATE->deliveryComplete_callback); + MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; + conn_opts.keepAliveInterval = 9; + conn_opts.cleansession = 1; + conn_opts.connectTimeout = piRound(connect_timeout.toSeconds()); + if (ci.username.isNotEmpty()) conn_opts.username = ci.username.dataAscii(); + if (ci.password.isNotEmpty()) conn_opts.password = ci.password.dataAscii(); + int ret = MQTTClient_connect(PRIVATE->client, &conn_opts); + if (ret != MQTTCLIENT_SUCCESS) { + piCoutObj << "Failed to connect to " + server_uri << ", code" << ret; + changeStatus(Idle); + if (ret < 0) ret = static_cast(Error::ServerUnavailable); + disconnected(static_cast(ret)); + return; + } + PRIVATE->connected = true; + changeStatus(Connected); + connected(); +} + + +void PIMQTT::Client::publishInternal(const Message & m) { + if (!PRIVATE->client) return; + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = const_cast(static_cast(m.payload.data())); + pubmsg.payloadlen = m.payload.size_s(); + pubmsg.qos = static_cast(m.qos); + int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic.dataAscii(), &pubmsg, nullptr); + if (ret != MQTTCLIENT_SUCCESS) { + piCoutObj << "Failed to publishMessage, code" << ret; + } +} + + +void PIMQTT::Client::subscribeInternal(const Subscribe & sub) { + if (!PRIVATE->client) return; + int ret = MQTTClient_subscribe(PRIVATE->client, sub.topic.dataUTF8(), static_cast(sub.qos)); + if (ret != MQTTCLIENT_SUCCESS) { + piCoutObj << "Failed to subscribe" << sub.topic << ", code" << ret; + } +} + + +void PIMQTT::Client::unsubscribeInternal(const PIString & topic) { + if (!PRIVATE->client) return; + int ret = MQTTClient_unsubscribe(PRIVATE->client, topic.dataUTF8()); + if (ret != MQTTCLIENT_SUCCESS) { + piCoutObj << "Failed to unsubscribe" << topic << ", code" << ret; + } +} + + +void PIMQTT::Client::destroy() { + if (!PRIVATE->client) return; + if (PRIVATE->connected) MQTTClient_disconnect(PRIVATE->client, 1000); + MQTTClient_destroy(&PRIVATE->client); + PRIVATE->connected = false; + PRIVATE->client = nullptr; +} + + +void PIMQTT::Client::changeStatus(Status s) { + m_status = s; + notifier.notify(); +} + + +void PIMQTT::Client::run() { + notifier.wait(); + piCoutObj << "run" << (int)m_status; + if (thread->isStopping() || m_status == Disconnecting) { + bool was_connected = PRIVATE->connected; + changeStatus(Idle); + destroy(); + if (was_connected) disconnected(Error::Unknown); + PRIVATE->connected = false; + return; + } + if (m_status == Connecting) { + connectInternal(); + return; + } + if (m_status == Connected) { + Subscribe sub; + { + auto ref = sub_queue.getRef(); + if (!ref->isEmpty()) sub = ref->dequeue(); + } + if (sub.topic.isNotEmpty()) { + subscribeInternal(sub); + notifier.notify(); + return; + } + + PIString topic; + { + auto ref = unsub_queue.getRef(); + if (!ref->isEmpty()) topic = ref->dequeue(); + } + if (topic.isNotEmpty()) { + unsubscribeInternal(topic); + notifier.notify(); + return; + } + + Message msg; + { + auto ref = pub_queue.getRef(); + if (!ref->isEmpty()) msg = ref->dequeue(); + } + if (msg.isValid()) { + publishInternal(msg); + notifier.notify(); + return; + } + } +}