PIBinaryLog createIndexOnFly, loadIndex, saveIndex features
PIFile readAll and writeAll static methods
This commit is contained in:
@@ -60,6 +60,7 @@ PIBinaryLog::PIBinaryLog() {
|
||||
setThreadedReadBufferSize(65536);
|
||||
#endif
|
||||
is_started = is_indexed = is_pause = false;
|
||||
create_index_on_fly = false;
|
||||
current_index = -1;
|
||||
log_size = 0;
|
||||
f_new_path = nullptr;
|
||||
@@ -95,7 +96,6 @@ bool PIBinaryLog::openDevice() {
|
||||
is_indexed = false;
|
||||
is_pause = false;
|
||||
index.clear();
|
||||
index_pos.clear();
|
||||
log_size = 0;
|
||||
if (mode_ == ReadWrite) {
|
||||
piCoutObj << "Error: ReadWrite mode not supported, use WriteOnly or ReadOnly";
|
||||
@@ -162,7 +162,6 @@ bool PIBinaryLog::closeDevice() {
|
||||
moveIndex(-1);
|
||||
is_indexed = false;
|
||||
index.clear();
|
||||
index_pos.clear();
|
||||
bool e = isEmpty();
|
||||
log_size = 0;
|
||||
if (canWrite() && e) {
|
||||
@@ -293,6 +292,11 @@ void PIBinaryLog::createNewFile(const PIString & path) {
|
||||
}
|
||||
|
||||
|
||||
void PIBinaryLog::setCreateIndexOnFly(bool yes) {
|
||||
create_index_on_fly = yes;
|
||||
}
|
||||
|
||||
|
||||
void PIBinaryLog::setPause(bool pause) {
|
||||
pausemutex.lock();
|
||||
is_pause = pause;
|
||||
@@ -313,12 +317,7 @@ int PIBinaryLog::writeBinLog(int id, const void * data, int size) {
|
||||
}
|
||||
if (is_pause) return 0;
|
||||
logmutex.lock();
|
||||
PIByteArray logdata;
|
||||
logdata << id << size << (PISystemTime::current() - startlogtime) << PIMemoryBlock(data, size);
|
||||
int res = file.write(logdata.data(), logdata.size());
|
||||
file.flush();
|
||||
write_count++;
|
||||
log_size = file.size();
|
||||
int res = writeRecord(id, PISystemTime::current() - startlogtime, data, size);
|
||||
switch (split_mode) {
|
||||
case SplitSize:
|
||||
if (log_size > split_size) createNewFile();
|
||||
@@ -341,13 +340,8 @@ int PIBinaryLog::writeBinLog(int id, const void * data, int size) {
|
||||
|
||||
int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime & time, const void * data, int size) {
|
||||
if (size <= 0 || !canWrite()) return -1;
|
||||
PIByteArray logdata;
|
||||
logdata << id << size << time << PIMemoryBlock(data, size);
|
||||
logmutex.lock();
|
||||
int res = file.write(logdata.data(), logdata.size());
|
||||
file.flush();
|
||||
write_count++;
|
||||
log_size = file.size();
|
||||
int res = writeRecord(id, time, data, size);
|
||||
logmutex.unlock();
|
||||
if (res > 0)
|
||||
return size;
|
||||
@@ -359,7 +353,7 @@ int PIBinaryLog::writeBinLog_raw(int id, const PISystemTime & time, const void *
|
||||
PIByteArray PIBinaryLog::readBinLog(int id, PISystemTime * time, int * readed_id) {
|
||||
if (!canRead()) return PIByteArray();
|
||||
logmutex.lock();
|
||||
BinLogRecord br = readRecord();
|
||||
Record br = readRecord();
|
||||
logmutex.unlock();
|
||||
if (br.id == -1) {
|
||||
piCoutObj << "End of BinLog file";
|
||||
@@ -411,7 +405,7 @@ void PIBinaryLog::setHeader(const PIByteArray & header) {
|
||||
|
||||
|
||||
PIByteArray PIBinaryLog::getHeader() {
|
||||
return binfo.user_header;
|
||||
return index.info.user_header;
|
||||
}
|
||||
|
||||
|
||||
@@ -421,7 +415,7 @@ ssize_t PIBinaryLog::readDevice(void * read_to, ssize_t max_size) {
|
||||
if (!is_thread_ok && lastrecord.id > 0) return lastrecord.data.size();
|
||||
if (!canRead()) return -1;
|
||||
if (max_size <= 0 || read_to == 0) return -1;
|
||||
BinLogRecord br;
|
||||
Record br;
|
||||
br.id = 0;
|
||||
if (filterID.isEmpty()) {
|
||||
br = readRecord();
|
||||
@@ -476,12 +470,16 @@ bool PIBinaryLog::writeFileHeader() {
|
||||
file.write(&sz, 4);
|
||||
file.write(user_header);
|
||||
file.flush();
|
||||
if (create_index_on_fly) {
|
||||
index.info.user_header = user_header;
|
||||
index.info.path = file.path();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
bool PIBinaryLog::checkFileHeader() {
|
||||
binfo.user_header.clear();
|
||||
index.info.user_header.clear();
|
||||
uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE];
|
||||
for (uint i = 0; i < PIBINARYLOG_SIGNATURE_SIZE; i++)
|
||||
read_sig[i] = 0;
|
||||
@@ -504,7 +502,7 @@ bool PIBinaryLog::checkFileHeader() {
|
||||
uint32_t sz = 0;
|
||||
file.read(&sz, 4);
|
||||
if (sz > 0) {
|
||||
binfo.user_header = file.read(sz);
|
||||
index.info.user_header = file.read(sz);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -515,15 +513,15 @@ bool PIBinaryLog::checkFileHeader() {
|
||||
}
|
||||
|
||||
|
||||
PIBinaryLog::BinLogRecord PIBinaryLog::readRecord() {
|
||||
PIBinaryLog::Record PIBinaryLog::readRecord() {
|
||||
// piCoutObj << "readRecord pos =" << file.pos();
|
||||
logmutex.lock();
|
||||
PIByteArray ba;
|
||||
BinLogRecord br;
|
||||
Record br;
|
||||
lastrecord.id = 0;
|
||||
lastrecord.data.clear();
|
||||
lastrecord.timestamp = PISystemTime();
|
||||
ba.resize(sizeof(BinLogRecord) - sizeof(PIByteArray));
|
||||
ba.resize(sizeof(Record) - sizeof(PIByteArray));
|
||||
if (file.read(ba.data(), ba.size_s()) > 0) {
|
||||
ba >> br.id >> br.size >> br.timestamp;
|
||||
} else {
|
||||
@@ -542,13 +540,55 @@ PIBinaryLog::BinLogRecord PIBinaryLog::readRecord() {
|
||||
br.id = 0;
|
||||
lastrecord = br;
|
||||
if (br.id == 0) fileError();
|
||||
moveIndex(index_pos.value(file.pos(), -1));
|
||||
moveIndex(index.index_pos.value(file.pos(), -1));
|
||||
logmutex.unlock();
|
||||
// piCoutObj << "readRecord done";
|
||||
return br;
|
||||
}
|
||||
|
||||
|
||||
int PIBinaryLog::writeRecord(int id, PISystemTime time, const void * data, int size) {
|
||||
PIByteArray logdata;
|
||||
logdata << id << size << time << PIMemoryBlock(data, size);
|
||||
if (create_index_on_fly) {
|
||||
BinLogIndex bi;
|
||||
bi.id = id;
|
||||
bi.data_size = size;
|
||||
bi.pos = file.pos();
|
||||
bi.timestamp = time;
|
||||
index.index << bi;
|
||||
auto & rinfo(index.info.records[id]);
|
||||
if (rinfo.count == 0) {
|
||||
rinfo.id = id;
|
||||
rinfo.start_time = rinfo.end_time = time;
|
||||
rinfo.minimum_size = rinfo.maximum_size = size;
|
||||
} else {
|
||||
rinfo.start_time = piMin(rinfo.start_time, time);
|
||||
rinfo.end_time = piMax(rinfo.end_time, time);
|
||||
rinfo.minimum_size = piMin(rinfo.minimum_size, size);
|
||||
rinfo.maximum_size = piMax(rinfo.maximum_size, size);
|
||||
}
|
||||
++rinfo.count;
|
||||
if (index.info.records_count == 0) {
|
||||
index.info.start_time = index.info.end_time = time;
|
||||
} else {
|
||||
index.info.start_time = piMin(index.info.start_time, time);
|
||||
index.info.end_time = piMax(index.info.end_time, time);
|
||||
}
|
||||
++index.info.records_count;
|
||||
is_indexed = true;
|
||||
}
|
||||
int ret = file.write(logdata.data(), logdata.size());
|
||||
file.flush();
|
||||
write_count++;
|
||||
log_size = file.size();
|
||||
if (create_index_on_fly) {
|
||||
index.info.log_size = log_size;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector<PIBinaryLog::BinLogIndex> * index) {
|
||||
if (!info && !index) return;
|
||||
if (info) {
|
||||
@@ -596,16 +636,17 @@ void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector<
|
||||
if (read_version == PIBINARYLOG_VERSION) {
|
||||
uint32_t sz = 0;
|
||||
f->read(&sz, 4);
|
||||
if (sz > 0 && info) {
|
||||
info->user_header = f->read(sz);
|
||||
if (sz > 0) {
|
||||
PIByteArray user_hdr = f->read(sz);
|
||||
if (info) info->user_header = user_hdr;
|
||||
}
|
||||
}
|
||||
PIByteArray ba;
|
||||
BinLogRecord br;
|
||||
Record br;
|
||||
br.id = 0;
|
||||
br.size = 0;
|
||||
bool first = true;
|
||||
size_t hdr_size = sizeof(BinLogRecord) - sizeof(PIByteArray);
|
||||
size_t hdr_size = sizeof(Record) - sizeof(PIByteArray);
|
||||
ba.resize(hdr_size);
|
||||
while (1) {
|
||||
ba.resize(hdr_size);
|
||||
@@ -690,7 +731,7 @@ bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString
|
||||
return false;
|
||||
}
|
||||
bool first = true;
|
||||
BinLogRecord br;
|
||||
Record br;
|
||||
PISystemTime st;
|
||||
PITimeMeasurer tm;
|
||||
while (!slog.isEnd() && ((slog.pos() <= to) || to < 0)) {
|
||||
@@ -746,7 +787,7 @@ bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src,
|
||||
dtime = lt;
|
||||
}
|
||||
tm.reset();
|
||||
BinLogRecord br;
|
||||
Record br;
|
||||
PISystemTime st;
|
||||
while (!slog.isEnd()) {
|
||||
br = slog.readRecord();
|
||||
@@ -787,12 +828,10 @@ bool PIBinaryLog::createIndex() {
|
||||
llong cp = file.pos();
|
||||
file.seekToBegin();
|
||||
index.clear();
|
||||
index_pos.clear();
|
||||
parseLog(&file, &binfo, &index);
|
||||
parseLog(&file, &index.info, &index.index);
|
||||
index.makeIndexPos();
|
||||
file.seek(cp);
|
||||
is_indexed = !index.isEmpty();
|
||||
for (uint i = 0; i < index.size(); i++)
|
||||
index_pos[index[i].pos] = i;
|
||||
is_indexed = !index.index.isEmpty();
|
||||
logmutex.unlock();
|
||||
return is_indexed;
|
||||
}
|
||||
@@ -800,8 +839,8 @@ bool PIBinaryLog::createIndex() {
|
||||
|
||||
int PIBinaryLog::posForTime(const PISystemTime & time) {
|
||||
int ci = -1;
|
||||
for (uint i = 0; i < index.size(); i++) {
|
||||
if (time <= index[i].timestamp && (filterID.contains(index[i].id) || filterID.isEmpty())) {
|
||||
for (uint i = 0; i < index.index.size(); i++) {
|
||||
if (time <= index.index[i].timestamp && (filterID.contains(index.index[i].id) || filterID.isEmpty())) {
|
||||
ci = i;
|
||||
break;
|
||||
}
|
||||
@@ -814,12 +853,12 @@ void PIBinaryLog::seekTo(int rindex) {
|
||||
// piCoutObj << "seekTo";
|
||||
logmutex.lock();
|
||||
pausemutex.lock();
|
||||
if (rindex < index.size_s() && rindex >= 0) {
|
||||
file.seek(index[rindex].pos);
|
||||
moveIndex(index_pos.value(file.pos(), -1));
|
||||
if (rindex < index.index.size_s() && rindex >= 0) {
|
||||
file.seek(index.index[rindex].pos);
|
||||
moveIndex(index.index_pos.value(file.pos(), -1));
|
||||
// double prev_pt = play_time;
|
||||
play_time = index[rindex].timestamp.toMilliseconds();
|
||||
lastrecord.timestamp = index[rindex].timestamp;
|
||||
play_time = index.index[rindex].timestamp.toMilliseconds();
|
||||
lastrecord.timestamp = index.index[rindex].timestamp;
|
||||
if (play_mode == PlayRealTime) {
|
||||
startlogtime = PISystemTime::current() - lastrecord.timestamp;
|
||||
}
|
||||
@@ -842,8 +881,8 @@ bool PIBinaryLog::seek(const PISystemTime & time) {
|
||||
|
||||
bool PIBinaryLog::seek(llong filepos) {
|
||||
int ci = -1;
|
||||
for (uint i = 0; i < index.size(); i++) {
|
||||
if (filepos <= index[i].pos && (filterID.contains(index[i].id) || filterID.isEmpty())) {
|
||||
for (uint i = 0; i < index.index.size(); i++) {
|
||||
if (filepos <= index.index[i].pos && (filterID.contains(index.index[i].id) || filterID.isEmpty())) {
|
||||
ci = i;
|
||||
break;
|
||||
}
|
||||
@@ -856,6 +895,27 @@ bool PIBinaryLog::seek(llong filepos) {
|
||||
}
|
||||
|
||||
|
||||
int PIBinaryLog::pos() const {
|
||||
if (is_indexed) return current_index;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
PIByteArray PIBinaryLog::saveIndex() const {
|
||||
if (!is_indexed) return PIByteArray();
|
||||
return piSerialize(index);
|
||||
}
|
||||
|
||||
|
||||
bool PIBinaryLog::loadIndex(PIByteArray saved) {
|
||||
if (!canRead() || saved.isEmpty()) return false;
|
||||
index = piDeserialize<CompleteIndex>(saved);
|
||||
index.makeIndexPos();
|
||||
is_indexed = index.index.isNotEmpty();
|
||||
return is_indexed;
|
||||
}
|
||||
|
||||
|
||||
PIString PIBinaryLog::constructFullPathDevice() const {
|
||||
PIString ret;
|
||||
ret += logDir() + ":" + filePrefix() + ":" + PIString::fromNumber(defaultID()) + ":";
|
||||
@@ -928,3 +988,17 @@ void PIBinaryLog::propertyChanged(const char * s) {
|
||||
split_count = property("splitRecordCount").toInt();
|
||||
// piCoutObj << "propertyChanged" << s << play_mode;
|
||||
}
|
||||
|
||||
|
||||
void PIBinaryLog::CompleteIndex::clear() {
|
||||
info = BinLogInfo();
|
||||
index.clear();
|
||||
index_pos.clear();
|
||||
}
|
||||
|
||||
|
||||
void PIBinaryLog::CompleteIndex::makeIndexPos() {
|
||||
index_pos.clear();
|
||||
for (uint i = 0; i < index.size(); i++)
|
||||
index_pos[index[i].pos] = i;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user