binlog fixes

PIBinaryStream doc
remove makePIPair
rename bytesAvailable
This commit is contained in:
Бычков Андрей
2022-07-28 14:46:58 +03:00
parent 00d06f71ba
commit 1b09ad5c27
19 changed files with 95 additions and 55 deletions

View File

@@ -42,7 +42,7 @@ public:
void setServerName(const PIString & server_name); void setServerName(const PIString & server_name);
void setKeepConnection(bool on); void setKeepConnection(bool on);
bool isConnected() const {return is_connected;} bool isConnected() const {return is_connected;}
ssize_t bytesAvailible() const override {return buff.size();} ssize_t bytesAvailable() const override {return buff.size();}
EVENT(connected); EVENT(connected);
EVENT(disconnected); EVENT(disconnected);

View File

@@ -50,7 +50,7 @@ public:
int readDevice(void * read_to, int max_size) override; int readDevice(void * read_to, int max_size) override;
int writeDevice(const void * data, int size) override; int writeDevice(const void * data, int size) override;
DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;} DeviceInfoFlags deviceInfoFlags() const override {return PIIODevice::Reliable;}
ssize_t bytesAvailible() const override {return buff.size();} ssize_t bytesAvailable() const override {return buff.size();}
private: private:
void pushBuffer(const PIByteArray & ba); void pushBuffer(const PIByteArray & ba);

View File

@@ -132,10 +132,7 @@ template< class T1, class T2 >
PIPair<T1,T2> createPIPair(const T1 & f, const T2 & s) { PIPair<T1,T2> createPIPair(const T1 & f, const T2 & s) {
return PIPair<T1,T2>(f, s); return PIPair<T1,T2>(f, s);
} }
template< class T1, class T2 >
PIPair<T1,T2> makePIPair(const T1 & f, const T2 & s) {
return PIPair<T1,T2>(f, s);
}
//! \~english Creates \a PIPair object, deducing the target type from the types of arguments. //! \~english Creates \a PIPair object, deducing the target type from the types of arguments.
//! \~russian Создает \a PIPair выводя типы из аргументов. //! \~russian Создает \a PIPair выводя типы из аргументов.
@@ -144,9 +141,6 @@ template< class T1, class T2 >
PIPair<T1,T2> createPIPair(T1 && f, T2 && s) { PIPair<T1,T2> createPIPair(T1 && f, T2 && s) {
return PIPair<T1,T2>(std::move(f), std::move(s)); return PIPair<T1,T2>(std::move(f), std::move(s));
} }
template< class T1, class T2 >
PIPair<T1,T2> makePIPair(T1 && f, T2 && s) {
return PIPair<T1,T2>(std::move(f), std::move(s));
}
#endif // PIPAIR_H #endif // PIPAIR_H

View File

