06.事件循环
6.1 引言
将第四章的echo服务器改写为事件循环。
while running:
want_read = [...] # 套接字文件描述符
want_write = [...] # 套接字文件描述符
can_read, can_write = wait_for_readiness(want_read, want_write) # 阻塞!
for fd in can_read:
data = read_nb(fd) # 非阻塞,仅从缓冲区消耗
handle_data(fd, data) # 无 IO 的应用逻辑
for fd in can_write:
data = pending_data(fd) # 由应用程序生成
n = write_nb(fd, data) # 非阻塞,仅追加到缓冲区
data_written(fd, n) # n <= len(data),受可用空间限制
应用代码 vs 事件循环代码
一些库可以对事件循环进行抽象:事件循环代码通过回调与应用代码交互,而应用代码通过一个明确定义的API与事件循环交互。我们不是在编写一个库,但在事件循环代码和应用代码之间存在一个隐形的边界。
6.2 每个连接的状态
在使用事件循环时,一个应用程序任务可以跨越多个循环进行迭代,因此状态必须显式地存储在某个地方。以下是我们的每个连接状态:
struct Conn {
int fd = -1;
// 应用程序的意图,供事件循环使用
bool want_read = false;
bool want_write = false;
bool want_close = false;
// 缓冲的输入和输出
std::vector<uint8_t> incoming; // 待应用程序解析的数据
std::vector<uint8_t> outgoing; // 应用程序生成的响应
};
Conn::want_read
与Conn::want_write
分别对应就绪API监听目标的文件描述符列表。Conn::want_close
告诉事件循环销毁该连接。Conn::incoming
为协议解析器缓冲来自套接字的数据。Conn::outgoing
缓冲区存储着待写入套接字的响应数据。
输入缓冲区的必要性
由于读操作现在是异步的,我们在解析协议的时候不能简单地等待n个字节。read_full()
函数现在已经无关紧要了。我们将这样做:
在每次循环迭代中,如果套接字准备好读取:
- 执行一次非阻塞读
- 将新数据添加到
Conn::incoming
缓冲区。 - 尝试解析累积的缓冲区
- 如果数据不足,则在该次迭代中不执行任何操作。
- 处理已解析的消息
- 从
Conn::incoming
中移除该消息。
为什么输出数据需要缓冲?
由于写操作现在是非阻塞的,我们不能随意向套接字写入数据。只有当套接字准备好写入时,数据才会被写入。一个大的响应可能需要多个循环迭代才能够完成。因此,响应数据必须存储在一个缓冲区(Conn::outgoing
)中。
6.3 事件循环代码
poll()
返回一个文件描述符列表。我们需要将每个文件描述符映射到Conn
对象。
// 所有客户端连接的映射,键为文件描述符
std::vector<Conn *> fd2conn;
在Unix上,文件描述符被分配为最小的可用非负整数,因此从文件描述符到Conn
的映射可以是一个以文件描述符为索引的扁平数组,并且该数组僵尸密集填充的。没有比这更高效的了。有时简单的数组可以替代像哈希表这样的复杂数据结构。
poll()
系统调用
就绪API的核心机制是:程序先提交需要监控输入/输出的文件描述符列表,该API随后返回其中可立即执行IO操作的文件描述符列表。共支持两种就绪类型:读就绪与写就绪。
can_read, can_write = wait_for_readiness(want_read, want_write)
我们将使用poll()
,它对输入和输出使用相同的文件描述符列表。
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
struct pollfd {
int fd;
short events; // 请求:想要读、写,还是两者都要?
short revents; // 返回:可以读吗?可以写吗?
};
nfds
参数是fsd
数组的大小。timeout
参数设置为-1,表示没有超时。pollfd::events
是POLLIN
、POLLOUT
、POLLERR
的组合:POLLIN
和POLLOUT
对应于want_read
和want_write
文件描述符列表。POLLERR
表示我们总是希望被通知的套接字错误。
pollfd::revents
由poll()
返回。它使用相同的标志集来指示文件描述符是否在can_read
或can_write
列表中。
第一步:为poll()
构建文件描述符列表
应用代码决定就绪通知的类型。它通过Conn
中的want_read
和want_write
标志与事件循环通信,然后fds
参数根据这些标志构建:
// 所有客户端连接的映射,键为文件描述符
std::vector<Conn *> fd2conn;
// 事件循环
std::vector<struct pollfd> poll_args;
while (true) {
// 准备 poll() 的参数
poll_args.clear();
// 将监听套接字放在第一个位置
struct pollfd pfd = {fd, POLLIN, 0};
poll_args.push_back(pfd);
// 其余是连接套接字
for (Conn *conn : fd2conn) {
if (!conn) {
continue;
}
struct pollfd pfd = {conn->fd, POLLERR, 0};
// 根据应用程序的意图设置 poll() 标志
if (conn->want_read) {
pfd.events |= POLLIN;
}
if (conn->want_write) {
pfd.events |= POLLOUT;
}
poll_args.push_back(pfd);
}
// 更多...
}
第二步:调用poll()
// 事件循环
while (true) {
// 准备 poll() 的参数
// ...
// 等待就绪
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
if (rv < 0 && errno == EINTR) {
continue; // 不是一个错误
}
if (rv < 0) {
die("poll");
}
// ...
}
poll()
是整个程序中唯一的阻塞性系统调用。通常它在至少一个文件描述符就绪时返回。然而,及时没有任何东西就绪,它也可能偶尔返回并设置errno = EINTR
。
如果一个进程在阻塞性系统调用期间收到Unix信号,该系统调用会立即返回并设置EINTR
,以便让进程有机会处理该信号。对于非阻塞性系统调用,不期望出现EINTR
。
EINTR
不是一个错误,应该重试该系统调用。即使你不使用信号,你也应该处理EINTR
,因为可能存在意想不到的信号来源。
第三步:接受新连接
我们将监听套接字放在文件描述符列表的第0个位置。
// 事件循环
while (true) {
// 准备 poll() 的参数
poll_args.clear();
// 将监听套接字放在第一个位置
struct pollfd pfd = {fd, POLLIN, 0};
poll_args.push_back(pfd);
// ...
// 等待就绪
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
// ...
// 处理监听套接字
if (poll_args[0].revents) {
if (Conn *conn = handle_accept(fd)) {
// 将其放入映射中
if (fd2conn.size() <= (size_t)conn->fd) {
fd2conn.resize(conn->fd + 1);
}
fd2conn[conn->fd] = conn;
}
}
// ...
} // 事件循环
在就绪通知中,accept()
被视为read()
,因此它使用POLLIN
。poll()
返回后,检查第一个文件描述符以确定我们是否可以accept()
。
handle_accept()
为新连接创建Conn
对象。我们稍后会编写这部分代码。
第四步:应用程序回调
剩下的文件描述符列表将专门分配给连接套接字。如果他们准备好进行IO操作,就调用应用程序代码。
while (true) {
// ...
// 等待就绪
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
// ...
// 处理连接套接字
for (size_t i = 1; i < poll_args.size(); ++i) { // 注意:跳过第一个
uint32_t ready = poll_args[i].revents;
Conn *conn = fd2conn[poll_args[i].fd];
if (ready & POLLIN) {
handle_read(conn); // 应用逻辑
}
if (ready & POLLOUT) {
handle_write(conn); // 应用逻辑
}
}
}
第五步:终止连接
我们总是调用poll()
来处理连接到套接字的请求,以便在错误时终止连接。应用程序代码也可以设置Conn::want_close
以请求事件循环终止连接。
// 处理连接套接字
for (size_t i = 1; i < poll_args.size(); ++i) {
uint32_t ready = poll_args[i].revents;
Conn *conn = fd2conn[poll_args[i].fd];
// 读和写...
// 因套接字错误或应用逻辑而关闭套接字
if ((ready & POLLERR) || conn->want_close) {
(void)close(conn->fd);
fd2conn[conn->fd] = NULL;
delete conn;
}
}
你可以添加一个回调handle_err()
让应用代码处理错误,但在我们的应用中没有什么可以处理的,所以我们在这里直接关闭套接字。
6.4 使用非阻塞的IO应用代码
非阻塞accept()
在进入事件之前,使用fcntl
使监听套接字变为非阻塞。
static void fd_set_nb(int fd) {
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}
然后事件循环回调应用代码执行accept()
static Conn *handle_accept(int fd) {
// accept
struct sockaddr_in client_addr = {};
socklen_t addrlen = sizeof(client_addr);
int connfd = accept(fd, (struct sockaddr *)&client_addr, &addrlen);
if (connfd < 0) {
return NULL;
}
// 将新的连接文件描述符设置为非阻塞模式
fd_set_nb(connfd);
// 创建一个 `struct Conn`
Conn *conn = new Conn();
conn->fd = connfd;
conn->want_read = true; // 读取第一个请求
return conn;
}
连接套接字也被设置为非阻塞,等待它的第一次读取。
使用非阻塞式协议解析器
请看下面每个子步骤的注释
static void handle_read(Conn *conn) {
// 1. 执行一次非阻塞读。
uint8_t buf[64 * 1024];
ssize_t rv = read(conn->fd, buf, sizeof(buf));
if (rv <= 0) { // 处理 IO 错误 (rv < 0) 或 EOF (rv == 0)
conn->want_close = true;
return;
}
// 2. 将新数据添加到 `Conn::incoming` 缓冲区。
buf_append(conn->incoming, buf, (size_t)rv);
// 3. 尝试解析累积的缓冲区。
// 4. 处理已解析的消息。
// 5. 从 `Conn::incoming` 中移除该消息。
try_one_request(conn)
// ...
}
处理过程被拆分到try_one_request()
中。如果数据不足,它将不做任何事情,直到未来的循环迭代中有更多数据为止。
// 如果有足够的数据,则处理 1 个请求
static bool try_one_request(Conn *conn) {
// 3. 尝试解析累积的缓冲区。
// 协议:消息头
if (conn->incoming.size() < 4) {
return false; // 需要读
}
uint32_t len = 0;
memcpy(&len, conn->incoming.data(), 4);
if (len > k_max_msg) { // 协议错误
conn->want_close = true;
return false; // 需要关闭
}
// 协议:消息体
if (4 + len > conn->incoming.size()) {
return false; // 需要读
}
const uint8_t *request = &conn->incoming[4];
// 4. 处理已解析的消息。
// ...
// 生成响应(回显)
buf_append(conn->outgoing, (const uint8_t *)&len, 4);
buf_append(conn->outgoing, request, len);
// 5. 从 `Conn::incoming` 中移除该消息。
buf_consume(conn->incoming, 4 + len);
return true; // 成功
}
我们使用std::vector
作为缓冲区数组,它是一个动态数组。
// 追加到尾部
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
buf.insert(buf.end(), data, data + len);
}
// 从头部移除
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
buf.erase(buf.begin(), buf.begin() + n);
}
非阻塞写入
这里没有应用程序逻辑,只是写入一些数据并将其从缓冲区中移除。write()
可能返回较少的字节数,这是允许的,因为事件循环会再次调用它。
static void handle_write(Conn *conn) {
assert(conn->outgoing.size() > 0);
ssize_t rv = write(conn->fd, conn->outgoing.data(), conn->outgoing.size());
if (rv < 0) {
conn->want_close = true; // 错误处理
return;
}
// 从 `outgoing` 中移除已写入的数据
buf_consume(conn->outgoing, (size_t)rv);
// ...
}
请求和响应之间的状态流转
在请求-响应协议中,程序要么是在读取请求,要么是在写入响应。在handle_read()
和handle_write()
的末尾,我们需要在这两种状态之间切换。
static void handle_read(Conn *conn) {
// ...
// 更新就绪意图
if (conn->outgoing.size() > 0) { // 有一个响应
conn->want_read = false;
conn->want_write = true;
} // 否则:需要读
}
static void handle_write(Conn *conn) {
// ...
if (conn->outgoing.size() == 0) { //所有数据都已写入
conn->want_read = true;
conn->want_write = false;
} // 否则:需要写
}
这并非普遍适用的。例如,一些代理和消息传递协议不是请求-响应模式,可以同时进行读写。
6.5 结尾
我们这次实现的协议与第四章的相同。所以你可以重用测试客户端。这个服务器是类似于生产级别产品的最简版本,但它仍然是一个玩具,请进入下一章学习更高级的内容。
源代码:
06_client.cpp
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/ip.h>
#include <string>
#include <vector>
static void msg(const char *msg) {
fprintf(stderr, "%s\n", msg);
}
static void die(const char *msg) {
int err = errno;
fprintf(stderr, "[%d] %s\n", err, msg);
abort();
}
static int32_t read_full(int fd, uint8_t *buf, size_t n) {
while (n > 0) {
ssize_t rv = read(fd, buf, n);
if (rv <= 0) {
return -1; // error, or unexpected EOF
}
assert((size_t)rv <= n);
n -= (size_t)rv;
buf += rv;
}
return 0;
}
static int32_t write_all(int fd, const uint8_t *buf, size_t n) {
while (n > 0) {
ssize_t rv = write(fd, buf, n);
if (rv <= 0) {
return -1; // error
}
assert((size_t)rv <= n);
n -= (size_t)rv;
buf += rv;
}
return 0;
}
// append to the back
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
buf.insert(buf.end(), data, data + len);
}
const size_t k_max_msg = 32 << 20; // likely larger than the kernel buffer
// the `query` function was simply splited into `send_req` and `read_res`.
static int32_t send_req(int fd, const uint8_t *text, size_t len) {
if (len > k_max_msg) {
return -1;
}
std::vector<uint8_t> wbuf;
buf_append(wbuf, (const uint8_t *)&len, 4);
buf_append(wbuf, text, len);
return write_all(fd, wbuf.data(), wbuf.size());
}
static int32_t read_res(int fd) {
// 4 bytes header
std::vector<uint8_t> rbuf;
rbuf.resize(4);
errno = 0;
int32_t err = read_full(fd, &rbuf[0], 4);
if (err) {
if (errno == 0) {
msg("EOF");
} else {
msg("read() error");
}
return err;
}
uint32_t len = 0;
memcpy(&len, rbuf.data(), 4); // assume little endian
if (len > k_max_msg) {
msg("too long");
return -1;
}
// reply body
rbuf.resize(4 + len);
err = read_full(fd, &rbuf[4], len);
if (err) {
msg("read() error");
return err;
}
// do something
printf("len:%u data:%.*s\n", len, len < 100 ? len : 100, &rbuf[4]);
return 0;
}
int main() {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
die("socket()");
}
struct sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = ntohs(1234);
addr.sin_addr.s_addr = ntohl(INADDR_LOOPBACK); // 127.0.0.1
int rv = connect(fd, (const struct sockaddr *)&addr, sizeof(addr));
if (rv) {
die("connect");
}
// multiple pipelined requests
std::vector<std::string> query_list = {
"hello1", "hello2", "hello3",
// a large message requires multiple event loop iterations
std::string(k_max_msg, 'z'),
"hello5",
};
for (const std::string &s : query_list) {
int32_t err = send_req(fd, (uint8_t *)s.data(), s.size());
if (err) {
goto L_DONE;
}
}
for (size_t i = 0; i < query_list.size(); ++i) {
int32_t err = read_res(fd);
if (err) {
goto L_DONE;
}
}
L_DONE:
close(fd);
return 0;
}
06_server.cpp
// stdlib
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
// system
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/ip.h>
// C++
#include <vector>
static void msg(const char *msg) {
fprintf(stderr, "%s\n", msg);
}
static void msg_errno(const char *msg) {
fprintf(stderr, "[errno:%d] %s\n", errno, msg);
}
static void die(const char *msg) {
fprintf(stderr, "[%d] %s\n", errno, msg);
abort();
}
static void fd_set_nb(int fd) {
errno = 0;
int flags = fcntl(fd, F_GETFL, 0);
if (errno) {
die("fcntl error");
return;
}
flags |= O_NONBLOCK;
errno = 0;
(void)fcntl(fd, F_SETFL, flags);
if (errno) {
die("fcntl error");
}
}
const size_t k_max_msg = 32 << 20; // likely larger than the kernel buffer
struct Conn {
int fd = -1;
// application's intention, for the event loop
bool want_read = false;
bool want_write = false;
bool want_close = false;
// buffered input and output
std::vector<uint8_t> incoming; // data to be parsed by the application
std::vector<uint8_t> outgoing; // responses generated by the application
};
// append to the back
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
buf.insert(buf.end(), data, data + len);
}
// remove from the front
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
buf.erase(buf.begin(), buf.begin() + n);
}
// application callback when the listening socket is ready
static Conn *handle_accept(int fd) {
// accept
struct sockaddr_in client_addr = {};
socklen_t addrlen = sizeof(client_addr);
int connfd = accept(fd, (struct sockaddr *)&client_addr, &addrlen);
if (connfd < 0) {
msg_errno("accept() error");
return NULL;
}
uint32_t ip = client_addr.sin_addr.s_addr;
fprintf(stderr, "new client from %u.%u.%u.%u:%u\n",
ip & 255, (ip >> 8) & 255, (ip >> 16) & 255, ip >> 24,
ntohs(client_addr.sin_port)
);
// set the new connection fd to nonblocking mode
fd_set_nb(connfd);
// create a `struct Conn`
Conn *conn = new Conn();
conn->fd = connfd;
conn->want_read = true;
return conn;
}
// process 1 request if there is enough data
static bool try_one_request(Conn *conn) {
// try to parse the protocol: message header
if (conn->incoming.size() < 4) {
return false; // want read
}
uint32_t len = 0;
memcpy(&len, conn->incoming.data(), 4);
if (len > k_max_msg) {
msg("too long");
conn->want_close = true;
return false; // want close
}
// message body
if (4 + len > conn->incoming.size()) {
return false; // want read
}
const uint8_t *request = &conn->incoming[4];
// got one request, do some application logic
printf("client says: len:%d data:%.*s\n",
len, len < 100 ? len : 100, request);
// generate the response (echo)
buf_append(conn->outgoing, (const uint8_t *)&len, 4);
buf_append(conn->outgoing, request, len);
// application logic done! remove the request message.
buf_consume(conn->incoming, 4 + len);
// Q: Why not just empty the buffer? See the explanation of "pipelining".
return true; // success
}
// application callback when the socket is writable
static void handle_write(Conn *conn) {
assert(conn->outgoing.size() > 0);
ssize_t rv = write(conn->fd, &conn->outgoing[0], conn->outgoing.size());
if (rv < 0 && errno == EAGAIN) {
return; // actually not ready
}
if (rv < 0) {
msg_errno("write() error");
conn->want_close = true; // error handling
return;
}
// remove written data from `outgoing`
buf_consume(conn->outgoing, (size_t)rv);
// update the readiness intention
if (conn->outgoing.size() == 0) { // all data written
conn->want_read = true;
conn->want_write = false;
} // else: want write
}
// application callback when the socket is readable
static void handle_read(Conn *conn) {
// read some data
uint8_t buf[64 * 1024];
ssize_t rv = read(conn->fd, buf, sizeof(buf));
if (rv < 0 && errno == EAGAIN) {
return; // actually not ready
}
// handle IO error
if (rv < 0) {
msg_errno("read() error");
conn->want_close = true;
return; // want close
}
// handle EOF
if (rv == 0) {
if (conn->incoming.size() == 0) {
msg("client closed");
} else {
msg("unexpected EOF");
}
conn->want_close = true;
return; // want close
}
// got some new data
buf_append(conn->incoming, buf, (size_t)rv);
// parse requests and generate responses
while (try_one_request(conn)) {}
// Q: Why calling this in a loop? See the explanation of "pipelining".
// update the readiness intention
if (conn->outgoing.size() > 0) { // has a response
conn->want_read = false;
conn->want_write = true;
// The socket is likely ready to write in a request-response protocol,
// try to write it without waiting for the next iteration.
return handle_write(conn);
} // else: want read
}
int main() {
// the listening socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
die("socket()");
}
int val = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
// bind
struct sockaddr_in addr = {};
addr.sin_family = AF_INET;
addr.sin_port = ntohs(1234);
addr.sin_addr.s_addr = ntohl(0); // wildcard address 0.0.0.0
int rv = bind(fd, (const sockaddr *)&addr, sizeof(addr));
if (rv) {
die("bind()");
}
// set the listen fd to nonblocking mode
fd_set_nb(fd);
// listen
rv = listen(fd, SOMAXCONN);
if (rv) {
die("listen()");
}
// a map of all client connections, keyed by fd
std::vector<Conn *> fd2conn;
// the event loop
std::vector<struct pollfd> poll_args;
while (true) {
// prepare the arguments of the poll()
poll_args.clear();
// put the listening sockets in the first position
struct pollfd pfd = {fd, POLLIN, 0};
poll_args.push_back(pfd);
// the rest are connection sockets
for (Conn *conn : fd2conn) {
if (!conn) {
continue;
}
// always poll() for error
struct pollfd pfd = {conn->fd, POLLERR, 0};
// poll() flags from the application's intent
if (conn->want_read) {
pfd.events |= POLLIN;
}
if (conn->want_write) {
pfd.events |= POLLOUT;
}
poll_args.push_back(pfd);
}
// wait for readiness
int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
if (rv < 0 && errno == EINTR) {
continue; // not an error
}
if (rv < 0) {
die("poll");
}
// handle the listening socket
if (poll_args[0].revents) {
if (Conn *conn = handle_accept(fd)) {
// put it into the map
if (fd2conn.size() <= (size_t)conn->fd) {
fd2conn.resize(conn->fd + 1);
}
assert(!fd2conn[conn->fd]);
fd2conn[conn->fd] = conn;
}
}
// handle connection sockets
for (size_t i = 1; i < poll_args.size(); ++i) { // note: skip the 1st
uint32_t ready = poll_args[i].revents;
if (ready == 0) {
continue;
}
Conn *conn = fd2conn[poll_args[i].fd];
if (ready & POLLIN) {
assert(conn->want_read);
handle_read(conn); // application logic
}
if (ready & POLLOUT) {
assert(conn->want_write);
handle_write(conn); // application logic
}
// close the socket from socket error or application logic
if ((ready & POLLERR) || conn->want_close) {
(void)close(conn->fd);
fd2conn[conn->fd] = NULL;
delete conn;
}
} // for each connection sockets
} // the event loop
return 0;
}