Compare commits

...

5 Commits

23 changed files with 427 additions and 143 deletions

View File

@@ -1,9 +1,13 @@
cmake_minimum_required(VERSION 3.0) cmake_minimum_required(VERSION 3.0)
cmake_policy(SET CMP0020 NEW) cmake_policy(SET CMP0020 NEW)
set (CMAKE_CXX_STANDARD 17)
project(multithread_experiments) project(multithread_experiments)
find_package(SM REQUIRED) if (DEFINED PATH_TO_SMSDK OR DEFINED ENV{SMSDK_DIR})
find_package(SM REQUIRED)
endif()
find_package(Concurrent QUIET)
if (CMAKE_BUILD_TYPE MATCHES Debug) if (CMAKE_BUILD_TYPE MATCHES Debug)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3 -fPIC -std=c++11") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g3 -fPIC -std=c++11")
@@ -13,40 +17,5 @@ else()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3 -fPIC") set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -O3 -fPIC")
endif() endif()
include_directories(experiments ${SMBRICKS_INCLUDES} ${PIP_INCLUDES} ${PROJECT_SOURCE_DIR}) add_subdirectory(can)
add_subdirectory(experiments)
if(WIN32)
include_directories(can)
add_subdirectory(can)
add_custom_target(copy_dependencies
COMMAND ${CMAKE_COMMAND} -E copy ${PCAN_LIB} ${CMAKE_CURRENT_BINARY_DIR}/PCANBasic${CMAKE_SHARED_LIBRARY_SUFFIX}
COMMAND ${CMAKE_COMMAND} -E copy ${VSCAN_LIB} ${CMAKE_CURRENT_BINARY_DIR}/vs_can_api${CMAKE_SHARED_LIBRARY_SUFFIX})
add_executable(can_send_multithread experiments/can_send_multithread.cpp)
target_link_libraries(can_send_multithread can)
add_dependencies(can_send_multithread copy_dependencies)
add_executable(can_send experiments/can_send.cpp)
target_link_libraries(can_send can)
add_dependencies(can_send copy_dependencies)
endif()
add_executable(mutex experiments/mutex.cpp)
target_link_libraries(mutex ${PIP_LIBRARY} ${PIP_CONCURRENT_LIBRARY})
add_executable(mutex_multithread experiments/mutex_multithread.cpp)
target_link_libraries(mutex_multithread ${PIP_LIBRARY} ${PIP_CONCURRENT_LIBRARY})
add_executable(vectors experiments/vectors.cpp)
target_link_libraries(vectors ${PIP_LIBRARY})
add_executable(block_choice experiments/block_choice.cpp)
target_link_libraries(block_choice ${PIP_LIBRARY})
add_executable(smbusdata_crash_test experiments/smbusdata_crash_test.cpp)
target_link_libraries(smbusdata_crash_test SMBricks_shared ${PIP_LIBRARY} ${PIP_CONCURRENT_LIBRARY} ${PIP_CRYPT_LIBRARY})
add_executable(packaged_task experiments/packaged_task.cpp)
target_link_libraries(packaged_task SMBricks_shared ${PIP_LIBRARY} ${PIP_CONCURRENT_LIBRARY} ${PIP_CRYPT_LIBRARY})

2
can

Submodule can updated: 6807666d0b...38d6d57271

View File

@@ -0,0 +1,10 @@
add_subdirectory(can)
add_subdirectory(pip)
if (Concurrent_FOUND)
add_subdirectory(concurrent)
endif()
if (DEFINED PATH_TO_SMSDK OR DEFINED ENV{SMSDK_DIR})
add_subdirectory(sm)
endif()

View File

@@ -0,0 +1,23 @@
cmake_minimum_required(VERSION 3.0)
cmake_policy(SET CMP0020 NEW)
find_package(PIP REQUIRED)
add_executable(asyncdevicepool asyncdevicepool.cpp)
target_link_libraries(asyncdevicepool PIP can)
if(WIN32)
add_custom_target(copy_dependencies
COMMAND ${CMAKE_COMMAND} -E copy ${PCAN_LIB} ${CMAKE_CURRENT_BINARY_DIR}/PCANBasic${CMAKE_SHARED_LIBRARY_SUFFIX}
COMMAND ${CMAKE_COMMAND} -E copy ${VSCAN_LIB} ${CMAKE_CURRENT_BINARY_DIR}/vs_can_api${CMAKE_SHARED_LIBRARY_SUFFIX})
add_dependencies(asyncdevicepool copy_dependencies)
add_executable(can_send_multithread can_send_multithread.cpp)
target_link_libraries(can_send_multithread can)
add_dependencies(can_send_multithread copy_dependencies)
add_executable(can_send can_send.cpp)
target_link_libraries(can_send can)
add_dependencies(can_send copy_dependencies)
endif()

