#include "observerwiththread.h"

// Implementation of IObserverWithThread: events accepted by post_event() are
// queued by OnReceiverData() and consumed by a worker thread running
// thread_callback(), which passes each queued value to handler().
//
// Locking protocol used throughout this file:
//   * m_queue_mutex protects m_EventQueue.
//   * m_mutex is the mutex paired with m_condvar. Every notify and every
//     "check state, then wait" sequence happens while m_mutex is held, so a
//     notification cannot slip in between the worker's check and its wait.
//   * The two mutexes are only ever nested in the order m_mutex -> m_queue_mutex.
//
// NOTE(review): the lock_guard template argument is assumed to be
// tthread::mutex for both m_mutex and m_queue_mutex -- confirm against the
// member declarations in observerwiththread.h.

/// Creates an observer with no worker thread; call start() to launch one.
IObserverWithThread::IObserverWithThread()
    : m_thread(NULL), m_bRunning(false)
{
}

/// Offers an event to post_event() and, when it answers NONE, queues it for
/// the worker thread.
///
/// @param wParam  Value handed to post_event(); this is what gets queued and
///                later passed to handler().
/// @param lParam  Handed to post_event() only; it is not stored in the queue.
/// @return The value returned by post_event(), unchanged.
int IObserverWithThread::OnReceiverData(WPARAM wParam, LPARAM lParam)
{
    // Only a post_event() result of NONE (0 according to the original
    // comment) means the event has to be processed by the worker.
    const int iret = post_event(wParam, lParam);
    if (iret == NONE)
    {
        {
            tthread::lock_guard<tthread::mutex> queue_lock(m_queue_mutex);
            m_EventQueue.push(wParam);
        }
        // Notify while holding the mutex the worker waits on; otherwise the
        // notification can fire just before the worker blocks and be lost.
        tthread::lock_guard<tthread::mutex> signal_lock(m_mutex);
        m_condvar.notify_all();
    }
    return iret;
}

/// Asks the worker thread to leave its loop and wakes it if it is waiting.
/// Does not wait for the thread to finish; use join() for that.
void IObserverWithThread::stop()
{
    DEBUG_METHOD();
    // The flag is cleared under m_mutex and followed by a notify so that a
    // worker blocked in wait() re-evaluates is_running() and exits.
    tthread::lock_guard<tthread::mutex> signal_lock(m_mutex);
    m_bRunning = false;
    m_condvar.notify_all();
}

/// Blocks until the worker thread, if one was created, has finished.
void IObserverWithThread::join()
{
    if (NULL != m_thread)
    {
        m_thread->join();
    }
}

void thread_callback(void *thread_arg);

/// Starts the worker thread.
///
/// @param bForceRestart  When the worker is already running: false leaves it
///                       untouched, true shuts it down and launches a new one.
///
/// A thread object left over from a previous run is stopped, joined and freed
/// before a new one is created, so at most one worker exists at a time.
/// NOTE(review): calling start(true) from inside handler() would make the
/// worker join itself -- confirm no caller does that.
void IObserverWithThread::start(bool bForceRestart/*=false*/)
{
    DEBUG_METHOD();
    DEBUG_PRINTF("current thread is running?%s",m_bRunning?"true":"false");

    if (m_bRunning && !bForceRestart)
    {
        return;
    }

    if (NULL != m_thread)
    {
        // Never delete a thread object whose thread may still be executing:
        // shut the old worker down cleanly first.
        stop();
        join();
        delete m_thread;
        m_thread = NULL;
    }

    // The flag must be true before the thread exists; thread_callback() tests
    // is_running() immediately and would otherwise be able to exit at once.
    m_bRunning = true;
    m_thread = new thread(thread_callback, this);
}

/// Stops and joins the worker before releasing the thread object, because the
/// worker dereferences this object for as long as it runs.
IObserverWithThread::~IObserverWithThread()
{
    if (NULL != m_thread)
    {
        stop();
        join();
        delete m_thread;
        m_thread = NULL;
    }
    m_bRunning = false;
}

/// @return true while the worker is supposed to keep running.
bool IObserverWithThread::is_running()
{
    return m_bRunning;
}

/// @return the same running flag as is_running().
bool IObserverWithThread::isEnable()
{
    return m_bRunning;
}

/// Worker thread entry point.
///
/// @param thread_arg  The IObserverWithThread that owns this thread, passed
///                    as void* by start(). A null pointer makes the function
///                    return immediately.
///
/// Loops while is_running() is true: sleeps on m_condvar when the queue is
/// empty, otherwise passes every queued event to handler() in FIFO order.
void thread_callback(void* thread_arg)
{
    DEBUG_METHOD();
    IObserverWithThread* handler_thread = static_cast<IObserverWithThread*>(thread_arg);
    if (NULL == handler_thread)
    {
        return;
    }
    DEBUG_VALUE_AND_TYPE_OF(handler_thread);
    DEBUG_MESSAGE(handler_thread->name()<<"thread_callback-->Enter");

    // wait() releases and re-acquires the mutex it is given, so the mutex has
    // to be held when wait() is called.
    handler_thread->m_mutex.lock();
    while (handler_thread->is_running())
    {
        bool has_events = false;
        {
            tthread::lock_guard<tthread::mutex> queue_lock(handler_thread->m_queue_mutex);
            has_events = !handler_thread->m_EventQueue.empty();
        }

        if (!has_events)
        {
            // Nothing to do: sleep until OnReceiverData() or stop() notifies.
            // Both notify under m_mutex, so no wake-up can be missed here.
            handler_thread->m_condvar.wait(handler_thread->m_mutex);
            continue;
        }

        // Release the condition mutex while handling so that stop() and the
        // notify step of OnReceiverData() are not blocked by a slow handler.
        handler_thread->m_mutex.unlock();
        {
            // NOTE(review): as in the original code, handler() runs with the
            // queue mutex held, so producers block until the queue has been
            // drained and a handler that calls OnReceiverData() on this same
            // object would block on m_queue_mutex.
            tthread::lock_guard<tthread::mutex> queue_lock(handler_thread->m_queue_mutex);
            while (!handler_thread->m_EventQueue.empty())
            {
                // Take every pending event out of the queue.
                handler_thread->handler(handler_thread->m_EventQueue.front());
                handler_thread->m_EventQueue.pop();
            }
        }
        handler_thread->m_mutex.lock();
    }
    handler_thread->m_mutex.unlock();

    DEBUG_PRINTF("IObserverWithThread::thread_callback-->Leave");
}