Working block sync algorithm
This commit is contained in:
@@ -5,6 +5,7 @@
|
|||||||
#include <future>
|
#include <future>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
#include <math.h>
|
||||||
|
|
||||||
#define assert(_Expression) \
|
#define assert(_Expression) \
|
||||||
(void) \
|
(void) \
|
||||||
@@ -25,14 +26,25 @@ namespace sm {
|
|||||||
PIVector<block*> output_blocks;
|
PIVector<block*> output_blocks;
|
||||||
std::atomic_flag barrier = ATOMIC_FLAG_INIT;
|
std::atomic_flag barrier = ATOMIC_FLAG_INIT;
|
||||||
const int is_calc_idx;
|
const int is_calc_idx;
|
||||||
|
const std::chrono::microseconds calc_time;
|
||||||
|
|
||||||
block(const int isCalcIdx) : is_calc_idx(isCalcIdx) {}
|
// Picks a simulated computation time for one block.
// A pseudo-random fraction in [0, 1) with three-decimal resolution is raised
// to the 10th power and scaled by 100 * 1000; the truncated result is used as
// a microsecond count, so it is always below 100 ms. Draws from rand(), so the
// sequence depends on the seed installed through srand().
static std::chrono::microseconds random_time() {
    const float fraction = (rand() % 1000) / 1000.f;
    const float skewed = powf(fraction, 10.f);
    const float micros = skewed * 100.f * 1000.f;
    return std::chrono::microseconds(static_cast<int>(micros));
}
|
||||||
|
|
||||||
|
// Stores the given is_calc_idx and draws this block's simulated computation
// time once, at construction, from random_time().
explicit block(const int is_calc_idx)
    : is_calc_idx{is_calc_idx},
      calc_time{random_time()} {}
|
||||||
|
|
||||||
void calc() {
|
void calc() {
|
||||||
std::this_thread::sleep_for(std::chrono::microseconds(rand() % 20));
|
std::this_thread::sleep_for(calc_time);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Timing summary returned by one worker, in milliseconds.
// Both fields default to zero so a default-constructed report never exposes
// indeterminate values; the type stays an aggregate, so brace and designated
// initialization keep working.
struct time_report {
    // Value of the worker's accumulated calc_time counter.
    double calc_time_ms = 0.0;
    // Worker's total elapsed time minus calc_time_ms.
    double sync_time_ms = 0.0;
};
|
||||||
}
|
}
|
||||||
|
|
||||||
void link(sm::block* out, sm::block* in) {
|
void link(sm::block* out, sm::block* in) {
|
||||||
@@ -40,7 +52,7 @@ void link(sm::block* out, sm::block* in) {
|
|||||||
in->input_blocks.push_back(out);
|
in->input_blocks.push_back(out);
|
||||||
}
|
}
|
||||||
|
|
||||||
PIVector<sm::block*> generate_scheme(int blocks_count = 100, int begin_block_count = 3, int expansion = 3, float connectivity = 0.3) {
|
PIVector<sm::block*> scheme_generate(int blocks_count = 100, int begin_block_count = 3, int expansion = 3, float connectivity = 0.3) {
|
||||||
PIVector<sm::block*> start_blocks;
|
PIVector<sm::block*> start_blocks;
|
||||||
if (blocks_count <= 0) return start_blocks;
|
if (blocks_count <= 0) return start_blocks;
|
||||||
is_calc_statuses.resize(blocks_count, false);
|
is_calc_statuses.resize(blocks_count, false);
|
||||||
@@ -81,7 +93,7 @@ PIVector<sm::block*> generate_scheme(int blocks_count = 100, int begin_block_cou
|
|||||||
return start_blocks;
|
return start_blocks;
|
||||||
}
|
}
|
||||||
|
|
||||||
void print_scheme(PIVector<sm::block*>& start_blocks) {
|
void scheme_print(PIVector<sm::block*>& start_blocks) {
|
||||||
PIVector<sm::block*> current_blocks = start_blocks;
|
PIVector<sm::block*> current_blocks = start_blocks;
|
||||||
|
|
||||||
int num = 1;
|
int num = 1;
|
||||||
@@ -96,20 +108,24 @@ void print_scheme(PIVector<sm::block*>& start_blocks) {
|
|||||||
for (auto current_block : current_blocks) {
|
for (auto current_block : current_blocks) {
|
||||||
PICout cout = piCout;
|
PICout cout = piCout;
|
||||||
cout.setControl(0, true);
|
cout.setControl(0, true);
|
||||||
cout << current_block->is_calc_idx << "(";
|
cout << current_block->is_calc_idx << "{" << current_block->calc_time.count() / 1000.f << "ms,(";
|
||||||
for (auto & output_block : current_block->output_blocks) {
|
for (auto & output_block : current_block->output_blocks) {
|
||||||
if (block_nums.contains(output_block)) continue;
|
if (block_nums.contains(output_block)) continue;
|
||||||
block_nums[output_block] = num++;
|
block_nums[output_block] = num++;
|
||||||
cout << output_block->is_calc_idx << ",";
|
cout << output_block->is_calc_idx << ",";
|
||||||
next_current_blocks.push_back(output_block);
|
next_current_blocks.push_back(output_block);
|
||||||
}
|
}
|
||||||
cout << ") ";
|
cout << ")} ";
|
||||||
}
|
}
|
||||||
piCout << "";
|
piCout << "";
|
||||||
current_blocks = next_current_blocks;
|
current_blocks = next_current_blocks;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Overwrites every entry of is_calc_statuses (declared outside this function)
// with false. The start_blocks argument is accepted but not consulted.
void scheme_clear_calc_statuses(PIVector<sm::block*>& start_blocks) {
    const auto mark_not_calculated = [](bool /*was_calculated*/) { return false; };
    is_calc_statuses.forEachInplace(mark_not_calculated);
}
|
||||||
|
|
||||||
void unlock(sm::block* block, int locks_count = -1) {
|
void unlock(sm::block* block, int locks_count = -1) {
|
||||||
if (locks_count == -1) locks_count = block->input_blocks.size();
|
if (locks_count == -1) locks_count = block->input_blocks.size();
|
||||||
for (int i = 0; i < locks_count; ++i) {
|
for (int i = 0; i < locks_count; ++i) {
|
||||||
@@ -186,20 +202,10 @@ void post_available_blocks(sm::block* calc_block, PIVector<sm::block*>& new_avai
|
|||||||
is_calc_barrier.clear(std::memory_order_release);
|
is_calc_barrier.clear(std::memory_order_release);
|
||||||
}
|
}
|
||||||
|
|
||||||
int main() {
|
sm::time_report algorithm_runnable(std::atomic_flag& block_pool_flag,
|
||||||
srand(time(nullptr));
|
PIVector<sm::block*>& block_pool,
|
||||||
|
std::atomic_int& waiting_threads_flags,
|
||||||
PIVector<sm::block*> start_blocks = generate_scheme(100);
|
unsigned i, unsigned thread_count) {
|
||||||
print_scheme(start_blocks);
|
|
||||||
|
|
||||||
std::atomic_int waiting_threads_flags;
|
|
||||||
std::atomic_flag block_pool_flag = ATOMIC_FLAG_INIT;
|
|
||||||
PIVector<sm::block*> block_pool = start_blocks;
|
|
||||||
|
|
||||||
const unsigned THREAD_COUNT = 6;
|
|
||||||
std::vector<std::future<double>> duration_futures;
|
|
||||||
for (unsigned i = 0; i < THREAD_COUNT; ++i) {
|
|
||||||
auto duration = std::async(std::launch::deferred, [&, i](){
|
|
||||||
bool is_block_pool_empty;
|
bool is_block_pool_empty;
|
||||||
bool is_block_pool_empty_old;
|
bool is_block_pool_empty_old;
|
||||||
PIVector<sm::block*> new_available_blocks;
|
PIVector<sm::block*> new_available_blocks;
|
||||||
@@ -207,15 +213,21 @@ int main() {
|
|||||||
double calc_time = 0;
|
double calc_time = 0;
|
||||||
|
|
||||||
auto all_start = std::chrono::high_resolution_clock::now();
|
auto all_start = std::chrono::high_resolution_clock::now();
|
||||||
|
auto all_end = std::chrono::high_resolution_clock::now();
|
||||||
do {
|
do {
|
||||||
auto block = try_lock_next_and_post(block_pool_flag, block_pool, is_block_pool_empty,
|
auto block = try_lock_next_and_post(block_pool_flag, block_pool, is_block_pool_empty,new_available_blocks);
|
||||||
new_available_blocks);
|
|
||||||
new_available_blocks.clear();
|
new_available_blocks.clear();
|
||||||
|
|
||||||
if (is_block_pool_empty && !is_block_pool_empty_old) {
|
if (is_block_pool_empty) {
|
||||||
int waiting_threads_val = waiting_threads_flags.fetch_or(1u << i) | 1u << i;
|
int waiting_threads_val;
|
||||||
|
if (is_block_pool_empty_old) {
|
||||||
|
waiting_threads_val = waiting_threads_flags.load(std::memory_order_acquire);
|
||||||
|
} else {
|
||||||
|
waiting_threads_val = waiting_threads_flags.fetch_or(1u << i, std::memory_order_acq_rel) | 1u << i;
|
||||||
|
all_end = std::chrono::high_resolution_clock::now();
|
||||||
|
}
|
||||||
// std::cout << i << " wtv=" << waiting_threads_val << std::endl;
|
// std::cout << i << " wtv=" << waiting_threads_val << std::endl;
|
||||||
if (waiting_threads_val == (1u << THREAD_COUNT) - 1) break;
|
if (waiting_threads_val == (1u << thread_count) - 1) break;
|
||||||
is_block_pool_empty_old = is_block_pool_empty;
|
is_block_pool_empty_old = is_block_pool_empty;
|
||||||
} else if (!is_block_pool_empty && is_block_pool_empty_old) {
|
} else if (!is_block_pool_empty && is_block_pool_empty_old) {
|
||||||
waiting_threads_flags.fetch_and(~(1u << i));
|
waiting_threads_flags.fetch_and(~(1u << i));
|
||||||
@@ -235,20 +247,58 @@ int main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
} while (true);
|
} while (true);
|
||||||
std::cout << "terminating thread " << i << std::endl;
|
piCout << "terminating thread" << i;
|
||||||
auto all_end = std::chrono::high_resolution_clock::now();
|
|
||||||
|
|
||||||
double all_time = std::chrono::duration_cast<std::chrono::microseconds>(all_end - all_start).count() / 1000.f;
|
double all_time = std::chrono::duration_cast<std::chrono::microseconds>(all_end - all_start).count() / 1000.;
|
||||||
return all_time - calc_time;
|
return { .calc_time_ms = calc_time, .sync_time_ms = all_time - calc_time };
|
||||||
|
}
|
||||||
|
|
||||||
|
// Runs algorithm_runnable on thread_count asynchronous tasks over a private
// copy of start_blocks and returns one future per task.
//
// The shared state (pool lock flag, block pool, waiting-threads bitmask) lives
// on this function's stack and is handed to the tasks by reference, so every
// task must have finished before the function returns; the returned futures
// are therefore already ready.
std::vector<std::future<sm::time_report>> check_performance(const PIVector<sm::block*>& start_blocks, const unsigned thread_count) {
    std::atomic_int waiting_threads_flags{0};
    std::atomic_flag block_pool_flag = ATOMIC_FLAG_INIT;
    PIVector<sm::block*> block_pool = start_blocks;

    std::vector<std::future<sm::time_report>> pending_reports;
    for (unsigned worker = 0; worker < thread_count; ++worker) {
        pending_reports.push_back(std::async(
            std::launch::async,
            [&block_pool_flag, &block_pool, &waiting_threads_flags, worker, thread_count]() {
                return algorithm_runnable(block_pool_flag, block_pool, waiting_threads_flags, worker, thread_count);
            }));
    }

    // Keep the referenced locals alive until every worker is done with them.
    for (auto& pending : pending_reports) {
        pending.wait();
    }
    return pending_reports;
}
|
||||||
|
|
||||||
std::cout << "Sync durations: ";
|
// Prints one "(sync=..ms,calc=..ms)" entry per future on a single stdout line,
// followed by the sums when more than one future is given.
// Waits for every future first, then consumes each one with get(), so the
// futures in duration_futures are no longer valid afterwards. Times are
// truncated to whole milliseconds for display.
void print_performance(std::vector<std::future<sm::time_report>>& duration_futures) {
    for (auto& pending : duration_futures) {
        pending.wait();
    }

    const auto worker_count = duration_futures.size();
    std::cout << "durations for " << worker_count << " threads: ";

    double total_sync_ms = 0.;
    double total_calc_ms = 0.;
    for (auto& pending : duration_futures) {
        const sm::time_report report = pending.get();
        total_sync_ms += report.sync_time_ms;
        total_calc_ms += report.calc_time_ms;
        std::cout << "(sync=" << static_cast<int>(report.sync_time_ms)
                  << "ms,calc=" << static_cast<int>(report.calc_time_ms) << "ms) ";
    }

    if (worker_count > 1) {
        std::cout << "sum_sync=" << static_cast<int>(total_sync_ms)
                  << "ms,sum_calc=" << static_cast<int>(total_calc_ms) << "ms";
    }
    std::cout << std::endl;
}
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
srand(time(nullptr));
|
||||||
|
|
||||||
|
PIVector<sm::block*> start_blocks = scheme_generate(200, 3, 4);
|
||||||
|
scheme_print(start_blocks);
|
||||||
|
|
||||||
|
auto duration_futures = check_performance(start_blocks, 1);
|
||||||
|
print_performance(duration_futures);
|
||||||
|
|
||||||
|
scheme_clear_calc_statuses(start_blocks);
|
||||||
|
|
||||||
|
duration_futures = check_performance(start_blocks, 6);
|
||||||
|
print_performance(duration_futures);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user