国产一级a片免费看高清,亚洲熟女中文字幕在线视频,黄三级高清在线播放,免费黄色视频在线看

打開(kāi)APP
userphoto
未登錄

開(kāi)通VIP,暢享免費(fèi)電子書(shū)等14項(xiàng)超值服

開(kāi)通VIP
英特爾? 軟件網(wǎng)絡(luò)博客 - 中文 ? 多核分布式隊(duì)列的實(shí)現(xiàn):“偷”與“自私”的運(yùn)用(4)

在設(shè)計(jì)CDistributeQueue類(lèi)時(shí),通常有兩種方案值得考慮:

  • 1、 本地隊(duì)列預(yù)先創(chuàng)建好,當(dāng)有線程訪問(wèn)時(shí)就可以直接根據(jù)線程編號(hào)去訪問(wèn)對(duì)應(yīng)的本地隊(duì)列。
  • 2、 不預(yù)先創(chuàng)建本地隊(duì)列,當(dāng)線程第一次訪問(wèn)分布式隊(duì)列時(shí),由于獲取不到線程編號(hào),由此可以斷定本線程是第1次訪問(wèn)分布式隊(duì)列,此時(shí)才創(chuàng)建本地隊(duì)列。

方案1和方案2可以說(shuō)各有優(yōu)缺點(diǎn)。方案1中,在事先不知道有多少線程會(huì)訪問(wèn)分布式隊(duì)列的情況下,預(yù)先創(chuàng)建好本地隊(duì)列會(huì)造成程序初始化時(shí)間過(guò)長(zhǎng),并且可能有一些創(chuàng)建好的隊(duì)列得不到使用。

方案2中,采用線程訪問(wèn)分布式隊(duì)列時(shí)才創(chuàng)建本地隊(duì)列,初始化時(shí)比較簡(jiǎn)單,并且不會(huì)造成多創(chuàng)建了本地隊(duì)列的情況。缺點(diǎn)是編程時(shí),隊(duì)列的操作代碼會(huì)變復(fù)雜一些,效率會(huì)有所降低。

下面的代碼中,給出的是方案2的實(shí)現(xiàn)。

  • 1) 類(lèi)的原型定義和成員函數(shù)

//獲取線程Id回調(diào)函數(shù)定義

typedef int (*GetThreadIdFunc)(void *pArg);

 

template <class T, class LocalQueue, class SharedQueue>

class CDistributedQueue {

private:

    LocalQueue **   m_ppLocalQueue;    // 本地隊(duì)列數(shù)組

    SharedQueue *   m_pSharedQueue;    // 共享隊(duì)列池或共享隊(duì)列

 

    int             m_nLocalQueueSize;

    int             m_nSharedQueueSize;

    int             m_nLocalQueueCount;

    int             m_nSharedQueueCount;

    DWORD       m_dwTlsIndex;        //線程本地存儲(chǔ)索引

    LONG volatile   m_lThreadIdIndex;     //線程編號(hào)最大值

    GetThreadIdFunc m_GetThreadIdFunc;   //獲取線程編號(hào)回調(diào)函數(shù),如果由外面

                                                                  //的線程池提供編號(hào)時(shí),需要傳入回調(diào)函數(shù)

    void *          m_pThreadIdFuncArg;  //獲取線程編號(hào)回調(diào)函數(shù)的參數(shù)

 

 CFastLock       m_LocalQueueResizeLock; //專(zhuān)為下面的ResizeLocalQueue函數(shù)使用

 void ResizeLocalQueue();             //將m_ppLocalQueue數(shù)組的大小擴(kuò)大一倍

public:

    CDistributedQueue(){

        m_GetThreadIdFunc = NULL;

        m_pThreadIdFuncArg = NULL;

        m_lThreadIdIndex = 0;

    };

    void Create( int nLocalQueueSize, int nLocalQueueCount,

        int nSharedQueueSize, int nSharedQueueCount);

    void Create( int nLocalQueueSize, int nLocalQueueCount,

        int nSharedQueueSize, int nSharedQueueCount,

        GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg);

 

    virtual ~CDistributedQueue();

 

    LONG ThreadIdGet();

 

    void EnQueue(T &Data);

    int  DeQueue(T &Data);

 

    void PushToLocalQueue(T &Data);

    void PushToLocalQueue(T &Data, int nIndex);

    int PopFromLocalQueue(T &Data);

 

    SharedQueue *GetSharedQueue() { return m_pSharedQueue; };

    int PrivatizeSharedQueue(int nSharedQueueIndex);

};

 

