From efeed060260e8d400672bcf31053135de2b0992f Mon Sep 17 00:00:00 2001
From: Stepan Fomenko
Date: Mon, 13 Jul 2020 13:14:40 +0300
Subject: [PATCH] Working block sync algorithm

---
 experiments/block_choice.cpp | 164 +++++++++++++++++++++++------------
 1 file changed, 107 insertions(+), 57 deletions(-)

diff --git a/experiments/block_choice.cpp b/experiments/block_choice.cpp
index 1285efa..dbb07ff 100644
--- a/experiments/block_choice.cpp
+++ b/experiments/block_choice.cpp
@@ -5,6 +5,7 @@
 #include
 #include
 #include
+#include
 
 #define assert(_Expression) \
 (void) \
@@ -25,14 +26,25 @@ namespace sm {
         PIVector<block*> output_blocks;
         std::atomic_flag barrier = ATOMIC_FLAG_INIT;
         const int is_calc_idx;
+        const std::chrono::microseconds calc_time;
 
-        block(const int isCalcIdx) : is_calc_idx(isCalcIdx) {}
+        static std::chrono::microseconds random_time() {
+            float val = powf(rand() % 1000 / 1000.f, 10.f) * 100.f * 1000.f;
+//            std::cout << int(val) << std::endl;
+            return std::chrono::microseconds(int(val));
+        }
+
+        explicit block(const int is_calc_idx) : is_calc_idx(is_calc_idx), calc_time(random_time()) {}
 
         void calc() {
-            std::this_thread::sleep_for(std::chrono::microseconds(rand() % 20));
+            std::this_thread::sleep_for(calc_time);
         }
     };
