Skip to content

06.事件循环

说明

原文链接:https://build-your-own.org/redis/06_event_loop_impl

原文作者:James Smith

译者:Cheng

6.1 引言

将第四章的echo服务器改写为事件循环。

python
while running:
    want_read = [...]           # 套接字文件描述符
    want_write = [...]          # 套接字文件描述符
    can_read, can_write = wait_for_readiness(want_read, want_write) # 阻塞!
    for fd in can_read:
        data = read_nb(fd)      # 非阻塞,仅从缓冲区消耗
        handle_data(fd, data)   # 无 IO 的应用逻辑
    for fd in can_write:
        data = pending_data(fd) # 由应用程序生成
        n = write_nb(fd, data)  # 非阻塞,仅追加到缓冲区
        data_written(fd, n)     # n <= len(data),受可用空间限制

应用代码 vs 事件循环代码

一些库可以对事件循环进行抽象:事件循环代码通过回调与应用代码交互,而应用代码通过一个明确定义的API与事件循环交互。我们不是在编写一个库,但在事件循环代码和应用代码之间存在一个隐形的边界。

6.2 每个连接的状态

在使用事件循环时,一个应用程序任务可以跨越多个循环进行迭代,因此状态必须显式地存储在某个地方。以下是我们的每个连接状态:

cpp
struct Conn {
    int fd = -1;
    // 应用程序的意图,供事件循环使用
    bool want_read = false;
    bool want_write = false;
    bool want_close = false;
    // 缓冲的输入和输出
    std::vector<uint8_t> incoming;  // 待应用程序解析的数据
    std::vector<uint8_t> outgoing;  // 应用程序生成的响应
};
  • Conn::want_readConn::want_write分别对应就绪API监听目标的文件描述符列表。
  • Conn::want_close告诉事件循环销毁该连接。
  • Conn::incoming为协议解析器缓冲来自套接字的数据。
  • Conn::outgoing缓冲区存储着待写入套接字的响应数据。

输入缓冲区的必要性

由于读操作现在是异步的,我们在解析协议的时候不能简单地等待n个字节。read_full()函数现在已经无关紧要了。我们将这样做:

在每次循环迭代中,如果套接字准备好读取:

  1. 执行一次非阻塞读
  2. 将新数据添加到Conn::incoming缓冲区。
  3. 尝试解析累积的缓冲区
    • 如果数据不足,则在该次迭代中不执行任何操作。
  4. 处理已解析的消息
  5. Conn::incoming中移除该消息。

为什么输出数据需要缓冲?

由于写操作现在是非阻塞的,我们不能随意向套接字写入数据。只有当套接字准备好写入时,数据才会被写入。一个大的响应可能需要多个循环迭代才能够完成。因此,响应数据必须存储在一个缓冲区(Conn::outgoing)中。

6.3 事件循环代码

poll()返回一个文件描述符列表。我们需要将每个文件描述符映射到Conn对象。

cpp
// 所有客户端连接的映射,键为文件描述符
std::vector<Conn *> fd2conn;

在Unix上,文件描述符被分配为最小的可用非负整数,因此从文件描述符到Conn的映射可以是一个以文件描述符为索引的扁平数组,并且该数组僵尸密集填充的。没有比这更高效的了。有时简单的数组可以替代像哈希表这样的复杂数据结构。

poll()系统调用

就绪API的核心机制是:程序先提交需要监控输入/输出的文件描述符列表,该API随后返回其中可立即执行IO操作的文件描述符列表。共支持两种就绪类型:读就绪与写就绪。

cpp
can_read, can_write = wait_for_readiness(want_read, want_write)

我们将使用poll(),它对输入和输出使用相同的文件描述符列表。

cpp
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

struct pollfd {
    int   fd;
    short events;   // 请求:想要读、写,还是两者都要?
    short revents;  // 返回:可以读吗?可以写吗?
};
  • nfds参数是fsd数组的大小。
  • timeout参数设置为-1,表示没有超时。
  • pollfd::eventsPOLLINPOLLOUTPOLLERR的组合:
    • POLLINPOLLOUT对应于want_readwant_write文件描述符列表。
    • POLLERR表示我们总是希望被通知的套接字错误。
  • pollfd::reventspoll()返回。它使用相同的标志集来指示文件描述符是否在can_readcan_write列表中。