說(shuō)明一下:CDistributedQueue類(lèi)中有三個(gè)模板參數(shù),第1個(gè)模板參數(shù)T是表示數(shù)據(jù)類(lèi)型;第2個(gè)模板參數(shù)是表示本地隊(duì)列類(lèi)的類(lèi)型,為一個(gè)不需要使用鎖的普通隊(duì)列,比如環(huán)形隊(duì)列等;第3個(gè)模板參數(shù)是表示一個(gè)需要使用鎖的共享隊(duì)列類(lèi),可以是一個(gè)隊(duì)列池類(lèi),也可以是普通的使用鎖的共享隊(duì)列類(lèi)。

 

  • 1) 構(gòu)造函數(shù)和析構(gòu)函數(shù)代碼

/**   分布式隊(duì)列的創(chuàng)建函數(shù)

 

       @param  int nLocalQueueSize - 本地子隊(duì)列的大小 

       @param  int nLocalQueueCount - 本地隊(duì)列的個(gè)數(shù)(數(shù)組的大小)   

       @param  int nSharedQueueSize - 共享子隊(duì)列的大小      

       @param  int nSharedQueueCount - 共享子隊(duì)列的個(gè)數(shù)    

       @return  void - 無(wú)

*/

template <class T, class LocalQueue, class SharedQueue>

void CDistributedQueue<T, LocalQueue, SharedQueue>::Create(

                   int nLocalQueueSize, int nLocalQueueCount,

                   int nSharedQueueSize, int nSharedQueueCount)

{

    m_nLocalQueueSize = nLocalQueueSize;

    m_nSharedQueueSize = nSharedQueueSize;

    if ( nLocalQueueCount != 0 )

    {

        m_nLocalQueueCount = nLocalQueueCount;

    }

    else

    {

        m_nLocalQueueCount = omp_get_num_procs();

    }

 

    if ( nSharedQueueCount != 0 )

    {

        m_nSharedQueueCount = nSharedQueueCount;

    }

    else

    {

        m_nSharedQueueCount = omp_get_num_procs();

    }

 

    m_ppLocalQueue =  new LocalQueue *[m_nLocalQueueCount];

    int i;

    for ( i = 0; i < m_nLocalQueueCount; i++ )

    {

        m_ppLocalQueue[i] = NULL;

    }

    m_pSharedQueue = new SharedQueue(m_nSharedQueueCount, m_nSharedQueueSize);

    m_dwTlsIndex = TlsAlloc();

    m_lThreadIdIndex = 0;

}

 

 

/**   分布式隊(duì)列的創(chuàng)建函數(shù)

 

       @param  int nLocalQueueSize - 本地子隊(duì)列的大小 

       @param  int nLocalQueueCount - 本地隊(duì)列的個(gè)數(shù)(數(shù)組的大小)   

       @param  int nSharedQueueSize - 共享子隊(duì)列的大小      

       @param  int nSharedQueueCount - 共享子隊(duì)列的個(gè)數(shù)    

       @param  GetThreadIdFunc GetThreadId - 獲取線程Id回調(diào)函數(shù)  

       @param  void * pThreadIdFuncArg - GetThreadId回調(diào)函數(shù)的參數(shù)

       @return  void - 無(wú)

*/

template <class T, class LocalQueue, class SharedQueue>

void CDistributedQueue<T, LocalQueue, SharedQueue>::Create(

    int nLocalQueueSize, int nLocalQueueCount,

    int nSharedQueueSize, int nSharedQueueCount,

    GetThreadIdFunc GetThreadId, void * pThreadIdFuncArg)

{

    m_GetThreadIdFunc = GetThreadId;

    m_pThreadIdFuncArg = pThreadIdFuncArg;

    Create(nLocalQueueSize, nLocalQueueCount, nSharedQueueSize, nSharedQueueCount);

}

 

/**   分布式隊(duì)列的析構(gòu)函數(shù)

 

       @return  - 無(wú)     

*/

template <class T, class LocalQueue, class SharedQueue>

CDistributedQueue<T, LocalQueue, SharedQueue>::~CDistributedQueue()

{

    int i;

    for ( i = 0; i < m_nLocalQueueCount; i++ )

    {

        if ( m_ppLocalQueue[i] != NULL )

        {

            delete m_ppLocalQueue[i];

        }

    }

    delete [] m_ppLocalQueue;

    delete m_pSharedQueue;

    TlsFree(m_dwTlsIndex);

}

 

  • 2) 將本地隊(duì)列數(shù)組擴(kuò)大一倍的內(nèi)部成員函數(shù)代碼

這個(gè)函數(shù)主要是考慮有可能程序升級(jí)后,訪問(wèn)的線程數(shù)量可能大于本地隊(duì)列數(shù)組的大小的情況,此時(shí)采取將本地隊(duì)列數(shù)組擴(kuò)大一倍的策略。

