From f5953a0ba719cd8a6448296c5bc8eae55c1b2f6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=91=D1=8B=D1=87=D0=BA=D0=BE=D0=B2=20=D0=90=D0=BD=D0=B4?= =?UTF-8?q?=D1=80=D0=B5=D0=B9?= Date: Fri, 22 Jul 2022 15:24:07 +0300 Subject: [PATCH] PIBinaryLog joinBinLogsSerial correct timestamp wite with split --- libs/main/io_devices/pibinarylog.cpp | 119 ++++++++++++++++++++------- libs/main/io_devices/pibinarylog.h | 18 +++- 2 files changed, 106 insertions(+), 31 deletions(-) diff --git a/libs/main/io_devices/pibinarylog.cpp b/libs/main/io_devices/pibinarylog.cpp index 5d30d59f..3d9b6522 100644 --- a/libs/main/io_devices/pibinarylog.cpp +++ b/libs/main/io_devices/pibinarylog.cpp @@ -61,6 +61,7 @@ PIBinaryLog::PIBinaryLog() { is_started = is_indexed = is_pause = false; current_index = -1; log_size = 0; + f_new_path = nullptr; setPlaySpeed(1.); setDefaultID(1); setPlaySpeed(1.0); @@ -100,7 +101,8 @@ bool PIBinaryLog::openDevice() { return false; } if (path().isEmpty() && mode_ == WriteOnly) { - setPath(getLogfilePath()); + if (f_new_path) setPath(f_new_path()); + else setPath(getLogfilePath(logDir(), filePrefix())); } if (path().isEmpty() && mode_ == ReadOnly) { PIDir ld(logDir()); @@ -243,18 +245,18 @@ bool PIBinaryLog::threadedRead(uchar *readed, int size) { } -PIString PIBinaryLog::getLogfilePath() const { - PIDir dir(logDir()); +PIString PIBinaryLog::getLogfilePath(const PIString & log_dir, const PIString & prefix) { + PIDir dir(log_dir); dir.setDir(dir.absolutePath()); if (!dir.isExists()) { - piCoutObj << "Creating directory" << dir.path(); + piCout << "[PIBinaryLog]" << "Creating directory" << dir.path(); dir.make(true); } - PIString npath = logDir() + "/" + filePrefix() + PIDateTime::current().toString("yyyy_MM_dd__hh_mm_ss"); + PIString npath = log_dir + PIDir::separator + prefix + PIDateTime::current().toString("yyyy_MM_dd__hh_mm_ss"); PIString cnpath = npath + ".binlog"; int i = 1; while (PIFile::isExists(cnpath)) { - cnpath = npath + "_" + PIString::fromNumber(i) + ".binlog"; + cnpath = npath + '_' + PIString::fromNumber(i) + ".binlog"; i++; } return cnpath; @@ -262,12 +264,14 @@ PIString PIBinaryLog::getLogfilePath() const { PIString PIBinaryLog::createNewFile() { if (!file.close()) return PIString(); - PIString cnpath = getLogfilePath(); + PIString cnpath; + if (f_new_path) cnpath = f_new_path(); + else cnpath = getLogfilePath(logDir(), filePrefix()); if (open(cnpath, PIIODevice::WriteOnly)) { newFile(file.path()); return file.path(); } - piCoutObj << "Can't create new file, maybe LogDir is invalid."; + piCoutObj << "Can't create new file, maybe LogDir" << ("\"" + logDir() + "\"") << "is invalid."; return PIString(); } @@ -276,7 +280,7 @@ void PIBinaryLog::createNewFile(const PIString &path) { if (open(path, PIIODevice::WriteOnly)) { newFile(file.path()); } - else piCoutObj << "Can't create new file, maybe path is invalid."; + else piCoutObj << "Can't create new file, maybe path" << ("\"" + path + "\"") << "is invalid."; } @@ -295,13 +299,20 @@ void PIBinaryLog::setPause(bool pause) { int PIBinaryLog::writeBinLog(int id, const void *data, int size) { if (size <= 0 || !canWrite()) return -1; if (id == 0) { - piCoutObj << "Error: can`t write with id = 0!"; + piCoutObj << "Error: can`t write with id = 0! Id must be > 0"; return -1; } + if (is_pause) return 0; logmutex.lock(); + PIByteArray logdata; + logdata << id << size << (PISystemTime::current() - startlogtime) << PIMemoryBlock(data, size); + int res = file.write(logdata.data(), logdata.size()); + file.flush(); + write_count++; + log_size = file.size(); switch (split_mode) { case SplitSize: - if (file.size() > split_size) createNewFile(); + if (log_size > split_size) createNewFile(); break; case SplitTime: if ((PISystemTime::current() - startlogtime) > split_time) createNewFile(); @@ -311,16 +322,6 @@ int PIBinaryLog::writeBinLog(int id, const void *data, int size) { break; default: break; } - if (is_pause) { - logmutex.unlock(); - return 0; - } - PIByteArray logdata; - logdata << id << size << (PISystemTime::current() - startlogtime) << PIByteArray::RawData(data, size); - int res = file.write(logdata.data(), logdata.size()); - file.flush(); - write_count++; - log_size = file.size(); logmutex.unlock(); if (res > 0) return size; else return res; @@ -330,7 +331,7 @@ int PIBinaryLog::writeBinLog(int id, const void *data, int size) { int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime &time, const void *data, int size) { if (size <= 0 || !canWrite()) return -1; PIByteArray logdata; - logdata << id << size << time << PIByteArray::RawData(data, size); + logdata << id << size << time << PIMemoryBlock(data, size); logmutex.lock(); int res = file.write(logdata.data(), logdata.size()); file.flush(); @@ -631,14 +632,22 @@ PIBinaryLog::BinLogInfo PIBinaryLog::getLogInfo(const PIString & path) { bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString & dst, int from, int to) { PIBinaryLog slog; - if (!slog.open(src.path, PIIODevice::ReadOnly)) return false; + if (!slog.open(src.path, PIIODevice::ReadOnly)) { + piCout << "[PIBinaryLog]" << "Error, can't open" << src.path; + return false; + } PIVector ids = src.records.keys(); slog.seekTo(from); PIBinaryLog dlog; dlog.createNewFile(dst); + if (!dlog.isOpened()) { + piCout << "[PIBinaryLog]" << "Error, can't create" << dst; + return false; + } bool first = true; BinLogRecord br; PISystemTime st; + PITimeMeasurer tm; while (!slog.isEnd() && ((slog.pos() <= to) || to < 0)) { br = slog.readRecord(); if (first) { @@ -646,9 +655,61 @@ bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString first = false; } if (ids.contains(br.id)) { - dlog.writeBinLog_raw(br.id, br.timestamp - st, br.data); + if (dlog.writeBinLog_raw(br.id, br.timestamp - st, br.data) <= 0) { + piCout << "[PIBinaryLog]" << "Error, can't write to file" << dst; + return false; } } + if (tm.elapsed_s() > 1) { + tm.reset(); + piCout << "[PIBinaryLog]" << "process" << PITime::fromSystemTime(br.timestamp).toString(); + } + } + return true; +} + + +bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & dst) { + PIBinaryLog slog; + PIBinaryLog dlog; + dlog.createNewFile(dst); + if (!dlog.isOpened()) { + piCout << "[PIBinaryLog]" << "Error, can't create" << dst; + return false; + } + piCout << "[PIBinaryLog]" << "Start join binlogs to" << dst; + PISystemTime dtime; + PISystemTime lt; + PITimeMeasurer tm; + bool first = true; + for (const PIString & fn : src) { + if (!slog.open(fn, PIIODevice::ReadOnly)) { + piCout << "[PIBinaryLog]" << "Error, can't open" << fn; + return false; + } + if (first) first = false; + else { + dtime = lt; + } + tm.reset(); + BinLogRecord br; + PISystemTime st; + while (!slog.isEnd()) { + br = slog.readRecord(); + st = br.timestamp; + lt = dtime + br.timestamp; + if (dlog.writeBinLog_raw(br.id, lt, br.data) <= 0) { + piCout << "[PIBinaryLog]" << "Error, can't write to file" << dst; + return false; + } + if (tm.elapsed_s() > 1) { + tm.reset(); + piCout << "[PIBinaryLog]" << "process" << PITime::fromSystemTime(lt).toString(); + } + } + piCout << "[PIBinaryLog]" << "complete" << fn; + } + piCout << "[PIBinaryLog]" << "Finish join binlogs, total time" << PITime::fromSystemTime(lt).toString(); return true; } @@ -728,19 +789,19 @@ bool PIBinaryLog::seek(llong filepos) { PIString PIBinaryLog::constructFullPathDevice() const { PIString ret; - ret << logDir() << ":" << filePrefix() << ":" << defaultID() << ":"; + ret += logDir() + ":" + filePrefix() + ":" + PIString::fromNumber(defaultID()) + ":"; switch (play_mode) { case PlayRealTime: - ret << "RT"; + ret += "RT"; break; case PlayVariableSpeed: - ret << PIString::fromNumber(playSpeed()) << "X"; + ret += PIString::fromNumber(playSpeed()) + "X"; break; case PlayStaticDelay: - ret << PIString::fromNumber(playDelay().toMilliseconds()) << "M"; + ret += PIString::fromNumber(playDelay().toMilliseconds()) + "M"; break; default: - ret << "RT"; + ret += "RT"; break; } return ret; diff --git a/libs/main/io_devices/pibinarylog.h b/libs/main/io_devices/pibinarylog.h index 98db407f..223debdf 100644 --- a/libs/main/io_devices/pibinarylog.h +++ b/libs/main/io_devices/pibinarylog.h @@ -28,7 +28,10 @@ #include "pifile.h" - +//! \ingroup IO +//! \~\brief +//! \~english Binary log +//! \~russian Бинарный лог class PIP_EXPORT PIBinaryLog: public PIIODevice { PIIODEVICE(PIBinaryLog, "binlog") @@ -165,6 +168,11 @@ public: //! Set pause while playing via \a threadedRead or writing via write void setPause(bool pause); + //! Set function wich returns new binlog file path when using split mode. + //! Overrides internal file path generator (logdir() + prefix() + current_time()). + //! To restore internal file path generator set this function to "nullptr". + void setFuncGetNewFilePath(std::function f) {f_new_path = f;} + //! Write one record to BinLog file, with ID = id, id must be greather than 0 int writeBinLog(int id, PIByteArray data) {return writeBinLog(id, data.data(), data.size_s());} @@ -285,8 +293,13 @@ public: //! Get binlog info and statistic static BinLogInfo getLogInfo(const PIString & path); + + //! Create new binlog from part of "src" with allowed IDs and "from" to "to" file position static bool cutBinLog(const BinLogInfo & src, const PIString & dst, int from, int to); + //! Create new binlog from serial splitted binlogs "src" + static bool joinBinLogsSerial(const PIStringList & src, const PIString & dst); + protected: PIString constructFullPathDevice() const; void configureFromFullPathDevice(const PIString & full_path); @@ -314,7 +327,7 @@ private: BinLogRecord readRecord(); static void parseLog(PIFile *f, BinLogInfo *info, PIVector * index); void moveIndex(int i); - PIString getLogfilePath() const; + static PIString getLogfilePath(const PIString & log_dir, const PIString & prefix); PIVector index; PIMap index_pos; @@ -331,6 +344,7 @@ private: int write_count, split_count, default_id, current_index; bool is_started, is_thread_ok, is_indexed, rapid_start, is_pause; PIByteArray user_header; + std::function f_new_path; }; //! \relatesalso PICout \brief Output operator PIBinaryLog::BinLogInfo to PICout