@@ -46,15 +46,20 @@
//! \~\brief //! \~\brief
//! \~english Binary serialization interface. //! \~english Binary serialization interface.
//! \~russian Интерфейс бинарной сериализации. //! \~russian Интерфейс бинарной сериализации.
//! \~\details
//! \~english In your class you should implement this methods:
//! \~russian В производном классе вы должны реализовать следующие методы:
//! \~\code
//! bool binaryStreamAppendImp (const void * d, size_t s);
//! bool binaryStreamTakeImp (void * d, size_t s);
//! ssize_t binaryStreamSizeImp () const;
//! \endcode
//! \~english function binaryStreamSizeImp must return -1 if size unknown
//! \~russian функция binaryStreamSizeImp должна возвращать -1 если нет информации о размере
template<typename P> template<typename P>
class PIBinaryStream { class PIBinaryStream {
public: public:
// one should implement next methods: //! \~russian Записать данные
//
// bool binaryStreamAppendImp (const void * d, size_t s);
// bool binaryStreamTakeImp (void * d, size_t s);
// ssize_t binaryStreamSizeImp () const;
bool binaryStreamAppend(const void * d, size_t s) { bool binaryStreamAppend(const void * d, size_t s) {
if (!static_cast<P*>(this)->binaryStreamAppendImp(d, s)) { if (!static_cast<P*>(this)->binaryStreamAppendImp(d, s)) {
return false; return false;
@@ -62,6 +67,7 @@ public:
} }
return true; return true;
} }
//! \~russian Прочитать данные
bool binaryStreamTake(void * d, size_t s) { bool binaryStreamTake(void * d, size_t s) {
if (!static_cast<P*>(this)->binaryStreamTakeImp(d, s)) { if (!static_cast<P*>(this)->binaryStreamTakeImp(d, s)) {
return false; return false;
@@ -70,12 +76,18 @@ public:
return true; return true;
} }
//! \~russian Узнать оставшийся размер
//!\~\details
//!\~russian возвращает -1 если нет информации о размере
ssize_t binaryStreamSize() const { ssize_t binaryStreamSize() const {
return static_cast<P*>(this)->binaryStreamSizeImp(); return static_cast<P*>(this)->binaryStreamSizeImp();
} }
//! \~russian Записать данные
template<typename T> template<typename T>
void binaryStreamAppend(T v) {binaryStreamAppend(&v, sizeof(v));} void binaryStreamAppend(T v) {binaryStreamAppend(&v, sizeof(v));}
//! \~russian Прочитать int
int binaryStreamTakeInt() { int binaryStreamTakeInt() {
int r = 0; int r = 0;
binaryStreamTake(&r, sizeof(r)); binaryStreamTake(&r, sizeof(r));

View File

@@ -550,15 +550,33 @@ void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector<
} }
uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE]; uchar read_sig[PIBINARYLOG_SIGNATURE_SIZE];
for (uint i=0; i<PIBINARYLOG_SIGNATURE_SIZE; i++) read_sig[i] = 0; for (uint i=0; i<PIBINARYLOG_SIGNATURE_SIZE; i++) read_sig[i] = 0;
bool ok = true; if (f->read(read_sig, PIBINARYLOG_SIGNATURE_SIZE) < 0) {
if (f->read(read_sig, PIBINARYLOG_SIGNATURE_SIZE) < 0) {if (info) info->records_count = -1; ok = false;} if (info) info->records_count = -1;
for (uint i=0; i<PIBINARYLOG_SIGNATURE_SIZE; i++) return;
if (read_sig[i] != binlog_sig[i]) {if (info) info->records_count = -2; ok = false;} }
for (uint i=0; i<PIBINARYLOG_SIGNATURE_SIZE; i++) {
if (read_sig[i] != binlog_sig[i]) {
if (info) info->records_count = -2;
return;
}
}
uchar read_version = 0; uchar read_version = 0;
if (f->read(&read_version, 1) < 0) {if (info) info->records_count = -3; ok = false;} if (f->read(&read_version, 1) < 0) {
if (read_version == 0) {if (info) info->records_count = -4; ok = false;} if (info) info->records_count = -3;
if (read_version < PIBINARYLOG_VERSION_OLD) {if (info) info->records_count = -5; ok = false;} return;
if (read_version > PIBINARYLOG_VERSION) {if (info) info->records_count = -6; ok = false;} }
if (read_version == 0) {
if (info) info->records_count = -4;
return;
}
if (read_version < PIBINARYLOG_VERSION_OLD) {
if (info) info->records_count = -5;
return;
}
if (read_version > PIBINARYLOG_VERSION) {
if (info) info->records_count = -6;
return;
}
if (read_version == PIBINARYLOG_VERSION) { if (read_version == PIBINARYLOG_VERSION) {
uint32_t sz = 0; uint32_t sz = 0;
f->read(&sz, 4); f->read(&sz, 4);
@@ -566,7 +584,6 @@ void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector<
info->user_header = f->read(sz); info->user_header = f->read(sz);
} }
} }
if (!ok) return;
PIByteArray ba; PIByteArray ba;
BinLogRecord br; BinLogRecord br;
br.id = 0; br.id = 0;
@@ -580,11 +597,15 @@ void PIBinaryLog::parseLog(PIFile * f, PIBinaryLog::BinLogInfo * info, PIVector<
if (f->read(ba.data(), ba.size()) > 0) { if (f->read(ba.data(), ba.size()) > 0) {
ba >> br.id >> br.size >> br.timestamp; ba >> br.id >> br.size >> br.timestamp;
} else break; } else break;
if (info->log_size - f->pos() >= br.size) f->seek(f->pos() + br.size); if (info) {
if (info->log_size - f->pos() >= br.size) {
f->seek(f->pos() + br.size);
}
}
else break; else break;
} }
if (br.id > 0) { if (br.id > 0) {
if (info) { if (index) {
BinLogIndex bl_ind; BinLogIndex bl_ind;
bl_ind.id = br.id; bl_ind.id = br.id;
bl_ind.data_size = br.size; bl_ind.data_size = br.size;
@@ -674,15 +695,9 @@ bool PIBinaryLog::cutBinLog(const PIBinaryLog::BinLogInfo & src, const PIString
} }
bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & dst) { bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & dst, std::function<bool (const PIString &, PISystemTime)> progress) {
PIBinaryLog slog; PIBinaryLog slog;
PIBinaryLog dlog; PIBinaryLog dlog;
dlog.createNewFile(dst);
if (!dlog.isOpened()) {
piCout << "[PIBinaryLog]" << "Error, can't create" << dst;
return false;
}
piCout << "[PIBinaryLog]" << "Start join binlogs to" << dst;
PISystemTime dtime; PISystemTime dtime;
PISystemTime lt; PISystemTime lt;
PITimeMeasurer tm; PITimeMeasurer tm;
@@ -692,8 +707,16 @@ bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & d
piCout << "[PIBinaryLog]" << "Error, can't open" << fn; piCout << "[PIBinaryLog]" << "Error, can't open" << fn;
return false; return false;
} }
if (first) first = false; if (first) {
else { first = false;
dlog.setHeader(slog.getHeader());
dlog.createNewFile(dst);
if (!dlog.isOpened()) {
piCout << "[PIBinaryLog]" << "Error, can't create" << dst;
return false;
}
piCout << "[PIBinaryLog]" << "Start join binlogs to" << dst;
} else {
dtime = lt; dtime = lt;
} }
tm.reset(); tm.reset();
@@ -701,18 +724,29 @@ bool PIBinaryLog::joinBinLogsSerial(const PIStringList & src, const PIString & d
PISystemTime st; PISystemTime st;
while (!slog.isEnd()) { while (!slog.isEnd()) {
br = slog.readRecord(); br = slog.readRecord();
if (br.data.isEmpty() || br.id < 1) continue;
st = br.timestamp; st = br.timestamp;
lt = dtime + br.timestamp; lt = dtime + br.timestamp;
if (dlog.writeBinLog_raw(br.id, lt, br.data) <= 0) { if (dlog.writeBinLog_raw(br.id, lt, br.data) <= 0) {
piCout << "[PIBinaryLog]" << "Error, can't write to file" << dst; piCout << "[PIBinaryLog]" << "Error, can't write to file" << dst;
return false; return false;
} }
if (tm.elapsed_s() > 1) { if (tm.elapsed_s() > 0.1) {
tm.reset(); tm.reset();
if (progress) {
if (!progress(fn, lt)) {
slog.close();
dlog.close();
PIFile::remove(dlog.path());
return false;
}
} else {
piCout << "[PIBinaryLog]" << "process" << PITime::fromSystemTime(lt).toString(); piCout << "[PIBinaryLog]" << "process" << PITime::fromSystemTime(lt).toString();
} }
} }
piCout << "[PIBinaryLog]" << "complete" << fn; }
slog.close();
//piCout << "[PIBinaryLog]" << "complete" << fn;
} }
piCout << "[PIBinaryLog]" << "Finish join binlogs, total time" << PITime::fromSystemTime(lt).toString(); piCout << "[PIBinaryLog]" << "Finish join binlogs, total time" << PITime::fromSystemTime(lt).toString();
return true; return true;

