migrate to async IO model

new PIIODevice::interrupt() virtual method
new PIWaitEvent private class
PIEthernet and PISerial basically tested on Windows and Linux
This commit is contained in:
2022-11-05 23:43:07 +03:00
parent e48d0ebaab
commit 8a5e72c723
15 changed files with 498 additions and 209 deletions

View File

@@ -67,6 +67,7 @@
# endif
# endif
#endif
#include "piwaitevent_p.h"
#include <errno.h>
@@ -212,18 +213,7 @@ PRIVATE_DEFINITION_START(PIEthernet)
sockaddr_in addr_;
sockaddr_in saddr_;
sockaddr_in raddr_;
#ifdef WINDOWS
WSAEVENT read_event = nullptr;
void createEvent() {
closeEvent();
read_event = WSACreateEvent();
}
void closeEvent() {
if (!read_event) return;
WSACloseEvent(read_event);
read_event = nullptr;
}
#endif
PIWaitEvent event;
PRIVATE_DEFINITION_END(PIEthernet)
@@ -252,23 +242,18 @@ PIEthernet::PIEthernet(int sock_, PIString ip_port): PIIODevice("", ReadWrite) {
setParameters(PIEthernet::ReuseAddress | PIEthernet::MulticastLoop);
setType(TCP_Client, false);
setPath(ip_port);
#ifdef WINDOWS
u_long mode = 1;
ioctlsocket(sock, FIONBIO, &mode);
PRIVATE->createEvent();
#endif
ethNonblocking(sock);
PRIVATE->event.create();
//piCoutObj << "new tcp client" << sock_;
}
PIEthernet::~PIEthernet() {
//piCout << "~PIEthernet" << uint(this);
stop();
//piCout << "~PIEthernet";
stopAndWait();
close();
#ifdef WINDOWS
PRIVATE->closeEvent();
#endif
//piCoutObj << "~PIEthernet done";
PRIVATE->event.destroy();
//piCout << "~PIEthernet done";
}
@@ -294,12 +279,11 @@ void PIEthernet::construct() {
bool PIEthernet::init() {
if (isOpened()) return true;
//cout << "init " << type_ << endl;
if (sock != -1) return true;
//piCout << "init " << type();
PRIVATE->event.destroy();
if (sock_s == sock)
sock_s = -1;
#ifdef WINDOWS
PRIVATE->closeEvent();
#endif
closeSocket(sock);
closeSocket(sock_s);
int st = 0, pr = 0;
@@ -312,14 +296,8 @@ bool PIEthernet::init() {
}
PIFlags<Parameters> params = parameters();
sock = ::socket(AF_INET, st, pr);
#ifdef WINDOWS
if (type() != TCP_Server) {
// non-blocking socket
u_long mode = 1;
ioctlsocket(sock, FIONBIO, &mode);
}
PRIVATE->createEvent();
#endif
ethNonblocking(sock);
PRIVATE->event.create();
if (params[SeparateSockets])
sock_s = ::socket(AF_INET, st, pr);
else
@@ -332,7 +310,7 @@ bool PIEthernet::init() {
if (params[PIEthernet::Broadcast]) ethSetsockoptBool(sock, SOL_SOCKET, SO_BROADCAST);
applyTimeouts();
applyOptInt(IPPROTO_IP, IP_TTL, TTL());
// piCoutObj << "inited" << path();
//piCoutObj << "inited" << path();
return true;
}
@@ -386,11 +364,12 @@ PIEthernet::Address PIEthernet::getBroadcast(const PIEthernet::Address & ip, con
bool PIEthernet::openDevice() {
if (connected_) return true;
//piCoutObj << "open";
init();
if (sock == -1 || path().isEmpty()) return false;
addr_r.set(path());
if (type() == TCP_Client)
connecting_ = true;
//if (type() == TCP_Client)
// connecting_ = true;
if (type() != UDP || mode() == PIIODevice::WriteOnly)
return true;
memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_));
@@ -423,24 +402,22 @@ bool PIEthernet::openDevice() {
bool PIEthernet::closeDevice() {
//cout << "close\n";
//piCoutObj << "close";
bool ned = connected_;
connected_ = connecting_ = false;
server_thread_.stop();
PRIVATE->event.interrupt();
if (server_thread_.isRunning()) {
server_thread_.stop();
server_thread_.interrupt();
if (!server_thread_.waitForFinish(1000))
server_thread_.terminate();
}
PRIVATE->event.destroy();
if (sock_s == sock)
sock_s = -1;
closeSocket(sock);
closeSocket(sock_s);
#ifdef WINDOWS
if (PRIVATE->read_event) WSASetEvent(PRIVATE->read_event);
#endif
while (!clients_.isEmpty())
delete clients_.back();
bool ned = connected_;
connected_ = connecting_ = false;
if (ned) disconnected(false);
return true;
}
@@ -591,6 +568,7 @@ bool PIEthernet::connect(bool threaded) {
connecting_ = true;
return true;
}
if (connected_) return false;
if (sock == -1) init();
if (sock == -1) return false;
memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_));
@@ -710,40 +688,21 @@ bool PIEthernet::send(const PIEthernet::Address & addr, const PIByteArray & data
}
void PIEthernet::interrupt() {
//piCout << "interrupt";
PRIVATE->event.interrupt();
}
ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) {
//piCout << "read" << sock;
if (sock == -1) init();
if (sock == -1 || read_to == 0) return -1;
int rs = 0, s = 0, lerr = 0;
sockaddr_in client_addr;
socklen_t slen = sizeof(client_addr);
// piCoutObj << "read from " << ip_ << ":" << port_;
int rs = 0, lerr = 0;
//piCoutObj << "read from " << path() << connecting_;
switch (type()) {
case TCP_SingleTCP:
reading_now = true;
::listen(sock, 64);
reading_now = false;
s = accept(sock, (sockaddr * )&client_addr, &slen);
if (s == -1) {
//piCoutObj << "Can`t accept new connection, " << ethErrorString();
piMinSleep();
return -1;
}
reading_now = true;
rs = ethRecv(s, read_to, max_size);
reading_now = false;
closeSocket(s);
return rs;
case TCP_Client:
if (connecting_) {
#ifdef ANDROID
/*if (sock_s == sock)
sock_s = -1;
closeSocket(sock);
closeSocket(sock_s);
init();
qDebug() << "init() in read thread";*/
#endif
addr_r.set(path());
memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_));
PRIVATE->addr_.sin_port = htons(addr_r.port());
@@ -752,9 +711,9 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) {
#ifdef QNX
PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_);
#endif
piCout << "connect to " << path() << "...";
piCoutObj << "connect to " << path() << "...";
connected_ = connectTCP();
piCout << "connect to " << path() << connected_;
piCoutObj << "connect to " << path() << connected_;
if (!connected_)
piCoutObj << "Can`t connect to" << addr_r << "," << ethErrorString();
opened_ = connected_;
@@ -772,18 +731,19 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) {
long wr = waitForEvent(FD_READ | FD_CLOSE);
switch (wr) {
case FD_READ:
piCout << "fd_read ...";
//piCout << "fd_read ...";
rs = ethRecv(sock, read_to, max_size);
break;
case FD_CLOSE:
piCout << "fd_close ...";
//piCout << "fd_close ...";
rs = -1;
break;
default: break;
}
}
#else
rs = ethRecv(sock, read_to, max_size);
if (PRIVATE->event.wait(sock))
rs = ethRecv(sock, read_to, max_size);
#endif
//piCoutObj << "readed" << rs;
if (rs <= 0) {
@@ -792,37 +752,39 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) {
#ifdef WINDOWS
if ((lerr == WSAEWOULDBLOCK || lerr == WSAETIMEDOUT) && !parameters()[DisonnectOnTimeout]) {
#elif defined(ANDROID)
if ((lerr == EWOULDBLOCK || lerr == EAGAIN || lerr == EINTR) && !parameters()[DisonnectOnTimeout]) {
if ((lerr == EWOULDBLOCK || lerr == EAGAIN || lerr == EINTR) && !parameters()[DisonnectOnTimeout] && rs < 0) {
#else
if ((lerr == EWOULDBLOCK || lerr == EAGAIN) && !parameters()[DisonnectOnTimeout]) {
if ((lerr == EWOULDBLOCK || lerr == EAGAIN) && !parameters()[DisonnectOnTimeout] && rs < 0) {
#endif
//piCoutObj << errorString();
return -1;
}
if (connected_) {
piCoutObj << "Disconnect on read," << ethErrorString();
//piCoutObj << "Disconnect on read," << ethErrorString();
opened_ = connected_ = false;
closeSocket(sock);
init();
disconnected(rs < 0);
}
if (parameters()[KeepConnection])
if (parameters()[KeepConnection]) {
connect();
}
//piCoutObj << "eth" << ip_ << "disconnected";
}
if (rs > 0) received(read_to, rs);
return rs;
case UDP: {
memset(&PRIVATE->raddr_, 0, sizeof(PRIVATE->raddr_));
piCoutObj << "read from" << path() << "...";
//piCoutObj << "read from" << path() << "...";
#ifdef WINDOWS
long wr = waitForEvent(FD_READ | FD_CLOSE);
switch (wr) {
case FD_READ:
piCout << "fd_read ...";
//piCout << "fd_read ...";
rs = ethRecvfrom(sock, read_to, max_size, 0, (sockaddr*)&PRIVATE->raddr_);
break;
case FD_CLOSE:
piCout << "fd_close ...";
//piCout << "fd_close ...";
rs = -1;
break;
default: break;
@@ -830,7 +792,7 @@ ssize_t PIEthernet::readDevice(void * read_to, ssize_t max_size) {
#else
rs = ethRecvfrom(sock, read_to, max_size, 0, (sockaddr*)&PRIVATE->raddr_);
#endif
piCoutObj << "read from" << path() << rs << "bytes";
//piCoutObj << "read from" << path() << rs << "bytes";
if (rs > 0) {
addr_lr.set(uint(PRIVATE->raddr_.sin_addr.s_addr), ntohs(PRIVATE->raddr_.sin_port));
//piCoutObj << "read from" << ip_r << ":" << port_r << rs << "bytes";
@@ -854,26 +816,6 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) {
//piCoutObj << "sending to " << ip_s << ":" << port_s << " " << max_size << " bytes";
int ret = 0;
switch (type()) {
case TCP_SingleTCP:
memset(&PRIVATE->addr_, 0, sizeof(PRIVATE->addr_));
PRIVATE->addr_.sin_port = htons(addr_s.port());
PRIVATE->addr_.sin_addr.s_addr = addr_s.ip();
PRIVATE->addr_.sin_family = AF_INET;
#ifdef QNX
PRIVATE->addr_.sin_len = sizeof(PRIVATE->addr_);
#endif
//piCoutObj << "connect SingleTCP" << ip_s << ":" << port_s << "...";
if (!connectTCP()) {
//piCoutObj << "Can`t connect to " << ip_s << ":" << port_s << ", " << ethErrorString();
piMinSleep();
return -1;
}
//piCoutObj << "ok, write SingleTCP" << int(data) << max_size << "bytes ...";
ret = ::send(sock, (const char *)data, max_size, 0);
//piCoutObj << "ok, ret" << ret;
closeSocket(sock);
init();
return ret;
case UDP:
PRIVATE->saddr_.sin_port = htons(addr_s.port());
PRIVATE->saddr_.sin_addr.s_addr = addr_s.ip();
@@ -912,6 +854,7 @@ ssize_t PIEthernet::writeDevice(const void * data, ssize_t max_size) {
if (ret < 0) {
piCoutObj << "Disconnect on write," << ethErrorString();
opened_ = connected_ = false;
closeSocket(sock);
init();
disconnected(true);
}
@@ -926,8 +869,7 @@ PIIODevice::DeviceInfoFlags PIEthernet::deviceInfoFlags() const {
switch (type()) {
case UDP: return 0;
case TCP_Client:
case TCP_Server:
case TCP_SingleTCP: return Sequential | Reliable;
case TCP_Server: return Sequential | Reliable;
default: break;
}
return 0;
@@ -960,9 +902,15 @@ void PIEthernet::server_func(void * eth) {
piMSleep(10);
return;
}
#else
if (!ce->PRIVATEWB->event.wait(ce->sock)) {
piMSleep(10);
return;
}
#endif
//piCout << "server" << "accept ...";
int s = accept(ce->sock, (sockaddr * )&client_addr, &slen);
piCout << ethErrorString();
//piCout << "server" << "accept done" << ethErrorString();
if (s == -1) {
int lerr = ethErrorCore();
#ifdef WINDOWS
@@ -1003,44 +951,43 @@ void PIEthernet::setType(Type t, bool reopen) {
bool PIEthernet::connectTCP() {
#ifdef WINDOWS
::connect(sock, (sockaddr * )&(PRIVATE->addr_), sizeof(PRIVATE->addr_));
//piCout << errorString();
#ifdef WINDOWS
long wr = waitForEvent(FD_CONNECT | FD_CLOSE);
switch (wr) {
case FD_CONNECT: {
case FD_CONNECT:
//piCout << "fd_connect ...";
timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 0;
fd_set fd_test;
FD_ZERO(&fd_test);
FD_SET(sock, &fd_test);
::select(0, nullptr, &fd_test, nullptr, &timeout);
return FD_ISSET(sock, &fd_test);
} break;
return ethIsWriteable(sock);
default: break;
}
return false;
#else
return ::connect(sock, (sockaddr * )&(PRIVATE->addr_), sizeof(PRIVATE->addr_)) == 0;
if (PRIVATE->event.wait(sock, PIWaitEvent::CheckWrite)) {
if (ethIsWriteable(sock)) return true;
else {
closeSocket(sock);
init();
}
}
#endif
return false;
}
#ifdef WINDOWS
long PIEthernet::waitForEvent(long mask) {
if (!PRIVATE->read_event || sock < 0) return 0;
WSAEventSelect(sock, PRIVATE->read_event, mask);
DWORD wr = WSAWaitForMultipleEvents(1, &(PRIVATE->read_event), FALSE, WSA_INFINITE, TRUE);
piCout << "wait result" << wr;
if (wr == WSA_WAIT_EVENT_0) {
if (!PRIVATE->event.isCreate() || sock < 0) return 0;
WSAEventSelect(sock, PRIVATE->event.event, mask);
if (PRIVATE->event.wait()) {
//DWORD wr = WSAWaitForMultipleEvents(1, &(PRIVATE->read_event), FALSE, WSA_INFINITE, TRUE);
//piCout << "wait result" << wr;
//if (wr == WSA_WAIT_EVENT_0) {
WSANETWORKEVENTS events;
memset(&events, 0, sizeof(events));
WSAEnumNetworkEvents(sock, PRIVATE->read_event, &events);
WSAResetEvent(PRIVATE->read_event);
WSAEnumNetworkEvents(sock, PRIVATE->event.event, &events);
//piCout << "wait result" << events.lNetworkEvents;
return events.lNetworkEvents;
}
WSAResetEvent(PRIVATE->read_event);
return 0;
}
#endif
@@ -1449,3 +1396,43 @@ int PIEthernet::ethSetsockoptBool(int sock, int level, int optname, bool value)
so = (value ? 1 : 0);
return ethSetsockopt(sock, level, optname, &so, sizeof(so));
}
void PIEthernet::ethNonblocking(int sock) {
if (sock < 0) return;
#ifdef WINDOWS
u_long mode = 1;
ioctlsocket(sock, FIONBIO, &mode);
#else
fcntl(sock, F_SETFL, O_NONBLOCK);
#endif
}
bool PIEthernet::ethIsWriteable(int sock) {
/* fd_set fd_test;
FD_ZERO(&fd_test);
FD_SET(sock, &fd_test);
int fds = 0;
#ifndef WINDOWS
fds = sock + 1;
#endif
timeval timeout;
timeout.tv_sec = timeout.tv_usec = 0;
::select(fds, nullptr, &fd_test, nullptr, &timeout);
return FD_ISSET(sock, &fd_test);*/
#ifdef WINDOWS
fd_set fd_test;
FD_ZERO(&fd_test);
FD_SET(sock, &fd_test);
timeval timeout;
timeout.tv_sec = timeout.tv_usec = 0;
::select(0, nullptr, &fd_test, nullptr, &timeout);
return FD_ISSET(sock, &fd_test);
#else
int ret = 0;
socklen_t len = sizeof(ret);
getsockopt(sock, SOL_SOCKET, SO_ERROR, (char*)&ret, &len);
return ret == 0;
#endif
}