pion-net  4.0.9
PionLockedQueue.hpp
1 // -----------------------------------------------------------------------
2 // pion-common: a collection of common libraries used by the Pion Platform
3 // -----------------------------------------------------------------------
4 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #ifndef __PION_PIONLOCKEDQUEUE_HEADER__
11 #define __PION_PIONLOCKEDQUEUE_HEADER__
12 
13 #include <new>
14 #include <boost/cstdint.hpp>
15 #include <boost/noncopyable.hpp>
16 #include <boost/thread/thread.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <boost/detail/atomic_count.hpp>
20 #include <pion/PionConfig.hpp>
21 #include <pion/PionException.hpp>
22 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
23  #include <boost/lockfree/detail/freelist.hpp>
24 #endif
25 
26 
27 // NOTE: the data structures contained in this file are based upon algorithms
28 // published in the paper "Simple, Fast, and Practical Non-Blocking and Blocking
29 // Concurrent Queue Algorithms" (1996, Maged M. Michael and Michael L. Scott,
30 // Department of Computer Science, University of Rochester).
31 // See http://www.cs.rochester.edu/u/scott/papers/1996_PODC_queues.pdf
32 
33 
34 namespace pion { // begin namespace pion
35 
36 
40 template <typename T,
41  boost::uint32_t MaxSize = 250000,
42  boost::uint32_t SleepMilliSec = 10 >
44  private boost::noncopyable
45 {
46 protected:
47 
49  struct QueueNode {
50  T data; //< data wrapped by the node item
51  QueueNode * next; //< points to the next node in the queue
52  boost::uint32_t version; //< the node item's version number
53  };
54 
56  inline QueueNode *createNode(void) {
57 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
58  return new (m_free_list.allocate()) QueueNode();
59 #else
60  return new QueueNode();
61 #endif
62  }
63 
65  inline void destroyNode(QueueNode *node_ptr) {
66 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
67  node_ptr->~QueueNode();
68  m_free_list.deallocate(node_ptr);
69 #else
70  delete node_ptr;
71 #endif
72  }
73 
75  inline void initialize(void) {
76  // initialize with a dummy node since m_head_ptr is always
77  // pointing to the item before the head of the list
78  m_head_ptr = m_tail_ptr = createNode();
79  m_head_ptr->next = NULL;
80  m_head_ptr->version = 0;
81  }
82 
91  inline bool dequeue(T& t, boost::uint32_t& version) {
92  // just return if the list is empty
93  boost::mutex::scoped_lock head_lock(m_head_mutex);
94  QueueNode *new_head_ptr = m_head_ptr->next;
95  if (! new_head_ptr) {
96  version = m_head_ptr->version;
97  return false;
98  }
99 
100  // get a copy of the item at the head of the list
101  version = new_head_ptr->version;
102  t = new_head_ptr->data;
103 
104  // update the pointer to the head of the list
105  QueueNode *old_head_ptr = m_head_ptr;
106  m_head_ptr = new_head_ptr;
107  head_lock.unlock();
108 
109  // free the QueueNode for the old head of the list
110  destroyNode(old_head_ptr);
111 
112  // decrement size
113  --m_size;
114 
115  // item successfully dequeued
116  return true;
117  }
118 
119 
120 public:
121 
124  public:
125 
130  ConsumerThread(void) : m_is_running(true), m_next_ptr(NULL),
131  m_wakeup_time(boost::posix_time::not_a_date_time) {}
132 
139  template <typename DurationType>
140  ConsumerThread(const DurationType& d)
141  : m_is_running(true), m_next_ptr(NULL), m_wakeup_time(d)
142  {}
143 
145  inline bool isRunning(void) const { return m_is_running; }
146 
148  inline void stop(void) { m_is_running = false; m_wakeup_event.notify_one(); }
149 
151  inline void reset(void) { m_is_running = true; m_next_ptr = NULL; }
152 
154  inline bool hasWakeupTimer(void) const { return !m_wakeup_time.is_not_a_date_time(); }
155 
157  inline const boost::posix_time::time_duration& getWakeupTimer(void) const {
158  return m_wakeup_time;
159  }
160 
161  private:
162 
164  friend class PionLockedQueue;
165 
166  volatile bool m_is_running; //< true while the thread is running/active
167  ConsumerThread * m_next_ptr; //< pointer to the next idle thread
168  boost::condition m_wakeup_event; //< triggered when a new item is available
169  boost::posix_time::time_duration m_wakeup_time; //< inactivity wakeup timer duration
170  };
171 
172 
175  : m_head_ptr(NULL), m_tail_ptr(NULL), m_idle_ptr(NULL),
176  m_next_version(1), m_size(0)
177  {
178  initialize();
179  }
180 
182  virtual ~PionLockedQueue() {
183  clear();
184  destroyNode(m_tail_ptr);
185  }
186 
188  inline bool empty(void) const { return (m_head_ptr->next == NULL); }
189 
191  std::size_t size(void) const {
192  return m_size;
193  }
194 
196  void clear(void) {
197  boost::mutex::scoped_lock tail_lock(m_tail_mutex);
198  boost::mutex::scoped_lock head_lock(m_head_mutex);
199  // also delete dummy node and reinitialize it to clear old value
200  while (m_head_ptr) {
201  m_tail_ptr = m_head_ptr;
202  m_head_ptr = m_head_ptr->next;
203  destroyNode(m_tail_ptr);
204  if (m_head_ptr)
205  --m_size;
206  }
207  initialize();
208  }
209 
215  void push(const T& t) {
216  // sleep while MaxSize is exceeded
217  if (MaxSize > 0) {
218  boost::system_time wakeup_time;
219  while (size() >= MaxSize) {
220  wakeup_time = boost::get_system_time()
221  + boost::posix_time::millisec(SleepMilliSec);
222  boost::thread::sleep(wakeup_time);
223  }
224  }
225 
226  // create a new list node for the queue item
227  QueueNode *node_ptr = createNode();
228  node_ptr->data = t;
229  node_ptr->next = NULL;
230  node_ptr->version = 0;
231 
232  // append node to the end of the list
233  boost::mutex::scoped_lock tail_lock(m_tail_mutex);
234  node_ptr->version = (m_next_version += 2);
235  m_tail_ptr->next = node_ptr;
236 
237  // update the tail pointer for the new node
238  m_tail_ptr = node_ptr;
239 
240  // increment size
241  ++m_size;
242 
243  // wake up an idle thread (if any)
244  if (m_idle_ptr) {
245  ConsumerThread *idle_ptr = m_idle_ptr;
246  m_idle_ptr = m_idle_ptr->m_next_ptr;
247  idle_ptr->m_wakeup_event.notify_one();
248  }
249  }
250 
261  bool pop(T& t, ConsumerThread& thread_info) {
262  boost::uint32_t last_known_version;
263  while (thread_info.isRunning()) {
264  // try to get the next value
265  if ( dequeue(t, last_known_version) )
266  return true; // got an item
267 
268  // queue is empty
269  boost::mutex::scoped_lock tail_lock(m_tail_mutex);
270  if (m_tail_ptr->version == last_known_version) {
271  // still empty after acquiring lock
272  thread_info.m_next_ptr = m_idle_ptr;
273  m_idle_ptr = & thread_info;
274  // get wakeup time (if any)
275  if (thread_info.hasWakeupTimer()) {
276  // wait for an item to become available
277  const boost::posix_time::ptime wakeup_time(boost::get_system_time() + thread_info.getWakeupTimer());
278  if (!thread_info.m_wakeup_event.timed_wait(tail_lock, wakeup_time))
279  return false; // timer expired if timed_wait() returns false
280  } else {
281  // wait for an item to become available
282  thread_info.m_wakeup_event.wait(tail_lock);
283  }
284  }
285  }
286  return false;
287  }
288 
296  inline bool pop(T& t) { boost::uint32_t version; return dequeue(t, version); }
297 
298 
299 private:
300 
301 #if defined(PION_HAVE_LOCKFREE) && !defined(_MSC_VER)
302 
304 #endif
305 
307  boost::mutex m_head_mutex;
308 
310  boost::mutex m_tail_mutex;
311 
313  QueueNode * m_head_ptr;
314 
316  QueueNode * m_tail_ptr;
317 
319  ConsumerThread * m_idle_ptr;
320 
322  boost::uint32_t m_next_version;
323 
325  boost::detail::atomic_count m_size;
326 };
327 
328 
329 } // end namespace pion
330 
331 #endif