/*
	PIP - Platform Independent Primitives
	MQTT Client
	Ivan Pelipenko peri4ko@yandex.ru

	This program is free software: you can redistribute it and/or modify
	it under the terms of the GNU Lesser General Public License as published by
	the Free Software Foundation, either version 3 of the License, or
	(at your option) any later version.

	This program is distributed in the hope that it will be useful,
	but WITHOUT ANY WARRANTY; without even the implied warranty of
	MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
	GNU Lesser General Public License for more details.

	You should have received a copy of the GNU Lesser General Public License
	along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

#include "pimqttclient.h"

#include "MQTTClient.h"
#include "piliterals_string.h"
#include "piserverendpoint_p.h"


// Subscription endpoint: an HTTP-style path matcher plus the handler to call on a match.
struct PIMQTT::Client::Endpoint: public PIHTTP::ServerEndpoint {
	PIMQTT::Client::MessageFunction function;
};


// Registry of active subscriptions.
// NOTE(review): the template arguments below were not visible in the reviewed text and are
// reconstructed from how the members are used in this file - confirm against the original.
struct EndpointsStorage {
	PIMap<int, PIMap<PIString, PIVector<PIMQTT::Client::Endpoint>>> prepared; // [priority][topic] -> endpoints
	PIMap<PIString, int> topics_binded;                                        // [topic] -> count
};


// Builds the MQTT topic filter that corresponds to a prepared endpoint path:
// fixed elements are copied, "any many" elements become "#", everything else becomes "+".
PIString topicFromEndpoint(const PIMQTT::Client::Endpoint & e) {
	PIStringList ret;
	for (const auto & i: e.prepared_path) {
		switch (i.type) {
			case PIHTTP::ServerEndpoint::PathElement::Type::Fixed: ret << i.source; break;
			case PIHTTP::ServerEndpoint::PathElement::Type::AnyMany: ret << "#"_a; break;
			default: ret << "+"_a; break;
		}
	}
	return ret.join('/');
}


// One-time global initialization of the Paho MQTT C library.
STATIC_INITIALIZER_BEGIN
	MQTTClient_init_options opts = MQTTClient_init_options_initializer;
	MQTTClient_global_init(&opts);
STATIC_INITIALIZER_END


PRIVATE_DEFINITION_START(PIMQTT::Client)
	MQTTClient client = nullptr;
	bool connected    = false;
	// NOTE(review): template argument reconstructed from usage (getRef()->prepared / ->topics_binded).
	PIProtectedVariable<EndpointsStorage> endpoints;

	// Paho callback: forwards "connection lost" to the owning client passed as context.
	static void connectionLost_callback(void * context, char *) {
		((PIMQTT::Client *)context)->mqtt_connectionLost();
	}

	// Paho callback: forwards the delivery token to the owning client.
	static void deliveryComplete_callback(void * context, MQTTClient_deliveryToken dt) {
		((PIMQTT::Client *)context)->mqtt_deliveryComplete(dt);
	}

	// Paho callback: copies the incoming message into a MessageMutable, releases the
	// Paho-owned buffers and only then hands the copy to the owning client.
	// Always returns 1.
	static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) {
		PIMQTT::MessageMutable msg;
		msg.setTopic(PIString::fromUTF8(topicName));
		msg.setPayload(PIByteArray(message->payload, message->payloadlen));
		msg.setID(message->msgid);
		msg.setDuplicate(message->dup != 0);
		// NOTE(review): cast target type reconstructed; assumed to be the QoS enum accepted by setQos().
		msg.setQos(static_cast<QoS>(message->qos));
		MQTTClient_freeMessage(&message);
		MQTTClient_free(topicName);
		((PIMQTT::Client *)context)->mqtt_messageArrived(msg);
		return 1;
	}
PRIVATE_DEFINITION_END(PIMQTT::Client)


// Creates the single-slot worker on which all *Internal() operations are enqueued.
PIMQTT::Client::Client() {
	worker = new PIThreadPoolWorker(1);
	worker->start();
}


// Drops pending tasks, enqueues a final disconnect, waits for it and stops the worker.
PIMQTT::Client::~Client() {
	is_destoying = true;
	worker->clearTasks();
	disconnect(); // deliberately not guarded by is_destoying
	worker->waitForTasks();
	worker->stopAndWait();
	piDeleteSafety(worker);
}


// Enqueues a connection attempt with the given address and credentials.
void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) {
	if (is_destoying) return;
	ConnectInfo ci;
	ci.address  = address;
	ci.clientID = client;
	ci.username = username;
	ci.password = password;
	worker->enqueueTask([this, ci] { connectInternal(ci); });
}


// Enqueues a disconnect.
void PIMQTT::Client::disconnect() {
	worker->enqueueTask([this] { disconnectInternal(); });
}


// Enqueues a subscription of "functor" to "topic" with the requested QoS.
void PIMQTT::Client::subscribe(const PIString & topic, MessageFunction functor, QoS qos) {
	if (is_destoying) return;
	worker->enqueueTask([this, topic, functor, qos] { subscribeInternal({topic, functor, qos}); });
}


// Enqueues removal of the subscription registered for "topic".
void PIMQTT::Client::unsubscribe(const PIString & topic) {
	if (is_destoying) return;
	worker->enqueueTask([this, topic] { unsubscribeInternal(topic); });
}


// Convenience overload: wraps topic/payload/QoS into a message and publishes it.
void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) {
	MessageMutable m;
	m.setTopic(topic);
	m.setPayload(msg);
	m.setQos(qos);
	publish(m);
}


// Enqueues publishing of "msg".
void PIMQTT::Client::publish(const MessageConst & msg) {
	if (is_destoying) return;
	worker->enqueueTask([this, msg] { publishInternal(msg); });
}


// Enqueues removal of every subscription.
// The previous implementation cleared the registry immediately and enqueued unsubscribe("#");
// by the time that task ran the registry was empty, so nothing was ever sent to the broker
// (and MQTT matches UNSUBSCRIBE filters literally, so "#" would not remove other filters anyway).
// Now every bound topic filter is unsubscribed explicitly, in queue order with other operations.
void PIMQTT::Client::unsubscribeAll() {
	if (is_destoying) return;
	worker->enqueueTask([this] {
		PIStringList topics;
		{
			// Collect and clear under a single registry reference.
			auto ref = PRIVATE->endpoints.getRef();
			auto it  = ref->topics_binded.makeIterator();
			while (it.next()) {
				if (it.value() > 0) topics << it.key();
			}
			ref->prepared.clear();
			ref->topics_binded.clear();
		}
		if (!PRIVATE->client) return;
		for (const auto & t: topics) {
			int ret = MQTTClient_unsubscribe(PRIVATE->client, t.dataUTF8());
			if (ret != MQTTCLIENT_SUCCESS) {
				piCoutObj << "Failed to unsubscribe" << t << ", code" << ret;
			}
		}
	});
}


// Reaction to Paho's "connection lost": forget all subscriptions, go Idle and
// report disconnection with Error::ServerUnavailable.
void PIMQTT::Client::mqtt_connectionLost() {
	piCoutObj << "mqtt_connectionLost";
	unregisterAll();
	PRIVATE->connected = false;
	changeStatus(Idle);
	disconnected(Error::ServerUnavailable);
}


// Delivery confirmations are currently ignored.
void PIMQTT::Client::mqtt_deliveryComplete(int token) {}


// Dispatches an incoming message: priorities are scanned in reverse iteration order and the
// first endpoint whose path matches wins. The handler is invoked after the registry
// reference is released; unmatched messages go to receivedUnhandled().
void PIMQTT::Client::mqtt_messageArrived(MessageMutable & msg) {
	PIStringList in_path = msg.topicList();
	PIMQTT::Client::MessageFunction function;
	piCoutObj << "mqtt_messageArrived";
	{
		bool found = false;
		auto ref   = PRIVATE->endpoints.getRef();
		auto pit   = ref->prepared.makeReverseIterator();
		while (pit.next()) { // by priority
			auto tit = pit.value().makeIterator();
			while (tit.next()) { // by MQTT topic
				for (const auto & ep: tit.value()) {
					// NOTE(review): template arguments reconstructed - confirm against ServerEndpoint::match().
					PIMap<PIString, PIString> ext_args;
					if (ep.match(in_path, ext_args)) {
						msg.pathArguments() = ext_args;
						function            = ep.function;
						found               = true;
						break;
					}
				}
				if (found) break;
			}
			if (found) break;
		}
	}
	if (function)
		function(msg);
	else
		receivedUnhandled(msg);
}


// Adds an endpoint for "sub" to the registry.
// Returns the MQTT topic filter when it is the first endpoint bound to that filter
// (i.e. the caller has to subscribe at the broker); returns an empty string when the
// topic is empty, the path is a duplicate, or the filter is already bound.
PIString PIMQTT::Client::registerSubscribe(const Subscribe & sub) {
	Endpoint ep;
	ep.create(convertTopic2HTTP(sub.topic));
	ep.function    = sub.functor;
	PIString topic = topicFromEndpoint(ep);
	if (topic.isEmpty()) {
		piCoutObj << "Warning: subscribe to empty topic, ignore";
		return {};
	}
	piCout << sub.topic << "->" << topic << ep.priority;
	bool is_new_topic = false;
	auto ref          = PRIVATE->endpoints.getRef();
	auto & eps_by_topic(ref->prepared[ep.priority][topic]);
	for (const auto & i: eps_by_topic) {
		if (i.path == ep.path) {
			piCoutObj << "Warning: subscribe duplicate path, ignore";
			return {};
		}
	}
	eps_by_topic << ep;
	auto & counter(ref->topics_binded[topic]);
	is_new_topic = counter == 0;
	++counter;
	if (!is_new_topic) return {};
	return topic;
}


// Removes the endpoint whose path corresponds to "mqtt_topic" from the registry.
// Returns the MQTT topic filter when no endpoint remains bound to it (i.e. the caller has
// to unsubscribe at the broker); returns an empty string otherwise or when nothing was found.
PIString PIMQTT::Client::unregisterSubscribe(const PIString & mqtt_topic) {
	Endpoint ep;
	ep.create(convertTopic2HTTP(mqtt_topic));
	PIString topic = topicFromEndpoint(ep);
	if (topic.isEmpty()) {
		piCoutObj << "Warning: unsubscribe from empty topic, ignore";
		return {};
	}
	piCout << mqtt_topic << "->" << topic << ep.priority;
	auto ref = PRIVATE->endpoints.getRef();
	auto pit = ref->prepared.makeIterator();
	while (pit.next()) { // by priority
		auto tit = pit.value().makeIterator();
		while (tit.next()) { // by MQTT topic
			auto & eps(tit.value());
			for (int i = 0; i < eps.size_s(); ++i) {
				if (eps[i].path == ep.path) {
					eps.remove(i);
					auto & counter(ref->topics_binded[tit.key()]);
					--counter;
					PIString ret;
					if (counter <= 0) ret = tit.key();
					if (eps.isEmpty()) {
						piCout << "remove topics" << tit.key();
						pit.value().remove(tit.key());
					}
					// Return right away: the iterators must not be used after the removal above.
					return ret;
				}
			}
		}
	}
	piCoutObj << "Warning: unsubscribe from" << mqtt_topic << ", topic not found";
	return {};
}


// Clears the whole subscription registry (local bookkeeping only, nothing is sent to the broker).
void PIMQTT::Client::unregisterAll() {
	auto ref = PRIVATE->endpoints.getRef();
	ref->prepared.clear();
	ref->topics_binded.clear();
}


// Tears down any previous client, creates a new one for "tcp://<address>" (port 1883 when
// none is given) and connects synchronously. Emits connected() on success, otherwise
// disconnected() with the error code (negative codes are mapped to Error::ServerUnavailable).
void PIMQTT::Client::connectInternal(const ConnectInfo & ci) {
	destroy();
	changeStatus(Connecting);
	PRIVATE->connected = false;

	PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address);
	if (net_addr.port() == 0) net_addr.setPort(1883);
	PIString server_uri = "tcp://" + net_addr.toString();

	// The result of MQTTClient_create() used to be ignored; report the failure explicitly.
	int ret = MQTTClient_create(&PRIVATE->client, server_uri.dataAscii(), ci.clientID.dataUTF8(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
	if (ret != MQTTCLIENT_SUCCESS) {
		piCoutObj << "Failed to create client for " + server_uri << ", code" << ret;
		changeStatus(Idle);
		disconnected(Error::ServerUnavailable);
		return;
	}
	MQTTClient_setCallbacks(PRIVATE->client,
	                        this,
	                        PRIVATE->connectionLost_callback,
	                        PRIVATE->messageArrived_callback,
	                        PRIVATE->deliveryComplete_callback);

	MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
	conn_opts.keepAliveInterval         = 9;
	conn_opts.cleansession              = 1;
	conn_opts.connectTimeout            = piRound(connect_timeout.toSeconds());
	// Credentials are passed as UTF-8, same as the client ID above.
	if (ci.username.isNotEmpty()) conn_opts.username = ci.username.dataUTF8();
	if (ci.password.isNotEmpty()) conn_opts.password = ci.password.dataUTF8();

	ret = MQTTClient_connect(PRIVATE->client, &conn_opts);
	if (ret != MQTTCLIENT_SUCCESS) {
		piCoutObj << "Failed to connect to " + server_uri << ", code" << ret;
		changeStatus(Idle);
		// NOTE(review): cast target types reconstructed (int / Error) - confirm against the original.
		if (ret < 0) ret = static_cast<int>(Error::ServerUnavailable);
		disconnected(static_cast<Error>(ret));
		return;
	}

	PRIVATE->connected = true;
	changeStatus(Connected);
	connected();
}


// Goes Idle, destroys the client and emits disconnected(Error::Unknown) if it was connected.
inline void PIMQTT::Client::disconnectInternal() {
	bool was_connected = PRIVATE->connected;
	changeStatus(Idle);
	destroy();
	if (was_connected) disconnected(Error::Unknown);
	PRIVATE->connected = false;
}


// Publishes "m" through the Paho client; does nothing when no client exists.
void PIMQTT::Client::publishInternal(const MessageConst & m) {
	if (!PRIVATE->client) return;
	MQTTClient_message pubmsg = MQTTClient_message_initializer;
	// NOTE(review): cast target types reconstructed - Paho's payload field is a non-const void *.
	pubmsg.payload    = const_cast<void *>(static_cast<const void *>(m.payload().data()));
	pubmsg.payloadlen = m.payload().size_s();
	pubmsg.qos        = static_cast<int>(m.qos());
	// Topic is sent as UTF-8, consistent with subscribe/unsubscribe and with how
	// incoming topics are decoded (previously it was encoded with dataAscii()).
	int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic().dataUTF8(), &pubmsg, nullptr);
	if (ret != MQTTCLIENT_SUCCESS) {
		piCoutObj << "Failed to publishMessage, code" << ret;
	}
}


// Registers the endpoint and, when its topic filter is new, subscribes at the broker.
// If the broker subscription fails the registration is rolled back, so that a later
// retry is not rejected as a duplicate.
void PIMQTT::Client::subscribeInternal(const Subscribe & sub) {
	if (!PRIVATE->client) return;
	piCout << "";
	piCout << "subscribeInternal" << sub.topic;
	PIString topic = registerSubscribe(sub);
	if (topic.isEmpty()) return;
	piCout << "NEW" << topic;
	int ret = MQTTClient_subscribe(PRIVATE->client, topic.dataUTF8(), static_cast<int>(sub.qos));
	if (ret != MQTTCLIENT_SUCCESS) {
		piCoutObj << "Failed to subscribe" << topic << ", code" << ret;
		unregisterSubscribe(sub.topic);
		return;
	}
}


// Unregisters the endpoint and, when its topic filter is no longer used, unsubscribes at the broker.
void PIMQTT::Client::unsubscribeInternal(const PIString & mqtt_topic) {
	if (!PRIVATE->client) return;
	piCout << "";
	piCout << "unsubscribeInternal" << mqtt_topic;
	PIString topic = unregisterSubscribe(mqtt_topic);
	if (topic.isEmpty()) return;
	piCout << "DEL" << topic;
	int ret = MQTTClient_unsubscribe(PRIVATE->client, topic.dataUTF8());
	if (ret != MQTTCLIENT_SUCCESS) {
		piCoutObj << "Failed to unsubscribe" << topic << ", code" << ret;
	}
}


// Clears the registry and releases the Paho client (disconnecting first, with a
// 1000 ms timeout, when it is connected).
void PIMQTT::Client::destroy() {
	unregisterAll();
	if (!PRIVATE->client) return;
	if (PRIVATE->connected) MQTTClient_disconnect(PRIVATE->client, 1000);
	MQTTClient_destroy(&PRIVATE->client);
	PRIVATE->connected = false;
	PRIVATE->client    = nullptr;
}


// Stores the new status.
void PIMQTT::Client::changeStatus(Status s) {
	m_status = s;
}


// Converts endpoint-path wildcards to MQTT ones: "**" -> "#", "*" -> "+".
// "**" is replaced first so that it is not consumed as two single "*".
PIString PIMQTT::Client::convertTopic2MQTT(const PIString & topic) {
	return topic.replacedAll("**", '#').replacedAll('*', '+');
}
// Converts MQTT wildcards to endpoint-path ones: every '#' becomes "**" and
// every '+' becomes '*'. The input string is left untouched; a converted copy is returned.
PIString PIMQTT::Client::convertTopic2HTTP(const PIString & topic) {
	const PIString multi_level_done = topic.replacedAll('#', "**");
	return multi_level_done.replacedAll('+', '*');
}