多语言展示
当前在线:620今日阅读:86今日分享:14

VC++/Windows 如何使用线程池。

为什么用线程池?       先来看一个真理:对于每一个CPU,在同一时刻只可能运行一个线程。之所以可以“同时运行”多个线程,那是因为系统会在不同的线程之间进行切换、调度,只是速度很快,看上去像是在同时运行而已。      使用多线程可以提高工作效率,这是毋容置疑的了,那怎么用多线程来设计程序呢?最简单、原始的方法就是,对于每一项工作,都创建一个线程去处理,直到这项 工作结束,线程就结束,但这样做存在一个问题:如果每一项工作都很简短,但工作数量却很大,那么我们就必须创建很多的线程,CPU就会在这些线程之间不停 的进行切换,以使所有的工作得以进行,并且,系统会不断的结束已经工作完成的线程(释放线程资源)和新建一些线程去运行新的工作....这非常不好——线 程间的切换需要时间、新建、终止线程也很浪费时间,当线程数量很多,并且任务频繁更新时,CPU占用率可能很快达到100%,但我们真正处理工作所耗的 CPU可能不足50%。      那么,线程池就是解决上述问题的方案。他的思路是:事先创建好一定数量的线程,当有任务需要处理时,就从这些线程中找出一个空闲的去做,做完之后线程并不 退出,而是继续等待新的任务;当没有任务要处理时,这些线程就挂起,直到一个新的任务到来。那么,我们需要多少个这样的线程呢?答案是根据CPU数量来设 定,考虑到线程可能会做sleep、Waitfor...之类的操作,因此最佳数量是CPU数量×2,就是每个CPU上平均运行2个线程。这样避免了频繁 的线程切换、创建和销毁等操作,把时间都用在工作上,自然效率就提高了。
工具/原料

Visual Studio

方法/步骤
1