View File

@@ -0,0 +1,80 @@
#include <asyncdevicepool.h>
using namespace std::chrono;
class FakeDevice : public CANDevice {
PIOBJECT_SUBCLASS(FakeDevice, CANDevice)
public:
double duration_ms;
explicit FakeDevice(int counter) : duration_ms(0.0), counter(counter) {}
bool send(const CAN_Raw &m) override {
counter--;
if (counter <= 0) {
if (counter == 0) duration_ms = (duration_cast<milliseconds>(high_resolution_clock::now().time_since_epoch()) - start_time).count();
counter = -1;
throw can::error(0, can_category);
}
return true;
}
bool open() override {
start_time = duration_cast<milliseconds>(high_resolution_clock::now().time_since_epoch());
return true;
}
void close() override {
if (read_handle != can::event::invalid_handle) can::event::close(read_handle);
}
can::event::handle getReadEventHandle() override {
if (read_handle == can::event::invalid_handle) {
read_handle = can::event::make();
}
return read_handle;
}
protected:
bool nextMsg(CAN_Raw &rawMsg) override {
return false;
}
private:
int counter = 0;
milliseconds start_time;
};
int main() {
int device_count = 63;
PIBlockingQueue<double> wait_queue(device_count);
AsyncDevicePool asyncDevicePool;
asyncDevicePool.set_can_error_handler([&wait_queue](const can::error& error, CANDevice* device) {
// can::stdout_error_handler(error, device);
wait_queue.put(dynamic_cast<FakeDevice*>(device)->duration_ms);
return false;
});
int it_count = 1'000'000;
PIVector<CANDevice*> devices;
for (int i = 0; i < device_count; ++i) {
devices << new FakeDevice(it_count / device_count);
asyncDevicePool.add(devices.back());
}
for (int i = 0; i < it_count; ++i) {
CAN_Raw msg = { .Id = 0x11, .Size = 4, .Data = { 0x0, 0x1, 0x2, 0x3 } };
asyncDevicePool.send(devices[i % device_count], msg);
if (i % 1000 == 999) {
std::this_thread::yield();
// std::cout << i / 1000 << "/" << counter / 1000 << std::endl;
// std::this_thread::sleep_for(milliseconds(1));
}
}
double duration_ms;
for (int i = 0; i < device_count; ++i) duration_ms = wait_queue.take();
std::cout << duration_ms << " ms" << std::endl;
return 0;
}

View File

@@ -0,0 +1,41 @@
//
// Created by zuma on 12.10.2020.
//
#include <asyncdevice.h>
#include <picandevice.h>
#include <future>
int main() {
AsyncDevice device;
auto canDevice = new PICANDevice("can1");
int counter = 1;
std::promise<bool> is_success_promise;
auto is_success = is_success_promise.get_future();
auto closure = [&is_success_promise, &counter, canDevice](CAN_Raw msg){
if (msg.Id != 0x20) return;
if (msg.Size != sizeof(int)) {
is_success_promise.set_value(false);
throw can::error(canDevice, can::error::closed, "");
}
int recv_counter = reinterpret_cast<int&>(msg.Data);
if (counter != recv_counter) {
piCout << recv_counter << "<- miss ordering";
// is_success_promise.set_value(false);
// throw can::error(canDevice, can::error::closed, "");
} else {
piCout << recv_counter;
}
counter = recv_counter + 1;
if (counter - 1 == 1000) {
is_success_promise.set_value(true);
throw can::error(canDevice, can::error::closed, "");
}
};
CONNECTL(&device, readEvent, closure);
device.replace(canDevice);
bool result = is_success.get();
piCout << (result ? "success" : "failure");
}

View File