第一步:为poll()构建文件描述符列表

应用代码决定就绪通知的类型。它通过Conn中的want_readwant_write标志与事件循环通信,然后fds参数根据这些标志构建:

cpp
    // 所有客户端连接的映射,键为文件描述符
    std::vector<Conn *> fd2conn;
    // 事件循环
    std::vector<struct pollfd> poll_args;
    while (true) {
        // 准备 poll() 的参数
        poll_args.clear();
        // 将监听套接字放在第一个位置
        struct pollfd pfd = {fd, POLLIN, 0};
        poll_args.push_back(pfd);
        // 其余是连接套接字
        for (Conn *conn : fd2conn) {
            if (!conn) {
                continue;
            }
            struct pollfd pfd = {conn->fd, POLLERR, 0};
            // 根据应用程序的意图设置 poll() 标志
            if (conn->want_read) {
                pfd.events |= POLLIN;
            }
            if (conn->want_write) {
                pfd.events |= POLLOUT;
            }
            poll_args.push_back(pfd);
        }

        // 更多...
    }

第二步:调用poll()

cpp
    // 事件循环
    while (true) {
        // 准备 poll() 的参数
        // ...

        // 等待就绪
        int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
        if (rv < 0 && errno == EINTR) {
            continue;   // 不是一个错误
        }
        if (rv < 0) {
            die("poll");
        }

        // ...
    }

poll()是整个程序中唯一的阻塞性系统调用。通常它在至少一个文件描述符就绪时返回。然而,及时没有任何东西就绪,它也可能偶尔返回并设置errno = EINTR

如果一个进程在阻塞性系统调用期间收到Unix信号,该系统调用会立即返回并设置EINTR,以便让进程有机会处理该信号。对于非阻塞性系统调用,不期望出现EINTR

EINTR不是一个错误,应该重试该系统调用。即使你不使用信号,你也应该处理EINTR,因为可能存在意想不到的信号来源。

第三步:接受新连接

我们将监听套接字放在文件描述符列表的第0个位置。

cpp
    // 事件循环
    while (true) {
        // 准备 poll() 的参数
        poll_args.clear();
        // 将监听套接字放在第一个位置
        struct pollfd pfd = {fd, POLLIN, 0};
        poll_args.push_back(pfd);
        // ...

        // 等待就绪
        int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
        // ...

        // 处理监听套接字
        if (poll_args[0].revents) {
            if (Conn *conn = handle_accept(fd)) {
                // 将其放入映射中
                if (fd2conn.size() <= (size_t)conn->fd) {
                    fd2conn.resize(conn->fd + 1);
                }
                fd2conn[conn->fd] = conn;
            }
        }
        // ...
    }   // 事件循环

在就绪通知中,accept()被视为read(),因此它使用POLLINpoll()返回后,检查第一个文件描述符以确定我们是否可以accept()

handle_accept()为新连接创建Conn对象。我们稍后会编写这部分代码。

第四步:应用程序回调

剩下的文件描述符列表将专门分配给连接套接字。如果他们准备好进行IO操作,就调用应用程序代码。

cpp
    while (true) {
        // ...
        // 等待就绪
        int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
        // ...

        // 处理连接套接字
        for (size_t i = 1; i < poll_args.size(); ++i) { // 注意:跳过第一个
            uint32_t ready = poll_args[i].revents;
            Conn *conn = fd2conn[poll_args[i].fd];
            if (ready & POLLIN) {
                handle_read(conn);  // 应用逻辑
            }
            if (ready & POLLOUT) {
                handle_write(conn); // 应用逻辑
            }
        }
    }

第五步:终止连接

我们总是调用poll()来处理连接到套接字的请求,以便在错误时终止连接。应用程序代码也可以设置Conn::want_close以请求事件循环终止连接。

