Chinaunix首页 | 论坛 | 博客
  • 博客访问: 2708362
  • 博文数量: 416
  • 博客积分: 10220
  • 博客等级: 上将
  • 技术积分: 4193
  • 用 户 组: 普通用户
  • 注册时间: 2006-12-15 09:47
文章分类

全部博文(416)

文章存档

2022年(1)

2021年(1)

2020年(1)

2019年(5)

2018年(7)

2017年(6)

2016年(7)

2015年(11)

2014年(1)

2012年(5)

2011年(7)

2010年(35)

2009年(64)

2008年(48)

2007年(177)

2006年(40)

我的朋友

分类: C/C++

2015-02-16 09:47:23

lfringqueue.h
#ifndef INCLUDED_UTILS_LFRINGQUEUE
#define INCLUDED_UTILS_LFRINGQUEUE

#include
#include
#include
#include
#include
#include
#include


// Lock free ring queue 
//std::atomic_flag m_lEventSet(ATOMIC_FLAG_INIT);       // a flag to use whether we should change the item flag
//std::atomic_flag m_bHasItem(ATOMIC_FLAG_INIT);        // a flag to indicate whether there is an item enqueued
template < typename _TyData, long _uiCount = 100000 >
class lfringqueue
{
public:
    lfringqueue( long uiCount = _uiCount ) : 
//m_lEventSet(ATOMIC_FLAG_INIT), m_bHasItem(ATOMIC_FLAG_INIT),
         m_lTailIterator(0), m_lHeadIterator(0), m_uiCount( uiCount )
    {
        m_queue = new _TyData*[m_uiCount];
        memset( m_queue, 0, sizeof(_TyData*) * m_uiCount );
    }


    ~lfringqueue()
    {
        if ( m_queue )
            delete [] m_queue;
    }
 
    bool enqueue( _TyData *pdata, unsigned int uiRetries = 1000 )
    {
        if ( NULL == pdata )
        {
            // Null enqueues are not allowed
            return false;
        }


        unsigned int uiCurrRetries = 0;
        while ( uiCurrRetries < uiRetries )
        {
            // Release fence in order to prevent memory reordering 
            // of any read or write with following write
            std::atomic_thread_fence(std::memory_order_release);
            
            long lHeadIterator = m_lHeadIterator;


            if ( NULL == m_queue[lHeadIterator] )
            {
                long lHeadIteratorOrig = lHeadIterator;


                ++lHeadIterator;
                if ( lHeadIterator >= m_uiCount )
                        lHeadIterator = 0;


                // Don't worry if this CAS fails.  It only means some thread else has
                // already inserted an item and set it.
                if ( std::atomic_compare_exchange_strong( &m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator ) )             
                {
                    // void* are always atomic (you wont set a partial pointer).
                    m_queue[lHeadIteratorOrig] = pdata;
                  
                    if ( m_lEventSet.test_and_set( ))
                    {
                        m_bHasItem.test_and_set();
                    }
                    return true;
                }
            }
            else
            {
                // The queue is full.  Spin a few times to check to see if an item is popped off.
                ++uiCurrRetries;
            }
        }
        return false;
    }


    bool dequeue( _TyData **ppdata )
    {
        if ( !ppdata )
        {
            // Null dequeues are not allowed!
            return false;
        }


        bool bDone = false;
        bool bCheckQueue = true;


        while ( !bDone )
        {
            // Acquire fence in order to prevent memory reordering 
            // of any read or write with following read
            std::atomic_thread_fence(std::memory_order_acquire);
            //MemoryBarrier();
            long lTailIterator = m_lTailIterator;
            _TyData *pdata = m_queue[lTailIterator];
            //volatile _TyData *pdata = m_queue[lTailIterator];            
            if ( NULL != pdata )
            {
                bCheckQueue = true;
                long lTailIteratorOrig = lTailIterator;


                ++lTailIterator;
                if ( lTailIterator >= m_uiCount )
                        lTailIterator = 0;


                //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig ))
                if ( std::atomic_compare_exchange_strong( &m_lTailIterator, &lTailIteratorOrig, lTailIterator ))
                {
                        // Sets of sizeof(void*) are always atomic (you wont set a partial pointer).
                        m_queue[lTailIteratorOrig] = NULL;


                        // Gets of sizeof(void*) are always atomic (you wont get a partial pointer).
                        *ppdata = (_TyData*)pdata;


                        return true;
                }
            }
            else
            {
                bDone = true;
                m_lEventSet.clear();
            }
        }
        *ppdata = NULL;
        return false;
    }