@@ -1,16 +1,16 @@
#include "can_send.h" #include "can_send.h"
#include <future> #include <future>
#include <picout.h> #include <iostream>
int main() { int main() {
auto time1 = std::async(std::launch::deferred, [] { return test_send(PCAN_USBBUS1); }); auto time1 = std::async(std::launch::deferred, [] { return test_send(PCAN_USBBUS1); });
auto time2 = std::async(std::launch::deferred, [] { return test_send(PCAN_USBBUS2); }); auto time2 = std::async(std::launch::deferred, [] { return test_send(PCAN_USBBUS2); });
time1.wait(); time1.wait();
piCout << "measurements for PCAN_USBBUS1:" << time1.get() / 1000.f << "ms"; std::cout << "measurements for PCAN_USBBUS1: " << time1.get() / 1000.f << " ms" << std::endl;
time2.wait(); time2.wait();
piCout << "measurements for PCAN_USBBUS2:" << time2.get() / 1000.f << "ms"; std::cout << "measurements for PCAN_USBBUS2: " << time2.get() / 1000.f << " ms" << std::endl;
return 0; return 0;
} }

View File

@@ -0,0 +1,22 @@
//
// Created by zuma on 12.10.2020.
//
#include <asyncdevice.h>
#include <picandevice.h>
int main() {
AsyncDevice device;
device.replace(new PICANDevice("can0"));
CAN_Raw msg = { .Id = 0x20, .Size = sizeof(int) };
int& counter = reinterpret_cast<int&>(msg.Data);
counter = 0;
while (counter++ < 1000) {
device.send(msg);
piMSleep(1);
}
piSleep(5);
}

View File

@@ -1,13 +1,13 @@
#include "can_send.h" #include "can_send.h"
#include <future> #include <future>
#include <picout.h> #include <iostream>
int main() { int main() {
auto time1 = std::async(std::launch::async, [] { return test_send(PCAN_USBBUS1); }); auto time1 = std::async(std::launch::async, [] { return test_send(PCAN_USBBUS1); });
auto time2 = std::async(std::launch::async, [] { return test_send(PCAN_USBBUS2); }); auto time2 = std::async(std::launch::async, [] { return test_send(PCAN_USBBUS2); });
piCout << "measurements for PCAN_USBBUS1:" << time1.get() / 1000.f << "ms"; std::cout << "measurements for PCAN_USBBUS1: " << time1.get() / 1000.f << " ms" << std::endl;
piCout << "measurements for PCAN_USBBUS2:" << time2.get() / 1000.f << "ms"; std::cout << "measurements for PCAN_USBBUS2: " << time2.get() / 1000.f << " ms" << std::endl;
return 0; return 0;
} }

View File

@@ -0,0 +1,6 @@
find_package(Concurrent REQUIRED)
add_executable(packaged_task packaged_task.cpp)
add_executable(queues queues.cpp)
target_link_libraries(queues concurrent)

View File

@@ -0,0 +1,5 @@
#include <blockingdequeue.h>
int main() {
// TODO
}

View File

@@ -0,0 +1,13 @@
cmake_minimum_required(VERSION 3.0)
cmake_policy(SET CMP0020 NEW)
find_package(PIP REQUIRED)
add_executable(mutex mutex.cpp)
target_link_libraries(mutex PIP)
add_executable(mutex_multithread mutex_multithread.cpp)
target_link_libraries(mutex_multithread PIP)
add_executable(vectors vectors.cpp)
target_link_libraries(vectors PIP)

View File