cpp
        // 处理连接套接字
        for (size_t i = 1; i < poll_args.size(); ++i) {
            uint32_t ready = poll_args[i].revents;
            Conn *conn = fd2conn[poll_args[i].fd];
            // 读和写...

            // 因套接字错误或应用逻辑而关闭套接字
            if ((ready & POLLERR) || conn->want_close) {
                (void)close(conn->fd);
                fd2conn[conn->fd] = NULL;
                delete conn;
            }
        }

你可以添加一个回调handle_err()让应用代码处理错误,但在我们的应用中没有什么可以处理的,所以我们在这里直接关闭套接字。

6.4 使用非阻塞的IO应用代码

非阻塞accept()

在进入事件之前,使用fcntl使监听套接字变为非阻塞。

c
static void fd_set_nb(int fd) {
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
}

然后事件循环回调应用代码执行accept()

cpp
static Conn *handle_accept(int fd) {
    // accept
    struct sockaddr_in client_addr = {};
    socklen_t addrlen = sizeof(client_addr);
    int connfd = accept(fd, (struct sockaddr *)&client_addr, &addrlen);
    if (connfd < 0) {
        return NULL;
    }
    // 将新的连接文件描述符设置为非阻塞模式
    fd_set_nb(connfd);
    // 创建一个 `struct Conn`
    Conn *conn = new Conn();
    conn->fd = connfd;
    conn->want_read = true; // 读取第一个请求
    return conn;
}

连接套接字也被设置为非阻塞,等待它的第一次读取。

使用非阻塞式协议解析器

请看下面每个子步骤的注释

cpp
static void handle_read(Conn *conn) {
    // 1. 执行一次非阻塞读。
    uint8_t buf[64 * 1024];
    ssize_t rv = read(conn->fd, buf, sizeof(buf));
    if (rv <= 0) {  // 处理 IO 错误 (rv < 0) 或 EOF (rv == 0)
        conn->want_close = true;
        return;
    }
    // 2. 将新数据添加到 `Conn::incoming` 缓冲区。
    buf_append(conn->incoming, buf, (size_t)rv);
    // 3. 尝试解析累积的缓冲区。
    // 4. 处理已解析的消息。
    // 5. 从 `Conn::incoming` 中移除该消息。
    try_one_request(conn)
    // ...
}

处理过程被拆分到try_one_request()中。如果数据不足,它将不做任何事情,直到未来的循环迭代中有更多数据为止。

cpp
// 如果有足够的数据,则处理 1 个请求
static bool try_one_request(Conn *conn) {
    // 3. 尝试解析累积的缓冲区。
    // 协议:消息头
    if (conn->incoming.size() < 4) {
        return false;   // 需要读
    }
    uint32_t len = 0;
    memcpy(&len, conn->incoming.data(), 4);
    if (len > k_max_msg) {  // 协议错误
        conn->want_close = true;
        return false;   // 需要关闭
    }
    // 协议:消息体
    if (4 + len > conn->incoming.size()) {
        return false;   // 需要读
    }
    const uint8_t *request = &conn->incoming[4];
    // 4. 处理已解析的消息。
    // ...
    // 生成响应(回显)
    buf_append(conn->outgoing, (const uint8_t *)&len, 4);
    buf_append(conn->outgoing, request, len);
    // 5. 从 `Conn::incoming` 中移除该消息。
    buf_consume(conn->incoming, 4 + len);
    return true;        // 成功
}

我们使用std::vector作为缓冲区数组,它是一个动态数组。

cpp
// 追加到尾部
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
    buf.insert(buf.end(), data, data + len);
}
// 从头部移除
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
    buf.erase(buf.begin(), buf.begin() + n);
}

非阻塞写入

这里没有应用程序逻辑,只是写入一些数据并将其从缓冲区中移除。write()可能返回较少的字节数,这是允许的,因为事件循环会再次调用它。

cpp
static void handle_write(Conn *conn) {
    assert(conn->outgoing.size() > 0);
    ssize_t rv = write(conn->fd, conn->outgoing.data(), conn->outgoing.size());
    if (rv < 0) {
        conn->want_close = true;    // 错误处理
        return;
    }
    // 从 `outgoing` 中移除已写入的数据
    buf_consume(conn->outgoing, (size_t)rv);
    // ...
}

