fix PIFileTransfer

git-svn-id: svn://db.shs.com.ru/pip@73 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2015-04-07 13:24:37 +00:00
parent 75eb413856
commit 0f4e2b5f4c
5 changed files with 119 additions and 88 deletions

View File

@@ -1,6 +1,6 @@
project(pip) project(pip)
cmake_minimum_required(VERSION 2.6) cmake_minimum_required(VERSION 2.6)
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3")
include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
include(CheckFunctionExists) include(CheckFunctionExists)

View File

@@ -36,6 +36,7 @@ void PIBaseTransfer::stopSend() {
void PIBaseTransfer::stopReceive() { void PIBaseTransfer::stopReceive() {
if (!is_receiving) return; if (!is_receiving) return;
break_ = true; break_ = true;
// piCoutObj << "stopReceive()";
finish_receive(false); finish_receive(false);
} }
@@ -46,10 +47,13 @@ void PIBaseTransfer::received(PIByteArray data) {
diag.received(data.size(), false); diag.received(data.size(), false);
return; return;
} }
diag.received(data.size(), true);
PacketHeader h; PacketHeader h;
data >> h; data >> h;
PacketType pt = (PacketType)h.type; PacketType pt = (PacketType)h.type;
if (!h.check_sig()) {
diag.received(data.size(), false);
return;
} else diag.received(data.size(), true);
// piCoutObj << "receive" << h.session_id << h.type << h.id; // piCoutObj << "receive" << h.session_id << h.type << h.id;
switch (pt) { switch (pt) {
case pt_Unknown: break; case pt_Unknown: break;
@@ -84,6 +88,7 @@ void PIBaseTransfer::received(PIByteArray data) {
} }
if (is_receiving && h.id == 0) { if (is_receiving && h.id == 0) {
if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true); if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true);
// if (checkSession() == 0 && pt == pt_ReplySuccess) { piCoutObj << "Success receive"; finish_receive(true);}
} }
break; break;
case pt_Break: case pt_Break:
@@ -103,10 +108,13 @@ void PIBaseTransfer::received(PIByteArray data) {
sendBreak(h.session_id); sendBreak(h.session_id);
return; return;
} }
if (header.session_id != h.session_id && is_receiving) { if (is_receiving) {
// sendBreak(h.session_id); if (header.session_id != h.session_id) {
// return; // sendBreak(h.session_id);
finish_receive(false, true); // return;
// piCoutObj << "restart receive";
finish_receive(false, true);
} else return;
} }
if (data.size() == sizeof(StartRequest)) { if (data.size() == sizeof(StartRequest)) {
StartRequest sr; StartRequest sr;
@@ -121,10 +129,11 @@ void PIBaseTransfer::received(PIByteArray data) {
session.resize(sr.packets); session.resize(sr.packets);
replies.resize(sr.packets + 1); replies.resize(sr.packets + 1);
replies.fill(pt_Unknown); replies.fill(pt_Unknown);
is_receiving = true;
break_ = false;
diag.reset(); diag.reset();
diag.start(100); diag.start(100);
// piCoutObj << "receiveStarted()";
is_receiving = true;
break_ = false;
receiveStarted(); receiveStarted();
replies_cnt = send_queue = 0; replies_cnt = send_queue = 0;
state_string = "receiving"; state_string = "receiving";
@@ -140,10 +149,10 @@ void PIBaseTransfer::received(PIByteArray data) {
bool PIBaseTransfer::send_process() { bool PIBaseTransfer::send_process() {
packet_header_size = sizeof(PacketHeader) + customHeader().size(); packet_header_size = sizeof(PacketHeader) + customHeader().size();
break_ = false; break_ = false;
is_sending = true;
diag.reset(); diag.reset();
diag.start(100); diag.start(100);
sendStarted(); sendStarted();
is_sending = true;
replies.resize(session.size() + 1); replies.resize(session.size() + 1);
replies.fill(pt_Unknown); replies.fill(pt_Unknown);
PIByteArray ba; PIByteArray ba;
@@ -203,6 +212,7 @@ bool PIBaseTransfer::send_process() {
int PIBaseTransfer::checkSession() { int PIBaseTransfer::checkSession() {
if (!(is_receiving || is_sending)) return -1;
int miss = 0; int miss = 0;
for (int i = 1; i < replies.size_s(); i++) { for (int i = 1; i < replies.size_s(); i++) {
if (replies[i] != pt_ReplySuccess) miss++; if (replies[i] != pt_ReplySuccess) miss++;
@@ -379,35 +389,38 @@ PIByteArray PIBaseTransfer::build_packet(int id) {
bool PIBaseTransfer::finish_send(bool ok) { bool PIBaseTransfer::finish_send(bool ok) {
is_sending = false;
if (ok) state_string = "send done"; if (ok) state_string = "send done";
else state_string = "send failed"; else state_string = "send failed";
// piCoutObj << state_string << PIString::readableSize(bytes_all); // piCoutObj << state_string << PIString::readableSize(bytes_all);
is_sending = false;
header.id = 0; header.id = 0;
if (!ok) sendBreak(header.session_id); if (!ok) sendBreak(header.session_id);
else sendReply(pt_ReplySuccess); else sendReply(pt_ReplySuccess);
diag.stop();
sendFinished(ok); sendFinished(ok);
diag.stop();
bytes_all = bytes_cur = 0; bytes_all = bytes_cur = 0;
return ok; return ok;
} }
void PIBaseTransfer::finish_receive(bool ok, bool quet) { void PIBaseTransfer::finish_receive(bool ok, bool quet) {
is_receiving = false;
if (ok) state_string = "receive done"; if (ok) state_string = "receive done";
else state_string = "receive failed"; else state_string = "receive failed";
// piCoutObj << state_string << PIString::readableSize(bytes_all); // piCoutObj << state_string;// << PIString::readableSize(bytes_all);
is_receiving = false;
if (!ok && !quet) sendBreak(header.session_id); if (!ok && !quet) sendBreak(header.session_id);
diag.stop();
receiveFinished(ok); receiveFinished(ok);
diag.stop();
bytes_all = bytes_cur = 0; bytes_all = bytes_cur = 0;
} }
void PIBaseTransfer::diagChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) { void PIBaseTransfer::diagChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) {
if (is_receiving) { if (is_receiving) {
if (new_quality == PIDiagnostics::Failure) stopReceive(); if (new_quality == PIDiagnostics::Failure) {
piCout << "disconnected!";
stopReceive();
}
} }
} }

View File

@@ -64,14 +64,14 @@ bool PIFileTransfer::send(PIVector<PIFile::FileInfo> entries) {
bool PIFileTransfer::sendFiles(const PIVector<PFTFileInfo> &files) { bool PIFileTransfer::sendFiles(const PIVector<PFTFileInfo> &files) {
files_ = files; files_ = files;
PIStringList names; PIStringList names;
// piCoutObj << "prepare to send" << files_.size() << "files"; // piCoutObj << "prepare to send" << files_.size() << "files";
for(int i=0; i<files_.size_s(); i++) { for(int i=0; i<files_.size_s(); i++) {
// files_[i].path = dir.relative(files_[i].path); // files_[i].path = dir.relative(files_[i].path);
if (names.contains(files_[i].path)) {files_.remove(i); i--;} if (names.contains(files_[i].path)) {files_.remove(i); i--;}
else names << files_[i].path; else names << files_[i].path;
if (files_[i].isDir()) files_[i].size = 0; if (files_[i].isDir()) files_[i].size = 0;
// piCout << "prepare" << i << files_[i].path << files_[i].dest_path << files_[i].name(); // piCout << "prepare" << i << files_[i].path << files_[i].dest_path << files_[i].name();
} }
srand(PISystemTime::current().toMilliseconds()); srand(PISystemTime::current().toMilliseconds());
pftheader.session_id = rand(); pftheader.session_id = rand();
sendFilesStarted(); sendFilesStarted();
@@ -82,25 +82,25 @@ bool PIFileTransfer::sendFiles(const PIVector<PFTFileInfo> &files) {
if (!send_process()) return false; if (!send_process()) return false;
pftheader.step = pft_Data; pftheader.step = pft_Data;
PIVector<Part> pts; PIVector<Part> pts;
for (int i=0; i<files_.size_s(); i++) for (int i=0; i<files_.size_s(); i++) {
pts << Part(i+1, files_[i].size); pts << Part(i+1, files_[i].size);
}
buildSession(pts); buildSession(pts);
bool ok = send_process(); bool ok = send_process();
return ok; return ok;
} }
void PIFileTransfer::processFile(int id, ullong start, PIByteArray & data) { void PIFileTransfer::processFile(int id, ullong start, PIByteArray & data) {
piCout << "processFile" << id << files_.size(); // piCout << "processFile" << id << files_.size();
PFTFileInfo fi = files_[id-1]; PFTFileInfo fi = files_[id-1];
bytes_file_all = fi.size; bytes_file_all = fi.size;
bytes_file_cur = start; bytes_file_cur = start;
cur_file_string = fi.dest_path; cur_file_string = fi.dest_path;
PIString path = dir.absolutePath() + dir.separator + fi.dest_path; PIString path = dir.absolutePath() + dir.separator + fi.dest_path;
piCout << "receive" << path << fi.size << data.size(); // piCout << "receive" << path << fi.size << data.size();
if (fi.isDir()) { if (fi.isDir()) {
// piCoutObj << "make dir" << fi.entry.path; // piCoutObj << "make dir" << fi.entry.path;
if (!PIDir::make(path)) { if (!PIDir::make(path)) {
cur_file_string = "ERROR! while create directory " + path; cur_file_string = "ERROR! while create directory " + path;
piCoutObj << cur_file_string; piCoutObj << cur_file_string;
@@ -113,22 +113,22 @@ void PIFileTransfer::processFile(int id, ullong start, PIByteArray & data) {
work_file.close(); work_file.close();
if (!work_file.open(path, PIIODevice::ReadWrite)) { if (!work_file.open(path, PIIODevice::ReadWrite)) {
cur_file_string = "ERROR! while open file " + path; cur_file_string = "ERROR! while open file " + path;
piCoutObj << cur_file_string << "," << errorString(); piCoutObj << cur_file_string;
stopReceive(); stopReceive();
return; return;
} }
if (work_file.size() > fi.size) { if (work_file.size() > fi.size) {
// piCoutObj << "error size" << work_file.size() << fi.size; // piCoutObj << "error size" << work_file.size() << fi.size;
work_file.clear(); work_file.clear();
work_file.resize(fi.size); work_file.resize(fi.size);
// piCoutObj << "correct size" << work_file.size() << fi.size; // piCoutObj << "correct size" << work_file.size() << fi.size;
} }
} }
// piCoutObj << "write file" << path << work_file.path() << work_file.size() << fi.entry.size << work_file.pos() << fi.fstart << fi.fsize; // piCoutObj << "write file" << path << work_file.path() << work_file.size() << fi.entry.size << work_file.pos() << fi.fstart << fi.fsize;
if (work_file.size() < (llong)start) { if (work_file.size() < (llong)start) {
// piCoutObj << "error pos size" << work_file.pos() << fi.fstart; // piCoutObj << "error pos size" << work_file.pos() << fi.fstart;
work_file.resize(start); work_file.resize(start);
// piCoutObj << "correct pos size" << work_file.pos() << fi.fstart; // piCoutObj << "correct pos size" << work_file.pos() << fi.fstart;
} }
if (work_file.size() > fi.size) { if (work_file.size() > fi.size) {
piCoutObj << "****** error size" << work_file.size() << fi.size; piCoutObj << "****** error size" << work_file.size() << fi.size;
@@ -136,7 +136,7 @@ void PIFileTransfer::processFile(int id, ullong start, PIByteArray & data) {
work_file.resize(fi.size); work_file.resize(fi.size);
piCoutObj << "****** correct size" << work_file.size() << fi.size; piCoutObj << "****** correct size" << work_file.size() << fi.size;
} }
// if (fi.fstart != work_file.pos()) piCoutObj << "error pos" << work_file.pos() << fi.fstart; // if (fi.fstart != work_file.pos()) piCoutObj << "error pos" << work_file.pos() << fi.fstart;
work_file.seek(start); work_file.seek(start);
int rs = work_file.write(data.data(), data.size()); int rs = work_file.write(data.data(), data.size());
if (rs != data.size_s()) { if (rs != data.size_s()) {
@@ -150,51 +150,50 @@ void PIFileTransfer::processFile(int id, ullong start, PIByteArray & data) {
PIByteArray PIFileTransfer::buildPacket(Part p) { PIByteArray PIFileTransfer::buildPacket(Part p) {
// piCoutObj << "Step" << pftheader.step; // piCoutObj << "Step" << pftheader.step;
// piCoutObj << "session id" << pftheader.session_id; // piCoutObj << "session id" << pftheader.session_id;
PIByteArray ba; PIByteArray ba;
switch(pftheader.step) { switch(pftheader.step) {
case pft_None: case pft_None:
stopSend(); stopSend();
return PIByteArray(); return PIByteArray();
case pft_Description: case pft_Description:
ba.resize(p.size); ba.resize(p.size);
memcpy(ba.data(), desc.data(p.start), p.size); memcpy(ba.data(), desc.data(p.start), p.size);
return ba; return ba;
case pft_Data: case pft_Data:
// piCout << "send data" << p.id << files_.size(); // piCout << "send data" << p.id << files_.size();
PIFile::FileInfo fi = files_[p.id-1]; PIFile::FileInfo fi = files_[p.id-1];
if (fi.isFile()) { if (fi.isFile()) {
// piCout << "send file" << fi.name() << fi.size; // piCout << "send file" << fi.name() << fi.size;
PIString path = fi.path; PIString path = fi.path;
if (work_file.path() != path || !work_file.isOpened()) { if (work_file.path() != path || !work_file.isOpened()) {
if (!work_file.open(path, PIIODevice::ReadOnly)) { if (!work_file.open(path, PIIODevice::ReadOnly)) {
break_ = true;
cur_file_string = "ERROR! while open file " + fi.path;
piCoutObj << cur_file_string;
stopSend();
return PIByteArray();
}
}
work_file.seek(p.start);
ba.resize(p.size);
int rs = work_file.read(ba.data(), ba.size());
if ((llong)rs != (llong)p.size) {
break_ = true; break_ = true;
cur_file_string = "ERROR! while open file " + fi.path; cur_file_string = "ERROR! while read file " + fi.path + " (must " + PIString::fromNumber(p.size) + ", but read " + PIString::fromNumber(rs) + ")";
piCoutObj << cur_file_string << "," << errorString(); piCoutObj << cur_file_string;
stopSend(); stopSend();
return PIByteArray(); return PIByteArray();
} }
} }
work_file.seek(p.start); // if (fi.isDir()) {
ba.resize(p.size); // piCout << "create dir" << fi.path;
int rs = work_file.read(ba.data(), ba.size()); // dir.make(fi.path);
piCout << rs << p.size; // }
if ((llong)rs != (llong)p.size) { cur_file_string = fi.path;
break_ = true; bytes_file_all = fi.size;
cur_file_string = "ERROR! while read file " + fi.path + " (must " + PIString::fromNumber(p.size) + ", but read " + PIString::fromNumber(rs) + ")"; bytes_file_cur = p.start;
piCoutObj << cur_file_string;
stopSend();
return PIByteArray();
}
}
// if (fi.isDir()) {
// piCout << "create dir" << fi.path;
// dir.make(fi.path);
// }
cur_file_string = fi.path;
bytes_file_all = fi.size;
bytes_file_cur = p.start;
return ba; return ba;
} }
return PIByteArray(); return PIByteArray();
@@ -203,9 +202,9 @@ PIByteArray PIFileTransfer::buildPacket(Part p) {
void PIFileTransfer::receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByteArray pheader) { void PIFileTransfer::receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByteArray pheader) {
PFTHeader h; PFTHeader h;
// piCout << pheader.size() << sizeof(PFTHeader); // piCout << pheader.size() << sizeof(PFTHeader);
pheader >> h; pheader >> h;
// piCout << h.session_id; // piCout << h.session_id;
StepType st = (StepType)h.step; StepType st = (StepType)h.step;
pftheader.step = st; pftheader.step = st;
if (!h.check_sig()) { if (!h.check_sig()) {
@@ -215,20 +214,20 @@ void PIFileTransfer::receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByte
return; return;
} }
switch(st) { switch(st) {
case pft_None: case pft_None:
break; break;
case pft_Description: case pft_Description:
pftheader.session_id = h.session_id; pftheader.session_id = h.session_id;
if (desc.size() < fi.start + fi.size) desc.resize(fi.start + fi.size); if (desc.size() < fi.start + fi.size) desc.resize(fi.start + fi.size);
memcpy(desc.data(fi.start), ba.data(), ba.size_s()); memcpy(desc.data(fi.start), ba.data(), ba.size_s());
break; break;
case pft_Data: case pft_Data:
if (h.session_id == pftheader.session_id) if (h.session_id == pftheader.session_id)
processFile(fi.id, fi.start, ba); processFile(fi.id, fi.start, ba);
else stopReceive(); else stopReceive();
break; break;
default: default:
break; break;
} }
} }
@@ -236,13 +235,14 @@ void PIFileTransfer::receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByte
PIByteArray PIFileTransfer::customHeader() { PIByteArray PIFileTransfer::customHeader() {
PIByteArray ba; PIByteArray ba;
ba << pftheader; ba << pftheader;
return ba; return ba;
} }
void PIFileTransfer::receive_started() { void PIFileTransfer::receive_started() {
if (pftheader.step == pft_None) { if (pftheader.step == pft_None) {
files_.clear(); files_.clear();
// piCoutObj << "start receive";
receiveFilesStarted(); receiveFilesStarted();
} }
} }
@@ -252,10 +252,11 @@ void PIFileTransfer::receive_finished(bool ok) {
if (pftheader.step == pft_Description) { if (pftheader.step == pft_Description) {
bool user_ok = true; bool user_ok = true;
if (ok) { if (ok) {
// piCoutObj << desc.size() << PICoutManipulators::Hex << desc;
desc >> files_; desc >> files_;
// piCoutObj << files_;
PIStringList files; PIStringList files;
piForeachC(PFTFileInfo &fi, files_) files << fi.dest_path; piForeachC(PFTFileInfo &fi, files_) files << fi.dest_path;
//piCout << files;
receiveFilesRequest(files, bytesAll(), &user_ok); receiveFilesRequest(files, bytesAll(), &user_ok);
} }
if (!ok || !user_ok) { if (!ok || !user_ok) {

View File

@@ -75,7 +75,7 @@ private:
virtual void receivePart(Part fi, PIByteArray ba, PIByteArray pheader); virtual void receivePart(Part fi, PIByteArray ba, PIByteArray pheader);
virtual PIByteArray buildPacket(Part fi); virtual PIByteArray buildPacket(Part fi);
virtual PIByteArray customHeader(); virtual PIByteArray customHeader();
//EVENT_HANDLER(void, send_started); // EVENT_HANDLER(void, send_started);
EVENT_HANDLER(void, receive_started); EVENT_HANDLER(void, receive_started);
EVENT_HANDLER1(void, send_finished, bool, ok); EVENT_HANDLER1(void, send_finished, bool, ok);
EVENT_HANDLER1(void, receive_finished, bool, ok); EVENT_HANDLER1(void, receive_finished, bool, ok);
@@ -89,4 +89,13 @@ inline PIByteArray & operator <<(PIByteArray & s, const PIFileTransfer::PFTFileI
inline PIByteArray & operator >>(PIByteArray & s, PIFileTransfer::PFTFileInfo & v) {s >> v.dest_path >> v.size >> v.time_access >> v.time_modification >> inline PIByteArray & operator >>(PIByteArray & s, PIFileTransfer::PFTFileInfo & v) {s >> v.dest_path >> v.size >> v.time_access >> v.time_modification >>
*(int*)(&(v.flags)) >> v.id_user >> v.id_group >> v.perm_user.raw >> v.perm_group.raw >> v.perm_other.raw; return s;} *(int*)(&(v.flags)) >> v.id_user >> v.id_group >> v.perm_user.raw >> v.perm_group.raw >> v.perm_other.raw; return s;}
inline PICout operator <<(PICout s, const PIFileTransfer::PFTFileInfo & v) {
s.setControl(0, true);
s << "FileInfo(\"" << v.dest_path << "\", " << PIString::readableSize(v.size) << ", "
<< v.perm_user.toString() << " " << v.perm_group.toString() << " " << v.perm_other.toString() << ", "
<< v.time_access.toString() << ", " << v.time_modification.toString()
<< ", 0x" << PICoutManipulators::Hex << v.flags << ")";
s.restoreControl();
return s;
}
#endif // PIFILETRANSFER_H #endif // PIFILETRANSFER_H

View File

@@ -50,7 +50,12 @@ private:
EVENT_HANDLER(void, ftevent) { EVENT_HANDLER(void, ftevent) {
if (quet_) return; if (quet_) return;
PICout(AddSpaces) << ClearLine << ft.stateString() #ifdef WINDOWS
piCout
#else
PICout(AddSpaces) << ClearLine
#endif
<< ft.stateString() << ft.curFile()
<< PIString::readableSize(ft.diagnostic().receiveBytesPerSec()) + "/s" << PIString::readableSize(ft.diagnostic().receiveBytesPerSec()) + "/s"
<< PIString::readableSize(ft.diagnostic().sendBytesPerSec()) + "/s" << PIString::readableSize(ft.diagnostic().sendBytesPerSec()) + "/s"
<< "(" << PIString::readableSize(ft.bytesFileCur()) << "/" << PIString::readableSize(ft.bytesFileAll()) << ", " << "(" << PIString::readableSize(ft.bytesFileCur()) << "/" << PIString::readableSize(ft.bytesFileAll()) << ", "
@@ -59,7 +64,10 @@ private:
PIString::fromNumber(PISystemTime::fromSeconds((ft.bytesAll() - ft.bytesCur()) / PIString::fromNumber(PISystemTime::fromSeconds((ft.bytesAll() - ft.bytesCur()) /
ft.diagnostic().receiveBytesPerSec()).toSeconds()) ft.diagnostic().receiveBytesPerSec()).toSeconds())
: PIString("unknown")) : PIString("unknown"))
<< Flush; #ifndef WINDOWS
<< Flush
#endif
;
} }
EVENT_HANDLER1(void, ftsend, PIByteArray &, data) { EVENT_HANDLER1(void, ftsend, PIByteArray &, data) {