git-svn-id: svn://db.shs.com.ru/pip@540 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5

This commit is contained in:
2017-09-02 09:19:15 +00:00
parent c604a64867
commit 584ae31627
4 changed files with 84 additions and 18 deletions

View File

@@ -300,7 +300,7 @@ void PIKbdListener::readKeyboard() {
tm_dbl.reset();
}
else if (mb < me.buttons) me.action = MouseButtonRelease;
else {piCoutObj << "WTF"; break;}
else {if (mb != 0) piCoutObj << "WTF"; break;}
}
me.buttons = mb;
if (piCompareBinary(&prev_me, &me, sizeof(me)))

View File

@@ -13,12 +13,13 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) {
break_ = true;
bytes_all = bytes_cur = 0;
send_queue = 0;
send_up = 0;
timeout_ = 10.;
diag.setDisconnectTimeout(timeout_ / 10);
//CONNECTU(&diag, qualityChanged, this, diagChanged);
diag.setName("PIBaseTransfer");
diag.start(50);
packets_count = 100;
packets_count = 10;
setPacketSize(4096);
randomize();
}
@@ -112,9 +113,25 @@ void PIBaseTransfer::received(PIByteArray data) {
}
if (is_sending) {
mutex_session.lock();
if (h.id < replies.size())
if (h.id < replies.size()) {
replies[h.id] = pt;
else
pm_string[h.id] = pt == pt_ReplySuccess ? '#' : '-';
int s = pm_string.find('+'), s1 = send_queue;
while (s <= (int)h.id && s > 0) {
send_queue--;
if (s >= 0) pm_string[s] = '-';
send_up = 0;
if (send_queue < 0) {
send_queue = 0;
break;
}
s = pm_string.find('+');
}
if (s1-send_queue > 1 && packets_count > 2) packets_count-= piMaxi(packets_count/10,1);
if (s1 == send_queue && s1 < piMaxi(packets_count/2, 2) && packets_count < 100) send_up++;
if (send_up > 20 && send_up > packets_count*2) packets_count+= piMaxi(packets_count/10,1);
//piCoutObj << packets_count;
} else
piCoutObj << "invalid reply id";
mutex_session.unlock();
// piCoutObj << "Done Packet" << h.id;
@@ -182,11 +199,13 @@ void PIBaseTransfer::received(PIByteArray data) {
bytes_all = sr.size;
header.session_id = h.session_id;
header.id = 0;
packets_count = 10;
session.clear();
replies.clear();
session.resize(sr.packets);
replies.resize(sr.packets + 1);
replies.fill(pt_Unknown);
pm_string.resize(replies.size(), '-');
diag.reset();
// diag.start(100);
//piCoutObj << "receiveStarted()";
@@ -198,6 +217,7 @@ void PIBaseTransfer::received(PIByteArray data) {
receiveStarted();
state_string = "receiving";
replies[0] = pt_ReplySuccess;
pm_string[0] = '#';
mutex_session.unlock();
sendReply(pt_ReplySuccess);
mutex_header.unlock();
@@ -240,6 +260,7 @@ bool PIBaseTransfer::send_process() {
int session_size = session.size();
replies.resize(session_size + 1);
replies.fill(pt_Unknown);
pm_string.resize(replies.size(), '-');
mutex_session.unlock();
PIByteArray ba;
if (!getStartRequest()) return finish_send(false);
@@ -247,6 +268,7 @@ bool PIBaseTransfer::send_process() {
PITimeMeasurer stm;
mutex_send.lock();
send_queue = 0;
send_up = 0;
send_tm.reset();
mutex_send.unlock();
//int ltm = 0;
@@ -272,8 +294,10 @@ bool PIBaseTransfer::send_process() {
mutex_send.unlock();
return finish_send(false);
}
if (stm.elapsed_s() > timeout_ / 10.) {
if (stm.elapsed_s() > timeout_ / 2.) {
send_up = 0;
send_queue = 0;
pm_string.replaceAll("+", "-");
}
mutex_send.unlock();
continue;
@@ -282,26 +306,28 @@ bool PIBaseTransfer::send_process() {
ba = build_packet(i);
diag.sended(ba.size_s());
sendRequest(ba);
pm_string[i+1] = '+';
mutex_send.lock();
send_queue++;
mutex_send.unlock();
if (break_) return finish_send(false);
}
// piCoutObj << "send done, checking";
PITimeMeasurer tm;
PITimeMeasurer rtm;
int prev_chk = 0;
mutex_send.lock();
send_queue = 0;
mutex_send.unlock();
piMSleep(10);
while (tm.elapsed_s() < timeout_) {
state_string = "sending+";
while (rtm.elapsed_s() < timeout_) {
//piCoutObj << "recovering...";
mutex_session.lock();
int chk = checkSession();
mutex_session.unlock();
if (chk == 0) return finish_send(true);
if (chk != prev_chk) tm.reset();
else if (tm.elapsed_m() < 100) {
if (chk != prev_chk) rtm.reset();
else if (rtm.elapsed_m() < 100) {
piMSleep(1);
continue;
}
@@ -322,11 +348,15 @@ bool PIBaseTransfer::send_process() {
mutex_send.unlock();
if (sq >= packets_count) {
piMSleep(10);
send_queue = 0;
send_up = 0;
pm_string.replaceAll("+", "-");
continue;
}
ba = build_packet(chk - 1);
diag.sended(ba.size_s());
sendRequest(ba);
pm_string[chk] = '+';
mutex_send.lock();
send_queue++;
mutex_send.unlock();
@@ -457,7 +487,8 @@ bool PIBaseTransfer::getStartRequest() {
mutex_session.lock();
if (replies[0] == pt_ReplySuccess) {
state_string = "send permited!";
//piCoutObj << "ping " << tm.elapsed_m();
//packets_count = piClampi(10 / tm.elapsed_m(), 2, 100);
//piCoutObj << "ping " << tm.elapsed_m() << packets_count;
mutex_session.unlock();
return true;
}
@@ -474,6 +505,7 @@ void PIBaseTransfer::processData(int id, PIByteArray & data) {
if (!session[id - 1].isEmpty()) {
header.id = id;
replies[id] = pt_ReplySuccess;
pm_string[id] = '#';
sendReply(pt_ReplySuccess);
if (replies[replies.size()-1] == pt_ReplySuccess)
if (checkSession() == 0) state_string = "receive ok";
@@ -500,6 +532,7 @@ void PIBaseTransfer::processData(int id, PIByteArray & data) {
}
header.id = id;
replies[id] = pt_ReplySuccess;
pm_string[id] = '#';
sendReply(pt_ReplySuccess);
if (checkSession() == 0) state_string = "receive ok";
}

View File

@@ -69,6 +69,7 @@ public:
bool isCRCEnabled() const {return crc_enabled;}
PIString stateString() const {return state_string;}
PIString packetMap() const {return pm_string;}
llong bytesAll() const {return bytes_all;}
llong bytesCur() const {return bytes_cur;}
const PIDiagnostics &diagnostic() {return diag;}
@@ -131,11 +132,13 @@ private:
PacketHeader header;
CRC_16 crc;
int send_queue;
int send_up;
PIDiagnostics diag;
PIMutex mutex_session;
PIMutex mutex_send;
PIMutex mutex_header;
bool crc_enabled;
PIString pm_string;
};
inline PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::PacketHeader & v) {s << v.sig << v.type << v.session_id << v.id << v.crc; return s;}

View File

@@ -1,6 +1,11 @@
#include "pip.h"
#include "pifiletransfer.h"
#include "pidatatransfer.h"
#include "piscreen.h"
#include "piscreentiles.h"
TileSimple pmt;
using namespace PICoutManipulators;
@@ -20,7 +25,7 @@ public:
//testt.setTimeout(0.1);
CONNECTU(&testt, sendRequest, this, ftsend);
} else {
ft.setPacketSize(12800);
ft.setPacketSize(65000);
ft.setName("PIFT");
CONNECTU(&ft, sendRequest, this, ftsend);
CONNECTU(&ft, sendFilesStarted, this, ftevent);
@@ -41,7 +46,7 @@ public:
}
void startTest() {
PIByteArray ba(1024*1024*256);
PIByteArray ba(1024*1024*8);
testt.send(ba);
}
@@ -56,13 +61,17 @@ private:
void tick(void *, int) {
if (ft.isStarted()) {
ftevent();
updatePMT();
if (PIKbdListener::exiting) {
ft.stopSend();
ft.stopReceive();
}
// piCout << (int)ft.diagnostic().quality();
}
if (testt.isSending() || testt.isReceiving()) ftevent();
if (testt.isSending() || testt.isReceiving()) {
ftevent();
updatePMT();
}
}
EVENT_HANDLER(void, ftevent) {
@@ -119,6 +128,18 @@ private:
}
ft.received(ba);
}
void updatePMT() {
PIString pm;
if (test_) pm = testt.packetMap();
else pm = ft.packetMap();
int wd = 110;
pmt.content.resize(pm.size() / wd + 1);
for (int i=0; i<pmt.content.size_s(); i++) {
pmt.content[i].first = pm.mid(wd*i, piMini(pm.size() - i*wd, wd));
}
}
};
@@ -166,9 +187,14 @@ int main (int argc, char * argv[]) {
src = cli.argumentValue("receive");
dst = cli.argumentValue("send");
}
PIScreen screen(false);
screen.enableExitCapture();
screen.rootTile()->addTile(new TilePICout());
screen.rootTile()->addTile(&pmt);
UDPFileTransfer tuf(src, dst, true);
screen.start();
if (send_) tuf.startTest();
WAIT_FOREVER;
screen.waitForFinish();
return 0;
}
if ((!cli.hasArgument("send") || !cli.hasArgument("receive")) || cli.hasArgument("help")) {
@@ -184,8 +210,13 @@ int main (int argc, char * argv[]) {
src += ":"+PIString::fromNumber(port);
dst += ":"+PIString::fromNumber(port);
}
PIKbdListener kbd;
kbd.enableExitCapture();
// PIKbdListener kbd;
// kbd.enableExitCapture();
PIScreen screen(false);
screen.enableExitCapture();
screen.rootTile()->addTile(new TilePICout());
screen.rootTile()->addTile(&pmt);
screen.start();
UDPFileTransfer f(src, dst);
if (cli.hasArgument("dir")) f.ft.setDirectory(cli.argumentValue("dir"));
if (cli.hasArgument("quet")) f.setQuet(true);
@@ -199,9 +230,8 @@ int main (int argc, char * argv[]) {
} else {
piCout << "wait for receiving";
}
kbd.start();
WAIT_FOR_EXIT
PICout(0) << "\n";
screen.waitForFinish();
return 0;
}