/* PIP - Platform Independent Primitives MQTT Client Ivan Pelipenko peri4ko@yandex.ru This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include "pimqttclient.h" #include "MQTTClient.h" STATIC_INITIALIZER_BEGIN MQTTClient_init_options opts = MQTTClient_init_options_initializer; MQTTClient_global_init(&opts); STATIC_INITIALIZER_END PRIVATE_DEFINITION_START(PIMQTT::Client) MQTTClient client = nullptr; bool connected = false; static void connectionLost_callback(void * context, char *) { ((PIMQTT::Client *)context)->mqtt_connectionLost(); } static void deliveryComplete_callback(void * context, MQTTClient_deliveryToken dt) { ((PIMQTT::Client *)context)->mqtt_deliveryComplete(dt); } static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) { PIMQTT::MessageMutable msg; msg.setTopic(PIString::fromUTF8(topicName)); msg.setPayload(PIByteArray(message->payload, message->payloadlen)); msg.setID(message->msgid); msg.setDuplicate(message->dup != 0); msg.setQos(static_cast(message->qos)); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); ((PIMQTT::Client *)context)->mqtt_messageArrived(msg); return 1; } PRIVATE_DEFINITION_END(PIMQTT::Client) PIMQTT::Client::Client() { worker = new PIThreadPoolWorker(1); worker->start(); } PIMQTT::Client::~Client() { is_destoying = true; worker->clearTasks(); disconnect(); worker->waitForTasks(); worker->stopAndWait(); piDeleteSafety(worker); } void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) { if (is_destoying) return; ConnectInfo ci; ci.address = address; ci.clientID = client; ci.username = username; ci.password = password; worker->enqueueTask([this, ci] { connectInternal(ci); }); } void PIMQTT::Client::disconnect() { worker->enqueueTask([this] { disconnectInternal(); }); } void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) { if (is_destoying) return; worker->enqueueTask([this, topic, qos] { subscribeInternal({topic, qos}); }); } void PIMQTT::Client::unsubscribe(const PIString & topic) { if (is_destoying) return; worker->enqueueTask([this, topic] { unsubscribeInternal(topic); }); } void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) { MessageMutable m; m.setTopic(topic); m.setPayload(msg); m.setQos(qos); publish(m); } void PIMQTT::Client::publish(const MessageConst & msg) { if (is_destoying) return; worker->enqueueTask([this, msg] { publishInternal(msg); }); } void PIMQTT::Client::mqtt_connectionLost() { piCoutObj << "mqtt_connectionLost"; PRIVATE->connected = false; changeStatus(Idle); disconnected(Error::ServerUnavailable); } void PIMQTT::Client::mqtt_deliveryComplete(int token) {} void PIMQTT::Client::mqtt_messageArrived(const MessageConst & msg) { piCoutObj << "mqtt_messageArrived"; received(msg); } void PIMQTT::Client::connectInternal(const ConnectInfo & ci) { destroy(); changeStatus(Connecting); PRIVATE->connected = false; PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address); if (net_addr.port() == 0) net_addr.setPort(1883); PIString server_uri = "tcp://" + net_addr.toString(); MQTTClient_create(&PRIVATE->client, server_uri.dataAscii(), ci.clientID.dataUTF8(), MQTTCLIENT_PERSISTENCE_NONE, nullptr); MQTTClient_setCallbacks(PRIVATE->client, this, PRIVATE->connectionLost_callback, PRIVATE->messageArrived_callback, PRIVATE->deliveryComplete_callback); MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; conn_opts.keepAliveInterval = 9; conn_opts.cleansession = 1; conn_opts.connectTimeout = piRound(connect_timeout.toSeconds()); if (ci.username.isNotEmpty()) conn_opts.username = ci.username.dataAscii(); if (ci.password.isNotEmpty()) conn_opts.password = ci.password.dataAscii(); int ret = MQTTClient_connect(PRIVATE->client, &conn_opts); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to connect to " + server_uri << ", code" << ret; changeStatus(Idle); if (ret < 0) ret = static_cast(Error::ServerUnavailable); disconnected(static_cast(ret)); return; } PRIVATE->connected = true; changeStatus(Connected); connected(); } inline void PIMQTT::Client::disconnectInternal() { bool was_connected = PRIVATE->connected; changeStatus(Idle); destroy(); if (was_connected) disconnected(Error::Unknown); PRIVATE->connected = false; } void PIMQTT::Client::publishInternal(const MessageConst & m) { if (!PRIVATE->client) return; MQTTClient_message pubmsg = MQTTClient_message_initializer; pubmsg.payload = const_cast(static_cast(m.payload().data())); pubmsg.payloadlen = m.payload().size_s(); pubmsg.qos = static_cast(m.qos()); int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic().dataAscii(), &pubmsg, nullptr); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to publishMessage, code" << ret; } } void PIMQTT::Client::subscribeInternal(const Subscribe & sub) { if (!PRIVATE->client) return; int ret = MQTTClient_subscribe(PRIVATE->client, sub.topic.dataUTF8(), static_cast(sub.qos)); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to subscribe" << sub.topic << ", code" << ret; } } void PIMQTT::Client::unsubscribeInternal(const PIString & topic) { if (!PRIVATE->client) return; int ret = MQTTClient_unsubscribe(PRIVATE->client, topic.dataUTF8()); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to unsubscribe" << topic << ", code" << ret; } } void PIMQTT::Client::destroy() { if (!PRIVATE->client) return; if (PRIVATE->connected) MQTTClient_disconnect(PRIVATE->client, 1000); MQTTClient_destroy(&PRIVATE->client); PRIVATE->connected = false; PRIVATE->client = nullptr; } void PIMQTT::Client::changeStatus(Status s) { m_status = s; }