/**   分布式隊(duì)列的將本地隊(duì)列數(shù)組擴(kuò)大一倍的內(nèi)部成員函數(shù)

 

       @return  void - 無(wú)

*/

template <class T, class LocalQueue, class SharedQueue>

void CDistributedQueue<T, LocalQueue, SharedQueue>::ResizeLocalQueue()

{

    //將本地隊(duì)列數(shù)組擴(kuò)大一倍, 防止線程數(shù)量多于隊(duì)列數(shù)量,以保證程序安全

    int i;

 

    LocalQueue **ppQueue = new LocalQueue *[m_nLocalQueueCount * 2];

    for ( i = 0; i < m_nLocalQueueCount; i++ )

    {

        ppQueue[i] = m_ppLocalQueue[i];

    }

    for ( i = m_nLocalQueueCount; i < m_nLocalQueueCount * 2; i++ )

    {

        ppQueue[i] = NULL;

    }

    delete [] m_ppLocalQueue;

    m_ppLocalQueue = ppQueue;

 

    //使用原子操作避免m_nLocalQueueCount的數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題

    AtomicWrite((LONG volatile *)&m_nLocalQueueCount, m_nLocalQueueCount * 2);

}

 

  • 3) 獲取線程Id成員函數(shù)代碼

獲取線程Id成員函數(shù)中,這個(gè)函數(shù)中完成本地隊(duì)列的創(chuàng)建和分派工作。先是判斷獲取的線程Id是否為0,如果為0則表明還沒(méi)有創(chuàng)建本地隊(duì)列,此時(shí)需要給線程進(jìn)行編號(hào),并創(chuàng)建一個(gè)新的本地隊(duì)列放到數(shù)組中下標(biāo)等于線程編號(hào)的位置上。

/**   分布式隊(duì)列的獲取線程Id函數(shù)

       如果m_GetThreadIdFunc回調(diào)函數(shù)不為空,則使用它獲取Id

       否則根據(jù)分布式隊(duì)列內(nèi)部的編號(hào)機(jī)制獲取線程Id

 

       @return  LONG - 返回線程的編號(hào)   

*/

template <class T, class LocalQueue, class SharedQueue>

LONG CDistributedQueue<T, LocalQueue, SharedQueue>::ThreadIdGet()

{

    LONG Id;

    LocalQueue *pQueue = NULL;

 

    if ( m_GetThreadIdFunc != NULL )

    {

        Id = (*m_GetThreadIdFunc)(m_pThreadIdFuncArg);

        if ( Id >= m_nLocalQueueCount )

        {

            CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);

            if ( Id >= m_nLocalQueueCount )

            {

                ResizeLocalQueue();

            }

        }

        if ( m_ppLocalQueue[Id] == NULL )

        {

            m_ppLocalQueue[Id] = new LocalQueue(m_nLocalQueueSize);

        }

        return Id;

    }

    else

    {

        Id = (LONG )TlsGetValue(m_dwTlsIndex);

        if ( Id == 0 )

        {

            Id = AtomicIncrement(&m_lThreadIdIndex);

            TlsSetValue(m_dwTlsIndex, (void *)Id);

            pQueue = new LocalQueue(m_nLocalQueueSize);

        }

        --Id;

    }

 

    if ( Id >= m_nLocalQueueCount)

    {

        CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);

        if ( Id >= m_nLocalQueueCount )

        {

            ResizeLocalQueue();

        }

    }

    if ( pQueue != NULL )

    {

        m_ppLocalQueue[Id] = pQueue;

    }

return Id;

}

 

  • 4) 進(jìn)隊(duì)操作策略1的進(jìn)隊(duì)操作代碼

/**   分布式隊(duì)列的進(jìn)隊(duì)操作函數(shù)

       這里假定了本地隊(duì)列可以無(wú)限進(jìn)隊(duì)

       進(jìn)隊(duì)策略按以下優(yōu)先級(jí)進(jìn)行:

       1、本地隊(duì)列空時(shí)進(jìn)入本地隊(duì)列,、共享隊(duì)列未滿(mǎn)時(shí)進(jìn)入共享隊(duì)列

       3、共享隊(duì)列滿(mǎn)時(shí)進(jìn)入本地隊(duì)列

 

       @param  T &Data - 要進(jìn)隊(duì)的數(shù)據(jù)    

       @return  void - 無(wú)

*/

template <class T, class LocalQueue, class SharedQueue>

void CDistributedQueue<T, LocalQueue, SharedQueue>::EnQueue(T &Data)

