#include "curl_thread_pool_p.h"

#include "pihttpclient.h"
#include "pitime.h"

// libcurl declares curl_global_init(), curl_global_cleanup() and
// CURL_GLOBAL_DEFAULT in this header.
#include <curl/curl.h>


// Initializes libcurl globally and creates the worker threads.
//
// Ten PIThread objects are created, each bound to threadFunc(), and stored in
// `threads`. NOTE(review): the second constructor argument (`true`) presumably
// starts the thread immediately, and PIThread presumably invokes the callback
// repeatedly until stopped (threadFunc() handles a single client per call) --
// confirm against the PIThread documentation.
CurlThreadPool::CurlThreadPool() {
	curl_global_init(CURL_GLOBAL_DEFAULT);
	piForTimes(10) {
		auto * t = new PIThread([this]() { threadFunc(); }, true);
		threads << t;
	}
}


// Shuts the pool down via destroy() and then releases libcurl's global state.
CurlThreadPool::~CurlThreadPool() {
	destroy();
	curl_global_cleanup();
}


// Worker body: waits for one queued client and processes it.
//
// Blocks on `sem` until registerClient() or destroy() releases it. Returns
// without doing any work if the pool is exiting (checked both before and after
// the wait) or if the queue turns out to be empty.
void CurlThreadPool::threadFunc() {
	if (exiting) return;
	sem.acquire();
	// destroy() releases the semaphore to wake blocked workers, so the flag has
	// to be re-checked after the wait.
	if (exiting) return;
	PIHTTPClient * c = nullptr;
	{
		// The queue accessor is confined to this scope so it is gone before the
		// request is performed. NOTE(review): getRef() presumably holds the
		// container's lock for the accessor's lifetime -- confirm.
		auto cr = clients.getRef();
		if (cr->isEmpty()) return;
		c = cr->dequeue();
	}
	procClient(c);
}


// Runs a single client and destroys it.
//
// perform() is called only when init() returns true; the client is deleted in
// either case.
void CurlThreadPool::procClient(PIHTTPClient * c) {
	if (c->init()) c->perform();
	delete c;
}


// Queues a client for processing and signals one worker.
//
// The pool deletes the client itself: in procClient() once it has been handled,
// or in destroy() if it is still queued at shutdown.
void CurlThreadPool::registerClient(PIHTTPClient * c) {
	clients.getRef()->enqueue(c);
	sem.release();
}


// Returns the process-wide pool, constructed on first use as a function-local
// static.
CurlThreadPool * CurlThreadPool::instance() {
	static CurlThreadPool ret;
	return &ret;
}


// Stops all workers and frees every thread and every still-queued client.
//
// Safe to call more than once: after the first call both `threads` and the
// client queue are empty, so the destructor's own destroy() call has nothing
// left to abort or delete.
//
// NOTE(review): only clients that are still queued are aborted here. A client
// already dequeued by a worker is not, so waitForFinish() may block until that
// transfer completes.
void CurlThreadPool::destroy() {
	exiting = true;
	for (auto * t: threads)
		t->stop();
	{
		auto cr = clients.getRef();
		for (auto c: *cr)
			c->abort();
	}
	// Wake every worker that may be blocked in sem.acquire() so it can observe
	// `exiting` and return.
	sem.release(threads.size());
	for (auto * t: threads) {
		t->waitForFinish();
		t->setDebug(false);
		t->terminate();
	}
	piDeleteAllAndClear(threads);
	{
		// Drain the queue while deleting. Leaving the freed pointers in the queue
		// would make a second destroy() call abort() and delete them again.
		auto cr = clients.getRef();
		while (!cr->isEmpty())
			delete cr->dequeue();
	}
}