back to polygonf

git-svn-id: svn://db.shs.com.ru/pip@105 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2015-04-17 07:19:36 +00:00
parent 022b76bc29
commit 353dbedf77
25 changed files with 143 additions and 106 deletions

View File

@@ -35,7 +35,7 @@ void PIBaseTransfer::stopSend() {
void PIBaseTransfer::stopReceive() {
if (!is_receiving) return;
break_ = true;
piCoutObj << "stopReceive()";
//piCoutObj << "stopReceive()";
finish_receive(false);
}
@@ -62,7 +62,7 @@ void PIBaseTransfer::received(PIByteArray data) {
diag.received(data.size(), false);
return;
} else diag.received(data.size(), true);
piCoutObj << "receive" << h.session_id << h.type << h.id;
//piCoutObj << "receive" << h.session_id << h.type << h.id;
switch (pt) {
case pt_Unknown: break;
case pt_Data:
@@ -75,7 +75,7 @@ void PIBaseTransfer::received(PIByteArray data) {
if (rcrc != ccrc) {
header.id = h.id;
sendReply(pt_ReplyInvalid);
piCoutObj << "invalid CRC";
//piCoutObj << "invalid CRC";
} else {
processData(h.id, data);
}
@@ -103,7 +103,7 @@ void PIBaseTransfer::received(PIByteArray data) {
break;
case pt_Break:
break_ = true;
piCoutObj << "BREAK";
//piCoutObj << "BREAK";
if (is_receiving) {
stopReceive();
return;
@@ -129,7 +129,7 @@ void PIBaseTransfer::received(PIByteArray data) {
if (header.session_id != h.session_id) {
//sendBreak(h.session_id);
//return;
piCoutObj << "restart receive";
//piCoutObj << "restart receive";
finish_receive(false, true);
} else return;
}
@@ -148,7 +148,7 @@ void PIBaseTransfer::received(PIByteArray data) {
replies.fill(pt_Unknown);
diag.reset();
diag.start(100);
piCoutObj << "receiveStarted()";
//piCoutObj << "receiveStarted()";
is_receiving = true;
break_ = false;
receiveStarted();
@@ -160,16 +160,16 @@ void PIBaseTransfer::received(PIByteArray data) {
break;
case pt_Pause:
if (header.session_id == h.session_id) {
piCout << "receive pause";
//piCout << "receive pause";
if (!is_pause && pause_tm.elapsed_s() < timeout_/10) {
piCout << "resume";
//piCout << "resume";
sendReply(pt_Start);
return;
}
if (!is_pause) paused();
is_pause = true;
if (is_receiving && pause_tm.elapsed_m() > 40) {
piCout << "send pause";
//piCout << "send pause";
sendReply(pt_Pause);
}
if (is_sending) send_tm.reset();
@@ -203,7 +203,7 @@ bool PIBaseTransfer::send_process() {
piMSleep(1);
if (is_pause) {
piMSleep(40);
piCout << "send pause";
//piCout << "send pause";
sendReply(pt_Pause);
if (pause_tm.elapsed_s() > timeout())return finish_send(false);
}
@@ -232,7 +232,7 @@ bool PIBaseTransfer::send_process() {
}
if (is_pause) {
piMSleep(40);
piCout << "send pause";
//piCout << "send pause";
sendReply(pt_Pause);
if (pause_tm.elapsed_s() > timeout())return finish_send(false);
else continue;
@@ -240,7 +240,7 @@ bool PIBaseTransfer::send_process() {
prev_chk = chk;
if (chk > 0) {
if (tm2.elapsed_s() > timeout_ / 10.) {
piCoutObj << "recovery packet" << chk;
//piCoutObj << "recovery packet" << chk;
if (send_queue >= packets_count) {
piMSleep(10);
continue;
@@ -270,7 +270,7 @@ int PIBaseTransfer::checkSession() {
if (replies[i] != pt_ReplySuccess) return i;
}
if (miss > 0) {
piCoutObj << "missing" << miss << "packets";
//piCoutObj << "missing" << miss << "packets";
return -miss;
} else return 0;
}
@@ -289,12 +289,12 @@ void PIBaseTransfer::buildSession(PIVector<Part> parts) {
for (int i = 0; i < parts.size_s(); i++) {
state_string = "calculating parts ... " + PIString::fromNumber(i) + " of " + PIString::fromNumber(parts.size());
fi.id = parts[i].id;
piCout << fi.id << state_string;
//piCout << fi.id << state_string;
bytes_all += parts[i].size;
//fi.size = fi.entry.size;
fi.start = 0;
llong rest = parts[i].size - (packet_size - cur_size);
piCout << i << fi.size << rest << bytes_all;
//piCout << i << fi.size << rest << bytes_all;
if (rest <= 0) {
fi.size = parts[i].size;
lfi << fi;
@@ -334,7 +334,7 @@ void PIBaseTransfer::buildSession(PIVector<Part> parts) {
void PIBaseTransfer::sendBreak(int session_id) {
piCoutObj << "sendBreak";
//piCoutObj << "sendBreak";
uint psid = header.session_id;
header.session_id = session_id;
sendReply(pt_Break);
@@ -343,7 +343,7 @@ void PIBaseTransfer::sendBreak(int session_id) {
void PIBaseTransfer::sendReply(PacketType reply) {
piCoutObj << "sendReply" << reply;
//piCoutObj << "sendReply" << reply;
header.type = reply;
PIByteArray ba;
ba << header;
@@ -379,7 +379,7 @@ bool PIBaseTransfer::getStartRequest() {
void PIBaseTransfer::processData(int id, PIByteArray & data) {
piCoutObj << "received packet" << id << ", size" << data.size();
//piCoutObj << "received packet" << id << ", size" << data.size();
if (id < 1 || id > replies.size_s()) return;
if (!session[id - 1].isEmpty()) {
header.id = id;
@@ -402,7 +402,7 @@ void PIBaseTransfer::processData(int id, PIByteArray & data) {
data >> ba;
//fi.fsize = ba.size();
bytes_cur += fi.size;
piCoutObj << "recv" << fi;
//piCoutObj << "recv" << fi;
session[id - 1] << fi;
state_string = "receiving...";
receivePart(fi, ba, pheader);
@@ -419,13 +419,13 @@ PIByteArray PIBaseTransfer::build_packet(int id) {
PIByteArray ba;
header.id = id + 1;
header.type = pt_Data;
piCoutObj << "session id" << header.session_id;
//piCoutObj << "session id" << header.session_id;
//ret << header;
ret.append(customHeader());
for (int i = 0; i < session[id].size_s(); i++) {
Part fi = session[id][i];
ret << fi;
piCout << "biuld" << fi;
//piCout << "biuld" << fi;
ba = buildPacket(fi);
bytes_cur += ba.size();
if (ba.size() != fi.size) piCoutObj << "***error while build packet, wrong part size";
@@ -434,7 +434,7 @@ PIByteArray PIBaseTransfer::build_packet(int id) {
header.crc = crc.calculate(ret);
PIByteArray hdr; hdr << header;
ret.insert(0, hdr);
piCoutObj << "Packet" << header.id << ret.size();
//piCoutObj << "Packet" << header.id << ret.size();
return ret;
}
@@ -443,7 +443,7 @@ bool PIBaseTransfer::finish_send(bool ok) {
is_sending = false;
if (ok) state_string = "send done";
else state_string = "send failed";
piCoutObj << state_string << PIString::readableSize(bytes_all);
//piCoutObj << state_string << PIString::readableSize(bytes_all);
header.id = 0;
if (!ok) sendBreak(header.session_id);
else sendReply(pt_ReplySuccess);
@@ -458,7 +458,7 @@ void PIBaseTransfer::finish_receive(bool ok, bool quet) {
is_receiving = false;
if (ok) state_string = "receive done";
else state_string = "receive failed";
piCoutObj << state_string << PIString::readableSize(bytes_all);
//piCoutObj << state_string << PIString::readableSize(bytes_all);
if (!ok && !quet) sendBreak(header.session_id);
receiveFinished(ok);
diag.stop();
@@ -469,7 +469,7 @@ void PIBaseTransfer::finish_receive(bool ok, bool quet) {
void PIBaseTransfer::diagChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) {
if (is_receiving) {
if (new_quality == PIDiagnostics::Failure) {
piCout << "disconnected!";
//piCout << "disconnected!";
break_ = true;
is_receiving = false;
state_string = "receive failed";

View File

@@ -274,9 +274,9 @@ PIVector<PIFile::FileInfo> PIDir::entries() {
# endif
for (int i = 0; i < cnt; ++i) {
l << PIFile::fileInfo(dp + PIString(list[i]->d_name));
delete list[i];
free(list[i]);
}
delete list;
free(list);
#endif
return l;
}

View File

@@ -532,13 +532,14 @@ int PIEthernet::read(void * read_to, int max_size) {
case UDP:
memset(&PRIVATE->raddr_, 0, sizeof(PRIVATE->raddr_));
rs = ethRecvfrom(sock, read_to, max_size, 0, (sockaddr*)&PRIVATE->raddr_);
//piCout << "eth" << path() << "read return" << rs << errorString();
if (rs > 0) {
port_r = ntohs(PRIVATE->raddr_.sin_port);
ip_r = PIStringAscii(inet_ntoa(PRIVATE->raddr_.sin_addr));
//piCoutObj << "read from" << ip_r << ":" << port_r << rs << "bytes";
//piCout << "received from" << lastReadAddress();
received(read_to, rs);
}
//else piCoutObj << "read returt" << rs << ", error" << ethErrorString();
return rs;
//return ::read(sock, read_to, max_size);
default: break;
@@ -582,7 +583,7 @@ int PIEthernet::write(const void * data, int max_size) {
/*if (params[PIEthernet::Broadcast]) PRIVATE->saddr_.sin_addr.s_addr = INADDR_BROADCAST;
else*/ PRIVATE->saddr_.sin_addr.s_addr = inet_addr(ip_s.dataAscii());
PRIVATE->saddr_.sin_family = AF_INET;
//piCout << "[PIEth] write to" << ip_s << ":" << port_s << "socket" << sock_s << max_size << "bytes ...";
//piCoutObj << "write to" << ip_s << ":" << port_s << "socket" << sock_s << max_size << "bytes ...";
return ethSendto(sock_s, data, max_size, 0, (sockaddr * )&PRIVATE->saddr_, sizeof(PRIVATE->saddr_));
//piCout << "[PIEth] write to" << ip_s << ":" << port_s << "ok";
case TCP_Client:
@@ -819,7 +820,7 @@ PIEthernet::InterfaceList PIEthernet::interfaces() {
ifc.ifc_buf = new char[ifc.ifc_len];
if (ioctl(s, SIOCGIFCONF, &ifc) < 0) {
piCout << "[PIEthernet] Can`t get interfaces:" << errorString();
delete ifc.ifc_buf;
delete[] ifc.ifc_buf;
return il;
}
int icnt = ifc.ifc_len / sizeof(ifreq);

View File

@@ -487,7 +487,7 @@ bool PIFile::applyFileInfo(const PIString & path, const PIFile::FileInfo & info)
if (!info.perm_user.write) attr |= FILE_ATTRIBUTE_READONLY;
if (SetFileAttributes((LPCTSTR)(fp.data()), attr) == 0) {
piCout << "[PIFile] applyFileInfo: \"SetFileAttributes\" error:" << errorString();
return false;
//return false;
}
HANDLE hFile = 0;
if ((attr & FILE_ATTRIBUTE_DIRECTORY) == FILE_ATTRIBUTE_DIRECTORY) {
@@ -515,11 +515,11 @@ bool PIFile::applyFileInfo(const PIString & path, const PIFile::FileInfo & info)
if (info.perm_other.exec) mode |= S_IXOTH;
if (chmod(fp.data(), mode) != 0) {
piCout << "[PIFile] applyFileInfo: \"chmod\" error:" << errorString();
return false;
//return false;
}
if (chown(fp.data(), info.id_user, info.id_group) != 0) {
piCout << "[PIFile] applyFileInfo: \"chown\" error:" << errorString();
return false;
//return false;
}
struct timeval tm[2];
PISystemTime st = info.time_access.toSystemTime();
@@ -530,7 +530,7 @@ bool PIFile::applyFileInfo(const PIString & path, const PIFile::FileInfo & info)
tm[1].tv_usec = st.nanoseconds / 1000;
if (utimes(fp.data(), tm) != 0) {
piCout << "[PIFile] applyFileInfo: \"utimes\" error:" << errorString();
return false;
//return false;
}
#endif
return true;

View File

@@ -243,7 +243,7 @@ PIByteArray PIIODevice::readForTime(double timeout_ms) {
if (ret <= 0) msleep(1);
else str.append(td, ret);
}
delete td;
delete[] td;
return str;
}

View File

@@ -43,7 +43,9 @@ PIPeer::PeerData::PeerData(const PIString & n): PIObject(n) {
PIPeer::PeerData::~PeerData() {
t.stop();
dt_in.stop();
dt_out.stop();
t.stop(true);
}
@@ -55,6 +57,7 @@ void PIPeer::PeerData::dtThread() {
bool PIPeer::PeerData::send(const PIByteArray & d) {
piCout << "send ..." << t.isRunning();
if (t.isRunning()) return false;
data = d;
t.startOnce();
@@ -335,7 +338,7 @@ bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) {
}
PIByteArray ba;
ba << int(4) << self_info.name << to << int(0) << data;
//piCoutObj << "sendInternal" << to << data.size_s() << int(data.front());
piCoutObj << "sendInternal to" << to << data.size_s() << int(data.front());
if (!sendToNeighbour(dp, ba)) return false;
return true;
}
@@ -355,7 +358,7 @@ bool PIPeer::dataRead(uchar * readed, int size) {
PIString from, to;
ba >> type;
PIMutexLocker locker(eth_mutex);
//piCout << "[PIPeer \"" + name_ + "\"] Received packet" << type;
piCoutObj << "received data from" << from << "packet" << type;
if (type == 5) { // ping request
PIString addr;
PISystemTime time;
@@ -456,7 +459,7 @@ bool PIPeer::mbcastRead(uchar * data, int size) {
if (type <= 0 || type >= 4) return true;
PeerInfo pi;
ba >> pi.name;
//piCout << "read type" << type << "from" << pi.name;
piCoutObj << "received mb from" << pi.name << "packet" << type;
if (pi.name == self_info.name) return true;
PIMutexLocker locker(mc_mutex);
diag_s.received(size);
@@ -743,6 +746,11 @@ void PIPeer::checkNetwork() {
PIEthernet::InterfaceList ifaces = PIEthernet::interfaces();
if (prev_ifaces == ifaces) return;
prev_ifaces = ifaces;
reinit();
}
void PIPeer::reinit() {
PIMutexLocker mbl(mc_mutex);
PIMutexLocker ethl(eth_mutex);
PIMutexLocker pl(peers_mutex);

View File

@@ -129,6 +129,7 @@ public:
const PeerInfo & selfInfo() const {return self_info;}
const PIMap<PIString, PIVector<PeerInfo * > > & _peerMap() const {return addresses_map;}
void reinit();
void lock() {peers_mutex.lock();}
void unlock() {peers_mutex.unlock();}