небольшая чистка и улучшение кода, попытка исправить picloud

This commit is contained in:
Бычков Андрей
2022-11-07 17:16:27 +03:00
parent 8a5e72c723
commit f08a07cab0
11 changed files with 54 additions and 79 deletions

View File

@@ -45,10 +45,7 @@ PICloudClient::PICloudClient(const PIString & path, PIIODevice::DeviceMode mode)
PICloudClient::~PICloudClient() { PICloudClient::~PICloudClient() {
//piCoutObj << "~PICloudClient()"; //piCoutObj << "~PICloudClient()";
//softStopThreadedRead();
//eth.close();
//if (is_connected) disconnected();
//close();
stopAndWait(); stopAndWait();
//piCoutObj << "~PICloudClient() closed"; //piCoutObj << "~PICloudClient() closed";
internalDisconnect(); internalDisconnect();
@@ -71,7 +68,8 @@ void PICloudClient::setKeepConnection(bool on) {
void PICloudClient::interrupt() { void PICloudClient::interrupt() {
eth.interrupt(); cond_buff.notifyOne();
cond_connect.notifyOne();
} }
@@ -87,7 +85,7 @@ bool PICloudClient::openDevice() {
mutex_connect.unlock(); mutex_connect.unlock();
if (!conn_ok) { if (!conn_ok) {
mutex_connect.lock(); mutex_connect.lock();
eth.stop(); eth.stopAndWait();
eth.close(); eth.close();
mutex_connect.unlock(); mutex_connect.unlock();
} }
@@ -104,7 +102,7 @@ bool PICloudClient::closeDevice() {
if (is_connected) { if (is_connected) {
internalDisconnect(); internalDisconnect();
} }
eth.stop(); eth.stopAndWait();
eth.close(); eth.close();
return true; return true;
} }
@@ -116,11 +114,15 @@ ssize_t PICloudClient::readDevice(void * read_to, ssize_t max_size) {
if (!is_connected && eth.isClosed()) openDevice(); if (!is_connected && eth.isClosed()) openDevice();
ssize_t sz = -1; ssize_t sz = -1;
mutex_buff.lock(); mutex_buff.lock();
cond_buff.wait(mutex_buff, [this](){return !buff.isEmpty() || !is_connected;}); cond_buff.wait(mutex_buff);
if (is_connected) { if (is_connected) {
sz = piMini(max_size, buff.size()); if (buff.isEmpty()) {
memcpy(read_to, buff.data(), sz); sz = 0;
buff.remove(0, sz); } else {
sz = piMini(max_size, buff.size());
memcpy(read_to, buff.data(), sz);
buff.remove(0, sz);
}
} }
mutex_buff.unlock(); mutex_buff.unlock();
if (!is_connected) opened_ = false; if (!is_connected) opened_ = false;
@@ -136,11 +138,6 @@ ssize_t PICloudClient::writeDevice(const void * data, ssize_t size) {
} }
void PICloudClient::stopThreadedReadDevice() {
cond_buff.notifyOne();
}
void PICloudClient::internalDisconnect() { void PICloudClient::internalDisconnect() {
is_connected = false; is_connected = false;
cond_buff.notifyOne(); cond_buff.notifyOne();

View File

@@ -102,11 +102,6 @@ ssize_t PICloudServer::writeDevice(const void * data, ssize_t max_size) {
} }
void PICloudServer::stopThreadedReadDevice() {
}
void PICloudServer::interrupt() { void PICloudServer::interrupt() {
eth.interrupt(); eth.interrupt();
} }
@@ -159,11 +154,15 @@ ssize_t PICloudServer::Client::readDevice(void * read_to, ssize_t max_size) {
if (!is_connected) return -1; if (!is_connected) return -1;
ssize_t sz = -1; ssize_t sz = -1;
mutex_buff.lock(); mutex_buff.lock();
cond_buff.wait(mutex_buff, [this](){return !buff.isEmpty() || !is_connected;}); cond_buff.wait(mutex_buff);
if (is_connected) { if (is_connected) {
sz = piMini(max_size, buff.size()); if (buff.isEmpty()) {
memcpy(read_to, buff.data(), sz); sz = 0;
buff.remove(0, sz); } else {
sz = piMini(max_size, buff.size());
memcpy(read_to, buff.data(), sz);
buff.remove(0, sz);
}
} }
mutex_buff.unlock(); mutex_buff.unlock();
return sz; return sz;

View File

@@ -54,7 +54,6 @@ protected:
ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t readDevice(void * read_to, ssize_t max_size) override;
ssize_t writeDevice(const void * data, ssize_t size) override; ssize_t writeDevice(const void * data, ssize_t size) override;
DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;} DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;}
void stopThreadedReadDevice() override;
private: private:
EVENT_HANDLER1(void, _readed, PIByteArray &, data); EVENT_HANDLER1(void, _readed, PIByteArray &, data);

View File

@@ -74,7 +74,6 @@ protected:
bool closeDevice() override; bool closeDevice() override;
ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t readDevice(void * read_to, ssize_t max_size) override;
ssize_t writeDevice(const void * data, ssize_t max_size) override; ssize_t writeDevice(const void * data, ssize_t max_size) override;
void stopThreadedReadDevice() override;
void interrupt() override; void interrupt() override;
private: private:

View File

@@ -45,7 +45,7 @@ void PIWaitEvent::create() {
piCout << "Error with CreateEventA:" << errorString(); piCout << "Error with CreateEventA:" << errorString();
} }
#else #else
for (int i = 0; i < 3; ++i) memset(&(fds[i]), 0, sizeof(fds[i])); for (int i = 0; i < sizeof(fds); ++i) memset(&(fds[i]), 0, sizeof(fds[i]));
if (::pipe(pipe_fd) < 0) { if (::pipe(pipe_fd) < 0) {
piCout << "Error with pipe:" << errorString(); piCout << "Error with pipe:" << errorString();
} else { } else {
@@ -62,7 +62,7 @@ void PIWaitEvent::destroy() {
event = NULL; event = NULL;
} }
#else #else
for (int i = 0; i < 2; ++i) { for (int i = 0; i < sizeof(pipe_fd); ++i) {
if (pipe_fd[i] != 0) { if (pipe_fd[i] != 0) {
::close(pipe_fd[i]); ::close(pipe_fd[i]);
pipe_fd[i] = 0; pipe_fd[i] = 0;
@@ -72,35 +72,24 @@ void PIWaitEvent::destroy() {
} }
#ifdef WINDOWS bool PIWaitEvent::wait(int fd, CheckRole role) {
bool PIWaitEvent::wait() {
#else
bool PIWaitEvent::wait(int fd, PIWaitEvent::SelectRole role) {
#endif
if (!isCreate()) return false; if (!isCreate()) return false;
#ifdef WINDOWS #ifdef WINDOWS
DWORD ret = WaitForSingleObjectEx(event, INFINITE, TRUE); DWORD ret = WaitForSingleObjectEx(event, INFINITE, TRUE);
ResetEvent(event); ResetEvent(event);
if (ret == WAIT_IO_COMPLETION || ret == WAIT_FAILED) return false; if (ret == WAIT_IO_COMPLETION || ret == WAIT_FAILED) return false;
#else #else
if (fd == -1) return false;
int nfds = piMaxi(pipe_fd[ReadEnd], fd) + 1; int nfds = piMaxi(pipe_fd[ReadEnd], fd) + 1;
int fd_index = 0; int fd_index = role;
switch (role) { for (int i = 0; i < sizeof(fds); ++i) FD_ZERO(&(fds[i]));
case CheckRead: FD_SET(pipe_fd[ReadEnd], &(fds[CheckRead]));
fd_index = 0; FD_SET(fd, &(fds[CheckExeption]));
break; if (fd_index != CheckExeption) FD_SET(fd, &(fds[fd_index]));
case CheckWrite: ::select(nfds, &(fds[CheckRead]), &(fds[CheckWrite]), &(fds[CheckExeption]), nullptr);
fd_index = 1;
break;
}
for (int i = 0; i < 3; ++i) FD_ZERO(&(fds[i]));
FD_SET(pipe_fd[ReadEnd], &(fds[0]));
FD_SET(fd, &(fds[2]));
FD_SET(fd, &(fds[fd_index]));
::select(nfds, &(fds[0]), &(fds[1]), &(fds[2]), nullptr);
int buf = 0; int buf = 0;
while (::read(pipe_fd[ReadEnd], &buf, sizeof(buf)) > 0); while (::read(pipe_fd[ReadEnd], &buf, sizeof(buf)) > 0);
if (FD_ISSET(fd, &(fds[2]))) return false; if (FD_ISSET(fd, &(fds[CheckExeption]))) return false;
return FD_ISSET(fd, &(fds[fd_index])); return FD_ISSET(fd, &(fds[fd_index]));
#endif #endif
return true; return true;
@@ -124,3 +113,8 @@ bool PIWaitEvent::isCreate() const {
return pipe_fd[ReadEnd] != 0; return pipe_fd[ReadEnd] != 0;
#endif #endif
} }
void * PIWaitEvent::getEvent() const {
return event;
}

View File

@@ -37,22 +37,23 @@ public:
void create(); void create();
void destroy(); void destroy();
#ifdef WINDOWS
bool wait(); enum CheckRole { // UNIX only
#else
enum SelectRole {
CheckRead, CheckRead,
CheckWrite CheckWrite,
CheckExeption
}; };
bool wait(int fd, SelectRole role = CheckRead); bool wait(int fd = -1, CheckRole role = CheckRead);
#endif
void interrupt(); void interrupt();
bool isCreate() const; bool isCreate() const;
#ifdef WINDOWS void * getEvent() const; // WINDOWS only
HANDLE event = NULL;
#else
private: private:
#ifdef WINDOWS
void * event = nullptr;
#else
int pipe_fd[2] = {0, 0}; int pipe_fd[2] = {0, 0};
fd_set fds[3]; fd_set fds[3];
enum { enum {

View File

@@ -977,14 +977,14 @@ bool PIEthernet::connectTCP() {
#ifdef WINDOWS #ifdef WINDOWS
long PIEthernet::waitForEvent(long mask) { long PIEthernet::waitForEvent(long mask) {
if (!PRIVATE->event.isCreate() || sock < 0) return 0; if (!PRIVATE->event.isCreate() || sock < 0) return 0;
WSAEventSelect(sock, PRIVATE->event.event, mask); WSAEventSelect(sock, PRIVATE->event.getEvent(), mask);
if (PRIVATE->event.wait()) { if (PRIVATE->event.wait()) {
//DWORD wr = WSAWaitForMultipleEvents(1, &(PRIVATE->read_event), FALSE, WSA_INFINITE, TRUE); //DWORD wr = WSAWaitForMultipleEvents(1, &(PRIVATE->read_event), FALSE, WSA_INFINITE, TRUE);
//piCout << "wait result" << wr; //piCout << "wait result" << wr;
//if (wr == WSA_WAIT_EVENT_0) { //if (wr == WSA_WAIT_EVENT_0) {
WSANETWORKEVENTS events; WSANETWORKEVENTS events;
memset(&events, 0, sizeof(events)); memset(&events, 0, sizeof(events));
WSAEnumNetworkEvents(sock, PRIVATE->event.event, &events); WSAEnumNetworkEvents(sock, PRIVATE->event.getEvent(), &events);
//piCout << "wait result" << events.lNetworkEvents; //piCout << "wait result" << events.lNetworkEvents;
return events.lNetworkEvents; return events.lNetworkEvents;
} }

View File

@@ -439,9 +439,7 @@ PIByteArray PIFile::get() {
ssize_t PIFile::readDevice(void * read_to, ssize_t max_size) { ssize_t PIFile::readDevice(void * read_to, ssize_t max_size) {
if (!canRead() || PRIVATE->fd == 0) return -1; if (!canRead() || PRIVATE->fd == 0) return -1;
reading_now = true;
ssize_t ret = fread(read_to, 1, max_size, PRIVATE->fd); ssize_t ret = fread(read_to, 1, max_size, PRIVATE->fd);
reading_now = false;
return ret; return ret;
} }

View File

@@ -217,11 +217,8 @@ void PIIODevice::stopThreadedRead() {
read_thread.stop(); read_thread.stop();
if (!destroying) { if (!destroying) {
interrupt(); interrupt();
stopThreadedReadDevice(); } else {
} piCoutObj << "Error: Device is running after destructor!";
if (reading_now) {
read_thread.terminate();
reading_now = false;
} }
#endif #endif
} }
@@ -307,7 +304,6 @@ ssize_t PIIODevice::write(const void * data, ssize_t max_size) {
void PIIODevice::_init() { void PIIODevice::_init() {
reading_now = false;
setOptions(0); setOptions(0);
setReopenEnabled(true); setReopenEnabled(true);
setReopenTimeout(1000); setReopenTimeout(1000);

View File

@@ -498,10 +498,6 @@ protected:
//! \~english Function executed when thread read some data, default implementation execute external callback "ret_func_" //! \~english Function executed when thread read some data, default implementation execute external callback "ret_func_"
//! \~russian Метод вызывается после каждого успешного потокового чтения, по умолчанию вызывает callback "ret_func_" //! \~russian Метод вызывается после каждого успешного потокового чтения, по умолчанию вызывает callback "ret_func_"
virtual bool threadedRead(const uchar * readed, ssize_t size); virtual bool threadedRead(const uchar * readed, ssize_t size);
//! \~english Function executed after PIThread::stop() and PIThread::interrupt() of read thread
//! \~russian Метод вызывается после PIThread::stop() и PIThread::interrupt() потока чтения
virtual void stopThreadedReadDevice() {}
//! \~english Reimplement to construct full unambiguous string, describes this device. //! \~english Reimplement to construct full unambiguous string, describes this device.
//! Default implementation returns \a path() //! Default implementation returns \a path()
@@ -549,10 +545,6 @@ protected:
bool opened_ = false; bool opened_ = false;
void * ret_data_ = nullptr; void * ret_data_ = nullptr;
//! \~english Set this flag while blocking operations
//! \~russian Устанавливайте этот флаг во время блокирующих операций
std::atomic_bool reading_now;
private: private:
EVENT_HANDLER(void, read_func); EVENT_HANDLER(void, read_func);
EVENT_HANDLER(void, write_func); EVENT_HANDLER(void, write_func);

View File

@@ -814,7 +814,7 @@ ssize_t PISerial::readDevice(void * read_to, ssize_t max_size) {
if (sending) return -1; if (sending) return -1;
//piCoutObj << "read ..." << PRIVATE->hCom; //piCoutObj << "read ..." << PRIVATE->hCom;
memset(&(PRIVATE->overlap), 0, sizeof(PRIVATE->overlap)); memset(&(PRIVATE->overlap), 0, sizeof(PRIVATE->overlap));
PRIVATE->overlap.hEvent = PRIVATE->event.event; PRIVATE->overlap.hEvent = PRIVATE->event.getEvent();
ReadFile(PRIVATE->hCom, read_to, max_size, NULL, &(PRIVATE->overlap)); ReadFile(PRIVATE->hCom, read_to, max_size, NULL, &(PRIVATE->overlap));
PRIVATE->readed = 0; PRIVATE->readed = 0;
if (PRIVATE->event.wait()) { if (PRIVATE->event.wait()) {
@@ -860,7 +860,7 @@ ssize_t PISerial::writeDevice(const void * data, ssize_t max_size) {
//piCoutObj << "send ..." << max_size;// << ": " << PIString((char*)data, max_size); //piCoutObj << "send ..." << max_size;// << ": " << PIString((char*)data, max_size);
sending = true; sending = true;
memset(&(PRIVATE->overlap_write), 0, sizeof(PRIVATE->overlap_write)); memset(&(PRIVATE->overlap_write), 0, sizeof(PRIVATE->overlap_write));
PRIVATE->overlap_write.hEvent = PRIVATE->event_write.event; PRIVATE->overlap_write.hEvent = PRIVATE->event_write.getEvent();
WriteFile(PRIVATE->hCom, data, max_size, NULL, &(PRIVATE->overlap_write)); WriteFile(PRIVATE->hCom, data, max_size, NULL, &(PRIVATE->overlap_write));
if (PRIVATE->event_write.wait()) { if (PRIVATE->event_write.wait()) {
GetOverlappedResult(PRIVATE->hCom, &(PRIVATE->overlap_write), &wrote, FALSE); GetOverlappedResult(PRIVATE->hCom, &(PRIVATE->overlap_write), &wrote, FALSE);