使用信号量维护一个任务队列信号量内核对象:可以把一个信号量看作一个计数器,当信号量内的计数器为0时,等待该信号量的线程会被阻塞,当信号量内的计数器>0时,等待该信号量的线程会被激活。创建信号量:HANDLE CreateSemaphore( LPSECURITY_ATTRIBUTES lpSemaphoreAttributes, // 通常传入NULL即可                          LONG lInitialCount,                          // 计数器初始值                          LONG lMaximumCount,                          // 计数器最大值                          LPCTSTR lpName );    增加信号量计数:BOOL ReleaseSemaphore(  HANDLE hSemaphore,         // 信号量的句柄                          LONG lReleaseCount,        // 计数器要增加的数值                          LPLONG lpPreviousCount );  // 计数器原来的值,不需要可以传入NULL  等待信号量:WaitForSingleObjectWaitForMultipleObjects循环队列:循环队列可以用数组或者链表实现,当队列有“满”的情况时,使用数组是比较方便的。设置两个索引,一个用来写(nForWirte),一个用来读(nForRead),设最大容量为nMaxSize,则:如果 nForRead == nForWrite,队列为空;如果 ( nForWrite + 1 ) % nMaxSize == nForRead,队列已满。

2

CPP源码:#include 'stdafx.h'  #include 'HXThreadPool.h'    CHXWorker::CHXWorker()  {    }  CHXWorker::~CHXWorker()  {    }    CHXThreadPool::CHXThreadPool()      : m_bReady( FALSE )      , m_hForWriter( NULL )      , m_hForReader( NULL )      , m_phThreads( NULL )      , m_pWorkerQueue( NULL )      , m_nWriter( 0 )      , m_nReader( 0 )      , m_nMaxWorkers( 0 )      , m_nMaxThreads( 0 )  {  }    CHXThreadPool::~CHXThreadPool()  {      RelaseThreadPool();  }    BOOL CHXThreadPool::InitThreadPool( long nTaskQueue, int nMaxThreads )  {      BOOL bResult = FALSE;      int  nThreadCreated = 0;        __try      {          if( m_bReady )              __leave;              // 初始化临界资源          InitializeCriticalSection( &m_csQueue );            // 创建两个信号量,一个读,一个写。          m_hForReader = CreateSemaphore( NULL, 0, nTaskQueue, NULL );          m_hForWriter = CreateSemaphore( NULL, nTaskQueue, nTaskQueue, NULL );          if( m_hForReader == NULL || m_hForWriter == NULL )              __leave;            // 创建和初始化循环队列          m_pWorkerQueue = new CHXWorker * [ nTaskQueue ];          if( m_pWorkerQueue == NULL )              __leave;          for( int i = 0; i < nTaskQueue; ++ i )              m_pWorkerQueue[ i ] = NULL;          m_nMaxWorkers = nTaskQueue;          m_nReader = 0;          m_nWriter = 0;            // 创建工作线程          if( nMaxThreads <= 0 )              m_nMaxThreads = GetCPUNumber() * 2;          else              m_nMaxThreads = nMaxThreads;          m_phThreads = new HANDLE [ m_nMaxThreads ];          if( m_phThreads == NULL )              __leave;          for( int i = 0; i < m_nMaxThreads; ++ i )          {              m_phThreads[ i ] = CreateThread( NULL, 0, &CHXThreadPool::DoWorker, this, 0, NULL );              if( m_phThreads[ i ] == NULL )                  __leave;              else                  ++ nThreadCreated;          }            bResult = TRUE;          m_bReady = TRUE;      }      __finally      {          if( ! bResult )  // 如果失败,清理资源          {              // Delete queue              EnterCriticalSection( &m_csQueue );              if( m_pWorkerQueue != NULL )              {                  delete [] m_pWorkerQueue;                  m_pWorkerQueue = NULL;                  m_nReader = 0;                  m_nWriter = 0;                  m_nMaxWorkers = 0;              }              LeaveCriticalSection( &m_csQueue );                // Clear Threads...              if( m_phThreads != NULL )              {                  ReleaseSemaphore( m_hForReader, nThreadCreated, NULL );                  WaitForMultipleObjects( nThreadCreated, m_phThreads, TRUE, INFINITE );                  for( int i = 0; i < nThreadCreated; ++ i )                      CloseHandle( m_phThreads[ i ]);                  delete [] m_phThreads;                  m_phThreads = NULL;              }                // Close Semaphores...              if( m_hForReader != NULL )              {                  CloseHandle( m_hForReader );                  m_hForReader = NULL;              }              if( m_hForWriter != NULL )              {                  CloseHandle( m_hForWriter );                  m_hForWriter = NULL;              }                // Delete cs...              DeleteCriticalSection( &m_csQueue );              m_dwTimedout = INFINITE;          }      }      return bResult;  }    void CHXThreadPool::RelaseThreadPool()  {      if( m_bReady )      {          m_bReady = FALSE;            EnterCriticalSection( &m_csQueue );          if( m_pWorkerQueue != NULL )          {              delete [] m_pWorkerQueue;              m_pWorkerQueue = NULL;              m_nReader = 0;              m_nWriter = 0;              m_nMaxWorkers = 0;          }          LeaveCriticalSection( &m_csQueue );            // Clear Threads...          if( m_phThreads != NULL )          {              ReleaseSemaphore( m_hForReader, m_nMaxThreads, NULL );              WaitForMultipleObjects( m_nMaxThreads, m_phThreads, TRUE, INFINITE );              for( int i = 0; i < m_nMaxThreads; ++ i )                  CloseHandle( m_phThreads[ i ]);              delete [] m_phThreads;              m_phThreads = NULL;          }            // Close Semaphores...          if( m_hForReader != NULL )          {              CloseHandle( m_hForReader );              m_hForReader = NULL;          }          if( m_hForWriter != NULL )          {              CloseHandle( m_hForWriter );              m_hForWriter = NULL;          }            // Delete cs...          DeleteCriticalSection( &m_csQueue );          m_nReader = 0;          m_nWriter = 0;          m_dwTimedout = INFINITE;      }  }    BOOL CHXThreadPool::PostAWorker( CHXWorker * pWorker )  // 投递一个工作  {      DWORD dwWaitfor;            // 如果队列已满,就等待,直到队列不满(被工作线程取走去处理)      dwWaitfor = WaitForSingleObject( m_hForWriter, INFINITE );            // 将该工作放入循环队列中      if( dwWaitfor == WAIT_OBJECT_0 )      {          long n;          EnterCriticalSection( &m_csQueue );          if( m_pWorkerQueue == NULL )          {              LeaveCriticalSection( &m_csQueue );              return FALSE;          }          n = ( m_nWriter + 1 ) % m_nMaxWorkers;          if( n == m_nReader )          {              LeaveCriticalSection( &m_csQueue );              return FALSE;          }          else          {              m_pWorkerQueue[ m_nWriter ] = pWorker;              m_nWriter = n;              LeaveCriticalSection( &m_csQueue );          }          // 增加读的信号量计数,以激活某线程去处理该工作          ReleaseSemaphore( m_hForReader, 1, NULL );      }      else          return FALSE;        return TRUE;  }    int CHXThreadPool::GetCurrentWorkers()  // 获取目前已经投递但尚未被处理的工作数量  {      int n;      EnterCriticalSection( &m_csQueue );      n = ( m_nWriter - m_nReader + m_nMaxWorkers ) % m_nMaxWorkers;      LeaveCriticalSection( &m_csQueue );      return n;  }    DWORD CHXThreadPool::DoWorker( LPVOID lpVoid )  // 工作线程  {      CHXThreadPool * pThreadPool = ( CHXThreadPool * ) lpVoid;      CHXWorker * pWorker;      DWORD dwWaitfor;      while( TRUE )      {          // 如果队列是空的,就等待,直到有任务被投递进来          dwWaitfor = WaitForSingleObject( pThreadPool->m_hForReader, INFINITE );          if( dwWaitfor == WAIT_OBJECT_0 )          {              EnterCriticalSection( &( pThreadPool->m_csQueue ));              if( pThreadPool->m_pWorkerQueue == NULL )              {                  LeaveCriticalSection( &( pThreadPool->m_csQueue ));                  break;              }              if( pThreadPool->m_nReader == pThreadPool->m_nWriter )              {                  LeaveCriticalSection( &( pThreadPool->m_csQueue ));                  continue;              }              // 从队列中取走工作              pWorker = pThreadPool->m_pWorkerQueue[ pThreadPool->m_nReader ];              pThreadPool->m_pWorkerQueue[ pThreadPool->m_nReader ] = NULL;              pThreadPool->m_nReader = ( pThreadPool->m_nReader + 1 ) % pThreadPool->m_nMaxWorkers;              LeaveCriticalSection( &( pThreadPool->m_csQueue ));                            //增加写信号量计数,当队列满时,表示队列中有空余位置被腾出,可以继续投递新的工作了              ReleaseSemaphore( pThreadPool->m_hForWriter, 1, NULL );                if( pWorker == NULL )                  continue;              else              {                  if( pWorker->DoWorker()) // 如果工作完成后返回TRUE,就在这里清理它。这个完全是自定义的                      delete pWorker;              }          }      }      return 0;  }    int CHXThreadPool::GetCPUNumber()  // 获取系统中CPU的数量  {      SYSTEM_INFO sysInfo;      GetSystemInfo( &sysInfo );      return ( int ) sysInfo.dwNumberOfProcessors;  }

推荐信息