fintp_utils
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
WorkItemPool.h
Go to the documentation of this file.
1 /*
2 * FinTP - Financial Transactions Processing Application
3 * Copyright (C) 2013 Business Information Systems (Allevo) S.R.L.
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>
17 * or contact Allevo at : 031281 Bucuresti, 23C Calea Vitan, Romania,
18 * phone +40212554577, office@allevo.ro <mailto:office@allevo.ro>, www.allevo.ro.
19 */
20 
21 #ifndef WORKITEMPOOL_H
22 #define WORKITEMPOOL_H
23 
24 #ifdef LINUX
25 //TODO check ETIMEDOUT
26 #include </usr/include/errno.h>
27 #endif
28 
29 // 25 MiB ( 26214400 bytes ) minus 5 bytes of slack
30 #define MAX_MESSAGE_LEN (26214400-5)
31 
32 #include "DllMainUtils.h"
33 
34 #include <typeinfo>
35 #include <pthread.h>
36 #include <deque>
37 #include <exception>
38 #include <stdexcept>
39 #include <map>
40 
41 #include "Log.h"
42 
43 using namespace std;
44 
45 #include "ThreadingUtils.h"
46 
47 // scenario :
48 // class MessagePoolHolder
49 //{
50 // public :
51 // static WorkItemPool< RoutingMessage > MessagePool( 2 );
52 // };
53 //
54 // thread 1 ( writer ) :
55 // MessagePoolHolder::MessagePool.reservePoolSize( 10 );
56 // loop
57 // {
58 // RoutingMessage *theMessage = NULL;
59 // // do work... get theMessage
60 //
61 // WorkItem< RoutingMessage > item( theMessage );
62 // MessagePoolHolder::MessagePool.addPoolItem( id, item ); -> waits until the calling thread is allowed to write ( writer barrier ), then adds the item under the given id
63 //
64 // // don't destroy the message. it will be destroyed by the WorkItem
65 // }
66 //
67 // thread 2 ( reader ) :
68 // loop
69 // {
70 // WorkItem< RoutingMessage > item = MessagePoolHolder::MessagePool.getPoolItem()
71 // }
72 
73 namespace FinTP
74 {
75  template< class T >
76  class WorkItem
77  {
79  {
80  private :
82  pthread_mutex_t m_Mutex;
83 
84  public :
85 
87  {
88  int mutexDestroyResult = pthread_mutex_destroy( &m_Mutex );
89  if ( 0 != mutexDestroyResult )
90  {
91  int errCode = errno;
92  stringstream errorMessage;
93 #ifdef CRT_SECURE
94  char errBuffer[ 95 ];
95  strerror_s( errBuffer, sizeof( errBuffer ), errCode );
96  TRACE_LOG( "Unable to destroy mutex WorkItem::CounterTypeWrapper::m_Mutex [" << mutexDestroyResult << "] Error code : " << errCode << " [" << errBuffer << "]" );
97 #else
98  TRACE_LOG( "Unable to destroy mutex WorkItem::CounterTypeWrapper::m_Mutex [" << mutexDestroyResult << "] Error code : " << errCode << " [" << strerror( errCode ) << "]" );
99 #endif
100  }
101  }
102 
103  explicit CounterTypeWrapper( UIntType::base_type value ) : m_Value( const_cast< UIntType::base_type& >( value ) )
104  {
105  int mutexInitResult = pthread_mutex_init( &m_Mutex, NULL );
106  if ( 0 != mutexInitResult )
107  {
108  TRACE_LOG( "Unable to init mutex WorkItem::CounterTypeWrapper::m_Mutex [" << mutexInitResult << "]" );
109  }
110  }
111 
112  CounterTypeWrapper( const CounterTypeWrapper& source ) : m_Value( source.m_Value )
113  {
114  int mutexInitResult = pthread_mutex_init( &m_Mutex, NULL );
115  if ( 0 != mutexInitResult )
116  {
117  TRACE_LOG( "Unable to init mutex WorkItem::CounterTypeWrapper::m_Mutex [" << mutexInitResult << "]" );
118  }
119  }
120 
121  CounterTypeWrapper& operator=( const CounterTypeWrapper& source )
122  {
123  if ( this == &source )
124  return *this;
125  m_Value = source.m_Value;
126  return *this;
127  }
128 
129  UIntType::base_type decrement() { return UIntType::decrement( &m_Value ); }
130  UIntType::base_type increment() { return UIntType::increment( &m_Value ); }
131 
132  UIntType::base_type get() const { return m_Value; }
133  void set( const UIntType::base_type value ) { m_Value = value; }
134 
135  pthread_mutex_t& getLock() { return m_Mutex; }
136 
137  private :
138 
140  };
141 
142  //add volatile
143  typedef volatile CounterTypeWrapper CounterType;
144 
145  private :
146 
147  volatile T* m_ItemRef;
148  pthread_t m_OwnerThread;
150 
151  public :
152 
153  inline WorkItem() : m_ItemRef( NULL ), m_RefCount( NULL )
154  {
155  m_OwnerThread = pthread_self();
156  }
157 
158  explicit inline WorkItem( volatile T* item ) : m_ItemRef( item )
159  {
160  m_OwnerThread = pthread_self();
161  m_RefCount = new CounterTypeWrapper( 1 );
162  }
163 
164  inline WorkItem( const WorkItem< T >& source ) : m_ItemRef( source.m_ItemRef ), m_OwnerThread( source.m_OwnerThread ), m_RefCount( source.m_RefCount )
165  {
166  Clone();
167  }
168 
169  inline ~WorkItem()
170  {
171  RemoveReference();
172  }
173 
174  inline const WorkItem< T >& Clone()
175  {
176  LockingPtr< CounterTypeWrapper > lpRefCount( *m_RefCount, const_cast< CounterTypeWrapper * >( m_RefCount )->getLock() );
177 
178  // increment reference count
179  lpRefCount->increment();
180 
181  return *this;
182  }
183 
184  inline void RemoveReference() volatile
185  {
186  // this may happen if delete is called on a WorkItem* ( should't be use like that anyway )
187  if ( m_RefCount == NULL )
188  return;
189 
190  // decrement reference count.. still, the race condition exists
191  // delete the referenced item if reference count == 0 ( only one reference, now being deleted )
192  bool deletable = false;
193 
194  {
195  LockingPtr< CounterTypeWrapper > lpRefCount( *m_RefCount, const_cast< CounterTypeWrapper * >( m_RefCount )->getLock() );
196  deletable = ( lpRefCount->decrement() == 0 );
197 
198  if ( deletable )
199  {
200  try
201  {
202  if ( m_ItemRef != NULL )
203  delete m_ItemRef;
204 
205  #ifdef NO_CPPUNIT
206  DEBUG_LOG( "Unreferenced item destroyed" );
207  #endif
208  }
209  catch( ... )
210  {
211  TRACE_LOG( "Item already deleted" );
212  // already deleted ref
213  }
214  m_ItemRef = NULL;
215  }
216  }
217 
218  if ( deletable )
219  {
220  delete m_RefCount;
221  m_RefCount = NULL;
222  }
223  }
224 
225  inline WorkItem& operator=( const WorkItem< T >& source )
226  {
227  // if this item is valid, give it up
228  if ( m_ItemRef != NULL )
229  RemoveReference();
230 
231  // get pointers and ref count address
232  m_ItemRef = source.m_ItemRef;
233  m_RefCount = source.m_RefCount;
234  m_OwnerThread = source.m_OwnerThread;
235 
236  Clone();
237  return *this;
238  }
239 
240  inline T* get() const
241  {
242  // this may occur for an unitialized work item
243  if( m_RefCount == NULL )
244  throw runtime_error( "Item not created." );
245 
246  LockingPtr< CounterTypeWrapper > lpRefCount( *m_RefCount, const_cast< CounterTypeWrapper * >( m_RefCount )->getLock() );
247  if ( lpRefCount->get() == 0 )
248  throw runtime_error( "Item already destroyed." );
249 
250  return const_cast< T* >( m_ItemRef );
251  }
252 
253  inline unsigned int getRefCount() volatile
254  {
255  if ( m_RefCount != NULL )
256  {
257  LockingPtr< CounterTypeWrapper > lpRefCount( *m_RefCount, const_cast< CounterTypeWrapper * >( m_RefCount )->getLock() );
258  return lpRefCount->get();
259  }
260  throw runtime_error( "Item reference count was destroyed" );
261  }
262 
263  inline pthread_t getOwnerThread()
264  {
265  return m_OwnerThread;
266  }
267  };
268 
// Thrown by the non-blocking pool accessors when the pool holds no items.
class WorkPoolEmpty : public std::runtime_error
{
    public :
        WorkPoolEmpty() : std::runtime_error( "Work pool empty" )
        {
        }

        ~WorkPoolEmpty() throw()
        {
        }
};
275 
// Thrown when a lookup by id finds no matching entry in the pool. The pool
// passes the missing id as the message; the default text is used otherwise.
class WorkItemNotFound : public std::runtime_error
{
    public :
        explicit WorkItemNotFound( const std::string& message = "Work item not found" )
            : std::runtime_error( message )
        {
        }

        ~WorkItemNotFound() throw()
        {
        }
};
282 
// Thrown by pool operations that notice the shutdown flag is set.
class WorkPoolShutdown : public std::runtime_error
{
    public :
        WorkPoolShutdown() : std::runtime_error( "Work pool shutting down" )
        {
        }

        ~WorkPoolShutdown() throw()
        {
        }
};
289 
290  template< class T >
292  {
293  private :
294 
295  typedef pair< string, WorkItem< T > > WorkItemPool_QueuedItemType;
296  typedef deque< WorkItemPool_QueuedItemType > WorkItemPool_QueueType;
297  typedef map< pthread_t, unsigned int > WorkItemPool_CounterType;
298 
299  pthread_key_t ReserveKey;
300 
// Destructor callback registered with pthread_key_create for ReserveKey:
// releases the per-thread reserve value allocated by reservePoolSize().
// `data` is the value stored in the key for the calling thread.
static void DeleteReserves( void* data )
{
    unsigned int* const reserve = static_cast< unsigned int* >( data );

    // deleting a null pointer has no effect, so no explicit check is needed
    delete reserve;

    // the key slot itself is intentionally not reset here
}
309 
310  // disable copying
311  WorkItemPool( const WorkItemPool& );
312  WorkItemPool& operator=( const WorkItemPool& );
313 
314  public :
315 
316  WorkItemPool() : m_Shutdown( false )
317  {
318  int mutexInitResult = pthread_mutex_init( &PoolSyncMutex, NULL );
319  if ( 0 != mutexInitResult )
320  {
321  TRACE_LOG( "Unable to init mutex WorkItemPool::PoolSyncMutex [" << mutexInitResult << "]" );
322  }
323 
324  mutexInitResult = pthread_mutex_init( &ReserveSyncMutex, NULL );
325  if ( 0 != mutexInitResult )
326  {
327  TRACE_LOG( "Unable to init mutex WorkItemPool::ReserveSyncMutex [" << mutexInitResult << "]" );
328  }
329 
330  int condInitResult = pthread_cond_init( &PoolReaderBarrier, NULL );
331  if ( 0 != condInitResult )
332  {
333  TRACE_LOG( "Unable to init condition WorkItemPool::PoolReaderBarrier [" << condInitResult << "]" );
334  }
335 
336  condInitResult = pthread_cond_init( &PoolWriterBarrier, NULL );
337  if ( 0 != condInitResult )
338  {
339  TRACE_LOG( "Unable to init condition WorkItemPool::PoolWriterBarrier [" << condInitResult << "]" );
340  }
341 
342  int keyCreateResult = pthread_key_create( &ReserveKey, &WorkItemPool::DeleteReserves );
343  if ( 0 != keyCreateResult )
344  {
345  TRACE_LOG( "Unable to create thread key WorkItemPool::ReserveKey [" << keyCreateResult << "]" );
346  }
347  DEBUG_LOG( "Created pool reserve key [" << ReserveKey << "]" );
348  }
349 
351  {
352  m_Shutdown = true;
353 
354  int mutexDestroyResult = pthread_mutex_destroy( &ReserveSyncMutex );
355  if ( 0 != mutexDestroyResult )
356  {
357  TRACE_LOG( "Unable to destroy mutex WorkItemPool::ReserveSyncMutex [" << mutexDestroyResult << "]" );
358  }
359 
360  mutexDestroyResult = pthread_mutex_destroy( &PoolSyncMutex );
361  if ( 0 != mutexDestroyResult )
362  {
363  TRACE_LOG( "Unable to destroy mutex WorkItemPool::PoolSyncMutex [" << mutexDestroyResult << "]" );
364  }
365 
366  int condDestroyResult = pthread_cond_destroy( &PoolReaderBarrier );
367  if ( 0 != condDestroyResult )
368  {
369  TRACE_LOG( "Unable to destroy condition WorkItemPool::PoolReaderBarrier [" << condDestroyResult << "]" );
370  }
371 
372  condDestroyResult = pthread_cond_destroy( &PoolWriterBarrier );
373  if ( 0 != condDestroyResult )
374  {
375  TRACE_LOG( "Unable to destroy condition WorkItemPool::PoolWriterBarrier [" << condDestroyResult << "]" );
376  }
377  }
378 
379  void reservePoolSize( const unsigned int reservedPoolSize ) volatile
380  {
381  unsigned int* threadReserve = ( unsigned int* )pthread_getspecific( ReserveKey );
382  if ( threadReserve == NULL )
383  threadReserve = new unsigned int;
384 
385  *threadReserve = reservedPoolSize;
386  int setSpecificResult = pthread_setspecific( ReserveKey, threadReserve );
387  if ( 0 != setSpecificResult )
388  {
389  TRACE_LOG( "Signal poolreaderbarrier failed [" << setSpecificResult << "]" );
390  }
391  }
392 
393  void waitForPoolEmpty( const unsigned int secWait = 0 ) volatile
394  {
395  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
396 
397  for(;;)
398  {
399  int condWait = 0;
400  if ( !lpPool->empty() )
401  {
402  if ( secWait == 0 )
403  {
404  condWait = pthread_cond_wait( const_cast< pthread_cond_t* >( &PoolWriterBarrier ), const_cast< pthread_mutex_t* >( &PoolSyncMutex ) );
405  }
406  else
407  {
408  struct timespec wakePerf;
409 
410  wakePerf.tv_sec = time( NULL ) + secWait;
411  wakePerf.tv_nsec = 0;
412  condWait = pthread_cond_timedwait( const_cast< pthread_cond_t* >( &PoolWriterBarrier ), const_cast< pthread_mutex_t* >( &PoolSyncMutex ), &wakePerf );
413  }
414  }
415  // if the pool is empty, or a timeout was specified, return
416  if ( lpPool->empty() || ( ( secWait > 0 ) && ( condWait == ETIMEDOUT ) ) )
417  break;
418  }
419  }
420 
421  void waitToWrite( pthread_t selfId ) volatile
422  {
423  // default 10 for reserve and 0 for requests pending
424  unsigned int countActual = 0;
425  bool reserveCreated = false;
426 
427  unsigned int* countReserve = ( unsigned int* )pthread_getspecific( ReserveKey );
428  if ( countReserve == NULL )
429  {
430  DEBUG_LOG( "Creating initial pool reserve for thread [" << selfId << "]" );
431  reservePoolSize( 10 );
432  reserveCreated = true;
433 
434  countReserve = ( unsigned int* )pthread_getspecific( ReserveKey );
435  }
436 
437  // find how many items we've written / how many we are allowed to write
438  LockingPtr< WorkItemPool_CounterType > lpWriterItems( m_WriterItems, ReserveSyncMutex );
439 
440  if ( reserveCreated )
441  {
442  lpWriterItems->insert( pair< pthread_t, unsigned int >( selfId, countActual ) );
443  }
444  else
445  {
446  // loop and wait for the thread to be allowed to write
447  do
448  {
449  countActual = ( *lpWriterItems )[ selfId ];
450  DEBUG_LOG( "Work items in pool for thread [" << selfId << "] is now [" << countActual << "/" << *countReserve << "]" );
451 
452  int condWait = 0;
453  if ( countActual >= *countReserve )
454  condWait = pthread_cond_wait( const_cast< pthread_cond_t* >( &PoolWriterBarrier ), const_cast< pthread_mutex_t* >( &ReserveSyncMutex ) );
455 
456  if ( m_Shutdown )
457  throw WorkPoolShutdown();
458  }
459  while( countActual >= *countReserve );
460  }
461 
462  // HACK increment here ( assume a write will follow );
463  ( ( *lpWriterItems )[ selfId ] )++;
464  }
465 
466  void addPoolItem( const string& id, const WorkItem< T >& item ) volatile
467  {
468  // check for write access
469  pthread_t selfId = pthread_self();
470 
471  // wait until we can write
472  waitToWrite( selfId );
473  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
474 
475  // create a temp object, store a reference on the queue;
476  //lpPool->push_back( WorkItemPool::QueuedItemType( id, const_cast< WorkItem< T >* >( &item )->Clone() ) );
477  lpPool->push_back( WorkItemPool_QueuedItemType( id, item ) );
478 
479  #ifdef NO_CPPUNIT
480  Dump();
481  #endif
482  // signal readers
483  int condSignalResult = pthread_cond_signal( const_cast< pthread_cond_t* >( &PoolReaderBarrier ) );
484  if ( 0 != condSignalResult )
485  {
486  TRACE_LOG( "Signal PoolReaderBarrier failed [" << condSignalResult << "]" );
487  }
488  }
489 
490  void addUniquePoolItem( const string id, const WorkItem< T >& item ) volatile
491  {
492  // check for write access
493  pthread_t selfId = pthread_self();
494 
495  // lock the pool here to ensure iterator stability
496  {
497  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
498  for( typename WorkItemPool_QueueType::const_iterator poolItemFinder = lpPool->begin(); poolItemFinder != lpPool->end(); poolItemFinder++ )
499  {
500  if ( poolItemFinder->first == id )
501  {
502  DEBUG_LOG( "Item [" << id << "] not added to pool. Unique constraint failed." );
503  return;
504  }
505  }
506  }
507 
508  // wait until we can write ( if wait returns false, we don't write ( unique violated ) )
509  waitToWrite( selfId );
510 
511  // reaquire the lock, so that we can safely add messages
512  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
513 
514  //lpPool->push_back( WorkItemPool::QueuedItemType( id, const_cast< WorkItem< T >* >( &item )->Clone() ) );
515  lpPool->push_back( WorkItemPool_QueuedItemType( id, item ) );
516 
517  // signal readers
518  int condSignalResult = pthread_cond_signal( const_cast< pthread_cond_t* >( &PoolReaderBarrier ) );
519  if ( 0 != condSignalResult )
520  {
521  TRACE_LOG( "Signal PoolReaderBarrier failed [" << condSignalResult << "]" );
522  }
523  }
524 
525  WorkItem< T > getPoolItem( const string id, const bool throwOnError = true ) volatile
526  {
527  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
528 
529  try
530  {
531  bool found = false;
532  typename WorkItemPool_QueueType::const_iterator poolItemFinder = lpPool->begin();
533  while( poolItemFinder != lpPool->end() )
534  {
535  if ( poolItemFinder->first == id )
536  {
537  found = true;
538  break;
539  }
540  poolItemFinder++;
541  }
542  if ( m_Shutdown )
543  throw WorkPoolShutdown();
544  if ( !found )
545  throw WorkItemNotFound( id );
546 
547  return poolItemFinder->second;
548  }
549  catch( ... )
550  {
551  if ( throwOnError )
552  throw;
553  }
554  return WorkItem< T >();
555  }
556 
557  WorkItem< T > getPoolItem( const bool lock = true ) volatile
558  {
559  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
560 
561  // while the pool is empty ( the condition variable may be signaled for different reasons )
562  while( lpPool->empty() )
563  {
564  // if we lock on the pool, wait for items
565  if ( lock )
566  {
567  int condWaitResult = pthread_cond_wait( const_cast< pthread_cond_t* >( &PoolReaderBarrier ), const_cast< pthread_mutex_t* >( &PoolSyncMutex ) );
568  if ( 0 != condWaitResult )
569  {
570  TRACE_LOG( "Condition wait on PoolReaderBarrier failed [" << condWaitResult << "]" );
571  }
572  }
573  else
574  throw WorkPoolEmpty();
575 
576  if ( m_Shutdown )
577  throw WorkPoolShutdown();
578  }
579  return lpPool->front().second;
580  }
581 
582  WorkItem< T > removePoolItem( const bool lock = true ) volatile
583  {
584  WorkItem< T > item;
585  try
586  {
587  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
588 
589  // while the pool is empty ( the condition variable may be signaled for different reasons )
590  while( lpPool->empty() )
591  {
592  if ( m_Shutdown )
593  throw WorkPoolShutdown();
594 
595  // if we lock on the pool, wait for items
596  if ( lock )
597  {
598  int condWaitResult = pthread_cond_wait( const_cast< pthread_cond_t* >( &PoolReaderBarrier ), const_cast< pthread_mutex_t* >( &PoolSyncMutex ) );
599  if ( 0 != condWaitResult )
600  {
601  TRACE_LOG( "Condition wait on PoolReaderBarrier failed [" << condWaitResult << "]" );
602  }
603  }
604  else
605  throw WorkPoolEmpty();
606 
607  if ( m_Shutdown )
608  throw WorkPoolShutdown();
609  }
610 
611  #ifdef NO_CPPUNIT
612  Dump();
613  #endif
614 
615  item = lpPool->front().second;
616  lpPool->pop_front();
617  }
618  catch( const std::exception& ex )
619  {
620  TRACE_LOG( "A [" << typeid( ex ).name() << "] has occured [" << ex.what() << "] while removing item from pool" );
621  throw;
622  }
623  catch( ... )
624  {
625  TRACE_LOG( "An [unknown exception] has occured while removing item from pool." );
626  throw;
627  }
628 
629  // signal writers
630  {
631  LockingPtr< WorkItemPool_CounterType > lpWriterItems( m_WriterItems, ReserveSyncMutex );
632 
633  // decrement thread item count
634  ( ( *lpWriterItems )[ item.getOwnerThread() ] )--;
635 
636  // signal writer thread ( allow it to write again )
637  int condBroadcastResult = pthread_cond_broadcast( const_cast< pthread_cond_t * >( &PoolWriterBarrier ) );
638  if ( 0 != condBroadcastResult )
639  {
640  TRACE_LOG( "Condition broadcast on PoolWriterBarrier failed [" << condBroadcastResult << "]" );
641  }
642  }
643 
644  return item;
645  }
646 
647  void erasePoolItem( string id, const bool throwOnError = true ) volatile
648  {
649  pthread_t itemOwnerThread;
650 
651  try
652  {
653  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
654 
655  bool found = false;
656  typename WorkItemPool_QueueType::iterator poolItemFinder = lpPool->begin();
657  while( poolItemFinder != lpPool->end() )
658  {
659  if ( poolItemFinder->first == id )
660  {
661  itemOwnerThread = poolItemFinder->second.getOwnerThread();
662  found = true;
663  break;
664  }
665  poolItemFinder++;
666  }
667  if ( m_Shutdown )
668  throw WorkPoolShutdown();
669 
670  if ( !found )
671  throw WorkItemNotFound( id );
672 
673  lpPool->erase( poolItemFinder );
674  }
675  catch( const std::exception& ex )
676  {
677  TRACE_LOG( "A [" << typeid( ex ).name() << "] has occured [" << ex.what() << "] while removing item from pool" );
678  if ( throwOnError )
679  throw;
680  }
681  catch( ... )
682  {
683  TRACE_LOG( "An [unknown exception] has occured while removing item from pool." );
684  if ( throwOnError )
685  throw;
686  }
687 
688  // signal writers
689  {
690  LockingPtr< WorkItemPool_CounterType > lpWriterItems( m_WriterItems, ReserveSyncMutex );
691 
692  // decrement thread item count
693  ( ( *lpWriterItems )[ itemOwnerThread ] )--;
694 
695  // signal writer thread ( allow it to write again )
696  int condBroadcastResult = pthread_cond_broadcast( const_cast< pthread_cond_t* >( &PoolWriterBarrier ) );
697  if ( 0 != condBroadcastResult )
698  {
699  TRACE_LOG( "Condition broadcast on PoolWriterBarrier failed [" << condBroadcastResult << "]" );
700  }
701  }
702  }
703 
704  WorkItem< T > removePoolItem( string id, const bool throwOnError = true ) volatile
705  {
706  WorkItem< T > item;
707 
708  try
709  {
710  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
711 
712  bool found = false;
713  typename WorkItemPool_QueueType::iterator poolItemFinder = lpPool->begin();
714  while( poolItemFinder != lpPool->end() )
715  {
716  if ( poolItemFinder->first == id )
717  {
718  found = true;
719  break;
720  }
721  poolItemFinder++;
722  }
723  if ( m_Shutdown )
724  throw WorkPoolShutdown();
725 
726  if ( !found )
727  throw WorkItemNotFound( id );
728 
729  item = poolItemFinder->second;
730  lpPool->erase( poolItemFinder );
731  }
732  catch( const std::exception& ex )
733  {
734  TRACE_LOG( "A [" << typeid( ex ).name() << "] has occured [" << ex.what() << "] while removing item from pool" );
735  if ( throwOnError )
736  throw;
737  else
738  return item;
739  }
740  catch( ... )
741  {
742  TRACE_LOG( "An [unknown exception] has occured while removing item from pool." );
743  if ( throwOnError )
744  throw;
745  else
746  return item;
747  }
748 
749  // signal writers
750  {
751  LockingPtr< WorkItemPool_CounterType > lpWriterItems( m_WriterItems, ReserveSyncMutex );
752 
753  // decrement thread item count
754  ( ( *lpWriterItems )[ item.getOwnerThread() ] )--;
755 
756  // signal writer thread ( allow it to write again )
757  int condBroadcastResult = pthread_cond_broadcast( const_cast< pthread_cond_t* >( &PoolWriterBarrier ) );
758  if ( 0 != condBroadcastResult )
759  {
760  TRACE_LOG( "Condition broadcast on PoolWriterBarrier failed [" << condBroadcastResult << "]" );
761  }
762  }
763 
764  return item;
765  }
766 
767  void Dump() volatile
768  {
769  WorkItemPool_QueueType* lpQueue = const_cast< WorkItemPool_QueueType* >( &m_Pool );
770  typename WorkItemPool_QueueType::const_iterator poolItemFinder = lpQueue->begin();
771  stringstream output;
772  while( poolItemFinder != lpQueue->end() )
773  {
774  output << *( poolItemFinder->second.get() );
775  poolItemFinder++;
776  }
777 
778  DEBUG_LOG( output.str() );
779  }
780 
781  unsigned int getSize() volatile
782  {
783  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
784  return lpPool->size();
785  }
786 
787  void SignalAllReaders()
788  {
789  int condSignalResult = pthread_cond_broadcast( &PoolReaderBarrier );
790  if ( 0 != condSignalResult )
791  {
792  TRACE_LOG( "Condition broadcast on PoolReaderBarrier failed [" << condSignalResult << "]" );
793  }
794  }
795 
796  void SignalReaders()
797  {
798  int condSignalResult = pthread_cond_signal( &PoolReaderBarrier );
799  if ( 0 != condSignalResult )
800  {
801  TRACE_LOG( "Condition signal on PoolReaderBarrier failed [" << condSignalResult << "]" );
802  }
803  }
804 
805  void SignalWriters()
806  {
807  int condSignalResult = pthread_cond_signal( &PoolWriterBarrier );
808  if ( 0 != condSignalResult )
809  {
810  TRACE_LOG( "Condition signal on PoolWriterBarrier failed [" << condSignalResult << "]" );
811  }
812  }
813 
814  void ShutdownPool()
815  {
816  // mark shutdown
817  {
818  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
819  m_Shutdown = true;
820  }
821  SignalAllReaders();
822  SignalWriters();
823  }
824 
825  void ShutdownPoolWriters()
826  {
827  // mark shutdown
828  {
829  LockingPtr< WorkItemPool_QueueType > lpPool( m_Pool, PoolSyncMutex );
830  m_Shutdown = true;
831  }
832  SignalWriters();
833  }
834 
835  bool IsRunning() const
836  {
837  return !m_Shutdown;
838  }
839 
840  #if defined( TESTDLL_EXPORT ) || defined ( TESTDLL_IMPORT )
841  bool Start() const
842  {
843  m_Shutdown = false;
844  }
845  #endif
846 
847  private :
848 
852 
853  pthread_mutex_t PoolSyncMutex;
854  pthread_mutex_t ReserveSyncMutex;
855 
856  pthread_cond_t PoolReaderBarrier;
857  pthread_cond_t PoolWriterBarrier;
858  };
859 
861  {
862  public :
863 
865  {
866  // will adopt the buffer, destroy it upon exit
868  // will reference the buffer, will not destroy it on destructor
870  // create a copy of the buffer, leave it to the caller to destroy original
871  Copy
872  };
873 
874  public :
875 
876  explicit ManagedBuffer( unsigned char* buffer = NULL, ManagedBuffer::BufferType bufferType = ManagedBuffer::Adopt, unsigned long size = 0, unsigned long maxBufferSize = MAX_MESSAGE_LEN );
877  ManagedBuffer( const ManagedBuffer& source );
878  ManagedBuffer& operator=( const ManagedBuffer& source );
879 
880  ManagedBuffer getRef();
881 
882  ~ManagedBuffer();
883 
884  void allocate( unsigned long size );
885 
886  unsigned long size() const
887  {
888  return m_BufferSize;
889  }
890 
891  unsigned long max_size() const
892  {
893  return m_MaxBufferSize;
894  }
895 
896  unsigned char* buffer() const
897  {
898  return *m_BufferAddr;
899  }
900 
902  {
903  return m_BufferType;
904  }
905 
906  string str() const;
907  string str( const string::size_type size ) const;
908 
909  void copyFrom( const string& source, const unsigned long maxBufferSize = MAX_MESSAGE_LEN );
910  void copyFrom( const unsigned char* source, unsigned long size, unsigned long maxBufferSize = MAX_MESSAGE_LEN );
911  void copyFrom( const ManagedBuffer& source );
912  void copyFrom( const ManagedBuffer* source );
913 
914  void truncate( const unsigned long index )
915  {
916  //TODO delete and realloc
917  if ( index >= m_BufferSize )
918  return;
919  m_BufferSize = index;
920  }
921 
922  // operators
923  ManagedBuffer operator+( const unsigned long offset ) const;
924  ManagedBuffer& operator+=( const unsigned long offset );
925  long operator-( const ManagedBuffer& source ) const;
926 
927  private :
928 
930 
931  unsigned char** m_BufferAddr;
932  unsigned long m_MaxBufferSize;
933  unsigned long m_BufferSize;
934  };
935 }
936 
937 #endif // WORKITEMPOOL_H