add http_client library, using libcurl
take out common http entities to http_common dir
This commit is contained in:
75
libs/http_client/curl_thread_pool_p.cpp
Normal file
75
libs/http_client/curl_thread_pool_p.cpp
Normal file
@@ -0,0 +1,75 @@
|
||||
|
||||
#include "curl_thread_pool_p.h"
|
||||
|
||||
#include "pihttpclient.h"
|
||||
#include "pitime.h"
|
||||
|
||||
|
||||
CurlThreadPool::CurlThreadPool() {
|
||||
piForTimes(10) {
|
||||
auto * t = new PIThread([this]() { threadFunc(); }, true);
|
||||
threads << t;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
CurlThreadPool::~CurlThreadPool() {
|
||||
exiting = true;
|
||||
for (auto * t: threads)
|
||||
t->stop();
|
||||
{
|
||||
auto cr = clients.getRef();
|
||||
for (auto c: *cr)
|
||||
c->abort();
|
||||
}
|
||||
sem.release(threads.size());
|
||||
for (auto * t: threads) {
|
||||
t->waitForFinish();
|
||||
t->setDebug(false);
|
||||
t->terminate();
|
||||
}
|
||||
piDeleteAllAndClear(threads);
|
||||
{
|
||||
auto cr = clients.getRef();
|
||||
for (auto c: *cr)
|
||||
delete c;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void CurlThreadPool::threadFunc() {
|
||||
if (exiting) return;
|
||||
// piCout << "threadFuncl w ...";
|
||||
sem.acquire();
|
||||
// piCout << "threadFuncl wdone";
|
||||
if (exiting) return;
|
||||
PIHTTPClient * c = nullptr;
|
||||
{
|
||||
auto cr = clients.getRef();
|
||||
if (cr->isEmpty()) return;
|
||||
c = cr->dequeue();
|
||||
// piCout << "threadFuncl get c";
|
||||
}
|
||||
// piCout << "threadFuncl proc c";
|
||||
procClient(c);
|
||||
// piCout << "threadFuncl end";
|
||||
}
|
||||
|
||||
|
||||
void CurlThreadPool::procClient(PIHTTPClient * c) {
|
||||
if (c->init()) c->perform();
|
||||
delete c;
|
||||
}
|
||||
|
||||
|
||||
void CurlThreadPool::registerClient(PIHTTPClient * c) {
|
||||
clients.getRef()->enqueue(c);
|
||||
sem.release();
|
||||
// piCout << "registerClient";
|
||||
}
|
||||
|
||||
|
||||
CurlThreadPool * CurlThreadPool::instance() {
|
||||
static CurlThreadPool ret;
|
||||
return &ret;
|
||||
}
|
||||
32
libs/http_client/curl_thread_pool_p.h
Normal file
32
libs/http_client/curl_thread_pool_p.h
Normal file
@@ -0,0 +1,32 @@
|
||||
#ifndef curl_thread_pool_p_H
|
||||
#define curl_thread_pool_p_H
|
||||
|
||||
#include "piconditionvar.h"
|
||||
#include "piprotectedvariable.h"
|
||||
#include "pisemaphore.h"
|
||||
#include "pithread.h"
|
||||
|
||||
class PIHTTPClient;
|
||||
|
||||
class CurlThreadPool {
|
||||
public:
|
||||
void registerClient(PIHTTPClient * c);
|
||||
|
||||
static CurlThreadPool * instance();
|
||||
|
||||
private:
|
||||
NO_COPY_CLASS(CurlThreadPool)
|
||||
CurlThreadPool();
|
||||
~CurlThreadPool();
|
||||
|
||||
void threadFunc();
|
||||
void procClient(PIHTTPClient * c);
|
||||
|
||||
PIProtectedVariable<PIQueue<PIHTTPClient *>> clients;
|
||||
PISemaphore sem;
|
||||
PIVector<PIThread *> threads;
|
||||
std::atomic_bool exiting = {false};
|
||||
};
|
||||
|
||||
|
||||
#endif
|
||||
210
libs/http_client/pihttpclient.cpp
Normal file
210
libs/http_client/pihttpclient.cpp
Normal file
@@ -0,0 +1,210 @@
|
||||
#include "pihttpclient.h"
|
||||
|
||||
#include "curl_thread_pool_p.h"
|
||||
#include "piliterals_bytes.h"
|
||||
#include "piliterals_string.h"
|
||||
#include "pisystemtime.h"
|
||||
|
||||
#include <curl/curl.h>
|
||||
|
||||
|
||||
int xfer_callback(void * ptr, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow) {
|
||||
return reinterpret_cast<PIHTTPClientBase *>(ptr)->__infoFunc(dltotal, dlnow, ultotal, ulnow);
|
||||
}
|
||||
|
||||
int debug_callback(CURL * handle, curl_infotype type, char * data, size_t size, void * ptr) {
|
||||
return reinterpret_cast<PIHTTPClientBase *>(ptr)->__debugFunc(type, data, size);
|
||||
}
|
||||
|
||||
|
||||
PRIVATE_DEFINITION_START(PIHTTPClient)
|
||||
CURL * handle = nullptr;
|
||||
PRIVATE_DEFINITION_END(PIHTTPClient)
|
||||
|
||||
|
||||
PIHTTPClient::PIHTTPClient() {}
|
||||
|
||||
|
||||
PIHTTPClient::~PIHTTPClient() {}
|
||||
|
||||
|
||||
bool PIHTTPClient::init() {
|
||||
if (is_cancel) return false;
|
||||
PRIVATE->handle = curl_easy_init();
|
||||
if (!PRIVATE->handle) return false;
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_WRITEDATA, this);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_READDATA, this);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_XFERINFODATA, this);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_DEBUGDATA, this);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_HEADERDATA, this);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_WRITEFUNCTION, writeMemoryFunc);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_READFUNCTION, readMemoryFunc);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_XFERINFOFUNCTION, xfer_callback);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_DEBUGFUNCTION, debug_callback);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_HEADERFUNCTION, headerFunc);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_URL, url.dataUTF8());
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_CUSTOMREQUEST, PIHTTP::methodName(request.method()));
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_NOPROGRESS, 0L);
|
||||
// curl_easy_setopt(PRIVATE->handle, CURLOPT_VERBOSE, 1L);
|
||||
// curl_easy_setopt(PRIVATE->handle, CURLOPT_ERRORBUFFER, buffer_error.data());
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_SSL_VERIFYPEER, 0L);
|
||||
if (request.body().isNotEmpty()) {
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_UPLOAD, 1L);
|
||||
curl_easy_setopt(PRIVATE->handle, CURLOPT_INFILESIZE_LARGE, static_cast<curl_off_t>(request.body().size()));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
void PIHTTPClient::perform() {
|
||||
if (!PRIVATE->handle) return;
|
||||
if (!is_cancel) {
|
||||
// piCout << "perform ...";
|
||||
PITimeMeasurer tm;
|
||||
CURLcode res = curl_easy_perform(PRIVATE->handle);
|
||||
// piCout << "done" << res << "in" << tm.elapsed_m() << ", bytes" << buffer_out.size();
|
||||
if (res == CURLE_OK) {
|
||||
reply.setBody(std::move(buffer_out));
|
||||
if (on_finish) on_finish(reply);
|
||||
} else {
|
||||
if (res == CURLE_ABORTED_BY_CALLBACK || is_cancel) {
|
||||
// piCerr << "curl_easy_perform() failed:" << curl_easy_strerror(res);
|
||||
if (on_abort) on_abort(reply);
|
||||
} else {
|
||||
last_error = curl_easy_strerror(res);
|
||||
if (on_error) on_error(reply);
|
||||
}
|
||||
}
|
||||
}
|
||||
// piCout << last_error;
|
||||
curl_easy_cleanup(PRIVATE->handle);
|
||||
PRIVATE->handle = nullptr;
|
||||
}
|
||||
|
||||
|
||||
void PIHTTPClient::procHeaderLine(PIString & line) {
|
||||
if (line.startsWith("HTTP"_a)) {
|
||||
// HTTP/Версия КодСостояния Пояснение
|
||||
line.cutLeft(5);
|
||||
line.takeWord();
|
||||
int code = line.takeWord().toInt();
|
||||
// piCout << "code" << code;
|
||||
reply.setCode(static_cast<PIHTTP::Code>(code));
|
||||
return;
|
||||
}
|
||||
int ind = line.find(':');
|
||||
if (ind < 0) return;
|
||||
PIString hname = line.takeLeft(ind);
|
||||
line.cutLeft(1).trim();
|
||||
reply.addHeader(hname, line);
|
||||
}
|
||||
|
||||
|
||||
size_t PIHTTPClient::writeMemoryFunc(void * contents, size_t size, size_t nmemb, void * ptr) {
|
||||
size_t bytes = size * nmemb;
|
||||
piCout << "writeMemoryFunc" << bytes;
|
||||
auto client = reinterpret_cast<PIHTTPClient *>(ptr);
|
||||
if (client->is_cancel) return CURL_WRITEFUNC_ERROR;
|
||||
client->buffer_out.append(contents, bytes);
|
||||
return bytes;
|
||||
}
|
||||
|
||||
|
||||
size_t PIHTTPClient::readMemoryFunc(void * contents, size_t size, size_t nmemb, void * ptr) {
|
||||
size_t bytes = size * nmemb;
|
||||
piCout << "readMemoryFunc" << bytes;
|
||||
auto client = reinterpret_cast<PIHTTPClient *>(ptr);
|
||||
if (client->is_cancel) return CURL_READFUNC_ABORT;
|
||||
const auto & buffer(client->request.body());
|
||||
if (buffer.isEmpty()) return 0;
|
||||
// piCout << bytes;
|
||||
ssize_t ret = piClamp<ssize_t>(bytes, 0, buffer.size_s() - client->read_pos);
|
||||
if (ret < 0) ret = 0;
|
||||
if (ret > 0) memcpy(contents, buffer.data(client->read_pos), ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
size_t PIHTTPClient::headerFunc(char * contents, size_t size, size_t nmemb, void * ptr) {
|
||||
size_t bytes = size * nmemb;
|
||||
auto client = reinterpret_cast<PIHTTPClient *>(ptr);
|
||||
if (client->is_cancel) return CURL_WRITEFUNC_ERROR;
|
||||
PIString line = PIString::fromUTF8(contents, bytes).trim();
|
||||
if (line.isNotEmpty()) client->procHeaderLine(line);
|
||||
return bytes;
|
||||
}
|
||||
|
||||
|
||||
int PIHTTPClient::infoFunc(ssize_t dltotal, ssize_t dlnow, ssize_t ultotal, ssize_t ulnow) {
|
||||
// piCout << "infoFunc" << dltotal << dlnow << ultotal << ulnow;
|
||||
if (is_cancel) return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int PIHTTPClient::debugFunc(int type, char * data, size_t size) {
|
||||
// piCout << "debugFunc" << type << PIString::fromUTF8(data, size);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
PIHTTPClient * PIHTTPClient::create(const PIString & url_, PIHTTP::Method method, const PIHTTP::MessageConst & req) {
|
||||
PIHTTPClient * ret = new PIHTTPClient();
|
||||
static_cast<PIHTTP::MessageConst &>(ret->request) = req;
|
||||
ret->request.setMethod(method);
|
||||
ret->url = url_;
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
PIHTTPClient * PIHTTPClient::onFinish(std::function<void()> f) {
|
||||
return onFinish([f](const PIHTTP::MessageConst &) { f(); });
|
||||
}
|
||||
|
||||
|
||||
PIHTTPClient * PIHTTPClient::onFinish(std::function<void(const PIHTTP::MessageConst &)> f) {
|
||||
on_finish = f;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
PIHTTPClient * PIHTTPClient::onError(std::function<void()> f) {
|
||||
return onError([f](const PIHTTP::MessageConst &) { f(); });
|
||||
}
|
||||
|
||||
|
||||
PIHTTPClient * PIHTTPClient::onError(std::function<void(const PIHTTP::MessageConst &)> f) {
|
||||
on_error = f;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
PIHTTPClient * PIHTTPClient::onAbort(std::function<void()> f) {
|
||||
return onAbort([f](const PIHTTP::MessageConst &) { f(); });
|
||||
}
|
||||
|
||||
|
||||
PIHTTPClient * PIHTTPClient::onAbort(std::function<void(const PIHTTP::MessageConst &)> f) {
|
||||
on_abort = f;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
void PIHTTPClient::start() {
|
||||
CurlThreadPool::instance()->registerClient(this);
|
||||
}
|
||||
|
||||
|
||||
void PIHTTPClient::abort() {
|
||||
is_cancel = true;
|
||||
}
|
||||
|
||||
|
||||
int PIHTTPClientBase::__infoFunc(ssize_t dltotal, ssize_t dlnow, ssize_t ultotal, ssize_t ulnow) {
|
||||
return reinterpret_cast<PIHTTPClient *>(this)->infoFunc(dltotal, dlnow, ultotal, ulnow);
|
||||
}
|
||||
|
||||
|
||||
int PIHTTPClientBase::__debugFunc(int type, char * data, size_t size) {
|
||||
return reinterpret_cast<PIHTTPClient *>(this)->debugFunc(type, data, size);
|
||||
}
|
||||
Reference in New Issue
Block a user