git-svn-id: svn://db.shs.com.ru/pip@267 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5

This commit is contained in:
2016-09-20 20:12:06 +00:00
parent be6aaf26a4
commit 24cbff2e0c
6 changed files with 305 additions and 115 deletions

View File

@@ -46,7 +46,7 @@ REGISTER_DEVICE(PIBinaryLog)
PIBinaryLog::PIBinaryLog() {
setThreadedReadBufferSize(65536);
is_started = is_indexed = false;
is_started = is_indexed = is_pause = false;
current_index = -1;
setPlaySpeed(1.);
setDefaultID(1);
@@ -61,6 +61,7 @@ PIBinaryLog::PIBinaryLog() {
setFilePrefix(PIString());
setRapidStart(false);
file.setName("__S__PIBinaryLog::file");
// piCoutObj << "created";
}
@@ -71,6 +72,7 @@ bool PIBinaryLog::openDevice() {
is_started = false;
is_thread_ok = true;
is_indexed = false;
is_pause = false;
index.clear();
index_pos.clear();
if (mode_ == ReadWrite) {
@@ -121,6 +123,7 @@ bool PIBinaryLog::openDevice() {
if (!rapidStart()) is_started = true;
}
startlogtime = PISystemTime::current();
pause_time = PISystemTime();
return true;
}
@@ -146,10 +149,18 @@ bool PIBinaryLog::threadedRead(uchar *readed, int size) {
double delay;
switch (play_mode) {
case PlayRealTime:
pausemutex.lock();
if (is_pause) {
piMSleep(100);
pausemutex.unlock();
return false;
} else if (pause_time > PISystemTime()) {
startlogtime += pause_time;
pause_time = PISystemTime();
}
pausemutex.unlock();
pt = PISystemTime::current() - startlogtime;
if (is_started) {
if ((lastrecord.timestamp - pt).toSeconds() > 100. || (lastrecord.timestamp - pt).toSeconds() < -100.)
piCoutObj << PISystemTime::current() << startlogtime << pt << lastrecord.timestamp << (lastrecord.timestamp - pt);
if (lastrecord.timestamp > pt)
(lastrecord.timestamp - pt).sleep();
} else {
@@ -163,6 +174,10 @@ bool PIBinaryLog::threadedRead(uchar *readed, int size) {
double cdelay;
int dtc;
if (is_started) {
if (is_pause) {
piMSleep(100);
return false;
}
if (delay > 0) {
cdelay = delay * play_speed;
dtc = int(cdelay) /100;
@@ -182,8 +197,13 @@ bool PIBinaryLog::threadedRead(uchar *readed, int size) {
play_time = lastrecord.timestamp.toMilliseconds();
break;
case PlayStaticDelay:
if (is_started) play_delay.sleep();
else is_started = true;
if (is_started) {
if (is_pause) {
piMSleep(100);
return false;
}
play_delay.sleep();
} else is_started = true;
break;
default:
return false;
@@ -229,6 +249,15 @@ void PIBinaryLog::createNewFile(const PIString &path) {
}
void PIBinaryLog::setPause(bool pause) {
pausemutex.lock();
is_pause = pause;
if (pause) pause_time = PISystemTime::current();
else pause_time = PISystemTime::current() - pause_time;
pausemutex.unlock();
}
int PIBinaryLog::writeBinLog(int id, const void *data, int size) {
if (size <= 0 || !canWrite()) return -1;
if (id == 0) {
@@ -247,6 +276,7 @@ int PIBinaryLog::writeBinLog(int id, const void *data, int size) {
break;
default: break;
}
if (is_pause) return 0;
PIByteArray logdata;
logdata << id << size << (PISystemTime::current() - startlogtime) << PIByteArray::RawData(data, size);
int res = file.write(logdata.data(), logdata.size());
@@ -503,7 +533,21 @@ PIBinaryLog::BinLogInfo PIBinaryLog::getLogInfo(const PIString & path) {
PIString PIBinaryLog::constructFullPath() const {
PIString ret(fullPathPrefix() + "://");
ret << logDir() << ":" << filePrefix() << ":" << defaultID();
ret << logDir() << ":" << filePrefix() << ":" << defaultID() << ":";
switch (play_mode) {
case PlayRealTime:
ret << "RT";
break;
case PlayVariableSpeed:
ret << PIString::fromNumber(playSpeed()) << "X";
break;
case PlayStaticDelay:
ret << PIString::fromNumber(playDelay().toMilliseconds()) << "M";
break;
default:
ret << "RT";
break;
}
return ret;
}
@@ -573,12 +617,18 @@ void PIBinaryLog::configureFromFullPath(const PIString & full_path) {
case 0: setLogDir(p); break;
case 1: setFilePrefix(p); break;
case 2: setDefaultID(p.toInt()); break;
case 3:
if (p.toUpperCase() == "RT") setPlayRealTime();
if (p.toUpperCase().right(1) == "X") setPlaySpeed((p.left(p.size() - 1)).toDouble());
if (p.toUpperCase().right(1) == "M") setPlayDelay(PISystemTime::fromMilliseconds((p.left(p.size() - 1)).toDouble()));
break;
}
}
// piCoutObj << "configured";
}
void PIBinaryLog::propertyChanged(const PIString &) {
void PIBinaryLog::propertyChanged(const PIString &s) {
default_id = property("defaultID").toInt();
rapid_start = property("rapidStart").toBool();
play_mode = (PlayMode)property("playMode").toInt();
@@ -589,5 +639,6 @@ void PIBinaryLog::propertyChanged(const PIString &) {
split_time = property("splitTime").toSystemTime();
split_size = property("splitFileSize").toLLong();
split_count = property("splitRecordCount").toInt();
// piCoutObj << "propertyChanged" << s << play_mode;
}

View File

@@ -164,6 +164,8 @@ public:
//! Also this function set \a splitMode to \a SplitCount
void setSplitRecordCount(int count) {setSplitMode(SplitCount); setProperty("splitRecordCount", count);}
//! Set pause while playing via \a threadedRead or writing via write
void setPause(bool pause);
//! Write one record to BinLog file, with ID = id, id must be greather than 0
int writeBinLog(int id, PIByteArray data) {return writeBinLog(id, data.data(), data.size_s());}
@@ -192,6 +194,9 @@ public:
//! Returns if BinLog file is empty
bool isEmpty() const {return (file.size() <= PIBINARYLOG_SIGNATURE_SIZE + 1);}
//! Returns BinLog pause status
bool isPause() const {return is_pause;}
//! Returns if BinLog file is empty
int lastReadedID() const {return lastrecord.id;}
@@ -293,12 +298,12 @@ private:
SplitMode split_mode;
PIFile file;
BinLogRecord lastrecord;
PISystemTime startlogtime, play_delay, split_time;
PIMutex logmutex;
PISystemTime startlogtime, play_delay, split_time, pause_time;
PIMutex logmutex, pausemutex;
double play_time, play_speed;
llong split_size;
int write_count, split_count, default_id, current_index;
bool is_started, is_thread_ok, is_indexed, rapid_start;
bool is_started, is_thread_ok, is_indexed, rapid_start, is_pause;
};
//! \relatesalso PICout \relatesalso PIBinaryLog::BinLogInfo \brief Output operator to PICout

View File

@@ -18,6 +18,7 @@
*/
#include "pipeer.h"
#include "piconfig.h"
#define _PIPEER_MSG_SIZE 4000
#define _PIPEER_MSG_TTL 100
@@ -26,6 +27,7 @@
#define _PIPEER_LOOPBACK_PORT_S 13313
#define _PIPEER_LOOPBACK_PORT_E (13313+32)
#define _PIPEER_MULTICAST_PORT 13360
#define _PIPEER_TCP_PORT _PIPEER_MULTICAST_PORT
#define _PIPEER_BROADCAST_PORT 13361
#define _PIPEER_TRAFFIC_PORT_S 13400
#define _PIPEER_TRAFFIC_PORT_E 14000
@@ -132,7 +134,7 @@ PIString PIPeer::PeerInfo::fastestAddress() const {
REGISTER_DEVICE(PIPeer)
PIPeer::PIPeer(const PIString & n): PIIODevice() {
PIPeer::PIPeer(const PIString & n): PIIODevice(), diag_d(false), diag_s(false), eth_tcp_srv(PIEthernet::TCP_Server), eth_tcp_cli(PIEthernet::TCP_Client) {
destroyed = false;
PIMutexLocker mbl(mc_mutex);
PIMutexLocker ethl(eth_mutex);
@@ -171,6 +173,8 @@ PIPeer::~PIPeer() {
piForeach (PIEthernet * i, eths_bcast)
i->stopThreadedRead();
eth_lo.stopThreadedRead();
eth_tcp_srv.stopThreadedRead();
eth_tcp_cli.stopThreadedRead();
sendSelfRemove();
destroyMBcasts();
eth_send.close();
@@ -293,6 +297,18 @@ void PIPeer::initMBcasts(PIStringList al) {
break;
}
}
eth_tcp_srv.setName("__S__PIPeer_eth_TCP_Server");
eth_tcp_srv.init();
eth_tcp_srv.listen("0.0.0.0", _PIPEER_TCP_PORT, true);
CONNECTU(&eth_tcp_srv, newConnection, this, newTcpClient);
eth_tcp_srv.startThreadedRead();
eth_tcp_cli.setName("__S__PIPeer_eth_TCP_Client");
eth_tcp_cli.init();
eth_tcp_cli.setDebug(false);
tcpClientReconnect();
CONNECTU(&eth_tcp_cli, threadedReadEvent, this, mbcastRead);
CONNECTU(&eth_tcp_cli, disconnected, this, tcpClientReconnect);
eth_tcp_cli.startThreadedRead();
if (eths_mcast.isEmpty() && eths_bcast.isEmpty() && !eth_lo.isOpened()) piCoutObj << "Warning! Can`t find suitable network interface for multicast receive, check for exists at least one interface with multicasting enabled!";
// piCoutObj << "initMBcasts ok";
}
@@ -332,6 +348,7 @@ void PIPeer::destroyMBcasts() {
eth_lo.close();
eths_mcast.clear();
eths_bcast.clear();
eth_tcp_srv.stop();
}
@@ -690,6 +707,16 @@ void PIPeer::sendMBcast(const PIByteArray & ba) {
if (eth_lo.send(ba))
diag_s.sended(ba.size_s());
}
PIVector<PIEthernet * > cl = eth_tcp_srv.clients();
piForeach (PIEthernet * e, cl) {
if (e->isOpened() && e->isConnected())
if (e->send(ba))
diag_s.sended(ba.size_s());
}
if (eth_tcp_cli.isOpened() && eth_tcp_cli.isConnected()) {
if (eth_tcp_cli.send(ba))
diag_s.sended(ba.size_s());
}
// piCout << "send muticast ok";
send_mc_mutex.unlock();
}
@@ -766,7 +793,17 @@ void PIPeer::pingNeighbours() {
bool PIPeer::openDevice() {
PIConfig conf(
#ifndef WINDOWS
"/etc/pip.conf"
#else
"pip.conf"
#endif
, PIIODevice::ReadOnly);
server_ip = conf.getValue("peer_server_ip", "");
reinit();
diag_d.reset();
diag_s.reset();
PIMutexLocker ml(peers_mutex);
if (trust_peer.isEmpty())
return !peers.isEmpty();
@@ -900,6 +937,14 @@ int PIPeer::write(const void *data, int size) {
}
void PIPeer::newTcpClient(PIEthernet *client) {
client->setName("__S__PIPeer_eth_TCP_ServerClient" + client->path());
piCoutObj << "client" << client->path();
CONNECTU(client, threadedReadEvent, this, mbcastRead);
client->startThreadedRead();
}
void PIPeer::configureFromFullPath(const PIString & full_path) {
PIStringList pl = full_path.split(":");
for (int i = 0; i < pl.size_s(); ++i) {
@@ -925,6 +970,8 @@ void PIPeer::initNetwork() {
// piCoutObj << sl << self_info.addresses.size();
sl.removeAll("127.0.0.1");
initMBcasts(sl);
diag_d.start();
diag_s.start();
// piCoutObj << "initNetwork done";
}
@@ -977,3 +1024,8 @@ void PIPeer::buildMap() {
//piCout << "map" << c.name << "=" << cpath;
}
}
void PIPeer::tcpClientReconnect() {
eth_tcp_cli.connect(server_ip, _PIPEER_TCP_PORT);
}

View File

@@ -134,6 +134,7 @@ public:
void changeName(const PIString & new_name);
const PIString & trustPeerName() const {return trust_peer;}
void setTrustPeerName(const PIString & peer_name) {trust_peer = peer_name;}
void setTcpServerIP(const PIString & ip) {server_ip = ip; tcpClientReconnect();}
PIString constructFullPath() const;
int read(void * read_to, int max_size);
int write(const void * data, int size);
@@ -155,6 +156,8 @@ private:
EVENT_HANDLER2(void, timerEvent, void * , data, int, delim);
EVENT_HANDLER2(bool, sendInternal, const PIString &, to, const PIByteArray &, data);
EVENT_HANDLER2(void, dtReceived, const PIString &, from, const PIByteArray &, data);
EVENT_HANDLER1(void, newTcpClient, PIEthernet * , client);
EVENT_HANDLER(void, tcpClientReconnect);
bool hasPeer(const PIString & name) {piForeachC (PeerInfo & i, peers) if (i.name == name) return true; return false;}
bool removePeer(const PIString & name);
@@ -198,7 +201,7 @@ protected:
private:
PIVector<PIEthernet * > eths_traffic, eths_mcast, eths_bcast;
PIEthernet::InterfaceList prev_ifaces;
PIEthernet eth_send, eth_lo;
PIEthernet eth_send, eth_lo, eth_tcp_srv, eth_tcp_cli;
PITimer sync_timer;
PeerInfo self_info;
PIVector<PeerInfo> peers;
@@ -208,6 +211,7 @@ private:
PIDiagnostics diag_s, diag_d;
bool destroyed, no_timer;
PIString trust_peer;
PIString server_ip;
PIMutex read_buffer_mutex;
PIQueue<PIByteArray> read_buffer;
int read_buffer_size;