Implementing a Master-Slave Reactor from Scratch: Hand-Building muduo's Core Architecture in C++
When I first encountered server programming in college, the maze of network models and thread scheduling left me thoroughly confused. It was only after hand-implementing a simplified version of the muduo library in C++ that those opaque concepts finally came to life. This article dissects the master-slave Reactor model through code; every module comes with a snippet you can compile directly, so I recommend following along in your IDE as you read.
1. Why a Master-Slave Reactor Model?
In a traditional blocking server, every connection monopolizes a thread. Once concurrency reaches 10,000 connections, context switching alone can exhaust the CPU. The Reactor pattern instead monitors the state changes of all connections from a single thread, which removes this bottleneck at the root.
Typical performance figures for a master-slave Reactor:
- 100,000+ long-lived connections sustained on a single machine
- 50,000+ requests processed per second
- Latency stable in the millisecond range
```cpp
// The simplest possible Reactor loop
while (true) {
    int ready = epoll_wait(epfd, events, MAX_EVENTS, -1);
    for (int i = 0; i < ready; i++) {
        if (events[i].data.fd == listen_fd) {
            // handle a new connection
        } else {
            // handle reads/writes on an existing connection
        }
    }
}
```

2. Core Modules, Piece by Piece
2.1 The Event Dispatcher: Poller
Poller is a wrapper around epoll; its core job is maintaining a map from file descriptors to Channels:
```cpp
class Poller {
private:
    int epfd_;
    std::unordered_map<int, Channel*> channel_map_;
public:
    void UpdateChannel(Channel* ch);
    void RemoveChannel(Channel* ch);
    // Fills active_channels with the Channels whose fds have pending events.
    void Poll(int timeout_ms, std::vector<Channel*>* active_channels);
};
```

Key points:
- Uses `EPOLLET` edge-triggered mode
- `channel_map_` gives fast lookup from a ready fd to its handler object
- Each EventLoop owns exactly one Poller instance
2.2 The Event Handler: Channel
Each Channel manages the IO events of a single file descriptor:
```cpp
class Channel {
public:
    using EventCallback = std::function<void()>;

    void HandleEvent() {
        if (revents_ & EPOLLIN) read_callback_();
        if (revents_ & EPOLLOUT) write_callback_();
    }
    void EnableReading() { events_ |= EPOLLIN; Update(); }
    void EnableWriting() { events_ |= EPOLLOUT; Update(); }

private:
    int fd_;
    uint32_t events_;   // events we are interested in
    uint32_t revents_;  // events that actually occurred
    EventLoop* loop_;
    EventCallback read_callback_;
    EventCallback write_callback_;
};
```

2.3 The Thread Core: EventLoop
The key class implementing "One Thread One Loop":
```cpp
class EventLoop {
public:
    void Loop() {
        while (!quit_) {
            active_channels_.clear();
            poller_->Poll(kPollTimeMs, &active_channels_);
            for (Channel* channel : active_channels_) {
                channel->HandleEvent();
            }
            ExecutePendingTasks();
        }
    }

    // Run cb immediately if we are already on this loop's thread,
    // otherwise queue it for the loop to pick up.
    void RunInLoop(Functor cb) {
        if (IsInLoopThread()) {
            cb();
        } else {
            QueueInLoop(std::move(cb));
        }
    }

private:
    void QueueInLoop(Functor cb) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_functors_.push_back(std::move(cb));
        // A production loop would also wake the possibly-blocked
        // epoll_wait here, e.g. by writing to an eventfd.
    }

    void ExecutePendingTasks() {
        std::vector<Functor> tasks;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks.swap(pending_functors_);  // keep the critical section short
        }
        for (Functor& task : tasks) task();
    }

    std::unique_ptr<Poller> poller_;
    std::vector<Channel*> active_channels_;
    std::thread::id thread_id_;
    std::mutex mutex_;
    std::vector<Functor> pending_functors_;
};
```

3. Concrete Implementation of the Master-Slave Reactor
3.1 The Main Reactor: Acceptor
The main Reactor is responsible only for accepting new connections:
```cpp
class Acceptor {
public:
    Acceptor(EventLoop* loop, int port)
        : loop_(loop),
          accept_socket_(CreateNonblockingSocket()),
          accept_channel_(loop, accept_socket_.Fd()) {
        BindListenPort(port);
        accept_channel_.SetReadCallback(
            std::bind(&Acceptor::HandleNewConnection, this));
    }

    void Listen() { accept_channel_.EnableReading(); }

private:
    void HandleNewConnection() {
        InetAddress peer_addr;
        int connfd = accept_socket_.Accept(&peer_addr);
        new_connection_callback_(connfd, peer_addr);
    }

    EventLoop* loop_;
    Socket accept_socket_;
    Channel accept_channel_;
    NewConnectionCallback new_connection_callback_;
};
```