    long countguess() const
    {
        long lCount = trycount();


        if ( 0 != lCount )
                return lCount;


        // If the queue is full then the item right before the tail item will be valid.  If it
        // is empty then the item should be set to NULL.
        long lLastInsert = m_lTailIterator - 1;
        if ( lLastInsert < 0 )
                lLastInsert = m_uiCount - 1;


        _TyData *pdata = m_queue[lLastInsert];
        if ( pdata != NULL ) 
                return m_uiCount;


        return 0;
    }


    long getmaxsize() const
    {
        return m_uiCount;
    }


    bool HasItem()
    {
        return m_bHasItem.test_and_set();
    }


    void SetItemFlagBack()
    {
        m_bHasItem.clear();
    }

private:
    long trycount() const
    {
        long lHeadIterator = m_lHeadIterator;
        long lTailIterator = m_lTailIterator;


        if ( lTailIterator > lHeadIterator )
                return m_uiCount - lTailIterator + lHeadIterator;


        // This has a bug where it returns 0 if the queue is full.
        return lHeadIterator - lTailIterator;
    }

private:    
    std::atomic m_lHeadIterator;  // enqueue index
    std::atomic m_lTailIterator;  // dequeue index
    _TyData **m_queue;                  // array of pointers to the data
    long m_uiCount;                     // size of the array
    std::atomic_flag m_lEventSet;       // a flag to use whether we should change the item flag
    std::atomic_flag m_bHasItem;        // a flag to indicate whether there is an item enqueued
};

#endif //INCLUDED_UTILS_LFRINGQUEUE

////////////////// main.cpp ///////////////////////////////////////

/* 
 * File:   main.cpp
 * Author: Peng
 *
 * Created on February 22, 2014, 9:55 PM
 */


#include
#include "lfringqueue.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include


#include
#include
#include


//#include


const long NUM_DATA = 50;
const int NUM_ENQUEUE_THREAD = 4;
const int NUM_DEQUEUE_THREAD = 1;
const long NUM_ITEM = 100000;
        
using namespace std;
class Data
{
public:   
    Data( int i = 0 ) : m_iData(i)
    {
        //stringstream ss;
       // ss << i;
       // m_szDataString = ss.str();
        //sprintf( m_szDataString, "%l-d", i);    
    }
        
    bool operator< ( const Data & aData) const
    {
        if ( m_iData < aData.m_iData)
            return true;
        else
            return false;
    }
    
    int& GetData()
    {
        return m_iData;
    }
public:   
    int m_iData;
    string m_szDataString;
    //char m_szDataString[MAX_DATA_SIZE];
};


Data DataArray[NUM_DATA];


constexpr long size = 0.5 * NUM_DATA;
lfringqueue < Data, 1000> LockFreeQueue; 
boost::lockfree::queue BoostQueue(1000);


