在設(shè)計(jì)CDistributeQueue類(lèi)時(shí),通常有兩種方案值得考慮:
方案1和方案2可以說(shuō)各有優(yōu)缺點(diǎn)。方案1中,在事先不知道有多少線程會(huì)訪問(wèn)分布式隊(duì)列的情況下,預(yù)先創(chuàng)建好本地隊(duì)列會(huì)造成程序初始化時(shí)間過(guò)長(zhǎng),并且可能有一些創(chuàng)建好的隊(duì)列得不到使用。
方案2中,采用線程訪問(wèn)分布式隊(duì)列時(shí)才創(chuàng)建本地隊(duì)列,初始化時(shí)比較簡(jiǎn)單,并且不會(huì)造成多創(chuàng)建了本地隊(duì)列的情況。缺點(diǎn)是編程時(shí),隊(duì)列的操作代碼會(huì)變復(fù)雜一些,效率會(huì)有所降低。
下面的代碼中,給出的是方案2的實(shí)現(xiàn)。
//獲取線程Id回調(diào)函數(shù)定義
typedef int (*GetThreadIdFunc)(void *pArg);
template <class T, class LocalQueue, class SharedQueue>
class CDistributedQueue {
private:
LocalQueue ** m_ppLocalQueue; // 本地隊(duì)列數(shù)組
SharedQueue * m_pSharedQueue; // 共享隊(duì)列池或共享隊(duì)列
int m_nLocalQueueSize;
int m_nSharedQueueSize;
int m_nLocalQueueCount;
int m_nSharedQueueCount;
DWORD m_dwTlsIndex; //線程本地存儲(chǔ)索引
LONG volatile m_lThreadIdIndex; //線程編號(hào)最大值
GetThreadIdFunc m_GetThreadIdFunc; //獲取線程編號(hào)回調(diào)函數(shù),如果由外面
//的線程池提供編號(hào)時(shí),需要傳入回調(diào)函數(shù)
void * m_pThreadIdFuncArg; //獲取線程編號(hào)回調(diào)函數(shù)的參數(shù)
CFastLock m_LocalQueueResizeLock; //專(zhuān)為下面的ResizeLocalQueue函數(shù)使用
void ResizeLocalQueue(); //將m_ppLocalQueue數(shù)組的大小擴(kuò)大一倍
public:
CDistributedQueue(){
m_GetThreadIdFunc = NULL;
m_pThreadIdFuncArg = NULL;
m_lThreadIdIndex = 0;
};
void Create( int nLocalQueueSize, int nLocalQueueCount,
int nSharedQueueSize, int nSharedQueueCount);
void Create( int nLocalQueueSize, int nLocalQueueCount,
int nSharedQueueSize, int nSharedQueueCount,
GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg);
virtual ~CDistributedQueue();
LONG ThreadIdGet();
void EnQueue(T &Data);
int DeQueue(T &Data);
void PushToLocalQueue(T &Data);
void PushToLocalQueue(T &Data, int nIndex);
int PopFromLocalQueue(T &Data);
SharedQueue *GetSharedQueue() { return m_pSharedQueue; };
int PrivatizeSharedQueue(int nSharedQueueIndex);
};
說(shuō)明一下:CDistributedQueue類(lèi)中有三個(gè)模板參數(shù),第1個(gè)模板參數(shù)T是表示數(shù)據(jù)類(lèi)型;第2個(gè)模板參數(shù)是表示本地隊(duì)列類(lèi)的類(lèi)型,為一個(gè)不需要使用鎖的普通隊(duì)列,比如環(huán)形隊(duì)列等;第3個(gè)模板參數(shù)是表示一個(gè)需要使用鎖的共享隊(duì)列類(lèi),可以是一個(gè)隊(duì)列池類(lèi),也可以是普通的使用鎖的共享隊(duì)列類(lèi)。
/** 分布式隊(duì)列的創(chuàng)建函數(shù)
@param int nLocalQueueSize - 本地子隊(duì)列的大小
@param int nLocalQueueCount - 本地隊(duì)列的個(gè)數(shù)(數(shù)組的大小)
@param int nSharedQueueSize - 共享子隊(duì)列的大小
@param int nSharedQueueCount - 共享子隊(duì)列的個(gè)數(shù)
@return void - 無(wú)
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::Create(
int nLocalQueueSize, int nLocalQueueCount,
int nSharedQueueSize, int nSharedQueueCount)
{
m_nLocalQueueSize = nLocalQueueSize;
m_nSharedQueueSize = nSharedQueueSize;
if ( nLocalQueueCount != 0 )
{
m_nLocalQueueCount = nLocalQueueCount;
}
else
{
m_nLocalQueueCount = omp_get_num_procs();
}
if ( nSharedQueueCount != 0 )
{
m_nSharedQueueCount = nSharedQueueCount;
}
else
{
m_nSharedQueueCount = omp_get_num_procs();
}
m_ppLocalQueue = new LocalQueue *[m_nLocalQueueCount];
int i;
for ( i = 0; i < m_nLocalQueueCount; i++ )
{
m_ppLocalQueue[i] = NULL;
}
m_pSharedQueue = new SharedQueue(m_nSharedQueueCount, m_nSharedQueueSize);
m_dwTlsIndex = TlsAlloc();
m_lThreadIdIndex = 0;
}
/** 分布式隊(duì)列的創(chuàng)建函數(shù)
@param int nLocalQueueSize - 本地子隊(duì)列的大小
@param int nLocalQueueCount - 本地隊(duì)列的個(gè)數(shù)(數(shù)組的大小)
@param int nSharedQueueSize - 共享子隊(duì)列的大小
@param int nSharedQueueCount - 共享子隊(duì)列的個(gè)數(shù)
@param GetThreadIdFunc GetThreadId - 獲取線程Id回調(diào)函數(shù)
@param void * pThreadIdFuncArg - GetThreadId回調(diào)函數(shù)的參數(shù)
@return void - 無(wú)
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::Create(
int nLocalQueueSize, int nLocalQueueCount,
int nSharedQueueSize, int nSharedQueueCount,
GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg)
{
m_GetThreadIdFunc = GetThreadId;
m_pThreadIdFuncArg = pThreadIdFuncArg;
Create(nLocalQueueSize, nLocalQueueCount, nSharedQueueSize, nSharedQueueCount);
}
/** 分布式隊(duì)列的析構(gòu)函數(shù)
@return - 無(wú)
*/
template <class T, class LocalQueue, class SharedQueue>
CDistributedQueue<T, LocalQueue, SharedQueue>::~CDistributedQueue()
{
int i;
for ( i = 0; i < m_nLocalQueueCount; i++ )
{
if ( m_ppLocalQueue[i] != NULL )
{
delete m_ppLocalQueue[i];
}
}
delete [] m_ppLocalQueue;
delete m_pSharedQueue;
TlsFree(m_dwTlsIndex);
}
這個(gè)函數(shù)主要是考慮有可能程序升級(jí)后,訪問(wèn)的線程數(shù)量可能大于本地隊(duì)列數(shù)組的大小的情況,此時(shí)采取將本地隊(duì)列數(shù)組擴(kuò)大一倍的策略。
/** 分布式隊(duì)列的將本地隊(duì)列數(shù)組擴(kuò)大一倍的內(nèi)部成員函數(shù)
@return void - 無(wú)
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::ResizeLocalQueue()
{
//將本地隊(duì)列數(shù)組擴(kuò)大一倍, 防止線程數(shù)量多于隊(duì)列數(shù)量,以保證程序安全
int i;
LocalQueue **ppQueue = new LocalQueue *[m_nLocalQueueCount * 2];
for ( i = 0; i < m_nLocalQueueCount; i++ )
{
ppQueue[i] = m_ppLocalQueue[i];
}
for ( i = m_nLocalQueueCount; i < m_nLocalQueueCount * 2; i++ )
{
ppQueue[i] = NULL;
}
delete [] m_ppLocalQueue;
m_ppLocalQueue = ppQueue;
//使用原子操作避免m_nLocalQueueCount的數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題
AtomicWrite((LONG volatile *)&m_nLocalQueueCount, m_nLocalQueueCount * 2);
}
獲取線程Id成員函數(shù)中,這個(gè)函數(shù)中完成本地隊(duì)列的創(chuàng)建和分派工作。先是判斷獲取的線程Id是否為0,如果為0則表明還沒(méi)有創(chuàng)建本地隊(duì)列,此時(shí)需要給線程進(jìn)行編號(hào),并創(chuàng)建一個(gè)新的本地隊(duì)列放到數(shù)組中下標(biāo)等于線程編號(hào)的位置上。
/** 分布式隊(duì)列的獲取線程Id函數(shù)
如果m_GetThreadIdFunc回調(diào)函數(shù)不為空,則使用它獲取Id
否則根據(jù)分布式隊(duì)列內(nèi)部的編號(hào)機(jī)制獲取線程Id
@return LONG - 返回線程的編號(hào)
*/
template <class T, class LocalQueue, class SharedQueue>
LONG CDistributedQueue<T, LocalQueue, SharedQueue>::ThreadIdGet()
{
LONG Id;
LocalQueue *pQueue = NULL;
if ( m_GetThreadIdFunc != NULL )
{
Id = (*m_GetThreadIdFunc)(m_pThreadIdFuncArg);
if ( Id >= m_nLocalQueueCount )
{
CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
if ( Id >= m_nLocalQueueCount )
{
ResizeLocalQueue();
}
}
if ( m_ppLocalQueue[Id] == NULL )
{
m_ppLocalQueue[Id] = new LocalQueue(m_nLocalQueueSize);
}
return Id;
}
else
{
Id = (LONG )TlsGetValue(m_dwTlsIndex);
if ( Id == 0 )
{
Id = AtomicIncrement(&m_lThreadIdIndex);
TlsSetValue(m_dwTlsIndex, (void *)Id);
pQueue = new LocalQueue(m_nLocalQueueSize);
}
--Id;
}
if ( Id >= m_nLocalQueueCount)
{
CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
if ( Id >= m_nLocalQueueCount )
{
ResizeLocalQueue();
}
}
if ( pQueue != NULL )
{
m_ppLocalQueue[Id] = pQueue;
}
return Id;
}
/** 分布式隊(duì)列的進(jìn)隊(duì)操作函數(shù)
這里假定了本地隊(duì)列可以無(wú)限進(jìn)隊(duì)
進(jìn)隊(duì)策略按以下優(yōu)先級(jí)進(jìn)行:
1、本地隊(duì)列空時(shí)進(jìn)入本地隊(duì)列,、共享隊(duì)列未滿(mǎn)時(shí)進(jìn)入共享隊(duì)列
3、共享隊(duì)列滿(mǎn)時(shí)進(jìn)入本地隊(duì)列
@param T &Data - 要進(jìn)隊(duì)的數(shù)據(jù)
@return void - 無(wú)
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::EnQueue(T &Data)
{
int nId = ThreadIdGet();
if ( m_ppLocalQueue[nId]->IsEmpty() )
{
m_ppLocalQueue[nId]->EnQueue(Data);
}
else if ( m_pSharedQueue->Push(Data) != CAPI_SUCCESS )
{
int nId = ThreadIdGet();
m_ppLocalQueue[nId]->EnQueue(Data);
}
else
{
//這個(gè)分支不需要做任何事
}
return;
}
/** 分布式隊(duì)列的本地隊(duì)列進(jìn)隊(duì)函數(shù)
將數(shù)據(jù)進(jìn)入到當(dāng)前線程的本地隊(duì)列中
@param T &Data - 要進(jìn)隊(duì)的數(shù)據(jù)
@return void - 無(wú)
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue(
T &Data)
{
int nId = ThreadIdGet();
m_ppLocalQueue[nId]->EnQueue(Data);
return;
}
/** 分布式隊(duì)列的指定序號(hào)本地隊(duì)列進(jìn)隊(duì)函數(shù)
這是一個(gè)為特殊需求而設(shè)計(jì)的函數(shù)
使用這個(gè)函數(shù)要特別小心,必須保證不會(huì)發(fā)生數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題
@param T &Data - 要進(jìn)隊(duì)的數(shù)據(jù)
@param int nIndex - 本地隊(duì)列的序號(hào)
@return void - 無(wú)
*/
template <class T, class LocalQueue, class SharedQueue>
void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue(
T &Data, int nIndex)
{
if ( nIndex >= m_nLocalQueueCount * 2)
{
return;
}
if ( nIndex >= m_nLocalQueueCount )
{
CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);
if ( nIndex >= m_nLocalQueueCount )
{
ResizeLocalQueue();
}
}
if ( m_ppLocalQueue[nIndex] == NULL )
{
m_ppLocalQueue[nIndex] = new LocalQueue(m_nLocalQueueSize);
}
m_ppLocalQueue[nIndex]->EnQueue(Data);
return;
}
/** 分布式隊(duì)列的本地隊(duì)列出隊(duì)函數(shù)
@param T &Data - 接收出隊(duì)的數(shù)據(jù)
@return int - 出隊(duì)成功返回CAPI_SUCCESS, 失敗(隊(duì)列為空)返回CAPI_FAILED.
*/
template <class T, class LocalQueue, class SharedQueue>
int CDistributedQueue<T, LocalQueue, SharedQueue>::PopFromLocalQueue(
T &Data)
{
int nId = ThreadIdGet();
return m_ppLocalQueue[nId]->DeQueue(Data);
}
/** 分布式隊(duì)列的出隊(duì)函數(shù)
出隊(duì)操作策略為,先從本地隊(duì)列中出隊(duì),如果失敗則從共享隊(duì)列中出隊(duì)
@param T &Data - 接收出隊(duì)的數(shù)據(jù)
@return int - 成功返回CAPI_SUCCESS, 失敗返回CAPI_FAILED.
*/
template <class T, class LocalQueue, class SharedQueue>
int CDistributedQueue<T, LocalQueue, SharedQueue>::DeQueue(T &Data)
{
int nRet;
int nId = ThreadIdGet();
nRet = m_ppLocalQueue[nId]->DeQueue(Data);
if ( nRet == CAPI_FAILED )
{
nRet = m_pSharedQueue->Pop(Data);
}
return nRet;
}
聯(lián)系客服