线程池
线程池是并发编程核心的模式之一,覆盖了大多数需要并发的场景,这篇文章记录了对线程池的理解,使用简单的语言对线程池的概念进行阐述,主要包括线程池的使用场景,面临的问题,线程池的核心思想和如何使用c++代码实现,最后用一个io密集的案例进行使用示范。
目录
- 线程池
- 目录
- 概念
- 场景
- 存在问题
- 解决方案
- 线程池实现
- 线程池类
- 生成线程池
- 提交任务
- 销毁线程池
- 案例
- 生成数据
- 数据处理任务
- 线程池计调用
- 结果
概念
目的:在保证系统的稳定性下,尽量提高系统的性能。
思想:预先创建若干线程,当有任务时,直接从池中取出线程执行任务,任务完成后线程归还给池。
方法:线程池通过限制线程数量 + 复用线程 + 任务队列缓冲 + future 异步回执。
场景
存在大量并发任务的场景
1.io密集型任务,如大量读写文件网络和数据库请求
2.cpu密集任务,如科学计算、大数据处理
存在问题
1.系统对线程数量存在上限,无限制创建可能导致系统不稳定甚至崩溃
2.线程创建过多会消耗大量内存资源,存在 OOM 风险
3.线程分散创建,生命周期、异常与资源回收难以统一管理
4.频繁创建与销毁线程,系统开销大
5.线程数量过多,上下文切换成本高,降低整体吞吐
解决方案
池化思想,预先创建若干线程,当有任务时,直接从池中取出线程执行任务,任务完成后线程归还给池。
池统一管理线程,创建和销毁,线程复用,线程数量可控,上下文切换开销小。
1.线程池限制了线程数量,防止系统崩溃
2.线程池限制了线程数量,防止内存溢出
3.线程池统一创建销毁,管理简单
4.线程复用,减少创建/销毁成本
5.控制线程数量,降低上下文切换
线程池实现
核心流程:
1.生成线程池,创建若干工作线程,使其进入阻塞等待任务状态
2.提交任务,将任务封装后放入任务队列,通过条件变量唤醒线程执行,并返回任务回执(future)
3.等待结果,调用方通过 future 等待任务执行完成并获取结果
实现要素:
1.线程队列,存放工作线程,线程死循环,默认阻塞状态,无任务时阻塞,有任务时唤醒
2.任务队列,存放任务
3.条件变量,通信任务和线程(实现线程与任务间的同步),存放任务时唤醒线程
4.future/promise, 接收线程执行结果(异步获取执行结果),future等待线程执行结果
线程池类
classThreadPool{private:// 线程队列std::vector<std::thread>workers;// 任务队列std::queue<std::function<void()>>tasks;// 资源锁,保护任务队列std::mutex mtx;// 条件变量std::condition_variable cv;// 管理线程池是否停止boolstop;public:// 构造函数,生成线程池ThreadPool(size_t threads);// 析构函数,销毁线程池~ThreadPool();// 提交任务template<classF>autoenqueue(F&&f)->std::future<decltype(f())>;};生成线程池
输入线程数量size_t threads
管理线程池停止变量stop初始化为false
循环生成threads个线程,线程函数为lambda表达式,lambda表达式内为线程函数逻辑
线程函数内,定义任务task,用于接收任务队列中的任务
线程函数内,进行死循环
线程函数内,任务队列tasks为空时,线程阻塞,等待条件变量cv,任务队列tasks非空时,线程唤醒,从任务队列tasks中取出一个任务,执行任务
线程函数内,任务队列tasks为空且stop为true时,线程退出循环,线程结束
ThreadPool::ThreadPool(size_t threads):stop(false){for(size_t i=0;i<threads;++i){workers.emplace_back([this]{// 任务std::function<void()>task;while(true){{std::unique_lock<std::mutex>lock(mtx);// 当任务队列为空时,等待cv.wait(lock,[this]{returnstop||!tasks.empty();});// 如果线程池已经停止,则退出循环if(stop&&tasks.empty()){return;}// 从任务队列中取出一个任务task=std::move(tasks.front());tasks.pop();}// 执行任务task();}});}}提交任务
输入任务f
使用完美转发,将任务f封装为std::packaged_task
使用future获取任务f回执
将封装的任务f放入任务队列tasks中
通知条件变量cv,唤醒线程执行任务
返回任务回执
template<classF>autoThreadPool::enqueue(F&&f)->std::future<decltype(f())>{// 函数类型usingreturn_type=decltype(f());autotask=std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(f));// 获取任务的结果std::future<return_type>res=task->get_future();{std::unique_lock<std::mutex>lock(mtx);if(stop){throwstd::runtime_error("enqueue on stopped ThreadPool");}// 将任务放入队列中tasks.emplace([task](){(*task)();});}cv.notify_one();returnres;}销毁线程池
销毁时触发
将stop设置为true
通知所有线程,线程池已经停止
等待所有线程结束
ThreadPool::~ThreadPool(){{std::unique_lock<std::mutex>lock(mtx);stop=true;}cv.notify_all();for(std::thread&worker:workers){worker.join();}}案例
磁盘io密集型任务,如大量读写文件
存在大量文件,文件中存储了大量数据,需要读取文件中的数据,进行计算处理
每一个任务为读取一个文件,处理文件中的数据
生成100个文件,每个文件中存储100个1000,计算所有文件中数字的和,结果为10000000
生成数据
生成100个文件,每个文件中随机存储100个数据
// 创建txt文件voidcreate_file(){std::vector<std::string>files;std::string filename="data/file";// 生成100个文件for(inti=1;i<=100;++i){files.push_back(filename+std::to_string(i)+".txt");}for(constauto&file:files){std::ofstreamf(file);//写入100行随机4位数for(inti=0;i<100;++i){f<<rand()%10000<<std::endl;}f.close();}}数据处理任务
计算文件中所有数字的和
// 计算文件数据和intcount_files(conststd::string&filename){std::ifstreamfile(filename);if(!file){throwstd::runtime_error("cannot open "+filename);}std::string line;// 计算每行数字和size_t sum=0;while(std::getline(file,line)){// 将字符串转换为数字size_t num=std::stoul(line);sum+=num;}returnsum;}线程池计调用
1.生成文件
2.根据cpu核心数,生成线程池
3.遍历文件列表,每个任务读取一个文件,提交到线程池,获取回执
4.遍历回执,获取每个文件的数据和
5.求和
intmain(){// 创建文件create_file();// 初始化线程池size_t cpu_cores=std::thread::hardware_concurrency();intthread_counts=cpu_cores*2;std::cout<<"cpu cores: "<<cpu_cores<<std::endl;ThreadPoolpool(thread_counts);// 文件列表std::vector<std::string>files;std::string filename="data/file";for(inti=1;i<=100;++i){files.push_back(filename+std::to_string(i)+".txt");}// 提交任务到线程池std::vector<std::future<int>>futures;for(constauto&file:files){futures.emplace_back(pool.enqueue([file]{returncount_files(file);}));}// 获取结果std::vector<int>results;for(auto&f:futures){try{results.push_back(f.get());}catch(conststd::exception&e){std::cerr<<e.what()<<std::endl;}}// 求和intdata_sum=0;for(constauto&result:results){std::cout<<filename<<": \n";std::cout<<"sum: "<<result<<std::endl;data_sum+=result;}std::cout<<"data sum: "<<data_sum<<std::endl;return0;}结果
sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data/file: sum: 100000 data sum: 10000000结果和预期一致,为1000×100×100=10000000