diff --git a/libs/main/mqtt_client/pimqttclient.h b/libs/main/mqtt_client/pimqttclient.h index 2cda3414..edd0fbe5 100644 --- a/libs/main/mqtt_client/pimqttclient.h +++ b/libs/main/mqtt_client/pimqttclient.h @@ -70,7 +70,6 @@ private: Idle, Connecting, Connected, - Disconnecting, }; struct ConnectInfo { @@ -88,7 +87,8 @@ private: void mqtt_deliveryComplete(int token); void mqtt_messageArrived(const Message & msg); - void connectInternal(); + void connectInternal(const ConnectInfo & ci); + void disconnectInternal(); void publishInternal(const Message & m); void subscribeInternal(const Subscribe & sub); void unsubscribeInternal(const PIString & topic); @@ -97,15 +97,10 @@ private: void run(); - PIProtectedVariable> pub_queue; - PIProtectedVariable> sub_queue; - PIProtectedVariable> unsub_queue; - PIProtectedVariable connect_info; - PIThreadNotifier notifier; - std::atomic_int m_status = {Idle}; - PISystemTime connect_timeout = 10_s; - PIThread * thread = nullptr; - // PIThreadPoolWorker * worker = nullptr; + std::atomic_int m_status = {Idle}; + std::atomic_bool is_destoying = {false}; + PISystemTime connect_timeout = 10_s; + PIThreadPoolWorker * worker = nullptr; }; diff --git a/libs/mqtt_client/pimqttclient.cpp b/libs/mqtt_client/pimqttclient.cpp index 6a787dbb..d23ad227 100644 --- a/libs/mqtt_client/pimqttclient.cpp +++ b/libs/mqtt_client/pimqttclient.cpp @@ -57,57 +57,56 @@ PRIVATE_DEFINITION_END(PIMQTT::Client) PIMQTT::Client::Client() { - thread = new PIThread([this] { run(); }); - thread->start(); - // worker = new PIThreadPoolWorker(1); - // worker->start(); + worker = new PIThreadPoolWorker(1); + worker->start(); } PIMQTT::Client::~Client() { - // worker->stopAndWait(); - // piDeleteSafety(worker); - thread->stop(); + is_destoying = true; + worker->clearTasks(); disconnect(); - thread->waitForFinish(); - piDeleteSafety(thread); + worker->waitForTasks(); + worker->stopAndWait(); + piDeleteSafety(worker); } void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) { - auto ci = connect_info.getRef(); - ci->address = address; - ci->clientID = client; - ci->username = username; - ci->password = password; - changeStatus(Connecting); + if (is_destoying) return; + ConnectInfo ci; + ci.address = address; + ci.clientID = client; + ci.username = username; + ci.password = password; + worker->enqueueTask([this, ci] { connectInternal(ci); }); } void PIMQTT::Client::disconnect() { - changeStatus(Disconnecting); + worker->enqueueTask([this] { disconnectInternal(); }); } void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) { - sub_queue.getRef()->enqueue({topic, qos}); - notifier.notify(); + if (is_destoying) return; + worker->enqueueTask([this, topic, qos] { subscribeInternal({topic, qos}); }); } void PIMQTT::Client::unsubscribe(const PIString & topic) { - unsub_queue.getRef()->enqueue(topic); - notifier.notify(); + if (is_destoying) return; + worker->enqueueTask([this, topic] { unsubscribeInternal(topic); }); } void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) { + if (is_destoying) return; Message m; m.topic = topic; m.payload = msg; m.qos = qos; - pub_queue.getRef()->enqueue(m); - notifier.notify(); + worker->enqueueTask([this, m] { publishInternal(m); }); } @@ -128,10 +127,10 @@ void PIMQTT::Client::mqtt_messageArrived(const Message & msg) { } -void PIMQTT::Client::connectInternal() { +void PIMQTT::Client::connectInternal(const ConnectInfo & ci) { destroy(); + changeStatus(Connecting); PRIVATE->connected = false; - auto ci = connect_info.get(); PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address); if (net_addr.port() == 0) net_addr.setPort(1883); PIString server_uri = "tcp://" + net_addr.toString(); @@ -161,6 +160,15 @@ void PIMQTT::Client::connectInternal() { } +inline void PIMQTT::Client::disconnectInternal() { + bool was_connected = PRIVATE->connected; + changeStatus(Idle); + destroy(); + if (was_connected) disconnected(Error::Unknown); + PRIVATE->connected = false; +} + + void PIMQTT::Client::publishInternal(const Message & m) { if (!PRIVATE->client) return; MQTTClient_message pubmsg = MQTTClient_message_initializer; @@ -203,57 +211,4 @@ void PIMQTT::Client::destroy() { void PIMQTT::Client::changeStatus(Status s) { m_status = s; - notifier.notify(); -} - - -void PIMQTT::Client::run() { - notifier.wait(); - piCoutObj << "run" << (int)m_status; - if (thread->isStopping() || m_status == Disconnecting) { - bool was_connected = PRIVATE->connected; - changeStatus(Idle); - destroy(); - if (was_connected) disconnected(Error::Unknown); - PRIVATE->connected = false; - return; - } - if (m_status == Connecting) { - connectInternal(); - return; - } - if (m_status == Connected) { - Subscribe sub; - { - auto ref = sub_queue.getRef(); - if (!ref->isEmpty()) sub = ref->dequeue(); - } - if (sub.topic.isNotEmpty()) { - subscribeInternal(sub); - notifier.notify(); - return; - } - - PIString topic; - { - auto ref = unsub_queue.getRef(); - if (!ref->isEmpty()) topic = ref->dequeue(); - } - if (topic.isNotEmpty()) { - unsubscribeInternal(topic); - notifier.notify(); - return; - } - - Message msg; - { - auto ref = pub_queue.getRef(); - if (!ref->isEmpty()) msg = ref->dequeue(); - } - if (msg.isValid()) { - publishInternal(msg); - notifier.notify(); - return; - } - } }