add pause to PIBaseTransfer

need test

git-svn-id: svn://db.shs.com.ru/pip@79 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2015-04-08 07:11:09 +00:00
parent 6c987ee1ae
commit 54d6d16464
2 changed files with 145 additions and 102 deletions

View File

@@ -7,7 +7,7 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) {
header.session_id = 0; header.session_id = 0;
packet_header_size = sizeof(PacketHeader) + customHeader().size(); packet_header_size = sizeof(PacketHeader) + customHeader().size();
part_header_size = sizeof(Part) + sizeof(int); part_header_size = sizeof(Part) + sizeof(int);
is_sending = is_receiving = false; is_sending = is_receiving = is_pause = false;
break_ = true; break_ = true;
bytes_all = bytes_cur = 0; bytes_all = bytes_cur = 0;
replies_cnt = send_queue = 0; replies_cnt = send_queue = 0;
@@ -36,11 +36,20 @@ void PIBaseTransfer::stopSend() {
void PIBaseTransfer::stopReceive() { void PIBaseTransfer::stopReceive() {
if (!is_receiving) return; if (!is_receiving) return;
break_ = true; break_ = true;
// piCoutObj << "stopReceive()"; //piCoutObj << "stopReceive()";
finish_receive(false); finish_receive(false);
} }
void PIBaseTransfer::setPause(bool pause_) {
if (is_pause == pause_) return;
pause_tm.reset();
is_pause = pause_;
if (pause_) paused();
else resumed();
}
void PIBaseTransfer::received(PIByteArray data) { void PIBaseTransfer::received(PIByteArray data) {
packet_header_size = sizeof(PacketHeader) + customHeader().size(); packet_header_size = sizeof(PacketHeader) + customHeader().size();
if (data.size() < sizeof(PacketHeader)) { if (data.size() < sizeof(PacketHeader)) {
@@ -54,94 +63,115 @@ void PIBaseTransfer::received(PIByteArray data) {
diag.received(data.size(), false); diag.received(data.size(), false);
return; return;
} else diag.received(data.size(), true); } else diag.received(data.size(), true);
// piCoutObj << "receive" << h.session_id << h.type << h.id; //piCoutObj << "receive" << h.session_id << h.type << h.id;
switch (pt) { switch (pt) {
case pt_Unknown: break; case pt_Unknown: break;
case pt_Data: case pt_Data:
if (h.session_id != header.session_id || !is_receiving) { if (h.session_id != header.session_id || !is_receiving) {
sendBreak(h.session_id); sendBreak(h.session_id);
return; return;
} else {
uint rcrc = h.crc;
uint ccrc = crc.calculate(data.data(), data.size_s());
if (rcrc != ccrc) {
header.id = h.id;
sendReply(pt_ReplyInvalid);
piCoutObj << "invalid CRC";
} else { } else {
processData(h.id, data); uint rcrc = h.crc;
uint ccrc = crc.calculate(data.data(), data.size_s());
if (rcrc != ccrc) {
header.id = h.id;
sendReply(pt_ReplyInvalid);
piCoutObj << "invalid CRC";
} else {
processData(h.id, data);
}
if (is_pause) sendReply(pt_Pause);
} }
}
break; break;
case pt_ReplySuccess: case pt_ReplySuccess:
case pt_ReplyInvalid: case pt_ReplyInvalid:
if (h.session_id != header.session_id) return; if (h.session_id != header.session_id) return;
if (pt == pt_ReplySuccess) { if (is_pause) sendReply(pt_Pause);
send_queue--; if (pt == pt_ReplySuccess) {
if (send_queue < 0) send_queue = 0; send_queue--;
send_tm.reset(); if (send_queue < 0) send_queue = 0;
} send_tm.reset();
replies_cnt++; }
if (is_sending) { replies_cnt++;
if (h.id >= 0 && h.id < replies.size()) if (is_sending) {
replies[h.id] = pt; if (h.id >= 0 && h.id < replies.size())
} replies[h.id] = pt;
if (is_receiving && h.id == 0) { }
if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true); if (is_receiving && h.id == 0) {
// if (checkSession() == 0 && pt == pt_ReplySuccess) { piCoutObj << "Success receive"; finish_receive(true);} if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true);
} //if (checkSession() == 0 && pt == pt_ReplySuccess) { piCoutObj << "Success receive"; finish_receive(true);}
}
break; break;
case pt_Break: case pt_Break:
break_ = true; break_ = true;
// piCoutObj << "BREAK"; // piCoutObj << "BREAK";
if (is_receiving) { if (is_receiving) {
stopReceive(); stopReceive();
return; return;
} }
if (is_sending) { if (is_sending) {
stopSend(); stopSend();
return; return;
} }
break; break;
case pt_Start: case pt_Start:
if (is_sending) { if (is_pause && (is_sending || is_receiving)) {
sendBreak(h.session_id); if (header.session_id == h.session_id) {
return; resumed();
} is_pause = false;
if (is_receiving) { return;
if (header.session_id != h.session_id) { }
// sendBreak(h.session_id); }
// return; if (is_sending && header.session_id != h.session_id) {
// piCoutObj << "restart receive"; sendBreak(h.session_id);
finish_receive(false, true); return;
} else return; }
} if (is_receiving) {
if (data.size() == sizeof(StartRequest)) { if (header.session_id != h.session_id) {
StartRequest sr; //sendBreak(h.session_id);
data >> sr; //return;
bytes_cur = 0; //piCoutObj << "restart receive";
state_string = "start request"; finish_receive(false, true);
bytes_all = sr.size; } else return;
header.session_id = h.session_id; }
header.id = 0; if (data.size() == sizeof(StartRequest)) {
session.clear(); StartRequest sr;
replies.clear(); data >> sr;
session.resize(sr.packets); bytes_cur = 0;
replies.resize(sr.packets + 1); state_string = "start request";
replies.fill(pt_Unknown); bytes_all = sr.size;
diag.reset(); header.session_id = h.session_id;
diag.start(100); header.id = 0;
// piCoutObj << "receiveStarted()"; session.clear();
is_receiving = true; replies.clear();
break_ = false; session.resize(sr.packets);
receiveStarted(); replies.resize(sr.packets + 1);
replies_cnt = send_queue = 0; replies.fill(pt_Unknown);
state_string = "receiving"; diag.reset();
replies[0] = pt_ReplySuccess; diag.start(100);
sendReply(pt_ReplySuccess); //piCoutObj << "receiveStarted()";
} is_receiving = true;
break_ = false;
receiveStarted();
replies_cnt = send_queue = 0;
state_string = "receiving";
replies[0] = pt_ReplySuccess;
sendReply(pt_ReplySuccess);
}
break; break;
default: break; case pt_Pause:
if (header.session_id == h.session_id) {
if (!is_pause && pause_tm.elapsed_s() < timeout_) {
sendReply(pt_Start);
return;
}
if (is_pause && pause_tm.elapsed_s() > timeout_/10) sendReply(pt_Pause);
pause_tm.reset();
if (!is_pause) paused();
is_pause = true;
}
break;
default: break;
} }
} }
@@ -163,9 +193,10 @@ bool PIBaseTransfer::send_process() {
send_tm.reset(); send_tm.reset();
//int ltm = 0; //int ltm = 0;
for (int i = 0; i < session.size_s(); i++) { for (int i = 0; i < session.size_s(); i++) {
if (send_queue >= packets_count) { if (send_queue >= packets_count || is_pause) {
--i; --i;
piMSleep(1); piMSleep(1);
if (is_pause && pause_tm.elapsed_s() > timeout())return finish_send(false);
if (send_tm.elapsed_s() > timeout_) return finish_send(false); if (send_tm.elapsed_s() > timeout_) return finish_send(false);
if (stm.elapsed_s() > timeout_ / 10.) if (stm.elapsed_s() > timeout_ / 10.)
send_queue = 0; send_queue = 0;
@@ -189,6 +220,11 @@ bool PIBaseTransfer::send_process() {
piMSleep(1); piMSleep(1);
continue; continue;
} }
if (is_pause) {
piMSleep(10);
if (pause_tm.elapsed_s() > timeout())return finish_send(false);
else continue;
}
prev_chk = chk; prev_chk = chk;
if (chk > 0) { if (chk > 0) {
if (tm2.elapsed_s() > timeout_ / 10.) { if (tm2.elapsed_s() > timeout_ / 10.) {
@@ -203,7 +239,7 @@ bool PIBaseTransfer::send_process() {
send_queue++; send_queue++;
} }
} }
// if (chk == -1) return finish_send(false); // if (chk == -1) return finish_send(false);
if (break_) return finish_send(false); if (break_) return finish_send(false);
piMSleep(1); piMSleep(1);
} }
@@ -241,12 +277,12 @@ void PIBaseTransfer::buildSession(PIVector<Part> parts) {
for (int i = 0; i < parts.size_s(); i++) { for (int i = 0; i < parts.size_s(); i++) {
state_string = "calculating parts ... " + PIString::fromNumber(i) + " of " + PIString::fromNumber(parts.size()); state_string = "calculating parts ... " + PIString::fromNumber(i) + " of " + PIString::fromNumber(parts.size());
fi.id = parts[i].id; fi.id = parts[i].id;
// piCout << fi.id << state_string; //piCout << fi.id << state_string;
bytes_all += parts[i].size; bytes_all += parts[i].size;
// fi.size = fi.entry.size; //fi.size = fi.entry.size;
fi.start = 0; fi.start = 0;
llong rest = parts[i].size - (packet_size - cur_size); llong rest = parts[i].size - (packet_size - cur_size);
// piCout << i << fi.size << rest << bytes_all; //piCout << i << fi.size << rest << bytes_all;
if (rest <= 0) { if (rest <= 0) {
fi.size = parts[i].size; fi.size = parts[i].size;
lfi << fi; lfi << fi;
@@ -255,7 +291,7 @@ void PIBaseTransfer::buildSession(PIVector<Part> parts) {
fi.size = parts[i].size - rest; fi.size = parts[i].size - rest;
fi_index = 1; fi_index = 1;
fi_prts = 1 + 1 + piMaxi(1, rest / (packet_size - min_size)); fi_prts = 1 + 1 + piMaxi(1, rest / (packet_size - min_size));
// piCout << fi_prts; //piCout << fi_prts;
lfi << fi; lfi << fi;
session << lfi; session << lfi;
lfi.clear(); lfi.clear();
@@ -329,7 +365,7 @@ bool PIBaseTransfer::getStartRequest() {
void PIBaseTransfer::processData(int id, PIByteArray & data) { void PIBaseTransfer::processData(int id, PIByteArray & data) {
// piCoutObj << "received packet" << id << ", size" << data.size(); //piCoutObj << "received packet" << id << ", size" << data.size();
if (id < 1 || id > replies.size_s()) return; if (id < 1 || id > replies.size_s()) return;
if (!session[id - 1].isEmpty()) { if (!session[id - 1].isEmpty()) {
header.id = id; header.id = id;
@@ -348,10 +384,11 @@ void PIBaseTransfer::processData(int id, PIByteArray & data) {
while (!data.isEmpty()) { while (!data.isEmpty()) {
ba.clear(); ba.clear();
data >> fi; data >> fi;
/*if (fi.size > 0) */data >> ba; //if (fi.size > 0)
// fi.fsize = ba.size(); data >> ba;
//fi.fsize = ba.size();
bytes_cur += fi.size; bytes_cur += fi.size;
// piCoutObj << "recv" << fi; //piCoutObj << "recv" << fi;
session[id - 1] << fi; session[id - 1] << fi;
state_string = "receiving..."; state_string = "receiving...";
receivePart(fi, ba, pheader); receivePart(fi, ba, pheader);
@@ -368,13 +405,13 @@ PIByteArray PIBaseTransfer::build_packet(int id) {
PIByteArray ba; PIByteArray ba;
header.id = id + 1; header.id = id + 1;
header.type = pt_Data; header.type = pt_Data;
// piCoutObj << "session id" << header.session_id; //piCoutObj << "session id" << header.session_id;
//ret << header; //ret << header;
ret.append(customHeader()); ret.append(customHeader());
for (int i = 0; i < session[id].size_s(); i++) { for (int i = 0; i < session[id].size_s(); i++) {
Part fi = session[id][i]; Part fi = session[id][i];
ret << fi; ret << fi;
// piCout << "biuld" << fi; //piCout << "biuld" << fi;
ba = buildPacket(fi); ba = buildPacket(fi);
bytes_cur += ba.size(); bytes_cur += ba.size();
if (ba.size() != fi.size) piCoutObj << "***error while build packet, wrong part size"; if (ba.size() != fi.size) piCoutObj << "***error while build packet, wrong part size";
@@ -383,7 +420,7 @@ PIByteArray PIBaseTransfer::build_packet(int id) {
header.crc = crc.calculate(ret); header.crc = crc.calculate(ret);
PIByteArray hdr; hdr << header; PIByteArray hdr; hdr << header;
ret.insert(0, hdr); ret.insert(0, hdr);
// piCoutObj << "Packet" << header.id << ret.size(); //piCoutObj << "Packet" << header.id << ret.size();
return ret; return ret;
} }
@@ -392,7 +429,7 @@ bool PIBaseTransfer::finish_send(bool ok) {
is_sending = false; is_sending = false;
if (ok) state_string = "send done"; if (ok) state_string = "send done";
else state_string = "send failed"; else state_string = "send failed";
// piCoutObj << state_string << PIString::readableSize(bytes_all); //piCoutObj << state_string << PIString::readableSize(bytes_all);
header.id = 0; header.id = 0;
if (!ok) sendBreak(header.session_id); if (!ok) sendBreak(header.session_id);
else sendReply(pt_ReplySuccess); else sendReply(pt_ReplySuccess);
@@ -407,7 +444,7 @@ void PIBaseTransfer::finish_receive(bool ok, bool quet) {
is_receiving = false; is_receiving = false;
if (ok) state_string = "receive done"; if (ok) state_string = "receive done";
else state_string = "receive failed"; else state_string = "receive failed";
// piCoutObj << state_string;// << PIString::readableSize(bytes_all); //piCoutObj << state_string;// << PIString::readableSize(bytes_all);
if (!ok && !quet) sendBreak(header.session_id); if (!ok && !quet) sendBreak(header.session_id);
receiveFinished(ok); receiveFinished(ok);
diag.stop(); diag.stop();
@@ -418,7 +455,7 @@ void PIBaseTransfer::finish_receive(bool ok, bool quet) {
void PIBaseTransfer::diagChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) { void PIBaseTransfer::diagChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) {
if (is_receiving) { if (is_receiving) {
if (new_quality == PIDiagnostics::Failure) { if (new_quality == PIDiagnostics::Failure) {
piCout << "disconnected!"; //piCout << "disconnected!";
stopReceive(); stopReceive();
} }
} }

View File

@@ -35,6 +35,8 @@ public:
bool isSending() const {return is_sending;} bool isSending() const {return is_sending;}
bool isReceiving() const {return is_receiving;} bool isReceiving() const {return is_receiving;}
bool isPause() const {return is_pause;}
void setPause(bool pause_);
void setPacketSize(int size) {packet_size = size;} void setPacketSize(int size) {packet_size = size;}
int packetSize() const {return packet_size;} int packetSize() const {return packet_size;}
@@ -51,15 +53,19 @@ public:
const PIDiagnostics &diagnostic() {return diag;} const PIDiagnostics &diagnostic() {return diag;}
EVENT(receiveStarted) EVENT(receiveStarted)
EVENT(paused)
EVENT(resumed)
EVENT1(receiveFinished, bool, ok) EVENT1(receiveFinished, bool, ok)
EVENT(sendStarted) EVENT(sendStarted)
EVENT1(sendFinished, bool, ok) EVENT1(sendFinished, bool, ok)
EVENT1(sendRequest, PIByteArray &, data) EVENT1(sendRequest, PIByteArray &, data)
EVENT_HANDLER1(void, received, PIByteArray, data); EVENT_HANDLER1(void, received, PIByteArray, data);
EVENT_HANDLER(void, pause) {setPause(true);}
EVENT_HANDLER(void, resume) {setPause(false);}
protected: protected:
uint packet_header_size, part_header_size; uint packet_header_size, part_header_size;
bool break_, is_sending, is_receiving; bool break_, is_sending, is_receiving, is_pause;
PIString state_string; PIString state_string;
llong bytes_all, bytes_cur; llong bytes_all, bytes_cur;
@@ -71,7 +77,7 @@ protected:
private: private:
enum PacketType {pt_Unknown, pt_Data, pt_ReplySuccess, pt_ReplyInvalid, pt_Break, pt_Start}; enum PacketType {pt_Unknown, pt_Data, pt_ReplySuccess, pt_ReplyInvalid, pt_Break, pt_Start, pt_Pause};
# pragma pack(push,1) # pragma pack(push,1)
struct StartRequest { struct StartRequest {
@@ -88,7 +94,7 @@ private:
double timeout_; double timeout_;
PIVector<PIVector<Part> > session; PIVector<PIVector<Part> > session;
PIVector<PacketType> replies; PIVector<PacketType> replies;
PITimeMeasurer send_tm; PITimeMeasurer send_tm, pause_tm;
PacketHeader header; PacketHeader header;
CRC_16 crc; CRC_16 crc;
int replies_cnt, send_queue; int replies_cnt, send_queue;