+    struct time_report {
+        double calc_time_ms;
+        double sync_time_ms;
+    };
 
 }
 
 void link(sm::block* out, sm::block* in) {
@@ -40,7 +52,7 @@ void link(sm::block* out, sm::block* in) {
     in->input_blocks.push_back(out);
 }
 
-PIVector<sm::block*> generate_scheme(int blocks_count = 100, int begin_block_count = 3, int expansion = 3, float connectivity = 0.3) {
+PIVector<sm::block*> scheme_generate(int blocks_count = 100, int begin_block_count = 3, int expansion = 3, float connectivity = 0.3) {
     PIVector<sm::block*> start_blocks;
     if (blocks_count <= 0) return start_blocks;
     is_calc_statuses.resize(blocks_count, false);
@@ -81,7 +93,7 @@ PIVector generate_scheme(int blocks_count = 100, int begin_block_cou
     return start_blocks;
 }
 
-void print_scheme(PIVector<sm::block*>& start_blocks) {
+void scheme_print(PIVector<sm::block*>& start_blocks) {
     PIVector<sm::block*> current_blocks = start_blocks;
     int num = 1;
 
@@ -96,20 +108,24 @@ void print_scheme(PIVector& start_blocks) {
         for (auto current_block : current_blocks) {
             PICout cout = piCout;
             cout.setControl(0, true);
-            cout << current_block->is_calc_idx << "(";
+            cout << current_block->is_calc_idx << "{" << current_block->calc_time.count() / 1000.f << "ms,(";
             for (auto & output_block : current_block->output_blocks) {
                 if (block_nums.contains(output_block)) continue;
                 block_nums[output_block] = num++;
                 cout << output_block->is_calc_idx << ",";
                 next_current_blocks.push_back(output_block);
             }
-            cout << ") ";
+            cout << ")} ";
         }
         piCout << "";
         current_blocks = next_current_blocks;
     }
 }
 
+void scheme_clear_calc_statuses(PIVector<sm::block*>& start_blocks) {
+    is_calc_statuses.forEachInplace([](bool is_calced){ return false; });
+}
+
 void unlock(sm::block* block, int locks_count = -1) {
     if (locks_count == -1) locks_count = block->input_blocks.size();
     for (int i = 0; i < locks_count; ++i) {
@@ -186,69 +202,103 @@ void post_available_blocks(sm::block* calc_block, PIVector& new_avai
     is_calc_barrier.clear(std::memory_order_release);
 }
 
-int main() {
-    srand(time(nullptr));
+sm::time_report algorithm_runnable(std::atomic_flag& block_pool_flag,
+                                   PIVector<sm::block*>& block_pool,
+                                   std::atomic_int& waiting_threads_flags,
+                                   unsigned i, unsigned thread_count) {
+    bool is_block_pool_empty;
+    bool is_block_pool_empty_old = false;
+    PIVector<sm::block*> new_available_blocks;
 
-    PIVector<sm::block*> start_blocks = generate_scheme(100);
-    print_scheme(start_blocks);
+    double calc_time = 0;
 
-    std::atomic_int waiting_threads_flags;
+    auto all_start = std::chrono::high_resolution_clock::now();
+    auto all_end = std::chrono::high_resolution_clock::now();
+    do {
+        auto block = try_lock_next_and_post(block_pool_flag, block_pool, is_block_pool_empty, new_available_blocks);
+        new_available_blocks.clear();
+
+        if (is_block_pool_empty) {
+            int waiting_threads_val;
+            if (is_block_pool_empty_old) {
+                waiting_threads_val = waiting_threads_flags.load(std::memory_order_acquire);
+            } else {
+                waiting_threads_val = waiting_threads_flags.fetch_or(1u << i, std::memory_order_acq_rel) | 1u << i;
+                all_end = std::chrono::high_resolution_clock::now();
+            }
+//            std::cout << i << " wtv=" << waiting_threads_val << std::endl;
+            if (waiting_threads_val == (1u << thread_count) - 1) break;
+            is_block_pool_empty_old = is_block_pool_empty;
+        } else if (!is_block_pool_empty && is_block_pool_empty_old) {
+            waiting_threads_flags.fetch_and(~(1u << i));
+            is_block_pool_empty_old = is_block_pool_empty;
+        }
+
+        if (block == nullptr) {
+            std::this_thread::yield();
+        } else {
+            auto start = std::chrono::high_resolution_clock::now();
+            block->calc();
+            auto end = std::chrono::high_resolution_clock::now();
+            calc_time += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() / 1000.;
+            unlock(block);
+
+            post_available_blocks(block, new_available_blocks);
+        }
+
+    } while (true);
+    piCout << "terminating thread" << i;
+
+    double all_time = std::chrono::duration_cast<std::chrono::microseconds>(all_end - all_start).count() / 1000.;
+    return { .calc_time_ms = calc_time, .sync_time_ms = all_time - calc_time };
+}
+
+std::vector<std::future<sm::time_report>> check_performance(const PIVector<sm::block*>& start_blocks, const unsigned thread_count) {
+    std::atomic_int waiting_threads_flags(0);
     std::atomic_flag block_pool_flag = ATOMIC_FLAG_INIT;
     PIVector<sm::block*> block_pool = start_blocks;
 
-    const unsigned THREAD_COUNT = 6;
-    std::vector<std::future<double>> duration_futures;
-    for (unsigned i = 0; i < THREAD_COUNT; ++i) {
-        auto duration = std::async(std::launch::deferred, [&, i](){
-            bool is_block_pool_empty;
-            bool is_block_pool_empty_old;
-            PIVector<sm::block*> new_available_blocks;
-
-            double calc_time = 0;
-
-            auto all_start = std::chrono::high_resolution_clock::now();
-            do {
-                auto block = try_lock_next_and_post(block_pool_flag, block_pool, is_block_pool_empty,
-                                                    new_available_blocks);
-                new_available_blocks.clear();
-
-                if (is_block_pool_empty && !is_block_pool_empty_old) {
-                    int waiting_threads_val = waiting_threads_flags.fetch_or(1u << i) | 1u << i;
-//                    std::cout << i << " wtv=" << waiting_threads_val << std::endl;
-                    if (waiting_threads_val == (1u << THREAD_COUNT) - 1) break;
-                    is_block_pool_empty_old = is_block_pool_empty;
-                } else if (!is_block_pool_empty && is_block_pool_empty_old) {
-                    waiting_threads_flags.fetch_and(~(1u << i));
-                    is_block_pool_empty_old = is_block_pool_empty;
-                }
-
-                if (block == nullptr) {
-                    std::this_thread::yield();
-                } else {
-                    auto start = std::chrono::high_resolution_clock::now();
-                    block->calc();
-                    auto end = std::chrono::high_resolution_clock::now();
-                    calc_time += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() / 1000.;
-                    unlock(block);
-
-                    post_available_blocks(block, new_available_blocks);
-                }
-
-            } while (true);
-            std::cout << "terminating thread " << i << std::endl;
-            auto all_end = std::chrono::high_resolution_clock::now();
-
-            double all_time = std::chrono::duration_cast<std::chrono::microseconds>(all_end - all_start).count() / 1000.f;
-            return all_time - calc_time;
+    std::vector<std::future<sm::time_report>> duration_futures;
+    for (unsigned i = 0; i < thread_count; ++i) {
+        auto duration = std::async(std::launch::async, [&, i](){
+            return algorithm_runnable(block_pool_flag, block_pool, waiting_threads_flags, i, thread_count);
         });
         duration_futures.push_back(std::move(duration));
     }
 
     for (auto & future : duration_futures) future.wait();
+    return duration_futures;
+}
 
-    std::cout << "Sync durations: ";
-    for (auto & future : duration_futures) std::cout << future.get() << "ms ";
+void print_performance(std::vector<std::future<sm::time_report>>& duration_futures) {
+    for (auto & future : duration_futures) future.wait();
+    std::cout << "durations for " << duration_futures.size() << " threads: ";
+    double sum_sync_time = 0., sum_calc_time = 0.;
+    for (auto & future : duration_futures) {
+        sm::time_report tr = future.get();
+        sum_sync_time += tr.sync_time_ms;
+        sum_calc_time += tr.calc_time_ms;
+        std::cout << "(sync=" << (int)tr.sync_time_ms << "ms,calc=" << (int)tr.calc_time_ms << "ms) ";
+    }
+    if (duration_futures.size() > 1) {
+        std::cout << "sum_sync=" << (int)sum_sync_time << "ms,sum_calc=" << (int)sum_calc_time << "ms";
+    }
     std::cout << std::endl;
+}
+
+int main() {
+    srand(time(nullptr));
+
+    PIVector<sm::block*> start_blocks = scheme_generate(200, 3, 4);
+    scheme_print(start_blocks);
+
+    auto duration_futures = check_performance(start_blocks, 1);
+    print_performance(duration_futures);
+
+    scheme_clear_calc_statuses(start_blocks);
+
+    duration_futures = check_performance(start_blocks, 6);
+    print_performance(duration_futures);
 
     return 0;
 }