并发编程(c++)——2.线程池模式
2026/6/12 10:58:52 网站建设 项目流程

线程池

线程池是并发编程核心的模式之一,覆盖了大多数需要并发的场景,这篇文章记录了对线程池的理解,使用简单的语言对线程池的概念进行阐述,主要包括线程池的使用场景,面临的问题,线程池的核心思想和如何使用c++代码实现,最后用一个io密集的案例进行使用示范。

目录

  • 线程池
    • 目录
    • 概念
    • 场景
    • 存在问题
    • 解决方案
    • 线程池实现
      • 线程池类
      • 生成线程池
      • 提交任务
      • 销毁线程池
    • 案例
      • 生成数据
      • 数据处理任务
      • 线程池计调用
      • 结果

概念

目的:在保证系统的稳定性下,尽量提高系统的性能。
思想:预先创建若干线程,当有任务时,直接从池中取出线程执行任务,任务完成后线程归还给池。
方法:线程池通过限制线程数量 + 复用线程 + 任务队列缓冲 + future 异步回执。

场景

存在大量并发任务的场景
1.io密集型任务,如大量读写文件网络和数据库请求
2.cpu密集任务,如科学计算、大数据处理

存在问题

1.系统对线程数量存在上限,无限制创建可能导致系统不稳定甚至崩溃
2.线程创建过多会消耗大量内存资源,存在 OOM 风险
3.线程分散创建,生命周期、异常与资源回收难以统一管理
4.频繁创建与销毁线程,系统开销大
5.线程数量过多,上下文切换成本高,降低整体吞吐

解决方案

池化思想,预先创建若干线程,当有任务时,直接从池中取出线程执行任务,任务完成后线程归还给池。
池统一管理线程,创建和销毁,线程复用,线程数量可控,上下文切换开销小。
1.线程池限制了线程数量,防止系统崩溃
2.线程池限制了线程数量,防止内存溢出
3.线程池统一创建销毁,管理简单
4.线程复用,减少创建/销毁成本
5.控制线程数量,降低上下文切换

线程池实现

核心流程:
1.生成线程池,创建若干工作线程,使其进入阻塞等待任务状态
2.提交任务,将任务封装后放入任务队列,通过条件变量唤醒线程执行,并返回任务回执(future)
3.等待结果,调用方通过 future 等待任务执行完成并获取结果

实现要素:
1.线程队列,存放工作线程,线程死循环,默认阻塞状态,无任务时阻塞,有任务时唤醒
2.任务队列,存放任务
3.条件变量,通信任务和线程(实现线程与任务间的同步),存放任务时唤醒线程
4.future/promise, 接收线程执行结果(异步获取执行结果),future等待线程执行结果

线程池类

classThreadPool{private:// 线程队列std::vector<std::thread>workers;// 任务队列std::queue<std::function<void()>>tasks;// 资源锁,保护任务队列std::mutex mtx;// 条件变量std::condition_variable cv;// 管理线程池是否停止boolstop;public:// 构造函数,生成线程池ThreadPool(size_t threads);// 析构函数,销毁线程池~ThreadPool();// 提交任务template<classF>autoenqueue(F&&f)->std::future<decltype(f())>;};

生成线程池

输入线程数量size_t threads
管理线程池停止变量stop初始化为false
循环生成threads个线程,线程函数为lambda表达式,lambda表达式内为线程函数逻辑
线程函数内,定义任务task,用于接收任务队列中的任务
线程函数内,进行死循环
线程函数内,任务队列tasks为空时,线程阻塞,等待条件变量cv,任务队列tasks非空时,线程唤醒,从任务队列tasks中取出一个任务,执行任务
线程函数内,任务队列tasks为空且stop为true时,线程退出循环,线程结束

ThreadPool::ThreadPool(size_t threads):stop(false){for(size_t i=0;i<threads;++i){workers.emplace_back([this]{// 任务std::function<void()>task;while(true){{std::unique_lock<std::mutex>lock(mtx);// 当任务队列为空时,等待cv.wait(lock,[this]{returnstop||!tasks.empty();});// 如果线程池已经停止,则退出循环if(stop&&tasks.empty()){return;}// 从任务队列中取出一个任务task=std::move(tasks.front());tasks.pop();}// 执行任务task();}});}}

提交任务

输入任务f
使用完美转发,将任务f封装为std::packaged_task
使用future获取任务f回执
将封装的任务f放入任务队列tasks中
通知条件变量cv,唤醒线程执行任务
返回任务回执

template<classF>autoThreadPool::enqueue(F&&f)->std::future<decltype(f())>{// 函数类型usingreturn_type=decltype(f());autotask=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));// 获取任务的结果std::future<return_type>res=task->get_future();{std::unique_lock<std::mutex>lock(mtx);if(stop){throwstd::runtime_error("enqueue on stopped ThreadPool");}// 将任务放入队列中tasks.emplace([task](){(*task)();});}cv.notify_one();returnres;}

销毁线程池

销毁时触发
将stop设置为true
通知所有线程,线程池已经停止
等待所有线程结束

ThreadPool::~ThreadPool(){{std::unique_lock<std::mutex>lock(mtx);stop=true;}cv.notify_all();for(std::thread&worker:workers){worker.join();}}

案例

磁盘io密集型任务,如大量读写文件
存在大量文件,文件中存储了大量数据,需要读取文件中的数据,进行计算处理
每一个任务为读取一个文件,处理文件中的数据
生成100个文件,每个文件中存储100个1000,计算所有文件中数字的和,结果为10000000

生成数据

生成100个文件,每个文件中随机存储100个数据

// 创建txt文件voidcreate_file(){std::vector<std::string>files;std::string filename="data/file";// 生成100个文件for(inti=1;i<=100;++i){files.push_back(filename+std::to_string(i)+".txt");}for(constauto&file:files){std::ofstreamf(file);//写入100行随机4位数for(inti=0;i<100;++i){f<<rand()%10000<<std::endl;}f.close();}}

数据处理任务

计算文件中所有数字的和

// 计算文件数据和intcount_files(conststd::string&filename){std::ifstreamfile(filename);if(!file){throwstd::runtime_error("cannot open "+filename);}std::string line;// 计算每行数字和size_t sum=0;while(std::getline(file,line)){// 将字符串转换为数字size_t num=std::stoul(line);sum+=num;}returnsum;}

线程池计调用

1.生成文件
2.根据cpu核心数,生成线程池
3.遍历文件列表,每个任务读取一个文件,提交到线程池,获取回执
4.遍历回执,获取每个文件的数据和
5.求和

intmain(){// 创建文件create_file();// 初始化线程池size_t cpu_cores=std::thread::hardware_concurrency();intthread_counts=cpu_cores*2;std::cout<<"cpu cores: "<<cpu_cores<<std::endl;ThreadPoolpool(thread_counts);// 文件列表std::vector<std::string>files;std::string filename="data/file";for(inti=1;i<=100;++i){files.push_back(filename+std::to_string(i)+".txt");}// 提交任务到线程池std::vector<std::future<int>>futures;for(constauto&file:files){futures.emplace_back(pool.enqueue([file]{returncount_files(file);}));}// 获取结果std::vector<int>results;for(auto&f:futures){try{results.push_back(f.get());}catch(conststd::exception&e){std::cerr<<e.what()<<std::endl;}}// 求和intdata_sum=0;for(constauto&result:results){std::cout<<filename<<": \n";std::cout<<"sum: "<<result<<std::endl;data_sum+=result;}std::cout<<"data sum: "<<data_sum<<std::endl;return0;}

结果

sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data sum: 10000000

结果和预期一致,为1000×100×100=10000000

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询