From 1b09ad5c27ded1a0bf43c342e8e645214957d721 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=91=D1=8B=D1=87=D0=BA=D0=BE=D0=B2=20=D0=90=D0=BD=D0=B4?= =?UTF-8?q?=D1=80=D0=B5=D0=B9?= Date: Thu, 28 Jul 2022 14:46:58 +0300 Subject: [PATCH] binlog fixes PIBinaryStream doc remove makePIPair rename bytesAvailable --- libs/main/cloud/picloudclient.h | 2 +- libs/main/cloud/picloudserver.h | 2 +- libs/main/containers/pipair.h | 10 +-- libs/main/core/pibinarystream.h | 24 ++++-- libs/main/io_devices/pibinarylog.cpp | 80 ++++++++++++++------ libs/main/io_devices/pibinarylog.h | 2 +- libs/main/io_devices/pifile.h | 2 +- libs/main/io_devices/piiobytearray.h | 2 +- libs/main/io_devices/piiodevice.h | 4 +- libs/main/io_devices/piiostream.h | 2 +- libs/main/io_devices/piiostring.h | 2 +- libs/main/io_devices/pipeer.cpp | 2 +- libs/main/io_devices/pipeer.h | 2 +- libs/main/io_devices/pispi.cpp | 2 +- libs/main/io_devices/pispi.h | 2 +- libs/main/io_devices/pitransparentdevice.cpp | 2 +- libs/main/io_devices/pitransparentdevice.h | 2 +- libs/main/io_utils/pipacketextractor.cpp | 4 +- libs/main/io_utils/pipacketextractor.h | 2 +- 19 files changed, 95 insertions(+), 55 deletions(-) diff --git a/libs/main/cloud/picloudclient.h b/libs/main/cloud/picloudclient.h index 4bd1276d..6f58e093 100644 --- a/libs/main/cloud/picloudclient.h +++ b/libs/main/cloud/picloudclient.h @@ -42,7 +42,7 @@ public: void setServerName(const PIString & server_name); void setKeepConnection(bool on); bool isConnected() const {return is_connected;} - ssize_t bytesAvailible() const override {return buff.size();} + ssize_t bytesAvailable() const override {return buff.size();} EVENT(connected); EVENT(disconnected); diff --git a/libs/main/cloud/picloudserver.h b/libs/main/cloud/picloudserver.h index 4e3b7fd7..18409483 100644 --- a/libs/main/cloud/picloudserver.h +++ b/libs/main/cloud/picloudserver.h @@ -50,7 +50,7 @@ public: int readDevice(void * read_to, int max_size) override; int writeDevice(const void * data, int size) override; DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;} - ssize_t bytesAvailible() const override {return buff.size();} + ssize_t bytesAvailable() const override {return buff.size();} private: void pushBuffer(const PIByteArray & ba); diff --git a/libs/main/containers/pipair.h b/libs/main/containers/pipair.h index d373e276..9de7e2df 100644 --- a/libs/main/containers/pipair.h +++ b/libs/main/containers/pipair.h @@ -132,10 +132,7 @@ template< class T1, class T2 > PIPair createPIPair(const T1 & f, const T2 & s) { return PIPair(f, s); } -template< class T1, class T2 > -PIPair makePIPair(const T1 & f, const T2 & s) { - return PIPair(f, s); -} + //! \~english Creates \a PIPair object, deducing the target type from the types of arguments. //! \~russian Создает \a PIPair выводя типы из аргументов. @@ -144,9 +141,6 @@ template< class T1, class T2 > PIPair createPIPair(T1 && f, T2 && s) { return PIPair(std::move(f), std::move(s)); } -template< class T1, class T2 > -PIPair makePIPair(T1 && f, T2 && s) { - return PIPair(std::move(f), std::move(s)); -} + #endif // PIPAIR_H diff --git a/libs/main/core/pibinarystream.h b/libs/main/core/pibinarystream.h index 878d4d89..3fce7a8e 100644 --- a/libs/main/core/pibinarystream.h +++ b/libs/main/core/pibinarystream.h @@ -46,15 +46,20 @@ //! \~\brief //! \~english Binary serialization interface. //! \~russian Интерфейс бинарной сериализации. +//! \~\details +//! \~english In your class you should implement this methods: +//! \~russian В производном классе вы должны реализовать следующие методы: +//! \~\code +//! bool binaryStreamAppendImp (const void * d, size_t s); +//! bool binaryStreamTakeImp (void * d, size_t s); +//! ssize_t binaryStreamSizeImp () const; +//! \endcode +//! \~english function binaryStreamSizeImp must return -1 if size unknown +//! \~russian функция binaryStreamSizeImp должна возвращать -1 если нет информации о размере template class PIBinaryStream { public: - // one should implement next methods: - // - // bool binaryStreamAppendImp (const void * d, size_t s); - // bool binaryStreamTakeImp (void * d, size_t s); - // ssize_t binaryStreamSizeImp () const; - + //! \~russian Записать данные bool binaryStreamAppend(const void * d, size_t s) { if (!static_cast(this)->binaryStreamAppendImp(d, s)) { return false; @@ -62,6 +67,7 @@ public: } return true; } + //! \~russian Прочитать данные bool binaryStreamTake(void * d, size_t s) { if (!static_cast(this)->binaryStreamTakeImp(d, s)) { return false; @@ -70,12 +76,18 @@ public: return true; } + //! \~russian Узнать оставшийся размер + //!\~\details + //!\~russian возвращает -1 если нет информации о размере ssize_t binaryStreamSize() const { return static_cast(this)->binaryStreamSizeImp(); } + //! \~russian Записать данные template void binaryStreamAppend(T v) {binaryStreamAppend(&v, sizeof(v));} + + //! \~russian Прочитать int int binaryStreamTakeInt() { int r = 0; binaryStreamTake(&r, sizeof(r)); diff --git a/libs/main/io_devices/pibinarylog.cpp b/libs/main/io_devices/pibinarylog.cpp index 199210c0..8c4555f8 100644 --- a/libs/main/io_devices/pibinarylog.cpp +++ b/libs/main/io_devices/pibinarylog.cpp @@ -550,15 +550,33 @@ void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector< } uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE]; for (uint i=0; iread(read_sig, PIBINARYLOG_SIGNATURE_SIZE) < 0) {if (info) info->records_count = -1; ok = false;} - for (uint i=0; irecords_count = -2; ok = false;} + if (f->read(read_sig, PIBINARYLOG_SIGNATURE_SIZE) < 0) { + if (info) info->records_count = -1; + return; + } + for (uint i=0; irecords_count = -2; + return; + } + } uchar read_version = 0; - if (f->read(&read_version, 1) < 0) {if (info) info->records_count = -3; ok = false;} - if (read_version == 0) {if (info) info->records_count = -4; ok = false;} - if (read_version < PIBINARYLOG_VERSION_OLD) {if (info) info->records_count = -5; ok = false;} - if (read_version > PIBINARYLOG_VERSION) {if (info) info->records_count = -6; ok = false;} + if (f->read(&read_version, 1) < 0) { + if (info) info->records_count = -3; + return; + } + if (read_version == 0) { + if (info) info->records_count = -4; + return; + } + if (read_version < PIBINARYLOG_VERSION_OLD) { + if (info) info->records_count = -5; + return; + } + if (read_version > PIBINARYLOG_VERSION) { + if (info) info->records_count = -6; + return; + } if (read_version == PIBINARYLOG_VERSION) { uint32_t sz = 0; f->read(&sz, 4); @@ -566,7 +584,6 @@ void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector< info->user_header = f->read(sz); } } - if (!ok) return; PIByteArray ba; BinLogRecord br; br.id = 0; @@ -580,11 +597,15 @@ void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector< if (f->read(ba.data(), ba.size()) > 0) { ba >> br.id >> br.size >> br.timestamp; } else break; - if (info->log_size - f->pos() >= br.size) f->seek(f->pos() + br.size); + if (info) { + if (info->log_size - f->pos() >= br.size) { + f->seek(f->pos() + br.size); + } + } else break; } if (br.id > 0) { - if (info) { + if (index) { BinLogIndex bl_ind; bl_ind.id = br.id; bl_ind.data_size = br.size; @@ -674,15 +695,9 @@ bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString } -bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & dst) { +bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & dst, std::function progress) { PIBinaryLog slog; PIBinaryLog dlog; - dlog.createNewFile(dst); - if (!dlog.isOpened()) { - piCout << "[PIBinaryLog]" << "Error, can't create" << dst; - return false; - } - piCout << "[PIBinaryLog]" << "Start join binlogs to" << dst; PISystemTime dtime; PISystemTime lt; PITimeMeasurer tm; @@ -692,8 +707,16 @@ bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & d piCout << "[PIBinaryLog]" << "Error, can't open" << fn; return false; } - if (first) first = false; - else { + if (first) { + first = false; + dlog.setHeader(slog.getHeader()); + dlog.createNewFile(dst); + if (!dlog.isOpened()) { + piCout << "[PIBinaryLog]" << "Error, can't create" << dst; + return false; + } + piCout << "[PIBinaryLog]" << "Start join binlogs to" << dst; + } else { dtime = lt; } tm.reset(); @@ -701,18 +724,29 @@ bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & d PISystemTime st; while (!slog.isEnd()) { br = slog.readRecord(); + if (br.data.isEmpty() || br.id < 1) continue; st = br.timestamp; lt = dtime + br.timestamp; if (dlog.writeBinLog_raw(br.id, lt, br.data) <= 0) { piCout << "[PIBinaryLog]" << "Error, can't write to file" << dst; return false; } - if (tm.elapsed_s() > 1) { + if (tm.elapsed_s() > 0.1) { tm.reset(); - piCout << "[PIBinaryLog]" << "process" << PITime::fromSystemTime(lt).toString(); + if (progress) { + if (!progress(fn, lt)) { + slog.close(); + dlog.close(); + PIFile::remove(dlog.path()); + return false; + } + } else { + piCout << "[PIBinaryLog]" << "process" << PITime::fromSystemTime(lt).toString(); + } } } - piCout << "[PIBinaryLog]" << "complete" << fn; + slog.close(); + //piCout << "[PIBinaryLog]" << "complete" << fn; } piCout << "[PIBinaryLog]" << "Finish join binlogs, total time" << PITime::fromSystemTime(lt).toString(); return true; diff --git a/libs/main/io_devices/pibinarylog.h b/libs/main/io_devices/pibinarylog.h index 034088c1..43c8e4a9 100644 --- a/libs/main/io_devices/pibinarylog.h +++ b/libs/main/io_devices/pibinarylog.h @@ -298,7 +298,7 @@ public: static bool cutBinLog(const BinLogInfo & src, const PIString & dst, int from, int to); //! Create new binlog from serial splitted binlogs "src" - static bool joinBinLogsSerial(const PIStringList & src, const PIString & dst); + static bool joinBinLogsSerial(const PIStringList & src, const PIString & dst, std::function progress = nullptr); protected: PIString constructFullPathDevice() const override; diff --git a/libs/main/io_devices/pifile.h b/libs/main/io_devices/pifile.h index 88e7a313..9159bf64 100644 --- a/libs/main/io_devices/pifile.h +++ b/libs/main/io_devices/pifile.h @@ -227,7 +227,7 @@ public: //! \~russian Возвращает размер файла в байтах llong size() const; - ssize_t bytesAvailible() const override {return size() - pos();} + ssize_t bytesAvailable() const override {return size() - pos();} //! \~english Returns read/write position //! \~russian Возвращает позицию чтения/записи diff --git a/libs/main/io_devices/piiobytearray.h b/libs/main/io_devices/piiobytearray.h index 99be18ca..b006513f 100644 --- a/libs/main/io_devices/piiobytearray.h +++ b/libs/main/io_devices/piiobytearray.h @@ -84,7 +84,7 @@ public: //! \~russian Вставляет данные "ba" в содержимое буфера в текущую позицию int writeByteArray(const PIByteArray & ba); - ssize_t bytesAvailible() const override { + ssize_t bytesAvailable() const override { if (data_) return data_->size(); else return 0; } diff --git a/libs/main/io_devices/piiodevice.h b/libs/main/io_devices/piiodevice.h index fa1063fb..b7122695 100644 --- a/libs/main/io_devices/piiodevice.h +++ b/libs/main/io_devices/piiodevice.h @@ -285,7 +285,7 @@ public: PIByteArray read(int max_size); //! \~english Returns the number of bytes that are available for reading. - //! \~russian Возвращает количество байт + //! \~russian Возвращает количество байт доступных для чтения //! \~\details //! \~english This function is commonly used with sequential devices //! to determine the number of bytes to allocate in a buffer before reading. @@ -293,7 +293,7 @@ public: //! \~russian Эта функция как правило используется чтобы знать какой //! размер буфера нужен в памяти для чтения. //! Если функция возвращает -1 это значит что количество байт для чтения не известно. - virtual ssize_t bytesAvailible() const {return -1;} + virtual ssize_t bytesAvailable() const {return -1;} //! \~english Write maximum "max_size" bytes of "data" to device //! \~russian Пишет в устройство не более "max_size" байт из "data" diff --git a/libs/main/io_devices/piiostream.h b/libs/main/io_devices/piiostream.h index 42221bca..84e62fce 100644 --- a/libs/main/io_devices/piiostream.h +++ b/libs/main/io_devices/piiostream.h @@ -57,7 +57,7 @@ public: ssize_t binaryStreamSizeImp() const { if (!dev) return 0; - return dev->bytesAvailible(); + return dev->bytesAvailable(); } private: diff --git a/libs/main/io_devices/piiostring.h b/libs/main/io_devices/piiostring.h index 033ecfd3..e8332455 100644 --- a/libs/main/io_devices/piiostring.h +++ b/libs/main/io_devices/piiostring.h @@ -88,7 +88,7 @@ public: //! \~russian Вставляет строку "string" в содержимое буфера в текущую позицию int writeString(const PIString & string); - ssize_t bytesAvailible() const override { + ssize_t bytesAvailable() const override { if (str) return str->size() - pos; else return 0; } diff --git a/libs/main/io_devices/pipeer.cpp b/libs/main/io_devices/pipeer.cpp index 394b34b3..10e61c5c 100644 --- a/libs/main/io_devices/pipeer.cpp +++ b/libs/main/io_devices/pipeer.cpp @@ -924,7 +924,7 @@ void PIPeer::changeName(const PIString &new_name) { } -ssize_t PIPeer::bytesAvailible() const { +ssize_t PIPeer::bytesAvailable() const { ssize_t ret = 0; read_buffer_mutex.lock(); if (!read_buffer.isEmpty()) ret = read_buffer.back().size(); diff --git a/libs/main/io_devices/pipeer.h b/libs/main/io_devices/pipeer.h index 9170ed7e..e3cbc576 100644 --- a/libs/main/io_devices/pipeer.h +++ b/libs/main/io_devices/pipeer.h @@ -116,7 +116,7 @@ public: void setTrustPeerName(const PIString & peer_name) {trust_peer = peer_name;} void setTcpServerIP(const PIString & ip) {server_ip = ip; tcpClientReconnect();} - ssize_t bytesAvailible() const override; + ssize_t bytesAvailable() const override; EVENT2(dataReceivedEvent, const PIString &, from, const PIByteArray &, data); diff --git a/libs/main/io_devices/pispi.cpp b/libs/main/io_devices/pispi.cpp index 76f4b00d..bbe875aa 100644 --- a/libs/main/io_devices/pispi.cpp +++ b/libs/main/io_devices/pispi.cpp @@ -88,7 +88,7 @@ bool PISPI::isParameterSet(PISPI::Parameters parameter) const { } -ssize_t PISPI::bytesAvailible() const { +ssize_t PISPI::bytesAvailable() const { return recv_buf.size(); } diff --git a/libs/main/io_devices/pispi.h b/libs/main/io_devices/pispi.h index d6aa69e5..cdb44ae6 100644 --- a/libs/main/io_devices/pispi.h +++ b/libs/main/io_devices/pispi.h @@ -60,7 +60,7 @@ public: //! Returns parameters PIFlags parameters() const {return spi_mode;} - ssize_t bytesAvailible() const override; + ssize_t bytesAvailable() const override; protected: bool openDevice() override; diff --git a/libs/main/io_devices/pitransparentdevice.cpp b/libs/main/io_devices/pitransparentdevice.cpp index 2e9a42e1..39b6831a 100644 --- a/libs/main/io_devices/pitransparentdevice.cpp +++ b/libs/main/io_devices/pitransparentdevice.cpp @@ -49,7 +49,7 @@ PITransparentDevice::~PITransparentDevice() { } -ssize_t PITransparentDevice::bytesAvailible() const { +ssize_t PITransparentDevice::bytesAvailable() const { ssize_t ret = 0; que_mutex.lock(); if (que.isNotEmpty()) ret = que.back().size(); diff --git a/libs/main/io_devices/pitransparentdevice.h b/libs/main/io_devices/pitransparentdevice.h index 194f0975..991a4f88 100644 --- a/libs/main/io_devices/pitransparentdevice.h +++ b/libs/main/io_devices/pitransparentdevice.h @@ -44,7 +44,7 @@ public: virtual ~PITransparentDevice(); - ssize_t bytesAvailible() const override; + ssize_t bytesAvailable() const override; protected: bool openDevice() override; diff --git a/libs/main/io_utils/pipacketextractor.cpp b/libs/main/io_utils/pipacketextractor.cpp index ee015af4..d42cbcbb 100644 --- a/libs/main/io_utils/pipacketextractor.cpp +++ b/libs/main/io_utils/pipacketextractor.cpp @@ -137,8 +137,8 @@ void PIPacketExtractor::setDevice(PIIODevice * device_) { } -ssize_t PIPacketExtractor::bytesAvailible() const { - if (dev) return dev->bytesAvailible(); +ssize_t PIPacketExtractor::bytesAvailable() const { + if (dev) return dev->bytesAvailable(); else return 0; } diff --git a/libs/main/io_utils/pipacketextractor.h b/libs/main/io_utils/pipacketextractor.h index 9f64009d..1aa49edb 100644 --- a/libs/main/io_utils/pipacketextractor.h +++ b/libs/main/io_utils/pipacketextractor.h @@ -62,7 +62,7 @@ public: //! Set child %device to "device_" void setDevice(PIIODevice * device_); - ssize_t bytesAvailible() const override; + ssize_t bytesAvailable() const override; //! Returns buffer size int bufferSize() const {return buffer_size;}