5 Commits

5 changed files with 206 additions and 120 deletions

View File

@@ -28,8 +28,6 @@
#include "piliterals_time.h" #include "piliterals_time.h"
#include "pimqtttypes.h" #include "pimqtttypes.h"
#include "pip_mqtt_client_export.h" #include "pip_mqtt_client_export.h"
#include "piprotectedvariable.h"
#include "pithread.h"
#include "pithreadpoolworker.h" #include "pithreadpoolworker.h"
@@ -52,6 +50,7 @@ public:
void unsubscribe(const PIString & topic); void unsubscribe(const PIString & topic);
void publish(const PIString & topic, const PIByteArray & msg, QoS qos = QoS::Level0); void publish(const PIString & topic, const PIByteArray & msg, QoS qos = QoS::Level0);
void publish(const MessageConst & msg);
void unsubscribeAll() { unsubscribe("#"); } void unsubscribeAll() { unsubscribe("#"); }
bool isConnecting() const { return m_status == Connecting; } bool isConnecting() const { return m_status == Connecting; }
@@ -59,18 +58,17 @@ public:
EVENT0(connected); EVENT0(connected);
EVENT1(disconnected, PIMQTT::Error, code); EVENT1(disconnected, PIMQTT::Error, code);
EVENT1(received, PIMQTT::Message, message); EVENT1(received, PIMQTT::MessageConst, message);
private: private:
NO_COPY_CLASS(Client) NO_COPY_CLASS(Client)
PRIVATE_DECLARATION(PIP_EXPORT) PRIVATE_DECLARATION(PIP_MQTT_CLIENT_EXPORT)
enum Status { enum Status {
Idle, Idle,
Connecting, Connecting,
Connected, Connected,
Disconnecting,
}; };
struct ConnectInfo { struct ConnectInfo {
@@ -86,10 +84,11 @@ private:
void mqtt_connectionLost(); void mqtt_connectionLost();
void mqtt_deliveryComplete(int token); void mqtt_deliveryComplete(int token);
void mqtt_messageArrived(const Message & msg); void mqtt_messageArrived(const MessageConst & msg);
void connectInternal(); void connectInternal(const ConnectInfo & ci);
void publishInternal(const Message & m); void disconnectInternal();
void publishInternal(const MessageConst & m);
void subscribeInternal(const Subscribe & sub); void subscribeInternal(const Subscribe & sub);
void unsubscribeInternal(const PIString & topic); void unsubscribeInternal(const PIString & topic);
void destroy(); void destroy();
@@ -97,15 +96,10 @@ private:
void run(); void run();
PIProtectedVariable<PIQueue<Message>> pub_queue; std::atomic_int m_status = {Idle};
PIProtectedVariable<PIQueue<Subscribe>> sub_queue; std::atomic_bool is_destoying = {false};
PIProtectedVariable<PIQueue<PIString>> unsub_queue; PISystemTime connect_timeout = 10_s;
PIProtectedVariable<ConnectInfo> connect_info; PIThreadPoolWorker * worker = nullptr;
PIThreadNotifier notifier;
std::atomic_int m_status = {Idle};
PISystemTime connect_timeout = 10_s;
PIThread * thread = nullptr;
// PIThreadPoolWorker * worker = nullptr;
}; };

View File

@@ -0,0 +1,31 @@
#include "pimqtttypes.h"
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setTopic(PIString t) {
m_topic = t;
return *this;
}
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setBody(PIByteArray b) {
m_payload = b;
return *this;
}
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setDuplicate(bool yes) {
m_is_duplicate = yes;
return *this;
}
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setQos(QoS qos) {
m_qos = qos;
return *this;
}
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setID(int id) {
m_msg_id = id;
return *this;
}

View File

