diff --git a/src/io/pipeer.cpp b/src/io/pipeer.cpp index b925d104..8a9c57ba 100755 --- a/src/io/pipeer.cpp +++ b/src/io/pipeer.cpp @@ -127,14 +127,16 @@ PIString PIPeer::PeerInfo::fastestAddress() const { } +REGISTER_DEVICE(PIPeer) - -PIPeer::PIPeer(const PIString & name_): PIObject() { +PIPeer::PIPeer(const PIString & n): PIIODevice() { destroyed = false; - setName(name_); - self_info.name = name_; - diag_d.setName(name_+"_data"); - diag_s.setName(name_+"_service"); + PIMutexLocker mbl(mc_mutex); + PIMutexLocker ethl(eth_mutex); + PIMutexLocker pl(peers_mutex); + PIMutexLocker sl(send_mutex); + changeName(n); + read_buffer_size = 128; self_info.dist = 0; self_info.time = PISystemTime::current(); //joinMulticastGroup("239.240.241.242"); @@ -385,6 +387,11 @@ bool PIPeer::sendInternal(const PIString & to, const PIByteArray & data) { void PIPeer::dtReceived(const PIString & from, const PIByteArray & data) { dataReceived(from, data); dataReceivedEvent(from, data); + if (trust_peer.isEmpty() || trust_peer == from) { + read_buffer_mutex.lock(); + if (read_buffer.size() < read_buffer_size) read_buffer.enqueue(data); + read_buffer_mutex.unlock(); + } } @@ -464,8 +471,7 @@ bool PIPeer::dataRead(uchar * readed, int size) { } if (pt == 1) { peers_mutex.unlock(); - dataReceived(from, pba); - dataReceivedEvent(from, pba); + dtReceived(from, pba); return true; } if (pt == 2 || pt == 3) { @@ -752,6 +758,14 @@ void PIPeer::pingNeighbours() { } +bool PIPeer::openDevice() { + PIMutexLocker ml(peers_mutex); + if (trust_peer.isEmpty()) + return !peers.isEmpty(); + return hasPeer(trust_peer); +} + + void PIPeer::syncPeers() { //piCout << "[PIPeer \"" + self_info.name + "\"] sync " << peers.size_s() << " peers"; PIMutexLocker locker(eth_mutex); @@ -826,6 +840,60 @@ void PIPeer::reinit() { } +void PIPeer::changeName(const PIString &new_name) { + PIString name_ = new_name; + if (name_.isEmpty()) name_ = "rnd_" + PIString::fromNumber(random() % 1000); + setName(name_); + self_info.name = name_; + diag_d.setName(name_+"_data"); + diag_s.setName(name_+"_service"); +} + + +PIString PIPeer::constructFullPath() const { + PIString ret(fullPathPrefix() + "://"); + ret << name() << ":" << trustPeerName(); + return ret; +} + + +int PIPeer::read(void *read_to, int max_size) { + bool empty = read_buffer.isEmpty(); + while (empty) piMSleep(10); + PIMutexLocker ml(read_buffer_mutex); + if (!read_buffer.isEmpty()) { + PIByteArray ba = read_buffer.dequeue(); + int sz = piMini(ba.size_s(), max_size); + memcpy(read_to, ba.data(), sz); + return sz; + } + return 0; +} + + +int PIPeer::write(const void *data, int size) { + if (trust_peer.isEmpty()) { + sendToAll(data, size); + return size; + } + if (send(trust_peer, data, size)) + return size; + else return -1; +} + + +void PIPeer::configureFromFullPath(const PIString & full_path) { + PIStringList pl = full_path.split(":"); + for (int i = 0; i < pl.size_s(); ++i) { + PIString p(pl[i]); + switch (i) { + case 0: changeName(p); break; + case 1: setTrustPeerName(p); break; + } + } +} + + void PIPeer::initNetwork() { // piCoutObj << "initNetwork ..."; eth_send.init(); @@ -890,85 +958,4 @@ void PIPeer::buildMap() { addresses_map[c.name] = cpath; //piCout << "map" << c.name << "=" << cpath; } - /*int max_dist = -1; - self_info._nuses.resize(self_info.neighbours.size()); - self_info._nuses.fill(0); - self_info._first = &self_info; - peers_map[self_info.name] = &self_info; - piForeach (PeerInfo & i, peers) { - i._nuses.resize(i.neighbours.size()); - i._nuses.fill(0); - i._first = 0; - peers_map[i.name] = &i; - if (max_dist < i.dist) - max_dist = i.dist; - if (i.dist > 0) continue; - i._naddress.clear(); - i._neth = 0; - PIString mma, ma; - bool af = false; - for (int mi = 0; mi < self_info.addresses.size_s(); ++mi) { - PeerInfo::Address & a(self_info.addresses[mi]); - if (af) break; - ma = a.address; - //mma = m.left(m.findLast(".")); - mma = PIEthernet::applyMask(a.address, a.netmask); - for (int ii = 0; ii < i.addresses.size_s(); ++ii) { - PeerInfo::Address & r(i.addresses[ii]); - if (!r.isAvailable()) continue; - if (mma == PIEthernet::applyMask(r.address, r.netmask)) { - i._naddress = r.address; - //piCout << "_naddress" << i.name << "=" << r; - af = true; - break; - } - } - } - if (!af) continue; - //piCout << " peer" << i.name << ma; - piForeach (PIEthernet * e, eths_traffic) - if (e->readAddress() == ma) { - i._neth = e; - piBreak; - } - //piCout << i.name << i._naddress; - } - PIVector cwave, nwave; - PeerInfo * npeer; - cwave << &self_info; - for (int d = 0; d <= max_dist; ++d) { - if (cwave.isEmpty()) break; - nwave.clear(); - piForeach (PeerInfo * p, cwave) { - int ns = p->neighbours.size_s(); - for (int n = 0; n < ns; ++n) { - if (p->_nuses[n] >= ns) continue; - p->_nuses[n]++; - npeer = peers_map[p->neighbours[n]]; - if (npeer == 0) continue; - if (d == 0) npeer->_first = npeer; - else { - if (d == 1) npeer->_first = p; - else npeer->_first = p->_first; - } - nwave << npeer; - } - } - cwave = nwave; - //piCout << "wave" << d; - for (int i = 0; i < cwave.size_s(); ++i) { - //piCout << " peer" << cwave[i]->name << Hex << (uint)(cwave[i]->_first); - if (cwave[i]->_first == 0) {cwave.remove(i); --i; continue;} - if (addresses_map.contains(cwave[i]->name)) {cwave.remove(i); --i; continue;} - } - for (int i = 0; i < cwave.size_s(); ++i) { - PIVector & pl(addresses_map[cwave[i]->name]); - if (!pl.contains(cwave[i]->_first)) - pl << cwave[i]->_first; - } - }*/ - /*piCout << " ** addresses map **"; - piForeachC (napair & i, addresses_map) - piCout << i.first << i.second; - piCout << " ** addresses map end **";*/ } diff --git a/src/io/pipeer.h b/src/io/pipeer.h index 71851872..ebe77df6 100755 --- a/src/io/pipeer.h +++ b/src/io/pipeer.h @@ -27,9 +27,9 @@ #include "pidiagnostics.h" #include "pidatatransfer.h" -class PIP_EXPORT PIPeer: public PIObject +class PIP_EXPORT PIPeer: public PIIODevice { - PIOBJECT_SUBCLASS(PIPeer, PIObject) + PIIODEVICE(PIPeer) private: class PeerData: public PIObject { @@ -53,7 +53,7 @@ private: }; public: - explicit PIPeer(const PIString & name); + explicit PIPeer(const PIString & name = PIString()); virtual ~PIPeer(); class PeerInfo { @@ -132,6 +132,13 @@ public: void reinit(); void lock() {peers_mutex.lock();} void unlock() {peers_mutex.unlock();} + void changeName(const PIString & new_name); + const PIString & trustPeerName() const {return trust_peer;} + void setTrustPeerName(const PIString & peer_name) {trust_peer = peer_name;} + PIString constructFullPath() const; + int read(void *read_to, int max_size); + int write(const void * data, int size); + EVENT2(dataReceivedEvent, const PIString &, from, const PIByteArray &, data) EVENT1(peerConnectedEvent, const PIString &, name) @@ -171,7 +178,13 @@ private: void pingNeighbours(); void addToRemoved(const PeerInfo & pi) {removed[pi.name] = PIPair(pi.cnt, pi.time);} bool isRemoved(const PeerInfo & pi) const {return (removed.value(pi.name) == PIPair(pi.cnt, pi.time));} - + + bool openDevice(); + bool closeDevice() {return false;} + PIString fullPathPrefix() const {return "peer";} + void configureFromFullPath(const PIString &full_path); + + PeerInfo * quickestPeer(const PIString & to); bool sendToNeighbour(PeerInfo * peer, const PIByteArray & ba); inline static bool isPeerRecent(const PeerInfo & my, const PeerInfo & income) {return (my.cnt < income.cnt) || (my.time < income.time);} @@ -196,8 +209,11 @@ private: PIDiagnostics diag_s, diag_d; bool destroyed, no_timer; PIString id_; - -}; + PIString trust_peer; + PIMutex read_buffer_mutex; + PIQueue read_buffer; + int read_buffer_size; + }; inline PICout operator <<(PICout c, const PIPeer::PeerInfo::Address & v) {c.space(); c << "PeerAddress(" << v.address << ", " << v.netmask << ", " << v.ping << ")"; return c;} inline PICout operator <<(PICout c, const PIPeer::PeerInfo & v) {c.space(); c << "PeerInfo(" << v.name << ", " << v.dist << ", " << v.addresses << ")"; return c;} diff --git a/utils/system_daemon/daemon.cpp b/utils/system_daemon/daemon.cpp index 3b2ca85c..cfd973ed 100644 --- a/utils/system_daemon/daemon.cpp +++ b/utils/system_daemon/daemon.cpp @@ -2,7 +2,6 @@ #include "shared.h" #include "pisysteminfo.h" -const char pisd_prefix[] = "_pisd_"; const char self_name[] = "__self__"; extern PIScreen screen; //bool Daemon::inited__ = false; @@ -198,7 +197,7 @@ void Daemon::TileFileProgress::tileEvent(PIScreenTile * t, TileEvent e) { Daemon::Daemon(): inited__(false), PIPeer(pisd_prefix + PISystemInfo::instance()->hostname + "_" + PIString(rand() % 100)), fm(this) { - setName("Daemon"); +// setName("Daemon"); dtimer.setName("__S__Daemon_timer"); mode = offset = cur = height = 0; CONNECTU(&screen, keyPressed, this, keyEvent) diff --git a/utils/system_daemon/file_manager.cpp b/utils/system_daemon/file_manager.cpp index 6f78cc41..ee423a4e 100644 --- a/utils/system_daemon/file_manager.cpp +++ b/utils/system_daemon/file_manager.cpp @@ -44,7 +44,7 @@ bool FileManager::TileDir::keyEvent(PIKbdListener::KeyEvent key) { } break; case PIKbdListener::F7: - nd = askNewDir(); + nd = askUserInput("Enter new directory name:"); setFocus(); if (nd.isEmpty()) return true; if (is_right && remote_mode) diff --git a/utils/system_daemon/main.cpp b/utils/system_daemon/main.cpp index 5bea8985..0720c873 100755 --- a/utils/system_daemon/main.cpp +++ b/utils/system_daemon/main.cpp @@ -41,12 +41,11 @@ class MainMenu: public PITimer { public: MainMenu() { cur_peer = -1; - TileSimple * tile = new TileSimple("title"); - tile->content << TileSimple::Row("pisd (PI System Daemon, PIP version " + PIPVersion() + ")", CellFormat(Black, Transparent)); - tile->content << TileSimple::Row("This daemon: \"" + daemon_.thisDaemonName() + "\"", CellFormat(Black, Transparent)); - tile->back_format.color_back = Yellow; - tile->size_policy = Fixed; - screen.rootTile()->addTile(tile); + title = new TileSimple("title"); + updateTitle(title); + title->back_format.color_back = Yellow; + title->size_policy = Fixed; + screen.rootTile()->addTile(title); PIScreenTile * center = new PIScreenTile("center"); center->back_format.color_back = Cyan; @@ -96,6 +95,7 @@ public: ret->content << TileList::Row("Peer info", CellFormat()); ret->content << TileList::Row("Peer reinit", CellFormat()); ret->content << TileList::Row("Peer diagnostics", CellFormat()); + ret->content << TileList::Row("Peer change self name", CellFormat()); ret->content << TileList::Row("Exit", CellFormat()); ret->selection_mode = TileList::NoSelection; return ret; @@ -148,6 +148,12 @@ public: // updatePeerInfo(); return ret; } + + void updateTitle(TileSimple * tl) { + tl->content.clear(); + tl->content << TileSimple::Row("pisd (PI System Daemon, PIP version " + PIPVersion() + ")", CellFormat(Black, Transparent)); + tl->content << TileSimple::Row("This daemon: \"" + daemon_.thisDaemonName() + "\"", CellFormat(Black, Transparent)); + } void updatePeerDiag(TileSimple * tl, const PIDiagnostics & diag) { tl->content.clear(); tl->content << TileSimple::Row(diag.name() + " diagnostics", CellFormat(PIScreenTypes::Default, PIScreenTypes::Default, PIScreenTypes::Bold)); @@ -224,7 +230,8 @@ public: case 3: tpeer->show(); peers_tl->setFocus(); break; case 4: daemon_.reinit(); tmenu->show(); break; case 5: tpeerdiag->show(); break; - case 6: PIKbdListener::exiting = true; break; + case 6: daemon_.changeName(pisd_prefix + askUserInput("Peer name:")); updateTitle(title); menuRequest(); break; + case 7: PIKbdListener::exiting = true; break; } } return; @@ -249,6 +256,7 @@ public: } PIScreenTile * tmenu, * tinfo, * tfm, * tdaemon, * tpeer, * tpeerdiag; TileList * peers_tl, * addrs_tl, * peermap_tl; + TileSimple * title; TileSimple * peerinfo_tl; TileSimple * peerdiagdata_tl, * peerdiagservice_tl; PIVector mtiles; diff --git a/utils/system_daemon/shared.cpp b/utils/system_daemon/shared.cpp index 2ff54dc2..4c4b9585 100644 --- a/utils/system_daemon/shared.cpp +++ b/utils/system_daemon/shared.cpp @@ -38,7 +38,7 @@ PIString readableTime(const PITime & t) { } -PIString askNewDir() { +PIString askUserInput(const PIString & desc) { PIScreenTile dlg; dlg.setMargins(1, 1, 1, 1); dlg.spacing = 1; @@ -48,7 +48,7 @@ PIString askNewDir() { TileButtons * btns = new TileButtons(); lbl->back_format.color_back = Yellow; btns->back_format.color_back = Yellow; - lbl->content << TileSimple::Row("Enter new directory name:", CellFormat(Black, Transparent)); + lbl->content << TileSimple::Row(desc, CellFormat(Black, Transparent)); btns->content << TileButtons::Button(" Ok ", CellFormat()); btns->content << TileButtons::Button("Cancel", CellFormat()); dlg.addTile(lbl); diff --git a/utils/system_daemon/shared.h b/utils/system_daemon/shared.h index 41d5e34a..71e71ab5 100644 --- a/utils/system_daemon/shared.h +++ b/utils/system_daemon/shared.h @@ -5,11 +5,13 @@ #include "piscreen.h" #include "piscreentiles.h" +static const char pisd_prefix[] = "_pisd_"; + using namespace PIScreenTypes; PIString readableTime(const PITime & t); -PIString askNewDir(); +PIString askUserInput(const PIString &desc); bool askQuestion(const PIString & t); void showInfo(const PIString & t); void removeFiles(const PIDir & dir, PIStringList l);