threadedRead now const uchar *

pipacketextractor Header mode now more flexible
fix splitTime mode
more refactoring
add virtual override to functions
remove piforeach
replace 0 to nullptr
iterate over pimap via iterators
replace CONNECTU to CONNECT# with compile time check
This commit is contained in:
Бычков Андрей
2022-07-26 17:18:08 +03:00
parent a4882dc054
commit d13e68c206
36 changed files with 615 additions and 623 deletions

View File

@@ -61,11 +61,11 @@ PIPeer::PeerData::PeerData(const PIString & n): PIObject(n) {
dt_out.setPacketSize(_PIPEER_MSG_SIZE);
dt_in.setCRCEnabled(false);
dt_out.setCRCEnabled(false);
CONNECTU(&dt_in, sendRequest, this, dtSendRequestIn);
CONNECTU(&dt_out, sendRequest, this, dtSendRequestOut);
CONNECTU(&dt_in, receiveFinished, this, dtReceiveFinishedIn);
CONNECTU(&dt_out, receiveFinished, this, dtReceiveFinishedOut);
CONNECTU(&t, started, this, dtThread);
CONNECT1(void, PIByteArray &, &dt_in, sendRequest, this, dtSendRequestIn);
CONNECT1(void, PIByteArray &, &dt_out, sendRequest, this, dtSendRequestOut);
CONNECT1(void, bool, &dt_in, receiveFinished, this, dtReceiveFinishedIn);
CONNECT1(void, bool, &dt_out, receiveFinished, this, dtReceiveFinishedOut);
CONNECT0(void, &t, started, this, dtThread);
}
@@ -172,7 +172,7 @@ PIPeer::PIPeer(const PIString & n): PIIODevice(), inited__(false), eth_tcp_srv(P
self_info.dist = 0;
self_info.time = PISystemTime::current();
randomize();
CONNECTU(&sync_timer, tickEvent, this, timerEvent);
CONNECT2(void, void *, int, &sync_timer, tickEvent, this, timerEvent);
prev_ifaces = PIEthernet::interfaces();
no_timer = false;
sync_timer.addDelimiter(5);
@@ -247,7 +247,7 @@ void PIPeer::initEths(PIStringList al) {
eths_traffic << ce;
cint = prev_ifaces.getByAddress(a);
self_info.addresses << PeerInfo::PeerAddress(ce->path(), cint == 0 ? "255.255.255.0" : cint->netmask);
CONNECTU(ce, threadedReadEvent, this, dataRead);
CONNECT2(void, const uchar *, int, ce, threadedReadEvent, this, dataRead);
ce->startThreadedRead();
// piCoutObj << "dc binded to" << ce->path();
// piCoutObj << "add eth" << a;
@@ -282,7 +282,7 @@ void PIPeer::initMBcasts(PIStringList al) {
ce->joinMulticastGroup(_PIPEER_MULTICAST_IP);
if (ce->open()) {
eths_mcast << ce;
CONNECTU(ce, threadedReadEvent, this, mbcastRead);
CONNECT2(void, const uchar *, int, ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
// piCout << "mcast bind to" << a << ce->sendIP();
} else {
@@ -302,7 +302,7 @@ void PIPeer::initMBcasts(PIStringList al) {
ce->setReadAddress(a, _PIPEER_BROADCAST_PORT);
if (ce->open()) {
eths_bcast << ce;
CONNECTU(ce, threadedReadEvent, this, mbcastRead);
CONNECT2(void, const uchar *, int, ce, threadedReadEvent, this, mbcastRead);
ce->startThreadedRead();
// piCout << "mc BC try" << a << nm << ce->sendIP();
// piCout << "bcast bind to" << a << nm;
@@ -319,7 +319,7 @@ void PIPeer::initMBcasts(PIStringList al) {
eth_lo.setReadAddress("127.0.0.1", p);
if (eth_lo.open()) {
eth_lo.setSendIP("127.0.0.1");
CONNECTU(&eth_lo, threadedReadEvent, this, mbcastRead);
CONNECT2(void, const uchar *, int, &eth_lo, threadedReadEvent, this, mbcastRead);
eth_lo.startThreadedRead();
// piCout << "lo binded to" << eth_lo.readAddress() << eth_lo.sendAddress();
//piCout << "add eth" << ta;
@@ -330,13 +330,13 @@ void PIPeer::initMBcasts(PIStringList al) {
eth_tcp_srv.init();
eth_tcp_srv.listen("0.0.0.0", _PIPEER_TCP_PORT, true);
eth_tcp_srv.setDebug(false);
CONNECTU(&eth_tcp_srv, newConnection, this, newTcpClient);
CONNECT1(void, PIEthernet *, &eth_tcp_srv, newConnection, this, newTcpClient);
eth_tcp_srv.startThreadedRead();
eth_tcp_cli.setName("__S__PIPeer_eth_TCP_Client");
eth_tcp_cli.init();
eth_tcp_cli.setDebug(false);
tcpClientReconnect();
CONNECTU(&eth_tcp_cli, threadedReadEvent, this, mbcastRead);
CONNECT2(void, const uchar *, int, &eth_tcp_cli, threadedReadEvent, this, mbcastRead);
CONNECTU(&eth_tcp_cli, disconnected, this, tcpClientReconnect);
eth_tcp_cli.startThreadedRead();
if (eths_mcast.isEmpty() && eths_bcast.isEmpty() && !eth_lo.isOpened()) piCoutObj << "Warning! Can`t find suitable network interface for multicast receive, check for exists at least one interface with multicasting enabled!";
@@ -444,7 +444,7 @@ void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) {
}
bool PIPeer::dataRead(uchar * readed, int size) {
bool PIPeer::dataRead(const uchar * readed, int size) {
if (destroyed) {
//piCout << "[PIPeer] SegFault";
return true;
@@ -561,7 +561,7 @@ bool PIPeer::dataRead(uchar * readed, int size) {
}
bool PIPeer::mbcastRead(uchar * data, int size) {
bool PIPeer::mbcastRead(const uchar * data, int size) {
if (destroyed) {
//piCout << "[PIPeer] SegFault";
return true;
@@ -765,8 +765,8 @@ void PIPeer::addPeer(const PIPeer::PeerInfo & pd) {
peers << pd;
PeerInfo & p(peers.back());
p.init();
CONNECTU(p._data, sendRequest, this, sendInternal)
CONNECTU(p._data, received, this, dtReceived)
CONNECT2(void, const PIString &, const PIByteArray &, p._data, sendRequest, this, sendInternal)
CONNECT2(void, const PIString &, const PIByteArray &, p._data, received, this, dtReceived)
}
@@ -961,7 +961,7 @@ int PIPeer::writeDevice(const void *data, int size) {
void PIPeer::newTcpClient(PIEthernet *client) {
client->setName("__S__PIPeer_eth_TCP_ServerClient" + client->path());
piCoutObj << "client" << client->path();
CONNECTU(client, threadedReadEvent, this, mbcastRead);
CONNECT2(void, const uchar *, int, client, threadedReadEvent, this, mbcastRead);
client->startThreadedRead();
}
@@ -1009,8 +1009,9 @@ void PIPeer::initNetwork() {
self_info.addresses.clear();
PIVector<PIEthernet::Address> al = PIEthernet::allAddresses();
PIStringList sl;
piForeachC (PIEthernet::Address & a, al)
for (const PIEthernet::Address & a : al) {
sl << a.ipString();
}
initEths(sl);
// piCoutObj << sl << self_info.addresses.size();
sl.removeAll("127.0.0.1");