3.2 The Sub-Reactors: TcpServer
TcpServer manages the thread pool that hosts the sub-Reactors:
```cpp
class TcpServer {
public:
    TcpServer(EventLoop* base_loop, int port)
        : base_loop_(base_loop),
          acceptor_(base_loop, port),
          thread_pool_(new EventLoopThreadPool(base_loop)) {
        acceptor_.SetNewConnectionCallback(
            std::bind(&TcpServer::HandleNewConnection, this, _1, _2));
    }

    void Start() {
        thread_pool_->Start();
        acceptor_.Listen();
    }

private:
    void HandleNewConnection(int sockfd, const InetAddress& peer_addr) {
        // Hand the new connection to one of the sub-Reactor loops.
        EventLoop* io_loop = thread_pool_->GetNextLoop();
        std::shared_ptr<TcpConnection> conn =
            std::make_shared<TcpConnection>(io_loop, sockfd);
        connections_[sockfd] = conn;
    }

    EventLoop* base_loop_;
    Acceptor acceptor_;
    std::unique_ptr<EventLoopThreadPool> thread_pool_;
    std::unordered_map<int, std::shared_ptr<TcpConnection>> connections_;
};
```

4. Key Performance Optimizations
4.1 Zero-Copy Techniques
Using writev (scatter-gather IO) to coalesce small packets into a single system call:
```cpp
void TcpConnection::Send(const std::string& message) {
    if (state_ == kConnected) {
        if (loop_->IsInLoopThread()) {
            SendInLoop(message);
        } else {
            loop_->RunInLoop(
                std::bind(&TcpConnection::SendInLoop, this, message));
        }
    }
}

void TcpConnection::SendInLoop(const std::string& message) {
    struct iovec vec;
    vec.iov_base = const_cast<char*>(message.data());
    vec.iov_len = message.size();
    ::writev(channel_->Fd(), &vec, 1);
}
```

4.2 Buffer Design
Double buffering to reduce lock contention:
```cpp
class Buffer {
public:
    void Append(const char* data, size_t len) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (WritableBytes() < len) {
            MakeSpace(len);
        }
        std::copy(data, data + len, BeginWrite());
        write_index_ += len;
    }

    void Retrieve(size_t len) {
        std::lock_guard<std::mutex> lock(mutex_);
        read_index_ += len;
        if (read_index_ == write_index_) {
            // Everything consumed: reset both indices to reuse the space.
            read_index_ = write_index_ = 0;
        }
    }

private:
    std::vector<char> buffer_;
    size_t read_index_ = 0;
    size_t write_index_ = 0;
    mutable std::mutex mutex_;
};
```

5. Hands-On: Building an HTTP Server
Parsing the HTTP protocol on top of the Reactor:
```cpp
class HttpContext {
public:
    bool ParseRequest(Buffer* buf) {
        while (buf->ReadableBytes() > 0) {
            if (state_ == kExpectRequestLine) {
                const char* crlf = buf->FindCRLF();
                if (crlf) {
                    if (ParseRequestLine(buf->Peek(), crlf)) {
                        buf->RetrieveUntil(crlf + 2);  // skip past "\r\n"
                        state_ = kExpectHeaders;
                        continue;
                    }
                }
                break;  // incomplete request line: wait for more data
            }
            // handling of the remaining states elided...
        }
        return true;
    }

private:
    enum HttpRequestParseState {
        kExpectRequestLine,
        kExpectHeaders,
        kExpectBody,
        kGotAll
    };
    HttpRequestParseState state_;
};
```

6. Lessons from the Trenches
- Timer management: never delete a connection directly inside its event callback; push the teardown into the pending-task queue instead
- Thread safety: every cross-thread operation must be handed to the owning thread via `RunInLoop`
- Resource release: manage connection objects with `shared_ptr` to avoid dangling pointers
- Performance trap: `EPOLLONESHOT` costs an extra system call to re-arm each fd after every event, so use it sparingly
During testing, focus on:
- Memory leaks (detect with valgrind)
- Thread safety (ThreadSanitizer)
- Performance bottlenecks (perf flame graphs)
```shell
# Example load-test command
$ webbench -c 5000 -t 30 http://127.0.0.1:8080/
```