afb4ae8126
change subscription logic - now keep subscriptions independently from connecting state. No unregisters on disconnect, but resubscriptions on connect. So one-time subscription on app start and just connect() on lost connection - all subscriptions keeps
402 lines
12 KiB
C++
402 lines
12 KiB
C++
/*
|
|
PIP - Platform Independent Primitives
|
|
MQTT Client
|
|
Ivan Pelipenko peri4ko@yandex.ru
|
|
|
|
This program is free software: you can redistribute it and/or modify
|
|
it under the terms of the GNU Lesser General Public License as published by
|
|
the Free Software Foundation, either version 3 of the License, or
|
|
(at your option) any later version.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU Lesser General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "pimqttclient.h"
|
|
|
|
#include "MQTTClient.h"
|
|
#include "piliterals_string.h"
|
|
#include "piserverendpoint_p.h"
|
|
|
|
|
|
struct PIMQTT::Client::Endpoint: public PIHTTP::ServerEndpoint {
|
|
PIMQTT::Client::MessageFunction function;
|
|
};
|
|
|
|
struct TopicUsage {
|
|
int counter = 0;
|
|
PIMQTT::QoS qos = PIMQTT::QoS::Level1;
|
|
};
|
|
|
|
struct EndpointsStorage {
|
|
PIMap<uint, PIMap<PIString, PIVector<PIMQTT::Client::Endpoint>>> prepared; // [priority][topic] -> endpoints
|
|
PIMap<PIString, TopicUsage> topics_binded; // [topic] -> TopicUsage
|
|
};
|
|
|
|
|
|
PIString topicFromEndpoint(const PIMQTT::Client::Endpoint & e) {
|
|
PIStringList ret;
|
|
for (const auto & i: e.prepared_path) {
|
|
switch (i.type) {
|
|
case PIHTTP::ServerEndpoint::PathElement::Type::Fixed: ret << i.source; break;
|
|
case PIHTTP::ServerEndpoint::PathElement::Type::AnyMany: ret << "#"_a; break;
|
|
default: ret << "+"_a; break;
|
|
}
|
|
}
|
|
return ret.join('/');
|
|
}
|
|
|
|
|
|
STATIC_INITIALIZER_BEGIN
|
|
MQTTClient_init_options opts = MQTTClient_init_options_initializer;
|
|
MQTTClient_global_init(&opts);
|
|
STATIC_INITIALIZER_END
|
|
|
|
|
|
PRIVATE_DEFINITION_START(PIMQTT::Client)
|
|
MQTTClient client = nullptr;
|
|
bool connected = false;
|
|
|
|
PIProtectedVariable<EndpointsStorage> endpoints;
|
|
|
|
static void connectionLost_callback(void * context, char *) {
|
|
((PIMQTT::Client *)context)->mqtt_connectionLost();
|
|
}
|
|
|
|
static void deliveryComplete_callback(void * context, MQTTClient_deliveryToken dt) {
|
|
((PIMQTT::Client *)context)->mqtt_deliveryComplete(dt);
|
|
}
|
|
|
|
static int messageArrived_callback(void * context, char * topicName, int topicLen, MQTTClient_message * message) {
|
|
PIMQTT::MessageMutable msg;
|
|
msg.setTopic(PIString::fromUTF8(topicName));
|
|
msg.setPayload(PIByteArray(message->payload, message->payloadlen));
|
|
msg.setID(message->msgid);
|
|
msg.setDuplicate(message->dup != 0);
|
|
msg.setQos(static_cast<PIMQTT::QoS>(message->qos));
|
|
MQTTClient_freeMessage(&message);
|
|
MQTTClient_free(topicName);
|
|
((PIMQTT::Client *)context)->mqtt_messageArrived(msg);
|
|
return 1;
|
|
}
|
|
|
|
PRIVATE_DEFINITION_END(PIMQTT::Client)
|
|
|
|
|
|
PIMQTT::Client::Client() {
|
|
worker = new PIThreadPoolWorker(1);
|
|
worker->start();
|
|
}
|
|
|
|
|
|
PIMQTT::Client::~Client() {
|
|
is_destoying = true;
|
|
worker->clearTasks();
|
|
disconnect();
|
|
worker->waitForTasks();
|
|
worker->stopAndWait();
|
|
piDeleteSafety(worker);
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::connect(const PIString & address, const PIString & client, const PIString & username, const PIString & password) {
|
|
if (is_destoying) return;
|
|
ConnectInfo ci;
|
|
ci.address = address;
|
|
ci.clientID = client;
|
|
ci.username = username;
|
|
ci.password = password;
|
|
worker->enqueueTask([this, ci] { connectInternal(ci); });
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::disconnect() {
|
|
worker->enqueueTask([this] { disconnectInternal(); });
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::subscribe(const PIString & topic, MessageFunction functor, QoS qos) {
|
|
if (is_destoying) return;
|
|
Subscribe sub{topic, functor, qos};
|
|
// piCout << "subscribe" << topic;
|
|
PIString mqtt_topic = registerSubscribe(sub);
|
|
if (mqtt_topic.isEmpty()) return;
|
|
sub.topic = mqtt_topic;
|
|
worker->enqueueTask([this, sub] { subscribeInternal(sub); });
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::unsubscribe(const PIString & topic) {
|
|
if (is_destoying) return;
|
|
// piCout << "unsubscribe" << topic;
|
|
PIString mqtt_topic = unregisterSubscribe(topic);
|
|
if (mqtt_topic.isEmpty()) return;
|
|
worker->enqueueTask([this, mqtt_topic] { unsubscribeInternal(mqtt_topic); });
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::publish(const PIString & topic, const PIByteArray & msg, QoS qos) {
|
|
MessageMutable m;
|
|
m.setTopic(topic);
|
|
m.setPayload(msg);
|
|
m.setQos(qos);
|
|
publish(m);
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::publish(const MessageConst & msg) {
|
|
if (is_destoying) return;
|
|
worker->enqueueTask([this, msg] { publishInternal(msg); });
|
|
}
|
|
|
|
|
|
PIStringList PIMQTT::Client::usedTopics() const {
|
|
return PRIVATE->endpoints.getRef()->topics_binded.keys();
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::unsubscribeAll() {
|
|
{
|
|
auto ref = PRIVATE->endpoints.getRef();
|
|
auto tit = ref->topics_binded.makeIterator();
|
|
while (tit.next()) {
|
|
if (tit.value().counter <= 0) continue;
|
|
PIString mqtt_topic = tit.key();
|
|
worker->enqueueTask([this, mqtt_topic] { unsubscribeInternal(mqtt_topic); });
|
|
}
|
|
}
|
|
unregisterAll();
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::mqtt_connectionLost() {
|
|
// piCoutObj << "mqtt_connectionLost";
|
|
PRIVATE->connected = false;
|
|
changeStatus(Idle);
|
|
disconnected(Error::ServerUnavailable);
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::mqtt_deliveryComplete(int token) {}
|
|
|
|
|
|
void PIMQTT::Client::mqtt_messageArrived(MessageMutable & msg) {
|
|
PIStringList in_path = msg.topicList();
|
|
PIMQTT::Client::MessageFunction function;
|
|
// piCoutObj << "mqtt_messageArrived";
|
|
{
|
|
bool found = false;
|
|
auto ref = PRIVATE->endpoints.getRef();
|
|
auto pit = ref->prepared.makeReverseIterator();
|
|
while (pit.next()) { // by priority
|
|
|
|
auto tit = pit.value().makeIterator();
|
|
while (tit.next()) { // by MQTT topic
|
|
|
|
for (const auto & ep: tit.value()) {
|
|
PIMap<PIString, PIString> ext_args;
|
|
if (ep.match(in_path, ext_args)) {
|
|
msg.pathArguments() = ext_args;
|
|
function = ep.function;
|
|
found = true;
|
|
break;
|
|
}
|
|
}
|
|
if (found) break;
|
|
}
|
|
if (found) break;
|
|
}
|
|
}
|
|
if (function)
|
|
function(msg);
|
|
else
|
|
receivedUnhandled(msg);
|
|
}
|
|
|
|
|
|
PIString PIMQTT::Client::registerSubscribe(const Subscribe & sub) {
|
|
Endpoint ep;
|
|
ep.create(convertTopic2HTTP(sub.topic));
|
|
ep.function = sub.functor;
|
|
PIString topic = topicFromEndpoint(ep);
|
|
if (topic.isEmpty()) {
|
|
piCoutObj << "Warning: subscribe to empty topic, ignore";
|
|
return {};
|
|
}
|
|
// piCout << sub.topic << "->" << topic << ep.priority;
|
|
bool is_new_topic = false;
|
|
auto ref = PRIVATE->endpoints.getRef();
|
|
auto & eps_by_topic(ref->prepared[ep.priority][topic]);
|
|
for (const auto & i: eps_by_topic) {
|
|
if (i.path == ep.path) {
|
|
piCoutObj << "Warning: subscribe duplicate path, ignore";
|
|
return {};
|
|
}
|
|
}
|
|
eps_by_topic << ep;
|
|
auto & usage(ref->topics_binded[topic]);
|
|
is_new_topic = usage.counter == 0;
|
|
++usage.counter;
|
|
if (!is_new_topic) return {};
|
|
usage.qos = sub.qos;
|
|
return topic;
|
|
}
|
|
|
|
|
|
PIString PIMQTT::Client::unregisterSubscribe(const PIString & mqtt_topic) {
|
|
Endpoint ep;
|
|
ep.create(convertTopic2HTTP(mqtt_topic));
|
|
PIString topic = topicFromEndpoint(ep);
|
|
if (topic.isEmpty()) {
|
|
piCoutObj << "Warning: unsubscribe from empty topic, ignore";
|
|
return {};
|
|
}
|
|
// piCout << mqtt_topic << "->" << topic << ep.priority;
|
|
auto ref = PRIVATE->endpoints.getRef();
|
|
auto pit = ref->prepared.makeIterator();
|
|
while (pit.next()) { // by priority
|
|
|
|
auto tit = pit.value().makeIterator();
|
|
while (tit.next()) { // by MQTT topic
|
|
|
|
auto & eps(tit.value());
|
|
for (int i = 0; i < eps.size_s(); ++i) {
|
|
if (eps[i].path == ep.path) {
|
|
eps.remove(i);
|
|
auto & usage(ref->topics_binded[tit.key()]);
|
|
--usage.counter;
|
|
PIString ret;
|
|
if (usage.counter <= 0) {
|
|
ret = tit.key();
|
|
ref->topics_binded.remove(tit.key());
|
|
}
|
|
if (eps.isEmpty()) {
|
|
// piCout << "remove topics" << tit.key();
|
|
pit.value().remove(tit.key());
|
|
}
|
|
return ret;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
piCoutObj << "Warning: unsubscribe from" << mqtt_topic << ", topic not found";
|
|
return {};
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::unregisterAll() {
|
|
auto ref = PRIVATE->endpoints.getRef();
|
|
ref->prepared.clear();
|
|
ref->topics_binded.clear();
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::connectInternal(const ConnectInfo & ci) {
|
|
destroy();
|
|
changeStatus(Connecting);
|
|
PRIVATE->connected = false;
|
|
PINetworkAddress net_addr = PINetworkAddress::resolve(ci.address);
|
|
if (net_addr.port() == 0) net_addr.setPort(1883);
|
|
PIString server_uri = "tcp://" + net_addr.toString();
|
|
MQTTClient_create(&PRIVATE->client, server_uri.dataAscii(), ci.clientID.dataUTF8(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
|
|
MQTTClient_setCallbacks(PRIVATE->client,
|
|
this,
|
|
PRIVATE->connectionLost_callback,
|
|
PRIVATE->messageArrived_callback,
|
|
PRIVATE->deliveryComplete_callback);
|
|
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
|
|
conn_opts.keepAliveInterval = 9;
|
|
conn_opts.cleansession = 1;
|
|
conn_opts.connectTimeout = piRound(connect_timeout.toSeconds());
|
|
if (ci.username.isNotEmpty()) conn_opts.username = ci.username.dataAscii();
|
|
if (ci.password.isNotEmpty()) conn_opts.password = ci.password.dataAscii();
|
|
int ret = MQTTClient_connect(PRIVATE->client, &conn_opts);
|
|
if (ret != MQTTCLIENT_SUCCESS) {
|
|
piCoutObj << "Failed to connect to " + server_uri << ", code" << ret;
|
|
changeStatus(Idle);
|
|
if (ret < 0) ret = static_cast<int>(Error::ServerUnavailable);
|
|
disconnected(static_cast<Error>(ret));
|
|
return;
|
|
}
|
|
PRIVATE->connected = true;
|
|
PIMap<PIString, TopicUsage> topics_binded;
|
|
{ topics_binded = PRIVATE->endpoints.getRef()->topics_binded; }
|
|
auto it = topics_binded.makeIterator();
|
|
while (it.next()) {
|
|
if (it.value().counter > 0) subscribeInternal({it.key(), nullptr, it.value().qos});
|
|
}
|
|
changeStatus(Connected);
|
|
connected();
|
|
}
|
|
|
|
|
|
inline void PIMQTT::Client::disconnectInternal() {
|
|
bool was_connected = PRIVATE->connected;
|
|
changeStatus(Idle);
|
|
destroy();
|
|
if (was_connected) disconnected(Error::Unknown);
|
|
PRIVATE->connected = false;
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::publishInternal(const MessageConst & m) {
|
|
if (!PRIVATE->client) return;
|
|
MQTTClient_message pubmsg = MQTTClient_message_initializer;
|
|
pubmsg.payload = const_cast<void *>(static_cast<const void *>(m.payload().data()));
|
|
pubmsg.payloadlen = m.payload().size_s();
|
|
pubmsg.qos = static_cast<int>(m.qos());
|
|
int ret = MQTTClient_publishMessage(PRIVATE->client, m.topic().dataAscii(), &pubmsg, nullptr);
|
|
if (ret != MQTTCLIENT_SUCCESS) {
|
|
piCoutObj << "Failed to publishMessage, code" << ret;
|
|
}
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::subscribeInternal(const Subscribe & sub) {
|
|
if (!PRIVATE->client) return;
|
|
// piCout << "subscribeInternal" << sub.topic;
|
|
int ret = MQTTClient_subscribe(PRIVATE->client, sub.topic.dataUTF8(), static_cast<int>(sub.qos));
|
|
if (ret != MQTTCLIENT_SUCCESS) {
|
|
piCoutObj << "Failed to subscribe" << sub.topic << ", code" << ret;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::unsubscribeInternal(const PIString & mqtt_topic) {
|
|
if (!PRIVATE->client) return;
|
|
// piCout << "unsubscribeInternal" << mqtt_topic;
|
|
int ret = MQTTClient_unsubscribe(PRIVATE->client, mqtt_topic.dataUTF8());
|
|
if (ret != MQTTCLIENT_SUCCESS) {
|
|
piCoutObj << "Failed to unsubscribe" << mqtt_topic << ", code" << ret;
|
|
}
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::destroy() {
|
|
if (!PRIVATE->client) return;
|
|
if (PRIVATE->connected) MQTTClient_disconnect(PRIVATE->client, 1000);
|
|
MQTTClient_destroy(&PRIVATE->client);
|
|
PRIVATE->connected = false;
|
|
PRIVATE->client = nullptr;
|
|
}
|
|
|
|
|
|
void PIMQTT::Client::changeStatus(Status s) {
|
|
m_status = s;
|
|
}
|
|
|
|
PIString PIMQTT::Client::convertTopic2MQTT(const PIString & topic) {
|
|
return topic.replacedAll("**", '#').replacedAll('*', '+');
|
|
}
|
|
|
|
|
|
PIString PIMQTT::Client::convertTopic2HTTP(const PIString & topic) {
|
|
return topic.replacedAll('#', "**").replacedAll('+', '*');
|
|
}
|