#include #include #include #include #include #include #include #define assert(_Expression) \ (void) \ ((!!(_Expression)) || \ (_assert(#_Expression,__FILE__,__LINE__),0)) void _assert (const char *_Message, const char *_File, unsigned _Line) { std::cerr << "assert (" << _Message << ") failed in " << _File << " line " << _Line << std::endl; exit(1); } std::atomic_flag is_calc_barrier = ATOMIC_FLAG_INIT; PIVector is_calc_statuses; namespace sm { struct block { PIVector input_blocks; PIVector output_blocks; std::atomic_flag barrier = ATOMIC_FLAG_INIT; const int is_calc_idx; block(const int isCalcIdx) : is_calc_idx(isCalcIdx) {} void calc() { std::this_thread::sleep_for(std::chrono::microseconds(rand() % 20)); } }; } void link(sm::block* out, sm::block* in) { out->output_blocks.push_back(in); in->input_blocks.push_back(out); } PIVector generate_scheme(int blocks_count = 100, int begin_block_count = 3, int expansion = 3, float connectivity = 0.3) { PIVector start_blocks; if (blocks_count <= 0) return start_blocks; is_calc_statuses.resize(blocks_count, false); int all_block_count; for (all_block_count = 0; all_block_count < begin_block_count; ++all_block_count) { auto block = new sm::block(all_block_count); start_blocks.push_back(block); if (all_block_count + 1 == blocks_count) return start_blocks; } PIVector current_blocks = start_blocks; do { int new_blocks_count = rand() % (2 * expansion) - expansion + current_blocks.size(); if (new_blocks_count < 1) new_blocks_count = 1; new_blocks_count = new_blocks_count + all_block_count > blocks_count ? blocks_count - all_block_count : new_blocks_count; PIVector next_current_blocks; for (int i = 0; i < new_blocks_count; ++i) { auto block = new sm::block(all_block_count); next_current_blocks.push_back(block); bool is_connected = false; for (int j = 0; j < current_blocks.size(); ++j) { if (rand() % 1000 < int(connectivity * 1000)) { link(current_blocks[j], block); is_connected = true; } } if (!is_connected) link(current_blocks[0], block); current_blocks = next_current_blocks; all_block_count++; if (all_block_count == blocks_count) return start_blocks; } } while (all_block_count < blocks_count); return start_blocks; } void print_scheme(PIVector& start_blocks) { PIVector current_blocks = start_blocks; int num = 1; PIMap block_nums; for (auto & current_block : current_blocks) { block_nums[current_block] = num++; } while (current_blocks.size() > 0) { PIVector next_current_blocks; for (auto current_block : current_blocks) { PICout cout = piCout; cout.setControl(0, true); cout << current_block->is_calc_idx << "("; for (auto & output_block : current_block->output_blocks) { if (block_nums.contains(output_block)) continue; block_nums[output_block] = num++; cout << output_block->is_calc_idx << ","; next_current_blocks.push_back(output_block); } cout << ") "; } piCout << ""; current_blocks = next_current_blocks; } } void unlock(sm::block* block, int locks_count = -1) { if (locks_count == -1) locks_count = block->input_blocks.size(); for (int i = 0; i < locks_count; ++i) { block->input_blocks[i]->barrier.clear(std::memory_order_release); } block->barrier.clear(std::memory_order_release); } int try_lock(PIVector& block_pool) { for (int i = 0; i < block_pool.size(); ++i) { auto block = block_pool[i]; if (block->barrier.test_and_set(std::memory_order_acquire)) continue; int locks_count = 0; for (auto & input_block : block->input_blocks) { if (input_block->barrier.test_and_set(std::memory_order_acquire)) break; locks_count++; } if (locks_count == block->input_blocks.size()) return i; unlock(block, locks_count); } return -1; } sm::block* try_lock_next_and_post(std::atomic_flag& block_pool_flag, PIVector& block_pool, bool& is_block_pool_empty, PIVector& new_available_blocks) { while(block_pool_flag.test_and_set(std::memory_order_acquire)) { std::this_thread::yield(); } block_pool << new_available_blocks; sm::block* block = nullptr; if (block_pool.isEmpty()) { is_block_pool_empty = true; } else { is_block_pool_empty = false; int block_idx = try_lock(block_pool); if (block_idx != -1) { block = block_pool[block_idx]; block_pool.remove(block_idx); // std::cout << block->is_calc_idx << ": locked for calc" << std::endl; } } block_pool_flag.clear(std::memory_order_release); return block; } bool is_available_block(sm::block* block) { for (auto input_block : block->input_blocks) { // std::cout << input_block->is_calc_idx << ": check " << is_calc_statuses[input_block->is_calc_idx] << std::endl; if (!is_calc_statuses[input_block->is_calc_idx]) return false; } return true; } void post_available_blocks(sm::block* calc_block, PIVector& new_available_blocks) { while(is_calc_barrier.test_and_set(std::memory_order_acquire)) { std::this_thread::yield(); } // std::cout << calc_block->is_calc_idx << ": looking for new available blocks" << std::endl; is_calc_statuses[calc_block->is_calc_idx] = true; new_available_blocks.clear(); for (auto output_block : calc_block->output_blocks) { if (is_available_block(output_block)) { // std::cout << calc_block->is_calc_idx << ": new available block: " << output_block->is_calc_idx << std::endl; new_available_blocks.push_back(output_block); } } is_calc_barrier.clear(std::memory_order_release); } int main() { srand(time(nullptr)); PIVector start_blocks = generate_scheme(100); print_scheme(start_blocks); std::atomic_int waiting_threads_flags; std::atomic_flag block_pool_flag = ATOMIC_FLAG_INIT; PIVector block_pool = start_blocks; const unsigned THREAD_COUNT = 6; std::vector> duration_futures; for (unsigned i = 0; i < THREAD_COUNT; ++i) { auto duration = std::async(std::launch::deferred, [&, i](){ bool is_block_pool_empty; bool is_block_pool_empty_old; PIVector new_available_blocks; double calc_time = 0; auto all_start = std::chrono::high_resolution_clock::now(); do { auto block = try_lock_next_and_post(block_pool_flag, block_pool, is_block_pool_empty, new_available_blocks); new_available_blocks.clear(); if (is_block_pool_empty && !is_block_pool_empty_old) { int waiting_threads_val = waiting_threads_flags.fetch_or(1u << i) | 1u << i; // std::cout << i << " wtv=" << waiting_threads_val << std::endl; if (waiting_threads_val == (1u << THREAD_COUNT) - 1) break; is_block_pool_empty_old = is_block_pool_empty; } else if (!is_block_pool_empty && is_block_pool_empty_old) { waiting_threads_flags.fetch_and(~(1u << i)); is_block_pool_empty_old = is_block_pool_empty; } if (block == nullptr) { std::this_thread::yield(); } else { auto start = std::chrono::high_resolution_clock::now(); block->calc(); auto end = std::chrono::high_resolution_clock::now(); calc_time += std::chrono::duration_cast(end - start).count() / 1000.; unlock(block); post_available_blocks(block, new_available_blocks); } } while (true); std::cout << "terminating thread " << i << std::endl; auto all_end = std::chrono::high_resolution_clock::now(); double all_time = std::chrono::duration_cast(all_end - all_start).count() / 1000.f; return all_time - calc_time; }); duration_futures.push_back(std::move(duration)); } for (auto & future : duration_futures) future.wait(); std::cout << "Sync durations: "; for (auto & future : duration_futures) std::cout << future.get() << "ms "; std::cout << std::endl; return 0; }