#include #include #include struct SomeLargeData { uint8_t data[4]{}; SomeLargeData() { memset(data, 0xff, sizeof(data)); } ~SomeLargeData() { memset(data, 0x00, sizeof(data)); } }; inline PIByteArray & operator <<(PIByteArray & s, const SomeLargeData & v) { s << PIByteArray::RawData(v.data, sizeof(v.data)); return s; } inline PIByteArray & operator >>(PIByteArray & s, SomeLargeData & v) { if (s.size() < sizeof(v.data)) { piCout << "Error in operator >> for SomeLargeData"; exit(1); } s >> PIByteArray::RawData(v.data, sizeof(v.data)); return s; } REGISTER_BUS_TYPE(SomeLargeData) void test_blocking_queue() { std::atomic_bool is_end(false); PIBlockingQueue out_dequeue; PIBlockingQueue in_dequeue; auto runnable = [&](){ while (true) { SMBlockData data; bool is_ok; data = in_dequeue.poll(100, SMBlockData(), &is_ok); if (!is_ok) { if (is_end) break; continue; } out_dequeue.put(data); // if (!is_ok) { // if (is_end) break; // std::this_thread::yield(); // } } }; PIVector> futures; for (int i = 0; i < 4; ++i) { futures.append(std::async(std::launch::async, runnable)); } SomeLargeData content; int iteration_count = 100 * 1000; for (int i = 0; i < iteration_count / 20; ++i) { for (int j = 0; j < 20; ++j) { SMBlockData block_data(j+1); for (int k = 0; k < j+1; ++k) { block_data[k].value() = content; } // bus_data << SMBusData::create(content); SMBlockData out_data = block_data.clone(); in_dequeue.put(out_data); } for (int j = 0; j < 20; ++j) { auto block_data = out_dequeue.take(); if (block_data[0].isInvalid()) { piCout << "Error: bus_data is invalid"; exit(1); } } // printf("It's alive! %d\n", i); } is_end = true; for (auto& future: futures) future.get(); } void test_mutexes() { std::atomic_bool is_end(false); PIDeque out_dequeue; PIDeque in_dequeue; PIMutex out_mutex; PIMutex in_mutex; auto runnable = [&](){ while (true) { SMBlockData data; in_mutex.lock(); bool is_ok = !in_dequeue.isEmpty(); if (is_ok) data = in_dequeue.take_front(); in_mutex.unlock(); if (!is_ok) { if (is_end) break; std::this_thread::yield(); continue; } out_mutex.lock(); out_dequeue.push_back(data); out_mutex.unlock(); // if (!is_ok) { // if (is_end) break; // std::this_thread::yield(); // } } }; PIVector> futures; for (int i = 0; i < 4; ++i) { futures.append(std::async(std::launch::async, runnable)); } SomeLargeData content; int iteration_count = 100 * 1000; for (int i = 0; i < iteration_count / 20; ++i) { for (int j = 0; j < 20; ++j) { SMBlockData block_data(j+1); for (int k = 0; k < j+1; ++k) { block_data[k].value() = content; } // bus_data << SMBusData::create(content); SMBlockData out_data = block_data.clone(); in_mutex.lock(); in_dequeue.push_back(out_data); in_mutex.unlock(); } for (int j = 0; j < 20; ++j) { out_mutex.lock(); if (out_dequeue.isEmpty()) { out_mutex.unlock(); j--; std::this_thread::yield(); continue; } auto block_data = out_dequeue.take_front(); out_mutex.unlock(); if (block_data[0].isInvalid()) { piCout << "Error: bus_data is invalid"; exit(1); } } // printf("It's alive! %d\n", i); } is_end = true; for (auto& future: futures) future.get(); } int main() { test_blocking_queue(); return 0; }