NOTE(review): this patch was re-flowed from a whitespace-collapsed copy. Indentation of
context lines, header names and template arguments were reconstructed from usage and must
be confirmed against the repository before the patch is applied.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7143834..ef8660b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -47,4 +47,10 @@ add_executable(mutex experiments/mutex.cpp)
 target_link_libraries(mutex ${PIP_LIBRARY} ${PIP_CONCURRENT_LIBRARY})
 
 add_executable(mutex_multithread experiments/mutex_multithread.cpp)
-target_link_libraries(mutex_multithread ${PIP_LIBRARY} ${PIP_CONCURRENT_LIBRARY})
\ No newline at end of file
+target_link_libraries(mutex_multithread ${PIP_LIBRARY} ${PIP_CONCURRENT_LIBRARY})
+
+add_executable(vectors experiments/vectors.cpp)
+target_link_libraries(vectors ${PIP_LIBRARY})
+
+add_executable(block_choice experiments/block_choice.cpp)
+target_link_libraries(block_choice ${PIP_LIBRARY})
\ No newline at end of file
diff --git a/experiments/block_choice.cpp b/experiments/block_choice.cpp
new file mode 100644
index 0000000..1285efa
--- /dev/null
+++ b/experiments/block_choice.cpp
@@ -0,0 +1,298 @@
+// NOTE(review): header names and template arguments below were reconstructed from usage
+// (the <...> tokens were missing in the reviewed copy); confirm against the PIP headers.
+#include <atomic>
+#include <chrono>
+#include <cstdlib>
+#include <ctime>
+#include <future>
+#include <iostream>
+#include <thread>
+#include <vector>
+
+#include <picout.h>
+#include <pimap.h>
+#include <pivector.h>
+
+// Local assert: reports the failed expression with file and line, then exits with code 1.
+#define assert(_Expression) \
+    (void) \
+    ((!!(_Expression)) || \
+    (_assert(#_Expression,__FILE__,__LINE__),0))
+
+void _assert (const char *_Message, const char *_File, unsigned _Line) {
+    std::cerr << "assert (" << _Message << ") failed in " << _File << " line " << _Line << std::endl;
+    exit(1);
+}
+
+// Spin lock guarding is_calc_statuses.
+std::atomic_flag is_calc_barrier = ATOMIC_FLAG_INIT;
+// is_calc_statuses[i] becomes true once the block with is_calc_idx == i has been calculated.
+PIVector<bool> is_calc_statuses;
+
+namespace sm {
+    // Node of the calculation scheme. barrier is a spin lock taken by try_lock() and released by unlock().
+    struct block {
+        PIVector<block*> input_blocks;
+        PIVector<block*> output_blocks;
+        std::atomic_flag barrier = ATOMIC_FLAG_INIT;
+        const int is_calc_idx;
+
+        block(const int isCalcIdx) : is_calc_idx(isCalcIdx) {}
+
+        // Simulates a calculation by sleeping for rand() % 20 microseconds.
+        void calc() {
+            std::this_thread::sleep_for(std::chrono::microseconds(rand() % 20));
+        }
+    };
+
+}
+
+// Adds the directed edge out -> in to the scheme.
+void link(sm::block* out, sm::block* in) {
+    out->output_blocks.push_back(in);
+    in->input_blocks.push_back(out);
+}
+
+// Builds a random layered scheme of at most blocks_count blocks and returns its first layer.
+//   begin_block_count - size of the first layer (values below 1 are treated as 1)
+//   expansion         - a next layer has the previous size plus rand() % (2 * expansion) - expansion blocks
+//   connectivity      - probability of linking a new block to each block of the previous layer
+// Also resizes is_calc_statuses to blocks_count. Blocks are allocated with new and never freed here.
+PIVector<sm::block*> generate_scheme(int blocks_count = 100, int begin_block_count = 3, int expansion = 3, float connectivity = 0.3) {
+    PIVector<sm::block*> start_blocks;
+    if (blocks_count <= 0) return start_blocks;
+    is_calc_statuses.resize(blocks_count, false);
+    // The fallback link below uses current_blocks[0], so the first layer must not be empty.
+    if (begin_block_count < 1) begin_block_count = 1;
+
+    int all_block_count;
+    for (all_block_count = 0; all_block_count < begin_block_count; ++all_block_count) {
+        auto block = new sm::block(all_block_count);
+        start_blocks.push_back(block);
+        if (all_block_count + 1 == blocks_count) return start_blocks;
+    }
+
+    PIVector<sm::block*> current_blocks = start_blocks;
+    do {
+        // rand() % 0 is undefined, so a zero expansion keeps the layer size unchanged.
+        int size_delta = expansion != 0 ? rand() % (2 * expansion) - expansion : 0;
+        int new_blocks_count = size_delta + int(current_blocks.size());
+        if (new_blocks_count < 1) new_blocks_count = 1;
+        if (new_blocks_count + all_block_count > blocks_count) new_blocks_count = blocks_count - all_block_count;
+
+        PIVector<sm::block*> next_current_blocks;
+        for (int i = 0; i < new_blocks_count; ++i) {
+            auto block = new sm::block(all_block_count);
+            next_current_blocks.push_back(block);
+
+            bool is_connected = false;
+            for (int j = 0; j < current_blocks.size(); ++j) {
+                if (rand() % 1000 < int(connectivity * 1000)) {
+                    link(current_blocks[j], block);
+                    is_connected = true;
+                }
+            }
+            // Every new block gets at least one input; blocks reach the pool only through an input's outputs.
+            if (!is_connected) link(current_blocks[0], block);
+
+            all_block_count++;
+            if (all_block_count == blocks_count) return start_blocks;
+        }
+        // Switch layers only after the whole layer is built, so its blocks link to the previous layer.
+        current_blocks = next_current_blocks;
+    } while (all_block_count < blocks_count);
+
+    return start_blocks;
+}
+
+// Walks the scheme breadth-first from start_blocks and prints each block as "idx(newly reached output idx,...) ".
+void print_scheme(PIVector<sm::block*>& start_blocks) {
+    PIVector<sm::block*> current_blocks = start_blocks;
+
+    int num = 1;
+    PIMap<sm::block*, int> block_nums;
+
+    for (auto & current_block : current_blocks) {
+        block_nums[current_block] = num++;
+    }
+
+    while (current_blocks.size() > 0) {
+        PIVector<sm::block*> next_current_blocks;
+        for (auto current_block : current_blocks) {
+            PICout cout = piCout;
+            cout.setControl(0, true);
+            cout << current_block->is_calc_idx << "(";
+            for (auto & output_block : current_block->output_blocks) {
+                // A block is listed only under the first input that reaches it.
+                if (block_nums.contains(output_block)) continue;
+                block_nums[output_block] = num++;
+                cout << output_block->is_calc_idx << ",";
+                next_current_blocks.push_back(output_block);
+            }
+            cout << ") ";
+        }
+        piCout << "";
+        current_blocks = next_current_blocks;
+    }
+}
+
+// Releases the barriers of the first locks_count inputs of block and then of block itself.
+// locks_count == -1 (the default) releases all inputs.
+void unlock(sm::block* block, int locks_count = -1) {
+    if (locks_count == -1) locks_count = block->input_blocks.size();
+    for (int i = 0; i < locks_count; ++i) {
+        block->input_blocks[i]->barrier.clear(std::memory_order_release);
+    }
+    block->barrier.clear(std::memory_order_release);
+}
+
+// Finds the first block in block_pool whose own barrier and all input barriers can be taken.
+// Returns its index with those barriers held, or -1 with nothing held.
+int try_lock(PIVector<sm::block*>& block_pool) {
+    for (int i = 0; i < block_pool.size(); ++i) {
+        auto block = block_pool[i];
+        if (block->barrier.test_and_set(std::memory_order_acquire)) continue;
+
+        int locks_count = 0;
+        for (auto & input_block : block->input_blocks) {
+            if (input_block->barrier.test_and_set(std::memory_order_acquire)) break;
+            locks_count++;
+        }
+        if (locks_count == block->input_blocks.size()) return i;
+
+        // Roll back the partial acquisition before trying the next candidate.
+        unlock(block, locks_count);
+    }
+    return -1;
+}
+
+// While holding block_pool_flag: appends new_available_blocks to block_pool, stores in
+// is_block_pool_empty whether the pool is empty after the append, and tries to lock one block.
+// Returns the locked block (removed from the pool) or nullptr when none could be locked.
+sm::block* try_lock_next_and_post(std::atomic_flag& block_pool_flag, PIVector<sm::block*>& block_pool, bool& is_block_pool_empty, PIVector<sm::block*>& new_available_blocks) {
+    while(block_pool_flag.test_and_set(std::memory_order_acquire)) {
+        std::this_thread::yield();
+    }
+
+    block_pool << new_available_blocks;
+
+    sm::block* block = nullptr;
+    if (block_pool.isEmpty()) {
+        is_block_pool_empty = true;
+    } else {
+        is_block_pool_empty = false;
+        int block_idx = try_lock(block_pool);
+        if (block_idx != -1) {
+            block = block_pool[block_idx];
+            block_pool.remove(block_idx);
+//            std::cout << block->is_calc_idx << ": locked for calc" << std::endl;
+        }
+    }
+
+    block_pool_flag.clear(std::memory_order_release);
+
+    return block;
+}
+
+// True when every input of block is marked calculated in is_calc_statuses.
+// Takes no lock itself; post_available_blocks() calls it while holding is_calc_barrier.
+bool is_available_block(sm::block* block) {
+    for (auto input_block : block->input_blocks) {
+//        std::cout << input_block->is_calc_idx << ": check " << is_calc_statuses[input_block->is_calc_idx] << std::endl;
+        if (!is_calc_statuses[input_block->is_calc_idx]) return false;
+    }
+    return true;
+}
+
+// While holding is_calc_barrier: marks calc_block as calculated and replaces the contents of
+// new_available_blocks with those outputs of calc_block whose inputs are all calculated.
+void post_available_blocks(sm::block* calc_block, PIVector<sm::block*>& new_available_blocks) {
+    while(is_calc_barrier.test_and_set(std::memory_order_acquire)) {
+        std::this_thread::yield();
+    }
+
+//    std::cout << calc_block->is_calc_idx << ": looking for new available blocks" << std::endl;
+    is_calc_statuses[calc_block->is_calc_idx] = true;
+    new_available_blocks.clear();
+    for (auto output_block : calc_block->output_blocks) {
+        if (is_available_block(output_block)) {
+//            std::cout << calc_block->is_calc_idx << ": new available block: " << output_block->is_calc_idx << std::endl;
+            new_available_blocks.push_back(output_block);
+        }
+    }
+
+    is_calc_barrier.clear(std::memory_order_release);
+}
+
+int main() {
+    srand(time(nullptr));
+
+    PIVector<sm::block*> start_blocks = generate_scheme(100);
+    print_scheme(start_blocks);
+
+    const unsigned THREAD_COUNT = 6;
+    // Bit i is set while worker i has last seen an empty pool; all bits set means the work is done.
+    const int ALL_THREADS_WAITING = int((1u << THREAD_COUNT) - 1);
+    std::atomic_int waiting_threads_flags(0);
+    std::atomic_flag block_pool_flag = ATOMIC_FLAG_INIT;
+    PIVector<sm::block*> block_pool = start_blocks;
+
+    std::vector<std::future<double>> duration_futures;
+    for (unsigned i = 0; i < THREAD_COUNT; ++i) {
+        // std::launch::async is required: deferred tasks run one after another inside wait(),
+        // and a lone worker can never observe every waiting bit set.
+        auto duration = std::async(std::launch::async, [&, i](){
+            bool is_block_pool_empty = false;
+            bool is_block_pool_empty_old = false;
+            PIVector<sm::block*> new_available_blocks;
+
+            double calc_time = 0;
+
+            auto all_start = std::chrono::high_resolution_clock::now();
+            do {
+                auto block = try_lock_next_and_post(block_pool_flag, block_pool, is_block_pool_empty,
+                                                    new_available_blocks);
+                new_available_blocks.clear();
+
+                if (is_block_pool_empty) {
+                    if (!is_block_pool_empty_old) {
+                        waiting_threads_flags.fetch_or(1u << i);
+                        is_block_pool_empty_old = true;
+                    }
+                    // Checked on every idle pass so that each worker terminates, not only the last one to go idle.
+                    if (waiting_threads_flags.load() == ALL_THREADS_WAITING) break;
+                } else if (is_block_pool_empty_old) {
+                    waiting_threads_flags.fetch_and(~(1u << i));
+                    is_block_pool_empty_old = false;
+                }
+
+                if (block == nullptr) {
+                    std::this_thread::yield();
+                } else {
+                    auto start = std::chrono::high_resolution_clock::now();
+                    block->calc();
+                    auto end = std::chrono::high_resolution_clock::now();
+                    calc_time += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() / 1000.;
+                    unlock(block);
+
+                    post_available_blocks(block, new_available_blocks);
+                }
+
+            } while (true);
+            std::cout << "terminating thread " << i << std::endl;
+            auto all_end = std::chrono::high_resolution_clock::now();
+
+            double all_time = std::chrono::duration_cast<std::chrono::microseconds>(all_end - all_start).count() / 1000.f;
+            return all_time - calc_time;
+        });
+        duration_futures.push_back(std::move(duration));
+    }
+
+    for (auto & future : duration_futures) future.wait();
+
+    std::cout << "Sync durations: ";
+    for (auto & future : duration_futures) std::cout << future.get() << "ms ";
+    std::cout << std::endl;
+
+    return 0;
+}
diff --git a/experiments/mutex.cpp b/experiments/mutex.cpp
index 50d66a0..bdf0bba 100644
--- a/experiments/mutex.cpp
+++ b/experiments/mutex.cpp
@@ -46,7 +46,7 @@ int main() {
     });
     piCout << "stdMutex:" << stdMutexPerformance.get() << "ms";
 
-    std::atomic_flag stdAtomic;
+    std::atomic_flag stdAtomic = ATOMIC_FLAG_INIT;
     auto stdAtomicPerformance = check_performance([&stdAtomic](){
         while(stdAtomic.test_and_set(std::memory_order_acquire)) {
             std::this_thread::yield();
diff --git a/experiments/mutex_multithread.cpp b/experiments/mutex_multithread.cpp
index 73d9bab..b623421 100644
--- a/experiments/mutex_multithread.cpp
+++ b/experiments/mutex_multithread.cpp
@@ -57,7 +57,7 @@ int main() {
     });
     piCout << "stdMutex:" << stdMutexPerformance << "ms";
 
-    std::atomic_flag stdAtomic;
+    std::atomic_flag stdAtomic = ATOMIC_FLAG_INIT;
     auto stdAtomicPerformance = check_performance([&stdAtomic](long& k){
         while(stdAtomic.test_and_set(std::memory_order_acquire)) {
             std::this_thread::yield();
diff --git a/experiments/vectors.cpp b/experiments/vectors.cpp
new file mode 100644
index 0000000..c25993c
--- /dev/null
+++ b/experiments/vectors.cpp
@@ -0,0 +1,56 @@
+// NOTE(review): header names and template arguments below were reconstructed from usage
+// (the <...> tokens were missing in the reviewed copy); confirm against the PIP headers.
+#include <chrono>
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include <picout.h>
+#include <pivector.h>
+
+// Calls test_function 1000 times and returns the total elapsed time in milliseconds.
+template<typename Func>
+float check_performance(Func test_function) {
+    auto start = std::chrono::high_resolution_clock::now();
+    for (int i = 0; i < 1000; ++i) {
+        test_function();
+    }
+    auto end = std::chrono::high_resolution_clock::now();
+    return std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() / 1000.f;
+}
+
+int main() {
+    float piVectorPerformance = check_performance([](){
+        PIVector<int> piVector;
+        for (int i = 0; i < 1000; ++i) {
+            piVector.push_back(i);
+        }
+    });
+    piCout << "piVector without preallocation:" << piVectorPerformance << "ms";
+
+    float stdVectorPerformance = check_performance([](){
+        std::vector<int> stdVector;
+        for (int i = 0; i < 1000; ++i) {
+            stdVector.push_back(i);
+        }
+    });
+    piCout << "stdVector without preallocation:" << stdVectorPerformance << "ms";
+
+    std::unique_ptr<std::vector<int> > stdVector(new std::vector<int>(1000));
+    stdVectorPerformance = check_performance(std::bind([](std::unique_ptr<std::vector<int> >& stdVector){
+        for (int i = 0; i < 1000; ++i) {
+            stdVector->at(i) = i;
+        }
+    }, std::move(stdVector)));
+    piCout << "stdVector with preallocation:" << stdVectorPerformance << "ms";
+
+    std::unique_ptr<int[]> array(new int[1000]);
+    float arrayPerformance = check_performance(std::bind([](std::unique_ptr<int[]>& array){
+        for (int i = 0; i < 1000; ++i) {
+            array.get()[i] = i;
+        }
+    }, std::move(array)));
+    piCout << "array:" << arrayPerformance << "ms";
+
+    return 0;
+}