/* PIP - Platform Independent Primitives Class for write binary data to logfile, and read or playback this data Andrey Bychkov work.a.b@yandex.ru This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include "pibinarylog.h" #include "pidir.h" #include "pipropertystorage.h" #define PIBINARYLOG_VERSION_OLD 0x31 /*! \class PIBinaryLog * \brief Class for read and write binary data to logfile, and playback this data in realtime, or custom speed * * \section PIBinaryLog_sec0 Synopsis * Binary Log is a file with simple header, where you can read and write some binary data. * Any written data include special header with ID, size and timestamp. * This header provides separation different messages from the one file by choosing different IDs. * With \a filterID or special functions, like \a readBinLog() you can choose IDs what you want to read. * With function \a writeBinLog() or \a setDefaultID() you can choose ID that mark you data. * By default ID = 1, and \a filterID is empty, that mean you read any ID without filtering. * ThreadedRead provide you playback data, with delay that you write data. * You can choose different playbak modes by set \a PlayMode. * * \section PIBinaryLog_sec1 Basic usage * This class provide all functions of \a PIIODevice, such \a open(), \a close(), * \a read() ,\a write(), and threaded read/write. * function \a setLogDir() need to set directory for BinLog files * function \a createNewFile() need to create new binlog file * function \a restart() need start from the begining of binlog file * */ static const uchar binlog_sig[] = {'B','I','N','L','O','G'}; #define PIBINARYLOG_VERSION 0x32 #define PIBINARYLOG_SIGNATURE_SIZE sizeof(binlog_sig) REGISTER_DEVICE(PIBinaryLog) PIBinaryLog::PIBinaryLog() { #ifdef FREERTOS setThreadedReadBufferSize(512); #else setThreadedReadBufferSize(65536); #endif is_started = is_indexed = is_pause = false; current_index = -1; log_size = 0; setPlaySpeed(1.); setDefaultID(1); setPlaySpeed(1.0); setPlayDelay(PISystemTime::fromSeconds(1.0)); setPlayRealTime(); setSplitTime(PISystemTime(600, 0)); setSplitRecordCount(1000); setSplitFileSize(0xFFFFFF); setSplitMode(SplitNone); setLogDir(PIString()); setFilePrefix(PIString()); setRapidStart(false); file.setName("__S__PIBinaryLog::file"); // piCoutObj << "created"; } PIBinaryLog::~PIBinaryLog() { stop(); close(); } bool PIBinaryLog::openDevice() { lastrecord.timestamp = PISystemTime(); lastrecord.id = 0; write_count = 0; is_started = false; is_thread_ok = true; is_indexed = false; is_pause = false; index.clear(); index_pos.clear(); log_size = 0; if (mode_ == ReadWrite) { piCoutObj << "Error: ReadWrite mode not supported, use WriteOnly or ReadOnly"; return false; } if (path().isEmpty() && mode_ == WriteOnly) { setPath(getLogfilePath()); } if (path().isEmpty() && mode_ == ReadOnly) { PIDir ld(logDir()); if (ld.isExists()) { PIVector es = ld.allEntries(); piForeachC(PIFile::FileInfo &i, es) { if (i.extension() == "binlog" && i.isFile() && i.baseName().startsWith(filePrefix())) { setPath(i.path); break; } } } } if (!file.open(path(), mode_)) { piCoutObj << "Error: Can't open file" << path(); return false; } setName(path()); if (mode_ == WriteOnly) { file.clear(); if (!writeFileHeader()) { piCoutObj << "Error: Can't write binlog file header" << path(); return false; } is_started = true; } if (mode_ == ReadOnly) { if (file.isEmpty()) { piCoutObj << "Error: File is null" << path(); fileError(); return false; } if (!checkFileHeader()) { fileError(); return false; } if (isEmpty()) { piCoutObj << "Warning: Empty BinLog file" << path(); fileEnd(); } play_time = 0; if (!rapid_start) is_started = true; } startlogtime = PISystemTime::current(); pause_time = PISystemTime(); return true; } bool PIBinaryLog::closeDevice() { stopThreadedRead(); pausemutex.unlock(); logmutex.unlock(); moveIndex(-1); is_indexed = false; index.clear(); index_pos.clear(); bool e = isEmpty(); log_size = 0; if (canWrite() && e) { file.remove(); return true; } return file.close(); } bool PIBinaryLog::threadedRead(uchar *readed, int size) { // piCout << "binlog threaded read"; if (!canRead() || isEnd()) return PIIODevice::threadedRead(readed, size); is_thread_ok = false; logmutex.lock(); PISystemTime pt, lastrec_timestamp = lastrecord.timestamp; logmutex.unlock(); double delay; switch (play_mode) { case PlayRealTime: pausemutex.lock(); if (is_pause) { pausemutex.unlock(); piMSleep(100); return false; } else if (pause_time > PISystemTime()) { startlogtime += pause_time; pause_time = PISystemTime(); } pausemutex.unlock(); pt = PISystemTime::current() - startlogtime; if (is_started) { if (lastrec_timestamp > pt) (lastrec_timestamp - pt).sleep(); } else { startlogtime = PISystemTime::current() - lastrec_timestamp; is_started = true; } break; case PlayVariableSpeed: delay = lastrec_timestamp.toMilliseconds() - play_time; //piCoutObj << "delay" << delay; double cdelay; int dtc; if (is_started) { if (is_pause) { piMSleep(100); return false; } if (delay > 0) { cdelay = delay * play_speed; dtc = int(cdelay) /100; if (play_speed <= 0.) dtc = 2; //piCout << play_speed << dtc; for (int j=0; j PISystemTime()) pause_time = PISystemTime::current() - pause_time; } pausemutex.unlock(); } int PIBinaryLog::writeBinLog(int id, const void *data, int size) { if (size <= 0 || !canWrite()) return -1; if (id == 0) { piCoutObj << "Error: can`t write with id = 0!"; return -1; } logmutex.lock(); switch (split_mode) { case SplitSize: if (file.size() > split_size) createNewFile(); break; case SplitTime: if ((PISystemTime::current() - startlogtime) > split_time) createNewFile(); break; case SplitCount: if (write_count > split_count) createNewFile(); break; default: break; } if (is_pause) { logmutex.unlock(); return 0; } PIByteArray logdata; logdata << id << size << (PISystemTime::current() - startlogtime) << PIByteArray::RawData(data, size); int res = file.write(logdata.data(), logdata.size()); file.flush(); write_count++; log_size = file.size(); logmutex.unlock(); if (res > 0) return size; else return res; } int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime &time, const void *data, int size) { if (size <= 0 || !canWrite()) return -1; PIByteArray logdata; logdata << id << size << time << PIByteArray::RawData(data, size); logmutex.lock(); int res = file.write(logdata.data(), logdata.size()); file.flush(); write_count++; log_size = file.size(); logmutex.unlock(); if (res > 0) return size; else return res; } PIByteArray PIBinaryLog::readBinLog(int id, PISystemTime * time, int * readed_id) { if (!canRead()) return PIByteArray(); logmutex.lock(); BinLogRecord br = readRecord(); logmutex.unlock(); if (br.id == -1) { piCoutObj << "End of BinLog file"; fileEnd(); return PIByteArray(); } if (id == 0 && br.id > 0) { if (time) *time = br.timestamp; if (readed_id) *readed_id = br.id; return br.data; } while (br.id != id && !isEnd()) br = readRecord(); if (br.id == -1) { piCoutObj << "End of BinLog file"; fileEnd(); return PIByteArray(); } if (br.id == id) { if (time) *time = br.timestamp; if (readed_id) *readed_id = br.id; return br.data; } piCoutObj << "Can't find record with id =" << id; return PIByteArray(); } int PIBinaryLog::readBinLog(int id, void *read_to, int max_size, PISystemTime * time, int * readed_id) { if (max_size <= 0 || read_to == 0) return -1; PIByteArray ba = readBinLog(id, time, readed_id); if (ba.isEmpty()) return -1; int sz = piMini(max_size, ba.size()); memcpy(read_to, ba.data(), sz); return sz; } bool PIBinaryLog::isEmpty() const { return (log_size <= llong(PIBINARYLOG_SIGNATURE_SIZE + 1)); } void PIBinaryLog::setHeader(const PIByteArray & header) { user_header = header; } PIByteArray PIBinaryLog::getHeader() { return binfo.user_header; } int PIBinaryLog::readDevice(void *read_to, int max_size) { PIMutexLocker _ml(logmutex); if (lastrecord.id == -1 || isEnd()) return 0; if(!is_thread_ok && lastrecord.id > 0) return lastrecord.data.size(); if (!canRead()) return -1; if (max_size <= 0 || read_to == 0) return -1; BinLogRecord br; br.id = 0; if (filterID.isEmpty()) br = readRecord(); else { while (!filterID.contains(br.id) && !isEnd()) br = readRecord(); } if (br.id == -1) { fileEnd(); piCoutObj << "End of BinLog file"; return 0; } if (br.id <= 0) { piCoutObj << "Read record error"; return -1; } int sz = piMini(max_size, br.data.size()); if (sz < br.data.size_s()) piCoutObj << "too small read buffer:" << max_size << ", data size:" << br.data.size(); memcpy(read_to, br.data.data(), sz); return sz; } void PIBinaryLog::restart() { bool th = isRunning(); if (th) stopThreadedRead(); if (!canRead()) return; logmutex.unlock(); lastrecord.timestamp = PISystemTime(); lastrecord.id = 0; is_thread_ok = true; is_started = !rapidStart(); play_time = 0; file.seekToBegin(); checkFileHeader(); moveIndex(0); startlogtime = PISystemTime::current(); if (th) startThreadedRead(); } bool PIBinaryLog::writeFileHeader() { if (file.write(binlog_sig, PIBINARYLOG_SIGNATURE_SIZE) <= 0) return false; uchar version = PIBINARYLOG_VERSION; if (file.write(&version, 1) <= 0) return false; uint32_t sz = user_header.size(); file.write(&sz, 4); file.write(user_header); file.flush(); return true; } bool PIBinaryLog::checkFileHeader() { binfo.user_header.clear(); uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE]; for (uint i=0; i 0) { binfo.user_header = file.read(sz); } return true; } if (read_version == 0) piCoutObj << "BinLogFile has invalid version"; if (read_version < PIBINARYLOG_VERSION) piCoutObj << "BinLogFile has too old verion"; if (read_version > PIBINARYLOG_VERSION) piCoutObj << "BinLogFile has too newest version"; return false; } PIBinaryLog::BinLogRecord PIBinaryLog::readRecord() { // piCoutObj << "readRecord"; logmutex.lock(); PIByteArray ba; BinLogRecord br; lastrecord.id = 0; lastrecord.data.clear(); lastrecord.timestamp = PISystemTime(); ba.resize(sizeof(BinLogRecord) - sizeof(PIByteArray)); if(file.read(ba.data(), ba.size_s()) > 0) { ba >> br.id >> br.size >> br.timestamp; } else { br.id = -1; logmutex.unlock(); // piCoutObj << "readRecord done"; return br; } if (br.id > 0 && br.size > 0) { ba.resize(br.size); if(file.read(ba.data(), ba.size_s()) > 0) br.data = ba; else br.id = 0; } else br.id = 0; lastrecord = br; if (br.id == 0) fileError(); moveIndex(index_pos.value(file.pos(), -1)); logmutex.unlock(); // piCoutObj << "readRecord done"; return br; } void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector * index) { if (!info && !index) return; if (info) { info->log_size = -1; info->records_count = 0; info->records.clear(); } if (index) index->clear(); if (f == 0) return; if (!f->canRead()) return; if (info) { info->path = f->path(); info->log_size = f->size(); } uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE]; for (uint i=0; iread(read_sig, PIBINARYLOG_SIGNATURE_SIZE) < 0) {if (info) info->records_count = -1; ok = false;} for (uint i=0; irecords_count = -2; ok = false;} uchar read_version = 0; if (f->read(&read_version, 1) < 0) {if (info) info->records_count = -3; ok = false;} if (read_version == 0) {if (info) info->records_count = -4; ok = false;} if (read_version < PIBINARYLOG_VERSION_OLD) {if (info) info->records_count = -5; ok = false;} if (read_version > PIBINARYLOG_VERSION) {if (info) info->records_count = -6; ok = false;} if (read_version == PIBINARYLOG_VERSION) { uint32_t sz = 0; f->read(&sz, 4); if (sz > 0 && info) { info->user_header = f->read(sz); } } if (!ok) return; PIByteArray ba; BinLogRecord br; bool first = true; size_t hdr_size = sizeof(BinLogRecord) - sizeof(PIByteArray); ba.resize(hdr_size); while (1) { ba.resize(hdr_size); { if (f->read(ba.data(), ba.size()) > 0) { ba >> br.id >> br.size >> br.timestamp; } else break; if (info->log_size - f->pos() >= br.size) f->seek(f->pos() + br.size); else break; } if (br.id > 0) { if (info) { BinLogIndex bl_ind; bl_ind.id = br.id; bl_ind.pos = f->pos() - br.size - hdr_size; bl_ind.timestamp = br.timestamp; index->append(bl_ind); } if (info) { info->records_count++; if (first) { info->start_time = br.timestamp; first = false; } BinLogRecordInfo &bri(info->records[br.id]); bri.count++; if (bri.id == 0) { bri.id = br.id; bri.minimum_size = bri.maximum_size = br.size; bri.start_time = br.timestamp; } else { bri.end_time = br.timestamp; if (bri.minimum_size > br.size) bri.minimum_size = br.size; if (bri.maximum_size < br.size) bri.maximum_size = br.size; } } } } if (info) info->end_time = br.timestamp; } void PIBinaryLog::moveIndex(int i) { if (is_indexed) { current_index = i; posChanged(current_index); } } PIBinaryLog::BinLogInfo PIBinaryLog::getLogInfo(const PIString & path) { BinLogInfo bi; bi.path = path; bi.records_count = 0; PIFile tfile; if (!tfile.open(path, PIIODevice::ReadOnly)) return bi; parseLog(&tfile, &bi, 0); return bi; } bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString & dst, int from, int to) { PIBinaryLog slog; if (!slog.open(src.path, PIIODevice::ReadOnly)) return false; PIVector ids = src.records.keys(); slog.seekTo(from); PIBinaryLog dlog; dlog.createNewFile(dst); bool first = true; BinLogRecord br; PISystemTime st; while (!slog.isEnd() && ((slog.pos() <= to) || to < 0)) { br = slog.readRecord(); if (first) { st = br.timestamp; first = false; } if (ids.contains(br.id)) { dlog.writeBinLog_raw(br.id, br.timestamp - st, br.data); } } return true; } bool PIBinaryLog::createIndex() { logmutex.lock(); llong cp = file.pos(); file.seekToBegin(); index.clear(); index_pos.clear(); parseLog(&file, &binfo, &index); file.seek(cp); is_indexed = !index.isEmpty(); for (uint i=0; i= 0) { file.seek(index[rindex].pos); moveIndex(index_pos.value(file.pos(), -1)); //double prev_pt = play_time; play_time = index[rindex].timestamp.toMilliseconds(); lastrecord.timestamp = index[rindex].timestamp; if (play_mode == PlayRealTime) { startlogtime = PISystemTime::current() - lastrecord.timestamp; } } // piCoutObj << "seekTo done"; logmutex.unlock(); } bool PIBinaryLog::seek(const PISystemTime & time) { int ci = -1; for (uint i=0; i= 0) { seekTo(ci); return true; } return false; } bool PIBinaryLog::seek(llong filepos) { int ci = -1; for (uint i=0; i= 0) { seekTo(ci); return true; } return false; } PIString PIBinaryLog::constructFullPathDevice() const { PIString ret; ret << logDir() << ":" << filePrefix() << ":" << defaultID() << ":"; switch (play_mode) { case PlayRealTime: ret << "RT"; break; case PlayVariableSpeed: ret << PIString::fromNumber(playSpeed()) << "X"; break; case PlayStaticDelay: ret << PIString::fromNumber(playDelay().toMilliseconds()) << "M"; break; default: ret << "RT"; break; } return ret; } void PIBinaryLog::configureFromFullPathDevice(const PIString & full_path) { PIStringList pl = full_path.split(":"); for (int i = 0; i < pl.size_s(); ++i) { PIString p(pl[i]); switch (i) { case 0: setLogDir(p); break; case 1: setFilePrefix(p); break; case 2: setDefaultID(p.toInt()); break; case 3: if (p.toUpperCase() == "RT") setPlayRealTime(); if (p.toUpperCase().right(1) == "X") setPlaySpeed((p.left(p.size() - 1)).toDouble()); if (p.toUpperCase().right(1) == "M") setPlayDelay(PISystemTime::fromMilliseconds((p.left(p.size() - 1)).toDouble())); break; } } // piCoutObj << "configured"; } PIPropertyStorage PIBinaryLog::constructVariantDevice() const { PIPropertyStorage ret; PIVariantTypes::Enum e; ret.addProperty("log dir", PIVariantTypes::Dir(logDir())); ret.addProperty("file prefix", filePrefix()); ret.addProperty("default ID", defaultID()); e << "real-time" << "variable speed" << "static delay"; e.selectValue((int)playMode()); ret.addProperty("play mode", e); ret.addProperty("play speed", playSpeed()); ret.addProperty("play delay", playDelay().toMilliseconds()); return ret; } void PIBinaryLog::configureFromVariantDevice(const PIPropertyStorage & d) { setLogDir(d.propertyValueByName("log dir").toString()); setFilePrefix(d.propertyValueByName("file prefix").toString()); setDefaultID(d.propertyValueByName("default ID").toInt()); setPlaySpeed(d.propertyValueByName("play speed").toDouble()); setPlayDelay(PISystemTime::fromMilliseconds(d.propertyValueByName("play delay").toDouble())); setPlayMode((PlayMode)d.propertyValueByName("play mode").toEnum().selectedValue()); } void PIBinaryLog::propertyChanged(const PIString &s) { default_id = property("defaultID").toInt(); rapid_start = property("rapidStart").toBool(); play_mode = (PlayMode)property("playMode").toInt(); double ps = property("playSpeed").toDouble(); play_speed = ps > 0. ? 1. / ps : 0.; play_delay = property("playDelay").toSystemTime(); split_mode = (SplitMode)property("splitMode").toInt(); split_time = property("splitTime").toSystemTime(); split_size = property("splitFileSize").toLLong(); split_count = property("splitRecordCount").toInt(); // piCoutObj << "propertyChanged" << s << play_mode; }