git-svn-id: svn://db.shs.com.ru/pip@275 12ceb7fc-bf1f-11e4-8940-5bc7170c53b5

This commit is contained in:
2016-10-05 12:54:10 +00:00
parent a75fbf26b5
commit 6166ec5931
3 changed files with 32 additions and 26 deletions

View File

@@ -817,6 +817,11 @@ bool PIPeer::openDevice() {
} }
bool PIPeer::closeDevice() {
return false;
}
void PIPeer::syncPeers() { void PIPeer::syncPeers() {
//piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers"; //piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers";
PIMutexLocker locker(eth_mutex); PIMutexLocker locker(eth_mutex);

View File

@@ -182,7 +182,7 @@ private:
bool isRemoved(const PeerInfo & pi) const {return (removed.value(pi.name) == PIPair<int, PISystemTime>(pi.cnt, pi.time));} bool isRemoved(const PeerInfo & pi) const {return (removed.value(pi.name) == PIPair<int, PISystemTime>(pi.cnt, pi.time));}
bool openDevice(); bool openDevice();
bool closeDevice() {return false;} bool closeDevice();
PIString fullPathPrefix() const {return "peer";} PIString fullPathPrefix() const {return "peer";}
void configureFromFullPath(const PIString &full_path); void configureFromFullPath(const PIString &full_path);
@@ -195,8 +195,7 @@ private:
// Data packet: 4, from, to, ticks, data_size, data // Data packet: 4, from, to, ticks, data_size, data
protected: protected:
PIMutex mc_mutex, eth_mutex, peers_mutex, send_mutex; PIMutex mc_mutex, eth_mutex, peers_mutex, send_mutex, send_mc_mutex;
PIMutex send_mc_mutex;
private: private:
PIVector<PIEthernet * > eths_traffic, eths_mcast, eths_bcast; PIVector<PIEthernet * > eths_traffic, eths_mcast, eths_bcast;

View File

@@ -25,7 +25,7 @@
#include "pithread.h" #include "pithread.h"
template <typename T1, typename T2> template <typename Tin, typename Tout>
class PIPipelineThread : public PIThread class PIPipelineThread : public PIThread
{ {
PIOBJECT_SUBCLASS(PIPipelineThread, PIThread) PIOBJECT_SUBCLASS(PIPipelineThread, PIThread)
@@ -40,13 +40,13 @@ public:
stop(); stop();
if (!waitForFinish(1000)) terminate(); if (!waitForFinish(1000)) terminate();
} }
template <typename T3> template <typename T>
void connectTo(PIPipelineThread<T2, T3> * next) { void connectTo(PIPipelineThread<Tout, T> * next) {
CONNECT2(void, T2, bool *, this, calculated, next, enqueue) CONNECT2(void, Tout, bool *, this, calculated, next, enqueue)
} }
EVENT2(calculated, const T2 &, v, bool *, overload) EVENT2(calculated, const Tout &, v, bool *, overload)
EVENT_HANDLER1(void, enqueue, const T1 &, v) {enqueue(v, 0);} EVENT_HANDLER1(void, enqueue, const Tin &, v) {enqueue(v, 0);}
EVENT_HANDLER2(void, enqueue, const T1 &, v, bool *, overload) { EVENT_HANDLER2(void, enqueue, const Tin &, v, bool *, overload) {
mutex.lock(); mutex.lock();
if (max_size > 0 && in.size() < max_size) { if (max_size > 0 && in.size() < max_size) {
in.enqueue(v); in.enqueue(v);
@@ -86,14 +86,14 @@ public:
mutex_l.unlock(); mutex_l.unlock();
} }
} }
T2 getLast() { Tout getLast() {
T2 ret; Tout ret;
mutex_l.lock(); mutex_l.lock();
ret = last; ret = last;
mutex_l.unlock(); mutex_l.unlock();
return ret; return ret;
} }
uint maxQueSize() { uint maxQueSize() {
uint ret; uint ret;
mutex.lock(); mutex.lock();
@@ -101,19 +101,19 @@ public:
mutex.unlock(); mutex.unlock();
return ret; return ret;
} }
void setMaxQueSize(uint count) { void setMaxQueSize(uint count) {
mutex.lock(); mutex.lock();
count = max_size; count = max_size;
if (max_size > 0 && in.size() > max_size) in.resize(max_size); if (max_size > 0 && in.size() > max_size) in.resize(max_size);
mutex.unlock(); mutex.unlock();
} }
bool isWaitNextPipe() {return wait_next_pipe;} bool isWaitNextPipe() {return wait_next_pipe;}
void setWaitNextPipe(bool wait) {wait_next_pipe = wait;} void setWaitNextPipe(bool wait) {wait_next_pipe = wait;}
protected: protected:
virtual T2 calc(const T1 &v, bool &ok) = 0; virtual Tout calc(const Tin &v, bool &ok) = 0;
private: private:
void begin() {cnt = 0;} void begin() {cnt = 0;}
@@ -124,22 +124,24 @@ private:
piMSleep(1); piMSleep(1);
return; return;
} }
if (next_overload) { if (next_overload && wait_next_pipe) {
calculated(last, &next_overload); calculated(last, &next_overload);
} else { } else {
T1 t = in.dequeue(); Tin t = in.dequeue();
bool ok = true; bool ok = true;
mutex.unlock(); mutex.unlock();
T2 r = calc(t, ok); Tout r = calc(t, ok);
mutex_l.lock(); if (ok) {
last = r; mutex_l.lock();
mutex_l.unlock(); last = r;
cnt++; mutex_l.unlock();
if (ok) calculated(r, &next_overload); cnt++;
calculated(r, &next_overload);
}
} }
} }
PIQueue<T1> in; PIQueue<Tin> in;
T2 last; Tout last;
PIMutex mutex; PIMutex mutex;
PIMutex mutex_l; PIMutex mutex_l;
ullong cnt; ullong cnt;