Files
pip/src_main/io_devices/pibinarylog.cpp

733 lines
20 KiB
C++

/*
PIP - Platform Independent Primitives
Class for write binary data to logfile, and read or playback this data
Copyright (C) 2020 Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pibinarylog.h"
#include "pidir.h"
#include "pipropertystorage.h"
/*! \class PIBinaryLog
* \brief Class for read and write binary data to logfile, and playback this data in realtime, or custom speed
*
* \section PIBinaryLog_sec0 Synopsis
* Binary Log is a file with simple header, where you can read and write some binary data.
* Any written data include special header with ID, size and timestamp.
* This header provides separation different messages from the one file by choosing different IDs.
* With \a filterID or special functions, like \a readBinLog() you can choose IDs what you want to read.
* With function \a writeBinLog() or \a setDefaultID() you can choose ID that mark you data.
* By default ID = 1, and \a filterID is empty, that mean you read any ID without filtering.
* ThreadedRead provide you playback data, with delay that you write data.
* You can choose different playbak modes by set \a PlayMode.
*
* \section PIBinaryLog_sec1 Basic usage
* This class provide all functions of \a PIIODevice, such \a open(), \a close(),
* \a read() ,\a write(), and threaded read/write.
* function \a setLogDir() need to set directory for BinLog files
* function \a createNewFile() need to create new binlog file
* function \a restart() need start from the begining of binlog file
*
*/
REGISTER_DEVICE(PIBinaryLog)
PIBinaryLog::PIBinaryLog() {
#ifdef FREERTOS
setThreadedReadBufferSize(512);
#else
setThreadedReadBufferSize(65536);
#endif
is_started = is_indexed = is_pause = false;
current_index = -1;
log_size = 0;
setPlaySpeed(1.);
setDefaultID(1);
setPlaySpeed(1.0);
setPlayDelay(PISystemTime::fromSeconds(1.0));
setPlayRealTime();
setSplitTime(PISystemTime(600, 0));
setSplitRecordCount(1000);
setSplitFileSize(0xFFFFFF);
setSplitMode(SplitNone);
setLogDir(PIString());
setFilePrefix(PIString());
setRapidStart(false);
file.setName("__S__PIBinaryLog::file");
// piCoutObj << "created";
}
bool PIBinaryLog::openDevice() {
lastrecord.timestamp = PISystemTime();
lastrecord.id = 0;
write_count = 0;
is_started = false;
is_thread_ok = true;
is_indexed = false;
is_pause = false;
index.clear();
index_pos.clear();
log_size = 0;
if (mode_ == ReadWrite) {
piCoutObj << "Error: ReadWrite mode not supported, use WriteOnly or ReadOnly";
return false;
}
if (path().isEmpty() && mode_ == WriteOnly) {
setPath(getLogfilePath());
}
if (path().isEmpty() && mode_ == ReadOnly) {
PIDir ld(logDir());
if (ld.isExists()) {
PIVector<PIFile::FileInfo> es = ld.allEntries();
piForeachC(PIFile::FileInfo &i, es) {
if (i.extension() == "binlog" && i.isFile() && i.baseName().startsWith(filePrefix())) {
setPath(i.path);
piBreak;
}
}
}
}
if (!file.open(path(), mode_)) {
piCoutObj << "Error: Can't open file" << path();
return false;
}
setName(path());
if (mode_ == WriteOnly) {
file.clear();
if (!writeFileHeader()) {
piCoutObj << "Error: Can't write binlog file header" << path();
return false;
}
is_started = true;
}
if (mode_ == ReadOnly) {
if (file.isEmpty()) {
piCoutObj << "Error: File is null" << path();
fileError();
return false;
}
if (!checkFileHeader()) {
fileError();
return false;
}
if (isEmpty()) {
piCoutObj << "Warning: Empty BinLog file" << path();
fileEnd();
}
play_time = 0;
if (!rapid_start) is_started = true;
}
startlogtime = PISystemTime::current();
pause_time = PISystemTime();
return true;
}
bool PIBinaryLog::closeDevice() {
moveIndex(-1);
is_indexed = false;
index.clear();
index_pos.clear();
bool e = isEmpty();
log_size = 0;
if (canWrite() && e) {
file.remove();
return true;
}
return file.close();
}
bool PIBinaryLog::threadedRead(uchar *readed, int size) {
// piCout << "binlog threaded read";
if (!canRead() || isEnd()) return PIIODevice::threadedRead(readed, size);
is_thread_ok = false;
PISystemTime pt;
double delay;
switch (play_mode) {
case PlayRealTime:
pausemutex.lock();
if (is_pause) {
piMSleep(100);
pausemutex.unlock();
return false;
} else if (pause_time > PISystemTime()) {
startlogtime += pause_time;
pause_time = PISystemTime();
}
pausemutex.unlock();
pt = PISystemTime::current() - startlogtime;
if (is_started) {
if (lastrecord.timestamp > pt)
(lastrecord.timestamp - pt).sleep();
} else {
startlogtime = PISystemTime::current() - lastrecord.timestamp;
is_started = true;
}
break;
case PlayVariableSpeed:
delay = lastrecord.timestamp.toMilliseconds() - play_time;
//piCoutObj << "delay" << delay;
double cdelay;
int dtc;
if (is_started) {
if (is_pause) {
piMSleep(100);
return false;
}
if (delay > 0) {
cdelay = delay * play_speed;
dtc = int(cdelay) /100;
if (play_speed <= 0.) dtc = 2;
//piCout << play_speed << dtc;
for (int j=0; j<dtc; j++) {
cdelay = delay * play_speed;
dtc = int(cdelay) /100;
piMSleep(100);
if (play_speed <= 0.) {dtc = 2; j = 0;}
//piCout << " " << play_speed << dtc << j;
}
cdelay = cdelay - dtc*100;
PISystemTime::fromMilliseconds(cdelay).sleep();
}
} else is_started = true;
play_time = lastrecord.timestamp.toMilliseconds();
break;
case PlayStaticDelay:
if (is_started) {
if (is_pause) {
piMSleep(100);
return false;
}
play_delay.sleep();
} else is_started = true;
break;
default:
return false;
}
bool res = PIIODevice::threadedRead(readed, size);
is_thread_ok = true;
return res;
}
PIString PIBinaryLog::getLogfilePath() const {
PIDir dir(logDir());
dir.setDir(dir.absolutePath());
if (!dir.isExists()) {
piCoutObj << "Creating directory" << dir.path();
dir.make(true);
}
PIString npath = logDir() + "/" + filePrefix() + PIDateTime::current().toString("yyyy_MM_dd__hh_mm_ss");
PIString cnpath = npath + ".binlog";
int i = 1;
while (PIFile::isExists(cnpath)) {
cnpath = npath + "_" + PIString::fromNumber(i) + ".binlog";
i++;
}
return cnpath;
}
PIString PIBinaryLog::createNewFile() {
if (!file.close()) return PIString();
PIString cnpath = getLogfilePath();
if (open(cnpath, PIIODevice::WriteOnly)) {
newFile(file.path());
return file.path();
}
piCoutObj << "Can't create new file, maybe LogDir is invalid.";
return PIString();
}
void PIBinaryLog::createNewFile(const PIString &path) {
if (open(path, PIIODevice::WriteOnly)) {
newFile(file.path());
}
else piCoutObj << "Can't create new file, maybe path is invalid.";
}
void PIBinaryLog::setPause(bool pause) {
pausemutex.lock();
is_pause = pause;
if (pause) pause_time = PISystemTime::current();
else pause_time = PISystemTime::current() - pause_time;
pausemutex.unlock();
}
int PIBinaryLog::writeBinLog(int id, const void *data, int size) {
if (size <= 0 || !canWrite()) return -1;
if (id == 0) {
piCoutObj << "Error: can`t write with id = 0!";
return -1;
}
logmutex.lock();
switch (split_mode) {
case SplitSize:
if (file.size() > split_size) createNewFile();
break;
case SplitTime:
if ((PISystemTime::current() - startlogtime) > split_time) createNewFile();
break;
case SplitCount:
if (write_count > split_count) createNewFile();
break;
default: break;
}
if (is_pause) {
logmutex.unlock();
return 0;
}
PIByteArray logdata;
logdata << id << size << (PISystemTime::current() - startlogtime) << PIByteArray::RawData(data, size);
int res = file.write(logdata.data(), logdata.size());
file.flush();
write_count++;
log_size = file.size();
logmutex.unlock();
if (res > 0) return size;
else return res;
}
int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime &time, const void *data, int size) {
if (size <= 0 || !canWrite()) return -1;
PIByteArray logdata;
logdata << id << size << time << PIByteArray::RawData(data, size);
logmutex.lock();
int res = file.write(logdata.data(), logdata.size());
file.flush();
write_count++;
log_size = file.size();
logmutex.unlock();
if (res > 0) return size;
else return res;
}
PIByteArray PIBinaryLog::readBinLog(int id, PISystemTime * time) {
if (!canRead()) return PIByteArray();
BinLogRecord br = readRecord();
if (br.id == -1) {
piCoutObj << "End of BinLog file";
fileEnd();
return PIByteArray();
}
if (id == 0 && br.id > 0) return br.data;
while (br.id != id && !isEnd()) br = readRecord();
if (br.id == -1) {
piCoutObj << "End of BinLog file";
fileEnd();
return PIByteArray();
}
if (br.id == id) {
if (time)
*time = br.timestamp;
return br.data;
}
piCoutObj << "Can't find record with id =" << id;
return PIByteArray();
}
int PIBinaryLog::readBinLog(int id, void *read_to, int max_size, PISystemTime * time) {
if (max_size <= 0 || read_to == 0) return -1;
PIByteArray ba = readBinLog(id, time);
if (ba.isEmpty()) return -1;
int sz = piMini(max_size, ba.size());
memcpy(read_to, ba.data(), sz);
return sz;
}
int PIBinaryLog::readDevice(void *read_to, int max_size) {
if (lastrecord.id == -1 || isEnd()) return 0;
if(!is_thread_ok && lastrecord.id > 0) return lastrecord.data.size();
if (!canRead()) return -1;
if (max_size <= 0 || read_to == 0) return -1;
BinLogRecord br;
br.id = 0;
if (filterID.isEmpty()) br = readRecord();
else {
while (!filterID.contains(br.id) && !isEnd()) br = readRecord();
}
if (br.id == -1) {
fileEnd();
piCoutObj << "End of BinLog file";
return 0;
}
if (br.id <= 0) {
piCoutObj << "Read record error";
return -1;
}
int sz = piMini(max_size, br.data.size());
if (sz < br.data.size_s()) piCoutObj << "too small read buffer:" << max_size << ", data size:" << br.data.size();
memcpy(read_to, br.data.data(), sz);
return sz;
}
void PIBinaryLog::restart() {
bool th = isRunning();
if (th) stopThreadedRead();
if (!canRead()) return;
logmutex.unlock();
lastrecord.timestamp = PISystemTime();
lastrecord.id = 0;
is_thread_ok = true;
is_started = !rapidStart();
play_time = 0;
file.seekToBegin();
checkFileHeader();
moveIndex(0);
startlogtime = PISystemTime::current();
if (th) startThreadedRead();
}
bool PIBinaryLog::writeFileHeader() {
if (file.write(&__S__PIBinaryLog::binlog_sig, PIBINARYLOG_SIGNATURE_SIZE) <= 0) return false;
uchar version = PIBINARYLOG_VERSION;
if (file.write(&version, 1) <= 0) return false;
file.flush();
return true;
}
bool PIBinaryLog::checkFileHeader() {
uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE];
for (uint i=0; i<PIBINARYLOG_SIGNATURE_SIZE; i++) read_sig[i] = 0;
if (file.read(read_sig, PIBINARYLOG_SIGNATURE_SIZE) < 0) return false;
bool correct = true;
for (uint i=0; i<PIBINARYLOG_SIGNATURE_SIZE; i++)
if (read_sig[i] != __S__PIBinaryLog::binlog_sig[i]) correct = false;
if (!correct) {
piCoutObj << "BinLogFile signature is corrupted or invalid file";
return false;
}
uchar read_version = 0;
if (file.read(&read_version, 1) < 0) return false;
if (read_version == PIBINARYLOG_VERSION) {
log_size = file.size();
return true;
}
if (read_version == 0)
piCoutObj << "BinLogFile has invalid version";
if (read_version < PIBINARYLOG_VERSION)
piCoutObj << "BinLogFile has too old verion";
if (read_version > PIBINARYLOG_VERSION)
piCoutObj << "BinLogFile has too newest version";
return false;
}
PIBinaryLog::BinLogRecord PIBinaryLog::readRecord() {
// piCoutObj << "readRecord";
logmutex.lock();
PIByteArray ba;
BinLogRecord br;
lastrecord.id = 0;
lastrecord.data.clear();
lastrecord.timestamp = PISystemTime();
ba.resize(sizeof(BinLogRecord) - sizeof(PIByteArray));
if(file.read(ba.data(), ba.size_s()) > 0) {
ba >> br.id >> br.size >> br.timestamp;
} else {
br.id = -1;
logmutex.unlock();
// piCoutObj << "readRecord done";
return br;
}
if (br.id > 0 && br.size > 0) {
ba.resize(br.size);
if(file.read(ba.data(), ba.size_s()) > 0) br.data = ba;
else br.id = 0;
} else br.id = 0;
lastrecord = br;
if (br.id == 0) fileError();
moveIndex(index_pos.value(file.pos(), -1));
logmutex.unlock();
// piCoutObj << "readRecord done";
return br;
}
void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector<PIBinaryLog::BinLogIndex> * index) {
BinLogInfo * bi = info;
bool ginfo = info != 0;
bool gindex = index != 0;
if (!ginfo && !gindex) return;
if (ginfo) {
bi->log_size = -1;
bi->records_count = 0;
bi->records.clear();
}
if (gindex) index->clear();
if (f == 0) return;
if (!f->canRead()) return;
if (ginfo) {
bi->path = f->path();
bi->log_size = f->size();
}
uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE];
for (uint i=0; i<PIBINARYLOG_SIGNATURE_SIZE; i++) read_sig[i] = 0;
bool ok = true;
if (f->read(read_sig, PIBINARYLOG_SIGNATURE_SIZE) < 0) {if (ginfo) bi->records_count = -1; ok = false;}
for (uint i=0; i<PIBINARYLOG_SIGNATURE_SIZE; i++)
if (read_sig[i] != __S__PIBinaryLog::binlog_sig[i]) {if (ginfo) bi->records_count = -2; ok = false;}
uchar read_version = 0;
if (f->read(&read_version, 1) < 0) {if (ginfo) bi->records_count = -3; ok = false;}
if (read_version == 0) {if (ginfo) bi->records_count = -4; ok = false;}
if (read_version < PIBINARYLOG_VERSION) {if (ginfo) bi->records_count = -5; ok = false;}
if (read_version > PIBINARYLOG_VERSION) {if (ginfo) bi->records_count = -6; ok = false;}
if (!ok) return;
PIByteArray ba;
BinLogRecord br;
bool first = true;
size_t hdr_size = sizeof(BinLogRecord) - sizeof(PIByteArray);
ba.resize(hdr_size);
while (1) {
ba.resize(hdr_size);
{
if (f->read(ba.data(), ba.size()) > 0) {
ba >> br.id >> br.size >> br.timestamp;
} else
break;
if (bi->log_size - f->pos() >= br.size)
f->seek(f->pos() + br.size);
else
break;
}
if (br.id > 0) {
if (gindex) {
BinLogIndex bl_ind;
bl_ind.id = br.id;
bl_ind.pos = f->pos() - br.size - hdr_size;
bl_ind.timestamp = br.timestamp;
index->append(bl_ind);
}
if (ginfo) {
bi->records_count++;
if (first) {
bi->start_time = br.timestamp;
first = false;
}
BinLogRecordInfo &bri(bi->records[br.id]);
bri.count++;
if (bri.id == 0) {
bri.id = br.id;
bri.minimum_size = bri.maximum_size = br.size;
bri.start_time = br.timestamp;
} else {
bri.end_time = br.timestamp;
if (bri.minimum_size > br.size) bri.minimum_size = br.size;
if (bri.maximum_size < br.size) bri.maximum_size = br.size;
}
}
}
}
if (ginfo) bi->end_time = br.timestamp;
}
void PIBinaryLog::moveIndex(int i) {
if (is_indexed) {
current_index = i;
posChanged(current_index);
}
}
PIBinaryLog::BinLogInfo PIBinaryLog::getLogInfo(const PIString & path) {
BinLogInfo bi;
bi.path = path;
bi.records_count = 0;
PIFile tfile;
if (!tfile.open(path, PIIODevice::ReadOnly)) return bi;
parseLog(&tfile, &bi, 0);
return bi;
}
bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString & dst, int from, int to) {
PIBinaryLog slog;
if (!slog.open(src.path, PIIODevice::ReadOnly)) return false;
PIVector<int> ids = src.records.keys();
slog.seekTo(from);
PIBinaryLog dlog;
dlog.createNewFile(dst);
bool first = true;
BinLogRecord br;
PISystemTime st;
while (!slog.isEnd() && ((slog.pos() <= to) || to < 0)) {
br = slog.readRecord();
if (first) {
st = br.timestamp;
first = false;
}
if (ids.contains(br.id)) {
dlog.writeBinLog_raw(br.id, br.timestamp - st, br.data);
}
}
return true;
}
bool PIBinaryLog::createIndex() {
logmutex.lock();
llong cp = file.pos();
file.seekToBegin();
index.clear();
index_pos.clear();
parseLog(&file, &binfo, &index);
file.seek(cp);
is_indexed = !index.isEmpty();
for (uint i=0; i<index.size(); i++) index_pos[index[i].pos] = i;
logmutex.unlock();
return is_indexed;
}
void PIBinaryLog::seekTo(int rindex) {
// piCoutObj << "seekTo";
logmutex.lock();
if (rindex < index.size_s() && rindex >= 0) {
file.seek(index[rindex].pos);
moveIndex(index_pos.value(file.pos(), -1));
play_time = index[rindex].timestamp.toMilliseconds();
lastrecord.timestamp = index[rindex].timestamp;
}
// piCoutObj << "seekTo done";
logmutex.unlock();
}
bool PIBinaryLog::seek(const PISystemTime & time) {
int ci = -1;
for (uint i=0; i<index.size(); i++) {
if (time <= index[i].timestamp && (filterID.contains(index[i].id) || filterID.isEmpty())) {
ci = i;
break;
}
}
if (ci >= 0) {
seekTo(ci);
return true;
}
return false;
}
bool PIBinaryLog::seek(llong filepos) {
int ci = -1;
for (uint i=0; i<index.size(); i++) {
if (filepos <= index[i].pos && (filterID.contains(index[i].id) || filterID.isEmpty())) {
ci = i;
break;
}
}
if (ci >= 0) {
seekTo(ci);
return true;
}
return false;
}
PIString PIBinaryLog::constructFullPathDevice() const {
PIString ret;
ret << logDir() << ":" << filePrefix() << ":" << defaultID() << ":";
switch (play_mode) {
case PlayRealTime:
ret << "RT";
break;
case PlayVariableSpeed:
ret << PIString::fromNumber(playSpeed()) << "X";
break;
case PlayStaticDelay:
ret << PIString::fromNumber(playDelay().toMilliseconds()) << "M";
break;
default:
ret << "RT";
break;
}
return ret;
}
void PIBinaryLog::configureFromFullPathDevice(const PIString & full_path) {
PIStringList pl = full_path.split(":");
for (int i = 0; i < pl.size_s(); ++i) {
PIString p(pl[i]);
switch (i) {
case 0: setLogDir(p); break;
case 1: setFilePrefix(p); break;
case 2: setDefaultID(p.toInt()); break;
case 3:
if (p.toUpperCase() == "RT") setPlayRealTime();
if (p.toUpperCase().right(1) == "X") setPlaySpeed((p.left(p.size() - 1)).toDouble());
if (p.toUpperCase().right(1) == "M") setPlayDelay(PISystemTime::fromMilliseconds((p.left(p.size() - 1)).toDouble()));
break;
}
}
// piCoutObj << "configured";
}
PIPropertyStorage PIBinaryLog::constructVariantDevice() const {
PIPropertyStorage ret;
PIVariantTypes::Enum e;
ret.addProperty("log dir", PIVariantTypes::Dir(logDir()));
ret.addProperty("file prefix", filePrefix());
ret.addProperty("default ID", defaultID());
e << "real-time" << "variable speed" << "static delay";
e.selectValue((int)playMode());
ret.addProperty("play mode", e);
ret.addProperty("play speed", playSpeed());
ret.addProperty("play delay", playDelay().toMilliseconds());
return ret;
}
void PIBinaryLog::configureFromVariantDevice(const PIPropertyStorage & d) {
setLogDir(d.propertyValueByName("log dir").toString());
setFilePrefix(d.propertyValueByName("file prefix").toString());
setDefaultID(d.propertyValueByName("default ID").toInt());
setPlaySpeed(d.propertyValueByName("play speed").toDouble());
setPlayDelay(PISystemTime::fromMilliseconds(d.propertyValueByName("play delay").toDouble()));
setPlayMode((PlayMode)d.propertyValueByName("play mode").toEnum().selectedValue());
}
void PIBinaryLog::propertyChanged(const PIString &s) {
default_id = property("defaultID").toInt();
rapid_start = property("rapidStart").toBool();
play_mode = (PlayMode)property("playMode").toInt();
double ps = property("playSpeed").toDouble();
play_speed = ps > 0. ? 1. / ps : 0.;
play_delay = property("playDelay").toSystemTime();
split_mode = (SplitMode)property("splitMode").toInt();
split_time = property("splitTime").toSystemTime();
split_size = property("splitFileSize").toLLong();
split_count = property("splitRecordCount").toInt();
// piCoutObj << "propertyChanged" << s << play_mode;
}