From 38d09e272cafc6606dc86aff1fa70c3c51a4f80e Mon Sep 17 00:00:00 2001 From: peri4 Date: Thu, 28 May 2026 20:26:49 +0300 Subject: [PATCH] MQTT seems to work 1. subscribe now similar to HTTP server, with lambda 2. subscribe topic syntax support all HTTP features as path arguments and wildcards 3. event received() changed to receivedUnhandled() for unhandled messages (should never be called in proper work) 4. internal logic got more complicated, several endpoints may be serviced by single MQTT topic, so nested Map used --- libs/main/mqtt_client/pimqttclient.h | 35 +++++- libs/mqtt_client/pimqttclient.cpp | 171 +++++++++++++++++++++++++-- main.cpp | 111 ++++++++++------- 3 files changed, 261 insertions(+), 56 deletions(-) diff --git a/libs/main/mqtt_client/pimqttclient.h b/libs/main/mqtt_client/pimqttclient.h index 1e12c47d..1a1ae5d2 100644 --- a/libs/main/mqtt_client/pimqttclient.h +++ b/libs/main/mqtt_client/pimqttclient.h @@ -41,24 +41,38 @@ public: Client(); virtual ~Client(); + //! \~english Request handler used by registered routes and fallback processing. + //! \~russian Обработчик запроса, используемый зарегистрированными маршрутами и fallback-обработкой. + using MessageFunction = std::function; + void setConnectTimeout(PISystemTime time) { connect_timeout = time; } void connect(const PIString & address, const PIString & client, const PIString & username = {}, const PIString & password = {}); void disconnect(); - void subscribe(const PIString & topic, QoS qos = QoS::Level1); + + void subscribe(const PIString & topic, MessageFunction functor, QoS qos = QoS::Level1); + + template + void + subscribe(const PIString & topic, T * o, PIMQTT::MessageMutable (T::*function)(const PIMQTT::MessageConst &), QoS qos = QoS::Level1) { + subscribe(topic, [o, function](const PIMQTT::MessageConst & m) { return (o->*function)(m); }, qos); + } + void unsubscribe(const PIString & topic); + void unsubscribeAll(); void publish(const PIString & topic, const PIByteArray & msg, QoS qos = QoS::Level0); void publish(const MessageConst & msg); - void unsubscribeAll() { unsubscribe("#"); } bool isConnecting() const { return m_status == Connecting; } bool isConnected() const { return m_status == Connected; } EVENT0(connected); EVENT1(disconnected, PIMQTT::Error, code); - EVENT1(received, PIMQTT::MessageConst, message); + EVENT1(receivedUnhandled, PIMQTT::MessageConst, message); + + struct Endpoint; private: NO_COPY_CLASS(Client) @@ -79,23 +93,34 @@ private: }; struct Subscribe { PIString topic; + MessageFunction functor; QoS qos; }; void mqtt_connectionLost(); void mqtt_deliveryComplete(int token); - void mqtt_messageArrived(const MessageConst & msg); + void mqtt_messageArrived(MessageMutable & msg); + + PIString registerSubscribe(const Subscribe & sub); + PIString unregisterSubscribe(const PIString & mqtt_topic); + void unregisterAll(); void connectInternal(const ConnectInfo & ci); void disconnectInternal(); void publishInternal(const MessageConst & m); void subscribeInternal(const Subscribe & sub); - void unsubscribeInternal(const PIString & topic); + void unsubscribeInternal(const PIString & mqtt_topic); void destroy(); void changeStatus(Status s); void run(); + // from HTTP format + static PIString convertTopic2MQTT(const PIString & topic); + + // from MQTT format + static PIString convertTopic2HTTP(const PIString & topic); + std::atomic_int m_status = {Idle}; std::atomic_bool is_destoying = {false}; PISystemTime connect_timeout = 10_s; diff --git a/libs/mqtt_client/pimqttclient.cpp b/libs/mqtt_client/pimqttclient.cpp index 6518d7b2..ee28ab69 100644 --- a/libs/mqtt_client/pimqttclient.cpp +++ b/libs/mqtt_client/pimqttclient.cpp @@ -20,6 +20,31 @@ #include "pimqttclient.h" #include "MQTTClient.h" +#include "piliterals_string.h" +#include "piserverendpoint_p.h" + + +struct PIMQTT::Client::Endpoint: public PIHTTP::ServerEndpoint { + PIMQTT::Client::MessageFunction function; +}; + +struct EndpointsStorage { + PIMap>> prepared; // [priority][topic] -> endpoints + PIMap topics_binded; // [topic] -> count +}; + + +PIString topicFromEndpoint(const PIMQTT::Client::Endpoint & e) { + PIStringList ret; + for (const auto & i: e.prepared_path) { + switch (i.type) { + case PIHTTP::ServerEndpoint::PathElement::Type::Fixed: ret << i.source; break; + case PIHTTP::ServerEndpoint::PathElement::Type::AnyMany: ret << "#"_a; break; + default: ret << "+"_a; break; + } + } + return ret.join('/'); +} STATIC_INITIALIZER_BEGIN @@ -32,6 +57,8 @@ PRIVATE_DEFINITION_START(PIMQTT::Client) MQTTClient client = nullptr; bool connected = false; + PIProtectedVariable endpoints; + static void connectionLost_callback(void * context, char *) { ((PIMQTT::Client *)context)->mqtt_connectionLost(); } @@ -88,9 +115,9 @@ void PIMQTT::Client::disconnect() { } -void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) { +void PIMQTT::Client::subscribe(const PIString & topic, MessageFunction functor, QoS qos) { if (is_destoying) return; - worker->enqueueTask([this, topic, qos] { subscribeInternal({topic, qos}); }); + worker->enqueueTask([this, topic, functor, qos] { subscribeInternal({topic, functor, qos}); }); } @@ -115,8 +142,15 @@ void PIMQTT::Client::publish(const MessageConst & msg) { } +void PIMQTT::Client::unsubscribeAll() { + unsubscribe("#"); + unregisterAll(); +} + + void PIMQTT::Client::mqtt_connectionLost() { piCoutObj << "mqtt_connectionLost"; + unregisterAll(); PRIVATE->connected = false; changeStatus(Idle); disconnected(Error::ServerUnavailable); @@ -126,9 +160,110 @@ void PIMQTT::Client::mqtt_connectionLost() { void PIMQTT::Client::mqtt_deliveryComplete(int token) {} -void PIMQTT::Client::mqtt_messageArrived(const MessageConst & msg) { +void PIMQTT::Client::mqtt_messageArrived(MessageMutable & msg) { + PIStringList in_path = msg.topicList(); + PIMQTT::Client::MessageFunction function; piCoutObj << "mqtt_messageArrived"; - received(msg); + { + bool found = false; + auto ref = PRIVATE->endpoints.getRef(); + auto pit = ref->prepared.makeReverseIterator(); + while (pit.next()) { // by priority + + auto tit = pit.value().makeIterator(); + while (tit.next()) { // by MQTT topic + + for (const auto & ep: tit.value()) { + PIMap ext_args; + if (ep.match(in_path, ext_args)) { + msg.pathArguments() = ext_args; + function = ep.function; + found = true; + break; + } + } + if (found) break; + } + if (found) break; + } + } + if (function) + function(msg); + else + receivedUnhandled(msg); +} + + +PIString PIMQTT::Client::registerSubscribe(const Subscribe & sub) { + Endpoint ep; + ep.create(convertTopic2HTTP(sub.topic)); + ep.function = sub.functor; + PIString topic = topicFromEndpoint(ep); + if (topic.isEmpty()) { + piCoutObj << "Warning: subscribe to empty topic, ignore"; + return {}; + } + piCout << sub.topic << "->" << topic << ep.priority; + bool is_new_topic = false; + auto ref = PRIVATE->endpoints.getRef(); + auto & eps_by_topic(ref->prepared[ep.priority][topic]); + for (const auto & i: eps_by_topic) { + if (i.path == ep.path) { + piCoutObj << "Warning: subscribe duplicate path, ignore"; + return {}; + } + } + eps_by_topic << ep; + auto & counter(ref->topics_binded[topic]); + is_new_topic = counter == 0; + ++counter; + if (!is_new_topic) return {}; + return topic; +} + + +PIString PIMQTT::Client::unregisterSubscribe(const PIString & mqtt_topic) { + Endpoint ep; + ep.create(convertTopic2HTTP(mqtt_topic)); + PIString topic = topicFromEndpoint(ep); + if (topic.isEmpty()) { + piCoutObj << "Warning: unsubscribe from empty topic, ignore"; + return {}; + } + piCout << mqtt_topic << "->" << topic << ep.priority; + auto ref = PRIVATE->endpoints.getRef(); + auto pit = ref->prepared.makeIterator(); + while (pit.next()) { // by priority + + auto tit = pit.value().makeIterator(); + while (tit.next()) { // by MQTT topic + + auto & eps(tit.value()); + for (int i = 0; i < eps.size_s(); ++i) { + if (eps[i].path == ep.path) { + eps.remove(i); + auto & counter(ref->topics_binded[tit.key()]); + --counter; + PIString ret; + if (counter <= 0) ret = tit.key(); + if (eps.isEmpty()) { + piCout << "remove topics" << tit.key(); + pit.value().remove(tit.key()); + } + return ret; + } + } + } + } + piCoutObj << "Warning: unsubscribe from" << mqtt_topic << ", topic not found"; + return {}; +} + + +void PIMQTT::Client::unregisterAll() { + auto ref = PRIVATE->endpoints.getRef(); + ref->prepared.clear(); + ref->topics_binded.clear(); } @@ -189,15 +324,27 @@ void PIMQTT::Client::publishInternal(const MessageConst & m) { void PIMQTT::Client::subscribeInternal(const Subscribe & sub) { if (!PRIVATE->client) return; - int ret = MQTTClient_subscribe(PRIVATE->client, sub.topic.dataUTF8(), static_cast(sub.qos)); + piCout << ""; + piCout << "subscribeInternal" << sub.topic; + PIString topic = registerSubscribe(sub); + if (topic.isEmpty()) return; + piCout << "NEW" << topic; // << PRIVATE->endpoints.getRef()->size(); + int ret = MQTTClient_subscribe(PRIVATE->client, topic.dataUTF8(), static_cast(sub.qos)); if (ret != MQTTCLIENT_SUCCESS) { - piCoutObj << "Failed to subscribe" << sub.topic << ", code" << ret; + piCoutObj << "Failed to subscribe" << topic << ", code" << ret; + + return; } } -void PIMQTT::Client::unsubscribeInternal(const PIString & topic) { +void PIMQTT::Client::unsubscribeInternal(const PIString & mqtt_topic) { if (!PRIVATE->client) return; + piCout << ""; + piCout << "unsubscribeInternal" << mqtt_topic; + PIString topic = unregisterSubscribe(mqtt_topic); + if (topic.isEmpty()) return; + piCout << "DEL" << topic; // << PRIVATE->endpoints.getRef()->size(); int ret = MQTTClient_unsubscribe(PRIVATE->client, topic.dataUTF8()); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to unsubscribe" << topic << ", code" << ret; @@ -206,6 +353,7 @@ void PIMQTT::Client::unsubscribeInternal(const PIString & topic) { void PIMQTT::Client::destroy() { + unregisterAll(); if (!PRIVATE->client) return; if (PRIVATE->connected) MQTTClient_disconnect(PRIVATE->client, 1000); MQTTClient_destroy(&PRIVATE->client); @@ -217,3 +365,12 @@ void PIMQTT::Client::destroy() { void PIMQTT::Client::changeStatus(Status s) { m_status = s; } + +PIString PIMQTT::Client::convertTopic2MQTT(const PIString & topic) { + return topic.replacedAll("**", '#').replacedAll('*', '+'); +} + + +PIString PIMQTT::Client::convertTopic2HTTP(const PIString & topic) { + return topic.replacedAll('#', "**").replacedAll('+', '*'); +} diff --git a/main.cpp b/main.cpp index f92b7159..95ac62cd 100644 --- a/main.cpp +++ b/main.cpp @@ -19,63 +19,86 @@ MessageMutable createMessage(Code c, const char * path, const MessageConst & msg return MessageMutable().setCode(c); }; int main(int argc, char * argv[]) { - piCout << "start ..."; - PIHTTPServer server; - server.registerUnhandled([](const MessageConst & msg) { return createMessage(Code::BadRequest, "unhadled", msg); }); - server.registerPath("api/v1/status", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/status", msg); - }); - server.registerPath("api/v1/plugins", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/plugins", msg); - }); - server.registerPath("api/v1/task-status", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/task-status", msg); - }); - server.registerPath("api/v1/task/{taskID}/status", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/task/{taskID}/status", msg); - }); - server.registerPath("api/v1/bort/list", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/bort/list", msg); - }); - server.registerPath("api/v1/all", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/all", msg); - }); - server.registerPath("api/v1/all/bort{A}/f", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/all/*/f", msg); - }); - server.registerPath("api/v1/all2/**", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/all2/**", msg); - }); - server.listenAll(12345); + // piCout << "start ..."; + // PIHTTPServer server; + // server.registerUnhandled([](const MessageConst & msg) { return createMessage(Code::BadRequest, "unhadled", msg); }); + // server.registerPath("api/v1/status", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/status", msg); + // }); + // server.registerPath("api/v1/plugins", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/plugins", msg); + // }); + // server.registerPath("api/v1/task-status", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/task-status", msg); + // }); + // server.registerPath("api/v1/task/{taskID}/status", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/task/{taskID}/status", msg); + // }); + // server.registerPath("api/v1/bort/list", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/bort/list", msg); + // }); + // server.registerPath("api/v1/all", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/all", msg); + // }); + // server.registerPath("api/v1/all/bort{A}/f", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/all/*/f", msg); + // }); + // server.registerPath("api/v1/all2/**", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/all2/**", msg); + // }); + // server.listenAll(12345); - kbd.enableExitCapture('Q'); - WAIT_FOR_EXIT - piCout << "exiting ..."; - server.stop(); + // kbd.enableExitCapture('Q'); + // WAIT_FOR_EXIT + // piCout << "exiting ..."; + // server.stop(); - return 0; + // return 0; - PISystemMonitor mon; - mon.startOnSelf(); - PISystemMonitor::totalRAM(); - 2_s .sleep(); + // PISystemMonitor mon; + // mon.startOnSelf(); + // PISystemMonitor::totalRAM(); + // 2_s .sleep(); - return 0; + // return 0; PIMQTT::Client cl; cl.setConnectTimeout(2_s); + CONNECTL(&cl, connected, [&cl] { piCout << "connected"; - cl.subscribe("/zigbee2mqtt"); - cl.subscribe("/zigbee2mqtt/+"); - cl.unsubscribe("/zigbee2mqtt"); - cl.publish("/zigbee2mqtt/abc", "hello from PIP"_a.toAscii()); + // cl.subscribe("api/v1/plugins"); + // cl.subscribe("api/v1/task-status"); + // cl.subscribe("api/v1/*/{taskID}/status"); + // cl.subscribe("api/v1/bort/list"); + // cl.subscribe("api/v1/all"); + cl.subscribe("api/v1/all/bort{A}/f", [](const PIMQTT::MessageConst & msg) { + piCout << "1" << msg.topicList() << msg.pathArguments() << msg.body().size(); + }); + cl.subscribe("api/v1/all/task{T}/f", [](const PIMQTT::MessageConst & msg) { + piCout << "2" << msg.topicList() << msg.pathArguments() << msg.body().size(); + }); + cl.subscribe("api/v1/all/*/f", [](const PIMQTT::MessageConst & msg) { + piCout << "3" << msg.topicList() << msg.pathArguments() << msg.body().size(); + }); + cl.subscribe("api/v1/all2/**", [](const PIMQTT::MessageConst & msg) { + piCout << "4" << msg.topicList() << msg.pathArguments() << msg.body().size(); + }); + // cl.subscribe("/zigbee2mqtt"); + // cl.subscribe("/zigbee2mqtt/+/status/"); + // cl.subscribe("/zigbee2mqtt/*/status/"); + // cl.subscribe("test/#"); + // cl.subscribe("#"); + // cl.unsubscribe("/zigbee2mqtt"); + // cl.unsubscribe("api/v1/all/bort{A}/f"); + // cl.unsubscribe("api/v1/all/*/f"); + // cl.publish("/zigbee2mqtt/abc", "hello from PIP"_a.toAscii()); }); CONNECTL(&cl, disconnected, [&cl](PIMQTT::Error code) { piCout << "disconnected code" << (int)code; cl.connect("localhost", "PIP"); }); - CONNECTL(&cl, received, [](const PIMQTT::MessageConst & message) { - piCout << "received" << message.topic() << message.pathArguments() << message.payload().size(); + CONNECTL(&cl, receivedUnhandled, [](const PIMQTT::MessageConst & message) { + piCout << "receivedUnhandled" << message.topic() << message.pathArguments() << message.payload().size(); }); cl.connect("localhost", "PIP");