many changes, need testing

rewrite PIDiagnostics, but not fully testing
change PIQueue to PIDeque
many fixes in PIBaseTransfer, it will be threadsafe and stable


git-svn-id: svn://db.shs.com.ru/pip@187 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2016-04-12 04:24:57 +00:00
parent 741615b9d3
commit b93205f175
10 changed files with 462 additions and 202 deletions

View File

@@ -25,17 +25,17 @@
#ifndef PIQUEUE_H
#define PIQUEUE_H
#include "pivector.h"
#include "pideque.h"
template<typename T>
class PIP_EXPORT PIQueue: public PIVector<T> {
class PIP_EXPORT PIQueue: public PIDeque<T> {
public:
PIQueue() {;}
PIVector<T> & enqueue(const T & v) {PIVector<T>::push_front(v); return *this;}
T dequeue() {return PIVector<T>::take_back();}
T & head() {return PIVector<T>::back();}
const T & head() const {return PIVector<T>::back();}
PIVector<T> toVector() {PIVector<T> v(PIVector<T>::size()); for (uint i = 0; i < PIVector<T>::size(); ++i) v[i] = PIVector<T>::at(i); return v;}
PIDeque<T> & enqueue(const T & v) {PIDeque<T>::push_front(v); return *this;}
T dequeue() {return PIDeque<T>::take_back();}
T & head() {return PIDeque<T>::back();}
const T & head() const {return PIDeque<T>::back();}
PIVector<T> toVector() {PIVector<T> v(PIDeque<T>::size()); for (uint i = 0; i < PIDeque<T>::size(); ++i) v[i] = PIDeque<T>::at(i); return v;}
};
#endif // PIQUEUE_H

View File

@@ -10,12 +10,12 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()), diag(false) {
is_sending = is_receiving = is_pause = false;
break_ = true;
bytes_all = bytes_cur = 0;
replies_cnt = send_queue = 0;
send_queue = 0;
timeout_ = 10.;
diag.setDisconnectTimeout(timeout_);
//CONNECTU(&diag, qualityChanged, this, diagChanged);
diag.start(100);
packets_count = 32;
diag.start(50);
packets_count = 100;
setPacketSize(4096);
randomize();
}
@@ -63,12 +63,14 @@ void PIBaseTransfer::received(PIByteArray data) {
diag.received(data.size(), false);
return;
} else diag.received(data.size(), true);
//piCoutObj << "receive" << h.session_id << h.type << h.id;
// piCoutObj << "receive" << h.session_id << h.type << h.id;
switch (pt) {
case pt_Unknown: break;
case pt_Data:
mutex_header.lock();
if (h.session_id != header.session_id || !is_receiving) {
sendBreak(h.session_id);
mutex_header.unlock();
return;
} else {
uint rcrc = h.crc;
@@ -76,32 +78,47 @@ void PIBaseTransfer::received(PIByteArray data) {
if (rcrc != ccrc) {
header.id = h.id;
sendReply(pt_ReplyInvalid);
//piCoutObj << "invalid CRC";
piCoutObj << "invalid CRC";
} else {
mutex_session.lock();
processData(h.id, data);
mutex_session.unlock();
}
if (is_pause) sendReply(pt_Pause);
}
mutex_header.unlock();
break;
case pt_ReplySuccess:
case pt_ReplyInvalid:
if (h.session_id != header.session_id) return;
mutex_header.lock();
if (h.session_id != header.session_id) {
mutex_header.unlock();
return;
}
if (is_pause) sendReply(pt_Pause);
mutex_header.unlock();
if (pt == pt_ReplySuccess) {
mutex_send.lock();
send_queue--;
if (send_queue < 0) send_queue = 0;
send_tm.reset();
mutex_send.unlock();
}
replies_cnt++;
if (is_sending) {
mutex_session.lock();
if (h.id < replies.size())
replies[h.id] = pt;
mutex_session.unlock();
// piCoutObj << "Done Packet" << h.id;
}
if (is_receiving && h.id == 0) {
// if (checkSession() == 0 && pt == pt_ReplySuccess) finish_receive(true);
if (checkSession() == 0 && pt == pt_ReplySuccess) {
if (pt == pt_ReplySuccess) {
mutex_session.lock();
int cs = checkSession();
mutex_session.unlock();
//piCoutObj << "Success receive";
finish_receive(true);
if (cs == 0) finish_receive(true);
}
}
break;
@@ -118,15 +135,18 @@ void PIBaseTransfer::received(PIByteArray data) {
}
break;
case pt_Start:
mutex_header.lock();
if (is_pause && (is_sending || is_receiving)) {
if (header.session_id == h.session_id) {
is_pause = false;
mutex_header.unlock();
resumed();
return;
}
}
if (is_sending && header.session_id != h.session_id) {
sendBreak(h.session_id);
mutex_header.unlock();
return;
}
if (is_receiving) {
@@ -134,12 +154,19 @@ void PIBaseTransfer::received(PIByteArray data) {
//sendBreak(h.session_id);
//return;
//piCoutObj << "restart receive";
mutex_header.unlock();
finish_receive(false, true);
} else return;
} else {
mutex_header.unlock();
return;
}
}
mutex_header.unlock();
if (data.size() == sizeof(StartRequest)) {
StartRequest sr;
data >> sr;
mutex_session.lock();
mutex_header.lock();
bytes_cur = 0;
state_string = "start request";
bytes_all = sr.size;
@@ -155,19 +182,25 @@ void PIBaseTransfer::received(PIByteArray data) {
//piCoutObj << "receiveStarted()";
is_receiving = true;
break_ = false;
mutex_send.lock();
send_queue = 0;
mutex_send.unlock();
receiveStarted();
replies_cnt = send_queue = 0;
state_string = "receiving";
replies[0] = pt_ReplySuccess;
mutex_session.unlock();
sendReply(pt_ReplySuccess);
mutex_header.unlock();
}
break;
case pt_Pause:
mutex_header.lock();
if (header.session_id == h.session_id) {
//piCout << "receive pause";
if (!is_pause && pause_tm.elapsed_s() < timeout_/10) {
//piCout << "resume";
sendReply(pt_Start);
mutex_header.unlock();
return;
}
if (!is_pause) paused();
@@ -179,6 +212,7 @@ void PIBaseTransfer::received(PIByteArray data) {
if (is_sending) send_tm.reset();
pause_tm.reset();
}
mutex_header.unlock();
break;
default: break;
}
@@ -186,48 +220,73 @@ void PIBaseTransfer::received(PIByteArray data) {
bool PIBaseTransfer::send_process() {
mutex_session.lock();
packet_header_size = sizeof(PacketHeader) + customHeader().size();
break_ = false;
diag.reset();
// diag.start(100);
sendStarted();
is_sending = true;
replies.resize(session.size() + 1);
int session_size = session.size();
replies.resize(session_size + 1);
replies.fill(pt_Unknown);
mutex_session.unlock();
PIByteArray ba;
if (!getStartRequest()) return finish_send(false);
replies_cnt = send_queue = 0;
state_string = "sending";
PITimeMeasurer stm;
mutex_send.lock();
send_queue = 0;
send_tm.reset();
mutex_send.unlock();
//int ltm = 0;
for (int i = 0; i < session.size_s(); i++) {
if (send_queue >= packets_count || is_pause) {
for (int i = 0; i < session_size; i++) {
mutex_send.lock();
int sq = send_queue;
mutex_send.unlock();
if (sq >= packets_count || is_pause) {
--i;
piMSleep(1);
//piMSleep(1);
if (is_pause) {
piMSleep(40);
//piCout << "send pause";
mutex_header.lock();
sendReply(pt_Pause);
mutex_header.unlock();
if (pause_tm.elapsed_s() > timeout())return finish_send(false);
}
if (send_tm.elapsed_s() > timeout_) return finish_send(false);
if (stm.elapsed_s() > timeout_ / 10.)
mutex_send.lock();
if (send_tm.elapsed_s() > timeout_) {
mutex_send.unlock();
return finish_send(false);
}
if (stm.elapsed_s() > timeout_ / 10.) {
send_queue = 0;
}
mutex_send.unlock();
continue;
}
stm.reset();
ba = build_packet(i);
diag.sended(ba.size());
sendRequest(ba);
mutex_send.lock();
send_queue++;
mutex_send.unlock();
if (break_) return finish_send(false);
}
PITimeMeasurer tm2, tm;
// piCoutObj << "send done, checking";
PITimeMeasurer tm;
int prev_chk = 0;
mutex_send.lock();
send_queue = 0;
mutex_send.unlock();
piMSleep(10);
while (tm.elapsed_s() < timeout_) {
//piCoutObj << "recovering...";
mutex_session.lock();
int chk = checkSession();
mutex_session.unlock();
if (chk == 0) return finish_send(true);
if (chk != prev_chk) tm.reset();
else if (tm.elapsed_m() < 100) {
@@ -237,27 +296,32 @@ bool PIBaseTransfer::send_process() {
if (is_pause) {
piMSleep(40);
//piCout << "send pause";
mutex_header.lock();
sendReply(pt_Pause);
mutex_header.unlock();
if (pause_tm.elapsed_s() > timeout())return finish_send(false);
else continue;
}
prev_chk = chk;
if (chk > 0) {
if (tm2.elapsed_s() > timeout_ / 10.) {
//piCoutObj << "recovery packet" << chk;
if (send_queue >= packets_count) {
piMSleep(10);
continue;
}
ba = build_packet(chk - 1);
diag.sended(ba.size());
sendRequest(ba);
send_queue++;
//piCoutObj << "recovery packet" << chk;
mutex_send.lock();
int sq = send_queue;
mutex_send.unlock();
if (sq >= packets_count) {
piMSleep(10);
continue;
}
ba = build_packet(chk - 1);
diag.sended(ba.size());
sendRequest(ba);
mutex_send.lock();
send_queue++;
mutex_send.unlock();
}
// if (chk == -1) return finish_send(false);
if (break_) return finish_send(false);
piMSleep(1);
//piMSleep(1);
}
return finish_send(false);
}
@@ -281,6 +345,8 @@ int PIBaseTransfer::checkSession() {
void PIBaseTransfer::buildSession(PIVector<Part> parts) {
mutex_session.lock();
mutex_header.lock();
state_string = "calculating parts ... ";
session.clear();
header.session_id = rand();
@@ -334,6 +400,8 @@ void PIBaseTransfer::buildSession(PIVector<Part> parts) {
}
}
if (cur_size > min_size) session << lfi;
mutex_header.unlock();
mutex_session.unlock();
}
@@ -358,6 +426,7 @@ void PIBaseTransfer::sendReply(PacketType reply) {
bool PIBaseTransfer::getStartRequest() {
PITimeMeasurer tm;
mutex_header.lock();
header.type = pt_Start;
header.id = 0;
PIByteArray ba;
@@ -365,6 +434,7 @@ bool PIBaseTransfer::getStartRequest() {
st.packets = (uint)session.size();
st.size = bytes_all;
ba << header;
mutex_header.unlock();
ba << st;
state_string = "send request";
while (tm.elapsed_s() < timeout_) {
@@ -372,24 +442,29 @@ bool PIBaseTransfer::getStartRequest() {
sendRequest(ba);
if (break_) return false;
// piCoutObj << replies[0];
mutex_session.lock();
if (replies[0] == pt_ReplySuccess) {
state_string = "send permited!";
piCoutObj << "ping " << tm.elapsed_m();
mutex_session.unlock();
return true;
}
piMSleep(100);
mutex_session.unlock();
piMSleep(1);
}
return false;
}
void PIBaseTransfer::processData(int id, PIByteArray & data) {
//piCoutObj << "received packet" << id << ", size" << data.size();
// piCoutObj << "received packet" << id << ", size" << data.size();
if (id < 1 || id > replies.size_s()) return;
if (!session[id - 1].isEmpty()) {
header.id = id;
replies[id] = pt_ReplySuccess;
sendReply(pt_ReplySuccess);
if (checkSession() == 0) state_string = "receive ok";
if (replies[replies.size()-1] == pt_ReplySuccess)
if (checkSession() == 0) state_string = "receive ok";
return;
}
Part fi;
@@ -413,19 +488,18 @@ void PIBaseTransfer::processData(int id, PIByteArray & data) {
}
header.id = id;
replies[id] = pt_ReplySuccess;
if (checkSession() == 0) state_string = "receive ok";
sendReply(pt_ReplySuccess);
if (checkSession() == 0) state_string = "receive ok";
}
PIByteArray PIBaseTransfer::build_packet(int id) {
PIByteArray ret;
PIByteArray ba;
header.id = id + 1;
header.type = pt_Data;
//piCoutObj << "session id" << header.session_id;
//ret << header;
ret.append(customHeader());
mutex_session.lock();
for (int i = 0; i < session[id].size_s(); i++) {
Part fi = session[id][i];
ret << fi;
@@ -435,10 +509,16 @@ PIByteArray PIBaseTransfer::build_packet(int id) {
if (ba.size() != fi.size) piCoutObj << "***error while build packet, wrong part size";
ret << ba;
}
mutex_session.unlock();
PIByteArray hdr;
mutex_header.lock();
header.id = id + 1;
header.type = pt_Data;
header.crc = crc.calculate(ret);
PIByteArray hdr; hdr << header;
hdr << header;
mutex_header.unlock();
ret.insert(0, hdr);
//piCoutObj << "Packet" << header.id << ret.size();
// piCoutObj << "Send Packet" << header.id << ret.size();
return ret;
}
@@ -448,9 +528,11 @@ bool PIBaseTransfer::finish_send(bool ok) {
if (ok) state_string = "send done";
else state_string = "send failed";
//piCoutObj << state_string << PIString::readableSize(bytes_all);
mutex_header.lock();
header.id = 0;
if (!ok) sendBreak(header.session_id);
else sendReply(pt_ReplySuccess);
mutex_header.unlock();
sendFinished(ok);
// diag.stop();
bytes_all = bytes_cur = 0;
@@ -463,24 +545,13 @@ void PIBaseTransfer::finish_receive(bool ok, bool quet) {
if (ok) state_string = "receive done";
else state_string = "receive failed";
//piCoutObj << state_string << PIString::readableSize(bytes_all);
if (!ok && !quet) sendBreak(header.session_id);
if (!ok && !quet) {
mutex_header.lock();
sendBreak(header.session_id);
mutex_header.unlock();
}
receiveFinished(ok);
// diag.stop();
bytes_all = bytes_cur = 0;
}
void PIBaseTransfer::diagChanged(PIDiagnostics::Quality new_quality, PIDiagnostics::Quality old_quality) {
if (is_receiving) {
if (new_quality == PIDiagnostics::Failure) {
//piCout << "disconnected!";
break_ = true;
is_receiving = false;
state_string = "receive failed";
receiveFinished(false);
bytes_all = bytes_cur = 0;
//stopReceive();
}
}
}

View File

@@ -121,8 +121,11 @@ private:
PITimeMeasurer send_tm, pause_tm;
PacketHeader header;
CRC_16 crc;
int replies_cnt, send_queue;
int send_queue;
PIDiagnostics diag;
PIMutex mutex_session;
PIMutex mutex_send;
PIMutex mutex_header;
void processData(int id, PIByteArray &data);
PIByteArray build_packet(int id);
@@ -132,7 +135,6 @@ private:
bool getStartRequest();
bool finish_send(bool ok);
void finish_receive(bool ok, bool quet = false);
EVENT_HANDLER2(void, diagChanged, PIDiagnostics::Quality, new_quality, PIDiagnostics::Quality, old_quality);
};
inline PIByteArray & operator <<(PIByteArray & s, const PIBaseTransfer::PacketHeader & v) {s << v.sig << v.type << v.session_id << v.id << v.crc; return s;}

View File

@@ -39,87 +39,77 @@
PIDiagnostics::PIDiagnostics(bool start_): PITimer(PITimer::Pool) {
reset();
if (start_) start();
if (start_) start(100);
}
void PIDiagnostics::reset() {
setDisconnectTimeout(3.);
lock();
qual = PIDiagnostics::Unknown;
speedIn = speedOut = PIString::readableSize(0) + "/s";
ifreq = immediate_freq = integral_freq = 0.f;
cur_pckt = rec_once = 0;
wrong_count = receive_count = send_count = wrong_bytes = receive_bytes = send_bytes = 0;
packets_in_sec = packets_out_sec = bytes_in_sec = bytes_out_sec = 0;
speedRecv = speedSend = PIString::readableSize(0) + "/s";
immediate_freq = integral_freq = 0.f;
count_wrong = count_recv = count_send = bytes_wrong = bytes_recv = bytes_send = 0;
packets_recv_sec = packets_send_sec = bytes_recv_sec = bytes_send_sec = 0;
history_rec.clear();
history_send.clear();
history_rec << Entry();
history_send << Entry();
unlock();
setDisconnectTimeout(3.);
}
void PIDiagnostics::received(int size, bool correct) {
lock();
rec_once = 1;
Entry &e(history_send.back());
if (correct) {
float el = tm.elapsed_s();
tm.reset();
if (el > 0.f) immediate_freq = ifreq = 1.f / el;
else immediate_freq = ifreq = 0.f;
receive_count++;
receive_bytes += size;
e.cnt_ok++;
e.bytes_ok += size;
count_recv++;
bytes_recv += size;
} else {
immediate_freq = ifreq = 0.f;
wrong_count++;
wrong_bytes += size;
e.cnt_fail++;
e.bytes_fail += size;
count_wrong++;
bytes_wrong += size;
}
addToHistory(history_rec, size, correct);
e.empty = false;
unlock();
}
void PIDiagnostics::sended(int size) {
lock();
send_count++;
send_bytes += size;
addToHistory(history_send, size);
Entry &e(history_send.back());
e.cnt_ok++;
e.bytes_ok += size;
count_send++;
bytes_send += size;
e.empty = false;
unlock();
}
void PIDiagnostics::tick(void * data, int delimiter) {
lock();
checkHistory(history_rec);
checkHistory(history_send);
PIDiagnostics::Quality diag;
immediate_freq = ifreq;
ifreq = 0.f;
int bps[2];
int cpckt[2];
bps[0] = bps[1] = 0;
cpckt[0] = cpckt[1] = 0;
packets_in_sec = packets_out_sec = 0;
piForeachC (Entry & e, history_rec) {
if (e.ok) {
bps[0] += e.bytes;
packets_in_sec++;
}
cpckt[e.ok ? 1 : 0]++;
}
piForeachC (Entry & e, history_send) {
bps[1] += e.bytes;
packets_out_sec++;
}
bytes_in_sec = bps[0] / disconn_;
bytes_out_sec = bps[1] / disconn_;
packets_in_sec /= disconn_;
packets_out_sec /= disconn_;
speedIn = PIString::readableSize(bytes_in_sec) + "/s";
speedOut = PIString::readableSize(bytes_out_sec) + "/s";
int arc = cpckt[0] + cpckt[1];
int tcnt = 0;
Entry send = calcHistory(history_send, &tcnt);
Entry recv = calcHistory(history_rec, &tcnt);
float it = disconn_ * (float(tcnt) / history_rec.size());
float hz = interval() / 1000.f;
integral_freq = recv.cnt_ok / it;
packets_recv_sec = ullong(float(recv.cnt_ok) / it);
packets_send_sec = ullong(float(send.cnt_ok) / it);
bytes_recv_sec = ullong(double(recv.bytes_ok) / it);
bytes_send_sec = ullong(double(send.bytes_ok) / it);
immediate_freq = double(history_rec.back().cnt_ok) / hz;
speedRecv = PIString::readableSize(ullong(double(history_rec.back().bytes_ok) / hz)) + "/s";
speedSend = PIString::readableSize(ullong(double(history_send.back().bytes_ok) / hz)) + "/s";
int arc = recv.cnt_ok + recv.cnt_fail;
float good_percents = 0.f;
if (arc > 0) good_percents = (float)cpckt[1] / arc * 100.f;
if (disconn_ > 0.) integral_freq = cpckt[1] / disconn_;
else integral_freq = 0.;
if (rec_once == 0) {
if (arc > 0) good_percents = (float)recv.cnt_ok / arc * 100.f;
PIDiagnostics::Quality diag;
if (tcnt == 0) {
diag = PIDiagnostics::Unknown;
} else {
if (good_percents == 0.f) diag = PIDiagnostics::Failure;
@@ -131,39 +121,41 @@ void PIDiagnostics::tick(void * data, int delimiter) {
qualityChanged(diag, qual);
qual = diag;
}
history_rec.dequeue();
history_send.dequeue();
Entry e;
if (diag != PIDiagnostics::Unknown) e.empty = false;
history_rec.enqueue(e);
history_send.enqueue(e);
unlock();
}
void PIDiagnostics::addToHistory(PIVector<PIDiagnostics::Entry> & hist, int bytes, bool ok) {
PIDiagnostics::Entry PIDiagnostics::calcHistory(PIQueue<Entry> &hist, int * cnt) {
Entry e;
e.time = PISystemTime::current(true);
e.bytes = bytes;
e.ok = ok;
checkHistory(hist);
hist << e;
}
void PIDiagnostics::checkHistory(PIVector< PIDiagnostics::Entry > & hist) {
PISystemTime ctm = PISystemTime::current(true);
*cnt = 0;
for (int i = 0; i < hist.size_s(); ++i) {
if ((ctm - hist[i].time).abs() > disconn_st) {
hist.remove(i);
--i;
}
e.bytes_ok += hist[i].bytes_ok;
e.bytes_fail += hist[i].bytes_fail;
e.cnt_ok += hist[i].cnt_ok;
e.cnt_fail += hist[i].cnt_fail;
if (!e.empty) *cnt++;
}
return e;
}
void PIDiagnostics::propertyChanged(const PIString &) {
disconn_ = property("disconnectTimeout").toFloat();
changeDisconnectTimeout();
float disct = property("disconnectTimeout").toFloat();
changeDisconnectTimeout(disct);
}
void PIDiagnostics::changeDisconnectTimeout() {
void PIDiagnostics::changeDisconnectTimeout(float disct) {
lock();
disconn_st = PISystemTime::fromSeconds(disconn_);
disconn_ = piMaxf(disct, interval());
int hist_size = piMaxi(int(disconn_ * 1000 / interval()), 1);
history_rec.resize(hist_size);
history_send.resize(hist_size);
unlock();
}

View File

@@ -51,7 +51,7 @@ public:
float disconnectTimeout() const {return disconn_;}
//! Returns period of full disconnect in seconds and period of averaging frequency
void setDisconnectTimeout(float s) {setProperty("disconnectTimeout", s); disconn_ = s; changeDisconnectTimeout();}
void setDisconnectTimeout(float s) {setProperty("disconnectTimeout", s); changeDisconnectTimeout(s);}
//! Returns immediate receive frequency, packets/s
@@ -61,43 +61,43 @@ public:
float integralFrequency() const {return integral_freq;}
//! Returns correct received packets per second
ullong receiveCountPerSec() const {return packets_in_sec;}
ullong receiveCountPerSec() const {return packets_recv_sec;}
//! Returns sended packets per second
ullong sendCountPerSec() const {return packets_out_sec;}
ullong sendCountPerSec() const {return packets_send_sec;}
//! Returns correct received bytes per second
ullong receiveBytesPerSec() const {return bytes_in_sec;}
ullong receiveBytesPerSec() const {return bytes_recv_sec;}
//! Returns sended bytes per second
ullong sendBytesPerSec() const {return bytes_out_sec;}
ullong sendBytesPerSec() const {return bytes_send_sec;}
//! Returns overall correct received bytes
ullong receiveBytes() const {return receive_bytes;}
ullong receiveBytes() const {return bytes_recv;}
//! Returns overall wrong received bytes
ullong wrongBytes() const {return wrong_bytes;}
ullong wrongBytes() const {return bytes_wrong;}
//! Returns overall sended bytes
ullong sendBytes() const {return send_bytes;}
ullong sendBytes() const {return bytes_send;}
//! Returns overall correct received packets count
ullong receiveCount() const {return receive_count;}
ullong receiveCount() const {return count_recv;}
//! Returns overall wrong received packets count
ullong wrongCount() const {return wrong_count;}
ullong wrongCount() const {return count_wrong;}
//! Returns overall sended packets count
ullong sendCount() const {return send_count;}
ullong sendCount() const {return count_send;}
//! Returns connection quality
PIDiagnostics::Quality quality() const {return qual;}
//! Returns receive speed in format "n {B|kB|MB|GB|TB}/s"
PIString receiveSpeed() const {return speedIn;}
PIString receiveSpeed() const {return speedRecv;}
//! Returns send speed in format "n {B|kB|MB|GB|TB}/s"
PIString sendSpeed() const {return speedOut;}
PIString sendSpeed() const {return speedSend;}
//! Returns immediate receive frequency pointer, packets/s. Useful for output to PIConsole
@@ -107,46 +107,46 @@ public:
const float * integralFrequency_ptr() const {return &integral_freq;}
//! Returns correct received packets per second pointer. Useful for output to PIConsole
const ullong * receiveCountPerSec_ptr() const {return &packets_in_sec;}
const ullong * receiveCountPerSec_ptr() const {return &packets_recv_sec;}
//! Returns sended packets per second pointer. Useful for output to PIConsole
const ullong * sendCountPerSec_ptr() const {return &packets_out_sec;}
const ullong * sendCountPerSec_ptr() const {return &packets_send_sec;}
//! Returns correct received bytes per second pointer. Useful for output to PIConsole
const ullong * receiveBytesPerSec_ptr() const {return &bytes_in_sec;}
const ullong * receiveBytesPerSec_ptr() const {return &bytes_recv_sec;}
//! Returns sended bytes per second pointer. Useful for output to PIConsole
const ullong * sendBytesPerSec_ptr() const {return &bytes_out_sec;}
const ullong * sendBytesPerSec_ptr() const {return &bytes_send_sec;}
//! Returns overall correct received bytes pointer. Useful for output to PIConsole
const ullong * receiveBytes_ptr() const {return &receive_bytes;}
const ullong * receiveBytes_ptr() const {return &bytes_recv;}
//! Returns overall wrong received bytes pointer. Useful for output to PIConsole
const ullong * wrongBytes_ptr() const {return &wrong_bytes;}
const ullong * wrongBytes_ptr() const {return &bytes_wrong;}
//! Returns overall sended bytes pointer. Useful for output to PIConsole
const ullong * sendBytes_ptr() const {return &send_bytes;}
const ullong * sendBytes_ptr() const {return &bytes_send;}
//! Returns overall correct received packets count pointer. Useful for output to PIConsole
const ullong * receiveCount_ptr() const {return &receive_count;}
const ullong * receiveCount_ptr() const {return &count_recv;}
//! Returns overall wrong received packets count pointer. Useful for output to PIConsole
const ullong * wrongCount_ptr() const {return &wrong_count;}
const ullong * wrongCount_ptr() const {return &count_wrong;}
//! Returns overall sended packets count pointer. Useful for output to PIConsole
const ullong * sendCount_ptr() const {return &send_count;}
const ullong * sendCount_ptr() const {return &count_send;}
//! Returns connection quality pointer. Useful for output to PIConsole
const int * quality_ptr() const {return (int * )&qual;}
//! Returns receive speed pointer in format "n {B|kB|MB|GB|TB}/s". Useful for output to PIConsole
const PIString * receiveSpeed_ptr() const {return &speedIn;}
const PIString * receiveSpeed_ptr() const {return &speedRecv;}
//! Returns send speed pointer in format "n {B|kB|MB|GB|TB}/s". Useful for output to PIConsole
const PIString * sendSpeed_ptr() const {return &speedOut;}
const PIString * sendSpeed_ptr() const {return &speedSend;}
EVENT_HANDLER0(void, start) {start(1000.); changeDisconnectTimeout();}
EVENT_HANDLER1(void, start, double, msecs) {if (msecs > 0.) {PITimer::start(msecs); changeDisconnectTimeout();}}
EVENT_HANDLER0(void, start) {start(100.); changeDisconnectTimeout(disconn_);}
EVENT_HANDLER1(void, start, double, msecs) {if (msecs > 0.) {PITimer::start(msecs); changeDisconnectTimeout(disconn_);}}
EVENT_HANDLER0(void, stop) {PITimer::stop();}
EVENT_HANDLER0(void, reset);
@@ -182,27 +182,26 @@ public:
private:
struct Entry {
Entry() {ok = true; bytes = 0;}
PISystemTime time;
bool ok;
int bytes;
Entry() {bytes_ok = bytes_fail = 0; cnt_ok = cnt_fail = 0; empty = true;}
ullong bytes_ok;
ullong bytes_fail;
uint cnt_ok;
uint cnt_fail;
bool empty;
};
void tick(void * data, int delimiter);
void addToHistory(PIVector<Entry> & hist, int bytes, bool ok = true);
void checkHistory(PIVector<Entry> & hist);
Entry calcHistory(PIQueue<Entry> & hist, int * cnt);
void propertyChanged(const PIString & );
void changeDisconnectTimeout();
void changeDisconnectTimeout(float disct);
PIDiagnostics::Quality qual;
PIString speedIn, speedOut;
float ifreq, immediate_freq, integral_freq, disconn_;
PIVector<Entry> history_rec, history_send;
PISystemTime disconn_st;
PITimeMeasurer tm;
char cur_pckt, rec_once;
ullong wrong_count, receive_count, send_count, wrong_bytes, receive_bytes, send_bytes;
ullong packets_in_sec, packets_out_sec, bytes_in_sec, bytes_out_sec;
PIString speedRecv, speedSend;
float ifreq, immediate_freq, integral_freq;
PIQueue<Entry> history_rec, history_send;
float disconn_;
ullong count_wrong, count_recv, count_send, bytes_wrong, bytes_recv, bytes_send;
ullong packets_recv_sec, packets_send_sec, bytes_recv_sec, bytes_send_sec;
};

View File

@@ -228,7 +228,7 @@ PIVector<PIFile::FileInfo> PIDir::entries() {
PIString p(dp);
if (dp == ".") dp.clear();
else if (!dp.endsWith(separator)) dp += separator;
//piCout << "entries from" << p;
// piCout << "start entries from" << p;
#ifdef WINDOWS
if (dp == separator) {
char letters[1024];
@@ -287,6 +287,7 @@ PIVector<PIFile::FileInfo> PIDir::entries() {
}
free(list);
#endif
// piCout << "end entries from" << p;
return l;
}

View File

@@ -42,7 +42,7 @@ public:
UDP /** UDP - User Datagram Protocol */ ,
TCP_Client /** TCP client - allow connection to TCP server */ ,
TCP_Server /** TCP server - receive connections from TCP clients */ ,
TCP_SingleTCP
TCP_SingleTCP /** TCP client single mode - connect & send & disconnect, on each packet */
};
//! \brief Parameters of %PIEthernet

View File

@@ -211,7 +211,7 @@ PIByteArray PIFileTransfer::buildPacket(Part p) {
bytes_file_cur = p.start;
return ba;
}
return PIByteArray();
return ba;
}
@@ -286,11 +286,12 @@ void PIFileTransfer::receive_finished(bool ok) {
if (!ok || !user_ok) {
pftheader.session_id--;
started_ = false;
piCoutObj << "file cancel";
receiveFilesFinished(false);
} else resume();
}
if (pftheader.step == pft_Data) {
//piCout << "file close";
//piCoutObj << "file close";
work_file.close();
started_ = false;
receiveFilesFinished(ok);
@@ -299,7 +300,9 @@ void PIFileTransfer::receive_finished(bool ok) {
void PIFileTransfer::send_finished(bool ok) {
//piCout << "file close";
work_file.close();
started_ = false;
// piCoutObj << "file close";
if (pftheader.step == pft_Data) {
work_file.close();
started_ = false;
}
}

View File

@@ -0,0 +1,129 @@
#ifndef PIPIPELINETHREAD_H
#define PIPIPELINETHREAD_H
#include "pithread.h"
template <typename T1, typename T2>
class PIPipelineThread : public PIThread
{
PIOBJECT_SUBCLASS(PIPipelineThread, PIThread)
public:
PIPipelineThread() {
cnt = 0;
max_size = 0;
wait_next_pipe = true;
next_overload = false;
}
~PIPipelineThread() {
stop();
if (!waitForFinish(1000)) terminate();
}
template <typename T3>
void connectTo(PIPipelineThread<T2, T3> * next) {
CONNECT2(void, T2, bool *, this, calculated, next, enqueue)
}
EVENT2(calculated, const T2 &, v, bool *, overload)
EVENT_HANDLER1(void, enqueue, const T1 &, v) {enqueue(v, 0);}
EVENT_HANDLER2(void, enqueue, const T1 &, v, bool *, overload) {
mutex.lock();
if (max_size > 0 && in.size() < max_size) {
in.enqueue(v);
if (overload) *overload = false;
} else {
if (overload) *overload = true;
}
mutex.unlock();
}
const ullong * counterPtr() const {return &cnt;}
ullong counter() const {return cnt;}
bool isEmpty() {
bool ret;
mutex.lock();
ret = in.isEmpty();
mutex.unlock();
return ret;
}
int queSize() {
int ret;
mutex.lock();
ret = in.size();
mutex.unlock();
return ret;
}
void clear() {
mutex.lock();
in.clear();
next_overload = false;
mutex.unlock();
}
void stopCalc() {
if (isRunning()) {
stop();
if (!waitForFinish(100)) terminate();
mutex.unlock();
mutex_l.unlock();
}
}
T2 getLast() {
T2 ret;
mutex_l.lock();
ret = last;
mutex_l.unlock();
return ret;
}
uint maxQueSize() {
uint ret;
mutex.lock();
ret = max_size;
mutex.unlock();
return ret;
}
void setMaxQueSize(uint count) {
mutex.lock();
count = max_size;
if (max_size > 0 && in.size() > max_size) in.resize(max_size);
mutex.unlock();
}
bool isWaitNextPipe() {return wait_next_pipe;}
void setWaitNextPipe(bool wait) {wait_next_pipe = wait;}
protected:
virtual T2 calc(const T1 &v, bool &ok) = 0;
private:
void begin() {cnt = 0;}
void run() {
mutex.lock();
if (in.isEmpty()) {
mutex.unlock();
piMSleep(1);
return;
}
if (next_overload) {
calculated(last, &next_overload);
} else {
T1 t = in.dequeue();
bool ok = true;
mutex.unlock();
T2 r = calc(t, ok);
mutex_l.lock();
last = r;
mutex_l.unlock();
cnt++;
if (ok) calculated(r, &next_overload);
}
}
PIQueue<T1> in;
T2 last;
PIMutex mutex;
PIMutex mutex_l;
ullong cnt;
uint max_size;
bool wait_next_pipe;
bool next_overload;
};
#endif // PIPIPELINETHREAD_H

View File

@@ -8,17 +8,25 @@ using namespace PICoutManipulators;
class UDPFileTransfer: public PITimer {
PIOBJECT_SUBCLASS(UDPFileTransfer, PITimer)
public:
UDPFileTransfer(const PIString &src_ip_port, const PIString &dst_ip_port) {
UDPFileTransfer(const PIString &src_ip_port, const PIString &dst_ip_port, bool test = false) {
quet_ = false;
test_ = test;
eth.setReadAddress(src_ip_port);
eth.setSendAddress(dst_ip_port);
ft.setPacketSize(8192);
ft.setName("PIFT");
CONNECTU(&ft, sendRequest, this, ftsend);
CONNECTU(&ft, sendFilesStarted, this, ftevent);
CONNECTU(&ft, receiveFilesStarted, this, ftevent);
CONNECTU(&ft, sendFilesFinished, this, ftevent);
CONNECTU(&ft, receiveFilesFinished, this, ftevent);
if (test_) {
testt.setPacketSize(1280);
testt.setName("TEST");
//testt.setTimeout(0.1);
CONNECTU(&testt, sendRequest, this, ftsend);
} else {
ft.setPacketSize(1280);
ft.setName("PIFT");
CONNECTU(&ft, sendRequest, this, ftsend);
CONNECTU(&ft, sendFilesStarted, this, ftevent);
CONNECTU(&ft, receiveFilesStarted, this, ftevent);
CONNECTU(&ft, sendFilesFinished, this, ftevent);
CONNECTU(&ft, receiveFilesFinished, this, ftevent);
}
CONNECTU(&eth, threadedReadEvent, this, received);
start(50);
eth.setParameter(PIEthernet::SeparateSockets);
@@ -31,7 +39,14 @@ public:
ft.send(file);
}
void startTest() {
PIByteArray ba(1024*1024*64);
testt.send(ba);
}
PIFileTransfer ft;
PIDataTransfer testt;
bool test_;
private:
PIEthernet eth;
@@ -44,30 +59,50 @@ private:
ft.stopSend();
ft.stopReceive();
}
// piCout << (int)ft.diagnostic().quality();
// piCout << (int)ft.diagnostic().quality();
}
if (testt.isSending() || testt.isReceiving()) ftevent();
}
EVENT_HANDLER(void, ftevent) {
if (test_) {
#ifdef WINDOWS
piCout
#else
PICout(AddSpaces) << ClearLine
#endif
<< testt.stateString()
<< testt.diagnostic().receiveSpeed()
<< testt.diagnostic().sendSpeed()
<< "("
<< PIString::readableSize(testt.bytesCur()) << "/" << PIString::readableSize(testt.bytesAll())
<< ")"
#ifndef WINDOWS
<< Flush
#endif
;
return;
}
if (quet_) return;
#ifdef WINDOWS
piCout
#else
piCout
#else
PICout(AddSpaces) << ClearLine
#endif
#endif
<< ft.stateString() << ft.curFile()
<< PIString::readableSize(ft.diagnostic().receiveBytesPerSec()) + "/s"
<< PIString::readableSize(ft.diagnostic().sendBytesPerSec()) + "/s"
<< ft.diagnostic().receiveSpeed()
<< ft.diagnostic().sendSpeed()
<< "(" << PIString::readableSize(ft.bytesFileCur()) << "/" << PIString::readableSize(ft.bytesFileAll()) << ", "
<< PIString::readableSize(ft.bytesCur()) << "/" << PIString::readableSize(ft.bytesAll()) << ")"
<< "ETA" << (ft.diagnostic().receiveBytesPerSec() > 0 ?
PIString::fromNumber(PISystemTime::fromSeconds((ft.bytesAll() - ft.bytesCur()) /
ft.diagnostic().receiveBytesPerSec()).toSeconds())
: PIString("unknown"))
#ifndef WINDOWS
#ifndef WINDOWS
<< Flush
#endif
#endif
;
}
@@ -77,6 +112,10 @@ private:
EVENT_HANDLER2(void, received, uchar * , readed, int, size) {
PIByteArray ba(readed, size);
if(test_) {
testt.received(ba);
return;
}
ft.received(ba);
}
};
@@ -95,13 +134,14 @@ void usage() {
piCout << "-p " << Green << "- UDP port for transfer, by default 50005";
piCout << "-d <work_dir> " << Green << "- directory, where place received files";
piCout << "<path> " << Green << "- add path to send, if no path, then \"pift\" working in receive mode";
piCout << "-t " << Green << "- test mode";
}
int main (int argc, char * argv[]) {
PICLI cli(argc, argv);
PIINTROSPECTION_START
cli.setOptionalArgumentsCount(-1);
cli.setOptionalArgumentsCount(-1);
cli.addArgument("send", true);
cli.addArgument("receive", true);
cli.addArgument("dir", true);
@@ -109,6 +149,28 @@ int main (int argc, char * argv[]) {
cli.addArgument("help");
cli.addArgument("quet");
cli.addArgument("fullpath");
cli.addArgument("test", true);
if (cli.hasArgument("test")) {
PIString src;
PIString dst;
bool send_ = false;
if (cli.argumentValue("test") == "1") {
src = "127.0.0.1:50005";
dst = "127.0.0.1:50006";
send_ = true;
} else {
src = "127.0.0.1:50006";
dst = "127.0.0.1:50005";
}
if (cli.hasArgument("send") && cli.hasArgument("receive") && cli.hasArgument("fullpath")) {
src = cli.argumentValue("receive");
dst = cli.argumentValue("send");
}
UDPFileTransfer tuf(src, dst, true);
if (send_) tuf.startTest();
WAIT_FOREVER;
return 0;
}
if ((!cli.hasArgument("send") || !cli.hasArgument("receive")) || cli.hasArgument("help")) {
usage();
return 0;
@@ -137,8 +199,9 @@ int main (int argc, char * argv[]) {
} else {
piCout << "wait for receiving";
}
kbd.start();
WAIT_FOR_EXIT
PICout(0) << "\n";
PICout(0) << "\n";
return 0;
}