Producer & consumer is a classic computer problem. It's a very good example for learning P/V operations.
This example is designed to support multiple OS's such as Linux/Win32/Mac. So differences between these OS's should be encapsulated. The following infrastuctures of OS are used:
I'm not familiar with windows and windows programming so in this example only Linux is supported. But the design patterns used make it easy to support more platforms - the interface is not carefully designed, though.
Before start, some key points should be kept in mind:
- Abstract Factory and Singleton Patterns
- Multi-thread programming
- Semaphore and Mutex
- Protection on Data, NOT on code
First let's look at the abstract factory to create platform specific infrastuctres:
Acutally IThread is not exactly the same in code:
/* * Simplified thread interface. */ typedef void * (*ThreadFunc)(void *);
class IThread { public: IThread() {} virtual ~IThread() {}
public: virtual void Init() = 0; virtual void Run(ThreadFunc threadFunc, void* arg) = 0; virtual void Join() = 0; };
|
This is the declaration of factory base class:
/* * Abstract factory to create platform specific implementations. */ class IPlatformFactory { public: IPlatformFactory() {} virtual ~IPlatformFactory() {}
public: virtual ISemaphore* CreateSem(int size) = 0; virtual IMutex* CreateMutex() = 0; virtual IThread* CreateThreadObj() = 0; virtual ICommandLineParser* CreateCommandLineParser(int argc, char** argv) = 0; };
|
Then we need data buffer which contains data blocks:
Buffer creator is so simple that no need to describe here. DataBlock is a structure, very simple, too. We only need to have a look at the core functionalities of SharedBuffer:
Read data from buffer:
void SharedBuffer::ReadDataBlock(unsigned long* data, int size, Consumer* consumer) { int i = 0; mSemFull->Decreament(); mMutex->Lock(); // Find the first available data block for (; i < mCnt; ++i) { if (!mBlocks[i]->mIsEmpty) break; } if (i < mCnt) { // Consumes the data block memcpy(data, mBlocks[i]->mData, size); memset(mBlocks[i]->mData, 0, mBlockSize); mBlocks[i]->mIsEmpty = true;
// To demonstrate the producer and consumer, the data to be consumed // is write to console immediately. cout << (*consumer->GetName()) << " consumes data block [#" << i << "]:" << endl << flush; cout << " +------------ Consumed Data ------------+" << endl << flush; cout << " | " << (char*)data << " |" << endl << flush; cout << " +------------ Consumed Data ------------+" << endl << flush; } mMutex->Unlock(); mSemEmpty->Increament(); }
|
Write data to buffer:
void SharedBuffer::WriteDataBlock(unsigned long* data, int size, Producer* producer) { int i = 0; mSemEmpty->Decreament(); mMutex->Lock(); // Find the first available data block for (; i < mCnt; ++i) { if (mBlocks[i]->mIsEmpty) break; } if (i < mCnt) { // To demonstrate the producer and consumer, the data to be produced // is write to console immediately. cout << (*producer->GetName()) << " produces data block [#" << i << "]:" << endl << flush; cout << " +------------ Produced Data ------------+" << endl << flush; cout << " | " << (char*)data << " |" << endl << flush; cout << " +------------ Produced Data ------------+" << endl << flush;
// Produces the data block memset(mBlocks[i]->mData, 0, mBlockSize); memcpy(mBlocks[i]->mData, data , size); mBlocks[i]->mIsEmpty = false; } mMutex->Unlock(); mSemFull->Increament(); }
|
The mMutex is of Mutex type, to serialize the access to the shared buffer. mSemFull and mSemEmpty are semaphore to indicate how many data blocks (resources) available for reader and writer (consumer and producer).
The consumer and producer look like below:
Consume() and Produce() are made static because they are invoked in threads:
void* Consumer::Consume(void* arg) { Consumer* self = (Consumer*)arg; char* data = new char [self->mDataSize];
// Just to demonstrate if (self->mIters == 1) self->mIters *= 5; while (self->mIters--) { memset(data, 0, self->mDataSize); self->mBuffer->ReadDataBlock((unsigned long*)data, self->mDataSize, self); } delete [] data; // Should not output anything to console. Just to demonstrate. cout << "** " << (*self->GetName()) << " Done!" << endl; return 0; }
|
void* Producer::Produce(void* arg) { Producer* self = (Producer*)arg; char* data = new char [self->mDataSize]; strcpy(data, "Data block produced by: "); strcat(data, self->mName.c_str());
// Just to demonstrate if (self->mIters == 1) self->mIters *= 5; while (self->mIters--) { self->mBuffer->WriteDataBlock((unsigned long*)data, self->mDataSize, self); } delete [] data; // Should not output anything to console. Just to demonstrate. cout << "** " << (*self->GetName()) << " Done!" << endl << flush; return 0; }
|
To demonstrate the co-work of producer and consumer:
int main(int argc, char** argv) { /* * Parses command line arguments. */ IPlatformFactory* factory = new Factory(); ICommandLineParser* cmdParser = factory->CreateCommandLineParser(argc, argv); if (!cmdParser->Parse()) { cmdParser->GetHelp(); return -1; }
int nConsumers = cmdParser->GetNumOfConsumers(); int nProducers = cmdParser->GetNumOfProducers(); int nBlocks = cmdParser->GetNumOfDataBlocks(); int nSize = cmdParser->GetDataBlockSize();
delete factory; delete cmdParser;
/* * Creates buffers, consumers & producers, and threads that producing * and consuming data. Each consumer / producer per thread. */ SharedBuffer* pBuf = BufferCreator::GetBuffers(nBlocks, nSize); pBuf->Init();
// Number of iterations of consumers must be the number of producers // so that no producer will be hung for available empty data block. Consumer* consumers = InitConsumers(nConsumers, nProducers, pBuf);
// Number of iterations of producers must be the number of consumers // so that no consumers will be hung for available full data block. Producer* producers = InitProducers(nProducers, nConsumers, pBuf);
IThread** consumerThreads = new IThread* [nConsumers]; IThread** producerThreads = new IThread* [nProducers]; InitThreads(consumerThreads, nConsumers); InitThreads(producerThreads, nProducers);
/* * Begins the producers and consumers threads. */ cout << "*************** Begin ***************" << endl;
int i;
// Starts consumer threads first. for (i = 0; i < nConsumers; ++i) consumerThreads[i]->Run(Consumer::Consume, (void*)&consumers[i]);
// Then Starts producer threads. for (i = 0; i < nProducers; ++i) producerThreads[i]->Run(Producer::Produce, (void*)&producers[i]);
// Waits for the exit of all threads. for (i = 0; i < nConsumers; ++i) consumerThreads[i]->Join(); for (i = 0; i < nProducers; ++i) producerThreads[i]->Join();
cout << "*************** Done ***************" << endl;
|
OKay... I have to say that the code is such a mess... Anyway, this example is simple enough so that coding is very simple, given the class diagrams.
阅读(1440) | 评论(1) | 转发(0) |