请求和响应之间的状态流转

在请求-响应协议中,程序要么是在读取请求,要么是在写入响应。在handle_read()handle_write()的末尾,我们需要在这两种状态之间切换。

cpp
static void handle_read(Conn *conn) {
    // ...
    // 更新就绪意图
    if (conn->outgoing.size() > 0) {    // 有一个响应
        conn->want_read = false;
        conn->want_write = true;
    }   // 否则:需要读
}
cpp
static void handle_write(Conn *conn) {
    // ...
    if (conn->outgoing.size() == 0) {   //所有数据都已写入
        conn->want_read = true;
        conn->want_write = false;
    } // 否则:需要写
}

这并非普遍适用的。例如,一些代理和消息传递协议不是请求-响应模式,可以同时进行读写。

6.5 结尾

我们这次实现的协议与第四章的相同。所以你可以重用测试客户端。这个服务器是类似于生产级别产品的最简版本,但它仍然是一个玩具,请进入下一章学习更高级的内容。

源代码:

06_client.cpp
cpp
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/ip.h>
#include <string>
#include <vector>


static void msg(const char *msg) {
    fprintf(stderr, "%s\n", msg);
}

static void die(const char *msg) {
    int err = errno;
    fprintf(stderr, "[%d] %s\n", err, msg);
    abort();
}

static int32_t read_full(int fd, uint8_t *buf, size_t n) {
    while (n > 0) {
        ssize_t rv = read(fd, buf, n);
        if (rv <= 0) {
            return -1;  // error, or unexpected EOF
        }
        assert((size_t)rv <= n);
        n -= (size_t)rv;
        buf += rv;
    }
    return 0;
}

static int32_t write_all(int fd, const uint8_t *buf, size_t n) {
    while (n > 0) {
        ssize_t rv = write(fd, buf, n);
        if (rv <= 0) {
            return -1;  // error
        }
        assert((size_t)rv <= n);
        n -= (size_t)rv;
        buf += rv;
    }
    return 0;
}

// append to the back
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
    buf.insert(buf.end(), data, data + len);
}

const size_t k_max_msg = 32 << 20;  // likely larger than the kernel buffer

// the `query` function was simply splited into `send_req` and `read_res`.
static int32_t send_req(int fd, const uint8_t *text, size_t len) {
    if (len > k_max_msg) {
        return -1;
    }

    std::vector<uint8_t> wbuf;
    buf_append(wbuf, (const uint8_t *)&len, 4);
    buf_append(wbuf, text, len);
    return write_all(fd, wbuf.data(), wbuf.size());
}

static int32_t read_res(int fd) {
    // 4 bytes header
    std::vector<uint8_t> rbuf;
    rbuf.resize(4);
    errno = 0;
    int32_t err = read_full(fd, &rbuf[0], 4);
    if (err) {
        if (errno == 0) {
            msg("EOF");
        } else {
            msg("read() error");
        }
        return err;
    }

    uint32_t len = 0;
    memcpy(&len, rbuf.data(), 4);  // assume little endian
    if (len > k_max_msg) {
        msg("too long");
        return -1;
    }

    // reply body
    rbuf.resize(4 + len);
    err = read_full(fd, &rbuf[4], len);
    if (err) {
        msg("read() error");
        return err;
    }

    // do something
    printf("len:%u data:%.*s\n", len, len < 100 ? len : 100, &rbuf[4]);
    return 0;
}

int main() {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        die("socket()");
    }

    struct sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_port = ntohs(1234);
    addr.sin_addr.s_addr = ntohl(INADDR_LOOPBACK);  // 127.0.0.1
    int rv = connect(fd, (const struct sockaddr *)&addr, sizeof(addr));
    if (rv) {
        die("connect");
    }

    // multiple pipelined requests
    std::vector<std::string> query_list = {
        "hello1", "hello2", "hello3",
        // a large message requires multiple event loop iterations
        std::string(k_max_msg, 'z'),
        "hello5",
    };
    for (const std::string &s : query_list) {
        int32_t err = send_req(fd, (uint8_t *)s.data(), s.size());
        if (err) {
            goto L_DONE;
        }
    }
    for (size_t i = 0; i < query_list.size(); ++i) {
        int32_t err = read_res(fd);
        if (err) {
            goto L_DONE;
        }
    }

