diff --git a/libs/main/mqtt_client/pimqttclient.h b/libs/main/mqtt_client/pimqttclient.h index edd0fbe5..1e12c47d 100644 --- a/libs/main/mqtt_client/pimqttclient.h +++ b/libs/main/mqtt_client/pimqttclient.h @@ -28,8 +28,6 @@ #include "piliterals_time.h" #include "pimqtttypes.h" #include "pip_mqtt_client_export.h" -#include "piprotectedvariable.h" -#include "pithread.h" #include "pithreadpoolworker.h" @@ -52,6 +50,7 @@ public: void unsubscribe(const PIString & topic); void publish(const PIString & topic, const PIByteArray & msg, QoS qos = QoS::Level0); + void publish(const MessageConst & msg); void unsubscribeAll() { unsubscribe("#"); } bool isConnecting() const { return m_status == Connecting; } @@ -59,7 +58,7 @@ public: EVENT0(connected); EVENT1(disconnected, PIMQTT::Error, code); - EVENT1(received, PIMQTT::Message, message); + EVENT1(received, PIMQTT::MessageConst, message); private: NO_COPY_CLASS(Client) @@ -85,11 +84,11 @@ private: void mqtt_connectionLost(); void mqtt_deliveryComplete(int token); - void mqtt_messageArrived(const Message & msg); + void mqtt_messageArrived(const MessageConst & msg); void connectInternal(const ConnectInfo & ci); void disconnectInternal(); - void publishInternal(const Message & m); + void publishInternal(const MessageConst & m); void subscribeInternal(const Subscribe & sub); void unsubscribeInternal(const PIString & topic); void destroy(); diff --git a/libs/main/mqtt_common/pimqtttypes.cpp b/libs/main/mqtt_common/pimqtttypes.cpp new file mode 100644 index 00000000..9df77997 --- /dev/null +++ b/libs/main/mqtt_common/pimqtttypes.cpp @@ -0,0 +1,31 @@ +#include "pimqtttypes.h" + + +PIMQTT::MessageMutable & PIMQTT::MessageMutable::setTopic(PIString t) { + m_topic = t; + return *this; +} + + +PIMQTT::MessageMutable & PIMQTT::MessageMutable::setBody(PIByteArray b) { + m_payload = b; + return *this; +} + + +PIMQTT::MessageMutable & PIMQTT::MessageMutable::setDuplicate(bool yes) { + m_is_duplicate = yes; + return *this; +} + + +PIMQTT::MessageMutable & PIMQTT::MessageMutable::setQos(QoS qos) { + m_qos = qos; + return *this; +} + + +PIMQTT::MessageMutable & PIMQTT::MessageMutable::setID(int id) { + m_msg_id = id; + return *this; +} diff --git a/libs/main/mqtt_common/pimqtttypes.h b/libs/main/mqtt_common/pimqtttypes.h index 641aff25..17c90361 100644 --- a/libs/main/mqtt_common/pimqtttypes.h +++ b/libs/main/mqtt_common/pimqtttypes.h @@ -26,7 +26,7 @@ #define pimqtttypes_h #include "pip_export.h" -#include "pistring.h" +#include "pistringlist.h" //! \~english Namespace with shared MQTT data types. @@ -51,14 +51,113 @@ enum class Error { NotAuthorized = 5 /** */, }; -struct PIP_EXPORT Message { - bool isValid() const { return topic.isNotEmpty(); } - PIString topic; - PIByteArray payload; - PIMap properties; // for v5 - QoS qos = QoS::Level0; - int msg_id = 0; - bool is_duplicate = false; + +//! \~\ingroup MQTT +//! \~\brief +//! \~english Immutable MQTT message. +//! \~russian Неизменяемое MQTT-сообщение. +class PIP_EXPORT MessageConst { +public: + //! \~english Returns if the topic is not empty. + //! \~russian Возвращает не пустой ли топик. + bool isValid() const { return m_topic.isNotEmpty(); } + + //! \~english Returns the topic. + //! \~russian Возвращает топик. + const PIString & topic() const { return m_topic; } + + //! \~english Returns the topic split into non-empty components. + //! \~russian Возвращает топик, разбитый на непустые компоненты. + PIStringList topicList() const { return m_topic.split('/').removeAll({}); } + + //! \~english Returns the message body. + //! \~russian Возвращает тело сообщения. + const PIByteArray & body() const { return m_payload; } + + //! \~english Returns the message body. + //! \~russian Возвращает тело сообщения. + const PIByteArray & payload() const { return m_payload; } + + //! \~english Returns extracted path arguments. + //! \~russian Возвращает извлеченные аргументы пути. + const PIMap & pathArguments() const { return m_path_arguments; } + + //! \~english Returns map of properties. + //! \~russian Возвращает карту свойств. + const PIMap & properties() const { return m_properties; } + + + //! \~english Returns \c true for . + //! \~russian Возвращает \c true для . + bool isDuplicate() const { return m_is_duplicate; } + + //! \~english Returns QoS. + //! \~russian Возвращает QoS. + QoS qos() const { return m_qos; } + + //! \~english Returns message ID. + //! \~russian Возвращает ID сообщения. + int ID() const { return m_msg_id; } + +protected: + PIString m_topic; + PIMap m_path_arguments; + PIByteArray m_payload; + PIMap m_properties; // for v5 + QoS m_qos = QoS::Level0; + int m_msg_id = 0; + bool m_is_duplicate = false; +}; + + +//! \~\ingroup MQTT +//! \~\brief +//! \~english Mutable MQTT message. +//! \~russian Изменяемое MQTT-сообщение. +class PIP_EXPORT MessageMutable: public MessageConst { +public: + //! \~english Sets the topic. + //! \~russian Устанавливает топик. + MessageMutable & setTopic(PIString t); + + //! \~english Sets the message body. + //! \~russian Устанавливает тело сообщения. + MessageMutable & setBody(PIByteArray b); + + //! \~english Sets the message body. + //! \~russian Устанавливает тело сообщения. + MessageMutable & setPayload(PIByteArray b) { return setBody(b); } + + + //! \~english Returns path arguments. + //! \~russian Возвращает аргументы пути. + const PIMap & pathArguments() const { return m_path_arguments; } + + //! \~english Returns a modifiable map of path arguments. + //! \~russian Возвращает изменяемую карту аргументов пути. + PIMap & pathArguments() { return m_path_arguments; } + + + //! \~english Returns map of properties. + //! \~russian Возвращает карту свойств. + const PIMap & properties() const { return m_properties; } + + //! \~english Returns a modifiable map of properties. + //! \~russian Возвращает изменяемую карту свойств. + PIMap & properties() { return m_properties; } + + + //! \~english Returns \c true for . + //! \~russian Возвращает \c true для . + MessageMutable & setDuplicate(bool yes); + + //! \~english Returns QoS. + //! \~russian Возвращает QoS. + MessageMutable & setQos(QoS qos); + + //! \~english Returns message ID. + //! \~russian Возвращает ID сообщения. + MessageMutable & setID(int id); }; diff --git a/libs/mqtt_client/pimqttclient.cpp b/libs/mqtt_client/pimqttclient.cpp index d23ad227..6518d7b2 100644 --- a/libs/mqtt_client/pimqttclient.cpp +++ b/libs/mqtt_client/pimqttclient.cpp @@ -41,12 +41,12 @@ PRIVATE_DEFINITION_START(PIMQTT::Client) } static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) { - PIMQTT::Message msg; - msg.topic = PIString::fromUTF8(topicName); - msg.payload = PIByteArray(message->payload, message->payloadlen); - msg.msg_id = message->msgid; - msg.is_duplicate = message->dup != 0; - msg.qos = static_cast(message->qos); + PIMQTT::MessageMutable msg; + msg.setTopic(PIString::fromUTF8(topicName)); + msg.setPayload(PIByteArray(message->payload, message->payloadlen)); + msg.setID(message->msgid); + msg.setDuplicate(message->dup != 0); + msg.setQos(static_cast(message->qos)); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); ((PIMQTT::Client *)context)->mqtt_messageArrived(msg); @@ -101,12 +101,17 @@ void PIMQTT::Client::unsubscribe(const PIString & topic) { void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) { + MessageMutable m; + m.setTopic(topic); + m.setPayload(msg); + m.setQos(qos); + publish(m); +} + + +void PIMQTT::Client::publish(const MessageConst & msg) { if (is_destoying) return; - Message m; - m.topic = topic; - m.payload = msg; - m.qos = qos; - worker->enqueueTask([this, m] { publishInternal(m); }); + worker->enqueueTask([this, msg] { publishInternal(msg); }); } @@ -121,7 +126,7 @@ void PIMQTT::Client::mqtt_connectionLost() { void PIMQTT::Client::mqtt_deliveryComplete(int token) {} -void PIMQTT::Client::mqtt_messageArrived(const Message & msg) { +void PIMQTT::Client::mqtt_messageArrived(const MessageConst & msg) { piCoutObj << "mqtt_messageArrived"; received(msg); } @@ -169,13 +174,13 @@ inline void PIMQTT::Client::disconnectInternal() { } -void PIMQTT::Client::publishInternal(const Message & m) { +void PIMQTT::Client::publishInternal(const MessageConst & m) { if (!PRIVATE->client) return; MQTTClient_message pubmsg = MQTTClient_message_initializer; - pubmsg.payload = const_cast(static_cast(m.payload.data())); - pubmsg.payloadlen = m.payload.size_s(); - pubmsg.qos = static_cast(m.qos); - int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic.dataAscii(), &pubmsg, nullptr); + pubmsg.payload = const_cast(static_cast(m.payload().data())); + pubmsg.payloadlen = m.payload().size_s(); + pubmsg.qos = static_cast(m.qos()); + int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic().dataAscii(), &pubmsg, nullptr); if (ret != MQTTCLIENT_SUCCESS) { piCoutObj << "Failed to publishMessage, code" << ret; } diff --git a/main.cpp b/main.cpp index 2c3aeb8c..fa83d4df 100644 --- a/main.cpp +++ b/main.cpp @@ -35,7 +35,9 @@ int main(int argc, char * argv[]) { piCout << "disconnected code" << (int)code; cl.connect("localhost", "PIP"); }); - CONNECTL(&cl, received, [](const PIMQTT::Message & message) { piCout << "received" << message.topic << message.payload.size(); }); + CONNECTL(&cl, received, [](const PIMQTT::MessageConst & message) { + piCout << "received" << message.topic() << message.payload().size(); + }); cl.connect("localhost", "PIP");