21 #ifndef WORKITEMPOOL_H
22 #define WORKITEMPOOL_H
26 #include </usr/include/errno.h>
30 #define MAX_MESSAGE_LEN (26214400-5)
// Fragment (interior lines are missing from this view): destroys m_Mutex and, on a
// non-zero result, reports the failure through TRACE_LOG.
// Two variants of the log statement are visible: the first formats errCode into
// errBuffer with strerror_s, the second calls strerror( errCode ) inline.
// NOTE(review): presumably these are alternative preprocessor branches — the guarding
// directives and the definitions of errCode / errBuffer are not visible here; confirm.
// NOTE(review): errorMessage is declared, but no use of it is visible in this excerpt.
88 int mutexDestroyResult = pthread_mutex_destroy( &m_Mutex );
89 if ( 0 != mutexDestroyResult )
92 stringstream errorMessage;
95 strerror_s( errBuffer,
sizeof( errBuffer ), errCode );
96 TRACE_LOG(
"Unable to destroy mutex WorkItem::CounterTypeWrapper::m_Mutex [" << mutexDestroyResult <<
"] Error code : " << errCode <<
" [" << errBuffer <<
"]" );
98 TRACE_LOG(
"Unable to destroy mutex WorkItem::CounterTypeWrapper::m_Mutex [" << mutexDestroyResult <<
"] Error code : " << errCode <<
" [" << strerror( errCode ) <<
"]" );
// Fragment: initialises m_Mutex with default attributes (NULL attribute pointer).
// A non-zero result is reported through TRACE_LOG; no other failure handling is
// visible in this excerpt.
105 int mutexInitResult = pthread_mutex_init( &m_Mutex, NULL );
106 if ( 0 != mutexInitResult )
108 TRACE_LOG(
"Unable to init mutex WorkItem::CounterTypeWrapper::m_Mutex [" << mutexInitResult <<
"]" );
// Second, identical initialisation sequence.
// NOTE(review): presumably this belongs to another constructor of the same class —
// the enclosing signature is not visible; confirm.
114 int mutexInitResult = pthread_mutex_init( &m_Mutex, NULL );
115 if ( 0 != mutexInitResult )
117 TRACE_LOG(
"Unable to init mutex WorkItem::CounterTypeWrapper::m_Mutex [" << mutexInitResult <<
"]" );
// Self-assignment check against `source`.
// NOTE(review): presumably part of an assignment operator — header not visible.
123 if (
this == &source )
// Returns a non-const reference to this object's m_Mutex.
135 pthread_mutex_t&
getLock() {
return m_Mutex; }
// Default constructor: no item (m_ItemRef NULL) and no reference counter
// (m_RefCount NULL); the constructing thread is recorded as owner.
153 inline WorkItem() : m_ItemRef( NULL ), m_RefCount( NULL )
155 m_OwnerThread = pthread_self();
// Constructs from a raw item pointer and records the constructing thread as owner.
// NOTE(review): m_RefCount does not appear in the visible initialiser list —
// presumably it is set up in the constructor body, which is not visible; confirm.
158 explicit inline WorkItem(
volatile T* item ) : m_ItemRef( item )
160 m_OwnerThread = pthread_self();
// Copy constructor: takes over the source's item pointer, owner thread and
// reference-counter pointer.
164 inline WorkItem(
const WorkItem< T >& source ) : m_ItemRef( source.m_ItemRef ), m_OwnerThread( source.m_OwnerThread ), m_RefCount( source.m_RefCount )
// Increments the counter reached through lpRefCount.
// NOTE(review): the declaration of lpRefCount and the enclosing function of this
// statement are not visible in this excerpt.
179 lpRefCount->increment();
// Releases one reference to the wrapped item.
// Visible logic: m_RefCount is checked for NULL first; the counter is decremented
// through lpRefCount and a result of 0 marks the item as deletable; a non-NULL
// m_ItemRef is then tested and "Unreferenced item destroyed" is logged via DEBUG_LOG.
// NOTE(review): the delete statements and the declaration of lpRefCount are not
// visible in this excerpt.
184 inline void RemoveReference()
volatile
187 if ( m_RefCount == NULL )
192 bool deletable =
false;
196 deletable = ( lpRefCount->decrement() == 0 );
202 if ( m_ItemRef != NULL )
206 DEBUG_LOG(
"Unreferenced item destroyed" );
// NOTE(review): the following line (file line 228) appears to belong to a later
// definition whose header is not visible here.
228 if ( m_ItemRef != NULL )
// Returns the wrapped item pointer with its volatile qualifier cast away.
// Throws runtime_error( "Item not created." ) when m_RefCount is NULL and
// runtime_error( "Item already destroyed." ) when the counter read through
// lpRefCount is 0.
240 inline T*
get()
const
243 if( m_RefCount == NULL )
244 throw runtime_error(
"Item not created." );
247 if ( lpRefCount->get() == 0 )
248 throw runtime_error(
"Item already destroyed." );
250 return const_cast< T*
>( m_ItemRef );
// Returns the current value of the reference counter when m_RefCount is non-NULL.
// The trailing throw reports "Item reference count was destroyed"; presumably it is
// reached only when m_RefCount is NULL (braces are not visible in this excerpt).
253 inline unsigned int getRefCount()
volatile
255 if ( m_RefCount != NULL )
258 return lpRefCount->get();
260 throw runtime_error(
"Item reference count was destroyed" );
// Returns m_OwnerThread — set to pthread_self() by the visible constructors, or
// copied from the source object by the copy constructor.
263 inline pthread_t getOwnerThread()
265 return m_OwnerThread;
// Exception constructor: forwards `message` to runtime_error.
// The default message is "Work item not found".
279 explicit WorkItemNotFound(
const string& message =
"Work item not found" ) : runtime_error( message ) {};
// Destructor callback passed to pthread_key_create for ReserveKey.
// `data` is treated as the unsigned int* stored as the thread-specific value and is
// deleted. The NULL check is redundant in C++ (delete on a null pointer is a no-op).
301 static void DeleteReserves(
void* data )
303 unsigned int* threadReserve = (
unsigned int* )data;
304 if ( threadReserve != NULL )
305 delete threadReserve;
// Fragment (enclosing header not visible; the log texts name WorkItemPool):
// initialises, in this order, PoolSyncMutex, ReserveSyncMutex, PoolReaderBarrier and
// PoolWriterBarrier with default attributes, then creates the thread-specific key
// ReserveKey with DeleteReserves as its destructor.
// Each non-zero result is reported through TRACE_LOG; no other failure handling is
// visible in this excerpt.
318 int mutexInitResult = pthread_mutex_init( &PoolSyncMutex, NULL );
319 if ( 0 != mutexInitResult )
321 TRACE_LOG(
"Unable to init mutex WorkItemPool::PoolSyncMutex [" << mutexInitResult <<
"]" );
324 mutexInitResult = pthread_mutex_init( &ReserveSyncMutex, NULL );
325 if ( 0 != mutexInitResult )
327 TRACE_LOG(
"Unable to init mutex WorkItemPool::ReserveSyncMutex [" << mutexInitResult <<
"]" );
330 int condInitResult = pthread_cond_init( &PoolReaderBarrier, NULL );
331 if ( 0 != condInitResult )
333 TRACE_LOG(
"Unable to init condition WorkItemPool::PoolReaderBarrier [" << condInitResult <<
"]" );
336 condInitResult = pthread_cond_init( &PoolWriterBarrier, NULL );
337 if ( 0 != condInitResult )
339 TRACE_LOG(
"Unable to init condition WorkItemPool::PoolWriterBarrier [" << condInitResult <<
"]" );
342 int keyCreateResult = pthread_key_create( &ReserveKey, &WorkItemPool::DeleteReserves );
343 if ( 0 != keyCreateResult )
345 TRACE_LOG(
"Unable to create thread key WorkItemPool::ReserveKey [" << keyCreateResult <<
"]" );
// NOTE(review): streaming ReserveKey assumes pthread_key_t is printable on the
// target platform; POSIX leaves the type opaque.
347 DEBUG_LOG(
"Created pool reserve key [" << ReserveKey <<
"]" );
// Fragment (enclosing header not visible): destroys ReserveSyncMutex, PoolSyncMutex,
// PoolReaderBarrier and PoolWriterBarrier, logging every non-zero result via TRACE_LOG.
// NOTE(review): no pthread_key_delete( ReserveKey ) is visible in this excerpt —
// confirm the key is released elsewhere.
354 int mutexDestroyResult = pthread_mutex_destroy( &ReserveSyncMutex );
355 if ( 0 != mutexDestroyResult )
357 TRACE_LOG(
"Unable to destroy mutex WorkItemPool::ReserveSyncMutex [" << mutexDestroyResult <<
"]" );
360 mutexDestroyResult = pthread_mutex_destroy( &PoolSyncMutex );
361 if ( 0 != mutexDestroyResult )
363 TRACE_LOG(
"Unable to destroy mutex WorkItemPool::PoolSyncMutex [" << mutexDestroyResult <<
"]" );
366 int condDestroyResult = pthread_cond_destroy( &PoolReaderBarrier );
367 if ( 0 != condDestroyResult )
369 TRACE_LOG(
"Unable to destroy condition WorkItemPool::PoolReaderBarrier [" << condDestroyResult <<
"]" );
372 condDestroyResult = pthread_cond_destroy( &PoolWriterBarrier );
373 if ( 0 != condDestroyResult )
375 TRACE_LOG(
"Unable to destroy condition WorkItemPool::PoolWriterBarrier [" << condDestroyResult <<
"]" );
// Stores `reservedPoolSize` as the calling thread's value under ReserveKey.
// The unsigned int holding the value is allocated with new the first time the thread
// has no value yet; afterwards the existing allocation is overwritten in place.
// NOTE(review): the pthread_setspecific failure message reads "Signal poolreaderbarrier
// failed", which does not describe the failing call — looks copy-pasted; confirm.
// NOTE(review): if pthread_setspecific fails right after the allocation, no delete of
// threadReserve is visible in this excerpt.
379 void reservePoolSize(
const unsigned int reservedPoolSize )
volatile
381 unsigned int* threadReserve = (
unsigned int* )pthread_getspecific( ReserveKey );
382 if ( threadReserve == NULL )
383 threadReserve =
new unsigned int;
385 *threadReserve = reservedPoolSize;
386 int setSpecificResult = pthread_setspecific( ReserveKey, threadReserve );
387 if ( 0 != setSpecificResult )
389 TRACE_LOG(
"Signal poolreaderbarrier failed [" << setSpecificResult <<
"]" );
// Waits on PoolWriterBarrier (paired with PoolSyncMutex) while the pool is not empty.
// Two wait forms are visible: an untimed pthread_cond_wait, and a
// pthread_cond_timedwait whose absolute deadline is time( NULL ) + secWait seconds.
// NOTE(review): the condition choosing between them is not visible — presumably
// secWait == 0 selects the untimed wait; confirm.
393 void waitForPoolEmpty(
const unsigned int secWait = 0 )
volatile
400 if ( !lpPool->empty() )
404 condWait = pthread_cond_wait( const_cast< pthread_cond_t* >( &PoolWriterBarrier ), const_cast< pthread_mutex_t* >( &PoolSyncMutex ) );
408 struct timespec wakePerf;
410 wakePerf.tv_sec = time( NULL ) + secWait;
411 wakePerf.tv_nsec = 0;
412 condWait = pthread_cond_timedwait( const_cast< pthread_cond_t* >( &PoolWriterBarrier ), const_cast< pthread_mutex_t* >( &PoolSyncMutex ), &wakePerf );
// True when the pool is empty, or when a timed wait (secWait > 0) ended with ETIMEDOUT.
// NOTE(review): the action taken on this condition is not visible in this excerpt.
416 if ( lpPool->empty() || ( ( secWait > 0 ) && ( condWait == ETIMEDOUT ) ) )
// Blocks the writer thread `selfId` until its count of pooled items (looked up in
// *lpWriterItems) is below the thread's reserve stored under ReserveKey, then
// increments that count.
// A thread without a reserve gets one of 10 via reservePoolSize and, in that case,
// an entry ( selfId, 0 ) is inserted into *lpWriterItems.
// NOTE(review): this function waits on PoolWriterBarrier with ReserveSyncMutex, while
// waitForPoolEmpty waits on the same condition with PoolSyncMutex. POSIX leaves
// concurrent waits on one condition variable with different mutexes undefined —
// confirm this is intended.
// NOTE(review): the locking of ReserveSyncMutex and the declarations of lpWriterItems
// and condWait are not visible in this excerpt.
421 void waitToWrite( pthread_t selfId )
volatile
424 unsigned int countActual = 0;
425 bool reserveCreated =
false;
427 unsigned int* countReserve = (
unsigned int* )pthread_getspecific( ReserveKey );
428 if ( countReserve == NULL )
430 DEBUG_LOG(
"Creating initial pool reserve for thread [" << selfId <<
"]" );
431 reservePoolSize( 10 );
432 reserveCreated =
true;
// Re-read the pointer: reservePoolSize has just stored a fresh allocation under the key.
434 countReserve = (
unsigned int* )pthread_getspecific( ReserveKey );
440 if ( reserveCreated )
442 lpWriterItems->insert( pair< pthread_t, unsigned int >( selfId, countActual ) );
449 countActual = ( *lpWriterItems )[ selfId ];
450 DEBUG_LOG(
"Work items in pool for thread [" << selfId <<
"] is now [" << countActual <<
"/" << *countReserve <<
"]" );
453 if ( countActual >= *countReserve )
454 condWait = pthread_cond_wait( const_cast< pthread_cond_t* >( &PoolWriterBarrier ), const_cast< pthread_mutex_t* >( &ReserveSyncMutex ) );
// Tail of a do/while loop; presumably the loop starts before the re-read at line 449
// so the count is refreshed after every wake-up — the `do` is not visible.
459 while( countActual >= *countReserve );
463 ( ( *lpWriterItems )[ selfId ] )++;
// Fragment of a writer-side member whose header is not visible: obtains the calling
// thread's id, blocks in waitToWrite until that thread may add an item, and later
// signals one waiter on PoolReaderBarrier, logging a non-zero result.
// NOTE(review): the statement that actually inserts into the pool is not visible.
469 pthread_t selfId = pthread_self();
472 waitToWrite( selfId );
483 int condSignalResult = pthread_cond_signal( const_cast< pthread_cond_t* >( &PoolReaderBarrier ) );
484 if ( 0 != condSignalResult )
486 TRACE_LOG(
"Signal PoolReaderBarrier failed [" << condSignalResult <<
"]" );
// Adds `item` under key `id` only when no pooled entry already carries that key.
// The pool is scanned linearly; on a match a DEBUG_LOG message states that the item
// was not added. Otherwise the thread passes through waitToWrite and one waiter on
// PoolReaderBarrier is signalled afterwards.
// NOTE(review): the early exit after the duplicate log and the insertion statement
// itself are not visible in this excerpt.
490 void addUniquePoolItem(
const string id,
const WorkItem< T >& item )
volatile
493 pthread_t selfId = pthread_self();
498 for(
typename WorkItemPool_QueueType::const_iterator poolItemFinder = lpPool->begin(); poolItemFinder != lpPool->end(); poolItemFinder++ )
500 if ( poolItemFinder->first ==
id )
502 DEBUG_LOG(
"Item [" <<
id <<
"] not added to pool. Unique constraint failed." );
509 waitToWrite( selfId );
518 int condSignalResult = pthread_cond_signal( const_cast< pthread_cond_t* >( &PoolReaderBarrier ) );
519 if ( 0 != condSignalResult )
521 TRACE_LOG(
"Signal PoolReaderBarrier failed [" << condSignalResult <<
"]" );
// Looks up the pooled entry whose key equals `id` by linear scan and returns a copy
// of its WorkItem; the entry stays in the pool in the visible lines.
// NOTE(review): how `throwOnError` is used when nothing matches is not visible —
// presumably WorkItemNotFound is thrown when it is true; confirm.
525 WorkItem< T > getPoolItem(
const string id,
const bool throwOnError =
true )
volatile
532 typename WorkItemPool_QueueType::const_iterator poolItemFinder = lpPool->begin();
533 while( poolItemFinder != lpPool->end() )
535 if ( poolItemFinder->first ==
id )
547 return poolItemFinder->second;
// Fragment of a reader-side member whose header is not visible: while the pool is
// empty it waits on PoolReaderBarrier (paired with PoolSyncMutex), logging a non-zero
// wait result, then returns a copy of the front entry's WorkItem.
// The while loop re-checks emptiness after every wake-up.
562 while( lpPool->empty() )
567 int condWaitResult = pthread_cond_wait( const_cast< pthread_cond_t* >( &PoolReaderBarrier ), const_cast< pthread_mutex_t* >( &PoolSyncMutex ) );
568 if ( 0 != condWaitResult )
570 TRACE_LOG(
"Condition wait on PoolReaderBarrier failed [" << condWaitResult <<
"]" );
579 return lpPool->front().second;
// Fragment of a reader-side member whose header is not visible: waits on
// PoolReaderBarrier while the pool is empty, copies the front entry's WorkItem into
// `item`, and afterwards broadcasts PoolWriterBarrier.
// Exceptions are caught (std::exception and, judging by the second message, a
// catch-all) and logged as having occurred "while removing item from pool".
// NOTE(review): the statement removing the front entry, any rethrow, and the writer
// count update are not visible in this excerpt.
590 while( lpPool->empty() )
598 int condWaitResult = pthread_cond_wait( const_cast< pthread_cond_t* >( &PoolReaderBarrier ), const_cast< pthread_mutex_t* >( &PoolSyncMutex ) );
599 if ( 0 != condWaitResult )
601 TRACE_LOG(
"Condition wait on PoolReaderBarrier failed [" << condWaitResult <<
"]" );
615 item = lpPool->front().second;
618 catch(
const std::exception& ex )
620 TRACE_LOG(
"A [" <<
typeid( ex ).name() <<
"] has occured [" << ex.what() <<
"] while removing item from pool" );
625 TRACE_LOG(
"An [unknown exception] has occured while removing item from pool." );
637 int condBroadcastResult = pthread_cond_broadcast( const_cast< pthread_cond_t * >( &PoolWriterBarrier ) );
638 if ( 0 != condBroadcastResult )
640 TRACE_LOG(
"Condition broadcast on PoolWriterBarrier failed [" << condBroadcastResult <<
"]" );
// Erases the pooled entry whose key equals `id` (linear scan), remembering the owner
// thread of the erased item, then decrements that thread's count in *lpWriterItems
// and broadcasts PoolWriterBarrier.
// Exceptions raised while erasing are caught and logged via TRACE_LOG.
// NOTE(review): itemOwnerThread is declared without an initialiser; unless a
// not-visible path exits when no entry matches, line 693 would index *lpWriterItems
// with an indeterminate value — confirm.
// NOTE(review): the use of `throwOnError` is not visible in this excerpt.
647 void erasePoolItem(
string id,
const bool throwOnError =
true )
volatile
649 pthread_t itemOwnerThread;
656 typename WorkItemPool_QueueType::iterator poolItemFinder = lpPool->begin();
657 while( poolItemFinder != lpPool->end() )
659 if ( poolItemFinder->first ==
id )
661 itemOwnerThread = poolItemFinder->second.getOwnerThread();
673 lpPool->erase( poolItemFinder );
675 catch(
const std::exception& ex )
677 TRACE_LOG(
"A [" <<
typeid( ex ).name() <<
"] has occured [" << ex.what() <<
"] while removing item from pool" );
683 TRACE_LOG(
"An [unknown exception] has occured while removing item from pool." );
693 ( ( *lpWriterItems )[ itemOwnerThread ] )--;
696 int condBroadcastResult = pthread_cond_broadcast( const_cast< pthread_cond_t* >( &PoolWriterBarrier ) );
697 if ( 0 != condBroadcastResult )
699 TRACE_LOG(
"Condition broadcast on PoolWriterBarrier failed [" << condBroadcastResult <<
"]" );
// Finds the pooled entry whose key equals `id` (linear scan), copies its WorkItem
// into `item`, erases the entry from the pool and broadcasts PoolWriterBarrier.
// Exceptions raised while copying/erasing are caught and logged via TRACE_LOG.
// NOTE(review): the return statement, the writer-count update and the use of
// `throwOnError` are not visible in this excerpt.
704 WorkItem< T > removePoolItem(
string id,
const bool throwOnError =
true )
volatile
713 typename WorkItemPool_QueueType::iterator poolItemFinder = lpPool->begin();
714 while( poolItemFinder != lpPool->end() )
716 if ( poolItemFinder->first ==
id )
// Copy first: erase invalidates the iterator.
729 item = poolItemFinder->second;
730 lpPool->erase( poolItemFinder );
732 catch(
const std::exception& ex )
734 TRACE_LOG(
"A [" <<
typeid( ex ).name() <<
"] has occured [" << ex.what() <<
"] while removing item from pool" );
742 TRACE_LOG(
"An [unknown exception] has occured while removing item from pool." );
757 int condBroadcastResult = pthread_cond_broadcast( const_cast< pthread_cond_t* >( &PoolWriterBarrier ) );
758 if ( 0 != condBroadcastResult )
760 TRACE_LOG(
"Condition broadcast on PoolWriterBarrier failed [" << condBroadcastResult <<
"]" );
// Fragment of a member whose header is not visible: walks the queue reached through
// lpQueue and streams each entry's wrapped item (dereferenced result of get()) into
// `output`.
// NOTE(review): get() throws runtime_error for items that were never created or were
// already destroyed; no handling is visible here. The iterator increment is also not
// visible in this excerpt.
770 typename WorkItemPool_QueueType::const_iterator poolItemFinder = lpQueue->begin();
772 while( poolItemFinder != lpQueue->end() )
774 output << *( poolItemFinder->second.get() );
// Returns the number of entries currently in the pool, converted to unsigned int.
// NOTE(review): whether the pool mutex is held around size() is not visible here.
781 unsigned int getSize()
volatile
784 return lpPool->size();
// Wakes every thread waiting on PoolReaderBarrier (pthread_cond_broadcast) and logs
// a non-zero result via TRACE_LOG.
787 void SignalAllReaders()
789 int condSignalResult = pthread_cond_broadcast( &PoolReaderBarrier );
790 if ( 0 != condSignalResult )
792 TRACE_LOG(
"Condition broadcast on PoolReaderBarrier failed [" << condSignalResult <<
"]" );
// Fragment of a member whose header is not visible: wakes one waiter on
// PoolReaderBarrier and logs a non-zero result.
798 int condSignalResult = pthread_cond_signal( &PoolReaderBarrier );
799 if ( 0 != condSignalResult )
801 TRACE_LOG(
"Condition signal on PoolReaderBarrier failed [" << condSignalResult <<
"]" );
// Fragment of another member whose header is not visible: wakes one waiter on
// PoolWriterBarrier and logs a non-zero result.
807 int condSignalResult = pthread_cond_signal( &PoolWriterBarrier );
808 if ( 0 != condSignalResult )
810 TRACE_LOG(
"Condition signal on PoolWriterBarrier failed [" << condSignalResult <<
"]" );
// Headers of two members whose bodies are not visible in this excerpt.
// NOTE(review): judging by the names only, ShutdownPoolWriters stops writer activity
// and IsRunning reports a running flag — confirm against the full source.
825 void ShutdownPoolWriters()
835 bool IsRunning()
const
840 #if defined( TESTDLL_EXPORT ) || defined ( TESTDLL_IMPORT )
// Members of a buffer class whose header is not visible in this excerpt.
// allocate: declaration only; takes the requested size.
884 void allocate(
unsigned long size );
// size: body not visible.
886 unsigned long size()
const
// max_size: returns m_MaxBufferSize.
891 unsigned long max_size()
const
893 return m_MaxBufferSize;
// buffer: returns the pointer stored at *m_BufferAddr, i.e. m_BufferAddr is
// dereferenced once and must therefore be non-NULL when this is called.
896 unsigned char* buffer()
const
898 return *m_BufferAddr;
// Declarations only; the definitions are not visible in this excerpt.
// str: returns a string; `size` presumably limits how many bytes are taken — confirm.
907 string str(
const string::size_type size )
const;
// copyFrom overloads: take the source either as a string or as a byte pointer plus
// size. maxBufferSize defaults to MAX_MESSAGE_LEN, defined as (26214400-5).
909 void copyFrom(
const string& source,
const unsigned long maxBufferSize =
MAX_MESSAGE_LEN );
910 void copyFrom(
const unsigned char* source,
unsigned long size,
unsigned long maxBufferSize =
MAX_MESSAGE_LEN );
// Sets m_BufferSize to `index`.
// NOTE(review): the statement guarded by `index >= m_BufferSize` is not visible —
// presumably it returns or throws so that the size can only shrink; confirm.
914 void truncate(
const unsigned long index )
917 if ( index >= m_BufferSize )
919 m_BufferSize = index;
937 #endif // WORKITEMPOOL_H