View File

@@ -298,7 +298,7 @@ public:
static bool cutBinLog(const BinLogInfo & src, const PIString & dst, int from, int to); static bool cutBinLog(const BinLogInfo & src, const PIString & dst, int from, int to);
//! Create new binlog from serial splitted binlogs "src" //! Create new binlog from serial splitted binlogs "src"
static bool joinBinLogsSerial(const PIStringList & src, const PIString & dst); static bool joinBinLogsSerial(const PIStringList & src, const PIString & dst, std::function<bool (const PIString &, PISystemTime)> progress = nullptr);
protected: protected:
PIString constructFullPathDevice() const override; PIString constructFullPathDevice() const override;

View File

@@ -227,7 +227,7 @@ public:
//! \~russian Возвращает размер файла в байтах //! \~russian Возвращает размер файла в байтах
llong size() const; llong size() const;
ssize_t bytesAvailible() const override {return size() - pos();} ssize_t bytesAvailable() const override {return size() - pos();}
//! \~english Returns read/write position //! \~english Returns read/write position
//! \~russian Возвращает позицию чтения/записи //! \~russian Возвращает позицию чтения/записи

View File

@@ -84,7 +84,7 @@ public:
//! \~russian Вставляет данные "ba" в содержимое буфера в текущую позицию //! \~russian Вставляет данные "ba" в содержимое буфера в текущую позицию
int writeByteArray(const PIByteArray & ba); int writeByteArray(const PIByteArray & ba);
ssize_t bytesAvailible() const override { ssize_t bytesAvailable() const override {
if (data_) return data_->size(); if (data_) return data_->size();
else return 0; else return 0;
} }

View File

@@ -285,7 +285,7 @@ public:
PIByteArray read(int max_size); PIByteArray read(int max_size);
//! \~english Returns the number of bytes that are available for reading. //! \~english Returns the number of bytes that are available for reading.
//! \~russian Возвращает количество байт //! \~russian Возвращает количество байт доступных для чтения
//! \~\details //! \~\details
//! \~english This function is commonly used with sequential devices //! \~english This function is commonly used with sequential devices
//! to determine the number of bytes to allocate in a buffer before reading. //! to determine the number of bytes to allocate in a buffer before reading.
@@ -293,7 +293,7 @@ public:
//! \~russian Эта функция как правило используется чтобы знать какой //! \~russian Эта функция как правило используется чтобы знать какой
//! размер буфера нужен в памяти для чтения. //! размер буфера нужен в памяти для чтения.
//! Если функция возвращает -1 это значит что количество байт для чтения не известно. //! Если функция возвращает -1 это значит что количество байт для чтения не известно.
virtual ssize_t bytesAvailible() const {return -1;} virtual ssize_t bytesAvailable() const {return -1;}
//! \~english Write maximum "max_size" bytes of "data" to device //! \~english Write maximum "max_size" bytes of "data" to device
//! \~russian Пишет в устройство не более "max_size" байт из "data" //! \~russian Пишет в устройство не более "max_size" байт из "data"

