Visual Studio
使用信号量维护一个任务队列信号量内核对象:可以把一个信号量看作一个计数器,当信号量内的计数器为0时,等待该信号量的线程会被阻塞,当信号量内的计数器>0时,等待该信号量的线程会被激活。创建信号量:HANDLE CreateSemaphore( LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, // 通常传入NULL即可 LONG lInitialCount, // 计数器初始值 LONG lMaximumCount, // 计数器最大值 LPCTSTR lpName ); 增加信号量计数:BOOL ReleaseSemaphore( HANDLE hSemaphore, // 信号量的句柄 LONG lReleaseCount, // 计数器要增加的数值 LPLONG lpPreviousCount ); // 计数器原来的值,不需要可以传入NULL 等待信号量:WaitForSingleObjectWaitForMultipleObjects循环队列:循环队列可以用数组或者链表实现,当队列有“满”的情况时,使用数组是比较方便的。设置两个索引,一个用来写(nForWirte),一个用来读(nForRead),设最大容量为nMaxSize,则:如果 nForRead == nForWrite,队列为空;如果 ( nForWrite + 1 ) % nMaxSize == nForRead,队列已满。
CPP源码:#include 'stdafx.h' #include 'HXThreadPool.h' CHXWorker::CHXWorker() { } CHXWorker::~CHXWorker() { } CHXThreadPool::CHXThreadPool() : m_bReady( FALSE ) , m_hForWriter( NULL ) , m_hForReader( NULL ) , m_phThreads( NULL ) , m_pWorkerQueue( NULL ) , m_nWriter( 0 ) , m_nReader( 0 ) , m_nMaxWorkers( 0 ) , m_nMaxThreads( 0 ) { } CHXThreadPool::~CHXThreadPool() { RelaseThreadPool(); } BOOL CHXThreadPool::InitThreadPool( long nTaskQueue, int nMaxThreads ) { BOOL bResult = FALSE; int nThreadCreated = 0; __try { if( m_bReady ) __leave; // 初始化临界资源 InitializeCriticalSection( &m_csQueue ); // 创建两个信号量,一个读,一个写。 m_hForReader = CreateSemaphore( NULL, 0, nTaskQueue, NULL ); m_hForWriter = CreateSemaphore( NULL, nTaskQueue, nTaskQueue, NULL ); if( m_hForReader == NULL || m_hForWriter == NULL ) __leave; // 创建和初始化循环队列 m_pWorkerQueue = new CHXWorker * [ nTaskQueue ]; if( m_pWorkerQueue == NULL ) __leave; for( int i = 0; i < nTaskQueue; ++ i ) m_pWorkerQueue[ i ] = NULL; m_nMaxWorkers = nTaskQueue; m_nReader = 0; m_nWriter = 0; // 创建工作线程 if( nMaxThreads <= 0 ) m_nMaxThreads = GetCPUNumber() * 2; else m_nMaxThreads = nMaxThreads; m_phThreads = new HANDLE [ m_nMaxThreads ]; if( m_phThreads == NULL ) __leave; for( int i = 0; i < m_nMaxThreads; ++ i ) { m_phThreads[ i ] = CreateThread( NULL, 0, &CHXThreadPool::DoWorker, this, 0, NULL ); if( m_phThreads[ i ] == NULL ) __leave; else ++ nThreadCreated; } bResult = TRUE; m_bReady = TRUE; } __finally { if( ! bResult ) // 如果失败,清理资源 { // Delete queue EnterCriticalSection( &m_csQueue ); if( m_pWorkerQueue != NULL ) { delete [] m_pWorkerQueue; m_pWorkerQueue = NULL; m_nReader = 0; m_nWriter = 0; m_nMaxWorkers = 0; } LeaveCriticalSection( &m_csQueue ); // Clear Threads... if( m_phThreads != NULL ) { ReleaseSemaphore( m_hForReader, nThreadCreated, NULL ); WaitForMultipleObjects( nThreadCreated, m_phThreads, TRUE, INFINITE ); for( int i = 0; i < nThreadCreated; ++ i ) CloseHandle( m_phThreads[ i ]); delete [] m_phThreads; m_phThreads = NULL; } // Close Semaphores... if( m_hForReader != NULL ) { CloseHandle( m_hForReader ); m_hForReader = NULL; } if( m_hForWriter != NULL ) { CloseHandle( m_hForWriter ); m_hForWriter = NULL; } // Delete cs... DeleteCriticalSection( &m_csQueue ); m_dwTimedout = INFINITE; } } return bResult; } void CHXThreadPool::RelaseThreadPool() { if( m_bReady ) { m_bReady = FALSE; EnterCriticalSection( &m_csQueue ); if( m_pWorkerQueue != NULL ) { delete [] m_pWorkerQueue; m_pWorkerQueue = NULL; m_nReader = 0; m_nWriter = 0; m_nMaxWorkers = 0; } LeaveCriticalSection( &m_csQueue ); // Clear Threads... if( m_phThreads != NULL ) { ReleaseSemaphore( m_hForReader, m_nMaxThreads, NULL ); WaitForMultipleObjects( m_nMaxThreads, m_phThreads, TRUE, INFINITE ); for( int i = 0; i < m_nMaxThreads; ++ i ) CloseHandle( m_phThreads[ i ]); delete [] m_phThreads; m_phThreads = NULL; } // Close Semaphores... if( m_hForReader != NULL ) { CloseHandle( m_hForReader ); m_hForReader = NULL; } if( m_hForWriter != NULL ) { CloseHandle( m_hForWriter ); m_hForWriter = NULL; } // Delete cs... DeleteCriticalSection( &m_csQueue ); m_nReader = 0; m_nWriter = 0; m_dwTimedout = INFINITE; } } BOOL CHXThreadPool::PostAWorker( CHXWorker * pWorker ) // 投递一个工作 { DWORD dwWaitfor; // 如果队列已满,就等待,直到队列不满(被工作线程取走去处理) dwWaitfor = WaitForSingleObject( m_hForWriter, INFINITE ); // 将该工作放入循环队列中 if( dwWaitfor == WAIT_OBJECT_0 ) { long n; EnterCriticalSection( &m_csQueue ); if( m_pWorkerQueue == NULL ) { LeaveCriticalSection( &m_csQueue ); return FALSE; } n = ( m_nWriter + 1 ) % m_nMaxWorkers; if( n == m_nReader ) { LeaveCriticalSection( &m_csQueue ); return FALSE; } else { m_pWorkerQueue[ m_nWriter ] = pWorker; m_nWriter = n; LeaveCriticalSection( &m_csQueue ); } // 增加读的信号量计数,以激活某线程去处理该工作 ReleaseSemaphore( m_hForReader, 1, NULL ); } else return FALSE; return TRUE; } int CHXThreadPool::GetCurrentWorkers() // 获取目前已经投递但尚未被处理的工作数量 { int n; EnterCriticalSection( &m_csQueue ); n = ( m_nWriter - m_nReader + m_nMaxWorkers ) % m_nMaxWorkers; LeaveCriticalSection( &m_csQueue ); return n; } DWORD CHXThreadPool::DoWorker( LPVOID lpVoid ) // 工作线程 { CHXThreadPool * pThreadPool = ( CHXThreadPool * ) lpVoid; CHXWorker * pWorker; DWORD dwWaitfor; while( TRUE ) { // 如果队列是空的,就等待,直到有任务被投递进来 dwWaitfor = WaitForSingleObject( pThreadPool->m_hForReader, INFINITE ); if( dwWaitfor == WAIT_OBJECT_0 ) { EnterCriticalSection( &( pThreadPool->m_csQueue )); if( pThreadPool->m_pWorkerQueue == NULL ) { LeaveCriticalSection( &( pThreadPool->m_csQueue )); break; } if( pThreadPool->m_nReader == pThreadPool->m_nWriter ) { LeaveCriticalSection( &( pThreadPool->m_csQueue )); continue; } // 从队列中取走工作 pWorker = pThreadPool->m_pWorkerQueue[ pThreadPool->m_nReader ]; pThreadPool->m_pWorkerQueue[ pThreadPool->m_nReader ] = NULL; pThreadPool->m_nReader = ( pThreadPool->m_nReader + 1 ) % pThreadPool->m_nMaxWorkers; LeaveCriticalSection( &( pThreadPool->m_csQueue )); //增加写信号量计数,当队列满时,表示队列中有空余位置被腾出,可以继续投递新的工作了 ReleaseSemaphore( pThreadPool->m_hForWriter, 1, NULL ); if( pWorker == NULL ) continue; else { if( pWorker->DoWorker()) // 如果工作完成后返回TRUE,就在这里清理它。这个完全是自定义的 delete pWorker; } } } return 0; } int CHXThreadPool::GetCPUNumber() // 获取系统中CPU的数量 { SYSTEM_INFO sysInfo; GetSystemInfo( &sysInfo ); return ( int ) sysInfo.dwNumberOfProcessors; }