/* PIP - Platform Independent Primitives MQTT Client Ivan Pelipenko peri4ko@yandex.ru This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include "pimqttclient.h" #include "MQTTClient.h" STATIC_INITIALIZER_BEGIN MQTTClient_init_options opts = MQTTClient_init_options_initializer; MQTTClient_global_init(&opts); STATIC_INITIALIZER_END PRIVATE_DEFINITION_START(PIMQTT::Client) MQTTClient client = nullptr; bool connected = false; static void connectionLost_callback(void * context, char *) { ((PIMQTT::Client *)context)->mqtt_connectionLost(); } static void deliveryComplete_callback(void * context, MQTTClient_deliveryToken dt) { ((PIMQTT::Client *)context)->mqtt_deliveryComplete(dt); } static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) { PIMQTT::Message msg; msg.topic = PIString::fromUTF8(topicName); msg.payload = PIByteArray(message->payload, message->payloadlen); msg.msg_id = message->msgid; msg.is_duplicate = message->dup != 0; msg.qos = static_cast(message->qos); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); ((PIMQTT::Client *)context)->mqtt_messageArrived(msg); return 1; } PRIVATE_DEFINITION_END(PIMQTT::Client) PIMQTT::Client::Client() { thread = new PIThread([this] { run(); }); thread->start(); } PIMQTT::Client::~Client() { thread->stop(); disconnect(); thread->waitForFinish(); piDeleteSafety(thread); } void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) { auto ci = connect_info.getRef(); ci->address = address; ci->clientID = client; ci->username = username; ci->password = password; changeStatus(Connecting); } void PIMQTT::Client::disconnect() { changeStatus(Disconnecting); } void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) { sub_queue.getRef()->enqueue({topic, qos}); notifier.notify(); } void PIMQTT::Client::unsubscribe(const PIString & topic) { unsub_queue.getRef()->enqueue(topic); notifier.notify(); } void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) { Message m; m.topic = topic; m.payload = msg; m.qos = qos; pub_queue.getRef()->enqueue(m); notifier.notify(); } void PIMQTT::Client::mqtt_connectionLost() { piCoutObj << "mqtt_connectionLost"; PRIVATE->connected = false; changeStatus(Idle); disconnected(Error::ServerUnavailable); } void PIMQTT::Client::mqtt_deliveryComplete(int token) {} void PIMQTT::Client::mqtt_messageArrived(const Message & msg) { piCoutObj << "mqtt_messageArrived"; received(msg); } void PIMQTT::Client::connectInternal() { destroy(); PRIVATE->connected = false; auto ci = connect_info.get(); PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address); if (net_addr.port() == 0) net_addr.setPort(1883); PIString server_uri = "tcp://" + net_addr.toString(); MQTTClient_create(&PRIVATE->client, server_uri.dataAscii(), ci.clientID.dataUTF8(), MQTTCLIENT_PERSISTENCE_NONE, nullptr); MQTTClient_setCallbacks(PRIVATE->client, this, PRIVATE->connectionLost_callback, PRIVATE->messageArrived_callback, PRIVATE->deliveryComplete_callback); MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; conn_opts.keepAliveInterval = 9; conn_opts.cleansession = 1; conn_opts.connectTimeout = piRound(connect_timeout.toSeconds()); if (ci.username.isNotEmpty()) conn_opts.username = ci.username.dataAscii(); if (ci.password.isNotEmpty()) conn_opts.password = ci.password.dataAscii(); int ret = MQTTClient_connect(PRIVATE->client, &conn_opts); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to connect to " + server_uri << ", code" << ret; changeStatus(Idle); if (ret < 0) ret = static_cast(Error::ServerUnavailable); disconnected(static_cast(ret)); return; } PRIVATE->connected = true; changeStatus(Connected); connected(); } void PIMQTT::Client::publishInternal(const Message & m) { if (!PRIVATE->client) return; MQTTClient_message pubmsg = MQTTClient_message_initializer; pubmsg.payload = const_cast(static_cast(m.payload.data())); pubmsg.payloadlen = m.payload.size_s(); pubmsg.qos = static_cast(m.qos); int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic.dataAscii(), &pubmsg, nullptr); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to publishMessage, code" << ret; } } void PIMQTT::Client::subscribeInternal(const Subscribe & sub) { if (!PRIVATE->client) return; int ret = MQTTClient_subscribe(PRIVATE->client, sub.topic.dataUTF8(), static_cast(sub.qos)); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to subscribe" << sub.topic << ", code" << ret; } } void PIMQTT::Client::unsubscribeInternal(const PIString & topic) { if (!PRIVATE->client) return; int ret = MQTTClient_unsubscribe(PRIVATE->client, topic.dataUTF8()); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to unsubscribe" << topic << ", code" << ret; } } void PIMQTT::Client::destroy() { if (!PRIVATE->client) return; if (PRIVATE->connected) MQTTClient_disconnect(PRIVATE->client, 1000); MQTTClient_destroy(&PRIVATE->client); PRIVATE->connected = false; PRIVATE->client = nullptr; } void PIMQTT::Client::changeStatus(Status s) { m_status = s; notifier.notify(); } void PIMQTT::Client::run() { notifier.wait(); piCoutObj << "run" << (int)m_status; if (thread->isStopping() || m_status == Disconnecting) { bool was_connected = PRIVATE->connected; changeStatus(Idle); destroy(); if (was_connected) disconnected(Error::Unknown); PRIVATE->connected = false; return; } if (m_status == Connecting) { connectInternal(); return; } if (m_status == Connected) { Subscribe sub; { auto ref = sub_queue.getRef(); if (!ref->isEmpty()) sub = ref->dequeue(); } if (sub.topic.isNotEmpty()) { subscribeInternal(sub); notifier.notify(); return; } PIString topic; { auto ref = unsub_queue.getRef(); if (!ref->isEmpty()) topic = ref->dequeue(); } if (topic.isNotEmpty()) { unsubscribeInternal(topic); notifier.notify(); return; } Message msg; { auto ref = pub_queue.getRef(); if (!ref->isEmpty()) msg = ref->dequeue(); } if (msg.isValid()) { publishInternal(msg); notifier.notify(); return; } } }