View File

@@ -57,7 +57,7 @@ public:
ssize_t binaryStreamSizeImp() const { ssize_t binaryStreamSizeImp() const {
if (!dev) return 0; if (!dev) return 0;
return dev->bytesAvailible(); return dev->bytesAvailable();
} }
private: private:

View File

@@ -88,7 +88,7 @@ public:
//! \~russian Вставляет строку "string" в содержимое буфера в текущую позицию //! \~russian Вставляет строку "string" в содержимое буфера в текущую позицию
int writeString(const PIString & string); int writeString(const PIString & string);
ssize_t bytesAvailible() const override { ssize_t bytesAvailable() const override {
if (str) return str->size() - pos; if (str) return str->size() - pos;
else return 0; else return 0;
} }

View File

@@ -924,7 +924,7 @@ void PIPeer::changeName(const PIString &new_name) {
} }
ssize_t PIPeer::bytesAvailible() const { ssize_t PIPeer::bytesAvailable() const {
ssize_t ret = 0; ssize_t ret = 0;
read_buffer_mutex.lock(); read_buffer_mutex.lock();
if (!read_buffer.isEmpty()) ret = read_buffer.back().size(); if (!read_buffer.isEmpty()) ret = read_buffer.back().size();

View File

@@ -116,7 +116,7 @@ public:
void setTrustPeerName(const PIString & peer_name) {trust_peer = peer_name;} void setTrustPeerName(const PIString & peer_name) {trust_peer = peer_name;}
void setTcpServerIP(const PIString & ip) {server_ip = ip; tcpClientReconnect();} void setTcpServerIP(const PIString & ip) {server_ip = ip; tcpClientReconnect();}
ssize_t bytesAvailible() const override; ssize_t bytesAvailable() const override;
EVENT2(dataReceivedEvent, const PIString &, from, const PIByteArray &, data); EVENT2(dataReceivedEvent, const PIString &, from, const PIByteArray &, data);

View File

@@ -88,7 +88,7 @@ bool PISPI::isParameterSet(PISPI::Parameters parameter) const {
} }
ssize_t PISPI::bytesAvailible() const { ssize_t PISPI::bytesAvailable() const {
return recv_buf.size(); return recv_buf.size();
} }

View File

@@ -60,7 +60,7 @@ public:
//! Returns parameters //! Returns parameters
PIFlags<PISPI::Parameters> parameters() const {return spi_mode;} PIFlags<PISPI::Parameters> parameters() const {return spi_mode;}
ssize_t bytesAvailible() const override; ssize_t bytesAvailable() const override;
protected: protected:
bool openDevice() override; bool openDevice() override;

View File

@@ -49,7 +49,7 @@ PITransparentDevice::~PITransparentDevice() {
} }
ssize_t PITransparentDevice::bytesAvailible() const { ssize_t PITransparentDevice::bytesAvailable() const {
ssize_t ret = 0; ssize_t ret = 0;
que_mutex.lock(); que_mutex.lock();
if (que.isNotEmpty()) ret = que.back().size(); if (que.isNotEmpty()) ret = que.back().size();

View File

@@ -44,7 +44,7 @@ public:
virtual ~PITransparentDevice(); virtual ~PITransparentDevice();
ssize_t bytesAvailible() const override; ssize_t bytesAvailable() const override;
protected: protected:
bool openDevice() override; bool openDevice() override;

View File

@@ -137,8 +137,8 @@ void PIPacketExtractor::setDevice(PIIODevice * device_) {
} }
ssize_t PIPacketExtractor::bytesAvailible() const { ssize_t PIPacketExtractor::bytesAvailable() const {
if (dev) return dev->bytesAvailible(); if (dev) return dev->bytesAvailable();
else return 0; else return 0;
} }

View File

@@ -62,7 +62,7 @@ public:
//! Set child %device to "device_" //! Set child %device to "device_"
void setDevice(PIIODevice * device_); void setDevice(PIIODevice * device_);
ssize_t bytesAvailible() const override; ssize_t bytesAvailable() const override;
//! Returns buffer size //! Returns buffer size
int bufferSize() const {return buffer_size;} int bufferSize() const {return buffer_size;}