Files
pip/libs/main/io_devices/pibinarylog.cpp
peri4 1c7fc39b6c version 4.0.0_alpha
in almost all methods removed timeouts in milliseconds, replaced to PISystemTime
PITimer rewrite, remove internal impl, now only thread implementation, API similar to PIThread
PITimer API no longer pass void*
PIPeer, PIConnection improved stability on reinit and exit
PISystemTime new methods
pisd now exit without hanging
2024-07-30 14:18:02 +03:00

1008 lines
27 KiB
C++

/*
PIP - Platform Independent Primitives
Class for write binary data to logfile, and read or playback this data
Andrey Bychkov work.a.b@yandex.ru
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "pibinarylog.h"
#include "pidir.h"
#include "piliterals_bytes.h"
#include "piliterals_time.h"
#include "pipropertystorage.h"
#include "pitime.h"
#define PIBINARYLOG_VERSION_OLD 0x31
/*! \class PIBinaryLog
* \brief Class for read and write binary data to logfile, and playback this data in realtime, or custom speed
*
* \section PIBinaryLog_sec0 Synopsis
* Binary Log is a file with simple header, where you can read and write some binary data.
* Any written data include special header with ID, size and timestamp.
* This header provides separation different messages from the one file by choosing different IDs.
* With \a filterID or special functions, like \a readBinLog() you can choose IDs what you want to read.
* With function \a writeBinLog() or \a setDefaultID() you can choose ID that mark you data.
* By default ID = 1, and \a filterID is empty, that mean you read any ID without filtering.
* ThreadedRead provide you playback data, with delay that you write data.
* You can choose different playbak modes by set \a PlayMode.
*
* \section PIBinaryLog_sec1 Basic usage
* This class provide all functions of \a PIIODevice, such \a open(), \a close(),
* \a read() ,\a write(), and threaded read/write.
* function \a setLogDir() need to set directory for BinLog files
* function \a createNewFile() need to create new binlog file
* function \a restart() need start from the begining of binlog file
*
*/
static const uchar binlog_sig[] = {'B', 'I', 'N', 'L', 'O', 'G'};
#define PIBINARYLOG_VERSION 0x32
#define PIBINARYLOG_SIGNATURE_SIZE sizeof(binlog_sig)
REGISTER_DEVICE(PIBinaryLog)
PIBinaryLog::PIBinaryLog() {
#ifdef MICRO_PIP
setThreadedReadBufferSize(512);
#else
setThreadedReadBufferSize(64_KiB);
#endif
is_started = is_indexed = is_pause = false;
create_index_on_fly = false;
current_index = -1;
log_size = 0;
f_new_path = nullptr;
setPlaySpeed(1.);
setDefaultID(1);
setPlaySpeed(1.0);
setPlayDelay(1_s);
setPlayRealTime();
setSplitTime(600_s);
setSplitRecordCount(1000);
setSplitFileSize(0xFFFFFF);
setSplitMode(SplitNone);
setLogDir(PIString());
setFilePrefix(PIString());
setRapidStart(false);
file.setName("__S__PIBinaryLog::file");
}
PIBinaryLog::~PIBinaryLog() {
stop();
close();
}
bool PIBinaryLog::openDevice() {
lastrecord.timestamp = PISystemTime();
lastrecord.id = 0;
write_count = 0;
is_started = false;
is_thread_ok = true;
is_indexed = false;
is_pause = false;
index.clear();
log_size = 0;
if (mode_ == ReadWrite) {
piCoutObj << "Error: ReadWrite mode not supported, use WriteOnly or ReadOnly";
return false;
}
if (path().isEmpty() && mode_ == WriteOnly) {
if (f_new_path)
setPath(f_new_path());
else
setPath(getLogfilePath(logDir(), filePrefix()));
}
if (path().isEmpty() && mode_ == ReadOnly) {
PIDir ld(logDir());
if (ld.isExists()) {
PIVector<PIFile::FileInfo> es = ld.allEntries();
piForeachC(PIFile::FileInfo & i, es) {
if (i.extension() == "binlog" && i.isFile() && i.baseName().startsWith(filePrefix())) {
setPath(i.path);
break;
}
}
}
}
if (!file.open(path(), mode_)) {
piCoutObj << "Error: Can't open file" << path();
return false;
}
setName(path());
if (mode_ == WriteOnly) {
file.clear();
if (!writeFileHeader()) {
piCoutObj << "Error: Can't write binlog file header" << path();
return false;
}
is_started = true;
}
if (mode_ == ReadOnly) {
if (file.isEmpty()) {
piCoutObj << "Error: File is null" << path();
fileError();
return false;
}
if (!checkFileHeader()) {
fileError();
return false;
}
if (isEmpty()) {
piCoutObj << "Warning: Empty BinLog file" << path();
fileEnd();
}
play_time = 0;
if (!rapid_start) is_started = true;
}
startlogtime = PISystemTime::current();
pause_time = PISystemTime();
return true;
}
bool PIBinaryLog::closeDevice() {
stopAndWait();
pausemutex.unlock();
logmutex.unlock();
moveIndex(-1);
is_indexed = false;
index.clear();
bool e = isEmpty();
log_size = 0;
if (canWrite() && e) {
file.remove();
return true;
}
return file.close();
}
bool PIBinaryLog::threadedRead(const uchar * readed, ssize_t size) {
// piCout << "binlog threaded read";
if (!canRead() || isEnd()) return PIIODevice::threadedRead(readed, size);
is_thread_ok = false;
logmutex.lock();
const PISystemTime lastrec_timestamp = lastrecord.timestamp;
logmutex.unlock();
PISystemTime pt;
double delay;
switch (play_mode) {
case PlayRealTime:
pausemutex.lock();
if (is_pause) {
pausemutex.unlock();
piMSleep(100); // TODO: rewrite with condvar
return false;
} else if (pause_time > PISystemTime()) {
// startlogtime += pause_time;
pause_time = PISystemTime();
}
pausemutex.unlock();
pt = PISystemTime::current() - startlogtime;
if (is_started) {
if (lastrec_timestamp > pt) (lastrec_timestamp - pt).sleep();
} else {
startlogtime = PISystemTime::current() - lastrec_timestamp;
is_started = true;
}
break;
case PlayVariableSpeed:
delay = lastrec_timestamp.toMilliseconds() - play_time;
// piCoutObj << "delay" << delay;
double cdelay;
int dtc;
if (is_started) {
if (is_pause) {
piMSleep(100); // TODO: rewrite with condvar
return false;
}
if (delay > 0) {
cdelay = delay * play_speed; // TODO: rewrite with condvar
dtc = int(cdelay) / 100; // TODO: rewrite with condvar
if (play_speed <= 0.) dtc = 2;
// piCout << play_speed << dtc;
for (int j = 0; j < dtc; j++) {
cdelay = delay * play_speed; // TODO: rewrite with condvar
dtc = int(cdelay) / 100; // TODO: rewrite with condvar
piMSleep(100); // TODO: rewrite with condvar
if (play_speed <= 0.) {
dtc = 2;
j = 0;
}
// piCout << " " << play_speed << dtc << j;
}
cdelay = cdelay - dtc * 100; // TODO: rewrite with condvar
PISystemTime::fromMilliseconds(cdelay).sleep();
}
} else
is_started = true;
play_time = lastrec_timestamp.toMilliseconds();
break;
case PlayStaticDelay:
if (is_started) {
if (is_pause) {
piMSleep(100); // TODO: rewrite with condvar
return false;
}
play_delay.sleep();
} else
is_started = true;
break;
default: return false;
}
bool res = PIIODevice::threadedRead(readed, size);
is_thread_ok = true;
return res;
}
PIString PIBinaryLog::getLogfilePath(const PIString & log_dir, const PIString & prefix) {
PIDir dir(log_dir);
dir.setDir(dir.absolutePath());
if (!dir.isExists()) {
piCout << "[PIBinaryLog]"
<< "Creating directory" << dir.path();
dir.make(true);
}
const PIString npath = log_dir + PIDir::separator + prefix + PIDateTime::current().toString("yyyy_MM_dd__hh_mm_ss");
PIString cnpath = npath + ".binlog";
int i = 1;
while (PIFile::isExists(cnpath)) {
cnpath = npath + '_' + PIString::fromNumber(i) + ".binlog";
i++;
}
return cnpath;
}
PIString PIBinaryLog::createNewFile() {
if (!file.close()) return PIString();
PIString cnpath;
if (f_new_path)
cnpath = f_new_path();
else
cnpath = getLogfilePath(logDir(), filePrefix());
if (open(cnpath, PIIODevice::WriteOnly)) {
newFile(file.path());
return file.path();
}
piCoutObj << "Can't create new file, maybe LogDir" << ("\"" + logDir() + "\"") << "is invalid.";
return PIString();
}
void PIBinaryLog::createNewFile(const PIString & path) {
if (open(path, PIIODevice::WriteOnly)) {
newFile(file.path());
} else
piCoutObj << "Can't create new file, maybe path" << ("\"" + path + "\"") << "is invalid.";
}
void PIBinaryLog::setCreateIndexOnFly(bool yes) {
create_index_on_fly = yes;
}
void PIBinaryLog::setPause(bool pause) {
pausemutex.lock();
is_pause = pause;
if (pause)
pause_time = PISystemTime::current();
else {
if (pause_time > PISystemTime()) pause_time = PISystemTime::current() - pause_time;
}
pausemutex.unlock();
}
int PIBinaryLog::writeBinLog(int id, const void * data, int size) {
if (size <= 0 || !canWrite()) return -1;
if (id == 0) {
piCoutObj << "Error: can`t write with id = 0! Id must be > 0";
return -1;
}
if (is_pause) return 0;
logmutex.lock();
const int res = writeRecord(id, PISystemTime::current() - startlogtime, data, size);
switch (split_mode) {
case SplitSize:
if (log_size > split_size) createNewFile();
break;
case SplitTime:
if ((PISystemTime::current() - startlogtime) > split_time) createNewFile();
break;
case SplitCount:
if (write_count > split_count) createNewFile();
break;
default: break;
}
logmutex.unlock();
if (res > 0)
return size;
else
return res;
}
int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime & time, const void * data, int size) {
if (size <= 0 || !canWrite()) return -1;
logmutex.lock();
const int res = writeRecord(id, time, data, size);
logmutex.unlock();
if (res > 0)
return size;
else
return res;
}
PIByteArray PIBinaryLog::readBinLog(int id, PISystemTime * time, int * readed_id) {
if (!canRead()) return PIByteArray();
logmutex.lock();
Record br = readRecord();
logmutex.unlock();
if (br.id == -1) {
piCoutObj << "End of BinLog file";
fileEnd();
return PIByteArray();
}
if ((id == 0) && (br.id != 0) && (br.id != -1)) {
if (time) *time = br.timestamp;
if (readed_id) *readed_id = br.id;
return br.data;
}
logmutex.lock();
while (br.id != id && !isEnd())
br = readRecord();
logmutex.unlock();
if (br.id == -1) {
piCoutObj << "End of BinLog file";
fileEnd();
return PIByteArray();
}
if (br.id == id) {
if (time) *time = br.timestamp;
if (readed_id) *readed_id = br.id;
return br.data;
}
piCoutObj << "Can't find record with id =" << id;
return PIByteArray();
}
int PIBinaryLog::readBinLog(int id, void * read_to, int max_size, PISystemTime * time, int * readed_id) {
if (max_size <= 0 || read_to == 0) return -1;
const PIByteArray ba = readBinLog(id, time, readed_id);
if (ba.isEmpty()) return -1;
int sz = piMini(max_size, ba.size());
memcpy(read_to, ba.data(), sz);
return sz;
}
bool PIBinaryLog::isEmpty() const {
return (log_size <= llong(PIBINARYLOG_SIGNATURE_SIZE + 1));
}
void PIBinaryLog::setHeader(const PIByteArray & header) {
user_header = header;
}
PIByteArray PIBinaryLog::getHeader() const {
return index.info.user_header;
}
ssize_t PIBinaryLog::readDevice(void * read_to, ssize_t max_size) {
PIMutexLocker _ml(logmutex);
if (lastrecord.id == -1 || isEnd()) return 0;
if (!is_thread_ok && lastrecord.id > 0) return lastrecord.data.size();
if (!canRead()) return -1;
if (max_size <= 0 || read_to == 0) return -1;
Record br;
br.id = 0;
if (filterID.isEmpty()) {
br = readRecord();
} else {
while (!filterID.contains(br.id) && !isEnd())
br = readRecord();
}
if (br.id == -1) {
fileEnd();
piCoutObj << "End of BinLog file";
return 0;
}
if (br.id == 0) {
piCoutObj << "Read record error";
return -1;
}
const ssize_t sz = piMini(max_size, br.data.size());
if (sz < br.data.size_s()) piCoutObj << "too small read buffer:" << max_size << ", data size:" << br.data.size();
memcpy(read_to, br.data.data(), sz);
return sz;
}
ssize_t PIBinaryLog::writeDevice(const void * data, ssize_t size) {
return writeBinLog(default_id, data, size);
}
void PIBinaryLog::restart() {
const bool th = isThreadedRead();
if (th) stopThreadedRead();
if (!canRead()) return;
logmutex.unlock();
lastrecord.timestamp = PISystemTime();
lastrecord.id = 0;
is_thread_ok = true;
is_started = !rapidStart();
play_time = 0;
file.seekToBegin();
checkFileHeader();
moveIndex(0);
startlogtime = PISystemTime::current();
if (th) startThreadedRead();
}
bool PIBinaryLog::writeFileHeader() {
if (file.write(binlog_sig, PIBINARYLOG_SIGNATURE_SIZE) <= 0) return false;
const uchar version = PIBINARYLOG_VERSION;
if (file.write(&version, 1) <= 0) return false;
const uint32_t sz = user_header.size();
file.write(&sz, 4);
file.write(user_header);
file.flush();
if (create_index_on_fly) {
index.info.user_header = user_header;
index.info.path = file.path();
}
return true;
}
bool PIBinaryLog::checkFileHeader() {
index.info.user_header.clear();
uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE];
for (uint i = 0; i < PIBINARYLOG_SIGNATURE_SIZE; i++)
read_sig[i] = 0;
if (file.read(read_sig, PIBINARYLOG_SIGNATURE_SIZE) < 0) return false;
bool correct = true;
for (uint i = 0; i < PIBINARYLOG_SIGNATURE_SIZE; i++)
if (read_sig[i] != binlog_sig[i]) correct = false;
if (!correct) {
piCoutObj << "BinLogFile signature is corrupted or invalid file";
return false;
}
uchar read_version = 0;
if (file.read(&read_version, 1) < 0) return false;
if (read_version == PIBINARYLOG_VERSION_OLD) {
log_size = file.size();
return true;
}
if (read_version == PIBINARYLOG_VERSION) {
log_size = file.size();
uint32_t sz = 0;
file.read(&sz, 4);
if (sz > 0) {
index.info.user_header = file.read(sz);
}
return true;
}
if (read_version == 0) piCoutObj << "BinLogFile has invalid version";
if (read_version < PIBINARYLOG_VERSION) piCoutObj << "BinLogFile has too old verion";
if (read_version > PIBINARYLOG_VERSION) piCoutObj << "BinLogFile has too newest version";
return false;
}
PIBinaryLog::Record PIBinaryLog::readRecord() {
// piCoutObj << "readRecord pos =" << file.pos();
logmutex.lock();
PIByteArray ba;
Record br;
lastrecord.id = 0;
lastrecord.data.clear();
lastrecord.timestamp = PISystemTime();
ba.resize(sizeof(Record) - sizeof(PIByteArray));
if (file.read(ba.data(), ba.size_s()) > 0) {
ba >> br.id >> br.size >> br.timestamp;
} else {
br.id = -1;
logmutex.unlock();
// piCoutObj << "readRecord done" << br.id;
return br;
}
if ((br.id != 0) && (br.id != -1) && (br.size > 0)) {
ba.resize(br.size);
if (file.read(ba.data(), ba.size_s()) > 0)
br.data = ba;
else
br.id = 0;
} else
br.id = 0;
lastrecord = br;
if (br.id == 0) fileError();
moveIndex(index.index_pos.value(file.pos(), -1));
logmutex.unlock();
// piCoutObj << "readRecord done";
return br;
}
int PIBinaryLog::writeRecord(int id, PISystemTime time, const void * data, int size) {
PIByteArray logdata;
logdata << id << size << time << PIMemoryBlock(data, size);
if (create_index_on_fly) {
BinLogIndex bi;
bi.id = id;
bi.data_size = size;
bi.pos = file.pos();
bi.timestamp = time;
index.index << bi;
auto & rinfo(index.info.records[id]);
if (rinfo.count == 0) {
rinfo.id = id;
rinfo.start_time = rinfo.end_time = time;
rinfo.minimum_size = rinfo.maximum_size = size;
} else {
rinfo.start_time = piMin(rinfo.start_time, time);
rinfo.end_time = piMax(rinfo.end_time, time);
rinfo.minimum_size = piMin(rinfo.minimum_size, size);
rinfo.maximum_size = piMax(rinfo.maximum_size, size);
}
++rinfo.count;
if (index.info.records_count == 0) {
index.info.start_time = index.info.end_time = time;
} else {
index.info.start_time = piMin(index.info.start_time, time);
index.info.end_time = piMax(index.info.end_time, time);
}
++index.info.records_count;
is_indexed = true;
}
const int ret = file.write(logdata.data(), logdata.size());
file.flush();
write_count++;
log_size = file.size();
if (create_index_on_fly) {
index.info.log_size = log_size;
}
return ret;
}
void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector<PIBinaryLog::BinLogIndex> * index) {
if (!info && !index) return;
if (info) {
info->log_size = -1;
info->records_count = 0;
info->records.clear();
}
if (index) index->clear();
if (f == 0) return;
if (!f->canRead()) return;
if (info) {
info->path = f->path();
info->log_size = f->size();
}
uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE];
for (uint i = 0; i < PIBINARYLOG_SIGNATURE_SIZE; i++)
read_sig[i] = 0;
if (f->read(read_sig, PIBINARYLOG_SIGNATURE_SIZE) < 0) {
if (info) info->records_count = -1;
return;
}
for (uint i = 0; i < PIBINARYLOG_SIGNATURE_SIZE; i++) {
if (read_sig[i] != binlog_sig[i]) {
if (info) info->records_count = -2;
return;
}
}
uchar read_version = 0;
if (f->read(&read_version, 1) < 0) {
if (info) info->records_count = -3;
return;
}
if (read_version == 0) {
if (info) info->records_count = -4;
return;
}
if (read_version < PIBINARYLOG_VERSION_OLD) {
if (info) info->records_count = -5;
return;
}
if (read_version > PIBINARYLOG_VERSION) {
if (info) info->records_count = -6;
return;
}
if (read_version == PIBINARYLOG_VERSION) {
uint32_t sz = 0;
f->read(&sz, 4);
if (sz > 0) {
PIByteArray user_hdr = f->read(sz);
if (info) info->user_header.swap(user_hdr);
}
}
PIByteArray ba;
Record br;
br.id = 0;
br.size = 0;
bool first = true;
constexpr size_t hdr_size = sizeof(Record) - sizeof(PIByteArray);
ba.resize(hdr_size);
while (1) {
ba.resize(hdr_size);
{
if (f->read(ba.data(), ba.size()) > 0) {
ba >> br.id >> br.size >> br.timestamp;
} else
break;
if (info) {
if (info->log_size - f->pos() >= br.size) {
f->seek(f->pos() + br.size);
}
} else
break;
}
if (br.id > 0) {
if (index) {
BinLogIndex bl_ind;
bl_ind.id = br.id;
bl_ind.data_size = br.size;
bl_ind.pos = f->pos() - br.size - hdr_size;
bl_ind.timestamp = br.timestamp;
index->append(bl_ind);
}
if (info) {
info->records_count++;
if (first) {
info->start_time = br.timestamp;
first = false;
}
BinLogRecordInfo & bri(info->records[br.id]);
bri.count++;
if (bri.id == 0) {
bri.id = br.id;
bri.minimum_size = bri.maximum_size = br.size;
bri.start_time = br.timestamp;
} else {
bri.end_time = br.timestamp;
if (bri.minimum_size > br.size) bri.minimum_size = br.size;
if (bri.maximum_size < br.size) bri.maximum_size = br.size;
}
}
}
}
if (info) info->end_time = br.timestamp;
}
void PIBinaryLog::moveIndex(int i) {
if (is_indexed) {
current_index = i;
posChanged(current_index);
}
}
PIBinaryLog::BinLogInfo PIBinaryLog::getLogInfo(const PIString & path) {
BinLogInfo bi;
bi.path = path;
bi.records_count = 0;
PIFile tfile;
if (!tfile.open(path, PIIODevice::ReadOnly)) return bi;
parseLog(&tfile, &bi, 0);
return bi;
}
bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString & dst, int from, int to) {
PIBinaryLog slog;
if (!slog.open(src.path, PIIODevice::ReadOnly)) {
piCout << "[PIBinaryLog]"
<< "Error, can't open" << src.path;
return false;
}
const PIVector<int> ids = src.records.keys();
slog.seekTo(from);
PIBinaryLog dlog;
dlog.createNewFile(dst);
if (!dlog.isOpened()) {
piCout << "[PIBinaryLog]"
<< "Error, can't create" << dst;
return false;
}
bool first = true;
Record br;
PISystemTime st;
PITimeMeasurer tm;
while (!slog.isEnd() && ((slog.pos() <= to) || to < 0)) {
br = slog.readRecord();
if (first) {
st = br.timestamp;
first = false;
}
if (ids.contains(br.id)) {
if (dlog.writeBinLog_raw(br.id, br.timestamp - st, br.data) <= 0) {
piCout << "[PIBinaryLog]"
<< "Error, can't write to file" << dst;
return false;
}
}
if (tm.elapsed_s() > 1) {
tm.reset();
piCout << "[PIBinaryLog]"
<< "process" << PITime::fromSystemTime(br.timestamp).toString();
}
}
return true;
}
bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src,
const PIString & dst,
std::function<bool(const PIString &, PISystemTime)> progress) {
PIBinaryLog slog;
PIBinaryLog dlog;
PISystemTime dtime;
PISystemTime lt;
PITimeMeasurer tm;
bool first = true;
for (const PIString & fn: src) {
if (!slog.open(fn, PIIODevice::ReadOnly)) {
piCout << "[PIBinaryLog]"
<< "Error, can't open" << fn;
return false;
}
if (first) {
first = false;
dlog.setHeader(slog.getHeader());
dlog.createNewFile(dst);
if (!dlog.isOpened()) {
piCout << "[PIBinaryLog]"
<< "Error, can't create" << dst;
return false;
}
piCout << "[PIBinaryLog]"
<< "Start join binlogs to" << dst;
} else {
dtime = lt;
}
tm.reset();
Record br;
PISystemTime st;
while (!slog.isEnd()) {
br = slog.readRecord();
if (br.data.isEmpty() || br.id == 0 || br.id == -1) continue;
st = br.timestamp;
lt = dtime + br.timestamp;
if (dlog.writeBinLog_raw(br.id, lt, br.data) <= 0) {
piCout << "[PIBinaryLog]"
<< "Error, can't write to file" << dst;
return false;
}
if (tm.elapsed_s() > 0.1) {
tm.reset();
if (progress) {
if (!progress(fn, lt)) {
slog.close();
dlog.close();
PIFile::remove(dlog.path());
return false;
}
} else {
piCout << "[PIBinaryLog]"
<< "process" << PITime::fromSystemTime(lt).toString();
}
}
}
slog.close();
// piCout << "[PIBinaryLog]" << "complete" << fn;
}
piCout << "[PIBinaryLog]"
<< "Finish join binlogs, total time" << PITime::fromSystemTime(lt).toString();
return true;
}
bool PIBinaryLog::createIndex() {
logmutex.lock();
const llong cp = file.pos();
file.seekToBegin();
index.clear();
parseLog(&file, &index.info, &index.index);
index.makeIndexPos();
file.seek(cp);
is_indexed = !index.index.isEmpty();
logmutex.unlock();
return is_indexed;
}
int PIBinaryLog::posForTime(const PISystemTime & time) {
int ci = -1;
for (uint i = 0; i < index.index.size(); i++) {
if (time <= index.index[i].timestamp && (filterID.contains(index.index[i].id) || filterID.isEmpty())) {
ci = i;
break;
}
}
return ci;
}
void PIBinaryLog::seekTo(int rindex) {
// piCoutObj << "seekTo";
logmutex.lock();
pausemutex.lock();
if (rindex < index.index.size_s() && rindex >= 0) {
file.seek(index.index[rindex].pos);
moveIndex(index.index_pos.value(file.pos(), -1));
// double prev_pt = play_time;
play_time = index.index[rindex].timestamp.toMilliseconds();
lastrecord.timestamp = index.index[rindex].timestamp;
if (play_mode == PlayRealTime) {
startlogtime = PISystemTime::current() - lastrecord.timestamp;
}
}
// piCoutObj << "seekTo done";
pausemutex.unlock();
logmutex.unlock();
}
bool PIBinaryLog::seek(const PISystemTime & time) {
const int ci = posForTime(time);
if (ci >= 0) {
seekTo(ci);
return true;
}
return false;
}
bool PIBinaryLog::seek(llong filepos) {
int ci = -1;
for (uint i = 0; i < index.index.size(); i++) {
if (filepos <= index.index[i].pos && (filterID.contains(index.index[i].id) || filterID.isEmpty())) {
ci = i;
break;
}
}
if (ci >= 0) {
seekTo(ci);
return true;
}
return false;
}
int PIBinaryLog::pos() const {
if (is_indexed) return current_index;
return -1;
}
PIByteArray PIBinaryLog::saveIndex() const {
if (!is_indexed) return PIByteArray();
return piSerialize(index);
}
bool PIBinaryLog::loadIndex(PIByteArray saved) {
if (!canRead() || saved.isEmpty()) return false;
index = piDeserialize<CompleteIndex>(saved);
index.makeIndexPos();
is_indexed = index.index.isNotEmpty();
return is_indexed;
}
PIString PIBinaryLog::constructFullPathDevice() const {
PIString ret;
ret += logDir() + ":" + filePrefix() + ":" + PIString::fromNumber(defaultID()) + ":";
switch (play_mode) {
case PlayRealTime: ret += "RT"; break;
case PlayVariableSpeed: ret += PIString::fromNumber(playSpeed()) + "X"; break;
case PlayStaticDelay: ret += PIString::fromNumber(playDelay().toMilliseconds()) + "M"; break;
default: ret += "RT"; break;
}
return ret;
}
void PIBinaryLog::configureFromFullPathDevice(const PIString & full_path) {
const PIStringList pl = full_path.split(":");
for (int i = 0; i < pl.size_s(); ++i) {
const PIString p(pl[i]);
switch (i) {
case 0: setLogDir(p); break;
case 1: setFilePrefix(p); break;
case 2: setDefaultID(p.toInt()); break;
case 3:
if (p.toUpperCase() == "RT") setPlayRealTime();
if (p.toUpperCase().right(1) == "X") setPlaySpeed((p.left(p.size() - 1)).toDouble());
if (p.toUpperCase().right(1) == "M") setPlayDelay(PISystemTime::fromMilliseconds((p.left(p.size() - 1)).toDouble()));
break;
}
}
// piCoutObj << "configured";
}
PIPropertyStorage PIBinaryLog::constructVariantDevice() const {
PIPropertyStorage ret;
PIVariantTypes::Enum e;
ret.addProperty("log dir", PIVariantTypes::Dir(logDir()));
ret.addProperty("file prefix", filePrefix());
ret.addProperty("default ID", defaultID());
e << "real-time"
<< "variable speed"
<< "static delay";
e.selectValue((int)playMode());
ret.addProperty("play mode", e);
ret.addProperty("play speed", playSpeed());
ret.addProperty("play delay", playDelay().toMilliseconds());
return ret;
}
void PIBinaryLog::configureFromVariantDevice(const PIPropertyStorage & d) {
setLogDir(d.propertyValueByName("log dir").toString());
setFilePrefix(d.propertyValueByName("file prefix").toString());
setDefaultID(d.propertyValueByName("default ID").toInt());
setPlaySpeed(d.propertyValueByName("play speed").toDouble());
setPlayDelay(PISystemTime::fromMilliseconds(d.propertyValueByName("play delay").toDouble()));
setPlayMode((PlayMode)d.propertyValueByName("play mode").toEnum().selectedValue());
}
void PIBinaryLog::propertyChanged(const char * s) {
default_id = property("defaultID").toInt();
rapid_start = property("rapidStart").toBool();
play_mode = (PlayMode)property("playMode").toInt();
const double ps = property("playSpeed").toDouble();
play_speed = ps > 0. ? 1. / ps : 0.;
play_delay = property("playDelay").toSystemTime();
split_mode = (SplitMode)property("splitMode").toInt();
split_time = property("splitTime").toSystemTime();
split_size = property("splitFileSize").toLLong();
split_count = property("splitRecordCount").toInt();
// piCoutObj << "propertyChanged" << s << play_mode;
}
void PIBinaryLog::CompleteIndex::clear() {
info = BinLogInfo();
index.clear();
index_pos.clear();
}
void PIBinaryLog::CompleteIndex::makeIndexPos() {
index_pos.clear();
for (uint i = 0; i < index.size(); i++)
index_pos[index[i].pos] = i;
}