In this post you will see how to use the pthread library for multi-threaded programming and how to implement the Producer/Consumer pattern in the simplest way.
You will see how to avoid busy waiting by using condition variables, and how to synchronize threads.
In this example, the producer generates random traffic and puts it in a buffer, while the consumer simultaneously retrieves the produced traffic and checks whether it is suspicious.
Please note that a non-static member function cannot be passed to pthread_create as the thread start routine. To work around this, add a static method to the class and pass that static method to pthread_create instead.
This project runs on both Windows (MinGW) and Linux.
(( Don't forget to add -lpthread to the g++ command line when compiling this file. ))
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <queue>
#include <sstream>
#include <string>

#include <pthread.h>

#ifdef _WIN32
#include <windows.h>
#else
#include <time.h>
#endif

using namespace std;

// Suspends the calling thread for roughly pMillis milliseconds.
// Win32 Sleep() does not exist on POSIX systems, so nanosleep() is used there.
static void SleepMillis(unsigned int pMillis)
{
#ifdef _WIN32
    Sleep(pMillis);
#else
    struct timespec delay;
    delay.tv_sec = pMillis / 1000;
    delay.tv_nsec = static_cast<long>(pMillis % 1000) * 1000000L;
    nanosleep(&delay, NULL);
#endif
}

// Bounded FIFO of request strings shared between producer and consumer threads.
//
// All state is protected by mLock. mFullCond is signalled when space becomes
// available, mEmptyCond when an item becomes available. Once MAX_PROCESS_LEN
// requests have been accepted, Add() drops further requests and Remove()
// reports completion as soon as the buffer has been drained.
class Requests
{
private:
    static const int MAX_BUFFER_LEN = 100;   // capacity of the buffer
    static const int MAX_PROCESS_LEN = 1000; // total number of requests accepted

    queue<string> mBuffer;
    int mProcessed; // number of requests accepted by Add() so far
    pthread_mutex_t mLock;
    pthread_cond_t mEmptyCond;
    pthread_cond_t mFullCond;

    // The object owns a mutex and condition variables, so copying is disabled
    // (declared but intentionally not defined).
    Requests(const Requests &);
    Requests &operator=(const Requests &);

    // The two helpers below must be called with mLock held.
    bool IsFull() const
    {
        return mBuffer.size() >= static_cast<queue<string>::size_type>(MAX_BUFFER_LEN);
    }

    bool QuotaReached() const
    {
        return mProcessed >= MAX_PROCESS_LEN;
    }

public:
    Requests() : mProcessed(0)
    {
        pthread_mutex_init(&mLock, NULL);
        pthread_cond_init(&mFullCond, NULL);
        pthread_cond_init(&mEmptyCond, NULL);
    }

    ~Requests()
    {
        pthread_mutex_destroy(&mLock);
        pthread_cond_destroy(&mFullCond);
        pthread_cond_destroy(&mEmptyCond);
    }

    // Appends pRequest to the buffer, blocking while the buffer is full.
    // The request is silently dropped once the quota has been reached.
    void Add(const string &pRequest)
    {
        pthread_mutex_lock(&mLock);
        // Re-check the predicate after every wakeup: pthread_cond_wait may
        // return spuriously, so a plain `if` is not sufficient.
        while (!QuotaReached() && IsFull())
        {
            cout << "Add waiting because of Full Condition...\n";
            pthread_cond_wait(&mFullCond, &mLock);
        }
        if (QuotaReached())
        {
            pthread_mutex_unlock(&mLock);
            return;
        }
        ++mProcessed;
        cout << "Pushing :" << pRequest << endl;
        mBuffer.push(pRequest);
        pthread_mutex_unlock(&mLock);
        pthread_cond_signal(&mEmptyCond);
    }

    // Removes and returns the oldest request, blocking while the buffer is
    // empty. When the buffer is empty and the quota has been reached,
    // pProcessFinished is set to true and an empty string is returned;
    // otherwise pProcessFinished is set to false.
    string Remove(bool &pProcessFinished)
    {
        pProcessFinished = false;
        pthread_mutex_lock(&mLock);
        // Loop so that a spurious wakeup can never reach front()/pop() on an
        // empty queue.
        while (mBuffer.empty())
        {
            if (QuotaReached())
            {
                pProcessFinished = true;
                pthread_mutex_unlock(&mLock);
                pthread_cond_signal(&mFullCond);
                return "";
            }
            cout << "Remove waiting because of Empty Condition...\n";
            pthread_cond_wait(&mEmptyCond, &mLock);
        }
        string ret = mBuffer.front();
        mBuffer.pop();
        cout << ">>>>>>Popping :" << ret << endl;
        pthread_mutex_unlock(&mLock);
        pthread_cond_signal(&mFullCond);
        return ret;
    }

    // Returns true once MAX_PROCESS_LEN requests have been accepted.
    // Takes the lock, so it may be called from any thread.
    bool ProcessedEnough()
    {
        pthread_mutex_lock(&mLock);
        const bool done = QuotaReached();
        pthread_mutex_unlock(&mLock);
        return done;
    }
};

// Producer: pushes the decimal strings "0", "1", ... into the shared buffer at
// random intervals until the buffer reports that enough has been processed.
class TrafficProducer
{
private:
    Requests *mRequestBuffer;

public:
    explicit TrafficProducer(Requests *pReq) : mRequestBuffer(pReq)
    {
    }

    void *ProduceTraffic()
    {
        int next = 0;
        while (!mRequestBuffer->ProcessedEnough())
        {
            // NOTE(review): rand() is not required to be thread-safe by POSIX
            // and is called from both worker threads; acceptable for a demo.
            SleepMillis(rand() % 10);
            ostringstream os;
            os << next++;
            mRequestBuffer->Add(os.str());
        }
        return NULL;
    }

    // pthread_create cannot take a non-static member function, so this static
    // trampoline receives the object through pContext and forwards to it.
    static void *Run(void *pContext)
    {
        return static_cast<TrafficProducer *>(pContext)->ProduceTraffic();
    }
};

// Consumer: pulls requests from the shared buffer and reports every value
// divisible by 19 as suspicious.
class TrafficInspector
{
private:
    Requests *mRequestBuffer;

public:
    explicit TrafficInspector(Requests *pReq) : mRequestBuffer(pReq)
    {
    }

    void *InspectTraffic()
    {
        for (;;)
        {
            SleepMillis(rand() % 5);
            bool finished = false;
            const string request = mRequestBuffer->Remove(finished);
            if (finished)
                return NULL;
            const int value = atoi(request.c_str());
            if (value % 19 == 0)
            {
                cout << "suspicios data recieved:" << value << endl;
            }
        }
    }

    // Static trampoline for pthread_create; see TrafficProducer::Run.
    static void *Run(void *pContext)
    {
        return static_cast<TrafficInspector *>(pContext)->InspectTraffic();
    }
};

int main()
{
    Requests reqBuff;
    TrafficProducer producer(&reqBuff);
    TrafficInspector consumer(&reqBuff);
    pthread_t th_producer;
    pthread_t th_consumer;

    int rc = pthread_create(&th_producer, NULL, TrafficProducer::Run, static_cast<void *>(&producer));
    if (rc != 0)
    {
        cerr << "pthread_create (producer) failed: " << strerror(rc) << endl;
        return 1;
    }

    rc = pthread_create(&th_consumer, NULL, TrafficInspector::Run, static_cast<void *>(&consumer));
    if (rc != 0)
    {
        cerr << "pthread_create (consumer) failed: " << strerror(rc) << endl;
        // The producer thread is already running against reqBuff. exit() ends
        // the process without destroying main's locals underneath it.
        exit(EXIT_FAILURE);
    }

    pthread_join(th_producer, NULL);
    pthread_join(th_consumer, NULL);
    cout << "Finished" << endl;
    return 0;
}