L_DONE:
    close(fd);
    return 0;
}
06_server.cpp
cpp
// stdlib
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
// system
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/ip.h>
// C++
#include <vector>


static void msg(const char *msg) {
    fprintf(stderr, "%s\n", msg);
}

static void msg_errno(const char *msg) {
    fprintf(stderr, "[errno:%d] %s\n", errno, msg);
}

static void die(const char *msg) {
    fprintf(stderr, "[%d] %s\n", errno, msg);
    abort();
}

static void fd_set_nb(int fd) {
    errno = 0;
    int flags = fcntl(fd, F_GETFL, 0);
    if (errno) {
        die("fcntl error");
        return;
    }

    flags |= O_NONBLOCK;

    errno = 0;
    (void)fcntl(fd, F_SETFL, flags);
    if (errno) {
        die("fcntl error");
    }
}

const size_t k_max_msg = 32 << 20;  // likely larger than the kernel buffer

struct Conn {
    int fd = -1;
    // application's intention, for the event loop
    bool want_read = false;
    bool want_write = false;
    bool want_close = false;
    // buffered input and output
    std::vector<uint8_t> incoming;  // data to be parsed by the application
    std::vector<uint8_t> outgoing;  // responses generated by the application
};

// append to the back
static void
buf_append(std::vector<uint8_t> &buf, const uint8_t *data, size_t len) {
    buf.insert(buf.end(), data, data + len);
}

// remove from the front
static void buf_consume(std::vector<uint8_t> &buf, size_t n) {
    buf.erase(buf.begin(), buf.begin() + n);
}

// application callback when the listening socket is ready
static Conn *handle_accept(int fd) {
    // accept
    struct sockaddr_in client_addr = {};
    socklen_t addrlen = sizeof(client_addr);
    int connfd = accept(fd, (struct sockaddr *)&client_addr, &addrlen);
    if (connfd < 0) {
        msg_errno("accept() error");
        return NULL;
    }
    uint32_t ip = client_addr.sin_addr.s_addr;
    fprintf(stderr, "new client from %u.%u.%u.%u:%u\n",
        ip & 255, (ip >> 8) & 255, (ip >> 16) & 255, ip >> 24,
        ntohs(client_addr.sin_port)
    );

    // set the new connection fd to nonblocking mode
    fd_set_nb(connfd);

    // create a `struct Conn`
    Conn *conn = new Conn();
    conn->fd = connfd;
    conn->want_read = true;
    return conn;
}

// process 1 request if there is enough data
static bool try_one_request(Conn *conn) {
    // try to parse the protocol: message header
    if (conn->incoming.size() < 4) {
        return false;   // want read
    }
    uint32_t len = 0;
    memcpy(&len, conn->incoming.data(), 4);
    if (len > k_max_msg) {
        msg("too long");
        conn->want_close = true;
        return false;   // want close
    }
    // message body
    if (4 + len > conn->incoming.size()) {
        return false;   // want read
    }
    const uint8_t *request = &conn->incoming[4];

    // got one request, do some application logic
    printf("client says: len:%d data:%.*s\n",
        len, len < 100 ? len : 100, request);

    // generate the response (echo)
    buf_append(conn->outgoing, (const uint8_t *)&len, 4);
    buf_append(conn->outgoing, request, len);

    // application logic done! remove the request message.
    buf_consume(conn->incoming, 4 + len);
    // Q: Why not just empty the buffer? See the explanation of "pipelining".
    return true;        // success
}

// application callback when the socket is writable
static void handle_write(Conn *conn) {
    assert(conn->outgoing.size() > 0);
    ssize_t rv = write(conn->fd, &conn->outgoing[0], conn->outgoing.size());
    if (rv < 0 && errno == EAGAIN) {
        return; // actually not ready
    }
    if (rv < 0) {
        msg_errno("write() error");
        conn->want_close = true;    // error handling
        return;
    }

    // remove written data from `outgoing`
    buf_consume(conn->outgoing, (size_t)rv);

    // update the readiness intention
    if (conn->outgoing.size() == 0) {   // all data written
        conn->want_read = true;
        conn->want_write = false;
    } // else: want write
}

