git-svn-id: svn://db.shs.com.ru/pip@13 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2015-03-10 13:23:16 +00:00
parent 1563d4827d
commit 79aef34c24
3 changed files with 44 additions and 28 deletions

View File

@@ -88,7 +88,7 @@ public:
CONNECTU(&ft, receiveFilesFinished, this, ftevent); CONNECTU(&ft, receiveFilesFinished, this, ftevent);
CONNECTU(&eth, threadedReadEvent, this, received); CONNECTU(&eth, threadedReadEvent, this, received);
start(50); start(50);
eth.open(); eth.setParameter(PIEthernet::SeparateSockets);
eth.startThreadedRead(); eth.startThreadedRead();
} }

View File

@@ -10,7 +10,9 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()) {
is_sending = is_receiving = false; is_sending = is_receiving = false;
break_ = true; break_ = true;
bytes_all = bytes_cur = 0; bytes_all = bytes_cur = 0;
timeout_ = 1.; replies_cnt = 0;
timeout_ = 10.;
packets_count = 32;
setPacketSize(4096); setPacketSize(4096);
srand(PISystemTime::current().toMilliseconds()); srand(PISystemTime::current().toMilliseconds());
} }
@@ -38,15 +40,14 @@ void PIBaseTransfer::stopReceive() {
void PIBaseTransfer::received(PIByteArray& data) { void PIBaseTransfer::received(PIByteArray& data) {
packet_header_size = sizeof(PacketHeader) + customHeader().size(); packet_header_size = sizeof(PacketHeader) + customHeader().size();
// piCoutObj << "r" << data.size() << sizeof(PacketHeader); // piCoutObj << "receive" << data.size();
if (data.size() < sizeof(PacketHeader)) return; if (data.size() < sizeof(PacketHeader)) return;
PacketHeader h; PacketHeader h;
data >> h; data >> h;
PacketType pt = (PacketType)h.type; PacketType pt = (PacketType)h.type;
// piCoutObj << "receive" << h.session_id << h.type << h.id; // piCoutObj << "receive" << h.session_id << h.type << h.id;
switch (pt) { switch (pt) {
case pt_Unknown: case pt_Unknown: break;
break;
case pt_Data: case pt_Data:
if (h.session_id != header.session_id || !is_receiving) { if (h.session_id != header.session_id || !is_receiving) {
sendBreak(h.session_id); sendBreak(h.session_id);
@@ -66,6 +67,7 @@ void PIBaseTransfer::received(PIByteArray& data) {
case pt_ReplySuccess: case pt_ReplySuccess:
case pt_ReplyInvalid: case pt_ReplyInvalid:
if (h.session_id != header.session_id) return; if (h.session_id != header.session_id) return;
replies_cnt++;
if (is_sending) { if (is_sending) {
if (h.id >= 0 && h.id < replies.size()) if (h.id >= 0 && h.id < replies.size())
replies[h.id] = pt; replies[h.id] = pt;
@@ -111,13 +113,13 @@ void PIBaseTransfer::received(PIByteArray& data) {
is_receiving = true; is_receiving = true;
break_ = false; break_ = false;
receiveStarted(); receiveStarted();
replies_cnt = 0;
state_string = "receiving"; state_string = "receiving";
replies[0] = pt_ReplySuccess; replies[0] = pt_ReplySuccess;
sendReply(pt_ReplySuccess); sendReply(pt_ReplySuccess);
} }
break; break;
default: default: break;
break;
} }
} }
@@ -131,24 +133,40 @@ bool PIBaseTransfer::send_process() {
replies.fill(pt_Unknown); replies.fill(pt_Unknown);
PIByteArray ba; PIByteArray ba;
if (!getStartRequest()) return finish_send(false); if (!getStartRequest()) return finish_send(false);
replies_cnt = 0;
state_string = "sending"; state_string = "sending";
PITimeMeasurer stm;
int ltm = 0;
for (int i = 0; i < session.size_s(); i++) { for (int i = 0; i < session.size_s(); i++) {
if (i - replies_cnt >= packets_count) {
--i;
piMSleep(1);
if (ltm > 10) return finish_send(false);
if (stm.elapsed_s() > timeout_/10) {
replies_cnt = i;
ltm++;
}
continue;
}
stm.reset();
ba = build_packet(i); ba = build_packet(i);
sendRequest(ba); sendRequest(ba);
if (break_) return finish_send(false); if (break_) return finish_send(false);
} }
PITimeMeasurer tm; PITimeMeasurer tm2, tm;
int prev_chk = 0; int prev_chk = 0;
while (tm.elapsed_s() < timeout_) { while (tm.elapsed_s() < timeout_) {
int chk = checkSession(); int chk = checkSession();
if (chk != prev_chk) tm.reset(); if (chk != prev_chk) tm.reset();
if (chk == 0) return finish_send(true); if (chk == 0) return finish_send(true);
if (chk > 0) { if (chk > 0) {
// piCoutObj << "recovery packet" << chk; if (tm2.elapsed_s() > timeout_/10) {
// piCoutObj << "recovery packet" << chk;
piMSleep(1); piMSleep(1);
ba = build_packet(chk - 1); ba = build_packet(chk - 1);
sendRequest(ba); sendRequest(ba);
} }
}
// if (chk == -1) return finish_send(false); // if (chk == -1) return finish_send(false);
if (break_) return finish_send(false); if (break_) return finish_send(false);
prev_chk = chk; prev_chk = chk;
@@ -258,18 +276,15 @@ bool PIBaseTransfer::getStartRequest() {
ba << header; ba << header;
ba << st; ba << st;
state_string = "send request"; state_string = "send request";
for (int i = 0; i < 5; i++) {
tm.reset();
sendRequest(ba);
while (tm.elapsed_s() < timeout_) { while (tm.elapsed_s() < timeout_) {
sendRequest(ba);
if (break_) return false; if (break_) return false;
//piCoutObj << send_replyes[0]; //piCoutObj << send_replyes[0];
if (replies[0] == pt_ReplySuccess) { if (replies[0] == pt_ReplySuccess) {
state_string = "send permited!"; state_string = "send permited!";
return true; return true;
} }
piMSleep(10); piMSleep(100);
}
} }
return false; return false;
} }
@@ -315,7 +330,6 @@ PIByteArray PIBaseTransfer::build_packet(int id) {
PIByteArray ba; PIByteArray ba;
header.id = id + 1; header.id = id + 1;
header.type = pt_Data; header.type = pt_Data;
// piCoutObj << "Packet" << header.id;
// piCoutObj << "session id" << header.session_id; // piCoutObj << "session id" << header.session_id;
//ret << header; //ret << header;
ret.append(customHeader()); ret.append(customHeader());
@@ -331,6 +345,7 @@ PIByteArray PIBaseTransfer::build_packet(int id) {
header.crc = crc.calculate(ret); header.crc = crc.calculate(ret);
PIByteArray hdr; hdr << header; PIByteArray hdr; hdr << header;
ret.insert(0, hdr); ret.insert(0, hdr);
// piCoutObj << "Packet" << header.id << ret.size();
return ret; return ret;
} }

View File

@@ -82,12 +82,13 @@ private:
friend PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::StartRequest & v); friend PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::StartRequest & v);
static const uint signature; static const uint signature;
int packet_size; int packet_size, packets_count;
double timeout_; double timeout_;
PIVector<PIVector<Part> > session; PIVector<PIVector<Part> > session;
PIVector<PacketType> replies; PIVector<PacketType> replies;
PacketHeader header; PacketHeader header;
CRC_16 crc; CRC_16 crc;
int replies_cnt;
void processData(int id, PIByteArray &data); void processData(int id, PIByteArray &data);
PIByteArray build_packet(int id); PIByteArray build_packet(int id);