vdr  2.2.0
ringbuffer.c
Go to the documentation of this file.
1 /*
2  * ringbuffer.c: A ring buffer
3  *
4  * See the main source file 'vdr.c' for copyright information and
5  * how to reach the author.
6  *
7  * Parts of this file were inspired by the 'ringbuffy.c' from the
8  * LinuxDVB driver (see linuxtv.org).
9  *
10  * $Id: ringbuffer.c 3.0 2012/09/22 11:26:49 kls Exp $
11  */
12 
13 #include "ringbuffer.h"
14 #include <stdlib.h>
15 #include <unistd.h>
16 #include "tools.h"
17 
18 // --- cRingBuffer -----------------------------------------------------------
19 
20 #define OVERFLOWREPORTDELTA 5 // seconds between reports
21 #define PERCENTAGEDELTA 10
22 #define PERCENTAGETHRESHOLD 70
23 #define IOTHROTTLELOW 20
24 #define IOTHROTTLEHIGH 50
25 
26 cRingBuffer::cRingBuffer(int Size, bool Statistics)
27 {
28  size = Size;
29  statistics = Statistics;
30  getThreadTid = 0;
31  maxFill = 0;
32  lastPercent = 0;
33  putTimeout = getTimeout = 0;
36  ioThrottle = NULL;
37 }
38 
40 {
41  delete ioThrottle;
42  if (statistics)
43  dsyslog("buffer stats: %d (%d%%) used", maxFill, maxFill * 100 / (size - 1));
44 }
45 
47 {
48  if (Fill > maxFill)
49  maxFill = Fill;
50  int percent = Fill * 100 / (Size() - 1) / PERCENTAGEDELTA * PERCENTAGEDELTA; // clamp down to nearest quantum
51  if (percent != lastPercent) {
52  if (percent >= PERCENTAGETHRESHOLD && percent > lastPercent || percent < PERCENTAGETHRESHOLD && lastPercent >= PERCENTAGETHRESHOLD) {
53  dsyslog("buffer usage: %d%% (tid=%d)", percent, getThreadTid);
54  lastPercent = percent;
55  }
56  }
57  if (ioThrottle) {
58  if (percent >= IOTHROTTLEHIGH)
60  else if (percent < IOTHROTTLELOW)
62  }
63 }
64 
66 {
67  if (putTimeout)
69 }
70 
72 {
73  if (getTimeout)
75 }
76 
78 {
79  if (putTimeout && Free() > Size() / 10)
81 }
82 
84 {
85  if (getTimeout && Available() > Size() / 10)
87 }
88 
89 void cRingBuffer::SetTimeouts(int PutTimeout, int GetTimeout)
90 {
91  putTimeout = PutTimeout;
92  getTimeout = GetTimeout;
93 }
94 
96 {
97  if (!ioThrottle)
98  ioThrottle = new cIoThrottle;
99 }
100 
102 {
103  overflowCount++;
104  overflowBytes += Bytes;
105  if (time(NULL) - lastOverflowReport > OVERFLOWREPORTDELTA) {
106  esyslog("ERROR: %d ring buffer overflow%s (%d bytes dropped)", overflowCount, overflowCount > 1 ? "s" : "", overflowBytes);
108  lastOverflowReport = time(NULL);
109  }
110 }
111 
112 // --- cRingBufferLinear -----------------------------------------------------
113 
114 #ifdef DEBUGRINGBUFFERS
115 #define MAXRBLS 30
116 #define DEBUGRBLWIDTH 45
117 
118 cRingBufferLinear *cRingBufferLinear::RBLS[MAXRBLS] = { NULL };
119 
120 void cRingBufferLinear::AddDebugRBL(cRingBufferLinear *RBL)
121 {
122  for (int i = 0; i < MAXRBLS; i++) {
123  if (!RBLS[i]) {
124  RBLS[i] = RBL;
125  break;
126  }
127  }
128 }
129 
130 void cRingBufferLinear::DelDebugRBL(cRingBufferLinear *RBL)
131 {
132  for (int i = 0; i < MAXRBLS; i++) {
133  if (RBLS[i] == RBL) {
134  RBLS[i] = NULL;
135  break;
136  }
137  }
138 }
139 
140 void cRingBufferLinear::PrintDebugRBL(void)
141 {
142  bool printed = false;
143  for (int i = 0; i < MAXRBLS; i++) {
144  cRingBufferLinear *p = RBLS[i];
145  if (p) {
146  printed = true;
147  int lh = p->lastHead;
148  int lt = p->lastTail;
149  int h = lh * DEBUGRBLWIDTH / p->Size();
150  int t = lt * DEBUGRBLWIDTH / p->Size();
151  char buf[DEBUGRBLWIDTH + 10];
152  memset(buf, '-', DEBUGRBLWIDTH);
153  if (lt <= lh)
154  memset(buf + t, '*', max(h - t, 1));
155  else {
156  memset(buf, '*', h);
157  memset(buf + t, '*', DEBUGRBLWIDTH - t);
158  }
159  buf[t] = '<';
160  buf[h] = '>';
161  buf[DEBUGRBLWIDTH] = 0;
162  printf("%2d %s %8d %8d %s\n", i, buf, p->lastPut, p->lastGet, p->description);
163  }
164  }
165  if (printed)
166  printf("\n");
167  }
168 #endif
169 
170 cRingBufferLinear::cRingBufferLinear(int Size, int Margin, bool Statistics, const char *Description)
171 :cRingBuffer(Size, Statistics)
172 {
173  description = Description ? strdup(Description) : NULL;
174  tail = head = margin = Margin;
175  gotten = 0;
176  buffer = NULL;
177  if (Size > 1) { // 'Size - 1' must not be 0!
178  if (Margin <= Size / 2) {
179  buffer = MALLOC(uchar, Size);
180  if (!buffer)
181  esyslog("ERROR: can't allocate ring buffer (size=%d)", Size);
182  Clear();
183  }
184  else
185  esyslog("ERROR: invalid margin for ring buffer (%d > %d)", Margin, Size / 2);
186  }
187  else
188  esyslog("ERROR: invalid size for ring buffer (%d)", Size);
189 #ifdef DEBUGRINGBUFFERS
190  lastHead = head;
191  lastTail = tail;
192  lastPut = lastGet = -1;
193  AddDebugRBL(this);
194 #endif
195 }
196 
198 {
199 #ifdef DEBUGRINGBUFFERS
200  DelDebugRBL(this);
201 #endif
202  free(buffer);
203  free(description);
204 }
205 
206 int cRingBufferLinear::DataReady(const uchar *Data, int Count)
207 {
208  return Count >= margin ? Count : 0;
209 }
210 
212 {
213  int diff = head - tail;
214  return (diff >= 0) ? diff : Size() + diff - margin;
215 }
216 
218 {
219  tail = head = margin;
220 #ifdef DEBUGRINGBUFFERS
221  lastHead = head;
222  lastTail = tail;
223  lastPut = lastGet = -1;
224 #endif
225  maxFill = 0;
226  EnablePut();
227 }
228 
229 int cRingBufferLinear::Read(int FileHandle, int Max)
230 {
231  int Tail = tail;
232  int diff = Tail - head;
233  int free = (diff > 0) ? diff - 1 : Size() - head;
234  if (Tail <= margin)
235  free--;
236  int Count = -1;
237  errno = EAGAIN;
238  if (free > 0) {
239  if (0 < Max && Max < free)
240  free = Max;
241  Count = safe_read(FileHandle, buffer + head, free);
242  if (Count > 0) {
243  int Head = head + Count;
244  if (Head >= Size())
245  Head = margin;
246  head = Head;
247  if (statistics) {
248  int fill = head - Tail;
249  if (fill < 0)
250  fill = Size() + fill;
251  else if (fill >= Size())
252  fill = Size() - 1;
253  UpdatePercentage(fill);
254  }
255  }
256  }
257 #ifdef DEBUGRINGBUFFERS
258  lastHead = head;
259  lastPut = Count;
260 #endif
261  EnableGet();
262  if (free == 0)
263  WaitForPut();
264  return Count;
265 }
266 
268 {
269  int Tail = tail;
270  int diff = Tail - head;
271  int free = (diff > 0) ? diff - 1 : Size() - head;
272  if (Tail <= margin)
273  free--;
274  int Count = -1;
275  errno = EAGAIN;
276  if (free > 0) {
277  if (0 < Max && Max < free)
278  free = Max;
279  Count = File->Read(buffer + head, free);
280  if (Count > 0) {
281  int Head = head + Count;
282  if (Head >= Size())
283  Head = margin;
284  head = Head;
285  if (statistics) {
286  int fill = head - Tail;
287  if (fill < 0)
288  fill = Size() + fill;
289  else if (fill >= Size())
290  fill = Size() - 1;
291  UpdatePercentage(fill);
292  }
293  }
294  }
295 #ifdef DEBUGRINGBUFFERS
296  lastHead = head;
297  lastPut = Count;
298 #endif
299  EnableGet();
300  if (free == 0)
301  WaitForPut();
302  return Count;
303 }
304 
305 int cRingBufferLinear::Put(const uchar *Data, int Count)
306 {
307  if (Count > 0) {
308  int Tail = tail;
309  int rest = Size() - head;
310  int diff = Tail - head;
311  int free = ((Tail < margin) ? rest : (diff > 0) ? diff : Size() + diff - margin) - 1;
312  if (statistics) {
313  int fill = Size() - free - 1 + Count;
314  if (fill >= Size())
315  fill = Size() - 1;
316  UpdatePercentage(fill);
317  }
318  if (free > 0) {
319  if (free < Count)
320  Count = free;
321  if (Count >= rest) {
322  memcpy(buffer + head, Data, rest);
323  if (Count - rest)
324  memcpy(buffer + margin, Data + rest, Count - rest);
325  head = margin + Count - rest;
326  }
327  else {
328  memcpy(buffer + head, Data, Count);
329  head += Count;
330  }
331  }
332  else
333  Count = 0;
334 #ifdef DEBUGRINGBUFFERS
335  lastHead = head;
336  lastPut = Count;
337 #endif
338  EnableGet();
339  if (Count == 0)
340  WaitForPut();
341  }
342  return Count;
343 }
344 
346 {
347  int Head = head;
348  if (getThreadTid <= 0)
350  int rest = Size() - tail;
351  if (rest < margin && Head < tail) {
352  int t = margin - rest;
353  memcpy(buffer + t, buffer + tail, rest);
354  tail = t;
355  rest = Head - tail;
356  }
357  int diff = Head - tail;
358  int cont = (diff >= 0) ? diff : Size() + diff - margin;
359  if (cont > rest)
360  cont = rest;
361  uchar *p = buffer + tail;
362  if ((cont = DataReady(p, cont)) > 0) {
363  Count = gotten = cont;
364  return p;
365  }
366  WaitForGet();
367  return NULL;
368 }
369 
370 void cRingBufferLinear::Del(int Count)
371 {
372  if (Count > gotten) {
373  esyslog("ERROR: invalid Count in cRingBufferLinear::Del: %d (limited to %d)", Count, gotten);
374  Count = gotten;
375  }
376  if (Count > 0) {
377  int Tail = tail;
378  Tail += Count;
379  gotten -= Count;
380  if (Tail >= Size())
381  Tail = margin;
382  tail = Tail;
383  EnablePut();
384  }
385 #ifdef DEBUGRINGBUFFERS
386  lastTail = tail;
387  lastGet = Count;
388 #endif
389 }
390 
391 // --- cFrame ----------------------------------------------------------------
392 
393 cFrame::cFrame(const uchar *Data, int Count, eFrameType Type, int Index, uint32_t Pts)
394 {
395  count = abs(Count);
396  type = Type;
397  index = Index;
398  pts = Pts;
399  if (Count < 0)
400  data = (uchar *)Data;
401  else {
402  data = MALLOC(uchar, count);
403  if (data)
404  memcpy(data, Data, count);
405  else
406  esyslog("ERROR: can't allocate frame buffer (count=%d)", count);
407  }
408  next = NULL;
409 }
410 
412 {
413  free(data);
414 }
415 
416 // --- cRingBufferFrame ------------------------------------------------------
417 
419 :cRingBuffer(Size, Statistics)
420 {
421  head = NULL;
422  currentFill = 0;
423 }
424 
426 {
427  Clear();
428 }
429 
431 {
432  Lock();
433  cFrame *p;
434  while ((p = Get()) != NULL)
435  Drop(p);
436  Unlock();
437  EnablePut();
438  EnableGet();
439 }
440 
442 {
443  if (Frame->Count() <= Free()) {
444  Lock();
445  if (head) {
446  Frame->next = head->next;
447  head->next = Frame;
448  head = Frame;
449  }
450  else {
451  head = Frame->next = Frame;
452  }
453  currentFill += Frame->Count();
454  Unlock();
455  EnableGet();
456  return true;
457  }
458  return false;
459 }
460 
462 {
463  Lock();
464  cFrame *p = head ? head->next : NULL;
465  Unlock();
466  return p;
467 }
468 
470 {
471  currentFill -= Frame->Count();
472  delete Frame;
473 }
474 
476 {
477  Lock();
478  if (head) {
479  if (Frame == head->next) {
480  if (head->next != head) {
481  head->next = Frame->next;
482  Delete(Frame);
483  }
484  else {
485  Delete(head);
486  head = NULL;
487  }
488  }
489  else
490  esyslog("ERROR: attempt to drop wrong frame from ring buffer!");
491  }
492  Unlock();
493  EnablePut();
494 }
495 
497 {
498  Lock();
499  int av = currentFill;
500  Unlock();
501  return av;
502 }
void EnableGet(void)
Definition: ringbuffer.c:83
unsigned char uchar
Definition: tools.h:30
virtual int DataReady(const uchar *Data, int Count)
By default a ring buffer has data ready as soon as there are at least 'margin' bytes available...
Definition: ringbuffer.c:206
int putTimeout
Definition: ringbuffer.h:19
virtual void Clear(void)
Immediately clears the ring buffer.
Definition: ringbuffer.c:217
int overflowBytes
Definition: ringbuffer.h:24
void EnablePut(void)
Definition: ringbuffer.c:77
eFrameType
Definition: ringbuffer.h:105
virtual int Available(void)
Definition: ringbuffer.c:496
#define dsyslog(a...)
Definition: tools.h:36
virtual void Clear(void)
Definition: ringbuffer.c:430
#define PERCENTAGEDELTA
Definition: ringbuffer.c:21
ssize_t Read(void *Data, size_t Size)
Definition: tools.c:1774
int lastPercent
Definition: ringbuffer.h:29
void Signal(void)
Signals a caller of Wait() that the condition it is waiting for is met.
Definition: thread.c:85
void Unlock(void)
Definition: ringbuffer.h:136
cRingBufferFrame(int Size, bool Statistics=false)
Definition: ringbuffer.c:418
int Count(void) const
Definition: ringbuffer.h:123
#define esyslog(a...)
Definition: tools.h:34
void Drop(cFrame *Frame)
Definition: ringbuffer.c:475
cRingBufferLinear(int Size, int Margin=0, bool Statistics=false, const char *Description=NULL)
Creates a linear ring buffer.
Definition: ringbuffer.c:170
T max(T a, T b)
Definition: tools.h:55
cCondWait readyForGet
Definition: ringbuffer.h:18
int Put(const uchar *Data, int Count)
Puts at most Count bytes of Data into the ring buffer.
Definition: ringbuffer.c:305
void UpdatePercentage(int Fill)
Definition: ringbuffer.c:46
cFrame * next
Definition: ringbuffer.h:110
cUnbufferedFile is used for large files that are mainly written or read in a streaming manner...
Definition: tools.h:418
#define MALLOC(type, size)
Definition: tools.h:46
bool statistics
Definition: ringbuffer.h:30
int getTimeout
Definition: ringbuffer.h:20
void WaitForGet(void)
Definition: ringbuffer.c:71
virtual int Free(void)
Definition: ringbuffer.h:38
int Read(int FileHandle, int Max=0)
Reads at most Max bytes from FileHandle and stores them in the ring buffer.
Definition: ringbuffer.c:229
virtual int Available(void)=0
tThreadId getThreadTid
Definition: ringbuffer.h:27
cFrame * Get(void)
Definition: ringbuffer.c:461
bool Wait(int TimeoutMs=0)
Waits at most TimeoutMs milliseconds for a call to Signal(), or forever if TimeoutMs is 0...
Definition: thread.c:63
static tThreadId ThreadId(void)
Definition: thread.c:341
void Activate(void)
Activates the global I/O throttling mechanism.
Definition: thread.c:421
~cFrame()
Definition: ringbuffer.c:411
void Del(int Count)
Deletes at most Count bytes from the ring buffer.
Definition: ringbuffer.c:370
cFrame(const uchar *Data, int Count, eFrameType=ftUnknown, int Index=-1, uint32_t Pts=0)
Creates a new cFrame object.
Definition: ringbuffer.c:393
bool Put(cFrame *Frame)
Definition: ringbuffer.c:441
void SetIoThrottle(void)
Definition: ringbuffer.c:95
virtual ~cRingBufferLinear()
Definition: ringbuffer.c:197
#define IOTHROTTLEHIGH
Definition: ringbuffer.c:24
uchar * Get(int &Count)
Gets data from the ring buffer.
Definition: ringbuffer.c:345
void Lock(void)
Definition: ringbuffer.h:135
int Size(void)
Definition: ringbuffer.h:39
cCondWait readyForPut
Definition: ringbuffer.h:18
ssize_t safe_read(int filedes, void *buffer, size_t size)
Definition: tools.c:53
void Delete(cFrame *Frame)
Definition: ringbuffer.c:469
cRingBuffer(int Size, bool Statistics=false)
Definition: ringbuffer.c:26
cIoThrottle * ioThrottle
Definition: ringbuffer.h:25
void WaitForPut(void)
Definition: ringbuffer.c:65
#define IOTHROTTLELOW
Definition: ringbuffer.c:23
virtual ~cRingBufferFrame()
Definition: ringbuffer.c:425
int overflowCount
Definition: ringbuffer.h:23
#define PERCENTAGETHRESHOLD
Definition: ringbuffer.c:22
virtual ~cRingBuffer()
Definition: ringbuffer.c:39
void SetTimeouts(int PutTimeout, int GetTimeout)
Definition: ringbuffer.c:89
char * description
Definition: ringbuffer.h:64
#define OVERFLOWREPORTDELTA
Definition: ringbuffer.c:20
void Release(void)
Releases the global I/O throttling mechanism.
Definition: thread.c:432
void ReportOverflow(int Bytes)
Definition: ringbuffer.c:101
time_t lastOverflowReport
Definition: ringbuffer.h:22
virtual int Available(void)
Definition: ringbuffer.c:211