Refactor sm::block

This commit is contained in:
3 changed files with 68 additions and 59 deletions

View File

@@ -21,7 +21,7 @@ else()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3 -fPIC")
endif()
include_directories(experiments ${PIP_INCLUDES})
include_directories(experiments ${PIP_INCLUDES} ${PROJECT_SOURCE_DIR})
if(WIN32)
include_directories(can)

View File

@@ -1,11 +1,9 @@
#include <pivector.h>
#include "sm/block.h"
#include <vector>
#include <pimap.h>
#include <picout.h>
#include <future>
#include <thread>
#include <iostream>
#include <math.h>
#define assert(_Expression) \
(void) \
@@ -20,33 +18,6 @@ void _assert (const char *_Message, const char *_File, unsigned _Line) {
std::atomic_flag is_calc_barrier = ATOMIC_FLAG_INIT;
PIVector<bool> is_calc_statuses;
namespace sm {
struct block {
PIVector<block*> input_blocks;
PIVector<block*> output_blocks;
std::atomic_flag barrier = ATOMIC_FLAG_INIT;
const int is_calc_idx;
const std::chrono::microseconds calc_time;
static std::chrono::microseconds random_time() {
float val = powf(rand() % 1000 / 1000.f, 20.f) * 30.f * 1000.f;
// std::cout << int(val) << std::endl;
return std::chrono::microseconds(int(val));
}
explicit block(const int is_calc_idx) : is_calc_idx(is_calc_idx), calc_time(random_time()) {}
void calc() {
std::this_thread::sleep_for(calc_time);
}
};
struct time_report {
double calc_time_ms;
double sync_time_ms;
};
}
void link(sm::block* out, sm::block* in) {
out->output_blocks.push_back(in);
in->input_blocks.push_back(out);
@@ -126,31 +97,6 @@ void scheme_clear_calc_statuses(PIVector<sm::block*>& start_blocks) {
is_calc_statuses.forEachInplace([](bool is_calced){ return false; });
}
void unlock(sm::block* block, int locks_count = -1) {
if (locks_count == -1) locks_count = block->input_blocks.size();
for (int i = 0; i < locks_count; ++i) {
block->input_blocks[i]->barrier.clear(std::memory_order_release);
}
block->barrier.clear(std::memory_order_release);
}
bool try_lock(sm::block* block) {
if (block->barrier.test_and_set(std::memory_order_acquire)) return false;
int locks_count = 0;
for (auto & input_block : block->input_blocks) {
if (input_block->barrier.test_and_set(std::memory_order_acquire)) break;
locks_count++;
}
if (locks_count == block->input_blocks.size()) {
return true;
} else {
unlock(block, locks_count);
return false;
}
}
sm::block* try_lock_next_and_post(std::atomic_flag& block_pool_flag, PIVector<sm::block*>& block_pool, bool& is_block_pool_empty, PIVector<sm::block*>& new_available_blocks) {
while(block_pool_flag.test_and_set(std::memory_order_acquire)) {
std::this_thread::yield();
@@ -164,7 +110,7 @@ sm::block* try_lock_next_and_post(std::atomic_flag& block_pool_flag, PIVector<sm
} else {
is_block_pool_empty = false;
for (int i = 0; i < block_pool.size(); ++i) {
if (try_lock(block_pool[i])) {
if (block_pool[i]->try_lock()) {
block = block_pool[i];
block_pool.remove(i);
// std::cout << block->is_calc_idx << ": locked for calc" << std::endl;
@@ -243,7 +189,7 @@ sm::time_report algorithm_runnable(std::atomic_flag& block_pool_flag,
block->calc();
auto end = std::chrono::high_resolution_clock::now();
calc_time += std::chrono::duration_cast<std::chrono::microseconds>(end - start).count() / 1000.;
unlock(block);
block->unlock();
post_available_blocks(block, new_available_blocks);
}

63
sm/block.h Normal file
View File

@@ -0,0 +1,63 @@
#ifndef MULTITHREAD_EXPERIMENTS_BLOCK_H
#define MULTITHREAD_EXPERIMENTS_BLOCK_H
#include <pivector.h>
#include <iostream>
#include <thread>
#include <cmath>
namespace sm {
struct block {
PIVector<block*> input_blocks;
PIVector<block*> output_blocks;
std::atomic_flag barrier = ATOMIC_FLAG_INIT;
const int is_calc_idx;
const std::chrono::microseconds calc_time;
static std::chrono::microseconds random_time() {
float val = powf(rand() % 1000 / 1000.f, 20.f) * 30.f * 1000.f;
// std::cout << int(val) << std::endl;
return std::chrono::microseconds(int(val));
}
explicit block(const int is_calc_idx) : is_calc_idx(is_calc_idx), calc_time(random_time()) {}
void calc() {
std::this_thread::sleep_for(calc_time);
}
void unlock(int locks_count = -1) {
if (locks_count == -1) locks_count = this->input_blocks.size();
for (int i = 0; i < locks_count; ++i) {
this->input_blocks[i]->barrier.clear(std::memory_order_release);
}
this->barrier.clear(std::memory_order_release);
}
bool try_lock() {
if (this->barrier.test_and_set(std::memory_order_acquire)) return false;
int locks_count = 0;
for (auto & input_block : this->input_blocks) {
if (input_block->barrier.test_and_set(std::memory_order_acquire)) break;
locks_count++;
}
if (locks_count == this->input_blocks.size()) {
return true;
} else {
unlock(locks_count);
return false;
}
}
};
struct time_report {
double calc_time_ms;
double sync_time_ms;
};
}
#endif //MULTITHREAD_EXPERIMENTS_BLOCK_H