Files
multithread_experiments/experiments/block_choice.cpp
2020-07-13 15:24:45 +03:00

308 lines
9.9 KiB
C++

#include <pivector.h>
#include <vector>
#include <pimap.h>
#include <picout.h>
#include <future>
#include <thread>
#include <iostream>
#include <math.h>
#define assert(_Expression) \
(void) \
((!!(_Expression)) || \
(_assert(#_Expression,__FILE__,__LINE__),0))
void _assert (const char *_Message, const char *_File, unsigned _Line) {
std::cerr << "assert (" << _Message << ") failed in " << _File << " line " << _Line << std::endl;
exit(1);
}
std::atomic_flag is_calc_barrier = ATOMIC_FLAG_INIT;
PIVector<bool> is_calc_statuses;
namespace sm {
struct block {
PIVector<block*> input_blocks;
PIVector<block*> output_blocks;
std::atomic_flag barrier = ATOMIC_FLAG_INIT;
const int is_calc_idx;
const std::chrono::microseconds calc_time;
static std::chrono::microseconds random_time() {
float val = powf(rand() % 1000 / 1000.f, 20.f) * 30.f * 1000.f;
// std::cout << int(val) << std::endl;
return std::chrono::microseconds(int(val));
}
explicit block(const int is_calc_idx) : is_calc_idx(is_calc_idx), calc_time(random_time()) {}
void calc() {
std::this_thread::sleep_for(calc_time);
}
};
struct time_report {
double calc_time_ms;
double sync_time_ms;
};
}
void link(sm::block* out, sm::block* in) {
out->output_blocks.push_back(in);
in->input_blocks.push_back(out);
}
PIVector<sm::block*> scheme_generate(int blocks_count = 100, int begin_block_count = 3, int expansion_shift = 0, int expansion_d = 3, float connectivity = 0.3) {
PIVector<sm::block*> start_blocks;
if (blocks_count <= 0) return start_blocks;
is_calc_statuses.resize(blocks_count, false);
int all_block_count;
for (all_block_count = 0; all_block_count < begin_block_count; ++all_block_count) {
auto block = new sm::block(all_block_count);
start_blocks.push_back(block);
if (all_block_count + 1 == blocks_count) return start_blocks;
}
PIVector<sm::block*> current_blocks = start_blocks;
do {
int new_blocks_count = rand() % (2 * expansion_d) - expansion_d + expansion_shift + current_blocks.size();
if (new_blocks_count < 1) new_blocks_count = 1;
new_blocks_count = new_blocks_count + all_block_count > blocks_count ? blocks_count - all_block_count : new_blocks_count;
PIVector<sm::block*> next_current_blocks;
for (int i = 0; i < new_blocks_count; ++i) {
auto block = new sm::block(all_block_count);
next_current_blocks.push_back(block);
bool is_connected = false;
for (int j = 0; j < current_blocks.size(); ++j) {
if (rand() % 1000 < int(connectivity * 1000)) {
link(current_blocks[j], block);
is_connected = true;
}
}
if (!is_connected) link(current_blocks[0], block);
current_blocks = next_current_blocks;
all_block_count++;
if (all_block_count == blocks_count) return start_blocks;
}
} while (all_block_count < blocks_count);
return start_blocks;
}
void scheme_print(PIVector<sm::block*>& start_blocks) {
PIVector<sm::block*> current_blocks = start_blocks;
int num = 1;
PIMap<sm::block*, int> block_nums;
for (auto & current_block : current_blocks) {
block_nums[current_block] = num++;
}
while (current_blocks.size() > 0) {
PIVector<sm::block*> next_current_blocks;
for (auto current_block : current_blocks) {
PICout cout = piCout;
cout.setControl(0, true);
cout << current_block->is_calc_idx << "{" << current_block->calc_time.count() / 1000.f << "ms,(";
for (auto & output_block : current_block->output_blocks) {
if (block_nums.contains(output_block)) continue;
block_nums[output_block] = num++;
cout << output_block->is_calc_idx << ",";
next_current_blocks.push_back(output_block);
}
cout << ")} ";
}
piCout << "";
current_blocks = next_current_blocks;
}
}
void scheme_clear_calc_statuses(PIVector<sm::block*>& start_blocks) {
is_calc_statuses.forEachInplace([](bool is_calced){ return false; });
}
void unlock(sm::block* block, int locks_count = -1) {
if (locks_count == -1) locks_count = block->input_blocks.size();
for (int i = 0; i < locks_count; ++i) {
block->input_blocks[i]->barrier.clear(std::memory_order_release);
}
block->barrier.clear(std::memory_order_release);
}
bool try_lock(sm::block* block) {
if (block->barrier.test_and_set(std::memory_order_acquire)) return false;
int locks_count = 0;
for (auto & input_block : block->input_blocks) {
if (input_block->barrier.test_and_set(std::memory_order_acquire)) break;
locks_count++;
}
if (locks_count == block->input_blocks.size()) {
return true;
} else {
unlock(block, locks_count);
return false;
}
}
sm::block* try_lock_next_and_post(std::atomic_flag& block_pool_flag, PIVector<sm::block*>& block_pool, bool& is_block_pool_empty, PIVector<sm::block*>& new_available_blocks) {
while(block_pool_flag.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield();
}
block_pool << new_available_blocks;
sm::block* block = nullptr;
if (block_pool.isEmpty()) {
is_block_pool_empty = true;
} else {
is_block_pool_empty = false;
for (int i = 0; i < block_pool.size(); ++i) {
if (try_lock(block_pool[i])) {
block = block_pool[i];
block_pool.remove(i);
// std::cout << block->is_calc_idx << ": locked for calc" << std::endl;
break;
}
}
}
block_pool_flag.clear(std::memory_order_release);
return block;
}
bool is_available_block(sm::block* block) {
for (auto input_block : block->input_blocks) {
// std::cout << input_block->is_calc_idx << ": check " << is_calc_statuses[input_block->is_calc_idx] << std::endl;
if (!is_calc_statuses[input_block->is_calc_idx]) return false;
}
return true;
}
void post_available_blocks(sm::block* calc_block, PIVector<sm::block*>& new_available_blocks) {
while(is_calc_barrier.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield();
}
// std::cout << calc_block->is_calc_idx << ": looking for new available blocks" << std::endl;
is_calc_statuses[calc_block->is_calc_idx] = true;
new_available_blocks.clear();
for (auto output_block : calc_block->output_blocks) {
if (is_available_block(output_block)) {
// std::cout << calc_block->is_calc_idx << ": new available block: " << output_block->is_calc_idx << std::endl;
new_available_blocks.push_back(output_block);
}
}
is_calc_barrier.clear(std::memory_order_release);
}
sm::time_report algorithm_runnable(std::atomic_flag& block_pool_flag,
PIVector<sm::block*>& block_pool,
std::atomic_int& waiting_threads_flags,
unsigned i, unsigned thread_count) {
bool is_block_pool_empty;
bool is_block_pool_empty_old;
PIVector<sm::block*> new_available_blocks;
double calc_time = 0;
auto all_start = std::chrono::high_resolution_clock::now();
auto all_end = std::chrono::high_resolution_clock::now();
do {
auto block = try_lock_next_and_post(block_pool_flag, block_pool, is_block_pool_empty,new_available_blocks);
new_available_blocks.clear();
if (is_block_pool_empty) {
int waiting_threads_val;
if (is_block_pool_empty_old) {
waiting_threads_val = waiting_threads_flags.load(std::memory_order_acquire);
} else {
waiting_threads_val = waiting_threads_flags.fetch_or(1u << i, std::memory_order_acq_rel) | 1u << i;
all_end = std::chrono::high_resolution_clock::now();
}
// std::cout << i << " wtv=" << waiting_threads_val << std::endl;
if (waiting_threads_val == (1u << thread_count) - 1) break;
is_block_pool_empty_old = is_block_pool_empty;
} else if (!is_block_pool_empty && is_block_pool_empty_old) {
waiting_threads_flags.fetch_and(~(1u << i));
is_block_pool_empty_old = is_block_pool_empty;
}
if (block == nullptr) {
std::this_thread::yield();
} else {
auto start = std::chrono::high_resolution_clock::now();
block->calc();
auto end = std::chrono::high_resolution_clock::now();
calc_time += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() / 1000.;
unlock(block);
post_available_blocks(block, new_available_blocks);
}
} while (true);
piCout << "terminating thread" << i;
double all_time = std::chrono::duration_cast<std::chrono::microseconds>(all_end - all_start).count() / 1000.;
return { .calc_time_ms = calc_time, .sync_time_ms = all_time - calc_time };
}
std::vector<std::future<sm::time_report>> check_performance(const PIVector<sm::block*>& start_blocks, const unsigned thread_count) {
std::atomic_int waiting_threads_flags(0);
std::atomic_flag block_pool_flag = ATOMIC_FLAG_INIT;
PIVector<sm::block*> block_pool = start_blocks;
std::vector<std::future<sm::time_report>> duration_futures;
for (unsigned i = 0; i < thread_count; ++i) {
auto duration = std::async(std::launch::async, [&, i](){
return algorithm_runnable(block_pool_flag, block_pool, waiting_threads_flags, i, thread_count);
});
duration_futures.push_back(std::move(duration));
}
for (auto & future : duration_futures) future.wait();
return duration_futures;
}
void print_performance(std::vector<std::future<sm::time_report>>& duration_futures) {
for (auto & future : duration_futures) future.wait();
std::cout << "durations for " << duration_futures.size() << " threads: ";
double sum_sync_time = 0., sum_calc_time = 0., max_time = 0.;
for (auto & future : duration_futures) {
sm::time_report tr = future.get();
sum_sync_time += tr.sync_time_ms;
sum_calc_time += tr.calc_time_ms;
if (tr.sync_time_ms + tr.calc_time_ms > max_time) max_time = tr.sync_time_ms + tr.calc_time_ms;
std::cout << "(sync=" << (int)tr.sync_time_ms << "ms,calc=" << (int)tr.calc_time_ms << "ms) ";
}
if (duration_futures.size() > 1) {
std::cout << "sum_sync=" << (int)sum_sync_time << "ms,max_time=" << max_time << "ms";
}
std::cout << std::endl;
}
int main() {
srand(time(nullptr));
PIVector<sm::block*> start_blocks = scheme_generate(100, 3, 1, 3, 0.1);
scheme_print(start_blocks);
auto duration_futures = check_performance(start_blocks, 1);
print_performance(duration_futures);
scheme_clear_calc_statuses(start_blocks);
duration_futures = check_performance(start_blocks, 6);
print_performance(duration_futures);
return 0;
}