@@ -26,7 +26,7 @@
#define pimqtttypes_h #define pimqtttypes_h
#include "pip_export.h" #include "pip_export.h"
#include "pistring.h" #include "pistringlist.h"
//! \~english Namespace with shared MQTT data types. //! \~english Namespace with shared MQTT data types.
@@ -51,14 +51,113 @@ enum class Error {
NotAuthorized = 5 /** */, NotAuthorized = 5 /** */,
}; };
struct PIP_EXPORT Message {
bool isValid() const { return topic.isNotEmpty(); } //! \~\ingroup MQTT
PIString topic; //! \~\brief
PIByteArray payload; //! \~english Immutable MQTT message.
PIMap<int, PIString> properties; // for v5 //! \~russian Неизменяемое MQTT-сообщение.
QoS qos = QoS::Level0; class PIP_EXPORT MessageConst {
int msg_id = 0; public:
bool is_duplicate = false; //! \~english Returns if the topic is not empty.
//! \~russian Возвращает не пустой ли топик.
bool isValid() const { return m_topic.isNotEmpty(); }
//! \~english Returns the topic.
//! \~russian Возвращает топик.
const PIString & topic() const { return m_topic; }
//! \~english Returns the topic split into non-empty components.
//! \~russian Возвращает топик, разбитый на непустые компоненты.
PIStringList topicList() const { return m_topic.split('/').removeAll({}); }
//! \~english Returns the message body.
//! \~russian Возвращает тело сообщения.
const PIByteArray & body() const { return m_payload; }
//! \~english Returns the message body.
//! \~russian Возвращает тело сообщения.
const PIByteArray & payload() const { return m_payload; }
//! \~english Returns extracted path arguments.
//! \~russian Возвращает извлеченные аргументы пути.
const PIMap<PIString, PIString> & pathArguments() const { return m_path_arguments; }
//! \~english Returns map of properties.
//! \~russian Возвращает карту свойств.
const PIMap<int, PIString> & properties() const { return m_properties; }
//! \~english Returns \c true for .
//! \~russian Возвращает \c true для .
bool isDuplicate() const { return m_is_duplicate; }
//! \~english Returns QoS.
//! \~russian Возвращает QoS.
QoS qos() const { return m_qos; }
//! \~english Returns message ID.
//! \~russian Возвращает ID сообщения.
int ID() const { return m_msg_id; }
protected:
PIString m_topic;
PIMap<PIString, PIString> m_path_arguments;
PIByteArray m_payload;
PIMap<int, PIString> m_properties; // for v5
QoS m_qos = QoS::Level0;
int m_msg_id = 0;
bool m_is_duplicate = false;
};
//! \~\ingroup MQTT
//! \~\brief
//! \~english Mutable MQTT message.
//! \~russian Изменяемое MQTT-сообщение.
class PIP_EXPORT MessageMutable: public MessageConst {
public:
//! \~english Sets the topic.
//! \~russian Устанавливает топик.
MessageMutable & setTopic(PIString t);
//! \~english Sets the message body.
//! \~russian Устанавливает тело сообщения.
MessageMutable & setBody(PIByteArray b);
//! \~english Sets the message body.
//! \~russian Устанавливает тело сообщения.
MessageMutable & setPayload(PIByteArray b) { return setBody(b); }
//! \~english Returns path arguments.
//! \~russian Возвращает аргументы пути.
const PIMap<PIString, PIString> & pathArguments() const { return m_path_arguments; }
//! \~english Returns a modifiable map of path arguments.
//! \~russian Возвращает изменяемую карту аргументов пути.
PIMap<PIString, PIString> & pathArguments() { return m_path_arguments; }
//! \~english Returns map of properties.
//! \~russian Возвращает карту свойств.
const PIMap<int, PIString> & properties() const { return m_properties; }
//! \~english Returns a modifiable map of properties.
//! \~russian Возвращает изменяемую карту свойств.
PIMap<int, PIString> & properties() { return m_properties; }
//! \~english Returns \c true for .
//! \~russian Возвращает \c true для .
MessageMutable & setDuplicate(bool yes);
//! \~english Returns QoS.
//! \~russian Возвращает QoS.
MessageMutable & setQos(QoS qos);
//! \~english Returns message ID.
//! \~russian Возвращает ID сообщения.
MessageMutable & setID(int id);
}; };

