> 技术文档 > C++ 网络编程(14) asio多线程模型IOThreadPool_boost::asio 线程池

C++ 网络编程(14) asio多线程模型IOThreadPool_boost::asio 线程池


🎯 C++ Boost.Asio 多线程模型快速上手指南:从单线程到多 io_context
📅 更新时间:2025年7月3日
🏷️ 标签:C++ | Boost.Asio | 多线程 | 网络编程 | io_context

文章目录

  • 前言
  • 一、结果模式图
  • 一、IOThreadPool.h实现
    • 1.代码部分
    • 2.代码详解
      • 1.详解一
      • 2.详解二
      • 3.详解三
      • 4.详解四
  • 二、IOThreadPool.cpp实现
    • 1.代码部分
    • 2.代码思路解析
  • 三、全局思路
  • 四、安全隐患
  • 五、利用strand改进
    • 1.原理
    • 2.模式图
    • 3.调用strand
  • 六、主线程
  • 七、测试与总结

前言

前文我们学习了一种asio的多线程模式,根据CPU的核数,创建多了线程与多个上下文io_context,然后每个线程使用一个io_context进行调用
今天我们学习第二种asio多线程的模式,使用同一个上下文io_context 然后在多个线程中被调用


一、结果模式图

C++ 网络编程(14) asio多线程模型IOThreadPool_boost::asio 线程池

一、IOThreadPool.h实现

1.代码部分

#pragma once#include\"Singleton.h\"#includeclass AsioThreadPool:public Singleton<AsioThreadPool>{public:friend class Singleton<AsioThreadPool>;~AsioThreadPool() {};AsioThreadPool& operator=(const AsioThreadPool&) = delete;AsioThreadPool(const AsioThreadPool&) = delete;boost::asio::io_context& GetIOService();//返回io_context;void Stop();private:AsioThreadPool(int threadNum = std::thread::hardware_concurrency());boost::asio::io_context _service;std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard;std::vector<std::thread> _threads;};

2.代码详解

1.详解一

我们还是用单例模式,所以这边直接继承我们的单例模板即可

class AsioThreadPool:public Singleton<AsioThreadPool>

2.详解二

我们既然使用的是单例模式,所以我们把拷贝构造和赋值都给禁用掉

AsioThreadPool& operator=(const AsioThreadPool&) = delete;AsioThreadPool(const AsioThreadPool&) = delete;

3.详解三

boost::asio::io_context& GetIOService();//返回io_context;

这个函数是用来返回我们用的上下文io_context

4.详解四

std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_guard;

这个work_guard是用来绑定上下文io_context 防止在没有任务的时候,io_context提前退出

二、IOThreadPool.cpp实现

1.代码部分

#include \"AsioThreadPool.h\"#include#include\"Singleton.h\"AsioThreadPool::AsioThreadPool(int threadNum):work_guard(new boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(_service.get_executor())){for (int i = 0; i < threadNum;i++)//插入线程{_threads.emplace_back([this](){_service.run();});}}boost::asio::io_context& AsioThreadPool::GetIOService(){return _service;}void AsioThreadPool::Stop(){work_guard.reset();for (auto& t : _threads){t.join();}}

2.代码思路解析

1.构造函数

构造函数中我们要初始化一个work_guard去绑定上下文,防止其提前释放

work_guard(new boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(_service.get_executor()))

然后我们要创建多个线程,并且在每个线程中都启动上下文

for (int i = 0; i < threadNum;i++)//插入线程{_threads.emplace_back([this](){_service.run();});}

2.Stop()

我们在停止函数首先要将绑定到上下文的work_guard解除
然后对于每个线程我们要等待所有线程走完

三、全局思路

构造函数中实现了一个线程池线程池里每个线程都会运行_service.run函数,_service.run函数内部就是从iocp或者epoll获取就绪描述符和绑定的回调函数,进而调用回调函数因为回调函数是在不同的线程里调用的,所以会存在不同的线程调用同一个socket的回调函数的情况
_service.run 内部在Linux环境下调用的是epoll_wait返回所有就绪的描述符列表,在windows上会循环调用 GetQueuedCompletionStatus函数返回就绪的描述符,二者原理类似,进而通过描述符找到对应的注册的回调函数,然后调用回调函数

比如iocp的流程是这样的
C++ 网络编程(14) asio多线程模型IOThreadPool_boost::asio 线程池
epoll的流程是这样的
C++ 网络编程(14) asio多线程模型IOThreadPool_boost::asio 线程池

四、安全隐患