// Since there is a chance that the searched number cannot be found, so the function should return boolean
bool BinarySearchNumberInSortedArray( Data datas[], int iStart, int iEnd, int SearchedNum, int &iFound )
{
    if ( iEnd - iStart <= 1 )
    {
        if ( datas[iStart].GetData() == SearchedNum )
        {
            iFound = iStart;
            return true;
        }
        else if ( datas[iEnd].GetData() == SearchedNum )
        {
            iFound = iEnd;
            return true;
        }
        else
            return false;
    }
    
    int mid = 0.5 * ( iStart + iEnd );
    
    if ( datas[mid].GetData() == SearchedNum )
    {
        iFound = mid;
        return true;
    }
    
    if ( datas[mid].GetData() > SearchedNum )
    {
        if ( mid - 1 >= 0)
            return BinarySearchNumberInSortedArray ( datas, iStart, mid - 1, SearchedNum, iFound);
        else
            return false;
    }
    else
    {
        if ( mid + 1 <= iEnd )
            return BinarySearchNumberInSortedArray ( datas, mid + 1, iEnd, SearchedNum, iFound);
        else
            return false;
    }
}
std::mutex g_mtx;
bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue(int pid)
{
    std::uniform_int_distribution dis(1, NUM_DATA);
    default_random_engine engine{};
      
Data *pData, *pDataPop;
int n = 0, nUnitLen = 10;
int iFoundIndex;
int nCountPush = 0, nCountPop = 0;
int nDataTotalPush=0, nDataTotalPop = 0;
pid++;
    for ( long i = 0; i < NUM_ITEM; i++ )
    {
//std::unique_lock lock(g_mtx);
        int x = dis ( engine );
for (n = 0; n < nUnitLen; n++) {
iFoundIndex = pid;
//if (BinarySearchNumberInSortedArray(DataArray, 0, NUM_DATA - 1, x, iFoundIndex))
{
pData = &DataArray[iFoundIndex];
if (LockFreeQueue.enqueue(pData)) {
nCountPush++;
nDataTotalPush += pData->m_iData;
}
//BoostQueue.push( pData );
}
}


for (n = 0; n < nUnitLen; n++) {
if (LockFreeQueue.dequeue(&pDataPop)) {
if (pDataPop) {
nDataTotalPop += pDataPop->m_iData;
nCountPop++;
}
}
}
    }


if (nCountPop >= NUM_ITEM * nUnitLen) {

}
else {
bool bBreak = false;
for (long i = 0; i < NUM_ITEM; i++)
{
for (n = 0; n < nUnitLen; n++) {
if (LockFreeQueue.dequeue(&pDataPop)) {
if (pDataPop) {
nDataTotalPop += pDataPop->m_iData;
nCountPop++;
if (nCountPop >= NUM_ITEM * nUnitLen) {
bBreak = true;
break;
}
}
}
}
if (bBreak) {
break;
}
}
}


cout << " nCountPush = " << nCountPush << " nCountPop = " << nCountPop
<< " push=" << nDataTotalPush 
<< " pop="  << nDataTotalPop < return true;
}
bool Dequeue()
{
    Data *pData;


    for ( long i = 0; i < NUM_ITEM; i ++)
    {
        while (  LockFreeQueue.dequeue( &pData ) );       
        //while (  BoostQueue.pop( pData ) ) ;      
    }    


return true;
}


int main(int argc, char** argv) 
{
    for ( int i = 1; i < NUM_DATA + 1; i++ )
    {
        Data data(i);
data.m_iData = i;
        DataArray[i-1] = data;
    }
     
    std::thread PublishThread[NUM_ENQUEUE_THREAD]; 
    std::thread ConsumerThread[NUM_DEQUEUE_THREAD];
    std::chrono::duration elapsed_seconds;
  
auto start = std::chrono::high_resolution_clock::now();
    for ( int i = 0; i < NUM_ENQUEUE_THREAD;  i++ )
    {
        PublishThread[i] = std::thread( GenerateRandomNumber_FindPointerToTheNumber_EnQueue, i ); 
    }
    
    
   /* for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ )
    {
        ConsumerThread[i] = std::thread{ Dequeue};
    }
    
    for ( int i = 0; i < NUM_DEQUEUE_THREAD; i++ )
    {
        ConsumerThread[i].join();
    }   */


for (int i = 0; i < NUM_ENQUEUE_THREAD; i++)
{
PublishThread[i].join();
}


    auto end = std::chrono::high_resolution_clock::now();
    elapsed_seconds = end - start;
    std::cout << "Enqueue and Dequeue 10 million item in:" << elapsed_seconds.count() << std::endl;
             
    return 0;
}
加大此数之后就不稳定,还没有做到真正的并发.
const long NUM_ITEM = 100000;
阅读(1120) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~