From 0f4e2b5f4c4d2a9ae9a108da2d14f20a5106373d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=91=D1=8B=D1=87=D0=BA=D0=BE=D0=B2=20=D0=90=D0=BD=D0=B4?= =?UTF-8?q?=D1=80=D0=B5=D0=B9?= Date: Tue, 7 Apr 2015 13:24:37 +0000 Subject: [PATCH] fix PIFileTransfer git-svn-id: svn://db.shs.com.ru/pip@73 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5 --- CMakeLists.txt | 2 +- src/io/pibasetransfer.cpp | 41 ++++++--- src/io/pifiletransfer.cpp | 141 ++++++++++++++++--------------- src/io/pifiletransfer.h | 11 ++- utils/udp_file_transfer/main.cpp | 12 ++- 5 files changed, 119 insertions(+), 88 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6b28cf09..f27f5880 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ project(pip) cmake_minimum_required(VERSION 2.6) -#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3") +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3") include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR}) include(CheckFunctionExists) diff --git a/src/io/pibasetransfer.cpp b/src/io/pibasetransfer.cpp index 72e27f9a..753dcfd9 100644 --- a/src/io/pibasetransfer.cpp +++ b/src/io/pibasetransfer.cpp @@ -36,6 +36,7 @@ void PIBaseTransfer::stopSend() { void PIBaseTransfer::stopReceive() { if (!is_receiving) return; break_ = true; +// piCoutObj << "stopReceive()"; finish_receive(false); } @@ -46,10 +47,13 @@ void PIBaseTransfer::received(PIByteArray data) { diag.received(data.size(), false); return; } - diag.received(data.size(), true); PacketHeader h; data >> h; PacketType pt = (PacketType)h.type; + if (!h.check_sig()) { + diag.received(data.size(), false); + return; + } else diag.received(data.size(), true); // piCoutObj << "receive" << h.session_id << h.type << h.id; switch (pt) { case pt_Unknown: break; @@ -84,6 +88,7 @@ void PIBaseTransfer::received(PIByteArray data) { } if (is_receiving && h.id == 0) { if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true); +// if (checkSession() == 0 && pt == pt_ReplySuccess) { piCoutObj << "Success receive"; finish_receive(true);} } break; case pt_Break: @@ -103,10 +108,13 @@ void PIBaseTransfer::received(PIByteArray data) { sendBreak(h.session_id); return; } - if (header.session_id != h.session_id && is_receiving) { -// sendBreak(h.session_id); -// return; - finish_receive(false, true); + if (is_receiving) { + if (header.session_id != h.session_id) { + // sendBreak(h.session_id); + // return; +// piCoutObj << "restart receive"; + finish_receive(false, true); + } else return; } if (data.size() == sizeof(StartRequest)) { StartRequest sr; @@ -121,10 +129,11 @@ void PIBaseTransfer::received(PIByteArray data) { session.resize(sr.packets); replies.resize(sr.packets + 1); replies.fill(pt_Unknown); - is_receiving = true; - break_ = false; diag.reset(); diag.start(100); +// piCoutObj << "receiveStarted()"; + is_receiving = true; + break_ = false; receiveStarted(); replies_cnt = send_queue = 0; state_string = "receiving"; @@ -140,10 +149,10 @@ void PIBaseTransfer::received(PIByteArray data) { bool PIBaseTransfer::send_process() { packet_header_size = sizeof(PacketHeader) + customHeader().size(); break_ = false; - is_sending = true; diag.reset(); diag.start(100); sendStarted(); + is_sending = true; replies.resize(session.size() + 1); replies.fill(pt_Unknown); PIByteArray ba; @@ -203,6 +212,7 @@ bool PIBaseTransfer::send_process() { int PIBaseTransfer::checkSession() { + if (!(is_receiving || is_sending)) return -1; int miss = 0; for (int i = 1; i < replies.size_s(); i++) { if (replies[i] != pt_ReplySuccess) miss++; @@ -379,35 +389,38 @@ PIByteArray PIBaseTransfer::build_packet(int id) { bool PIBaseTransfer::finish_send(bool ok) { + is_sending = false; if (ok) state_string = "send done"; else state_string = "send failed"; // piCoutObj << state_string << PIString::readableSize(bytes_all); - is_sending = false; header.id = 0; if (!ok) sendBreak(header.session_id); else sendReply(pt_ReplySuccess); - diag.stop(); sendFinished(ok); + diag.stop(); bytes_all = bytes_cur = 0; return ok; } void PIBaseTransfer::finish_receive(bool ok, bool quet) { + is_receiving = false; if (ok) state_string = "receive done"; else state_string = "receive failed"; -// piCoutObj << state_string << PIString::readableSize(bytes_all); - is_receiving = false; +// piCoutObj << state_string;// << PIString::readableSize(bytes_all); if (!ok && !quet) sendBreak(header.session_id); - diag.stop(); receiveFinished(ok); + diag.stop(); bytes_all = bytes_cur = 0; } void PIBaseTransfer::diagChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) { if (is_receiving) { - if (new_quality == PIDiagnostics::Failure) stopReceive(); + if (new_quality == PIDiagnostics::Failure) { + piCout << "disconnected!"; + stopReceive(); + } } } diff --git a/src/io/pifiletransfer.cpp b/src/io/pifiletransfer.cpp index b62370c5..aa5ba715 100644 --- a/src/io/pifiletransfer.cpp +++ b/src/io/pifiletransfer.cpp @@ -64,14 +64,14 @@ bool PIFileTransfer::send(PIVector entries) { bool PIFileTransfer::sendFiles(const PIVector &files) { files_ = files; PIStringList names; -// piCoutObj << "prepare to send" << files_.size() << "files"; + // piCoutObj << "prepare to send" << files_.size() << "files"; for(int i=0; i &files) { if (!send_process()) return false; pftheader.step = pft_Data; PIVector pts; - for (int i=0; i fi.size) { -// piCoutObj << "error size" << work_file.size() << fi.size; + // piCoutObj << "error size" << work_file.size() << fi.size; work_file.clear(); work_file.resize(fi.size); -// piCoutObj << "correct size" << work_file.size() << fi.size; + // piCoutObj << "correct size" << work_file.size() << fi.size; } } -// piCoutObj << "write file" << path << work_file.path() << work_file.size() << fi.entry.size << work_file.pos() << fi.fstart << fi.fsize; + // piCoutObj << "write file" << path << work_file.path() << work_file.size() << fi.entry.size << work_file.pos() << fi.fstart << fi.fsize; if (work_file.size() < (llong)start) { -// piCoutObj << "error pos size" << work_file.pos() << fi.fstart; + // piCoutObj << "error pos size" << work_file.pos() << fi.fstart; work_file.resize(start); -// piCoutObj << "correct pos size" << work_file.pos() << fi.fstart; + // piCoutObj << "correct pos size" << work_file.pos() << fi.fstart; } if (work_file.size() > fi.size) { piCoutObj << "****** error size" << work_file.size() << fi.size; @@ -136,7 +136,7 @@ void PIFileTransfer::processFile(int id, ullong start, PIByteArray & data) { work_file.resize(fi.size); piCoutObj << "****** correct size" << work_file.size() << fi.size; } -// if (fi.fstart != work_file.pos()) piCoutObj << "error pos" << work_file.pos() << fi.fstart; + // if (fi.fstart != work_file.pos()) piCoutObj << "error pos" << work_file.pos() << fi.fstart; work_file.seek(start); int rs = work_file.write(data.data(), data.size()); if (rs != data.size_s()) { @@ -150,51 +150,50 @@ void PIFileTransfer::processFile(int id, ullong start, PIByteArray & data) { PIByteArray PIFileTransfer::buildPacket(Part p) { -// piCoutObj << "Step" << pftheader.step; -// piCoutObj << "session id" << pftheader.session_id; + // piCoutObj << "Step" << pftheader.step; + // piCoutObj << "session id" << pftheader.session_id; PIByteArray ba; switch(pftheader.step) { - case pft_None: - stopSend(); + case pft_None: + stopSend(); return PIByteArray(); - case pft_Description: - ba.resize(p.size); - memcpy(ba.data(), desc.data(p.start), p.size); + case pft_Description: + ba.resize(p.size); + memcpy(ba.data(), desc.data(p.start), p.size); return ba; - case pft_Data: -// piCout << "send data" << p.id << files_.size(); - PIFile::FileInfo fi = files_[p.id-1]; - if (fi.isFile()) { -// piCout << "send file" << fi.name() << fi.size; - PIString path = fi.path; - if (work_file.path() != path || !work_file.isOpened()) { - if (!work_file.open(path, PIIODevice::ReadOnly)) { + case pft_Data: + // piCout << "send data" << p.id << files_.size(); + PIFile::FileInfo fi = files_[p.id-1]; + if (fi.isFile()) { + // piCout << "send file" << fi.name() << fi.size; + PIString path = fi.path; + if (work_file.path() != path || !work_file.isOpened()) { + if (!work_file.open(path, PIIODevice::ReadOnly)) { + break_ = true; + cur_file_string = "ERROR! while open file " + fi.path; + piCoutObj << cur_file_string; + stopSend(); + return PIByteArray(); + } + } + work_file.seek(p.start); + ba.resize(p.size); + int rs = work_file.read(ba.data(), ba.size()); + if ((llong)rs != (llong)p.size) { break_ = true; - cur_file_string = "ERROR! while open file " + fi.path; - piCoutObj << cur_file_string << "," << errorString(); + cur_file_string = "ERROR! while read file " + fi.path + " (must " + PIString::fromNumber(p.size) + ", but read " + PIString::fromNumber(rs) + ")"; + piCoutObj << cur_file_string; stopSend(); return PIByteArray(); } } - work_file.seek(p.start); - ba.resize(p.size); - int rs = work_file.read(ba.data(), ba.size()); - piCout << rs << p.size; - if ((llong)rs != (llong)p.size) { - break_ = true; - cur_file_string = "ERROR! while read file " + fi.path + " (must " + PIString::fromNumber(p.size) + ", but read " + PIString::fromNumber(rs) + ")"; - piCoutObj << cur_file_string; - stopSend(); - return PIByteArray(); - } - } -// if (fi.isDir()) { -// piCout << "create dir" << fi.path; -// dir.make(fi.path); -// } - cur_file_string = fi.path; - bytes_file_all = fi.size; - bytes_file_cur = p.start; + // if (fi.isDir()) { + // piCout << "create dir" << fi.path; + // dir.make(fi.path); + // } + cur_file_string = fi.path; + bytes_file_all = fi.size; + bytes_file_cur = p.start; return ba; } return PIByteArray(); @@ -203,9 +202,9 @@ PIByteArray PIFileTransfer::buildPacket(Part p) { void PIFileTransfer::receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByteArray pheader) { PFTHeader h; -// piCout << pheader.size() << sizeof(PFTHeader); + // piCout << pheader.size() << sizeof(PFTHeader); pheader >> h; -// piCout << h.session_id; + // piCout << h.session_id; StepType st = (StepType)h.step; pftheader.step = st; if (!h.check_sig()) { @@ -215,20 +214,20 @@ void PIFileTransfer::receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByte return; } switch(st) { - case pft_None: - break; - case pft_Description: - pftheader.session_id = h.session_id; - if (desc.size() < fi.start + fi.size) desc.resize(fi.start + fi.size); - memcpy(desc.data(fi.start), ba.data(), ba.size_s()); - break; - case pft_Data: - if (h.session_id == pftheader.session_id) - processFile(fi.id, fi.start, ba); - else stopReceive(); - break; - default: - break; + case pft_None: + break; + case pft_Description: + pftheader.session_id = h.session_id; + if (desc.size() < fi.start + fi.size) desc.resize(fi.start + fi.size); + memcpy(desc.data(fi.start), ba.data(), ba.size_s()); + break; + case pft_Data: + if (h.session_id == pftheader.session_id) + processFile(fi.id, fi.start, ba); + else stopReceive(); + break; + default: + break; } } @@ -236,13 +235,14 @@ void PIFileTransfer::receivePart(PIBaseTransfer::Part fi, PIByteArray ba, PIByte PIByteArray PIFileTransfer::customHeader() { PIByteArray ba; ba << pftheader; - return ba; + return ba; } void PIFileTransfer::receive_started() { if (pftheader.step == pft_None) { files_.clear(); +// piCoutObj << "start receive"; receiveFilesStarted(); } } @@ -252,10 +252,11 @@ void PIFileTransfer::receive_finished(bool ok) { if (pftheader.step == pft_Description) { bool user_ok = true; if (ok) { +// piCoutObj << desc.size() << PICoutManipulators::Hex << desc; desc >> files_; +// piCoutObj << files_; PIStringList files; piForeachC(PFTFileInfo &fi, files_) files << fi.dest_path; - //piCout << files; receiveFilesRequest(files, bytesAll(), &user_ok); } if (!ok || !user_ok) { diff --git a/src/io/pifiletransfer.h b/src/io/pifiletransfer.h index b4c6a5d5..d2a21bf7 100644 --- a/src/io/pifiletransfer.h +++ b/src/io/pifiletransfer.h @@ -75,7 +75,7 @@ private: virtual void receivePart(Part fi, PIByteArray ba, PIByteArray pheader); virtual PIByteArray buildPacket(Part fi); virtual PIByteArray customHeader(); - //EVENT_HANDLER(void, send_started); + // EVENT_HANDLER(void, send_started); EVENT_HANDLER(void, receive_started); EVENT_HANDLER1(void, send_finished, bool, ok); EVENT_HANDLER1(void, receive_finished, bool, ok); @@ -89,4 +89,13 @@ inline PIByteArray & operator <<(PIByteArray & s, const PIFileTransfer::PFTFileI inline PIByteArray & operator >>(PIByteArray & s, PIFileTransfer::PFTFileInfo & v) {s >> v.dest_path >> v.size >> v.time_access >> v.time_modification >> *(int*)(&(v.flags)) >> v.id_user >> v.id_group >> v.perm_user.raw >> v.perm_group.raw >> v.perm_other.raw; return s;} +inline PICout operator <<(PICout s, const PIFileTransfer::PFTFileInfo & v) { + s.setControl(0, true); + s << "FileInfo(\"" << v.dest_path << "\", " << PIString::readableSize(v.size) << ", " + << v.perm_user.toString() << " " << v.perm_group.toString() << " " << v.perm_other.toString() << ", " + << v.time_access.toString() << ", " << v.time_modification.toString() + << ", 0x" << PICoutManipulators::Hex << v.flags << ")"; + s.restoreControl(); + return s; +} #endif // PIFILETRANSFER_H diff --git a/utils/udp_file_transfer/main.cpp b/utils/udp_file_transfer/main.cpp index 705c5b15..cfb1005b 100644 --- a/utils/udp_file_transfer/main.cpp +++ b/utils/udp_file_transfer/main.cpp @@ -50,7 +50,12 @@ private: EVENT_HANDLER(void, ftevent) { if (quet_) return; - PICout(AddSpaces) << ClearLine << ft.stateString() +#ifdef WINDOWS + piCout +#else + PICout(AddSpaces) << ClearLine +#endif + << ft.stateString() << ft.curFile() << PIString::readableSize(ft.diagnostic().receiveBytesPerSec()) + "/s" << PIString::readableSize(ft.diagnostic().sendBytesPerSec()) + "/s" << "(" << PIString::readableSize(ft.bytesFileCur()) << "/" << PIString::readableSize(ft.bytesFileAll()) << ", " @@ -59,7 +64,10 @@ private: PIString::fromNumber(PISystemTime::fromSeconds((ft.bytesAll() - ft.bytesCur()) / ft.diagnostic().receiveBytesPerSec()).toSeconds()) : PIString("unknown")) - << Flush; +#ifndef WINDOWS + << Flush +#endif + ; } EVENT_HANDLER1(void, ftsend, PIByteArray &, data) {