{

    int nId = ThreadIdGet();

    if ( m_ppLocalQueue[nId]->IsEmpty() )

    {

        m_ppLocalQueue[nId]->EnQueue(Data);

    }

    else if ( m_pSharedQueue->Push(Data) != CAPI_SUCCESS )

    {

        int nId = ThreadIdGet();

        m_ppLocalQueue[nId]->EnQueue(Data);

    }

    else

    {

        //這個(gè)分支不需要做任何事

    }

    return;

}

 

  • 5) 本地隊(duì)列的操作代碼

/**   分布式隊(duì)列的本地隊(duì)列進(jìn)隊(duì)函數(shù)

       將數(shù)據(jù)進(jìn)入到當(dāng)前線程的本地隊(duì)列中

 

       @param  T &Data - 要進(jìn)隊(duì)的數(shù)據(jù)    

       @return  void - 無(wú)

*/

template <class T, class LocalQueue, class SharedQueue>

void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue(

T &Data)

{

    int nId = ThreadIdGet();

    m_ppLocalQueue[nId]->EnQueue(Data);

    return;

}

 

/**   分布式隊(duì)列的指定序號(hào)本地隊(duì)列進(jìn)隊(duì)函數(shù)

    這是一個(gè)為特殊需求而設(shè)計(jì)的函數(shù)

    使用這個(gè)函數(shù)要特別小心,必須保證不會(huì)發(fā)生數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題

 

       @param  T &Data - 要進(jìn)隊(duì)的數(shù)據(jù)    

       @param  int nIndex - 本地隊(duì)列的序號(hào)     

       @return  void - 無(wú)

*/

template <class T, class LocalQueue, class SharedQueue>

void CDistributedQueue<T, LocalQueue, SharedQueue>::PushToLocalQueue(

T &Data, int nIndex)

{

    if ( nIndex >= m_nLocalQueueCount * 2)

    {

        return;

    }

 

    if ( nIndex >= m_nLocalQueueCount )

    {

        CScopedLock<CFastLock> slock(m_LocalQueueResizeLock);

        if ( nIndex >= m_nLocalQueueCount )

        {

            ResizeLocalQueue();

        }

    }

 

    if ( m_ppLocalQueue[nIndex] == NULL )

    {

        m_ppLocalQueue[nIndex] = new LocalQueue(m_nLocalQueueSize);

    }

 

    m_ppLocalQueue[nIndex]->EnQueue(Data);

    return;

}

 

/**   分布式隊(duì)列的本地隊(duì)列出隊(duì)函數(shù)

 

       @param  T &Data - 接收出隊(duì)的數(shù)據(jù)

       @return  int - 出隊(duì)成功返回CAPI_SUCCESS, 失敗(隊(duì)列為空)返回CAPI_FAILED.      

*/

template <class T, class LocalQueue, class SharedQueue>

int CDistributedQueue<T, LocalQueue, SharedQueue>::PopFromLocalQueue(

T &Data)

{

    int nId = ThreadIdGet();

    return m_ppLocalQueue[nId]->DeQueue(Data);

}

 

  • 6) 出隊(duì)操作代碼

/**   分布式隊(duì)列的出隊(duì)函數(shù)

    出隊(duì)操作策略為,先從本地隊(duì)列中出隊(duì),如果失敗則從共享隊(duì)列中出隊(duì)

 

       @param  T &Data - 接收出隊(duì)的數(shù)據(jù)

       @return  int - 成功返回CAPI_SUCCESS, 失敗返回CAPI_FAILED.    

*/

template <class T, class LocalQueue, class SharedQueue>

int CDistributedQueue<T, LocalQueue, SharedQueue>::DeQueue(T &Data)

{

    int nRet;

 

    int nId = ThreadIdGet();

   

    nRet = m_ppLocalQueue[nId]->DeQueue(Data);

    if ( nRet == CAPI_FAILED )

    {

        nRet = m_pSharedQueue->Pop(Data);

    }

    return nRet;  

}


本站僅提供存儲(chǔ)服務(wù),所有內(nèi)容均由用戶(hù)發(fā)布,如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊舉報(bào)。
打開(kāi)APP,閱讀全文并永久保存 查看更多類(lèi)似文章
猜你喜歡
類(lèi)似文章
阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式
Java并發(fā)面試題
利用隊(duì)列實(shí)現(xiàn)打印楊輝三角
msgsnd()函數(shù)
osa_mbx.c函數(shù)分析
如何清除串口緩沖區(qū)中的數(shù)據(jù)
更多類(lèi)似文章 >>
生活服務(wù)
分享 收藏 導(dǎo)長(zhǎng)圖 關(guān)注 下載文章
綁定賬號(hào)成功
后續(xù)可登錄賬號(hào)暢享VIP特權(quán)!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服