先看一个简单的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
|
#include <iostream> #include <workflow/Workflow.h> #include <workflow/WFTaskFactory.h> #include <workflow/WFFacilities.h> #include <signal.h>
using namespace protocol;
#define REDIRECT_MAX 4 #define RETRY_MAX 2
void http_callback(WFHttpTask *task) { HttpResponse *resp = task->get_resp(); fprintf(stderr, "Http status : %s\n", resp->get_status_code());
const void *body; size_t body_len; resp->get_parsed_body(&body, &body_len);
FILE *fp = fopen("res.txt", "w"); fwrite(body, 1, body_len, fp); fclose(fp);
fprintf(stderr, "write file done"); }
static WFFacilities::WaitGroup wait_group(1);
void sig_handler(int signo) { wait_group.done(); }
int main() { signal(SIGINT, sig_handler); std::string url = "http://www.baidu.com";
WFHttpTask *task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX, http_callback);
task->start(); wait_group.wait(); }
|
上述代码中,通过WFTaskFactory::create_http_task函数创建了一个http任务。函数细节如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
WFHttpTask *WFTaskFactory::create_http_task(const std::string &url, int redirect_max, int retry_max, http_callback_t callback) { auto *task = new ComplexHttpTask(redirect_max, retry_max, std::move(callback)); ParsedURI uri; URIParser::parse(url, uri); task->init(std::move(uri)); task->set_keep_alive(HTTP_KEEPALIVE_DEFAULT); return task; }
|
可以看到,上面函数返回的是一个ComplexHttpTask:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
class ComplexHttpTask : public WFComplexClientTask<HttpRequest, HttpResponse> { public:
ComplexHttpTask(int redirect_max, int retry_max, http_callback_t &&callback) : WFComplexClientTask(retry_max, std::move(callback)), redirect_max_(redirect_max), redirect_count_(0) { HttpRequest *client_req = this->get_req(); client_req->set_method(HttpMethodGet); client_req->set_http_version("HTTP/1.1"); }
public:
private: };
|
WFComplexClientTask<HttpRequest, HttpResponse>
而ComplexHttpTask又继承自WFComplexClientTask<HttpRequest, HttpResponse>,这是一个模板特化类
1 2 3 4 5 6 7 8 9 10 11 12 13 14
|
template <class REQ, class RESP, typename CTX = bool> class WFComplexClientTask : public WFClientTask<REQ, RESP> {
protected: void dispatch() override;
SubTask *done() override; }
|
WFComplexClientTask类的具体实现细节先放后面,总之只需要知道这个类实现了应用层client该有的功能就行了,其他细节以后在说。
在workflow中,所有的task任务类都直接或者间接继承自SubTask和ParallelTask,同时必须实现两个纯虚函数:dispatch和done,不同的task继承实现不同的逻辑。dispatch是任务
而http client中的具体逻辑就实现在WFComplexClientTask这一层。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| template <class REQ, class RESP, typename CTX> void WFComplexClientTask<REQ, RESP, CTX>::dispatch() { switch (this->state) { case WFT_STATE_UNDEFINED: if (this->check_request()) { if (this->route_result_.request_object) { case WFT_STATE_SUCCESS: this->set_request_object(this->route_result_.request_object); this->WFClientTask<REQ, RESP>::dispatch(); return; } router_task_ = this->route(); series_of(this)->push_front(this); series_of(this)->push_front(router_task_); } default: break; } this->subtask_done(); }
|
这个函数的大致逻辑就是:
第一次执行dispatch时,由于不知道目标的ip地址,我们首先需要做DNS解析,这样才能得到请求对象的ip地址。此时新创建一个路由任务进行DNS解析,并且将其放在原任务的前面。
1 2 3 4
| router_task_ = this->route(); series_of(this)->push_front(this); series_of(this)->push_front(router_task_);
|
这样在获取到目标的ip地址后(route_task_执行完毕),会接着执行下一个任务,也就是再重新执行一次原任务,但这一次有了ip地址。
CommRequest::dispatch
还有一点,WFComplexClientTask<HttpRequest, HttpResponse>::dispatch()最终会调用this->WFClientTask<REQ, RESP>::dispatch();来执行任务的真正逻辑(发送请求)。
而WFClientTask<REQ, RESP>其实并没有实现dispatch(),但它有继承于CommRequest的dispatch函数。
先来看一下继承关系:
1 2 3 4 5 6 7 8 9 10 11 12 13
| 父 CommRequest // dispatch() override ^ | 继承 WFNetworkTask<REQ, RESP> ^ | 继承 WFClientTask<REQ, RESP> ^ | 继承 WFComplexClientTask<HttpRequest, HttpResponse>(模板特化) // dispatch() override ^ | 继承 子 ComplexHttpTask
|
再看一下具体实现:
1 2 3 4 5 6 7 8 9 10 11 12
| class CommRequest : public SubTask, public CommSession { public: void dispatch() override { if (this->scheduler->request(this, this->object, this->wait_timeout, &this->target) < 0) { this->handle(CS_STATE_ERROR, errno); } } };
|
这里需要关注两点:
首先 CommRequest 继承自 SubTask 和 CommSession
说明 CommRequest 即是一个(请求)任务,又满足 CommSession 的特性。
CommSession是一次 req->resp(请求到响应) 的交互,主要要实现message_in(), message_out()等几个虚函数,让核心知道怎么收发消息。同时CommSession也是协议无关的,具体看后面代码。
这里的 scheduler 是 CommScheduler
之前我们在epoll章节中讲过,CommScheduler是全局唯一的单例,在Scheduler 单例第一次实例化的时候,执行了 CommScheduler init,然后Communicator init, 产生 poller 线程和线程池,并启动了 poller 线程。
附上CommSession的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| class CommSession { private:
virtual CommMessageOut *message_out() = 0; virtual CommMessageIn *message_in() = 0;
virtual int send_timeout() { return -1; } virtual int receive_timeout() { return -1; } virtual int keep_alive_timeout() { return 0; } virtual int first_timeout() { return 0; }
virtual void handle(int state, int error) = 0;
protected: [[nodiscard]] CommTarget *get_target() const { return this->target; } [[nodiscard]] CommConnection *get_connect() const { return this->conn; } [[nodiscard]] CommMessageIn *get_message_in() const { return this->msg_in; } [[nodiscard]] CommMessageOut *get_message_out() const { return msg_out; } [[nodiscard]] long long get_seq() const { return this->seq; }
private: CommTarget *target; CommConnection *conn; CommMessageOut *msg_out; CommMessageIn *msg_in; long long seq;
struct timespec begin_time; int timeout; int passive;
public: CommSession() { this->passive = 0; } virtual ~CommSession() = 0; friend class CommMessageIn; friend class Communicator; };
|
CommSchedObject/CommTarget
回到CommRequest::dispatch()中,我们可以看到发送请求时执行的是:
1
| this->scheduler->request(this, this->object, this->wait_timeout, &this->target);
|
此处的request函数在CommScheduler中:
1 2 3 4 5 6 7 8 9 10
| int CommScheduler::request(CommSession *session, CommSchedObject *object, const int wait_timeout, CommTarget **target) { int ret = -1; *target = object->acquire(wait_timeout); if (*target) { ret = this->comm.request(session, *target); if (ret < 0) { (*target)->release(); } } return ret; }
|
注意object->acquire(wait_timeout);函数。
这个函数是用来获取连接的。可能是复用已有的空闲连接,也可能是新创建的连接。
具体后面再说
那么,request函数传入的CommSchedObject和CommTarget是什么呢?
在WFComplexClientTask的构造函数中可以看到,变量CommScheduler::object在初始化时传入的是NULL(通过子类构造函数一层一层向上传递),显然NULL值不可能直接拿来用,那是在哪里传入的非空值呢?
1 2 3 4 5 6
| WFComplexClientTask(const int retry_max, task_callback_t &&cb) : WFClientTask<REQ, RESP>(NULL, WFGlobal::get_scheduler(), std::move(cb)) { }
|
只有一个地方:
1
| void set_request_object(CommSchedObject *_object) { this->object = _object; }
|
没错,就是这个小小的set函数,它在哪里被调用呢?在WFComplexClientTask::dispatch中,有一句:
1
| this->set_request_object(route_result_.request_object)
|
也只有这一个地方调用了该set函数。代码在上面已经给出了,不重复贴了。
而此处传入的route_result_.request_object其实就是之前说的DNS解析生成的数据。它是CommSchedObject*类型。关于DNS解析,此处先略过。
CommSchedObject
所以CommSchedObject到底是一个什么呢?可以暂时这样理解:
这个类的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class CommSchedObject { public: [[nodiscard]] size_t get_max_load() const { return this->max_load; } [[nodiscard]] size_t get_cur_load() const { return this->cur_load; }
private: virtual CommTarget *acquire(int wait_timeout) = 0;
protected: size_t max_load; size_t cur_load;
public: virtual ~CommSchedObject() = default; friend class CommScheduler; };
|
也就是说:这个类其实是为了负载均衡而设计的。因为第一次请求的时候,我们不知道目标是一个还是多个,所以上面set_request_object(route_result_.request_object)中的request_object可能是两种:
CommSchedTarget: 当 DNS 解析结果只有一个 IP 地址时, 它直接代表一个具体的服务器目标.CommSchedGroup: 当 DNS 解析结果有多个 IP 地址(即多个目标)时, 它是一个负载均衡组, 内部根据策略(在workflow中是连接池活跃连接数)选择一个目标进行连接.
注意,这里的负载指的是:
- 每个
CommSchedTarget 代表一个目标服务器 IP(例如 192.168.1.10)。 - 每个
CommSchedTarget 内部维护一个连接池(例如:最多 100 个连接到 192.168.1.10)。 CommSchedGroup 内部保存了同一个域名解析出的多个 IP 对应的多个 CommSchedTarget 。通过比较每个目标 IP 对应的 CommSchedTarget 的连接池使用率(活跃连接数)来决定"哪个目标更空闲"。
1 2 3 4
| class CommSchedTarget : public CommSchedObject, public CommTarget { }
|
1 2 3 4
| class CommSchedGroup : public CommSchedObject { }
|
CommSchedTarget和CommSchedGroup都继承了CommSchedObject.
而CommSchedTarget又继承了CommTarget。CommTarget是通讯目标(IP + PORT).
此处需要区分:
前面我们说了,CommSchedObject: 路由结果的核心, 表示一个可被调度的连接对象。此处的可调度实际上指的是连接池的调度。
而CommTarget则表示通信的对端对象。
CommTarget
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| class CommTarget { public: int init(const sockaddr *addr, socklen_t addrlen, int connect_timeout, int response_timeout);
void deinit();
void get_addr(const sockaddr **addr, socklen_t *addrlen) const { *addr = this->addr; *addrlen = this->addrlen; }
[[nodiscard]] bool has_idle_conn() const { return !list_is_empty(&this->idle_list); }
private: virtual int create_connect_fd() { return socket(this->addr->sa_family, SOCK_STREAM, 0); }
virtual CommConnection *new_connection(int connect_fd) { return new CommConnection; }
public: virtual void release() {}
private: sockaddr *addr; socklen_t addrlen; int connect_timeout; int response_timeout; int ssl_connect_timeout; SSL_CTX *ssl_ctx;
list_head idle_list; pthread_mutex_t mutex; friend class CommServiceTarget; friend class Communicator; };
|
从源码中可以看出,CommTarget中不仅存储了对端的sockaddr(IP+PORT),还有两个超时参数,以及空闲连接池idle_list。
CommTarget是什么时候创建的呢?
前面说过,scheduler->request() 中,有一个函数调用是这么写的:
1
| *target = object->acquire(wait_timeout);
|
这个object->acquire(wait_timeout)返回的就是一个CommTarget。当然根据多态,真正返回的是它的子类。这个前面已经讲过了。
总结
这一节中,有许多的细节,但是我们抛开细节看流程
我们用户调用的是create_http_task
http task实际上是 new 了一个 ComplexHttpTask
ComplexHttpTask继承自WFComplexClientTask<HttpRequest, HttpResponse>
这里把client加入http的特化信息
WFComplexClientTask的核心在于他实现的dispatch,但是他的dispatch首先进来是插入一个dns解析的task
dns解析设置了route_result_.request_object
再次到WFComplexClientTask,执行dispatch其实是CommRequest::dispatch()
在CommRequest::dispatch()中,我们从route_result_.request_object获取到通信的目标,然后comm.request(session, *target);发送出请求。