diff --git a/CMakeLists.txt b/CMakeLists.txt index 3e37c21f..1ef96b74 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,8 +2,8 @@ cmake_minimum_required(VERSION 3.0) cmake_policy(SET CMP0017 NEW) # need include() with .cmake project(PIP) set(PIP_MAJOR 3) -set(PIP_MINOR 10) -set(PIP_REVISION 3) +set(PIP_MINOR 11) +set(PIP_REVISION 0) set(PIP_SUFFIX ) set(PIP_COMPANY SHS) set(PIP_DOMAIN org.SHS) diff --git a/libs/main/io_devices/pibinarylog.cpp b/libs/main/io_devices/pibinarylog.cpp index 222e40f3..86d30b6c 100644 --- a/libs/main/io_devices/pibinarylog.cpp +++ b/libs/main/io_devices/pibinarylog.cpp @@ -60,6 +60,7 @@ PIBinaryLog::PIBinaryLog() { setThreadedReadBufferSize(65536); #endif is_started = is_indexed = is_pause = false; + create_index_on_fly = false; current_index = -1; log_size = 0; f_new_path = nullptr; @@ -95,7 +96,6 @@ bool PIBinaryLog::openDevice() { is_indexed = false; is_pause = false; index.clear(); - index_pos.clear(); log_size = 0; if (mode_ == ReadWrite) { piCoutObj << "Error: ReadWrite mode not supported, use WriteOnly or ReadOnly"; @@ -162,7 +162,6 @@ bool PIBinaryLog::closeDevice() { moveIndex(-1); is_indexed = false; index.clear(); - index_pos.clear(); bool e = isEmpty(); log_size = 0; if (canWrite() && e) { @@ -293,6 +292,11 @@ void PIBinaryLog::createNewFile(const PIString & path) { } +void PIBinaryLog::setCreateIndexOnFly(bool yes) { + create_index_on_fly = yes; +} + + void PIBinaryLog::setPause(bool pause) { pausemutex.lock(); is_pause = pause; @@ -313,12 +317,7 @@ int PIBinaryLog::writeBinLog(int id, const void * data, int size) { } if (is_pause) return 0; logmutex.lock(); - PIByteArray logdata; - logdata << id << size << (PISystemTime::current() - startlogtime) << PIMemoryBlock(data, size); - int res = file.write(logdata.data(), logdata.size()); - file.flush(); - write_count++; - log_size = file.size(); + int res = writeRecord(id, PISystemTime::current() - startlogtime, data, size); switch (split_mode) { case SplitSize: if (log_size > split_size) createNewFile(); @@ -341,13 +340,8 @@ int PIBinaryLog::writeBinLog(int id, const void * data, int size) { int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime & time, const void * data, int size) { if (size <= 0 || !canWrite()) return -1; - PIByteArray logdata; - logdata << id << size << time << PIMemoryBlock(data, size); logmutex.lock(); - int res = file.write(logdata.data(), logdata.size()); - file.flush(); - write_count++; - log_size = file.size(); + int res = writeRecord(id, time, data, size); logmutex.unlock(); if (res > 0) return size; @@ -359,7 +353,7 @@ int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime & time, const void * PIByteArray PIBinaryLog::readBinLog(int id, PISystemTime * time, int * readed_id) { if (!canRead()) return PIByteArray(); logmutex.lock(); - BinLogRecord br = readRecord(); + Record br = readRecord(); logmutex.unlock(); if (br.id == -1) { piCoutObj << "End of BinLog file"; @@ -411,7 +405,7 @@ void PIBinaryLog::setHeader(const PIByteArray & header) { PIByteArray PIBinaryLog::getHeader() { - return binfo.user_header; + return index.info.user_header; } @@ -421,7 +415,7 @@ ssize_t PIBinaryLog::readDevice(void * read_to, ssize_t max_size) { if (!is_thread_ok && lastrecord.id > 0) return lastrecord.data.size(); if (!canRead()) return -1; if (max_size <= 0 || read_to == 0) return -1; - BinLogRecord br; + Record br; br.id = 0; if (filterID.isEmpty()) { br = readRecord(); @@ -476,12 +470,16 @@ bool PIBinaryLog::writeFileHeader() { file.write(&sz, 4); file.write(user_header); file.flush(); + if (create_index_on_fly) { + index.info.user_header = user_header; + index.info.path = file.path(); + } return true; } bool PIBinaryLog::checkFileHeader() { - binfo.user_header.clear(); + index.info.user_header.clear(); uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE]; for (uint i = 0; i < PIBINARYLOG_SIGNATURE_SIZE; i++) read_sig[i] = 0; @@ -504,7 +502,7 @@ bool PIBinaryLog::checkFileHeader() { uint32_t sz = 0; file.read(&sz, 4); if (sz > 0) { - binfo.user_header = file.read(sz); + index.info.user_header = file.read(sz); } return true; } @@ -515,15 +513,15 @@ bool PIBinaryLog::checkFileHeader() { } -PIBinaryLog::BinLogRecord PIBinaryLog::readRecord() { +PIBinaryLog::Record PIBinaryLog::readRecord() { // piCoutObj << "readRecord pos =" << file.pos(); logmutex.lock(); PIByteArray ba; - BinLogRecord br; + Record br; lastrecord.id = 0; lastrecord.data.clear(); lastrecord.timestamp = PISystemTime(); - ba.resize(sizeof(BinLogRecord) - sizeof(PIByteArray)); + ba.resize(sizeof(Record) - sizeof(PIByteArray)); if (file.read(ba.data(), ba.size_s()) > 0) { ba >> br.id >> br.size >> br.timestamp; } else { @@ -542,13 +540,55 @@ PIBinaryLog::BinLogRecord PIBinaryLog::readRecord() { br.id = 0; lastrecord = br; if (br.id == 0) fileError(); - moveIndex(index_pos.value(file.pos(), -1)); + moveIndex(index.index_pos.value(file.pos(), -1)); logmutex.unlock(); // piCoutObj << "readRecord done"; return br; } +int PIBinaryLog::writeRecord(int id, PISystemTime time, const void * data, int size) { + PIByteArray logdata; + logdata << id << size << time << PIMemoryBlock(data, size); + if (create_index_on_fly) { + BinLogIndex bi; + bi.id = id; + bi.data_size = size; + bi.pos = file.pos(); + bi.timestamp = time; + index.index << bi; + auto & rinfo(index.info.records[id]); + if (rinfo.count == 0) { + rinfo.id = id; + rinfo.start_time = rinfo.end_time = time; + rinfo.minimum_size = rinfo.maximum_size = size; + } else { + rinfo.start_time = piMin(rinfo.start_time, time); + rinfo.end_time = piMax(rinfo.end_time, time); + rinfo.minimum_size = piMin(rinfo.minimum_size, size); + rinfo.maximum_size = piMax(rinfo.maximum_size, size); + } + ++rinfo.count; + if (index.info.records_count == 0) { + index.info.start_time = index.info.end_time = time; + } else { + index.info.start_time = piMin(index.info.start_time, time); + index.info.end_time = piMax(index.info.end_time, time); + } + ++index.info.records_count; + is_indexed = true; + } + int ret = file.write(logdata.data(), logdata.size()); + file.flush(); + write_count++; + log_size = file.size(); + if (create_index_on_fly) { + index.info.log_size = log_size; + } + return ret; +} + + void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector * index) { if (!info && !index) return; if (info) { @@ -596,16 +636,17 @@ void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector< if (read_version == PIBINARYLOG_VERSION) { uint32_t sz = 0; f->read(&sz, 4); - if (sz > 0 && info) { - info->user_header = f->read(sz); + if (sz > 0) { + PIByteArray user_hdr = f->read(sz); + if (info) info->user_header = user_hdr; } } PIByteArray ba; - BinLogRecord br; + Record br; br.id = 0; br.size = 0; bool first = true; - size_t hdr_size = sizeof(BinLogRecord) - sizeof(PIByteArray); + size_t hdr_size = sizeof(Record) - sizeof(PIByteArray); ba.resize(hdr_size); while (1) { ba.resize(hdr_size); @@ -690,7 +731,7 @@ bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString return false; } bool first = true; - BinLogRecord br; + Record br; PISystemTime st; PITimeMeasurer tm; while (!slog.isEnd() && ((slog.pos() <= to) || to < 0)) { @@ -746,7 +787,7 @@ bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, dtime = lt; } tm.reset(); - BinLogRecord br; + Record br; PISystemTime st; while (!slog.isEnd()) { br = slog.readRecord(); @@ -787,12 +828,10 @@ bool PIBinaryLog::createIndex() { llong cp = file.pos(); file.seekToBegin(); index.clear(); - index_pos.clear(); - parseLog(&file, &binfo, &index); + parseLog(&file, &index.info, &index.index); + index.makeIndexPos(); file.seek(cp); - is_indexed = !index.isEmpty(); - for (uint i = 0; i < index.size(); i++) - index_pos[index[i].pos] = i; + is_indexed = !index.index.isEmpty(); logmutex.unlock(); return is_indexed; } @@ -800,8 +839,8 @@ bool PIBinaryLog::createIndex() { int PIBinaryLog::posForTime(const PISystemTime & time) { int ci = -1; - for (uint i = 0; i < index.size(); i++) { - if (time <= index[i].timestamp && (filterID.contains(index[i].id) || filterID.isEmpty())) { + for (uint i = 0; i < index.index.size(); i++) { + if (time <= index.index[i].timestamp && (filterID.contains(index.index[i].id) || filterID.isEmpty())) { ci = i; break; } @@ -814,12 +853,12 @@ void PIBinaryLog::seekTo(int rindex) { // piCoutObj << "seekTo"; logmutex.lock(); pausemutex.lock(); - if (rindex < index.size_s() && rindex >= 0) { - file.seek(index[rindex].pos); - moveIndex(index_pos.value(file.pos(), -1)); + if (rindex < index.index.size_s() && rindex >= 0) { + file.seek(index.index[rindex].pos); + moveIndex(index.index_pos.value(file.pos(), -1)); // double prev_pt = play_time; - play_time = index[rindex].timestamp.toMilliseconds(); - lastrecord.timestamp = index[rindex].timestamp; + play_time = index.index[rindex].timestamp.toMilliseconds(); + lastrecord.timestamp = index.index[rindex].timestamp; if (play_mode == PlayRealTime) { startlogtime = PISystemTime::current() - lastrecord.timestamp; } @@ -842,8 +881,8 @@ bool PIBinaryLog::seek(const PISystemTime & time) { bool PIBinaryLog::seek(llong filepos) { int ci = -1; - for (uint i = 0; i < index.size(); i++) { - if (filepos <= index[i].pos && (filterID.contains(index[i].id) || filterID.isEmpty())) { + for (uint i = 0; i < index.index.size(); i++) { + if (filepos <= index.index[i].pos && (filterID.contains(index.index[i].id) || filterID.isEmpty())) { ci = i; break; } @@ -856,6 +895,27 @@ bool PIBinaryLog::seek(llong filepos) { } +int PIBinaryLog::pos() const { + if (is_indexed) return current_index; + return -1; +} + + +PIByteArray PIBinaryLog::saveIndex() const { + if (!is_indexed) return PIByteArray(); + return piSerialize(index); +} + + +bool PIBinaryLog::loadIndex(PIByteArray saved) { + if (!canRead() || saved.isEmpty()) return false; + index = piDeserialize(saved); + index.makeIndexPos(); + is_indexed = index.index.isNotEmpty(); + return is_indexed; +} + + PIString PIBinaryLog::constructFullPathDevice() const { PIString ret; ret += logDir() + ":" + filePrefix() + ":" + PIString::fromNumber(defaultID()) + ":"; @@ -928,3 +988,17 @@ void PIBinaryLog::propertyChanged(const char * s) { split_count = property("splitRecordCount").toInt(); // piCoutObj << "propertyChanged" << s << play_mode; } + + +void PIBinaryLog::CompleteIndex::clear() { + info = BinLogInfo(); + index.clear(); + index_pos.clear(); +} + + +void PIBinaryLog::CompleteIndex::makeIndexPos() { + index_pos.clear(); + for (uint i = 0; i < index.size(); i++) + index_pos[index[i].pos] = i; +} diff --git a/libs/main/io_devices/pibinarylog.h b/libs/main/io_devices/pibinarylog.h index f8cc21b7..9c1c8460 100644 --- a/libs/main/io_devices/pibinarylog.h +++ b/libs/main/io_devices/pibinarylog.h @@ -26,6 +26,7 @@ #ifndef PIBINARYLOG_H #define PIBINARYLOG_H +#include "pichunkstream.h" #include "pifile.h" //! \ingroup IO @@ -54,6 +55,8 @@ public: SplitCount /*! Separate files by records count */ }; +#pragma pack(push, 8) + //! \brief Struct contains information about all records with same ID struct PIP_EXPORT BinLogRecordInfo { BinLogRecordInfo() { @@ -68,17 +71,6 @@ public: PISystemTime end_time; }; - //! \brief Struct contains full information about Binary Log file and about all Records using map of \a BinLogRecordInfo - struct PIP_EXPORT BinLogInfo { - PIString path; - int records_count; - llong log_size; - PISystemTime start_time; - PISystemTime end_time; - PIMap records; - PIByteArray user_header; - }; - //! \brief Struct contains position, ID and timestamp of record in file struct PIP_EXPORT BinLogIndex { int id; @@ -87,6 +79,19 @@ public: PISystemTime timestamp; }; +#pragma pack(pop) + + //! \brief Struct contains full information about Binary Log file and about all Records using map of \a BinLogRecordInfo + struct PIP_EXPORT BinLogInfo { + PIString path; + int records_count = 0; + llong log_size = 0L; + PISystemTime start_time; + PISystemTime end_time; + PIMap records; + PIByteArray user_header; + }; + //! Current \a PlayMode PlayMode playMode() const { return play_mode; } @@ -121,6 +126,9 @@ public: //! Returns if rapid start enabled bool rapidStart() const { return rapid_start; } + //! Returns if index creates while writing + bool createIndexOnFly() const { return create_index_on_fly; } + //! Create binlog file with Filename = path void createNewFile(const PIString & path); @@ -142,6 +150,9 @@ public: //! If enabled BinLog \a ThreadedRead starts without delay for first record, i.e. first record will be readed immediately void setRapidStart(bool enabled) { setProperty("rapidStart", enabled); } + //! Set index creation while writing + void setCreateIndexOnFly(bool yes); + //! Set play speed to "speed", default value is 1.0x //! Also this function set \a playMode to \a PlayVariableSpeed void setPlaySpeed(double speed) { @@ -258,12 +269,12 @@ public: //! Get binlog info \a BinLogInfo BinLogInfo logInfo() const { - if (is_indexed) return binfo; + if (is_indexed) return index.info; return getLogInfo(path()); } //! Get binlog index \a BinLogIndex, need \a createIndex before getting index - const PIVector & logIndex() const { return index; } + const PIVector & logIndex() const { return index.index; } //! Create index of current binlog file bool createIndex(); @@ -284,10 +295,10 @@ public: bool seek(llong filepos); //! Get current record index (position record in file) - int pos() const { - if (is_indexed) return current_index; - return -1; - } + int pos() const; + + PIByteArray saveIndex() const; + bool loadIndex(PIByteArray saved); //! \handlers //! \{ @@ -342,38 +353,89 @@ protected: DeviceInfoFlags deviceInfoFlags() const override { return PIIODevice::Reliable; } private: - struct PIP_EXPORT BinLogRecord { + struct PIP_EXPORT Record { int id; int size; PISystemTime timestamp; PIByteArray data; }; + struct PIP_EXPORT CompleteIndex { + void clear(); + void makeIndexPos(); + BinLogInfo info; + PIVector index; + PIMap index_pos; + }; + BINARY_STREAM_FRIEND(CompleteIndex) bool writeFileHeader(); bool checkFileHeader(); - BinLogRecord readRecord(); + Record readRecord(); + int writeRecord(int id, PISystemTime time, const void * data, int size); static void parseLog(PIFile * f, BinLogInfo * info, PIVector * index); void moveIndex(int i); static PIString getLogfilePath(const PIString & log_dir, const PIString & prefix); - PIVector index; - PIMap index_pos; - BinLogInfo binfo; + CompleteIndex index; PlayMode play_mode; SplitMode split_mode; PIFile file; - BinLogRecord lastrecord; + Record lastrecord; PISystemTime startlogtime, play_delay, split_time, pause_time; mutable PIMutex logmutex, pausemutex; double play_time, play_speed; llong split_size, log_size; int write_count, split_count, default_id, current_index; - bool is_started, is_thread_ok, is_indexed, rapid_start, is_pause; + bool is_started, is_thread_ok, is_indexed, rapid_start, is_pause, create_index_on_fly; PIByteArray user_header; std::function f_new_path; }; + +BINARY_STREAM_WRITE(PIBinaryLog::BinLogInfo) { + PIChunkStream cs; + cs.add(1, v.path) + .add(2, v.records_count) + .add(3, v.log_size) + .add(4, v.start_time) + .add(5, v.end_time) + .add(6, v.records) + .add(7, v.user_header); + s << cs.data(); + return s; +} + +BINARY_STREAM_READ(PIBinaryLog::BinLogInfo) { + PIByteArray csba; + s >> csba; + PIChunkStream cs(csba); + while (!cs.atEnd()) { + switch (cs.read()) { + case 1: cs.get(v.path); break; + case 2: cs.get(v.records_count); break; + case 3: cs.get(v.log_size); break; + case 4: cs.get(v.start_time); break; + case 5: cs.get(v.end_time); break; + case 6: cs.get(v.records); break; + case 7: cs.get(v.user_header); break; + } + } + return s; +} + + +BINARY_STREAM_WRITE(PIBinaryLog::CompleteIndex) { + s << v.info << v.index; + return s; +} + +BINARY_STREAM_READ(PIBinaryLog::CompleteIndex) { + s >> v.info >> v.index; + return s; +} + + //! \relatesalso PICout \brief Output operator PIBinaryLog::BinLogInfo to PICout inline PICout operator<<(PICout s, const PIBinaryLog::BinLogInfo & bi) { s.space(); diff --git a/libs/main/io_devices/pifile.cpp b/libs/main/io_devices/pifile.cpp index 950dc208..0b4f0763 100644 --- a/libs/main/io_devices/pifile.cpp +++ b/libs/main/io_devices/pifile.cpp @@ -622,3 +622,18 @@ bool PIFile::applyFileInfo(const PIString & path, const PIFile::FileInfo & info) #endif return true; } + + +PIByteArray PIFile::readAll(const PIString & path, bool forceRead) { + PIFile f(path, PIIODevice::ReadOnly); + if (!f.isOpened()) return PIByteArray(); + return f.readAll(forceRead); +} + + +int PIFile::writeAll(const PIString & path, const PIByteArray & data) { + PIFile f(path, PIIODevice::ReadWrite); + if (!f.isOpened()) return 0; + f.clear(); + return f.write(data.data(), data.size_s()); +} diff --git a/libs/main/io_devices/pifile.h b/libs/main/io_devices/pifile.h index 0adc4d7b..8f373499 100644 --- a/libs/main/io_devices/pifile.h +++ b/libs/main/io_devices/pifile.h @@ -293,6 +293,14 @@ public: //! \~russian Применяет параметры "info" к файлу или директории с путём "info".path static bool applyFileInfo(const FileInfo & info) { return applyFileInfo(info.path, info); } + //! \~english Read all file content at path "path" to byte array and return it. + //! \~russian Читает всё содержимое файла по пути "path" и возвращает его как массив байтов. + static PIByteArray readAll(const PIString & path, bool forceRead = false); + + //! \~english Clear file at path "path" and write "data", returns written bytes. + //! \~russian Очищает файл по пути "path", пишет туда "data" и возвращает количество записанных байт. + static int writeAll(const PIString & path, const PIByteArray & data); + //! \handlers //! \{