PIObject::deleted now has 1 argument

PIIODevice small refactoring
new PIIODevice virtual methods: threadedReadTerminated() and threadedWriteTerminated()
PIIODevice::stop now accept bool "hard" instead of "wait"

PIStreamPacker new features: packet size crypt and aggressive optimization
This commit is contained in:
2021-04-07 22:13:56 +03:00
parent b2f8132518
commit 4584d9c639
13 changed files with 219 additions and 50 deletions

View File

@@ -2,9 +2,9 @@ cmake_minimum_required(VERSION 3.0)
cmake_policy(SET CMP0017 NEW) # need include() with .cmake
project(pip)
set(pip_MAJOR 2)
set(pip_MINOR 20)
set(pip_MINOR 21)
set(pip_REVISION 0)
set(pip_SUFFIX )
set(pip_SUFFIX alpha)
set(pip_COMPANY SHS)
set(pip_DOMAIN org.SHS)

View File

@@ -47,6 +47,12 @@ void PICloudServer::setServerName(const PIString & server_name) {
}
PIVector<PICloudServer::Client *> PICloudServer::clients() const {
PIMutexLocker _ml(clients_mutex);
return clients_;
}
bool PICloudServer::openDevice() {
piCout << "PICloudServer open device" << path();
bool op = eth.connect(path(), false);
@@ -61,10 +67,12 @@ bool PICloudServer::openDevice() {
bool PICloudServer::closeDevice() {
eth.stop();
for (auto c : clients) {
clients_mutex.lock();
for (auto c : clients_) {
c->stop();
c->close();
}
clients_mutex.unlock();
eth.close();
return true;
}
@@ -103,6 +111,7 @@ PICloudServer::Client::~Client() {
is_connected = false;
cond_buff.notifyOne();
}
stop();
close();
}
@@ -156,32 +165,56 @@ void PICloudServer::_readed(PIByteArray & ba) {
switch (hdr.first) {
case PICloud::TCP::Connect: {
uint id = tcp.parseConnect(ba);
clients_mutex.lock();
Client * oc = index_clients.value(id, nullptr);
if (oc) {
clients_mutex.unlock();
tcp.sendDisconnected(id);
} else {
piCoutObj << "new Client" << id;
Client * c = new Client(this, id);
clients << c;
CONNECTU(c, deleted, this, clientDeleted)
clients_ << c;
index_clients.insert(id, c);
clients_mutex.unlock();
newConnection(c);
}
} break;
case PICloud::TCP::Disconnect: {
uint id = tcp.parseDisconnect(ba);
clients_mutex.lock();
Client * oc = index_clients.value(id, nullptr);
if (oc) {
oc->is_connected = false;
oc->close();
}
clients_mutex.unlock();
} break;
case PICloud::TCP::Data: {
PIPair<uint, PIByteArray> d = tcp.parseDataServer(ba);
clients_mutex.lock();
Client * oc = index_clients.value(d.first, nullptr);
if (oc && !d.second.isEmpty()) oc->pushBuffer(d.second);
clients_mutex.unlock();
} break;
default:
break;
}
}
}
void PICloudServer::clientDeleted(PIObject * o) {
PICloudServer::Client * c = (PICloudServer::Client*)o;
clients_mutex.lock();
clients_.removeOne(c);
auto it = index_clients.makeIterator();
while (it.hasNext()) {
it.next();
if (it.value() == c) {
index_clients.remove(it.key());
break;
}
}
clients_mutex.unlock();
}

View File

@@ -47,8 +47,10 @@
PIStreamPacker::PIStreamPacker(PIIODevice * dev): PIObject() {
crypt_frag = false;
crypt_frag = crypt_size = false;
aggressive_optimization = true;
packet_size = -1;
size_crypted_size = sizeof(int);
crypt_frag_size = 1024*1024;
max_packet_size = 1400;
packet_sign = 0xAFBE;
@@ -56,6 +58,16 @@ PIStreamPacker::PIStreamPacker(PIIODevice * dev): PIObject() {
}
void PIStreamPacker::setCryptSizeEnabled(bool on) {
crypt_size = on;
if (crypt_size) {
PIByteArray ba; ba << int(0);
size_crypted_size = cryptData(ba).size_s();
} else
size_crypted_size = sizeof(int);
}
void PIStreamPacker::send(const PIByteArray & data) {
if (data.isEmpty()) return;
PIByteArray cd;
@@ -74,7 +86,12 @@ void PIStreamPacker::send(const PIByteArray & data) {
}
//piCout << "crypt" << data.size() << "->" << cd.size() << key().size();
PIByteArray hdr, part;
hdr << packet_sign << int(cd.size_s());
hdr << packet_sign;
if (crypt_size) {
PIByteArray crsz; crsz << int(cd.size_s());
hdr.append(cryptData(crsz));
} else
hdr << int(cd.size_s());
cd.insert(0, hdr);
int pcnt = (cd.size_s() - 1) / max_packet_size + 1, pst = 0;
if (pcnt > 1) {
@@ -114,21 +131,37 @@ void PIStreamPacker::received(uchar * readed, int size) {
void PIStreamPacker::received(const PIByteArray & data) {
stream.append(data);
//piCout << "rec" << data.size();
while (stream.size_s() >= 6) {
while (!stream.isEmpty()) {
int hdr_size = sizeof(packet_sign) + size_crypted_size;
if (packet_size < 0) {
if (stream.size_s() < hdr_size) return;
ushort sign(0);
memcpy(&sign, stream.data(), 2);
if (sign != packet_sign) {
stream.pop_front();
if (aggressive_optimization) stream.clear();
else stream.pop_front();
continue;
}
int sz = -1;
memcpy(&sz, stream.data(2), 4);
if (crypt_size) {
PIByteArray crsz((uint)size_crypted_size);
memcpy(crsz.data(), stream.data(2), size_crypted_size);
crsz = decryptData(crsz);
if (crsz.size() < sizeof(sz)) {
if (aggressive_optimization) stream.clear();
else stream.pop_front();
continue;
}
crsz >> sz;
} else {
memcpy(&sz, stream.data(2), size_crypted_size);
}
if (sz < 0) {
stream.pop_front();
if (aggressive_optimization) stream.clear();
else stream.pop_front();
continue;
}
stream.remove(0, 6);
stream.remove(0, hdr_size);
packet.clear();
packet_size = sz;
if (packet_size == 0)
@@ -189,6 +222,8 @@ void PIStreamPacker::received(const PIByteArray & data) {
void PIStreamPacker::assignDevice(PIIODevice * dev) {
if (!dev) return;
if (!dev->infoFlags()[PIIODevice::Reliable])
piCoutObj << "Warning! Not recommended to use with non-reliable" << dev;
CONNECTU(dev, threadedReadEvent, this, received);
CONNECTU(this, sendRequest, dev, write);
}

View File

@@ -59,6 +59,8 @@ public:
void setServerName(const PIString & server_name);
PIVector<Client *> clients() const;;
EVENT1(newConnection, PICloudServer::Client * , client)
protected:
@@ -69,11 +71,13 @@ protected:
private:
EVENT_HANDLER1(void, _readed, PIByteArray &, ba);
EVENT_HANDLER1(void, clientDeleted, PIObject *, o);
void clientDisconnect(uint client_id);
int sendData(const PIByteArray & data, uint client_id);
PIVector<Client *> clients;
PIVector<Client *> clients_;
PIMap<uint, Client *> index_clients;
mutable PIMutex clients_mutex;
};
#endif // PICLOUDSERVER_H

View File

@@ -405,7 +405,7 @@ void PIObject::piDisconnect(PIObject * src, const PIString & sig) {
void PIObject::piDisconnect(PIObject * src) {
src->deleted();
src->deleted(src);
PIMutexLocker _ml(src->mutex_connect);
PIVector<PIObject * > cv = src->connectors.toVector();
piForeach (PIObject * o, cv) {

View File

@@ -414,14 +414,14 @@ protected:
//! Virtual function executes after property with name "name" has been changed
virtual void propertyChanged(const PIString & name) {}
EVENT(deleted)
EVENT1(deleted, PIObject *, o)
//! \events
//! \{
/** \fn void deleted()
/** \fn void deleted(PIObject * o)
* \brief Raise before object delete
* \note This event raised from destructor, so use only emitter() value,
* \note This event raised from destructor, so use only "o" value,
* don`t try to cast deleted object to some subclass! */
//! \}

View File

@@ -151,6 +151,8 @@ bool PIBinaryLog::openDevice() {
bool PIBinaryLog::closeDevice() {
stopThreadedRead();
pausemutex.unlock();
moveIndex(-1);
is_indexed = false;
index.clear();
@@ -177,8 +179,8 @@ bool PIBinaryLog::threadedRead(uchar *readed, int size) {
case PlayRealTime:
pausemutex.lock();
if (is_pause) {
piMSleep(100);
pausemutex.unlock();
piMSleep(100);
return false;
} else if (pause_time > PISystemTime()) {
startlogtime += pause_time;

View File

@@ -289,6 +289,7 @@ protected:
bool closeDevice();
void propertyChanged(const PIString &);
bool threadedRead(uchar *readed, int size);
void threadedReadTerminated() {pausemutex.unlock();}
DeviceInfoFlags deviceInfoFlags() const {return PIIODevice::Reliable;}
private:

View File

@@ -806,9 +806,9 @@ PIIODevice::DeviceInfoFlags PIEthernet::deviceInfoFlags() const {
}
void PIEthernet::clientDeleted() {
void PIEthernet::clientDeleted(PIObject * o) {
clients_mutex.lock();
clients_.removeOne((PIEthernet*)emitter());
clients_.removeOne((PIEthernet*)o);
clients_mutex.unlock();
}

View File

@@ -494,7 +494,7 @@ protected:
PIStringList mcast_groups;
private:
EVENT_HANDLER(void, clientDeleted);
EVENT_HANDLER1(void, clientDeleted, PIObject *, o);
static void server_func(void * eth);
void setType(Type t, bool reopen = true) {setProperty(PIStringAscii("type"), (int)t); if (reopen && isOpened()) {closeDevice(); init(); openDevice();}}

View File

@@ -154,21 +154,62 @@ bool PIIODevice::setOption(PIIODevice::DeviceOption o, bool yes) {
}
void PIIODevice::stopThreadedRead() {
bool stopThread(PIThread * t, bool hard) {
#ifdef FREERTOS
PIThread::stop(true);
t->stop(true);
#else
PIThread::terminate();
if (hard) {
t->terminate();
return true;
} else {
t->stop();
if (t->waitForFinish(10000)) {
t->terminate();
return true;
}
}
#endif
return false;
}
void PIIODevice::stopThreadedWrite() {
#ifdef FREERTOS
write_thread.stop(true);
#else
write_thread.terminate();
#endif
void PIIODevice::stopThreadedRead(bool hard) {
if (stopThread(this, hard))
threadedReadTerminated();
}
void PIIODevice::stopThreadedWrite(bool hard) {
if (stopThread(&write_thread, hard))
threadedWriteTerminated();
}
void PIIODevice::clearThreadedWriteQueue() {
write_thread.lock();
write_queue.clear();
write_thread.unlock();
}
void PIIODevice::start() {
startThreadedRead();
startThreadedWrite();
}
void PIIODevice::stop(bool hard) {
stopThreadedRead(hard);
stopThreadedWrite(hard);
}
PIByteArray PIIODevice::read(int max_size) {
buffer_in.resize(max_size);
int ret = readDevice(buffer_in.data(), max_size);
if (ret < 0)
return PIByteArray();
return buffer_in.resized(ret);
}
@@ -183,7 +224,7 @@ void PIIODevice::_init() {
setReopenTimeout(1000);
#ifdef FREERTOS
threaded_read_buffer_size = 512;
// setThreadedReadBufferSize(512);
//setThreadedReadBufferSize(512);
#else
threaded_read_buffer_size = 4096;
#endif
@@ -314,6 +355,41 @@ ullong PIIODevice::writeThreaded(const PIByteArray & data) {
}
bool PIIODevice::open() {
if (!init_) init();
buffer_tr.resize(threaded_read_buffer_size);
opened_ = openDevice();
if (opened_) opened();
return opened_;
}
bool PIIODevice::open(const PIString & _path) {
setPath(_path);
return open();
}
bool PIIODevice::open(DeviceMode _mode) {
mode_ = _mode;
return open();
}
bool PIIODevice::open(const PIString & _path, DeviceMode _mode) {
setPath(_path);
mode_ = _mode;
return open();
}
bool PIIODevice::close() {
opened_ = !closeDevice();
if (!opened_) closed();
return !opened_;
}
bool PIIODevice::configure(const PIString & config_file, const PIString & section, bool parent_section) {
PIConfig conf(config_file, PIIODevice::ReadOnly);
if (!conf.isOpened()) return false;

View File

@@ -175,10 +175,10 @@ public:
void startThreadedRead() {if (!isRunning()) PIThread::start();}
//! Start threaded read and assign "threaded read slot" to "func"
void startThreadedRead(ReadRetFunc func) {ret_func_ = func; if (!isRunning()) PIThread::start();}
void startThreadedRead(ReadRetFunc func) {ret_func_ = func; startThreadedRead();}
//! Stop threaded read
void stopThreadedRead();
//! Stop threaded read. Hard stop terminate thread, otherwise wait fo 10 seconds
void stopThreadedRead(bool hard = true);
//! Return \b true if threaded write is started
@@ -187,25 +187,25 @@ public:
//! Start threaded write
void startThreadedWrite() {if (!write_thread.isRunning()) write_thread.startOnce();}
//! Stop threaded write
void stopThreadedWrite();
//! Stop threaded write. Hard stop terminate thread, otherwise wait fo 10 seconds
void stopThreadedWrite(bool hard = true);
//! Clear threaded write task queue
void clearThreadedWriteQueue() {write_thread.lock(); write_queue.clear(); write_thread.unlock();}
void clearThreadedWriteQueue();
//! Start both threaded read and threaded write
void start() {startThreadedRead(); startThreadedWrite();}
void start();
//! Stop both threaded read and threaded write and if "wait" block until both threads are stop
void stop(bool wait = false) {stopThreadedRead(); stopThreadedWrite(); if (wait) while (write_thread.isRunning() || isRunning()) msleep(PIP_MIN_MSLEEP);}
//! Stop both threaded read and threaded write. Hard stop terminate threads, otherwise wait fo 10 seconds
void stop(bool hard = true);
//! Read from device maximum "max_size" bytes to "read_to"
int read(void * read_to, int max_size) {return readDevice(read_to, max_size);}
//! Read from device maximum "max_size" bytes and return them as PIByteArray
PIByteArray read(int max_size) {buffer_in.resize(max_size); int ret = readDevice(buffer_in.data(), max_size); if (ret < 0) return PIByteArray(); return buffer_in.resized(ret);}
PIByteArray read(int max_size);
//! Write maximum "max_size" bytes of "data" to device
int write(const void * data, int max_size) {return writeDevice(data, max_size);}
@@ -258,11 +258,11 @@ public:
static PIStringList availablePrefixes();
EVENT_HANDLER(bool, open) {if (!init_) init(); buffer_tr.resize(threaded_read_buffer_size); opened_ = openDevice(); if (opened_) opened(); return opened_;}
EVENT_HANDLER1(bool, open, const PIString &, _path) {setPath(_path); if (!init_) init(); buffer_tr.resize(threaded_read_buffer_size); opened_ = openDevice(); if (opened_) opened(); return opened_;}
bool open(DeviceMode _mode) {mode_ = _mode; if (!init_) init(); buffer_tr.resize(threaded_read_buffer_size); opened_ = openDevice(); if (opened_) opened(); return opened_;}
EVENT_HANDLER2(bool, open, const PIString &, _path, DeviceMode, _mode) {setPath(_path); mode_ = _mode; if (!init_) init(); buffer_tr.resize(threaded_read_buffer_size); opened_ = openDevice(); if (opened_) opened(); return opened_;}
EVENT_HANDLER(bool, close) {opened_ = !closeDevice(); if (!opened_) closed(); return !opened_;}
EVENT_HANDLER(bool, open);
EVENT_HANDLER1(bool, open, const PIString &, _path);
bool open(DeviceMode _mode);
EVENT_HANDLER2(bool, open, const PIString &, _path, DeviceMode, _mode);
EVENT_HANDLER(bool, close);
EVENT_HANDLER1(int, write, PIByteArray, data) {return writeDevice(data.data(), data.size_s());}
EVENT_VHANDLER(void, flush) {;}
@@ -377,6 +377,12 @@ protected:
//! Reimplement to apply new \a threadedReadBufferSize()
virtual void threadedReadBufferSizeChanged() {;}
//! Invoked after hard read thread stop
virtual void threadedReadTerminated() {;}
//! Invoked after hard write thread stop
virtual void threadedWriteTerminated() {;}
static PIIODevice * newDeviceByPrefix(const PIString & prefix);
void terminate();

View File

@@ -57,26 +57,38 @@ public:
void setMaxPacketSize(int max_size) {max_packet_size = max_size;}
//! Returns maximum size of single packet, default 1400 bytes
int maxPacketSize() {return max_packet_size;}
int maxPacketSize() const {return max_packet_size;}
//! Set packet sinature
void setPacketSign(ushort sign_) {packet_sign = sign_;}
//! Returns packet sinature, default 0xAFBE
ushort packetSign() {return packet_sign;}
ushort packetSign() const {return packet_sign;}
//! Set receive aggressive optimization. If yes then %PIStreamPacker doesn`t
//! check every byte in incoming stream but check only begin of each read()
//! result. Default is \b true.
void setaAggressiveOptimization(bool yes) {aggressive_optimization = yes;}
//! Returns aggressive optimization
bool aggressiveOptimization() const {return aggressive_optimization;}
bool cryptFragmentationEnabled() const {return crypt_frag;}
void setCryptFragmentationEnabled(bool on) {crypt_frag = on;}
int cryptFragmentationSize() const {return crypt_frag_size;}
void setCryptFragmentationSize(int size_) {crypt_frag_size = size_;}
bool cryptSizeEnabled() const {return crypt_size;}
void setCryptSizeEnabled(bool on);
//! Prepare data for send and raise \a sendRequest() events
void send(const PIByteArray & data);
//! Receive data part. If packet is ready, raise \a received() event
//! Receive data part. If packet is ready, raise \a packetReceiveEvent() event
//! and \a packetReceived() virtual method
void received(const PIByteArray & data);
EVENT_HANDLER2(void, received, uchar * , readed, int, size);
@@ -123,10 +135,10 @@ protected:
private:
PIByteArray stream, packet;
bool crypt_frag;
bool crypt_frag, crypt_size, aggressive_optimization;
int packet_size, crypt_frag_size;
ushort packet_sign;
int max_packet_size;
int max_packet_size, size_crypted_size;
Progress prog_s, prog_r;
mutable PIMutex prog_s_mutex, prog_r_mutex;