add diag to basetransfer

fix pibinarylog delay

git-svn-id: svn://db.shs.com.ru/pip@63 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5
This commit is contained in:
2015-04-07 06:52:41 +00:00
parent 8b7752face
commit e16bd8f652
7 changed files with 151 additions and 14 deletions

View File

@@ -12,6 +12,7 @@ PIBaseTransfer::PIBaseTransfer(): crc(standardCRC_16()) {
bytes_all = bytes_cur = 0;
replies_cnt = send_queue = 0;
timeout_ = 10.;
diag.setDisconnectTimeout(10.);
packets_count = 32;
setPacketSize(4096);
srand(PISystemTime::current().toMilliseconds());
@@ -41,11 +42,15 @@ void PIBaseTransfer::stopReceive() {
void PIBaseTransfer::received(PIByteArray data) {
packet_header_size = sizeof(PacketHeader) + customHeader().size();
// piCoutObj << "receive" << data.size();
if (data.size() < sizeof(PacketHeader)) return;
if (data.size() < sizeof(PacketHeader)) {
diag.received(data.size(), false);
return;
}
PacketHeader h;
data >> h;
PacketType pt = (PacketType)h.type;
// piCoutObj << "receive" << h.session_id << h.type << h.id;
diag.received(data.size(), true);
switch (pt) {
case pt_Unknown: break;
case pt_Data:
@@ -117,6 +122,8 @@ void PIBaseTransfer::received(PIByteArray data) {
replies.fill(pt_Unknown);
is_receiving = true;
break_ = false;
diag.reset();
diag.start();
receiveStarted();
replies_cnt = send_queue = 0;
state_string = "receiving";
@@ -133,6 +140,8 @@ bool PIBaseTransfer::send_process() {
packet_header_size = sizeof(PacketHeader) + customHeader().size();
break_ = false;
is_sending = true;
diag.reset();
diag.start();
sendStarted();
replies.resize(session.size() + 1);
replies.fill(pt_Unknown);
@@ -154,6 +163,7 @@ bool PIBaseTransfer::send_process() {
}
stm.reset();
ba = build_packet(i);
diag.sended(ba.size());
sendRequest(ba);
send_queue++;
if (break_) return finish_send(false);
@@ -178,6 +188,7 @@ bool PIBaseTransfer::send_process() {
continue;
}
ba = build_packet(chk - 1);
diag.sended(ba.size());
sendRequest(ba);
send_queue++;
}
@@ -275,6 +286,7 @@ void PIBaseTransfer::sendReply(PacketType reply) {
header.type = reply;
PIByteArray ba;
ba << header;
diag.sended(ba.size());
sendRequest(ba);
}
@@ -291,6 +303,7 @@ bool PIBaseTransfer::getStartRequest() {
ba << st;
state_string = "send request";
while (tm.elapsed_s() < timeout_) {
diag.sended(ba.size());
sendRequest(ba);
if (break_) return false;
//piCoutObj << send_replyes[0];
@@ -372,6 +385,7 @@ bool PIBaseTransfer::finish_send(bool ok) {
header.id = 0;
if (!ok) sendBreak(header.session_id);
else sendReply(pt_ReplySuccess);
diag.stop();
sendFinished(ok);
bytes_all = bytes_cur = 0;
return ok;
@@ -384,6 +398,7 @@ void PIBaseTransfer::finish_receive(bool ok) {
// piCoutObj << state_string << PIString::readableSize(bytes_all);
is_receiving = false;
if (!ok) sendBreak(header.session_id);
diag.stop();
receiveFinished(ok);
bytes_all = bytes_cur = 0;
}

View File

@@ -3,6 +3,7 @@
#include "picrc.h"
#include "pitimer.h"
#include "pidiagnostics.h"
class PIBaseTransfer: public PIObject
{
@@ -38,7 +39,7 @@ public:
void setPacketSize(int size) {packet_size = size;}
int packetSize() const {return packet_size;}
void setTimeout(double sec) {timeout_ = sec;}
void setTimeout(double sec) {timeout_ = sec; diag.setDisconnectTimeout(sec);}
double timeout() const {return timeout_;}
const PIString & stateString() const {return state_string;}
@@ -47,6 +48,7 @@ public:
const PIString * stateString_ptr() const {return &state_string;}
const llong * bytesAll_ptr() const {return &bytes_all;}
const llong * bytesCur_ptr() const {return &bytes_cur;}
const PIDiagnostics &diagnostic() {return diag;}
EVENT(receiveStarted)
EVENT1(receiveFinished, bool, ok)
@@ -89,7 +91,8 @@ private:
PITimeMeasurer send_tm;
PacketHeader header;
CRC_16 crc;
int replies_cnt, send_queue;
int replies_cnt, send_queue;
PIDiagnostics diag;
void processData(int id, PIByteArray &data);
PIByteArray build_packet(int id);

View File

@@ -142,11 +142,20 @@ bool PIBinaryLog::threadedRead(uchar *readed, int size) {
break;
case PlayVariableSpeed:
delay = lastrecord.timestamp.toMilliseconds() - play_time;
delay /= play_speed;
double cdelay;
int dtc;
if (is_started) {
if (delay > 0)
/// TODO: Sleep by steps (about 100ms)
PISystemTime::fromMilliseconds(delay).sleep();
if (delay > 0) {
cdelay = delay / play_speed;
dtc = int(cdelay) /100;
for (int j=0; j<dtc; j++) {
cdelay = delay / play_speed;
dtc = int(cdelay) /100;
PISystemTime::fromMilliseconds(100).sleep();
}
cdelay = cdelay - dtc*100;
PISystemTime::fromMilliseconds(cdelay).sleep();
}
} else is_started = true;
play_time = lastrecord.timestamp.toMilliseconds();
break;

View File

@@ -28,7 +28,7 @@
#define PIBINARYLOG_VERSION 0x31
namespace __S__PIBinaryLog {
static const uchar binlog_sig[] = {'B','I','N','L','O','G'};
};
}
#define PIBINARYLOG_SIGNATURE_SIZE sizeof(__S__PIBinaryLog::binlog_sig)
/// TODO: Create static functions to split and join binlog files