PIBinaryLog joinBinLogsSerial

correct timestamp wite with split
This commit is contained in:
Бычков Андрей
2022-07-22 15:24:07 +03:00
parent 7aa407264f
commit f5953a0ba7
2 changed files with 106 additions and 31 deletions

View File

@@ -61,6 +61,7 @@ PIBinaryLog::PIBinaryLog() {
is_started = is_indexed = is_pause = false; is_started = is_indexed = is_pause = false;
current_index = -1; current_index = -1;
log_size = 0; log_size = 0;
f_new_path = nullptr;
setPlaySpeed(1.); setPlaySpeed(1.);
setDefaultID(1); setDefaultID(1);
setPlaySpeed(1.0); setPlaySpeed(1.0);
@@ -100,7 +101,8 @@ bool PIBinaryLog::openDevice() {
return false; return false;
} }
if (path().isEmpty() && mode_ == WriteOnly) { if (path().isEmpty() && mode_ == WriteOnly) {
setPath(getLogfilePath()); if (f_new_path) setPath(f_new_path());
else setPath(getLogfilePath(logDir(), filePrefix()));
} }
if (path().isEmpty() && mode_ == ReadOnly) { if (path().isEmpty() && mode_ == ReadOnly) {
PIDir ld(logDir()); PIDir ld(logDir());
@@ -243,18 +245,18 @@ bool PIBinaryLog::threadedRead(uchar *readed, int size) {
} }
PIString PIBinaryLog::getLogfilePath() const { PIString PIBinaryLog::getLogfilePath(const PIString & log_dir, const PIString & prefix) {
PIDir dir(logDir()); PIDir dir(log_dir);
dir.setDir(dir.absolutePath()); dir.setDir(dir.absolutePath());
if (!dir.isExists()) { if (!dir.isExists()) {
piCoutObj << "Creating directory" << dir.path(); piCout << "[PIBinaryLog]" << "Creating directory" << dir.path();
dir.make(true); dir.make(true);
} }
PIString npath = logDir() + "/" + filePrefix() + PIDateTime::current().toString("yyyy_MM_dd__hh_mm_ss"); PIString npath = log_dir + PIDir::separator + prefix + PIDateTime::current().toString("yyyy_MM_dd__hh_mm_ss");
PIString cnpath = npath + ".binlog"; PIString cnpath = npath + ".binlog";
int i = 1; int i = 1;
while (PIFile::isExists(cnpath)) { while (PIFile::isExists(cnpath)) {
cnpath = npath + "_" + PIString::fromNumber(i) + ".binlog"; cnpath = npath + '_' + PIString::fromNumber(i) + ".binlog";
i++; i++;
} }
return cnpath; return cnpath;
@@ -262,12 +264,14 @@ PIString PIBinaryLog::getLogfilePath() const {
PIString PIBinaryLog::createNewFile() { PIString PIBinaryLog::createNewFile() {
if (!file.close()) return PIString(); if (!file.close()) return PIString();
PIString cnpath = getLogfilePath(); PIString cnpath;
if (f_new_path) cnpath = f_new_path();
else cnpath = getLogfilePath(logDir(), filePrefix());
if (open(cnpath, PIIODevice::WriteOnly)) { if (open(cnpath, PIIODevice::WriteOnly)) {
newFile(file.path()); newFile(file.path());
return file.path(); return file.path();
} }
piCoutObj << "Can't create new file, maybe LogDir is invalid."; piCoutObj << "Can't create new file, maybe LogDir" << ("\"" + logDir() + "\"") << "is invalid.";
return PIString(); return PIString();
} }
@@ -276,7 +280,7 @@ void PIBinaryLog::createNewFile(const PIString &path) {
if (open(path, PIIODevice::WriteOnly)) { if (open(path, PIIODevice::WriteOnly)) {
newFile(file.path()); newFile(file.path());
} }
else piCoutObj << "Can't create new file, maybe path is invalid."; else piCoutObj << "Can't create new file, maybe path" << ("\"" + path + "\"") << "is invalid.";
} }
@@ -295,13 +299,20 @@ void PIBinaryLog::setPause(bool pause) {
int PIBinaryLog::writeBinLog(int id, const void *data, int size) { int PIBinaryLog::writeBinLog(int id, const void *data, int size) {
if (size <= 0 || !canWrite()) return -1; if (size <= 0 || !canWrite()) return -1;
if (id == 0) { if (id == 0) {
piCoutObj << "Error: can`t write with id = 0!"; piCoutObj << "Error: can`t write with id = 0! Id must be > 0";
return -1; return -1;
} }
if (is_pause) return 0;
logmutex.lock(); logmutex.lock();
PIByteArray logdata;
logdata << id << size << (PISystemTime::current() - startlogtime) << PIMemoryBlock(data, size);
int res = file.write(logdata.data(), logdata.size());
file.flush();
write_count++;
log_size = file.size();
switch (split_mode) { switch (split_mode) {
case SplitSize: case SplitSize:
if (file.size() > split_size) createNewFile(); if (log_size > split_size) createNewFile();
break; break;
case SplitTime: case SplitTime:
if ((PISystemTime::current() - startlogtime) > split_time) createNewFile(); if ((PISystemTime::current() - startlogtime) > split_time) createNewFile();
@@ -311,16 +322,6 @@ int PIBinaryLog::writeBinLog(int id, const void *data, int size) {
break; break;
default: break; default: break;
} }
if (is_pause) {
logmutex.unlock();
return 0;
}
PIByteArray logdata;
logdata << id << size << (PISystemTime::current() - startlogtime) << PIByteArray::RawData(data, size);
int res = file.write(logdata.data(), logdata.size());
file.flush();
write_count++;
log_size = file.size();
logmutex.unlock(); logmutex.unlock();
if (res > 0) return size; if (res > 0) return size;
else return res; else return res;
@@ -330,7 +331,7 @@ int PIBinaryLog::writeBinLog(int id, const void *data, int size) {
int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime &time, const void *data, int size) { int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime &time, const void *data, int size) {
if (size <= 0 || !canWrite()) return -1; if (size <= 0 || !canWrite()) return -1;
PIByteArray logdata; PIByteArray logdata;
logdata << id << size << time << PIByteArray::RawData(data, size); logdata << id << size << time << PIMemoryBlock(data, size);
logmutex.lock(); logmutex.lock();
int res = file.write(logdata.data(), logdata.size()); int res = file.write(logdata.data(), logdata.size());
file.flush(); file.flush();
@@ -631,14 +632,22 @@ PIBinaryLog::BinLogInfo PIBinaryLog::getLogInfo(const PIString & path) {
bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString & dst, int from, int to) { bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString & dst, int from, int to) {
PIBinaryLog slog; PIBinaryLog slog;
if (!slog.open(src.path, PIIODevice::ReadOnly)) return false; if (!slog.open(src.path, PIIODevice::ReadOnly)) {
piCout << "[PIBinaryLog]" << "Error, can't open" << src.path;
return false;
}
PIVector<int> ids = src.records.keys(); PIVector<int> ids = src.records.keys();
slog.seekTo(from); slog.seekTo(from);
PIBinaryLog dlog; PIBinaryLog dlog;
dlog.createNewFile(dst); dlog.createNewFile(dst);
if (!dlog.isOpened()) {
piCout << "[PIBinaryLog]" << "Error, can't create" << dst;
return false;
}
bool first = true; bool first = true;
BinLogRecord br; BinLogRecord br;
PISystemTime st; PISystemTime st;
PITimeMeasurer tm;
while (!slog.isEnd() && ((slog.pos() <= to) || to < 0)) { while (!slog.isEnd() && ((slog.pos() <= to) || to < 0)) {
br = slog.readRecord(); br = slog.readRecord();
if (first) { if (first) {
@@ -646,9 +655,61 @@ bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString
first = false; first = false;
} }
if (ids.contains(br.id)) { if (ids.contains(br.id)) {
dlog.writeBinLog_raw(br.id, br.timestamp - st, br.data); if (dlog.writeBinLog_raw(br.id, br.timestamp - st, br.data) <= 0) {
piCout << "[PIBinaryLog]" << "Error, can't write to file" << dst;
return false;
} }
} }
if (tm.elapsed_s() > 1) {
tm.reset();
piCout << "[PIBinaryLog]" << "process" << PITime::fromSystemTime(br.timestamp).toString();
}
}
return true;
}
bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & dst) {
PIBinaryLog slog;
PIBinaryLog dlog;
dlog.createNewFile(dst);
if (!dlog.isOpened()) {
piCout << "[PIBinaryLog]" << "Error, can't create" << dst;
return false;
}
piCout << "[PIBinaryLog]" << "Start join binlogs to" << dst;
PISystemTime dtime;
PISystemTime lt;
PITimeMeasurer tm;
bool first = true;
for (const PIString & fn : src) {
if (!slog.open(fn, PIIODevice::ReadOnly)) {
piCout << "[PIBinaryLog]" << "Error, can't open" << fn;
return false;
}
if (first) first = false;
else {
dtime = lt;
}
tm.reset();
BinLogRecord br;
PISystemTime st;
while (!slog.isEnd()) {
br = slog.readRecord();
st = br.timestamp;
lt = dtime + br.timestamp;
if (dlog.writeBinLog_raw(br.id, lt, br.data) <= 0) {
piCout << "[PIBinaryLog]" << "Error, can't write to file" << dst;
return false;
}
if (tm.elapsed_s() > 1) {
tm.reset();
piCout << "[PIBinaryLog]" << "process" << PITime::fromSystemTime(lt).toString();
}
}
piCout << "[PIBinaryLog]" << "complete" << fn;
}
piCout << "[PIBinaryLog]" << "Finish join binlogs, total time" << PITime::fromSystemTime(lt).toString();
return true; return true;
} }
@@ -728,19 +789,19 @@ bool PIBinaryLog::seek(llong filepos) {
PIString PIBinaryLog::constructFullPathDevice() const { PIString PIBinaryLog::constructFullPathDevice() const {
PIString ret; PIString ret;
ret << logDir() << ":" << filePrefix() << ":" << defaultID() << ":"; ret += logDir() + ":" + filePrefix() + ":" + PIString::fromNumber(defaultID()) + ":";
switch (play_mode) { switch (play_mode) {
case PlayRealTime: case PlayRealTime:
ret << "RT"; ret += "RT";
break; break;
case PlayVariableSpeed: case PlayVariableSpeed:
ret << PIString::fromNumber(playSpeed()) << "X"; ret += PIString::fromNumber(playSpeed()) + "X";
break; break;
case PlayStaticDelay: case PlayStaticDelay:
ret << PIString::fromNumber(playDelay().toMilliseconds()) << "M"; ret += PIString::fromNumber(playDelay().toMilliseconds()) + "M";
break; break;
default: default:
ret << "RT"; ret += "RT";
break; break;
} }
return ret; return ret;

View File

@@ -28,7 +28,10 @@
#include "pifile.h" #include "pifile.h"
//! \ingroup IO
//! \~\brief
//! \~english Binary log
//! \~russian Бинарный лог
class PIP_EXPORT PIBinaryLog: public PIIODevice class PIP_EXPORT PIBinaryLog: public PIIODevice
{ {
PIIODEVICE(PIBinaryLog, "binlog") PIIODEVICE(PIBinaryLog, "binlog")
@@ -165,6 +168,11 @@ public:
//! Set pause while playing via \a threadedRead or writing via write //! Set pause while playing via \a threadedRead or writing via write
void setPause(bool pause); void setPause(bool pause);
//! Set function wich returns new binlog file path when using split mode.
//! Overrides internal file path generator (logdir() + prefix() + current_time()).
//! To restore internal file path generator set this function to "nullptr".
void setFuncGetNewFilePath(std::function<PIString()> f) {f_new_path = f;}
//! Write one record to BinLog file, with ID = id, id must be greather than 0 //! Write one record to BinLog file, with ID = id, id must be greather than 0
int writeBinLog(int id, PIByteArray data) {return writeBinLog(id, data.data(), data.size_s());} int writeBinLog(int id, PIByteArray data) {return writeBinLog(id, data.data(), data.size_s());}
@@ -285,8 +293,13 @@ public:
//! Get binlog info and statistic //! Get binlog info and statistic
static BinLogInfo getLogInfo(const PIString & path); static BinLogInfo getLogInfo(const PIString & path);
//! Create new binlog from part of "src" with allowed IDs and "from" to "to" file position
static bool cutBinLog(const BinLogInfo & src, const PIString & dst, int from, int to); static bool cutBinLog(const BinLogInfo & src, const PIString & dst, int from, int to);
//! Create new binlog from serial splitted binlogs "src"
static bool joinBinLogsSerial(const PIStringList & src, const PIString & dst);
protected: protected:
PIString constructFullPathDevice() const; PIString constructFullPathDevice() const;
void configureFromFullPathDevice(const PIString & full_path); void configureFromFullPathDevice(const PIString & full_path);
@@ -314,7 +327,7 @@ private:
BinLogRecord readRecord(); BinLogRecord readRecord();
static void parseLog(PIFile *f, BinLogInfo *info, PIVector<BinLogIndex> * index); static void parseLog(PIFile *f, BinLogInfo *info, PIVector<BinLogIndex> * index);
void moveIndex(int i); void moveIndex(int i);
PIString getLogfilePath() const; static PIString getLogfilePath(const PIString & log_dir, const PIString & prefix);
PIVector<BinLogIndex> index; PIVector<BinLogIndex> index;
PIMap<llong, int> index_pos; PIMap<llong, int> index_pos;
@@ -331,6 +344,7 @@ private:
int write_count, split_count, default_id, current_index; int write_count, split_count, default_id, current_index;
bool is_started, is_thread_ok, is_indexed, rapid_start, is_pause; bool is_started, is_thread_ok, is_indexed, rapid_start, is_pause;
PIByteArray user_header; PIByteArray user_header;
std::function<PIString()> f_new_path;
}; };
//! \relatesalso PICout \brief Output operator PIBinaryLog::BinLogInfo to PICout //! \relatesalso PICout \brief Output operator PIBinaryLog::BinLogInfo to PICout