#include #include struct SomeLargeData { uint8_t data[4]{}; SomeLargeData() { memset(data, 0xff, sizeof(data)); } ~SomeLargeData() { memset(data, 0x00, sizeof(data)); } }; inline PIByteArray & operator <<(PIByteArray & s, const SomeLargeData & v) { s << PIByteArray::RawData(v.data, sizeof(v.data)); return s; } inline PIByteArray & operator >>(PIByteArray & s, SomeLargeData & v) { if (s.size() < sizeof(v.data)) { piCout << "Error in operator >> for SomeLargeData"; exit(1); } s >> PIByteArray::RawData(v.data, sizeof(v.data)); return s; } REGISTER_BUS_TYPE(SomeLargeData) int main() { std::atomic_bool is_end(false); PIThreadPoolExecutor executor(4); PIBlockingDequeue out_dequeue; PIBlockingDequeue in_dequeue; executor.execute([&](){ while (true) { bool is_ok; SMBlockData data; data = in_dequeue.poll(100, data, &is_ok); if (!is_ok) { if (is_end) break; } out_dequeue.offer(data, 100); if (!is_ok) { if (is_end) break; } } }); SomeLargeData content; int iteration_count = 100 * 1000; for (int i = 0; i < iteration_count / 20; ++i) { for (int j = 0; j < 20; ++j) { SMBlockData block_data(j+1); for (int k = 0; k < j+1; ++k) { block_data[k].sharedData() = content; } // bus_data << SMBusData::create(content); in_dequeue.offer(block_data.clone()); } for (int j = 0; j < 20; ++j) { auto block_data = out_dequeue.take(); if (block_data[0].isInvalid()) { piCout << "Error: bus_data is invalid"; exit(1); } } // printf("It's alive! %d\n", i); } is_end = true; executor.shutdownNow(); return 0; }