#include #include #include #include #include #include #include #include #define assert(_Expression) \ (void) \ ((!!(_Expression)) || \ (_assert(#_Expression,__FILE__,__LINE__),0)) void _assert (const char *_Message, const char *_File, unsigned _Line) { std::cerr << "assert (" << _Message << ") failed in " << _File << " line " << _Line << std::endl; exit(1); } std::atomic_flag is_calc_barrier = ATOMIC_FLAG_INIT; PIVector is_calc_statuses; namespace sm { struct block { PIVector input_blocks; PIVector output_blocks; std::atomic_flag barrier = ATOMIC_FLAG_INIT; const int is_calc_idx; const std::chrono::microseconds calc_time; static std::chrono::microseconds random_time() { float val = powf(rand() % 1000 / 1000.f, 20.f) * 30.f * 1000.f; // std::cout << int(val) << std::endl; return std::chrono::microseconds(int(val)); } explicit block(const int is_calc_idx) : is_calc_idx(is_calc_idx), calc_time(random_time()) {} void calc() { std::this_thread::sleep_for(calc_time); } }; struct time_report { double calc_time_ms; double sync_time_ms; }; } void link(sm::block* out, sm::block* in) { out->output_blocks.push_back(in); in->input_blocks.push_back(out); } PIVector scheme_generate(int blocks_count = 100, int begin_block_count = 3, int expansion = 3, float connectivity = 0.3) { PIVector start_blocks; if (blocks_count <= 0) return start_blocks; is_calc_statuses.resize(blocks_count, false); int all_block_count; for (all_block_count = 0; all_block_count < begin_block_count; ++all_block_count) { auto block = new sm::block(all_block_count); start_blocks.push_back(block); if (all_block_count + 1 == blocks_count) return start_blocks; } PIVector current_blocks = start_blocks; do { int new_blocks_count = rand() % (2 * expansion) - expansion + current_blocks.size(); if (new_blocks_count < 1) new_blocks_count = 1; new_blocks_count = new_blocks_count + all_block_count > blocks_count ? blocks_count - all_block_count : new_blocks_count; PIVector next_current_blocks; for (int i = 0; i < new_blocks_count; ++i) { auto block = new sm::block(all_block_count); next_current_blocks.push_back(block); bool is_connected = false; for (int j = 0; j < current_blocks.size(); ++j) { if (rand() % 1000 < int(connectivity * 1000)) { link(current_blocks[j], block); is_connected = true; } } if (!is_connected) link(current_blocks[0], block); current_blocks = next_current_blocks; all_block_count++; if (all_block_count == blocks_count) return start_blocks; } } while (all_block_count < blocks_count); return start_blocks; } void scheme_print(PIVector& start_blocks) { PIVector current_blocks = start_blocks; int num = 1; PIMap block_nums; for (auto & current_block : current_blocks) { block_nums[current_block] = num++; } while (current_blocks.size() > 0) { PIVector next_current_blocks; for (auto current_block : current_blocks) { PICout cout = piCout; cout.setControl(0, true); cout << current_block->is_calc_idx << "{" << current_block->calc_time.count() / 1000.f << "ms,("; for (auto & output_block : current_block->output_blocks) { if (block_nums.contains(output_block)) continue; block_nums[output_block] = num++; cout << output_block->is_calc_idx << ","; next_current_blocks.push_back(output_block); } cout << ")} "; } piCout << ""; current_blocks = next_current_blocks; } } void scheme_clear_calc_statuses(PIVector& start_blocks) { is_calc_statuses.forEachInplace([](bool is_calced){ return false; }); } void unlock(sm::block* block, int locks_count = -1) { if (locks_count == -1) locks_count = block->input_blocks.size(); for (int i = 0; i < locks_count; ++i) { block->input_blocks[i]->barrier.clear(std::memory_order_release); } block->barrier.clear(std::memory_order_release); } int try_lock(PIVector& block_pool) { for (int i = 0; i < block_pool.size(); ++i) { auto block = block_pool[i]; if (block->barrier.test_and_set(std::memory_order_acquire)) continue; int locks_count = 0; for (auto & input_block : block->input_blocks) { if (input_block->barrier.test_and_set(std::memory_order_acquire)) break; locks_count++; } if (locks_count == block->input_blocks.size()) return i; unlock(block, locks_count); } return -1; } sm::block* try_lock_next_and_post(std::atomic_flag& block_pool_flag, PIVector& block_pool, bool& is_block_pool_empty, PIVector& new_available_blocks) { while(block_pool_flag.test_and_set(std::memory_order_acquire)) { std::this_thread::yield(); } block_pool << new_available_blocks; sm::block* block = nullptr; if (block_pool.isEmpty()) { is_block_pool_empty = true; } else { is_block_pool_empty = false; int block_idx = try_lock(block_pool); if (block_idx != -1) { block = block_pool[block_idx]; block_pool.remove(block_idx); // std::cout << block->is_calc_idx << ": locked for calc" << std::endl; } } block_pool_flag.clear(std::memory_order_release); return block; } bool is_available_block(sm::block* block) { for (auto input_block : block->input_blocks) { // std::cout << input_block->is_calc_idx << ": check " << is_calc_statuses[input_block->is_calc_idx] << std::endl; if (!is_calc_statuses[input_block->is_calc_idx]) return false; } return true; } void post_available_blocks(sm::block* calc_block, PIVector& new_available_blocks) { while(is_calc_barrier.test_and_set(std::memory_order_acquire)) { std::this_thread::yield(); } // std::cout << calc_block->is_calc_idx << ": looking for new available blocks" << std::endl; is_calc_statuses[calc_block->is_calc_idx] = true; new_available_blocks.clear(); for (auto output_block : calc_block->output_blocks) { if (is_available_block(output_block)) { // std::cout << calc_block->is_calc_idx << ": new available block: " << output_block->is_calc_idx << std::endl; new_available_blocks.push_back(output_block); } } is_calc_barrier.clear(std::memory_order_release); } sm::time_report algorithm_runnable(std::atomic_flag& block_pool_flag, PIVector& block_pool, std::atomic_int& waiting_threads_flags, unsigned i, unsigned thread_count) { bool is_block_pool_empty; bool is_block_pool_empty_old; PIVector new_available_blocks; double calc_time = 0; auto all_start = std::chrono::high_resolution_clock::now(); auto all_end = std::chrono::high_resolution_clock::now(); do { auto block = try_lock_next_and_post(block_pool_flag, block_pool, is_block_pool_empty,new_available_blocks); new_available_blocks.clear(); if (is_block_pool_empty) { int waiting_threads_val; if (is_block_pool_empty_old) { waiting_threads_val = waiting_threads_flags.load(std::memory_order_acquire); } else { waiting_threads_val = waiting_threads_flags.fetch_or(1u << i, std::memory_order_acq_rel) | 1u << i; all_end = std::chrono::high_resolution_clock::now(); } // std::cout << i << " wtv=" << waiting_threads_val << std::endl; if (waiting_threads_val == (1u << thread_count) - 1) break; is_block_pool_empty_old = is_block_pool_empty; } else if (!is_block_pool_empty && is_block_pool_empty_old) { waiting_threads_flags.fetch_and(~(1u << i)); is_block_pool_empty_old = is_block_pool_empty; } if (block == nullptr) { std::this_thread::yield(); } else { auto start = std::chrono::high_resolution_clock::now(); block->calc(); auto end = std::chrono::high_resolution_clock::now(); calc_time += std::chrono::duration_cast(end - start).count() / 1000.; unlock(block); post_available_blocks(block, new_available_blocks); } } while (true); piCout << "terminating thread" << i; double all_time = std::chrono::duration_cast(all_end - all_start).count() / 1000.; return { .calc_time_ms = calc_time, .sync_time_ms = all_time - calc_time }; } std::vector> check_performance(const PIVector& start_blocks, const unsigned thread_count) { std::atomic_int waiting_threads_flags(0); std::atomic_flag block_pool_flag = ATOMIC_FLAG_INIT; PIVector block_pool = start_blocks; std::vector> duration_futures; for (unsigned i = 0; i < thread_count; ++i) { auto duration = std::async(std::launch::async, [&, i](){ return algorithm_runnable(block_pool_flag, block_pool, waiting_threads_flags, i, thread_count); }); duration_futures.push_back(std::move(duration)); } for (auto & future : duration_futures) future.wait(); return duration_futures; } void print_performance(std::vector>& duration_futures) { for (auto & future : duration_futures) future.wait(); std::cout << "durations for " << duration_futures.size() << " threads: "; double sum_sync_time = 0., sum_calc_time = 0., max_time = 0.; for (auto & future : duration_futures) { sm::time_report tr = future.get(); sum_sync_time += tr.sync_time_ms; sum_calc_time += tr.calc_time_ms; if (tr.sync_time_ms + tr.calc_time_ms > max_time) max_time = tr.sync_time_ms + tr.calc_time_ms; std::cout << "(sync=" << (int)tr.sync_time_ms << "ms,calc=" << (int)tr.calc_time_ms << "ms) "; } if (duration_futures.size() > 1) { std::cout << "sum_sync=" << (int)sum_sync_time << "ms,max_time=" << max_time << "ms"; } std::cout << std::endl; } int main() { srand(time(nullptr)); PIVector start_blocks = scheme_generate(1000, 3, 4, 0.2); // scheme_print(start_blocks); auto duration_futures = check_performance(start_blocks, 1); print_performance(duration_futures); scheme_clear_calc_statuses(start_blocks); duration_futures = check_performance(start_blocks, 6); print_performance(duration_futures); return 0; }