/*! \file pipeer.h * \ingroup IO * \~\brief * \~english Peering net node * \~russian Элемент пиринговой сети */ /* PIP - Platform Independent Primitives Peer - named I/O ethernet node, forming self-organized peering network Ivan Pelipenko peri4ko@yandex.ru This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #ifndef PIPEER_H #define PIPEER_H #include "pidiagnostics.h" #include "piethernet.h" class PIP_EXPORT PIPeer: public PIIODevice { PIIODEVICE(PIPeer, "peer"); private: class PeerData; public: explicit PIPeer(const PIString & name = PIString()); virtual ~PIPeer(); class PIP_EXPORT PeerInfo { friend class PIPeer; BINARY_STREAM_FRIEND(PIPeer::PeerInfo); public: PeerInfo() { dist = sync = cnt = 0; trace = -1; was_update = false; _data = 0; } ~PeerInfo() {} struct PIP_EXPORT PeerAddress { PeerAddress(const PINetworkAddress & a = PINetworkAddress(), const PINetworkAddress & m = PINetworkAddress("255.255.255.0")); bool isAvailable() const { return ping > 0; } PINetworkAddress address; PINetworkAddress netmask; double ping; // ms bool wait_ping; PISystemTime last_ping; }; PIString name; PIVector addresses; int dist; PIStringList neighbours; bool isNeighbour() const { return dist == 0; } int ping() const; PINetworkAddress fastestAddress() const; protected: void addNeighbour(const PIString & n) { if (!neighbours.contains(n)) neighbours << n; } void addNeighbours(const PIStringList & l) { piForeachC(PIString & n, l) if (!neighbours.contains(n)) neighbours << n; } void removeNeighbour(const PIString & n) { neighbours.removeAll(n); } void resetPing() { for (int i = 0; i < addresses.size_s(); ++i) addresses[i].ping = -1; } void init(); void destroy(); int sync, cnt, trace; bool was_update; PISystemTime time; PeerData * _data; }; BINARY_STREAM_FRIEND(PIPeer::PeerInfo); bool send(const PIString & to, const PIByteArray & data) { return send(to, data.data(), data.size_s()); } bool send(const PIString & to, const PIString & data) { return send(to, data.data(), data.size_s()); } bool send(const PIString & to, const void * data, int size); bool send(const PeerInfo & to, const PIByteArray & data) { return send(to.name, data.data(), data.size_s()); } bool send(const PeerInfo & to, const PIString & data) { return send(to.name, data.data(), data.size_s()); } bool send(const PeerInfo & to, const void * data, int size) { return send(to.name, data, size); } bool send(const PeerInfo * to, const PIByteArray & data) { if (to == 0) return false; return send(to->name, data.data(), data.size_s()); } bool send(const PeerInfo * to, const PIString & data) { if (to == 0) return false; return send(to->name, data.data(), data.size_s()); } bool send(const PeerInfo * to, const void * data, int size) { if (to == 0) return false; return send(to->name, data, size); } void sendToAll(const PIByteArray & data) { piForeachC(PeerInfo & i, peers) send(i.name, data.data(), data.size_s()); } void sendToAll(const PIString & data) { piForeachC(PeerInfo & i, peers) send(i.name, data.data(), data.size_s()); } void sendToAll(const void * data, int size) { piForeachC(PeerInfo & i, peers) send(i.name, data, size); } bool isMulticastReceive() const { return !eths_mcast.isEmpty(); } bool isBroadcastReceive() const { return !eths_bcast.isEmpty(); } PIDiagnostics & diagnosticService() { return diag_s; } PIDiagnostics & diagnosticData() { return diag_d; } const PIVector & allPeers() const { return peers; } bool isPeerExists(const PIString & name) const { return getPeerByName(name) != 0; } const PeerInfo * getPeerByName(const PIString & name) const { return peers_map.value(name, 0); } const PeerInfo & selfInfo() const { return self_info; } const PIMap> & _peerMap() const { return addresses_map; } void reinit(); void lock() { peers_mutex.lock(); } void unlock() { peers_mutex.unlock(); } void changeName(const PIString & new_name); const PIString & trustPeerName() const { return trust_peer; } void setTrustPeerName(const PIString & peer_name) { trust_peer = peer_name; } void setTcpServerIP(const PIString & ip) { server_ip = ip; tcpClientReconnect(); } ssize_t bytesAvailable() const override; EVENT2(dataReceivedEvent, const PIString &, from, const PIByteArray &, data); EVENT1(peerConnectedEvent, const PIString &, name); EVENT1(peerDisconnectedEvent, const PIString &, name); // bool lockedEth() const {return eth_mutex.isLocked();} // bool lockedPeers() const {return peers_mutex.isLocked();} // bool lockedMBcasts() const {return mc_mutex.isLocked();} // bool lockedSends() const {return send_mutex.isLocked();} // bool lockedMCSends() const {return send_mc_mutex.isLocked();} protected: virtual void dataReceived(const PIString & from, const PIByteArray & data) { ; } virtual void peerConnected(const PIString & name) { ; } virtual void peerDisconnected(const PIString & name) { ; } EVENT_HANDLER2(bool, dataRead, const uchar *, readed, ssize_t, size); EVENT_HANDLER2(bool, mbcastRead, const uchar *, readed, ssize_t, size); private: EVENT_HANDLER2(void, timerEvent, void *, data, int, delim); EVENT_HANDLER2(bool, sendInternal, const PIString &, to, const PIByteArray &, data); EVENT_HANDLER2(void, dtReceived, const PIString &, from, const PIByteArray &, data); EVENT_HANDLER1(void, newTcpClient, PIEthernet *, client); EVENT_HANDLER(void, tcpClientReconnect); bool hasPeer(const PIString & name) { piForeachC(PeerInfo & i, peers) if (i.name == name) return true; return false; } bool removePeer(const PIString & name); void removeNeighbour(const PIString & name); void addPeer(const PeerInfo & pd); void sendPeerInfo(const PeerInfo & info); void sendPeerRemove(const PIString & peer); void sendSelfInfo() { sendPeerInfo(self_info); } void sendSelfRemove() { sendPeerRemove(self_info.name); } void syncPeers(); void checkNetwork(); void initNetwork(); void buildMap(); void initEths(PIStringList al); void initMBcasts(PIStringList al); void destroyEths(); void destroyMBcasts(); void sendMBcast(const PIByteArray & ba); void pingNeighbours(); void addToRemoved(const PeerInfo & pi) { removed[pi.name] = PIPair(pi.cnt, pi.time); } bool isRemoved(const PeerInfo & pi) const { return (removed.value(pi.name) == PIPair(pi.cnt, pi.time)); } bool openDevice() override; bool closeDevice() override; PIString constructFullPathDevice() const override; void configureFromFullPathDevice(const PIString & full_path) override; PIPropertyStorage constructVariantDevice() const override; void configureFromVariantDevice(const PIPropertyStorage & d) override; ssize_t readDevice(void * read_to, ssize_t max_size) override; ssize_t writeDevice(const void * data, ssize_t size) override; DeviceInfoFlags deviceInfoFlags() const override { return PIIODevice::Reliable; } PeerInfo * quickestPeer(const PIString & to); bool sendToNeighbour(PeerInfo * peer, const PIByteArray & ba); inline static bool isPeerRecent(const PeerInfo & my, const PeerInfo & income) { return (my.cnt < income.cnt) || (my.time < income.time); } // 1 - new peer, 2 - remove peer, 3 - sync peers, 4 - data, 5 - ping request, 6 - ping reply // Data packet: 4, from, to, ticks, data_size, data protected: bool inited__; // for internal use private: PIVector eths_traffic, eths_mcast, eths_bcast; PIEthernet::InterfaceList prev_ifaces; PIEthernet eth_send, eth_lo, eth_tcp_srv, eth_tcp_cli; PITimer sync_timer; PeerInfo self_info; PIVector peers; PIMap peers_map; PIMap> addresses_map; // map {"to" = list of nearest peers} PIMap> removed; PIDiagnostics diag_s, diag_d; bool destroyed, no_timer; PIString trust_peer; PIString server_ip; mutable PIMutex read_buffer_mutex; PIQueue read_buffer; int read_buffer_size; PIMutex mc_mutex, eth_mutex, peers_mutex, send_mutex, send_mc_mutex; }; inline PICout operator<<(PICout c, const PIPeer::PeerInfo::PeerAddress & v) { c.space(); c << "PeerAddress(" << v.address << ", " << v.netmask << ", " << v.ping << ")"; return c; } inline PICout operator<<(PICout c, const PIPeer::PeerInfo & v) { c.space(); c << "PeerInfo(" << v.name << ", " << v.dist << ", " << v.addresses << ")"; return c; } //! \relatesalso PIBinaryStream //! \~english Store operator. //! \~russian Оператор сохранения. BINARY_STREAM_WRITE(PIPeer::PeerInfo::PeerAddress) { s << v.address << v.netmask << v.ping; return s; } //! \relatesalso PIBinaryStream //! \~english Restore operator. //! \~russian Оператор извлечения. BINARY_STREAM_READ(PIPeer::PeerInfo::PeerAddress) { s >> v.address >> v.netmask >> v.ping; return s; } //! \relatesalso PIBinaryStream //! \~english Store operator. //! \~russian Оператор сохранения. BINARY_STREAM_WRITE(PIPeer::PeerInfo) { s << v.name << v.addresses << v.dist << v.neighbours << v.cnt << v.time; return s; } //! \relatesalso PIBinaryStream //! \~english Restore operator. //! \~russian Оператор извлечения. BINARY_STREAM_READ(PIPeer::PeerInfo) { s >> v.name >> v.addresses >> v.dist >> v.neighbours >> v.cnt >> v.time; return s; } #endif // PIPEER_H