diff --git a/libs/main/mqtt_client/pimqttclient.h b/libs/main/mqtt_client/pimqttclient.h index 1e12c47d..1a1ae5d2 100644 --- a/libs/main/mqtt_client/pimqttclient.h +++ b/libs/main/mqtt_client/pimqttclient.h @@ -41,24 +41,38 @@ public: Client(); virtual ~Client(); + //! \~english Request handler used by registered routes and fallback processing. + //! \~russian Обработчик запроса, используемый зарегистрированными маршрутами и fallback-обработкой. + using MessageFunction = std::function; + void setConnectTimeout(PISystemTime time) { connect_timeout = time; } void connect(const PIString & address, const PIString & client, const PIString & username = {}, const PIString & password = {}); void disconnect(); - void subscribe(const PIString & topic, QoS qos = QoS::Level1); + + void subscribe(const PIString & topic, MessageFunction functor, QoS qos = QoS::Level1); + + template + void + subscribe(const PIString & topic, T * o, PIMQTT::MessageMutable (T::*function)(const PIMQTT::MessageConst &), QoS qos = QoS::Level1) { + subscribe(topic, [o, function](const PIMQTT::MessageConst & m) { return (o->*function)(m); }, qos); + } + void unsubscribe(const PIString & topic); + void unsubscribeAll(); void publish(const PIString & topic, const PIByteArray & msg, QoS qos = QoS::Level0); void publish(const MessageConst & msg); - void unsubscribeAll() { unsubscribe("#"); } bool isConnecting() const { return m_status == Connecting; } bool isConnected() const { return m_status == Connected; } EVENT0(connected); EVENT1(disconnected, PIMQTT::Error, code); - EVENT1(received, PIMQTT::MessageConst, message); + EVENT1(receivedUnhandled, PIMQTT::MessageConst, message); + + struct Endpoint; private: NO_COPY_CLASS(Client) @@ -79,23 +93,34 @@ private: }; struct Subscribe { PIString topic; + MessageFunction functor; QoS qos; }; void mqtt_connectionLost(); void mqtt_deliveryComplete(int token); - void mqtt_messageArrived(const MessageConst & msg); + void mqtt_messageArrived(MessageMutable & msg); + + PIString registerSubscribe(const Subscribe & sub); + PIString unregisterSubscribe(const PIString & mqtt_topic); + void unregisterAll(); void connectInternal(const ConnectInfo & ci); void disconnectInternal(); void publishInternal(const MessageConst & m); void subscribeInternal(const Subscribe & sub); - void unsubscribeInternal(const PIString & topic); + void unsubscribeInternal(const PIString & mqtt_topic); void destroy(); void changeStatus(Status s); void run(); + // from HTTP format + static PIString convertTopic2MQTT(const PIString & topic); + + // from MQTT format + static PIString convertTopic2HTTP(const PIString & topic); + std::atomic_int m_status = {Idle}; std::atomic_bool is_destoying = {false}; PISystemTime connect_timeout = 10_s; diff --git a/libs/mqtt_client/pimqttclient.cpp b/libs/mqtt_client/pimqttclient.cpp index 6518d7b2..ee28ab69 100644 --- a/libs/mqtt_client/pimqttclient.cpp +++ b/libs/mqtt_client/pimqttclient.cpp @@ -20,6 +20,31 @@ #include "pimqttclient.h" #include "MQTTClient.h" +#include "piliterals_string.h" +#include "piserverendpoint_p.h" + + +struct PIMQTT::Client::Endpoint: public PIHTTP::ServerEndpoint { + PIMQTT::Client::MessageFunction function; +}; + +struct EndpointsStorage { + PIMap>> prepared; // [priority][topic] -> endpoints + PIMap topics_binded; // [topic] -> count +}; + + +PIString topicFromEndpoint(const PIMQTT::Client::Endpoint & e) { + PIStringList ret; + for (const auto & i: e.prepared_path) { + switch (i.type) { + case PIHTTP::ServerEndpoint::PathElement::Type::Fixed: ret << i.source; break; + case PIHTTP::ServerEndpoint::PathElement::Type::AnyMany: ret << "#"_a; break; + default: ret << "+"_a; break; + } + } + return ret.join('/'); +} STATIC_INITIALIZER_BEGIN @@ -32,6 +57,8 @@ PRIVATE_DEFINITION_START(PIMQTT::Client) MQTTClient client = nullptr; bool connected = false; + PIProtectedVariable endpoints; + static void connectionLost_callback(void * context, char *) { ((PIMQTT::Client *)context)->mqtt_connectionLost(); } @@ -88,9 +115,9 @@ void PIMQTT::Client::disconnect() { } -void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) { +void PIMQTT::Client::subscribe(const PIString & topic, MessageFunction functor, QoS qos) { if (is_destoying) return; - worker->enqueueTask([this, topic, qos] { subscribeInternal({topic, qos}); }); + worker->enqueueTask([this, topic, functor, qos] { subscribeInternal({topic, functor, qos}); }); } @@ -115,8 +142,15 @@ void PIMQTT::Client::publish(const MessageConst & msg) { } +void PIMQTT::Client::unsubscribeAll() { + unsubscribe("#"); + unregisterAll(); +} + + void PIMQTT::Client::mqtt_connectionLost() { piCoutObj << "mqtt_connectionLost"; + unregisterAll(); PRIVATE->connected = false; changeStatus(Idle); disconnected(Error::ServerUnavailable); @@ -126,9 +160,110 @@ void PIMQTT::Client::mqtt_connectionLost() { void PIMQTT::Client::mqtt_deliveryComplete(int token) {} -void PIMQTT::Client::mqtt_messageArrived(const MessageConst & msg) { +void PIMQTT::Client::mqtt_messageArrived(MessageMutable & msg) { + PIStringList in_path = msg.topicList(); + PIMQTT::Client::MessageFunction function; piCoutObj << "mqtt_messageArrived"; - received(msg); + { + bool found = false; + auto ref = PRIVATE->endpoints.getRef(); + auto pit = ref->prepared.makeReverseIterator(); + while (pit.next()) { // by priority + + auto tit = pit.value().makeIterator(); + while (tit.next()) { // by MQTT topic + + for (const auto & ep: tit.value()) { + PIMap ext_args; + if (ep.match(in_path, ext_args)) { + msg.pathArguments() = ext_args; + function = ep.function; + found = true; + break; + } + } + if (found) break; + } + if (found) break; + } + } + if (function) + function(msg); + else + receivedUnhandled(msg); +} + + +PIString PIMQTT::Client::registerSubscribe(const Subscribe & sub) { + Endpoint ep; + ep.create(convertTopic2HTTP(sub.topic)); + ep.function = sub.functor; + PIString topic = topicFromEndpoint(ep); + if (topic.isEmpty()) { + piCoutObj << "Warning: subscribe to empty topic, ignore"; + return {}; + } + piCout << sub.topic << "->" << topic << ep.priority; + bool is_new_topic = false; + auto ref = PRIVATE->endpoints.getRef(); + auto & eps_by_topic(ref->prepared[ep.priority][topic]); + for (const auto & i: eps_by_topic) { + if (i.path == ep.path) { + piCoutObj << "Warning: subscribe duplicate path, ignore"; + return {}; + } + } + eps_by_topic << ep; + auto & counter(ref->topics_binded[topic]); + is_new_topic = counter == 0; + ++counter; + if (!is_new_topic) return {}; + return topic; +} + + +PIString PIMQTT::Client::unregisterSubscribe(const PIString & mqtt_topic) { + Endpoint ep; + ep.create(convertTopic2HTTP(mqtt_topic)); + PIString topic = topicFromEndpoint(ep); + if (topic.isEmpty()) { + piCoutObj << "Warning: unsubscribe from empty topic, ignore"; + return {}; + } + piCout << mqtt_topic << "->" << topic << ep.priority; + auto ref = PRIVATE->endpoints.getRef(); + auto pit = ref->prepared.makeIterator(); + while (pit.next()) { // by priority + + auto tit = pit.value().makeIterator(); + while (tit.next()) { // by MQTT topic + + auto & eps(tit.value()); + for (int i = 0; i < eps.size_s(); ++i) { + if (eps[i].path == ep.path) { + eps.remove(i); + auto & counter(ref->topics_binded[tit.key()]); + --counter; + PIString ret; + if (counter <= 0) ret = tit.key(); + if (eps.isEmpty()) { + piCout << "remove topics" << tit.key(); + pit.value().remove(tit.key()); + } + return ret; + } + } + } + } + piCoutObj << "Warning: unsubscribe from" << mqtt_topic << ", topic not found"; + return {}; +} + + +void PIMQTT::Client::unregisterAll() { + auto ref = PRIVATE->endpoints.getRef(); + ref->prepared.clear(); + ref->topics_binded.clear(); } @@ -189,15 +324,27 @@ void PIMQTT::Client::publishInternal(const MessageConst & m) { void PIMQTT::Client::subscribeInternal(const Subscribe & sub) { if (!PRIVATE->client) return; - int ret = MQTTClient_subscribe(PRIVATE->client, sub.topic.dataUTF8(), static_cast(sub.qos)); + piCout << ""; + piCout << "subscribeInternal" << sub.topic; + PIString topic = registerSubscribe(sub); + if (topic.isEmpty()) return; + piCout << "NEW" << topic; // << PRIVATE->endpoints.getRef()->size(); + int ret = MQTTClient_subscribe(PRIVATE->client, topic.dataUTF8(), static_cast(sub.qos)); if (ret != MQTTCLIENT_SUCCESS) { - piCoutObj << "Failed to subscribe" << sub.topic << ", code" << ret; + piCoutObj << "Failed to subscribe" << topic << ", code" << ret; + + return; } } -void PIMQTT::Client::unsubscribeInternal(const PIString & topic) { +void PIMQTT::Client::unsubscribeInternal(const PIString & mqtt_topic) { if (!PRIVATE->client) return; + piCout << ""; + piCout << "unsubscribeInternal" << mqtt_topic; + PIString topic = unregisterSubscribe(mqtt_topic); + if (topic.isEmpty()) return; + piCout << "DEL" << topic; // << PRIVATE->endpoints.getRef()->size(); int ret = MQTTClient_unsubscribe(PRIVATE->client, topic.dataUTF8()); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to unsubscribe" << topic << ", code" << ret; @@ -206,6 +353,7 @@ void PIMQTT::Client::unsubscribeInternal(const PIString & topic) { void PIMQTT::Client::destroy() { + unregisterAll(); if (!PRIVATE->client) return; if (PRIVATE->connected) MQTTClient_disconnect(PRIVATE->client, 1000); MQTTClient_destroy(&PRIVATE->client); @@ -217,3 +365,12 @@ void PIMQTT::Client::destroy() { void PIMQTT::Client::changeStatus(Status s) { m_status = s; } + +PIString PIMQTT::Client::convertTopic2MQTT(const PIString & topic) { + return topic.replacedAll("**", '#').replacedAll('*', '+'); +} + + +PIString PIMQTT::Client::convertTopic2HTTP(const PIString & topic) { + return topic.replacedAll('#', "**").replacedAll('+', '*'); +} diff --git a/main.cpp b/main.cpp index f92b7159..95ac62cd 100644 --- a/main.cpp +++ b/main.cpp @@ -19,63 +19,86 @@ MessageMutable createMessage(Code c, const char * path, const MessageConst & msg return MessageMutable().setCode(c); }; int main(int argc, char * argv[]) { - piCout << "start ..."; - PIHTTPServer server; - server.registerUnhandled([](const MessageConst & msg) { return createMessage(Code::BadRequest, "unhadled", msg); }); - server.registerPath("api/v1/status", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/status", msg); - }); - server.registerPath("api/v1/plugins", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/plugins", msg); - }); - server.registerPath("api/v1/task-status", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/task-status", msg); - }); - server.registerPath("api/v1/task/{taskID}/status", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/task/{taskID}/status", msg); - }); - server.registerPath("api/v1/bort/list", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/bort/list", msg); - }); - server.registerPath("api/v1/all", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/all", msg); - }); - server.registerPath("api/v1/all/bort{A}/f", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/all/*/f", msg); - }); - server.registerPath("api/v1/all2/**", Method::Get, [](const MessageConst & msg) { - return createMessage(Code::Accepted, "api/v1/all2/**", msg); - }); - server.listenAll(12345); + // piCout << "start ..."; + // PIHTTPServer server; + // server.registerUnhandled([](const MessageConst & msg) { return createMessage(Code::BadRequest, "unhadled", msg); }); + // server.registerPath("api/v1/status", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/status", msg); + // }); + // server.registerPath("api/v1/plugins", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/plugins", msg); + // }); + // server.registerPath("api/v1/task-status", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/task-status", msg); + // }); + // server.registerPath("api/v1/task/{taskID}/status", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/task/{taskID}/status", msg); + // }); + // server.registerPath("api/v1/bort/list", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/bort/list", msg); + // }); + // server.registerPath("api/v1/all", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/all", msg); + // }); + // server.registerPath("api/v1/all/bort{A}/f", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/all/*/f", msg); + // }); + // server.registerPath("api/v1/all2/**", Method::Get, [](const MessageConst & msg) { + // return createMessage(Code::Accepted, "api/v1/all2/**", msg); + // }); + // server.listenAll(12345); - kbd.enableExitCapture('Q'); - WAIT_FOR_EXIT - piCout << "exiting ..."; - server.stop(); + // kbd.enableExitCapture('Q'); + // WAIT_FOR_EXIT + // piCout << "exiting ..."; + // server.stop(); - return 0; + // return 0; - PISystemMonitor mon; - mon.startOnSelf(); - PISystemMonitor::totalRAM(); - 2_s .sleep(); + // PISystemMonitor mon; + // mon.startOnSelf(); + // PISystemMonitor::totalRAM(); + // 2_s .sleep(); - return 0; + // return 0; PIMQTT::Client cl; cl.setConnectTimeout(2_s); + CONNECTL(&cl, connected, [&cl] { piCout << "connected"; - cl.subscribe("/zigbee2mqtt"); - cl.subscribe("/zigbee2mqtt/+"); - cl.unsubscribe("/zigbee2mqtt"); - cl.publish("/zigbee2mqtt/abc", "hello from PIP"_a.toAscii()); + // cl.subscribe("api/v1/plugins"); + // cl.subscribe("api/v1/task-status"); + // cl.subscribe("api/v1/*/{taskID}/status"); + // cl.subscribe("api/v1/bort/list"); + // cl.subscribe("api/v1/all"); + cl.subscribe("api/v1/all/bort{A}/f", [](const PIMQTT::MessageConst & msg) { + piCout << "1" << msg.topicList() << msg.pathArguments() << msg.body().size(); + }); + cl.subscribe("api/v1/all/task{T}/f", [](const PIMQTT::MessageConst & msg) { + piCout << "2" << msg.topicList() << msg.pathArguments() << msg.body().size(); + }); + cl.subscribe("api/v1/all/*/f", [](const PIMQTT::MessageConst & msg) { + piCout << "3" << msg.topicList() << msg.pathArguments() << msg.body().size(); + }); + cl.subscribe("api/v1/all2/**", [](const PIMQTT::MessageConst & msg) { + piCout << "4" << msg.topicList() << msg.pathArguments() << msg.body().size(); + }); + // cl.subscribe("/zigbee2mqtt"); + // cl.subscribe("/zigbee2mqtt/+/status/"); + // cl.subscribe("/zigbee2mqtt/*/status/"); + // cl.subscribe("test/#"); + // cl.subscribe("#"); + // cl.unsubscribe("/zigbee2mqtt"); + // cl.unsubscribe("api/v1/all/bort{A}/f"); + // cl.unsubscribe("api/v1/all/*/f"); + // cl.publish("/zigbee2mqtt/abc", "hello from PIP"_a.toAscii()); }); CONNECTL(&cl, disconnected, [&cl](PIMQTT::Error code) { piCout << "disconnected code" << (int)code; cl.connect("localhost", "PIP"); }); - CONNECTL(&cl, received, [](const PIMQTT::MessageConst & message) { - piCout << "received" << message.topic() << message.pathArguments() << message.payload().size(); + CONNECTL(&cl, receivedUnhandled, [](const PIMQTT::MessageConst & message) { + piCout << "receivedUnhandled" << message.topic() << message.pathArguments() << message.payload().size(); }); cl.connect("localhost", "PIP");