Compare commits
8 Commits
mqtt_clien
...
22b47799dc
| Author | SHA1 | Date | |
|---|---|---|---|
| 22b47799dc | |||
| 9076cc749a | |||
| d27e796595 | |||
| aadf3375a6 | |||
| 089e241a67 | |||
| c05fe46d04 | |||
| 1ef4103cf9 | |||
| 7e48df7e01 |
@@ -7,7 +7,7 @@ project(PIP)
|
||||
set(PIP_MAJOR 5)
|
||||
set(PIP_MINOR 7)
|
||||
set(PIP_REVISION 0)
|
||||
set(PIP_SUFFIX _alpha)
|
||||
set(PIP_SUFFIX _beta)
|
||||
set(PIP_COMPANY SHS)
|
||||
set(PIP_DOMAIN org.SHS)
|
||||
|
||||
|
||||
@@ -1220,6 +1220,22 @@ public:
|
||||
return result;
|
||||
}
|
||||
|
||||
//! \~english Applies a function to each element (read-only).
|
||||
//! \~russian Применяет функцию к каждому элементу (только чтение).
|
||||
//! \details
|
||||
//! \~english The function can't modify the elements.
|
||||
//! \~russian Функция не может изменять элементы.
|
||||
//! \~\sa forEach (modifiable), PIVector::forEach()
|
||||
inline void forEach(std::function<void(const T &)> func) const { mat.forEach(func); }
|
||||
|
||||
//! \~english Applies a function to each element (modifiable).
|
||||
//! \~russian Применяет функцию к каждому элементу (с возможностью изменения).
|
||||
//! \details
|
||||
//! \~english The function can modify the elements.
|
||||
//! \~russian Функция может изменять элементы.
|
||||
//! \~\sa forEach (read-only), PIVector::forEach()
|
||||
inline PIVector2D<T> & forEach(std::function<void(T &)> func) { mat.forEach(func); return *this; }
|
||||
|
||||
//! \~english Applies a function to each element and returns a new 2D array of a different type.
|
||||
//! \~russian Применяет функцию к каждому элементу и возвращает новый двумерный массив другого типа.
|
||||
//! \details
|
||||
|
||||
@@ -132,6 +132,10 @@ public:
|
||||
//! \~russian Возвращает полярный угол в градусах.
|
||||
double angleDeg() const { return toDeg(atan2(y, x)); }
|
||||
|
||||
//! \~english Returns |x| + |y|.
|
||||
//! \~russian Возвращает |x| + |y|.
|
||||
Type manhattanLength() const { return piAbs<Type>(x) + piAbs<Type>(y); }
|
||||
|
||||
//! \~english Returns polar form with radius in \a x and angle in \a y.
|
||||
//! \~russian Возвращает полярную форму, где радиус хранится в \a x, а угол в \a y.
|
||||
PIPoint<Type> toPolar(bool isDeg = false) const { return PIPoint<Type>(sqrt(x * x + y * y), isDeg ? angleDeg() : angleRad()); }
|
||||
@@ -165,31 +169,31 @@ public:
|
||||
|
||||
//! \~english Returns sum of two points.
|
||||
//! \~russian Возвращает сумму двух точек.
|
||||
PIPoint<Type> operator+(const PIPoint<Type> & p) { return PIPoint<Type>(x + p.x, y + p.y); }
|
||||
PIPoint<Type> operator+(const PIPoint<Type> & p) const { return PIPoint<Type>(x + p.x, y + p.y); }
|
||||
|
||||
//! \~english Returns point with `p` added to both coordinates.
|
||||
//! \~russian Возвращает точку с добавлением `p` к обеим координатам.
|
||||
PIPoint<Type> operator+(const Type & p) { return PIPoint<Type>(x + p, y + p); }
|
||||
PIPoint<Type> operator+(const Type & p) const { return PIPoint<Type>(x + p, y + p); }
|
||||
|
||||
//! \~english Returns difference between two points.
|
||||
//! \~russian Возвращает разность двух точек.
|
||||
PIPoint<Type> operator-(const PIPoint<Type> & p) { return PIPoint<Type>(x - p.x, y - p.y); }
|
||||
PIPoint<Type> operator-(const PIPoint<Type> & p) const { return PIPoint<Type>(x - p.x, y - p.y); }
|
||||
|
||||
//! \~english Returns point with `p` subtracted from both coordinates.
|
||||
//! \~russian Возвращает точку с вычитанием `p` из обеих координат.
|
||||
PIPoint<Type> operator-(const Type & p) { return PIPoint<Type>(x - p, y - p); }
|
||||
PIPoint<Type> operator-(const Type & p) const { return PIPoint<Type>(x - p, y - p); }
|
||||
|
||||
//! \~english Returns point with inverted coordinates.
|
||||
//! \~russian Возвращает точку с инвертированными координатами.
|
||||
PIPoint<Type> operator-() { return PIPoint<Type>(-x, -y); }
|
||||
PIPoint<Type> operator-() const { return PIPoint<Type>(-x, -y); }
|
||||
|
||||
//! \~english Returns point scaled by `v`.
|
||||
//! \~russian Возвращает точку, масштабированную на `v`.
|
||||
PIPoint<Type> operator*(Type v) { return PIPoint<Type>(x * v, y * v); }
|
||||
PIPoint<Type> operator*(Type v) const { return PIPoint<Type>(x * v, y * v); }
|
||||
|
||||
//! \~english Returns point divided by `v`.
|
||||
//! \~russian Возвращает точку, деленную на `v`.
|
||||
PIPoint<Type> operator/(Type v) { return PIPoint<Type>(x / v, y / v); }
|
||||
PIPoint<Type> operator/(Type v) const { return PIPoint<Type>(x / v, y / v); }
|
||||
|
||||
//! \~english Checks whether point coordinates are equal.
|
||||
//! \~russian Проверяет равенство координат точек.
|
||||
@@ -200,6 +204,35 @@ public:
|
||||
bool operator!=(const PIPoint<Type> & p) const { return (x != p.x || y != p.y); }
|
||||
};
|
||||
|
||||
template<typename Type>
|
||||
inline PIPoint<Type> operator+(const Type & f, const PIPoint<Type> & s) {
|
||||
return PIPoint<Type>(f + s.x, f + s.y);
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
inline PIPoint<Type> operator-(const Type & f, const PIPoint<Type> & s) {
|
||||
return PIPoint<Type>(f - s.x, f - s.y);
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
inline PIPoint<Type> operator*(const Type & f, const PIPoint<Type> & s) {
|
||||
return PIPoint<Type>(f * s.x, f * s.y);
|
||||
}
|
||||
|
||||
template<typename Type>
|
||||
inline PIPoint<Type> operator/(const Type & f, const PIPoint<Type> & s) {
|
||||
return PIPoint<Type>(f / s.x, f / s.y);
|
||||
}
|
||||
|
||||
|
||||
//! \~english Returns {|x|, |y|}.
|
||||
//! \~russian Возвращает {|x|, |y|}.
|
||||
template<typename Type>
|
||||
inline constexpr PIPoint<Type> piAbs(const PIPoint<Type> & v) {
|
||||
return PIPoint<Type>(piAbs<Type>(v.x), piAbs<Type>(v.y));
|
||||
}
|
||||
|
||||
|
||||
//! \relatesalso PICout
|
||||
//! \~english Writes point coordinates to \a PICout.
|
||||
//! \~russian Выводит координаты точки в \a PICout.
|
||||
|
||||
@@ -28,8 +28,6 @@
|
||||
#include "piliterals_time.h"
|
||||
#include "pimqtttypes.h"
|
||||
#include "pip_mqtt_client_export.h"
|
||||
#include "piprotectedvariable.h"
|
||||
#include "pithread.h"
|
||||
#include "pithreadpoolworker.h"
|
||||
|
||||
|
||||
@@ -52,6 +50,7 @@ public:
|
||||
void unsubscribe(const PIString & topic);
|
||||
|
||||
void publish(const PIString & topic, const PIByteArray & msg, QoS qos = QoS::Level0);
|
||||
void publish(const MessageConst & msg);
|
||||
void unsubscribeAll() { unsubscribe("#"); }
|
||||
|
||||
bool isConnecting() const { return m_status == Connecting; }
|
||||
@@ -59,18 +58,17 @@ public:
|
||||
|
||||
EVENT0(connected);
|
||||
EVENT1(disconnected, PIMQTT::Error, code);
|
||||
EVENT1(received, PIMQTT::Message, message);
|
||||
EVENT1(received, PIMQTT::MessageConst, message);
|
||||
|
||||
private:
|
||||
NO_COPY_CLASS(Client)
|
||||
|
||||
PRIVATE_DECLARATION(PIP_EXPORT)
|
||||
PRIVATE_DECLARATION(PIP_MQTT_CLIENT_EXPORT)
|
||||
|
||||
enum Status {
|
||||
Idle,
|
||||
Connecting,
|
||||
Connected,
|
||||
Disconnecting,
|
||||
};
|
||||
|
||||
struct ConnectInfo {
|
||||
@@ -86,10 +84,11 @@ private:
|
||||
|
||||
void mqtt_connectionLost();
|
||||
void mqtt_deliveryComplete(int token);
|
||||
void mqtt_messageArrived(const Message & msg);
|
||||
void mqtt_messageArrived(const MessageConst & msg);
|
||||
|
||||
void connectInternal();
|
||||
void publishInternal(const Message & m);
|
||||
void connectInternal(const ConnectInfo & ci);
|
||||
void disconnectInternal();
|
||||
void publishInternal(const MessageConst & m);
|
||||
void subscribeInternal(const Subscribe & sub);
|
||||
void unsubscribeInternal(const PIString & topic);
|
||||
void destroy();
|
||||
@@ -97,15 +96,10 @@ private:
|
||||
|
||||
void run();
|
||||
|
||||
PIProtectedVariable<PIQueue<Message>> pub_queue;
|
||||
PIProtectedVariable<PIQueue<Subscribe>> sub_queue;
|
||||
PIProtectedVariable<PIQueue<PIString>> unsub_queue;
|
||||
PIProtectedVariable<ConnectInfo> connect_info;
|
||||
PIThreadNotifier notifier;
|
||||
std::atomic_int m_status = {Idle};
|
||||
std::atomic_bool is_destoying = {false};
|
||||
PISystemTime connect_timeout = 10_s;
|
||||
PIThread * thread = nullptr;
|
||||
// PIThreadPoolWorker * worker = nullptr;
|
||||
PIThreadPoolWorker * worker = nullptr;
|
||||
};
|
||||
|
||||
|
||||
|
||||
31
libs/main/mqtt_common/pimqtttypes.cpp
Normal file
31
libs/main/mqtt_common/pimqtttypes.cpp
Normal file
@@ -0,0 +1,31 @@
|
||||
#include "pimqtttypes.h"
|
||||
|
||||
|
||||
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setTopic(PIString t) {
|
||||
m_topic = t;
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
||||
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setBody(PIByteArray b) {
|
||||
m_payload = b;
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
||||
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setDuplicate(bool yes) {
|
||||
m_is_duplicate = yes;
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
||||
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setQos(QoS qos) {
|
||||
m_qos = qos;
|
||||
return *this;
|
||||
}
|
||||
|
||||
|
||||
PIMQTT::MessageMutable & PIMQTT::MessageMutable::setID(int id) {
|
||||
m_msg_id = id;
|
||||
return *this;
|
||||
}
|
||||
@@ -26,7 +26,7 @@
|
||||
#define pimqtttypes_h
|
||||
|
||||
#include "pip_export.h"
|
||||
#include "pistring.h"
|
||||
#include "pistringlist.h"
|
||||
|
||||
|
||||
//! \~english Namespace with shared MQTT data types.
|
||||
@@ -51,14 +51,113 @@ enum class Error {
|
||||
NotAuthorized = 5 /** */,
|
||||
};
|
||||
|
||||
struct PIP_EXPORT Message {
|
||||
bool isValid() const { return topic.isNotEmpty(); }
|
||||
PIString topic;
|
||||
PIByteArray payload;
|
||||
PIMap<int, PIString> properties; // for v5
|
||||
QoS qos = QoS::Level0;
|
||||
int msg_id = 0;
|
||||
bool is_duplicate = false;
|
||||
|
||||
//! \~\ingroup MQTT
|
||||
//! \~\brief
|
||||
//! \~english Immutable MQTT message.
|
||||
//! \~russian Неизменяемое MQTT-сообщение.
|
||||
class PIP_EXPORT MessageConst {
|
||||
public:
|
||||
//! \~english Returns if the topic is not empty.
|
||||
//! \~russian Возвращает не пустой ли топик.
|
||||
bool isValid() const { return m_topic.isNotEmpty(); }
|
||||
|
||||
//! \~english Returns the topic.
|
||||
//! \~russian Возвращает топик.
|
||||
const PIString & topic() const { return m_topic; }
|
||||
|
||||
//! \~english Returns the topic split into non-empty components.
|
||||
//! \~russian Возвращает топик, разбитый на непустые компоненты.
|
||||
PIStringList topicList() const { return m_topic.split('/').removeAll({}); }
|
||||
|
||||
//! \~english Returns the message body.
|
||||
//! \~russian Возвращает тело сообщения.
|
||||
const PIByteArray & body() const { return m_payload; }
|
||||
|
||||
//! \~english Returns the message body.
|
||||
//! \~russian Возвращает тело сообщения.
|
||||
const PIByteArray & payload() const { return m_payload; }
|
||||
|
||||
//! \~english Returns extracted path arguments.
|
||||
//! \~russian Возвращает извлеченные аргументы пути.
|
||||
const PIMap<PIString, PIString> & pathArguments() const { return m_path_arguments; }
|
||||
|
||||
//! \~english Returns map of properties.
|
||||
//! \~russian Возвращает карту свойств.
|
||||
const PIMap<int, PIString> & properties() const { return m_properties; }
|
||||
|
||||
|
||||
//! \~english Returns \c true for .
|
||||
//! \~russian Возвращает \c true для .
|
||||
bool isDuplicate() const { return m_is_duplicate; }
|
||||
|
||||
//! \~english Returns QoS.
|
||||
//! \~russian Возвращает QoS.
|
||||
QoS qos() const { return m_qos; }
|
||||
|
||||
//! \~english Returns message ID.
|
||||
//! \~russian Возвращает ID сообщения.
|
||||
int ID() const { return m_msg_id; }
|
||||
|
||||
protected:
|
||||
PIString m_topic;
|
||||
PIMap<PIString, PIString> m_path_arguments;
|
||||
PIByteArray m_payload;
|
||||
PIMap<int, PIString> m_properties; // for v5
|
||||
QoS m_qos = QoS::Level0;
|
||||
int m_msg_id = 0;
|
||||
bool m_is_duplicate = false;
|
||||
};
|
||||
|
||||
|
||||
//! \~\ingroup MQTT
|
||||
//! \~\brief
|
||||
//! \~english Mutable MQTT message.
|
||||
//! \~russian Изменяемое MQTT-сообщение.
|
||||
class PIP_EXPORT MessageMutable: public MessageConst {
|
||||
public:
|
||||
//! \~english Sets the topic.
|
||||
//! \~russian Устанавливает топик.
|
||||
MessageMutable & setTopic(PIString t);
|
||||
|
||||
//! \~english Sets the message body.
|
||||
//! \~russian Устанавливает тело сообщения.
|
||||
MessageMutable & setBody(PIByteArray b);
|
||||
|
||||
//! \~english Sets the message body.
|
||||
//! \~russian Устанавливает тело сообщения.
|
||||
MessageMutable & setPayload(PIByteArray b) { return setBody(b); }
|
||||
|
||||
|
||||
//! \~english Returns path arguments.
|
||||
//! \~russian Возвращает аргументы пути.
|
||||
const PIMap<PIString, PIString> & pathArguments() const { return m_path_arguments; }
|
||||
|
||||
//! \~english Returns a modifiable map of path arguments.
|
||||
//! \~russian Возвращает изменяемую карту аргументов пути.
|
||||
PIMap<PIString, PIString> & pathArguments() { return m_path_arguments; }
|
||||
|
||||
|
||||
//! \~english Returns map of properties.
|
||||
//! \~russian Возвращает карту свойств.
|
||||
const PIMap<int, PIString> & properties() const { return m_properties; }
|
||||
|
||||
//! \~english Returns a modifiable map of properties.
|
||||
//! \~russian Возвращает изменяемую карту свойств.
|
||||
PIMap<int, PIString> & properties() { return m_properties; }
|
||||
|
||||
|
||||
//! \~english Returns \c true for .
|
||||
//! \~russian Возвращает \c true для .
|
||||
MessageMutable & setDuplicate(bool yes);
|
||||
|
||||
//! \~english Returns QoS.
|
||||
//! \~russian Возвращает QoS.
|
||||
MessageMutable & setQos(QoS qos);
|
||||
|
||||
//! \~english Returns message ID.
|
||||
//! \~russian Возвращает ID сообщения.
|
||||
MessageMutable & setID(int id);
|
||||
};
|
||||
|
||||
|
||||
|
||||
@@ -41,12 +41,12 @@ PRIVATE_DEFINITION_START(PIMQTT::Client)
|
||||
}
|
||||
|
||||
static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) {
|
||||
PIMQTT::Message msg;
|
||||
msg.topic = PIString::fromUTF8(topicName);
|
||||
msg.payload = PIByteArray(message->payload, message->payloadlen);
|
||||
msg.msg_id = message->msgid;
|
||||
msg.is_duplicate = message->dup != 0;
|
||||
msg.qos = static_cast<PIMQTT::QoS>(message->qos);
|
||||
PIMQTT::MessageMutable msg;
|
||||
msg.setTopic(PIString::fromUTF8(topicName));
|
||||
msg.setPayload(PIByteArray(message->payload, message->payloadlen));
|
||||
msg.setID(message->msgid);
|
||||
msg.setDuplicate(message->dup != 0);
|
||||
msg.setQos(static_cast<PIMQTT::QoS>(message->qos));
|
||||
MQTTClient_freeMessage(&message);
|
||||
MQTTClient_free(topicName);
|
||||
((PIMQTT::Client *)context)->mqtt_messageArrived(msg);
|
||||
@@ -57,57 +57,61 @@ PRIVATE_DEFINITION_END(PIMQTT::Client)
|
||||
|
||||
|
||||
PIMQTT::Client::Client() {
|
||||
thread = new PIThread([this] { run(); });
|
||||
thread->start();
|
||||
// worker = new PIThreadPoolWorker(1);
|
||||
// worker->start();
|
||||
worker = new PIThreadPoolWorker(1);
|
||||
worker->start();
|
||||
}
|
||||
|
||||
|
||||
PIMQTT::Client::~Client() {
|
||||
// worker->stopAndWait();
|
||||
// piDeleteSafety(worker);
|
||||
thread->stop();
|
||||
is_destoying = true;
|
||||
worker->clearTasks();
|
||||
disconnect();
|
||||
thread->waitForFinish();
|
||||
piDeleteSafety(thread);
|
||||
worker->waitForTasks();
|
||||
worker->stopAndWait();
|
||||
piDeleteSafety(worker);
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) {
|
||||
auto ci = connect_info.getRef();
|
||||
ci->address = address;
|
||||
ci->clientID = client;
|
||||
ci->username = username;
|
||||
ci->password = password;
|
||||
changeStatus(Connecting);
|
||||
if (is_destoying) return;
|
||||
ConnectInfo ci;
|
||||
ci.address = address;
|
||||
ci.clientID = client;
|
||||
ci.username = username;
|
||||
ci.password = password;
|
||||
worker->enqueueTask([this, ci] { connectInternal(ci); });
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::disconnect() {
|
||||
changeStatus(Disconnecting);
|
||||
worker->enqueueTask([this] { disconnectInternal(); });
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::subscribe(const PIString & topic, QoS qos) {
|
||||
sub_queue.getRef()->enqueue({topic, qos});
|
||||
notifier.notify();
|
||||
if (is_destoying) return;
|
||||
worker->enqueueTask([this, topic, qos] { subscribeInternal({topic, qos}); });
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::unsubscribe(const PIString & topic) {
|
||||
unsub_queue.getRef()->enqueue(topic);
|
||||
notifier.notify();
|
||||
if (is_destoying) return;
|
||||
worker->enqueueTask([this, topic] { unsubscribeInternal(topic); });
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) {
|
||||
Message m;
|
||||
m.topic = topic;
|
||||
m.payload = msg;
|
||||
m.qos = qos;
|
||||
pub_queue.getRef()->enqueue(m);
|
||||
notifier.notify();
|
||||
MessageMutable m;
|
||||
m.setTopic(topic);
|
||||
m.setPayload(msg);
|
||||
m.setQos(qos);
|
||||
publish(m);
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::publish(const MessageConst & msg) {
|
||||
if (is_destoying) return;
|
||||
worker->enqueueTask([this, msg] { publishInternal(msg); });
|
||||
}
|
||||
|
||||
|
||||
@@ -122,16 +126,16 @@ void PIMQTT::Client::mqtt_connectionLost() {
|
||||
void PIMQTT::Client::mqtt_deliveryComplete(int token) {}
|
||||
|
||||
|
||||
void PIMQTT::Client::mqtt_messageArrived(const Message & msg) {
|
||||
void PIMQTT::Client::mqtt_messageArrived(const MessageConst & msg) {
|
||||
piCoutObj << "mqtt_messageArrived";
|
||||
received(msg);
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::connectInternal() {
|
||||
void PIMQTT::Client::connectInternal(const ConnectInfo & ci) {
|
||||
destroy();
|
||||
changeStatus(Connecting);
|
||||
PRIVATE->connected = false;
|
||||
auto ci = connect_info.get();
|
||||
PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address);
|
||||
if (net_addr.port() == 0) net_addr.setPort(1883);
|
||||
PIString server_uri = "tcp://" + net_addr.toString();
|
||||
@@ -161,13 +165,22 @@ void PIMQTT::Client::connectInternal() {
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::publishInternal(const Message & m) {
|
||||
inline void PIMQTT::Client::disconnectInternal() {
|
||||
bool was_connected = PRIVATE->connected;
|
||||
changeStatus(Idle);
|
||||
destroy();
|
||||
if (was_connected) disconnected(Error::Unknown);
|
||||
PRIVATE->connected = false;
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::publishInternal(const MessageConst & m) {
|
||||
if (!PRIVATE->client) return;
|
||||
MQTTClient_message pubmsg = MQTTClient_message_initializer;
|
||||
pubmsg.payload = const_cast<void *>(static_cast<const void *>(m.payload.data()));
|
||||
pubmsg.payloadlen = m.payload.size_s();
|
||||
pubmsg.qos = static_cast<int>(m.qos);
|
||||
int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic.dataAscii(), &pubmsg, nullptr);
|
||||
pubmsg.payload = const_cast<void *>(static_cast<const void *>(m.payload().data()));
|
||||
pubmsg.payloadlen = m.payload().size_s();
|
||||
pubmsg.qos = static_cast<int>(m.qos());
|
||||
int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic().dataAscii(), &pubmsg, nullptr);
|
||||
if (ret != MQTTCLIENT_SUCCESS) {
|
||||
piCoutObj << "Failed to publishMessage, code" << ret;
|
||||
}
|
||||
@@ -203,57 +216,4 @@ void PIMQTT::Client::destroy() {
|
||||
|
||||
void PIMQTT::Client::changeStatus(Status s) {
|
||||
m_status = s;
|
||||
notifier.notify();
|
||||
}
|
||||
|
||||
|
||||
void PIMQTT::Client::run() {
|
||||
notifier.wait();
|
||||
piCoutObj << "run" << (int)m_status;
|
||||
if (thread->isStopping() || m_status == Disconnecting) {
|
||||
bool was_connected = PRIVATE->connected;
|
||||
changeStatus(Idle);
|
||||
destroy();
|
||||
if (was_connected) disconnected(Error::Unknown);
|
||||
PRIVATE->connected = false;
|
||||
return;
|
||||
}
|
||||
if (m_status == Connecting) {
|
||||
connectInternal();
|
||||
return;
|
||||
}
|
||||
if (m_status == Connected) {
|
||||
Subscribe sub;
|
||||
{
|
||||
auto ref = sub_queue.getRef();
|
||||
if (!ref->isEmpty()) sub = ref->dequeue();
|
||||
}
|
||||
if (sub.topic.isNotEmpty()) {
|
||||
subscribeInternal(sub);
|
||||
notifier.notify();
|
||||
return;
|
||||
}
|
||||
|
||||
PIString topic;
|
||||
{
|
||||
auto ref = unsub_queue.getRef();
|
||||
if (!ref->isEmpty()) topic = ref->dequeue();
|
||||
}
|
||||
if (topic.isNotEmpty()) {
|
||||
unsubscribeInternal(topic);
|
||||
notifier.notify();
|
||||
return;
|
||||
}
|
||||
|
||||
Message msg;
|
||||
{
|
||||
auto ref = pub_queue.getRef();
|
||||
if (!ref->isEmpty()) msg = ref->dequeue();
|
||||
}
|
||||
if (msg.isValid()) {
|
||||
publishInternal(msg);
|
||||
notifier.notify();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
4
main.cpp
4
main.cpp
@@ -35,7 +35,9 @@ int main(int argc, char * argv[]) {
|
||||
piCout << "disconnected code" << (int)code;
|
||||
cl.connect("localhost", "PIP");
|
||||
});
|
||||
CONNECTL(&cl, received, [](const PIMQTT::Message & message) { piCout << "received" << message.topic << message.payload.size(); });
|
||||
CONNECTL(&cl, received, [](const PIMQTT::MessageConst & message) {
|
||||
piCout << "received" << message.topic() << message.payload().size();
|
||||
});
|
||||
|
||||
cl.connect("localhost", "PIP");
|
||||
|
||||
|
||||
Reference in New Issue
Block a user