migrate to PIThreadPoolWorker

This commit is contained in:
2026-03-26 13:28:34 +03:00
parent c05fe46d04
commit 089e241a67
2 changed files with 38 additions and 88 deletions

View File

@@ -70,7 +70,6 @@ private:
Idle, Idle,
Connecting, Connecting,
Connected, Connected,
Disconnecting,
}; };
struct ConnectInfo { struct ConnectInfo {
@@ -88,7 +87,8 @@ private:
void mqtt_deliveryComplete(int token); void mqtt_deliveryComplete(int token);
void mqtt_messageArrived(const Message & msg); void mqtt_messageArrived(const Message & msg);
void connectInternal(); void connectInternal(const ConnectInfo & ci);
void disconnectInternal();
void publishInternal(const Message & m); void publishInternal(const Message & m);
void subscribeInternal(const Subscribe & sub); void subscribeInternal(const Subscribe & sub);
void unsubscribeInternal(const PIString & topic); void unsubscribeInternal(const PIString & topic);
@@ -97,15 +97,10 @@ private:
void run(); void run();
PIProtectedVariable<PIQueue<Message>> pub_queue; std::atomic_int m_status = {Idle};
PIProtectedVariable<PIQueue<Subscribe>> sub_queue; std::atomic_bool is_destoying = {false};
PIProtectedVariable<PIQueue<PIString>> unsub_queue; PISystemTime connect_timeout = 10_s;
PIProtectedVariable<ConnectInfo> connect_info; PIThreadPoolWorker * worker = nullptr;
PIThreadNotifier notifier;
std::atomic_int m_status = {Idle};
PISystemTime connect_timeout = 10_s;
PIThread * thread = nullptr;
// PIThreadPoolWorker * worker = nullptr;
}; };

View File

@@ -57,57 +57,56 @@ PRIVATE_DEFINITION_END(PIMQTT::Client)
PIMQTT::Client::Client() { PIMQTT::Client::Client() {
thread = new PIThread([this] { run(); }); worker = new PIThreadPoolWorker(1);
thread->start(); worker->start();
// worker = new PIThreadPoolWorker(1);
// worker->start();
} }
PIMQTT::Client::~Client() { PIMQTT::Client::~Client() {
// worker->stopAndWait(); is_destoying = true;
// piDeleteSafety(worker); worker->clearTasks();
thread->stop();
disconnect(); disconnect();
thread->waitForFinish(); worker->waitForTasks();
piDeleteSafety(thread); worker->stopAndWait();
piDeleteSafety(worker);
} }
void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) { void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) {
auto ci = connect_info.getRef(); if (is_destoying) return;
ci->address = address; ConnectInfo ci;
ci->clientID = client; ci.address = address;
ci->username = username; ci.clientID = client;
ci->password = password; ci.username = username;
changeStatus(Connecting); ci.password = password;
worker->enqueueTask([this, ci] { connectInternal(ci); });
} }
void PIMQTT::Client::disconnect() { void PIMQTT::Client::disconnect() {
changeStatus(Disconnecting); worker->enqueueTask([this] { disconnectInternal(); });
} }
void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) { void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) {
sub_queue.getRef()->enqueue({topic, qos}); if (is_destoying) return;
notifier.notify(); worker->enqueueTask([this, topic, qos] { subscribeInternal({topic, qos}); });
} }
void PIMQTT::Client::unsubscribe(const PIString & topic) { void PIMQTT::Client::unsubscribe(const PIString & topic) {
unsub_queue.getRef()->enqueue(topic); if (is_destoying) return;
notifier.notify(); worker->enqueueTask([this, topic] { unsubscribeInternal(topic); });
} }
void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) { void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) {
if (is_destoying) return;
Message m; Message m;
m.topic = topic; m.topic = topic;
m.payload = msg; m.payload = msg;
m.qos = qos; m.qos = qos;
pub_queue.getRef()->enqueue(m); worker->enqueueTask([this, m] { publishInternal(m); });
notifier.notify();
} }
@@ -128,10 +127,10 @@ void PIMQTT::Client::mqtt_messageArrived(const Message & msg) {
} }
void PIMQTT::Client::connectInternal() { void PIMQTT::Client::connectInternal(const ConnectInfo & ci) {
destroy(); destroy();
changeStatus(Connecting);
PRIVATE->connected = false; PRIVATE->connected = false;
auto ci = connect_info.get();
PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address); PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address);
if (net_addr.port() == 0) net_addr.setPort(1883); if (net_addr.port() == 0) net_addr.setPort(1883);
PIString server_uri = "tcp://" + net_addr.toString(); PIString server_uri = "tcp://" + net_addr.toString();
@@ -161,6 +160,15 @@ void PIMQTT::Client::connectInternal() {
} }
inline void PIMQTT::Client::disconnectInternal() {
bool was_connected = PRIVATE->connected;
changeStatus(Idle);
destroy();
if (was_connected) disconnected(Error::Unknown);
PRIVATE->connected = false;
}
void PIMQTT::Client::publishInternal(const Message & m) { void PIMQTT::Client::publishInternal(const Message & m) {
if (!PRIVATE->client) return; if (!PRIVATE->client) return;
MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer;
@@ -203,57 +211,4 @@ void PIMQTT::Client::destroy() {
void PIMQTT::Client::changeStatus(Status s) { void PIMQTT::Client::changeStatus(Status s) {
m_status = s; m_status = s;
notifier.notify();
}
void PIMQTT::Client::run() {
notifier.wait();
piCoutObj << "run" << (int)m_status;
if (thread->isStopping() || m_status == Disconnecting) {
bool was_connected = PRIVATE->connected;
changeStatus(Idle);
destroy();
if (was_connected) disconnected(Error::Unknown);
PRIVATE->connected = false;
return;
}
if (m_status == Connecting) {
connectInternal();
return;
}
if (m_status == Connected) {
Subscribe sub;
{
auto ref = sub_queue.getRef();
if (!ref->isEmpty()) sub = ref->dequeue();
}
if (sub.topic.isNotEmpty()) {
subscribeInternal(sub);
notifier.notify();
return;
}
PIString topic;
{
auto ref = unsub_queue.getRef();
if (!ref->isEmpty()) topic = ref->dequeue();
}
if (topic.isNotEmpty()) {
unsubscribeInternal(topic);
notifier.notify();
return;
}
Message msg;
{
auto ref = pub_queue.getRef();
if (!ref->isEmpty()) msg = ref->dequeue();
}
if (msg.isValid()) {
publishInternal(msg);
notifier.notify();
return;
}
}
} }