diff --git a/src/io/pipeer.cpp b/src/io/pipeer.cpp index 3824299c..d09a8f75 100755 --- a/src/io/pipeer.cpp +++ b/src/io/pipeer.cpp @@ -817,6 +817,11 @@ bool PIPeer::openDevice() { } +bool PIPeer::closeDevice() { + return false; +} + + void PIPeer::syncPeers() { //piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers"; PIMutexLocker locker(eth_mutex); diff --git a/src/io/pipeer.h b/src/io/pipeer.h index 826c6fd9..f0a53d7b 100755 --- a/src/io/pipeer.h +++ b/src/io/pipeer.h @@ -182,7 +182,7 @@ private: bool isRemoved(const PeerInfo & pi) const {return (removed.value(pi.name) == PIPair(pi.cnt, pi.time));} bool openDevice(); - bool closeDevice() {return false;} + bool closeDevice(); PIString fullPathPrefix() const {return "peer";} void configureFromFullPath(const PIString &full_path); @@ -195,8 +195,7 @@ private: // Data packet: 4, from, to, ticks, data_size, data protected: - PIMutex mc_mutex, eth_mutex, peers_mutex, send_mutex; - PIMutex send_mc_mutex; + PIMutex mc_mutex, eth_mutex, peers_mutex, send_mutex, send_mc_mutex; private: PIVector eths_traffic, eths_mcast, eths_bcast; diff --git a/src/thread/pipipelinethread.h b/src/thread/pipipelinethread.h index e5abbed9..88ea16cf 100644 --- a/src/thread/pipipelinethread.h +++ b/src/thread/pipipelinethread.h @@ -25,7 +25,7 @@ #include "pithread.h" -template +template class PIPipelineThread : public PIThread { PIOBJECT_SUBCLASS(PIPipelineThread, PIThread) @@ -40,13 +40,13 @@ public: stop(); if (!waitForFinish(1000)) terminate(); } - template - void connectTo(PIPipelineThread * next) { - CONNECT2(void, T2, bool *, this, calculated, next, enqueue) + template + void connectTo(PIPipelineThread * next) { + CONNECT2(void, Tout, bool *, this, calculated, next, enqueue) } - EVENT2(calculated, const T2 &, v, bool *, overload) - EVENT_HANDLER1(void, enqueue, const T1 &, v) {enqueue(v, 0);} - EVENT_HANDLER2(void, enqueue, const T1 &, v, bool *, overload) { + EVENT2(calculated, const Tout &, v, bool *, overload) + EVENT_HANDLER1(void, enqueue, const Tin &, v) {enqueue(v, 0);} + EVENT_HANDLER2(void, enqueue, const Tin &, v, bool *, overload) { mutex.lock(); if (max_size > 0 && in.size() < max_size) { in.enqueue(v); @@ -86,14 +86,14 @@ public: mutex_l.unlock(); } } - T2 getLast() { - T2 ret; + Tout getLast() { + Tout ret; mutex_l.lock(); ret = last; mutex_l.unlock(); return ret; } - + uint maxQueSize() { uint ret; mutex.lock(); @@ -101,19 +101,19 @@ public: mutex.unlock(); return ret; } - + void setMaxQueSize(uint count) { mutex.lock(); count = max_size; if (max_size > 0 && in.size() > max_size) in.resize(max_size); mutex.unlock(); } - + bool isWaitNextPipe() {return wait_next_pipe;} void setWaitNextPipe(bool wait) {wait_next_pipe = wait;} protected: - virtual T2 calc(const T1 &v, bool &ok) = 0; + virtual Tout calc(const Tin &v, bool &ok) = 0; private: void begin() {cnt = 0;} @@ -124,22 +124,24 @@ private: piMSleep(1); return; } - if (next_overload) { + if (next_overload && wait_next_pipe) { calculated(last, &next_overload); } else { - T1 t = in.dequeue(); + Tin t = in.dequeue(); bool ok = true; mutex.unlock(); - T2 r = calc(t, ok); - mutex_l.lock(); - last = r; - mutex_l.unlock(); - cnt++; - if (ok) calculated(r, &next_overload); + Tout r = calc(t, ok); + if (ok) { + mutex_l.lock(); + last = r; + mutex_l.unlock(); + cnt++; + calculated(r, &next_overload); + } } } - PIQueue in; - T2 last; + PIQueue in; + Tout last; PIMutex mutex; PIMutex mutex_l; ullong cnt;