// application callback when the socket is readable
static void handle_read(Conn *conn) {
    // read some data
    uint8_t buf[64 * 1024];
    ssize_t rv = read(conn->fd, buf, sizeof(buf));
    if (rv < 0 && errno == EAGAIN) {
        return; // actually not ready
    }
    // handle IO error
    if (rv < 0) {
        msg_errno("read() error");
        conn->want_close = true;
        return; // want close
    }
    // handle EOF
    if (rv == 0) {
        if (conn->incoming.size() == 0) {
            msg("client closed");
        } else {
            msg("unexpected EOF");
        }
        conn->want_close = true;
        return; // want close
    }
    // got some new data
    buf_append(conn->incoming, buf, (size_t)rv);

    // parse requests and generate responses
    while (try_one_request(conn)) {}
    // Q: Why calling this in a loop? See the explanation of "pipelining".

    // update the readiness intention
    if (conn->outgoing.size() > 0) {    // has a response
        conn->want_read = false;
        conn->want_write = true;
        // The socket is likely ready to write in a request-response protocol,
        // try to write it without waiting for the next iteration.
        return handle_write(conn);
    }   // else: want read
}

int main() {
    // the listening socket
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        die("socket()");
    }
    int val = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));

    // bind
    struct sockaddr_in addr = {};
    addr.sin_family = AF_INET;
    addr.sin_port = ntohs(1234);
    addr.sin_addr.s_addr = ntohl(0);    // wildcard address 0.0.0.0
    int rv = bind(fd, (const sockaddr *)&addr, sizeof(addr));
    if (rv) {
        die("bind()");
    }

    // set the listen fd to nonblocking mode
    fd_set_nb(fd);

    // listen
    rv = listen(fd, SOMAXCONN);
    if (rv) {
        die("listen()");
    }

    // a map of all client connections, keyed by fd
    std::vector<Conn *> fd2conn;
    // the event loop
    std::vector<struct pollfd> poll_args;
    while (true) {
        // prepare the arguments of the poll()
        poll_args.clear();
        // put the listening sockets in the first position
        struct pollfd pfd = {fd, POLLIN, 0};
        poll_args.push_back(pfd);
        // the rest are connection sockets
        for (Conn *conn : fd2conn) {
            if (!conn) {
                continue;
            }
            // always poll() for error
            struct pollfd pfd = {conn->fd, POLLERR, 0};
            // poll() flags from the application's intent
            if (conn->want_read) {
                pfd.events |= POLLIN;
            }
            if (conn->want_write) {
                pfd.events |= POLLOUT;
            }
            poll_args.push_back(pfd);
        }

        // wait for readiness
        int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), -1);
        if (rv < 0 && errno == EINTR) {
            continue;   // not an error
        }
        if (rv < 0) {
            die("poll");
        }

        // handle the listening socket
        if (poll_args[0].revents) {
            if (Conn *conn = handle_accept(fd)) {
                // put it into the map
                if (fd2conn.size() <= (size_t)conn->fd) {
                    fd2conn.resize(conn->fd + 1);
                }
                assert(!fd2conn[conn->fd]);
                fd2conn[conn->fd] = conn;
            }
        }

        // handle connection sockets
        for (size_t i = 1; i < poll_args.size(); ++i) { // note: skip the 1st
            uint32_t ready = poll_args[i].revents;
            if (ready == 0) {
                continue;
            }

            Conn *conn = fd2conn[poll_args[i].fd];
            if (ready & POLLIN) {
                assert(conn->want_read);
                handle_read(conn);  // application logic
            }
            if (ready & POLLOUT) {
                assert(conn->want_write);
                handle_write(conn); // application logic
            }

            // close the socket from socket error or application logic
            if ((ready & POLLERR) || conn->want_close) {
                (void)close(conn->fd);
                fd2conn[conn->fd] = NULL;
                delete conn;
            }
        }   // for each connection sockets
    }   // the event loop
    return 0;
}