之前实现过一个webserver,其中的线程池实现方式让我感觉非常优雅,就学了一手,后来ai说也就小项目用用,实际工程不是这样。所以记录一下线程池的不同实现方式,顺便加深记忆。
第一种
要点:
- 拷贝构造和拷贝赋值需要delete,因为线程不能被复制。移动虽然有时是可以的,但是安全起见,最好都给delete掉。
- 这种方式直接把子线程detach,不需要在主线程保存所有子线程,但是主线程结束,子线程拿不到主线程的数据,就会出问题。所以这种情况,主线程需要sleep或者放while(true)里。
// ThreadPool.hclassThreadPool{public:explicitThreadPool(intthreadNum=8);~ThreadPool();ThreadPool(constThreadPool&o)=delete;ThreadPool(ThreadPool&&o)=delete;ThreadPool&operator=(constThreadPool&o)=delete;ThreadPool&operator=(ThreadPool&&o)=delete;template<typenameF>voidAddTask(F&&task);private:boolIsClosed();private:boolisClosed_=false;std::queue<std::function<void()>>tasks_;std::mutex mtx_;std::condition_variable cond_;};template<typenameF>voidThreadPool::AddTask(F&&task){if(IsClosed())return;std::lock_guard<std::mutex>locker(mtx_);tasks_.emplace(std::forward<F>(task));cond_.notify_one();}// ThreadPool.cppThreadPool::ThreadPool(intthreadNum){assert(thread_num>0);for(inti=0;i<threadNum;++i){std::thread([this](){std::unique_lock<std::mutex>locker(mtx_);while(true){if(!tasks_.empty()){autotask=std::move(tasks_.front());// 使用move避免拷贝构造tasks_.pop();locker.unlock();task();locker.lock();}elseif(isClosed_){// 线程池关闭则结束子线程break;}else{// 任务队列为空则阻塞cond_.wait(locker);}}}).detach();}}ThreadPool::~ThreadPool(){{std::lock_guard<std::mutex>locker(mtx_);isClosed_=true;}cond_.notify_all();}boolThreadPool::IsClosed(){std::lock_guard<std::mutex>locker(mtx_);returnisClosed_;}第二种
要点:
- 使用packaged_task包裹函数,可以通过get_future得到一个future对象,该对象调用get来获取被包裹函数的返回值,如果还有返回则阻塞。
- 因为task是一个局部变量,而需要在子线程中调用,所以需要用shared_ptr管理其生命周期。
- 这个实现用vector去管理工作线程,所以在析构函数中需要join所有的线程。
// ThreadPool.hclassThreadPool{public:explicitThreadPool(intthreadNum=8);~ThreadPool();ThreadPool(constThreadPool&o)=delete;ThreadPool(ThreadPool&&o)=delete;ThreadPool&operator=(constThreadPool&o)=delete;ThreadPool&operator=(ThreadPool&&o)=delete;template<typenameF,typename...Args>autoAddTask(F&&task,Args&&...args)->std::future<decltype(task(args...))>;private:voidWorkerLoop();private:boolisClosed_=false;std::queue<std::function<void()>>tasks_;std::vector<std::thread>workers_;std::mutex mtx_;std::condition_variable cond_;};template<typenameF,typename...Args>autoThreadPool::AddTask(F&&task,Args&&...args)->std::future<decltype(task(args...))>{usingReturnType=decltype(task(args...));// 用decltype编译时确定任务函数返回值类型autotask=std::make_shared<std::packaged_task<ReturnType()>>(std::bind(std::forward<F>(task),std::forward<Args>(args)...));autofuture=task->get_future();// 返回类型为std::future<ReturnType>{std::lock_guard<std::mutex>locker(mtx_);if(isClosed_){// 线程池关闭,不能再加任务,返回无效future或者throw异常returnstd::future<ReturnType>();}tasks_.emplace([task]{(*task)();});}cond_.notify_one();// 插入完后记得唤醒一个工作线程returnfuture;}// ThreadPool.cppThreadPool::ThreadPool(intthreadNum){assert(threadNum>0);for(inti=0;i<threadNum;++i){workers_.emplace_back(&ThreadPool::WorkerLoop,this);}}ThreadPool::~ThreadPool(){{std::lock_guard<std::mutex>locker(mtx_);isClosed_=true;}cond_.notify_all();for(auto&&worker:workers_){if(worker.joinable()){worker.join();}}}voidThreadPool::WorkerLoop(){std::unique_lock<std::mutex>locker(mtx_);while(true){// 线程池关闭,任务队列中有任务时,不再阻塞(其他线程唤醒)。cond_.wait(locker,[this]{returnisClosed_||!tasks_.empty();});if(isClosed_&&tasks_.empty())break;// 所有任务处理完后才退出autotask=std::move(tasks_.front());tasks_.pop();locker.unlock();task();locker.lock();}}第三种
- 使用无锁队列实现无锁线程池,避免锁竞争。
其他
线程池应该分配多少线程
今天面试完,被压力懵了,连线程池应该分配多少线程都忘了,直接前言不搭后语,左右脑互搏。
首先需要知道自己电脑的cpu核心数,使用std::thread::hardware_concurrency();获取核心数。接下来就是分情况计算线程数了。
cpu密集型任务
cpu密集型任务,基本不会阻塞,每时每刻都在使用cpu核心。这种情况下,需要避免上下文的切换,所以不要分配过多的线程,分配过多的线程,当超过物理核心数时,操作系统就会让多个线程在同一核心上并发运行,这时会有上下文切换。而且线程运行时会访问该核心的L1/L2缓存,当多个线程同时使用时,会污染缓存,导致未命中去从内存从新读取。
所以cpu密集型,通常分配等于cpu核心数的线程数。
- 核心数
+1的情况, 线程可能出现内存页错误或其他原因阻塞线程,这时多一个线程可以利用这个空闲cpu时间。 - 核心数
-1的情况,留出一个核心用于处理系统的任务和其他服务 - 其实如果是轻量级的cpu任务,任务切换开销小,多线程能更好利用 CPU 。线程数可以为核心数的2倍,
IO密集型任务
IO密集型任务,会频繁的阻塞,基本大部分时间都花在了阻塞上。如果所有的线程都阻塞,当有新的任务可以执行时,此时cpu空闲却没有线程执行。所以IO密集型任务需要多分配点线程
IO密集型,通常分配cpu核心数的2~5倍
更具体的计算:
线程数=核心数×(1+IO阻塞时间cpu执行时间)线程数=核心数\times(1+\frac{IO阻塞时间}{cpu执行时间})线程数=核心数×(1+cpu执行时间IO阻塞时间)
混合型任务
直接进行压测吧。或者动态调整