#include "stdio.h"
#include "assert.h"
#include "stdlib.h"
#include <algorithm>
#include "ThreadPool.h"
#include "WorkerThread.h"
#include "Job.h"
CThreadPool::CThreadPool():m_shutDown(FALSE)
{
m_maxNum = CThreadPool::THREAD_MAX_NUM;
m_availLow = CThreadPool::THREAD_AVAIL_LOW;
m_initNum = m_availNum = CThreadPool::THREAD_INIT_NUM;
m_availHigh = CThreadPool::THREAD_AVAIL_HIGH;
//初始化线程数目不超过最大线程数目
assert(m_initNum > 0 && m_initNum <= m_maxNum);
//保证初始化的线程数目在最小和最大的空闲线程数目之间则合理
assert(m_initNum >= m_availLow && m_initNum <= m_availHigh);
m_threadList.clear();
m_busyList.clear();
m_idleList.clear();
for (unsigned int i = 0; i < m_initNum; ++i)
{
CWorkerThread* thr = new CWorkerThread();
assert(thr);
thr->SetThreadPool(this);
AppendToIdleList(thr);
thr->Start();
}
}
CThreadPool::CThreadPool(int initNum):m_shutDown(FALSE)
{
m_maxNum = CThreadPool::THREAD_MAX_NUM;
m_availLow = CThreadPool::THREAD_AVAIL_LOW;
m_initNum = m_availNum = initNum;
m_availHigh = CThreadPool::THREAD_AVAIL_HIGH;
//初始化线程数目不超过最大线程数目
assert(initNum > 0 && (unsigned int)initNum <= m_maxNum);
//保证初始化的线程数目在最小和最大的空闲线程数目之间则合理
assert((unsigned int)initNum >= m_availLow && (unsigned int)initNum <= m_availHigh);
m_threadList.clear();
m_busyList.clear();
m_idleList.clear();
for (unsigned int i = 0; i < (unsigned int)initNum; ++i)
{
CWorkerThread* thr = new CWorkerThread();
assert(thr);
thr->SetThreadPool(this);
AppendToIdleList(thr);
thr->Start();
}
}
CThreadPool::~CThreadPool()
{
m_threadList.clear();
m_busyList.clear();
m_idleList.clear();
}
void CThreadPool::TerminateAll()
{
m_shutDown = TRUE;
for (unsigned int i = 0; i < m_threadList.size(); ++i)
{
CWorkerThread* thr = m_threadList[i];
if (!thr) continue;
thr->SetThreadState(THREAD_DEL);
thr->Terminate();
delete thr; thr = NULL;
}
return;
}
CWorkerThread* CThreadPool::GetIdleThread(void)
{
if (m_shutDown) return NULL;
while (m_idleList.size() == 0)
{
printf("no idle thread, wait pls...\n");
m_idleCond.Wait();
}
m_idleMutex.Lock();
if (m_idleList.size() > 0)
{
CWorkerThread* thr = (CWorkerThread*)m_idleList.front();
if (!thr)
{
m_idleMutex.Unlock();
return NULL;
}
//printf("get idle thread [%d]\n", thr->GetThreadID());
m_idleMutex.Unlock();
if (thr->GetThreadState() == THREAD_IDLE)
return thr;
}
m_idleMutex.Unlock();
return NULL;
}
void CThreadPool::AppendToIdleList(CWorkerThread* jobThread)
{
if (m_shutDown) return;
if (!jobThread) return;
m_idleMutex.Lock();
jobThread->SetThreadState(THREAD_IDLE);
m_idleList.push_back(jobThread);
m_idleMutex.Unlock();
m_threadList.push_back(jobThread);
}
void CThreadPool::MoveToBusyList(CWorkerThread* idleThread)
{
if (m_shutDown) return;
if (!idleThread) return;
m_busyMutex.Lock();
idleThread->SetThreadState(THREAD_BUSY);
m_busyList.push_back(idleThread);
m_availNum --;
m_busyMutex.Unlock();
m_idleMutex.Lock();
vector<CWorkerThread*>::iterator iter;
iter = find(m_idleList.begin(), m_idleList.end(), idleThread);
if (iter != m_idleList.end())
m_idleList.erase(iter);
m_idleMutex.Unlock();
}
void CThreadPool::MoveToIdleList(CWorkerThread* busyThread)
{
if (m_shutDown) return;
if (!busyThread) return;
m_idleMutex.Lock();
busyThread->SetThreadState(THREAD_IDLE);
m_idleList.push_back(busyThread);
m_availNum ++;
m_idleMutex.Unlock();
m_busyMutex.Lock();
vector<CWorkerThread*>::iterator iter;
iter = find(m_busyList.begin(), m_busyList.end(), busyThread);
if (iter != m_busyList.end())
m_busyList.erase(iter);
m_busyMutex.Unlock();
m_idleCond.Signal();
m_maxNumCond.Signal();
}
void CThreadPool::CreateIdleThread(int num)
{
if (m_shutDown) return;
if (num <= 0) return;
printf("add new %d threads\n", num);
for (int i = 0; i < num; ++i)
{
CWorkerThread* thr = new CWorkerThread();
if (!thr) continue;
thr->SetThreadPool(this);
AppendToIdleList(thr);
m_varMutex.Lock();
m_availNum ++;
m_varMutex.Unlock();
thr->Start();
}
}
void CThreadPool::DeleteIdleThread(int num)
{
if (m_shutDown) return;
if (num <= 0) return;
m_idleMutex.Lock();
//如果空闲线程数目小于允许最大空闲线程数目,则不再删除
if (m_idleList.size() < m_availHigh)
{
printf("idle thread list size=%d\n", m_idleList.size());
m_idleMutex.Unlock();
return;
}
///TIPS:如果空闲列表头节点线程无效,则本次删除操作失效
//printf("delete %d threads\n", num);
for (int i = 0; i < num; ++i)
{
CWorkerThread* thr = NULL;
if (m_idleList.size() > 0)
thr = (CWorkerThread*)m_idleList.front();
if (!thr) continue;
if (THREAD_IDLE != thr->GetThreadState()) continue;
vector<CWorkerThread*>::iterator iter;
iter = find(m_idleList.begin(), m_idleList.end(), thr);
if (iter != m_idleList.end())
{
//printf("delete thread [%d]\n", (*iter)->GetThreadID());
(*iter)->SetThreadState(THREAD_DEL);
DestroyIdleThread(*iter);
m_idleList.erase(iter);
}
m_availNum --;
}
//printf("after delete, idle threads num:%d, idle list size=%d\n", m_availNum, m_idleList.size());
m_idleMutex.Unlock();
}
void CThreadPool::DestroyIdleThread(CWorkerThread* idleThread)
{
if (m_shutDown) return;
if (!idleThread) return;
if (THREAD_IDLE != idleThread->GetThreadState()) return;
vector<CWorkerThread*>::iterator iter;
iter = find(m_threadList.begin(), m_threadList.end(), idleThread);
if (iter != m_threadList.end())
{
CWorkerThread *delThread = *iter;
if (!delThread) return;
delThread->Terminate();
delete delThread; delThread = NULL;
m_threadList.erase(iter);
}
}
void CThreadPool::Run(CJob* job, void* JobData)
{
if (m_shutDown) return;
assert(job != NULL);
//线程池已经达到最大并发数目,则等待
if (GetBusyNum() >= m_maxNum)
{
printf("busy threads reach max permit num:%d\n", m_maxNum);
m_maxNumCond.Wait();
}
//补充空闲线程(保持空闲线程数目为m_initNum)
if (m_idleList.size() < m_availLow)
{
if (GetAllNum() + m_initNum - m_idleList.size() < m_maxNum)
CreateIdleThread(m_initNum - m_idleList.size());
else
CreateIdleThread(m_maxNum - GetAllNum());
}
CWorkerThread* idleThr = GetIdleThread();
if (idleThr != NULL)
{
MoveToBusyList(idleThr);
idleThr->SetThreadPool(this);
job->SetWorkThread(idleThr);
idleThr->SetJob(job, JobData);
//printf("job [%d] is bind to thread [%d]\n", job->GetJobNo(), idleThr->GetThreadID());
}
}
基于win32的C++线程池实现(改进版)
需积分: 0 45 浏览量
更新于2017-12-11
1
收藏 290KB RAR 举报
在IT行业中,线程池是一种优化多线程编程的技术,它允许程序预先创建一组线程,然后根据需要从池中分配线程来执行任务,而不是每次需要时都创建新的线程。这种技术对于提高系统效率,减少线程创建和销毁的开销,以及更好地管理系统资源具有重要意义。在Windows平台上,基于Win32 API实现线程池是一种常见的方法,尤其适用于C++编程。
在"基于win32的C++线程池实现(改进版)"中,开发者已经针对上一版本的问题进行了修复,如崩溃和内存泄漏。这些问题是多线程编程中常见的挑战,崩溃可能是由于线程间的同步问题或者资源管理不当,而内存泄漏则可能导致程序运行时间越长,消耗的内存越多,直至耗尽系统资源。
线程池的实现通常涉及以下关键组件:
1. **线程创建和管理**:在Win32 API中,可以使用`CreateThread`函数创建线程,但线程池需要维护一个线程列表,以便在需要时分配。线程池可能包含一个队列来存储待处理的任务,以及一个机制来分配空闲线程执行这些任务。
2. **任务提交**:用户或程序可以通过调用特定接口将任务添加到线程池,这些接口需要确保任务被安全地插入任务队列,并通知线程池有新任务可用。
3. **同步机制**:为了防止数据竞争和死锁,线程池需要使用Windows API中的同步对象,如事件(Event)、互斥量(Mutex)或信号量(Semaphore)。这些同步工具可以帮助线程等待任务、避免同时访问共享资源,以及控制并发程度。
4. **资源回收**:当线程完成任务后,线程池需要有一种机制来回收这些线程,使其可以再次用于处理新任务,而不是立即销毁。这通常通过线程等待或线程休眠实现。
5. **错误处理和调试**:改进版的线程池应包含对错误的适当处理,例如捕获和记录异常,以及提供调试信息以帮助定位问题。内存泄漏的修复可能涉及到对动态分配内存的跟踪和正确释放。
6. **性能优化**:线程池的设计应考虑系统的负载和资源限制,例如,可以根据系统性能动态调整线程的数量,或者设置最小和最大线程限制,以平衡资源使用和响应速度。
通过阅读提供的"thread_pool"文件,我们可以更深入地了解这个实现的具体细节,包括如何初始化线程池,如何调度任务,以及如何处理线程间的通信和同步。然而,具体的实现细节将依赖于源代码的分析。
掌握基于Win32的C++线程池实现不仅可以提高多线程编程的技能,还能帮助开发者理解系统级编程中的复杂性和挑战。对于初学者而言,这是一个很好的实践项目,能够提升他们对操作系统级别的编程理解和调试技巧。

