diff --git a/libs/mqtt_client/pimqttclient.cpp b/libs/mqtt_client/pimqttclient.cpp
new file mode 100644
index 00000000..6a787dbb
--- /dev/null
+++ b/libs/mqtt_client/pimqttclient.cpp
@@ -0,0 +1,259 @@
+/*
+ PIP - Platform Independent Primitives
+ MQTT Client
+ Ivan Pelipenko peri4ko@yandex.ru
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#include "pimqttclient.h"
+
+#include "MQTTClient.h"
+
+
+STATIC_INITIALIZER_BEGIN
+ MQTTClient_init_options opts = MQTTClient_init_options_initializer;
+ MQTTClient_global_init(&opts);
+STATIC_INITIALIZER_END
+
+
+PRIVATE_DEFINITION_START(PIMQTT::Client)
+ MQTTClient client = nullptr;
+ bool connected = false;
+
+ static void connectionLost_callback(void * context, char *) {
+ ((PIMQTT::Client *)context)->mqtt_connectionLost();
+ }
+
+ static void deliveryComplete_callback(void * context, MQTTClient_deliveryToken dt) {
+ ((PIMQTT::Client *)context)->mqtt_deliveryComplete(dt);
+ }
+
+ static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) {
+ PIMQTT::Message msg;
+ msg.topic = PIString::fromUTF8(topicName);
+ msg.payload = PIByteArray(message->payload, message->payloadlen);
+ msg.msg_id = message->msgid;
+ msg.is_duplicate = message->dup != 0;
+ msg.qos = static_cast(message->qos);
+ MQTTClient_freeMessage(&message);
+ MQTTClient_free(topicName);
+ ((PIMQTT::Client *)context)->mqtt_messageArrived(msg);
+ return 1;
+ }
+
+PRIVATE_DEFINITION_END(PIMQTT::Client)
+
+
+PIMQTT::Client::Client() {
+ thread = new PIThread([this] { run(); });
+ thread->start();
+ // worker = new PIThreadPoolWorker(1);
+ // worker->start();
+}
+
+
+PIMQTT::Client::~Client() {
+ // worker->stopAndWait();
+ // piDeleteSafety(worker);
+ thread->stop();
+ disconnect();
+ thread->waitForFinish();
+ piDeleteSafety(thread);
+}
+
+
+void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) {
+ auto ci = connect_info.getRef();
+ ci->address = address;
+ ci->clientID = client;
+ ci->username = username;
+ ci->password = password;
+ changeStatus(Connecting);
+}
+
+
+void PIMQTT::Client::disconnect() {
+ changeStatus(Disconnecting);
+}
+
+
+void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) {
+ sub_queue.getRef()->enqueue({topic, qos});
+ notifier.notify();
+}
+
+
+void PIMQTT::Client::unsubscribe(const PIString & topic) {
+ unsub_queue.getRef()->enqueue(topic);
+ notifier.notify();
+}
+
+
+void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) {
+ Message m;
+ m.topic = topic;
+ m.payload = msg;
+ m.qos = qos;
+ pub_queue.getRef()->enqueue(m);
+ notifier.notify();
+}
+
+
+void PIMQTT::Client::mqtt_connectionLost() {
+ piCoutObj << "mqtt_connectionLost";
+ PRIVATE->connected = false;
+ changeStatus(Idle);
+ disconnected(Error::ServerUnavailable);
+}
+
+
+void PIMQTT::Client::mqtt_deliveryComplete(int token) {}
+
+
+void PIMQTT::Client::mqtt_messageArrived(const Message & msg) {
+ piCoutObj << "mqtt_messageArrived";
+ received(msg);
+}
+
+
+void PIMQTT::Client::connectInternal() {
+ destroy();
+ PRIVATE->connected = false;
+ auto ci = connect_info.get();
+ PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address);
+ if (net_addr.port() == 0) net_addr.setPort(1883);
+ PIString server_uri = "tcp://" + net_addr.toString();
+ MQTTClient_create(&PRIVATE->client, server_uri.dataAscii(), ci.clientID.dataUTF8(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
+ MQTTClient_setCallbacks(PRIVATE->client,
+ this,
+ PRIVATE->connectionLost_callback,
+ PRIVATE->messageArrived_callback,
+ PRIVATE->deliveryComplete_callback);
+ MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
+ conn_opts.keepAliveInterval = 9;
+ conn_opts.cleansession = 1;
+ conn_opts.connectTimeout = piRound(connect_timeout.toSeconds());
+ if (ci.username.isNotEmpty()) conn_opts.username = ci.username.dataAscii();
+ if (ci.password.isNotEmpty()) conn_opts.password = ci.password.dataAscii();
+ int ret = MQTTClient_connect(PRIVATE->client, &conn_opts);
+ if (ret != MQTTCLIENT_SUCCESS) {
+ piCoutObj << "Failed to connect to " + server_uri << ", code" << ret;
+ changeStatus(Idle);
+ if (ret < 0) ret = static_cast(Error::ServerUnavailable);
+ disconnected(static_cast(ret));
+ return;
+ }
+ PRIVATE->connected = true;
+ changeStatus(Connected);
+ connected();
+}
+
+
+void PIMQTT::Client::publishInternal(const Message & m) {
+ if (!PRIVATE->client) return;
+ MQTTClient_message pubmsg = MQTTClient_message_initializer;
+ pubmsg.payload = const_cast(static_cast(m.payload.data()));
+ pubmsg.payloadlen = m.payload.size_s();
+ pubmsg.qos = static_cast(m.qos);
+ int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic.dataAscii(), &pubmsg, nullptr);
+ if (ret != MQTTCLIENT_SUCCESS) {
+ piCoutObj << "Failed to publishMessage, code" << ret;
+ }
+}
+
+
+void PIMQTT::Client::subscribeInternal(const Subscribe & sub) {
+ if (!PRIVATE->client) return;
+ int ret = MQTTClient_subscribe(PRIVATE->client, sub.topic.dataUTF8(), static_cast(sub.qos));
+ if (ret != MQTTCLIENT_SUCCESS) {
+ piCoutObj << "Failed to subscribe" << sub.topic << ", code" << ret;
+ }
+}
+
+
+void PIMQTT::Client::unsubscribeInternal(const PIString & topic) {
+ if (!PRIVATE->client) return;
+ int ret = MQTTClient_unsubscribe(PRIVATE->client, topic.dataUTF8());
+ if (ret != MQTTCLIENT_SUCCESS) {
+ piCoutObj << "Failed to unsubscribe" << topic << ", code" << ret;
+ }
+}
+
+
+void PIMQTT::Client::destroy() {
+ if (!PRIVATE->client) return;
+ if (PRIVATE->connected) MQTTClient_disconnect(PRIVATE->client, 1000);
+ MQTTClient_destroy(&PRIVATE->client);
+ PRIVATE->connected = false;
+ PRIVATE->client = nullptr;
+}
+
+
+void PIMQTT::Client::changeStatus(Status s) {
+ m_status = s;
+ notifier.notify();
+}
+
+
+void PIMQTT::Client::run() {
+ notifier.wait();
+ piCoutObj << "run" << (int)m_status;
+ if (thread->isStopping() || m_status == Disconnecting) {
+ bool was_connected = PRIVATE->connected;
+ changeStatus(Idle);
+ destroy();
+ if (was_connected) disconnected(Error::Unknown);
+ PRIVATE->connected = false;
+ return;
+ }
+ if (m_status == Connecting) {
+ connectInternal();
+ return;
+ }
+ if (m_status == Connected) {
+ Subscribe sub;
+ {
+ auto ref = sub_queue.getRef();
+ if (!ref->isEmpty()) sub = ref->dequeue();
+ }
+ if (sub.topic.isNotEmpty()) {
+ subscribeInternal(sub);
+ notifier.notify();
+ return;
+ }
+
+ PIString topic;
+ {
+ auto ref = unsub_queue.getRef();
+ if (!ref->isEmpty()) topic = ref->dequeue();
+ }
+ if (topic.isNotEmpty()) {
+ unsubscribeInternal(topic);
+ notifier.notify();
+ return;
+ }
+
+ Message msg;
+ {
+ auto ref = pub_queue.getRef();
+ if (!ref->isEmpty()) msg = ref->dequeue();
+ }
+ if (msg.isValid()) {
+ publishInternal(msg);
+ notifier.notify();
+ return;
+ }
+ }
+}