@@ -1,5 +1,4 @@
#include <pimutex.h> #include <pimutex.h>
#include <piconditionlock.h>
#include <atomic> #include <atomic>
#include <future> #include <future>
#include <picout.h> #include <picout.h>
@@ -30,14 +29,6 @@ int main() {
}); });
piCout << "piMutex:" << piMutexPerformance.get() << "ms"; piCout << "piMutex:" << piMutexPerformance.get() << "ms";
PIConditionLock piConditionLock;
auto piConditionLockPerformance = check_performance([&piConditionLock](){
piConditionLock.lock();
int i = 0; while (i < 1000) i++;
piConditionLock.unlock();
});
piCout << "piConditionLock:" << piConditionLockPerformance.get() << "ms";
std::mutex stdMutex; std::mutex stdMutex;
auto stdMutexPerformance = check_performance([&stdMutex](){ auto stdMutexPerformance = check_performance([&stdMutex](){
stdMutex.lock(); stdMutex.lock();

View File

@@ -1,5 +1,4 @@
#include <pimutex.h> #include <pimutex.h>
#include <piconditionlock.h>
#include <atomic> #include <atomic>
#include <future> #include <future>
#include <picout.h> #include <picout.h>
@@ -37,16 +36,6 @@ int main() {
}); });
piCout << "piMutex:" << piMutexPerformance << "ms"; piCout << "piMutex:" << piMutexPerformance << "ms";
PIConditionLock piConditionLock;
auto piConditionLockPerformance = check_performance([&piConditionLock](long& k){
piConditionLock.lock();
int i = 0; while (i < 1000) { i++; }
long res = ++k;
piConditionLock.unlock();
return res;
});
piCout << "piConditionLock:" << piConditionLockPerformance << "ms";
std::mutex stdMutex; std::mutex stdMutex;
auto stdMutexPerformance = check_performance([&stdMutex](long& k){ auto stdMutexPerformance = check_performance([&stdMutex](long& k){
stdMutex.lock(); stdMutex.lock();

View File

@@ -0,0 +1,13 @@
cmake_minimum_required(VERSION 3.0)
cmake_policy(SET CMP0020 NEW)
find_package(SM REQUIRED)
add_executable(block_choice block_choice.cpp)
target_link_libraries(block_choice PIP)
add_executable(smbusdata_crash_test smbusdata_crash_test.cpp)
target_link_libraries(smbusdata_crash_test SMBricks_shared PIP::Crypt)
add_executable(pibytearray_template_spec template_specialization.cpp)
target_link_libraries(pibytearray_template_spec SMBricks_shared PIP::Crypt)

View File

@@ -1,4 +1,4 @@
#include "sm/block.h" #include "block.h"
#include <vector> #include <vector>
#include <pimap.h> #include <pimap.h>
#include <picout.h> #include <picout.h>

View File

@@ -0,0 +1,164 @@
#include <sm_base.h>
#include <pithreadpoolexecutor.h>
#include <future>
struct SomeLargeData {
uint8_t data[4]{};
SomeLargeData() { memset(data, 0xff, sizeof(data)); }
~SomeLargeData() {
memset(data, 0x00, sizeof(data));
}
};
inline PIByteArray & operator <<(PIByteArray & s, const SomeLargeData & v) {
s << PIByteArray::RawData(v.data, sizeof(v.data));
return s;
}
inline PIByteArray & operator >>(PIByteArray & s, SomeLargeData & v) {
if (s.size() < sizeof(v.data)) {
piCout << "Error in operator >> for SomeLargeData";
exit(1);
}
s >> PIByteArray::RawData(v.data, sizeof(v.data));
return s;
}
REGISTER_BUS_TYPE(SomeLargeData)
void test_blocking_queue() {
std::atomic_bool is_end(false);
PIBlockingQueue<SMBlockData> out_dequeue;
PIBlockingQueue<SMBlockData> in_dequeue;
auto runnable = [&](){
while (true) {
SMBlockData data;
bool is_ok;
data = in_dequeue.poll(100, SMBlockData(), &is_ok);
if (!is_ok) {
if (is_end) break;
continue;
}
out_dequeue.put(data);
// if (!is_ok) {
// if (is_end) break;
// std::this_thread::yield();
// }
}
};
PIVector<std::future<void>> futures;
for (int i = 0; i < 4; ++i) {
futures.append(std::async(std::launch::async, runnable));
}
SomeLargeData content;
int iteration_count = 100 * 1000;
for (int i = 0; i < iteration_count / 20; ++i) {
for (int j = 0; j < 20; ++j) {
SMBlockData block_data(j+1);
for (int k = 0; k < j+1; ++k) {
block_data[k].value<SomeLargeData>() = content;
}
// bus_data << SMBusData::create(content);
SMBlockData out_data = block_data.clone();
in_dequeue.put(out_data);
}
for (int j = 0; j < 20; ++j) {
auto block_data = out_dequeue.take();
if (block_data[0].isInvalid()) {
piCout << "Error: bus_data is invalid";
exit(1);
}
}
// printf("It's alive! %d\n", i);
}
is_end = true;
for (auto& future: futures) future.get();
}
void test_mutexes() {
std::atomic_bool is_end(false);
PIDeque<SMBlockData> out_dequeue;
PIDeque<SMBlockData> in_dequeue;
PIMutex out_mutex;
PIMutex in_mutex;
auto runnable = [&](){
while (true) {
SMBlockData data;
in_mutex.lock();
bool is_ok = !in_dequeue.isEmpty();
if (is_ok) data = in_dequeue.take_front();
in_mutex.unlock();
if (!is_ok) {
if (is_end) break;
std::this_thread::yield();
continue;
}
out_mutex.lock();
out_dequeue.push_back(data);
out_mutex.unlock();
// if (!is_ok) {
// if (is_end) break;
// std::this_thread::yield();
// }
}
};
PIVector<std::future<void>> futures;
for (int i = 0; i < 4; ++i) {
futures.append(std::async(std::launch::async, runnable));
}
SomeLargeData content;
int iteration_count = 100 * 1000;
for (int i = 0; i < iteration_count / 20; ++i) {
for (int j = 0; j < 20; ++j) {
SMBlockData block_data(j+1);
for (int k = 0; k < j+1; ++k) {
block_data[k].value<SomeLargeData>() = content;
}
// bus_data << SMBusData::create(content);
SMBlockData out_data = block_data.clone();
in_mutex.lock();
in_dequeue.push_back(out_data);
in_mutex.unlock();
}
for (int j = 0; j < 20; ++j) {
out_mutex.lock();
if (out_dequeue.isEmpty()) {
out_mutex.unlock();
j--;
std::this_thread::yield();
continue;
}
auto block_data = out_dequeue.take_front();
out_mutex.unlock();
if (block_data[0].isInvalid()) {
piCout << "Error: bus_data is invalid";
exit(1);
}
}
// printf("It's alive! %d\n", i);
}
is_end = true;
for (auto& future: futures) future.get();
}
int main() {
test_blocking_queue();
return 0;
}

View File

@@ -0,0 +1,35 @@
//
// Created by Stepan on 16.10.2020.
//
#include "../../can/can_data.h"
#include <sm_base.h>
REGISTER_BUS_TYPE(CAN_Raw)
PICout operator <<(PICout s, const CAN_Raw & v) {
s.saveControl();
s.setControl(0);
s << "{id=0x" << PIString::fromNumber(v.Id, 16) << ", size=" << v.Size << ", data=" << PIByteArray(v.Data, v.Size);
s.restoreControl();
return s;
}
int main() {
CAN_Raw msg = { .Id = 0x22, .Size = 8, .Data = { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07 } };
piCout << "src: " << msg << "\n";
CAN_Raw newMsg;
PIByteArray ar;
ar << msg;
ar >> newMsg;
piCout << "after ByteArray: " << newMsg << "\n";
SMBusData busData;
busData = SMBusData::create(msg);
PIByteArray serializedBusData = busData.save();
SMBusData loadedBusData = SMBusData::fromByteArray(serializedBusData, busData.busType());
newMsg = loadedBusData.value<CAN_Raw>();
piCout << "after bus data:" << newMsg;
}

View File

@@ -1,77 +0,0 @@
#include <sm_base.h>
#include <executor.h>
struct SomeLargeData {
uint8_t data[4]{};
SomeLargeData() { memset(data, 0xff, sizeof(data)); }
~SomeLargeData() {
memset(data, 0x00, sizeof(data));
}
};
inline PIByteArray & operator <<(PIByteArray & s, const SomeLargeData & v) {
s << PIByteArray::RawData(v.data, sizeof(v.data));
return s;
}
inline PIByteArray & operator >>(PIByteArray & s, SomeLargeData & v) {
if (s.size() < sizeof(v.data)) {
piCout << "Error in operator >> for SomeLargeData";
exit(1);
}
s >> PIByteArray::RawData(v.data, sizeof(v.data));
return s;
}
REGISTER_BUS_TYPE(SomeLargeData)
int main() {
std::atomic_bool is_end(false);
PIThreadPoolExecutor executor(4);
PIBlockingDequeue<SMBlockData> out_dequeue;
PIBlockingDequeue<SMBlockData> in_dequeue;
executor.execute([&](){
while (true) {
bool is_ok;
SMBlockData data;
data = in_dequeue.poll(100, data, &is_ok);
if (!is_ok) {
if (is_end) break;
}
out_dequeue.offer(data, 100);
if (!is_ok) {
if (is_end) break;
}
}
});
SomeLargeData content;
int iteration_count = 100 * 1000;
for (int i = 0; i < iteration_count / 20; ++i) {
for (int j = 0; j < 20; ++j) {
SMBlockData block_data(j+1);
for (int k = 0; k < j+1; ++k) {
block_data[k].sharedData<SomeLargeData>() = content;
}
// bus_data << SMBusData::create(content);
in_dequeue.offer(block_data.clone());
}
for (int j = 0; j < 20; ++j) {
auto block_data = out_dequeue.take();
if (block_data[0].isInvalid()) {
piCout << "Error: bus_data is invalid";
exit(1);
}
}
// printf("It's alive! %d\n", i);
}
is_end = true;
executor.shutdownNow();
return 0;
}