ready to test

This commit is contained in:
2022-11-01 09:17:24 +03:00
parent 6e81a419fb
commit 591c92b4bb
10 changed files with 61 additions and 29 deletions

View File

@@ -107,7 +107,6 @@ bool PICloudClient::closeDevice() {
ssize_t PICloudClient::readDevice(void * read_to, ssize_t max_size) {
if (is_deleted) return -1;
//piCoutObj << "readDevice";
reading_now = true;
if (!is_connected && eth.isClosed()) openDevice();
ssize_t sz = -1;
mutex_buff.lock();
@@ -119,7 +118,6 @@ ssize_t PICloudClient::readDevice(void * read_to, ssize_t max_size) {
}
mutex_buff.unlock();
if (!is_connected) opened_ = false;
reading_now = false;
//piCoutObj << "readDevice done" << sz;
return sz;
}
@@ -132,6 +130,11 @@ ssize_t PICloudClient::writeDevice(const void * data, ssize_t size) {
}
void PICloudClient::stopThreadedReadDevice() {
cond_buff.notifyOne();
}
void PICloudClient::internalDisconnect() {
is_connected = false;
cond_buff.notifyOne();
@@ -177,4 +180,3 @@ void PICloudClient::_readed(PIByteArray & ba) {
while (buff.size_s() > threadedReadBufferSize()) piMSleep(100); // FIXME: sleep here is bad
//piCoutObj << "_readed done";
}

View File

@@ -90,10 +90,8 @@ bool PICloudServer::closeDevice() {
ssize_t PICloudServer::readDevice(void * read_to, ssize_t max_size) {
//piCoutObj << "readDevice";
reading_now = true;
if (!opened_) openDevice();
else piMSleep(eth.readTimeout());
reading_now = false;
return -1;
}
@@ -104,6 +102,11 @@ ssize_t PICloudServer::writeDevice(const void * data, ssize_t max_size) {
}
void PICloudServer::stopThreadedReadDevice() {
}
void PICloudServer::clientDisconnect(uint client_id) {
tcp.sendDisconnected(client_id);
}

View File

@@ -53,6 +53,7 @@ protected:
ssize_t readDevice(void * read_to, ssize_t max_size) override;
ssize_t writeDevice(const void * data, ssize_t size) override;
DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;}
void stopThreadedReadDevice() override;
private:
EVENT_HANDLER1(void, _readed, PIByteArray &, data);
@@ -65,6 +66,7 @@ private:
PIConditionVariable cond_connect;
std::atomic_bool is_connected;
std::atomic_bool is_deleted;
};
#endif // PICLOUDCLIENT_H

View File

@@ -73,6 +73,7 @@ protected:
bool closeDevice() override;
ssize_t readDevice(void * read_to, ssize_t max_size) override;
ssize_t writeDevice(const void * data, ssize_t max_size) override;
void stopThreadedReadDevice() override;
private:
EVENT_HANDLER1(void, _readed, PIByteArray &, ba);

View File

@@ -752,9 +752,9 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) {
#ifdef QNX
PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_);
#endif
//piCout << "connect to " << path() << "...";
piCout << "connect to " << path() << "...";
connected_ = connectTCP();
//piCout << "connect to " << path() << connected_;
piCout << "connect to " << path() << connected_;
if (!connected_)
piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString();
opened_ = connected_;

View File

@@ -132,6 +132,7 @@ PIIODevice::PIIODevice(const PIString & path, PIIODevice::DeviceMode mode): PIOb
PIIODevice::~PIIODevice() {
stop();
waitThreadedReadFinished();
}
@@ -213,11 +214,12 @@ void PIIODevice::stopThreadedRead() {
#ifdef MICRO_PIP
read_thread.stop();
#else
read_thread.stop();
read_thread.interrupt();
stopThreadedReadDevice();
if (reading_now) {
read_thread.terminate();
reading_now = false;
} else {
read_thread.stop();
}
#endif
}

View File

