From afb4ae8126397e51dd38a99dcd4438663e953bed Mon Sep 17 00:00:00 2001 From: peri4 Date: Fri, 29 May 2026 09:56:29 +0300 Subject: [PATCH] MQTT ready to use change subscription logic - now keep subscriptions independently from connecting state. No unregisters on disconnect, but resubscriptions on connect. So one-time subscription on app start and just connect() on lost connection - all subscriptions keeps --- libs/main/mqtt_client/pimqttclient.h | 3 +- libs/mqtt_client/pimqttclient.cpp | 89 ++++++++++++++++++---------- main.cpp | 23 ++++--- 3 files changed, 70 insertions(+), 45 deletions(-) diff --git a/libs/main/mqtt_client/pimqttclient.h b/libs/main/mqtt_client/pimqttclient.h index 1a1ae5d2..6d32bf27 100644 --- a/libs/main/mqtt_client/pimqttclient.h +++ b/libs/main/mqtt_client/pimqttclient.h @@ -50,7 +50,6 @@ public: void connect(const PIString & address, const PIString & client, const PIString & username = {}, const PIString & password = {}); void disconnect(); - void subscribe(const PIString & topic, MessageFunction functor, QoS qos = QoS::Level1); template @@ -68,6 +67,8 @@ public: bool isConnecting() const { return m_status == Connecting; } bool isConnected() const { return m_status == Connected; } + PIStringList usedTopics() const; + EVENT0(connected); EVENT1(disconnected, PIMQTT::Error, code); EVENT1(receivedUnhandled, PIMQTT::MessageConst, message); diff --git a/libs/mqtt_client/pimqttclient.cpp b/libs/mqtt_client/pimqttclient.cpp index ee28ab69..60ffb958 100644 --- a/libs/mqtt_client/pimqttclient.cpp +++ b/libs/mqtt_client/pimqttclient.cpp @@ -28,9 +28,14 @@ struct PIMQTT::Client::Endpoint: public PIHTTP::ServerEndpoint { PIMQTT::Client::MessageFunction function; }; +struct TopicUsage { + int counter = 0; + PIMQTT::QoS qos = PIMQTT::QoS::Level1; +}; + struct EndpointsStorage { PIMap>> prepared; // [priority][topic] -> endpoints - PIMap topics_binded; // [topic] -> count + PIMap topics_binded; // [topic] -> TopicUsage }; @@ -117,13 +122,21 @@ void PIMQTT::Client::disconnect() { void PIMQTT::Client::subscribe(const PIString & topic, MessageFunction functor, QoS qos) { if (is_destoying) return; - worker->enqueueTask([this, topic, functor, qos] { subscribeInternal({topic, functor, qos}); }); + Subscribe sub{topic, functor, qos}; + // piCout << "subscribe" << topic; + PIString mqtt_topic = registerSubscribe(sub); + if (mqtt_topic.isEmpty()) return; + sub.topic = mqtt_topic; + worker->enqueueTask([this, sub] { subscribeInternal(sub); }); } void PIMQTT::Client::unsubscribe(const PIString & topic) { if (is_destoying) return; - worker->enqueueTask([this, topic] { unsubscribeInternal(topic); }); + // piCout << "unsubscribe" << topic; + PIString mqtt_topic = unregisterSubscribe(topic); + if (mqtt_topic.isEmpty()) return; + worker->enqueueTask([this, mqtt_topic] { unsubscribeInternal(mqtt_topic); }); } @@ -142,15 +155,27 @@ void PIMQTT::Client::publish(const MessageConst & msg) { } +PIStringList PIMQTT::Client::usedTopics() const { + return PRIVATE->endpoints.getRef()->topics_binded.keys(); +} + + void PIMQTT::Client::unsubscribeAll() { - unsubscribe("#"); + { + auto ref = PRIVATE->endpoints.getRef(); + auto tit = ref->topics_binded.makeIterator(); + while (tit.next()) { + if (tit.value().counter <= 0) continue; + PIString mqtt_topic = tit.key(); + worker->enqueueTask([this, mqtt_topic] { unsubscribeInternal(mqtt_topic); }); + } + } unregisterAll(); } void PIMQTT::Client::mqtt_connectionLost() { - piCoutObj << "mqtt_connectionLost"; - unregisterAll(); + // piCoutObj << "mqtt_connectionLost"; PRIVATE->connected = false; changeStatus(Idle); disconnected(Error::ServerUnavailable); @@ -163,7 +188,7 @@ void PIMQTT::Client::mqtt_deliveryComplete(int token) {} void PIMQTT::Client::mqtt_messageArrived(MessageMutable & msg) { PIStringList in_path = msg.topicList(); PIMQTT::Client::MessageFunction function; - piCoutObj << "mqtt_messageArrived"; + // piCoutObj << "mqtt_messageArrived"; { bool found = false; auto ref = PRIVATE->endpoints.getRef(); @@ -203,7 +228,7 @@ PIString PIMQTT::Client::registerSubscribe(const Subscribe & sub) { piCoutObj << "Warning: subscribe to empty topic, ignore"; return {}; } - piCout << sub.topic << "->" << topic << ep.priority; + // piCout << sub.topic << "->" << topic << ep.priority; bool is_new_topic = false; auto ref = PRIVATE->endpoints.getRef(); auto & eps_by_topic(ref->prepared[ep.priority][topic]); @@ -214,10 +239,11 @@ PIString PIMQTT::Client::registerSubscribe(const Subscribe & sub) { } } eps_by_topic << ep; - auto & counter(ref->topics_binded[topic]); - is_new_topic = counter == 0; - ++counter; + auto & usage(ref->topics_binded[topic]); + is_new_topic = usage.counter == 0; + ++usage.counter; if (!is_new_topic) return {}; + usage.qos = sub.qos; return topic; } @@ -230,7 +256,7 @@ PIString PIMQTT::Client::unregisterSubscribe(const PIString & mqtt_topic) { piCoutObj << "Warning: unsubscribe from empty topic, ignore"; return {}; } - piCout << mqtt_topic << "->" << topic << ep.priority; + // piCout << mqtt_topic << "->" << topic << ep.priority; auto ref = PRIVATE->endpoints.getRef(); auto pit = ref->prepared.makeIterator(); while (pit.next()) { // by priority @@ -242,12 +268,15 @@ PIString PIMQTT::Client::unregisterSubscribe(const PIString & mqtt_topic) { for (int i = 0; i < eps.size_s(); ++i) { if (eps[i].path == ep.path) { eps.remove(i); - auto & counter(ref->topics_binded[tit.key()]); - --counter; + auto & usage(ref->topics_binded[tit.key()]); + --usage.counter; PIString ret; - if (counter <= 0) ret = tit.key(); + if (usage.counter <= 0) { + ret = tit.key(); + ref->topics_binded.remove(tit.key()); + } if (eps.isEmpty()) { - piCout << "remove topics" << tit.key(); + // piCout << "remove topics" << tit.key(); pit.value().remove(tit.key()); } return ret; @@ -295,6 +324,12 @@ void PIMQTT::Client::connectInternal(const ConnectInfo & ci) { return; } PRIVATE->connected = true; + PIMap topics_binded; + { topics_binded = PRIVATE->endpoints.getRef()->topics_binded; } + auto it = topics_binded.makeIterator(); + while (it.next()) { + if (it.value().counter > 0) subscribeInternal({it.key(), nullptr, it.value().qos}); + } changeStatus(Connected); connected(); } @@ -324,15 +359,10 @@ void PIMQTT::Client::publishInternal(const MessageConst & m) { void PIMQTT::Client::subscribeInternal(const Subscribe & sub) { if (!PRIVATE->client) return; - piCout << ""; - piCout << "subscribeInternal" << sub.topic; - PIString topic = registerSubscribe(sub); - if (topic.isEmpty()) return; - piCout << "NEW" << topic; // << PRIVATE->endpoints.getRef()->size(); - int ret = MQTTClient_subscribe(PRIVATE->client, topic.dataUTF8(), static_cast(sub.qos)); + // piCout << "subscribeInternal" << sub.topic; + int ret = MQTTClient_subscribe(PRIVATE->client, sub.topic.dataUTF8(), static_cast(sub.qos)); if (ret != MQTTCLIENT_SUCCESS) { - piCoutObj << "Failed to subscribe" << topic << ", code" << ret; - + piCoutObj << "Failed to subscribe" << sub.topic << ", code" << ret; return; } } @@ -340,20 +370,15 @@ void PIMQTT::Client::subscribeInternal(const Subscribe & sub) { void PIMQTT::Client::unsubscribeInternal(const PIString & mqtt_topic) { if (!PRIVATE->client) return; - piCout << ""; - piCout << "unsubscribeInternal" << mqtt_topic; - PIString topic = unregisterSubscribe(mqtt_topic); - if (topic.isEmpty()) return; - piCout << "DEL" << topic; // << PRIVATE->endpoints.getRef()->size(); - int ret = MQTTClient_unsubscribe(PRIVATE->client, topic.dataUTF8()); + // piCout << "unsubscribeInternal" << mqtt_topic; + int ret = MQTTClient_unsubscribe(PRIVATE->client, mqtt_topic.dataUTF8()); if (ret != MQTTCLIENT_SUCCESS) { - piCoutObj << "Failed to unsubscribe" << topic << ", code" << ret; + piCoutObj << "Failed to unsubscribe" << mqtt_topic << ", code" << ret; } } void PIMQTT::Client::destroy() { - unregisterAll(); if (!PRIVATE->client) return; if (PRIVATE->connected) MQTTClient_disconnect(PRIVATE->client, 1000); MQTTClient_destroy(&PRIVATE->client); diff --git a/main.cpp b/main.cpp index 95ac62cd..6550c43a 100644 --- a/main.cpp +++ b/main.cpp @@ -64,6 +64,14 @@ int main(int argc, char * argv[]) { PIMQTT::Client cl; cl.setConnectTimeout(2_s); + cl.subscribe("api/v1/all/bort{A}/f", + [](const PIMQTT::MessageConst & msg) { piCout << "1" << msg.topicList() << msg.pathArguments() << msg.body().size(); }); + cl.subscribe("api/v1/all/task{T}/f", + [](const PIMQTT::MessageConst & msg) { piCout << "2" << msg.topicList() << msg.pathArguments() << msg.body().size(); }); + cl.subscribe("api/v1/all/*/f", + [](const PIMQTT::MessageConst & msg) { piCout << "3" << msg.topicList() << msg.pathArguments() << msg.body().size(); }); + cl.subscribe("api/v1/all2/**", + [](const PIMQTT::MessageConst & msg) { piCout << "4" << msg.topicList() << msg.pathArguments() << msg.body().size(); }); CONNECTL(&cl, connected, [&cl] { piCout << "connected"; // cl.subscribe("api/v1/plugins"); @@ -71,18 +79,6 @@ int main(int argc, char * argv[]) { // cl.subscribe("api/v1/*/{taskID}/status"); // cl.subscribe("api/v1/bort/list"); // cl.subscribe("api/v1/all"); - cl.subscribe("api/v1/all/bort{A}/f", [](const PIMQTT::MessageConst & msg) { - piCout << "1" << msg.topicList() << msg.pathArguments() << msg.body().size(); - }); - cl.subscribe("api/v1/all/task{T}/f", [](const PIMQTT::MessageConst & msg) { - piCout << "2" << msg.topicList() << msg.pathArguments() << msg.body().size(); - }); - cl.subscribe("api/v1/all/*/f", [](const PIMQTT::MessageConst & msg) { - piCout << "3" << msg.topicList() << msg.pathArguments() << msg.body().size(); - }); - cl.subscribe("api/v1/all2/**", [](const PIMQTT::MessageConst & msg) { - piCout << "4" << msg.topicList() << msg.pathArguments() << msg.body().size(); - }); // cl.subscribe("/zigbee2mqtt"); // cl.subscribe("/zigbee2mqtt/+/status/"); // cl.subscribe("/zigbee2mqtt/*/status/"); @@ -103,6 +99,9 @@ int main(int argc, char * argv[]) { cl.connect("localhost", "PIP"); + piSleep(6.); + cl.unsubscribeAll(); + kbd.enableExitCapture('Q'); WAIT_FOR_EXIT