View File

@@ -41,12 +41,12 @@ PRIVATE_DEFINITION_START(PIMQTT::Client)
} }
static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) { static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) {
PIMQTT::Message msg; PIMQTT::MessageMutable msg;
msg.topic = PIString::fromUTF8(topicName); msg.setTopic(PIString::fromUTF8(topicName));
msg.payload = PIByteArray(message->payload, message->payloadlen); msg.setPayload(PIByteArray(message->payload, message->payloadlen));
msg.msg_id = message->msgid; msg.setID(message->msgid);
msg.is_duplicate = message->dup != 0; msg.setDuplicate(message->dup != 0);
msg.qos = static_cast<PIMQTT::QoS>(message->qos); msg.setQos(static_cast<PIMQTT::QoS>(message->qos));
MQTTClient_freeMessage(&message); MQTTClient_freeMessage(&message);
MQTTClient_free(topicName); MQTTClient_free(topicName);
((PIMQTT::Client *)context)->mqtt_messageArrived(msg); ((PIMQTT::Client *)context)->mqtt_messageArrived(msg);
@@ -57,57 +57,61 @@ PRIVATE_DEFINITION_END(PIMQTT::Client)
PIMQTT::Client::Client() { PIMQTT::Client::Client() {
thread = new PIThread([this] { run(); }); worker = new PIThreadPoolWorker(1);
thread->start(); worker->start();
// worker = new PIThreadPoolWorker(1);
// worker->start();
} }
PIMQTT::Client::~Client() { PIMQTT::Client::~Client() {
// worker->stopAndWait(); is_destoying = true;
// piDeleteSafety(worker); worker->clearTasks();
thread->stop();
disconnect(); disconnect();
thread->waitForFinish(); worker->waitForTasks();
piDeleteSafety(thread); worker->stopAndWait();
piDeleteSafety(worker);
} }
void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) { void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) {
auto ci = connect_info.getRef(); if (is_destoying) return;
ci->address = address; ConnectInfo ci;
ci->clientID = client; ci.address = address;
ci->username = username; ci.clientID = client;
ci->password = password; ci.username = username;
changeStatus(Connecting); ci.password = password;
worker->enqueueTask([this, ci] { connectInternal(ci); });
} }
void PIMQTT::Client::disconnect() { void PIMQTT::Client::disconnect() {
changeStatus(Disconnecting); worker->enqueueTask([this] { disconnectInternal(); });
} }
void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) { void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) {
sub_queue.getRef()->enqueue({topic, qos}); if (is_destoying) return;
notifier.notify(); worker->enqueueTask([this, topic, qos] { subscribeInternal({topic, qos}); });
} }
void PIMQTT::Client::unsubscribe(const PIString & topic) { void PIMQTT::Client::unsubscribe(const PIString & topic) {
unsub_queue.getRef()->enqueue(topic); if (is_destoying) return;
notifier.notify(); worker->enqueueTask([this, topic] { unsubscribeInternal(topic); });
} }
void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) { void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) {
Message m; MessageMutable m;
m.topic = topic; m.setTopic(topic);
m.payload = msg; m.setPayload(msg);
m.qos = qos; m.setQos(qos);
pub_queue.getRef()->enqueue(m); publish(m);
notifier.notify(); }
void PIMQTT::Client::publish(const MessageConst & msg) {
if (is_destoying) return;
worker->enqueueTask([this, msg] { publishInternal(msg); });
} }
@@ -122,16 +126,16 @@ void PIMQTT::Client::mqtt_connectionLost() {
void PIMQTT::Client::mqtt_deliveryComplete(int token) {} void PIMQTT::Client::mqtt_deliveryComplete(int token) {}
void PIMQTT::Client::mqtt_messageArrived(const Message & msg) { void PIMQTT::Client::mqtt_messageArrived(const MessageConst & msg) {
piCoutObj << "mqtt_messageArrived"; piCoutObj << "mqtt_messageArrived";
received(msg); received(msg);
} }
void PIMQTT::Client::connectInternal() { void PIMQTT::Client::connectInternal(const ConnectInfo & ci) {
destroy(); destroy();
changeStatus(Connecting);
PRIVATE->connected = false; PRIVATE->connected = false;
auto ci = connect_info.get();
PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address); PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address);
if (net_addr.port() == 0) net_addr.setPort(1883); if (net_addr.port() == 0) net_addr.setPort(1883);
PIString server_uri = "tcp://" + net_addr.toString(); PIString server_uri = "tcp://" + net_addr.toString();
@@ -161,13 +165,22 @@ void PIMQTT::Client::connectInternal() {
} }
void PIMQTT::Client::publishInternal(const Message & m) { inline void PIMQTT::Client::disconnectInternal() {
bool was_connected = PRIVATE->connected;
changeStatus(Idle);
destroy();
if (was_connected) disconnected(Error::Unknown);
PRIVATE->connected = false;
}
void PIMQTT::Client::publishInternal(const MessageConst & m) {
if (!PRIVATE->client) return; if (!PRIVATE->client) return;
MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_message pubmsg = MQTTClient_message_initializer;
pubmsg.payload = const_cast<void *>(static_cast<const void *>(m.payload.data())); pubmsg.payload = const_cast<void *>(static_cast<const void *>(m.payload().data()));
pubmsg.payloadlen = m.payload.size_s(); pubmsg.payloadlen = m.payload().size_s();
pubmsg.qos = static_cast<int>(m.qos); pubmsg.qos = static_cast<int>(m.qos());
int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic.dataAscii(), &pubmsg, nullptr); int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic().dataAscii(), &pubmsg, nullptr);
if (ret != MQTTCLIENT_SUCCESS) { if (ret != MQTTCLIENT_SUCCESS) {
piCoutObj << "Failed to publishMessage, code" << ret; piCoutObj << "Failed to publishMessage, code" << ret;
} }
@@ -203,57 +216,4 @@ void PIMQTT::Client::destroy() {
void PIMQTT::Client::changeStatus(Status s) { void PIMQTT::Client::changeStatus(Status s) {
m_status = s; m_status = s;
notifier.notify();
}
void PIMQTT::Client::run() {
notifier.wait();
piCoutObj << "run" << (int)m_status;
if (thread->isStopping() || m_status == Disconnecting) {
bool was_connected = PRIVATE->connected;
changeStatus(Idle);
destroy();
if (was_connected) disconnected(Error::Unknown);
PRIVATE->connected = false;
return;
}
if (m_status == Connecting) {
connectInternal();
return;
}
if (m_status == Connected) {
Subscribe sub;
{
auto ref = sub_queue.getRef();
if (!ref->isEmpty()) sub = ref->dequeue();
}
if (sub.topic.isNotEmpty()) {
subscribeInternal(sub);
notifier.notify();
return;
}
PIString topic;
{
auto ref = unsub_queue.getRef();
if (!ref->isEmpty()) topic = ref->dequeue();
}
if (topic.isNotEmpty()) {
unsubscribeInternal(topic);
notifier.notify();
return;
}
Message msg;
{
auto ref = pub_queue.getRef();
if (!ref->isEmpty()) msg = ref->dequeue();
}
if (msg.isValid()) {
publishInternal(msg);
notifier.notify();
return;
}
}
} }

View File

@@ -35,7 +35,9 @@ int main(int argc, char * argv[]) {
piCout << "disconnected code" << (int)code; piCout << "disconnected code" << (int)code;
cl.connect("localhost", "PIP"); cl.connect("localhost", "PIP");
}); });
CONNECTL(&cl, received, [](const PIMQTT::Message & message) { piCout << "received" << message.topic << message.payload.size(); }); CONNECTL(&cl, received, [](const PIMQTT::MessageConst & message) {
piCout << "received" << message.topic() << message.payload().size();
});
cl.connect("localhost", "PIP"); cl.connect("localhost", "PIP");