@@ -490,6 +490,10 @@ protected:
//! \~english Function executed when thread read some data, default implementation execute external callback "ret_func_"
//! \~russian Метод вызывается после каждого успешного потокового чтения, по умолчанию вызывает callback "ret_func_"
virtual bool threadedRead(const uchar * readed, ssize_t size);
//! \~english Function executed after PIThread::stop() and PIThread::interrupt() of read thread
//! \~russian Метод вызывается после PIThread::stop() и PIThread::interrupt() потока чтения
virtual void stopThreadedReadDevice() {}
//! \~english Reimplement to construct full unambiguous string, describes this device.
//! Default implementation returns \a path()

View File

@@ -798,9 +798,7 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) {
if (sending) return -1;
// piCoutObj << "com event ...";
//piCoutObj << "read ..." << PRIVATE->hCom;
reading_now = true;
ReadFile(PRIVATE->hCom, read_to, max_size, &PRIVATE->readed, 0);
reading_now = false;
DWORD err = GetLastError();
//piCout << err;
if (err == ERROR_BAD_COMMAND || err == ERROR_ACCESS_DENIED) {

View File

@@ -1114,7 +1114,10 @@ PIVector<PIIODevice * > PIConnection::DevicePool::boundedDevices(const PIConnect
PIConnection::DevicePool::DeviceData::~DeviceData() {
if (rthread) {
rthread->terminate();
rthread->stop();
rthread->interrupt();
if (!rthread->waitForFinish(1000))
rthread->terminate();
delete rthread;
rthread = nullptr;
}
@@ -1146,8 +1149,13 @@ void __DevicePool_threadReadDP(void * ddp) {
}
if (dev->isClosed()) {
if (!dev->open()) {
piMSleep(dev->reopenTimeout());
return;
PITimeMeasurer tm;
int timeout = dev->reopenTimeout();
while (tm.elapsed_m() < timeout) {
if (dd->rthread->isStopping())
return;
piMSleep(50);
}
}
}
PIByteArray ba;

View File

@@ -109,41 +109,53 @@ int main(int argc, char * argv[]) {
delete threads[i];
}*/
PIEthernet eth(PIEthernet::TCP_Server), seth(PIEthernet::TCP_Client);
//eth.setReadAddress("127.0.0.1", 50000);
PIEthernet eth(PIEthernet::TCP_Client), seth(PIEthernet::TCP_Client);
eth.connect("192.168.1.13", 22);
eth.startThreadedRead();
//piCout << eth.open();
//PISerial ser;
//ser.setSpeed(PISerial::S9600);
//ser.setOption(PIIODevice::BlockingRead);
//piCout << ser.open("COM3");
/*
PISerial ser;
ser.setSpeed(PISerial::S9600);
ser.setOption(PIIODevice::BlockingRead);
piCout << ser.open("COM3");
*/
/*
PIThread thread;
thread.start([&](void*){
piCout << "[T] start" << GetCurrentThreadId();
//PIByteArray data = ((PIIODevice*)&eth)->read(1024);
//PIByteArray data = ((PIIODevice*)&ser)->read(1024);
eth.connect("192.168.1.13", 23, false);
piCout << "[T] connected" << eth.isConnected() << errorString();
//piCout << "[T] readed" << data.size() << errorString();
piCout << "[T] end";
});
piMSleep(500);
eth.close();
//eth.close();
//piMSleep(500);
//thread.stop();
//thread.interrupt();
thread.stop();
thread.interrupt();
//seth.send("127.0.0.1", 50000, "string", 7);
//thread.interrupt();
thread.waitForFinish();*/
thread.waitForFinish();
*/
/*
eth.listen("127.0.0.1", 50000);
piMSleep(500);
seth.connect("127.0.0.1", 50001, false);
piMSleep(500);
piCout << "connected" << seth.isConnected();
//eth.close();
piCout << "main end";
*/
piMSleep(1000);
piCout << "main stop ...";
eth.stopThreadedRead();
piCout << "main wait ..." << eth.isThreadedRead();
eth.waitThreadedReadFinished();
eth.close();
//eth.close();
piCout << "main end" << eth.isThreadedRead();
//ser.close();
return 0;
}