_July_12
- 粉丝: 8
最新资源
- 【状态估计】基于UKF法、AUKF法、EUKF法电力系统三相状态估计研究附Matlab代码.rar
- 【阻抗建模、验证扫频法】光伏并网逆变器扫频与稳定性分析(包含锁相环电流环)附Simulink仿真 - 副本.rar
- 【最新算法】人工兔优化:一种解决工程优化问题的新型生物启发元启发算法附Matlab代码.rar
- AVL响应和根位点的纵向基质配方附Matlab代码.rar
- Hough变换用于UiO数字图像分析研究附Matlab代码.rar
- IIR滤波器用于去除背景音频研究附Matlab代码.rar
- MATLAB基于3D FDTD的微带线馈矩形天线分析[用于模拟超宽带脉冲通过线馈矩形天线的传播,以计算微带结构的回波损耗参数]附Matlab代码.rar
- IMU数据均值滤波分析附Matlab代码.rar
- Pulse Compression脉冲压缩研究附Matlab代码.rar
- UWB-IMU、UWB定位对比研究附Matlab代码.rar
- MATLAB主动噪声和振动控制算法——对较大的次级路径变化具有鲁棒性附Matlab代码.rar
- Zernike 多项式在圆形、六边形、椭圆形、矩形或环形瞳孔上应用附Matlab代码.rar
- 采用GPS、里程计和电子罗盘作为定位传感器,EKF作为多传感器的融合算法,最终输出目标的滤波位置附Matlab代码.rar
- 采用P-f和Q-V滞控的去中心化逆变器型交流微电网的模拟附Simulink仿真.rar
- 【最新版】 GBT 19024-2025 质量管理体系 面向质量结果的组织管理 实现财务和经济效益的指南.rar
- 电动过滤器:LPF和HPF、模拟调制:调幅和调频、WiFi、蓝牙和蜂窝网络的容量分析.....附Matlab代码.rar