先看一个简单的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
#include <iostream> #include <workflow/Workflow.h> #include <workflow/WFTaskFactory.h> #include <workflow/WFFacilities.h> #include <signal.h>
using namespace protocol;
#define REDIRECT_MAX 4 #define RETRY_MAX 2
void http_callback(WFHttpTask *task) { HttpResponse *resp = task->get_resp(); fprintf(stderr, "Http status : %s\n", resp->get_status_code());
const void *body; size_t body_len; resp->get_parsed_body(&body, &body_len);
FILE *fp = fopen("res.txt", "w"); fwrite(body, 1, body_len, fp); fclose(fp);
fprintf(stderr, "write file done"); }
static WFFacilities::WaitGroup wait_group(1);
void sig_handler(int signo) { wait_group.done(); }
int main() { signal(SIGINT, sig_handler); std::string url = "http://www.baidu.com";
WFHttpTask *task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX, http_callback);
task->start(); wait_group.wait(); }
|
上述代码中,通过WFTaskFactory::create_http_task函数创建了一个http任务。函数细节如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
WFHttpTask *WFTaskFactory::create_http_task(const std::string &url, int redirect_max, int retry_max, http_callback_t callback) { auto *task = new ComplexHttpTask(redirect_max, retry_max, std::move(callback)); ParsedURI uri; URIParser::parse(url, uri); task->init(std::move(uri)); task->set_keep_alive(HTTP_KEEPALIVE_DEFAULT); return task; }
|
可以看到,上面函数返回的是一个ComplexHttpTask:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
class ComplexHttpTask : public WFComplexClientTask<HttpRequest, HttpResponse> { public:
ComplexHttpTask(int redirect_max, int retry_max, http_callback_t &&callback) : WFComplexClientTask(retry_max, std::move(callback)), redirect_max_(redirect_max), redirect_count_(0) { HttpRequest *client_req = this->get_req(); client_req->set_method(HttpMethodGet); client_req->set_http_version("HTTP/1.1"); }
public:
private: };
|
WFComplexClientTask<HttpRequest, HttpResponse>
而ComplexHttpTask又继承自WFComplexClientTask<HttpRequest, HttpResponse>,这是一个模板特化类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
template <class REQ, class RESP, typename CTX = bool> class WFComplexClientTask : public WFClientTask<REQ, RESP> {
protected: void dispatch() override;
SubTask *done() override; }
|
WFComplexClientTask类的具体实现细节先放后面,总之只需要知道这个类实现了应用层client该有的功能就行了,其他细节以后在说。
在workflow中,所有的task任务类都直接或者间接继承自SubTask和ParallelTask,同时必须实现两个纯虚函数:dispatch和done,不同的task继承实现不同的逻辑。dispatch是任务
而http client中的具体逻辑就实现在WFComplexClientTask这一层。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| template <class REQ, class RESP, typename CTX> void WFComplexClientTask<REQ, RESP, CTX>::dispatch() { switch (this->state) { case WFT_STATE_UNDEFINED: if (this->check_request()) { if (this->route_result_.request_object) { case WFT_STATE_SUCCESS: this->set_request_object(this->route_result_.request_object); this->WFClientTask<REQ, RESP>::dispatch(); return; } router_task_ = this->route(); series_of(this)->push_front(this); series_of(this)->push_front(router_task_); } default: break; } this->subtask_done(); }
|
这个函数的大致逻辑就是:
第一次执行dispatch时,由于不知道目标的ip地址,我们首先需要做DNS解析,这样才能得到请求对象的ip地址。此时新创建一个路由任务进行DNS解析,并且将其放在原任务的前面。
1 2 3 4
| router_task_ = this->route(); series_of(this)->push_front(this); series_of(this)->push_front(router_task_);
|
这样在获取到目标的ip地址后(route_task_执行完毕),会接着执行下一个任务,也就是再重新执行一次原任务,但这一次有了ip地址。
CommRequest::dispatch
还有一点,WFComplexClientTask<HttpRequest, HttpResponse>::dispatch()最终会调用this->WFClientTask<REQ, RESP>::dispatch();来执行任务的真正逻辑(发送请求)。
而WFClientTask<REQ, RESP>其实并没有实现dispatch(),但它有继承于CommRequest的dispatch函数。
先来看一下继承关系:
1 2 3 4 5 6 7 8 9 10 11 12 13
| 父 CommRequest // dispatch() override ^ | 继承 WFNetworkTask<REQ, RESP> ^ | 继承 WFClientTask<REQ, RESP> ^ | 继承 WFComplexClientTask<HttpRequest, HttpResponse>(模板特化) // dispatch() override ^ | 继承 子 ComplexHttpTask
|
再看一下具体实现:
1 2 3 4 5 6 7 8 9 10 11 12
| class CommRequest : public SubTask, public CommSession { public: void dispatch() override { if (this->scheduler->request(this, this->object, this->wait_timeout, &this->target) < 0) { this->handle(CS_STATE_ERROR, errno); } } };
|
这里需要关注两点:
首先 CommRequest 继承自 SubTask 和 CommSession
说明 CommRequest 即是一个(请求)任务,又满足 CommSession 的特性。
CommSession是一次 req->resp(请求到响应) 的交互,主要要实现message_in(), message_out()等几个虚函数,让核心知道怎么收发消息。同时CommSession也是协议无关的,具体看后面代码。
这里的 scheduler 是 CommScheduler
之前我们在epoll章节中讲过,CommScheduler是全局唯一的单例,在Scheduler 单例第一次实例化的时候,执行了 CommScheduler init,然后Communicator init, 产生 poller 线程和线程池,并启动了 poller 线程。
附上CommSession的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| class CommSession { private:
virtual CommMessageOut *message_out() = 0; virtual CommMessageIn *message_in() = 0;
virtual int send_timeout() { return -1; } virtual int receive_timeout() { return -1; } virtual int keep_alive_timeout() { return 0; } virtual int first_timeout() { return 0; }
virtual void handle(int state, int error) = 0;
protected: [[nodiscard]] CommTarget *get_target() const { return this->target; } [[nodiscard]] CommConnection *get_connect() const { return this->conn; } [[nodiscard]] CommMessageIn *get_message_in() const { return this->msg_in; } [[nodiscard]] CommMessageOut *get_message_out() const { return msg_out; } [[nodiscard]] long long get_seq() const { return this->seq; }
private: CommTarget *target; CommConnection *conn; CommMessageOut *msg_out; CommMessageIn *msg_in; long long seq;
struct timespec begin_time; int timeout; int passive;
public: CommSession() { this->passive = 0; } virtual ~CommSession() = 0; friend class CommMessageIn; friend class Communicator; };
|
CommSchedObject/CommTarget
回到CommRequest::dispatch()中,我们可以看到发送请求时执行的是:
1
| this->scheduler->request(this, this->object, this->wait_timeout, &this->target);
|
此处的request函数在CommScheduler中:
1 2 3 4 5 6 7 8 9 10
| int CommScheduler::request(CommSession *session, CommSchedObject *object, const int wait_timeout, CommTarget **target) { int ret = -1; *target = object->acquire(wait_timeout); if (*target) { ret = this->comm.request(session, *target); if (ret < 0) { (*target)->release(); } } return ret; }
|
那么,此处传入的CommSchedObject和CommTarget是什么呢?
在WFComplexClientTask的构造函数中可以看到,变量CommScheduler::object在初始化时传入的是NULL(通过子类构造函数一层一层向上传递),显然NULL值不可能直接拿来用,那是在哪里传入的非空值呢?
1 2 3 4 5 6
| WFComplexClientTask(const int retry_max, task_callback_t &&cb) : WFClientTask<REQ, RESP>(NULL, WFGlobal::get_scheduler(), std::move(cb)) { }
|
只有一个地方:
1
| void set_request_object(CommSchedObject *_object) { this->object = _object; }
|
没错,就是这个小小的set函数,它在哪里被调用呢?在WFComplexClientTask::dispatch中,有一句:
1
| this->set_request_object(route_result_.request_object)
|
也只有这一个地方调用了该set函数。代码在上面已经给出了,不重复贴了。
而此处传入的route_result_.request_object其实就是之前说的DNS解析生成的数据。它是CommSchedObject*类型。关于DNS解析,此处先略过。
所以CommSchedObject到底是一个什么呢?可以暂时这样理解:
这个类的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class CommSchedObject { public: [[nodiscard]] size_t get_max_load() const { return this->max_load; } [[nodiscard]] size_t get_cur_load() const { return this->cur_load; }
private: virtual CommTarget *acquire(int wait_timeout) = 0;
protected: size_t max_load; size_t cur_load;
public: virtual ~CommSchedObject() = default; friend class CommScheduler; };
|