IOThreadPool模式有一个隐患,同一个socket的就绪后,触发的回调函数可能在不同的线程里,比如第一次是在线程1,第二次是在线程3,如果这两次触发间隔时间不大,那么很可能出现不同线程并发访问数据的情况,比如在处理读事件时,第一次回调触发后我们从socket的接收缓冲区读数据出来,第二次回调触发,还是从socket的接收缓冲区读数据,就会造成两个线程同时从socket中读数据的情况,会造成数据混乱

五、利用strand改进

1.原理

对于多线程触发回调函数的情况,我们可以利用asio提供的串行类 strand封装一下,这样就可以被串行调用了,其基本原理就是在线程各自调用函数时取消了直接调用的方式,而是利用一个strand类型的对象将要调用的函数投递到strand管理的队列中,再由一个统一的线程调用回调函数,调用是串行的,解决了线程并发带来的安全问题

2.模式图

C++ 网络编程(14) asio多线程模型IOThreadPool_boost::asio 线程池
图中当socket就绪后并不是由多个线程调用每个socket注册的回调函数,而是将回调函数投递给strand管理的队列,再由strand统一调度派发。

为了让回调函数被派发到strand的队列,我们只需要在注册回调函数时加一层strand的包装即可’’

3.调用strand

我们直接在触发回调函数前绑定strand即可

CSession类中添加一个成员变量

strand<io_context::executor_type> _strand;

CSession的构造函数

CSession::CSession(boost::asio::io_context& io_context, CServer* server): _socket(io_context), _server(server), _b_close(false), _b_head_parse(false), _strand(io_context.get_executor()){ boost::uuids::uuid a_uuid = boost::uuids::random_generator()(); _uuid = boost::uuids::to_string(a_uuid); _recv_head_node = make_shared<MsgNode>(HEAD_TOTAL_LEN);}

可以看到_strand的初始化是放在初始化列表里,利用io_context.get_executor()返回的执行器构造strand

因为在asio中无论iocontext还是strand底层都是通过executor调度的,我们将他理解为调度器就可以了,如果多个iocontextstrand的调度器是一个,那他们的消息派发统一由这个调度器执行

我们利用iocontext的调度器构造strand,这样他们统一由一个调度器管理。在绑定回调函数的调度器时,我们选择strand绑定即可

比如我们在Start函数里添加绑定 ,将回调函数的调用者绑定为_strand

void CSession::Start(){ ::memset(_data, 0, MAX_LENGTH); _socket.async_read_some(boost::asio::buffer(_data, MAX_LENGTH), boost::asio::bind_executor(_strand, std::bind(&CSession::HandleRead, this, std::placeholders::_1, std::placeholders::_2, SharedSelf())));}

boost::asio::bind_executor(_strand, handler) 的作用是:把 handler(回调)绑定到 _strand 上。
_strand 是一个 “串行化执行器”,保证所有绑定到它的 handler 都会被顺序、一个接一个地执行,即使在多线程环境下也不会并发执行
这样可以避免多线程下的并发访问问题(比如成员变量的竞态条件),保证同一个 session 的回调不会并发执行

六、主线程

#include #include#include\"CSession.h\"#include\"CServer.h\"#include\"AsioIOServicePool.h\"#include\"AsioThreadPool.h\"using namespace std;bool bstop = false;std::condition_variable cond_quit;std::mutex mutex_quit;int main(){ try { auto pool = AsioThreadPool::GetInstance(); boost::asio::io_context ioc; boost::asio::signal_set signals(ioc, SIGINT, SIGTERM); signals.async_wait([&ioc](auto, auto) { ioc.stop(); pool->Stop(); std::unique_lock<std::mutex> lock(mutex_quit); bstop = true; cond_quit.notify_one(); }); CServer s(pool->GetIOService(), 10086); { std::unique_lock<std::mutex> lock(mutex_quit); while (!bstop) { cond_quit.wait(lock); } } } catch (std::exception& e) { std::cerr << \"Exception: \" << e.what(); }}

我们这里设置了条件变量 _bstop ,然后用搭配cond_quit 当触发关闭信号的时候,再修改条件变量 _bstop
使得主线程运行下去,不然会卡住

因为如果没有条件变量,主线程在执行完 CServer s(pool->GetIOService(), 10086); 之后,会直接往下走,如果没有其他阻塞代码,主线程就会退出,整个进程也会结束(即使后台有线程池的线程在跑)
这会导致服务刚启动就立刻退出,无法正常提供服务

七、测试与总结

我们在客户端创建100个线程,每个线程进行500收发信息,我们来测试现在服务器的多线程效果如何
C++ 网络编程(14) asio多线程模型IOThreadPool_boost::asio 线程池

实际的生产和开发中,我们尽可能利用C++特性,使用多核的优势,将io_context分布在不同的线程中效率更可取一点,但也要防止线程过多导致cpu切换带来的时间片开销,所以尽量让开辟的线程数小于或等于cpu的核数,从而利用多核优势