ccRTP
iqueue.h
Go to the documentation of this file.
1 // Copyright (C) 2001,2002,2004 Federico Montesino Pouzols <fedemp@altern.org>.
2 //
3 // This program is free software; you can redistribute it and/or modify
4 // it under the terms of the GNU General Public License as published by
5 // the Free Software Foundation; either version 2 of the License, or
6 // (at your option) any later version.
7 //
8 // This program is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 // GNU General Public License for more details.
12 //
13 // You should have received a copy of the GNU General Public License
14 // along with this program; if not, write to the Free Software
15 // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
16 //
17 // As a special exception, you may use this file as part of a free software
18 // library without restriction. Specifically, if other files instantiate
19 // templates or use macros or inline functions from this file, or you compile
20 // this file and link it with other files to produce an executable, this
21 // file does not by itself cause the resulting executable to be covered by
22 // the GNU General Public License. This exception does not however
23 // invalidate any other reasons why the executable file might be covered by
24 // the GNU General Public License.
25 //
26 // This exception applies only to the code released under the name GNU
27 // ccRTP. If you copy code from other releases into a copy of GNU
28 // ccRTP, as the General Public License permits, the exception does
29 // not apply to the code that you add in this way. To avoid misleading
30 // anyone as to the status of such modified files, you must delete
31 // this exception notice from them.
32 //
33 // If you write modifications of your own for GNU ccRTP, it is your choice
34 // whether to permit this exception to apply to your modifications.
35 // If you do not wish that, delete this exception notice.
36 //
37 
44 #ifndef CCXX_RTP_IQUEUE_H_
45 #define CCXX_RTP_IQUEUE_H_
46 
47 #include <ccrtp/queuebase.h>
48 #include <ccrtp/CryptoContext.h>
49 
50 #include <list>
51 
52 NAMESPACE_COMMONCPP
53 
// Holds two counters, `members` and `activeSenders`, and exposes
// set / increase / decrease / get accessors for each of them. None of the
// accessors takes a lock or validates its input.
68 class __EXPORT Members
69 {
70 public:
    // Overwrite the members counter with n.
71  inline void
72  setMembersCount(uint32 n)
73  { members = n; }
74 
    // Add one to the members counter.
75  inline void
76  increaseMembersCount()
77  { members++; }
78 
    // Subtract one from the members counter; there is no check for zero.
    // NOTE(review): if uint32 is an unsigned type, a call at 0 wraps
    // around -- confirm callers never do that.
79  inline void
80  decreaseMembersCount()
81  { members--; }
82 
    // Current value of the members counter.
83  inline uint32
84  getMembersCount() const
85  { return members; }
86 
    // Overwrite the active-senders counter with n.
87  inline void
88  setSendersCount(uint32 n)
89  { activeSenders = n; }
90 
    // Add one to the active-senders counter.
91  inline void
92  increaseSendersCount()
93  { activeSenders++; }
94 
    // Subtract one from the active-senders counter; as with the members
    // counter, there is no check for zero.
95  inline void
96  decreaseSendersCount()
97  { activeSenders--; }
98 
    // Current value of the active-senders counter.
99  inline uint32
100  getSendersCount() const
101  { return activeSenders; }
102 
103 protected:
    // NOTE(review): the constructor's declarator line is absent from this
    // listing; only its initializer list and empty body remain. The
    // initializer list starts both counters at 0.
105  members(0),
106  activeSenders(0)
107  { }
108 
    // Virtual and protected, with an empty body.
109  inline virtual ~Members()
110  { }
111 
112 private:
    // Counter read and written by the *MembersCount accessors.
114  uint32 members;
    // Counter read and written by the *SendersCount accessors.
116  uint32 activeSenders;
117 };
118 
// Collection of one-line forwarders: every method takes a SyncSource and
// calls the SyncSource member of the same name with the remaining
// arguments. The class declares no data members of its own.
// NOTE(review): presumably those SyncSource members are not public and this
// class is granted access to them -- confirm in queuebase.h.
125 class __EXPORT SyncSourceHandler
126 {
127 public:
    // Returns whatever source.getLink() returns, as an untyped pointer.
134  inline void*
135  getLink(const SyncSource& source) const
136  { return source.getLink(); }
137 
    // Stores `link` in the source through source.setLink(); the pointer is
    // passed along untouched.
138  inline void
139  setLink(SyncSource& source, void* link)
140  { source.setLink(link); }
141 
    // Forwards p to source.setParticipant().
142  inline void
143  setParticipant(SyncSource& source, Participant& p)
144  { source.setParticipant(p); }
145 
    // Forwards the new state ns to source.setState().
146  inline void
147  setState(SyncSource& source, SyncSource::State ns)
148  { source.setState(ns); }
149 
    // Forwards the `active` flag to source.setSender().
150  inline void
151  setSender(SyncSource& source, bool active)
152  { source.setSender(active); }
153 
    // Forwards port p to source.setDataTransportPort().
154  inline void
155  setDataTransportPort(SyncSource& source, tpport_t p)
156  { source.setDataTransportPort(p); }
157 
    // Forwards port p to source.setControlTransportPort().
158  inline void
159  setControlTransportPort(SyncSource& source, tpport_t p)
160  { source.setControlTransportPort(p); }
161 
    // Forwards addr to source.setNetworkAddress(). addr is received by
    // value, so the argument is copied before being forwarded.
162  inline void
163  setNetworkAddress(SyncSource& source, InetAddress addr)
164  { source.setNetworkAddress(addr); }
165 
166 protected:
    // NOTE(review): the constructor's declarator line is absent from this
    // listing; only its empty body remains.
168  { }
169 
    // Virtual and protected, with an empty body.
170  inline virtual ~SyncSourceHandler()
171  { }
172 };
173 
// Two one-line forwarders onto Participant: each method calls the
// Participant member of the same name through the supplied pointer. The
// pointer is dereferenced without a null check.
// NOTE(review): presumably the Participant setters are not public and this
// class is granted access to them -- confirm in queuebase.h.
180 class __EXPORT ParticipantHandler
181 {
182 public:
    // Calls part->setSDESItem(item, val).
183  inline void
184  setSDESItem(Participant* part, SDESItemType item,
185  const std::string& val)
186  { part->setSDESItem(item,val); }
187 
    // Calls part->setPRIVPrefix(val). Unlike setSDESItem, val is taken by
    // value here, so the string is copied on every call.
    // NOTE(review): a const reference would avoid the copy.
188  inline void
189  setPRIVPrefix(Participant* part, const std::string val)
190  { part->setPRIVPrefix(val); }
191 
192 protected:
    // NOTE(review): the constructor's declarator line is absent from this
    // listing; only its empty body remains.
194  { }
195 
    // Virtual and protected, with an empty body.
196  inline virtual ~ParticipantHandler()
197  { }
198 };
199 
// Two one-line forwarders onto RTPApplication: each method calls the
// RTPApplication member of the same name on the supplied object.
// NOTE(review): presumably those RTPApplication members are not public and
// this class is granted access to them -- confirm in queuebase.h.
206 class __EXPORT ApplicationHandler
207 {
208 public:
    // Calls app.addParticipant(part).
209  inline void
210  addParticipant(RTPApplication& app, Participant& part)
211  { app.addParticipant(part); }
212 
    // Calls app.removeParticipant(pl); the link pointer is passed along
    // unchanged and unchecked.
213  inline void
214  removeParticipant(RTPApplication& app,
215  RTPApplication::ParticipantLink* pl)
216  { app.removeParticipant(pl); }
217 
218 protected:
    // NOTE(review): the constructor's declarator line is absent from this
    // listing; only its empty body remains.
220  { }
221 
    // Virtual and protected, with an empty body.
222  inline virtual ~ApplicationHandler()
223  { }
224 };
225 
// Keeps a record of conflicting transport addresses (network address plus
// data and control ports) between the pointers firstConflict and
// lastConflict, and offers search / update / add operations on it. Only
// updateConflict() is defined here; the other operations are defined
// elsewhere.
233 class __EXPORT ConflictHandler
234 {
235 public:
    // NOTE(review): the header line of the nested record type is absent
    // from this listing. Judging by the constructor below it is
    // ConflictingTransportAddress -- confirm against the original header.
237  {
    // Takes a network address, a data port and a control port; defined
    // out of line.
238  ConflictingTransportAddress(InetAddress na,
239  tpport_t dtp, tpport_t ctp);
240 
    // Sets the `next` pointer of this record.
241  void setNext(ConflictingTransportAddress* nc)
242  { next = nc; }
243 
    // Read-only accessors for the three address components.
244  inline const InetAddress& getNetworkAddress( ) const
245  { return networkAddress; }
246 
247  inline tpport_t getDataTransportPort() const
248  { return dataTransportPort; }
249 
250  inline tpport_t getControlTransportPort() const
251  { return controlTransportPort; }
252 
253  InetAddress networkAddress;
    // NOTE(review): the declarations of `next`, `dataTransportPort` and
    // `controlTransportPort`, which the accessors above use, are absent
    // from this listing.
257  // arrival time of last data or control packet.
258  timeval lastPacketTime;
259  };
260 
    // Look up a record by network address and data port; defined out of
    // line.
    // NOTE(review): presumably returns NULL when nothing matches -- confirm
    // in the implementation file.
265  ConflictingTransportAddress* searchDataConflict(InetAddress na,
266  tpport_t dtp);
    // Same lookup keyed on the control port instead of the data port;
    // defined out of line.
271  ConflictingTransportAddress* searchControlConflict(InetAddress na,
272  tpport_t ctp);
273 
    // Stamps ca.lastPacketTime with the time reported by
    // SysTime::gettimeofday().
274  void updateConflict(ConflictingTransportAddress& ca)
275  { SysTime::gettimeofday(&(ca.lastPacketTime),NULL); }
276 
    // Adds a record for the given address and port pair; defined out of
    // line.
277  void addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp);
278 
279 protected:
    // NOTE(review): the constructor's declarator line is absent from this
    // listing. Its body sets both firstConflict and lastConflict to NULL.
281  { firstConflict = lastConflict = NULL; }
282 
    // Virtual and protected, with an empty body: nothing reachable from
    // firstConflict is released here.
283  inline virtual ~ConflictHandler()
284  { }
285 
    // NOTE(review): the declaration of firstConflict / lastConflict is
    // absent from this listing.
287 };
288 
299 class __EXPORT MembershipBookkeeping :
300  public SyncSourceHandler,
301  public ParticipantHandler,
302  public ApplicationHandler,
303  public ConflictHandler,
304  private Members
305 {
306 public:
// Returns the static constant defaultMembersHashSize, the same value the
// constructor uses as the default for its initialSize argument.
307  inline size_t getDefaultMembersHashSize()
308  { return defaultMembersHashSize; }
309 
310 protected:
311 
325  MembershipBookkeeping(uint32 initialSize = defaultMembersHashSize);
326 
331  inline virtual
333  { endMembers(); }
334 
335  struct SyncSourceLink;
336 
// Typed counterpart of SyncSourceHandler::getLink(): fetches the untyped
// pointer stored for `source` and converts it to SyncSourceLink* with
// static_cast. No check is made of what was actually stored there.
337  inline SyncSourceLink* getLink(const SyncSource& source) const
338  { return static_cast<SyncSourceLink*>(SyncSourceHandler::getLink(source)); }
// True when the link stored for `source` names this object as its
// membership. The pointer returned by getLink() is dereferenced without a
// null check.
// NOTE(review): presumably callers only pass sources that already carry a
// link -- confirm at the call sites.
343  inline bool isMine(const SyncSource& source) const
344  { return getLink(source)->getMembership() == this; }
345 
// Node describing one received packet. It carries the packet pointer, the
// link of the source it came from, its reception time, a shifted
// timestamp, and two pairs of neighbour pointers: prev/next and
// srcPrev/srcNext.
// NOTE(review): the struct header, the first line(s) of the constructor
// parameter list and most field declarations are absent from this listing.
// Judging by the accessors the type is IncomingRTPPktLink -- confirm
// against the original header.
353  {
    // Constructor: copies every argument into the field of the matching
    // role. recv_ts is received by reference and copied into receptionTime.
355  struct timeval& recv_ts,
356  uint32 shifted_ts,
357  IncomingRTPPktLink* sp,
358  IncomingRTPPktLink* sn,
360  IncomingRTPPktLink* n) :
361  packet(pkt),
362  sourceLink(sLink),
363  prev(p), next(n),
364  srcPrev(sp), srcNext(sn),
365  receptionTime(recv_ts),
366  shiftedTimestamp(shifted_ts)
367  { }
368 
    // NOTE(review): the declarator of this empty-bodied member is absent
    // from the listing; presumably the destructor -- confirm. The empty
    // body does nothing with `packet`.
370  { }
371 
    // Link of the source this packet belongs to.
372  inline SyncSourceLink* getSourceLink() const
373  { return sourceLink; }
374 
375  inline void setSourceLink(SyncSourceLink* src)
376  { sourceLink = src; }
377 
    // Neighbours in the global incoming packet queue.
378  inline IncomingRTPPktLink* getNext() const
379  { return next; }
380 
381  inline void setNext(IncomingRTPPktLink* nl)
382  { next = nl; }
383 
384  inline IncomingRTPPktLink* getPrev() const
385  { return prev; }
386 
387  inline void setPrev(IncomingRTPPktLink* pl)
388  { prev = pl; }
389 
    // Neighbours in the source-specific incoming packet queue.
390  inline IncomingRTPPktLink* getSrcNext() const
391  { return srcNext; }
392 
393  inline void setSrcNext(IncomingRTPPktLink* sn)
394  { srcNext = sn; }
395 
396  inline IncomingRTPPktLink* getSrcPrev() const
397  { return srcPrev; }
398 
399  inline void setSrcPrev(IncomingRTPPktLink* sp)
400  { srcPrev = sp; }
401 
    // The packet this node refers to.
402  inline IncomingRTPPkt* getPacket() const
403  { return packet; }
404 
405  inline void setPacket(IncomingRTPPkt* pkt)
406  { packet = pkt; }
407 
    // Replace the stored reception time with a copy of t.
415  inline void setRecvTime(const timeval &t)
416  { receptionTime = t; }
417 
    // Reception time, returned by value.
421  inline timeval getRecvTime() const
422  { return receptionTime; }
423 
    // Despite the name, this returns shiftedTimestamp (see the comment on
    // that field below), not a raw timestamp.
432  inline uint32 getTimestamp() const
433  { return shiftedTimestamp; }
434 
435  inline void setTimestamp(uint32 ts)
436  { shiftedTimestamp = ts;}
437 
    // NOTE(review): in the field section below only receptionTime still
    // has its declaration; the other comments have lost theirs in this
    // listing.
438  // the packet this link refers to.
440  // the synchronization source this packet comes from.
442  // global incoming packet queue links.
444  // source specific incoming packet queue links.
446  // time this packet was received at
447  struct timeval receptionTime;
448  // timestamp of the packet in host order and after
449  // subtracting the initial timestamp for its source
450  // (it is an increment from the initial timestamp).
452  };
453 
471  {
472  // 2^16
473  static const uint32 SEQNUMMOD;
474 
476  SyncSource* s,
477  IncomingRTPPktLink* fp = NULL,
478  IncomingRTPPktLink* lp = NULL,
479  SyncSourceLink* ps = NULL,
480  SyncSourceLink* ns = NULL,
481  SyncSourceLink* ncollis = NULL) :
482  membership(m), source(s), first(fp), last(lp),
483  prev(ps), next(ns), nextCollis(ncollis),
484  prevConflict(NULL)
485  { m->setLink(*s,this); // record that the source is associated
486  initStats(); // to this link.
487  }
488 
492  ~SyncSourceLink();
493 
494  inline MembershipBookkeeping* getMembership()
495  { return membership; }
496 
501  inline SyncSource* getSource() { return source; }
502 
507  inline IncomingRTPPktLink* getFirst()
508  { return first; }
509 
510  inline void setFirst(IncomingRTPPktLink* fp)
511  { first = fp; }
512 
517  inline IncomingRTPPktLink* getLast()
518  { return last; }
519 
520  inline void setLast(IncomingRTPPktLink* lp)
521  { last = lp; }
522 
526  inline SyncSourceLink* getPrev()
527  { return prev; }
528 
529  inline void setPrev(SyncSourceLink* ps)
530  { prev = ps; }
531 
535  inline SyncSourceLink* getNext()
536  { return next; }
537 
538  inline void setNext(SyncSourceLink *ns)
539  { next = ns; }
540 
547  inline SyncSourceLink* getNextCollis()
548  { return nextCollis; }
549 
550  inline void setNextCollis(SyncSourceLink* ns)
551  { nextCollis = ns; }
552 
553  inline ConflictingTransportAddress* getPrevConflict() const
554  { return prevConflict; }
555 
559  void setPrevConflict(InetAddress& addr, tpport_t dataPort,
560  tpport_t controlPort);
561 
562  unsigned char* getSenderInfo()
563  { return senderInfo; }
564 
565  void setSenderInfo(unsigned char* si);
566 
567  unsigned char* getReceiverInfo()
568  { return receiverInfo; }
569 
570  void setReceiverInfo(unsigned char* ri);
571 
572  inline timeval getLastPacketTime() const
573  { return lastPacketTime; }
574 
575  inline timeval getLastRTCPPacketTime() const
576  { return lastRTCPPacketTime; }
577 
578  inline timeval getLastRTCPSRTime() const
579  { return lastRTCPSRTime; }
580 
585  inline uint32 getObservedPacketCount() const
586  { return obsPacketCount; }
587 
588  inline void incObservedPacketCount()
589  { obsPacketCount++; }
590 
595  inline uint32 getObservedOctetCount() const
596  { return obsOctetCount; }
597 
598  inline void incObservedOctetCount(uint32 n)
599  { obsOctetCount += n; }
600 
604  uint16
605  getMaxSeqNum() const
606  { return maxSeqNum; }
607 
612  void
613  setMaxSeqNum(uint16 max)
614  { maxSeqNum = max; }
615 
616  inline uint32
617  getExtendedMaxSeqNum() const
618  { return extendedMaxSeqNum; }
619 
620  inline void
621  setExtendedMaxSeqNum(uint32 seq)
622  { extendedMaxSeqNum = seq; }
623 
624  inline uint32 getCumulativePacketLost() const
625  { return cumulativePacketLost; }
626 
627  inline void setCumulativePacketLost(uint32 pl)
628  { cumulativePacketLost = pl; }
629 
630  inline uint8 getFractionLost() const
631  { return fractionLost; }
632 
633  inline void setFractionLost(uint8 fl)
634  { fractionLost = fl; }
635 
636  inline uint32 getLastPacketTransitTime()
637  { return lastPacketTransitTime; }
638 
639  inline void setLastPacketTransitTime(uint32 time)
640  { lastPacketTransitTime = time; }
641 
642  inline float getJitter() const
643  { return jitter; }
644 
645  inline void setJitter(float j)
646  { jitter = j; }
647 
648  inline uint32 getInitialDataTimestamp() const
649  { return initialDataTimestamp; }
650 
651  inline void setInitialDataTimestamp(uint32 ts)
652  { initialDataTimestamp = ts; }
653 
654  inline timeval getInitialDataTime() const
655  { return initialDataTime; }
656 
657  inline void setInitialDataTime(timeval it)
658  { initialDataTime = it; }
659 
// Test-and-clear on `flag`: if the flag is set, clears it and returns
// true; if it is already clear, returns false and changes nothing. Two
// consecutive calls therefore never both return true. getHello() performs
// the opposite transition on the same flag.
667  bool getGoodbye()
668  {
669  if(!flag)
670  return false;
671  flag = false;
672  return true;
673  }
674 
// Test-and-set on `flag`: if the flag is clear, sets it and returns true;
// if it is already set, returns false and changes nothing. Two consecutive
// calls therefore never both return true. getGoodbye() performs the
// opposite transition on the same flag.
681  bool getHello() {
682  if(flag)
683  return false;
684  flag = true;
685  return true;
686  }
687 
688  inline uint32 getBadSeqNum() const
689  { return badSeqNum; }
690 
691  inline void setBadSeqNum(uint32 seq)
692  { badSeqNum = seq; }
693 
694  uint8 getProbation() const
695  { return probation; }
696 
697  inline void setProbation(uint8 p)
698  { probation = p; }
699 
// Decrements the probation counter by one, with no check for zero.
// NOTE(review): probation is a uint8; if that type is unsigned, a call at
// 0 wraps around and isValid() turns false again -- confirm callers test
// the counter first.
700  inline void decProbation()
701  { --probation; }
702 
// True exactly when the probation counter has reached zero.
703  bool isValid() const
704  { return 0 == probation; }
705 
706  inline uint16 getBaseSeqNum() const
707  { return baseSeqNum; }
708 
709  inline uint32 getSeqNumAccum() const
710  { return seqNumAccum; }
711 
// Adds SEQNUMMOD to the accumulator. The struct's declaration of SEQNUMMOD
// is commented "2^16".
// NOTE(review): presumably called once per sequence-number wrap-around --
// confirm at the call site.
712  inline void incSeqNumAccum()
713  { seqNumAccum += SEQNUMMOD; }
714 
// Sets both maxSeqNum and seqNumAccum to seqnum. baseSeqNum is not
// assigned here.
// NOTE(review): confirm where baseSeqNum gets its value (possibly
// initStats(), which is defined elsewhere).
718  inline void initSequence(uint16 seqnum)
719  { maxSeqNum = seqNumAccum = seqnum; }
720 
731  void recordInsertion(const IncomingRTPPktLink& pl);
732 
733  void initStats();
734 
739  void computeStats();
740 
742  // The source this link object refers to.
744  // first/last packets from this source in the queue.
746  // Links for synchronization sources located before
747  // and after this one in the list of sources.
749  // Prev and next inside the hash table collision list.
751  ConflictingTransportAddress* prevConflict;
752  unsigned char* senderInfo;
753  unsigned char* receiverInfo;
754  // time the last RTP packet from this source was
755  // received at.
756  timeval lastPacketTime;
757  // time the last RTCP packet was received.
759  // time the last RTCP SR was received. Required for
760  // DLSR computation.
761  timeval lastRTCPSRTime;
762 
763  // for outgoing RR reports.
764  // number of packets received from this source.
766  // number of octets received from this source.
768  // the highest sequence number seen from this source
769  uint16 maxSeqNum;
773  // for interarrival jitter computation
775  // interarrival jitter of packets from this source.
776  float jitter;
779 
780  // this flag assures we only call one gotHello and one
781  // gotGoodbye for this src.
782  bool flag;
783 
784  // for source validation:
785  uint32 badSeqNum;
786  uint8 probation; // packets in sequence before valid.
787  uint16 baseSeqNum;
790  uint32 seqNumAccum;
791  };
792 
797  bool
798  isRegistered(uint32 ssrc);
799 
809  getSourceBySSRC(uint32 ssrc, bool& created);
810 
821  bool
822  BYESource(uint32 ssrc);
823 
831  bool
832  removeSource(uint32 ssrc);
833 
834  inline SyncSourceLink* getFirst()
835  { return first; }
836 
837  inline SyncSourceLink* getLast()
838  { return last; }
839 
840  inline uint32
842  { return Members::getMembersCount(); }
843 
844  inline void
845  setMembersCount(uint32 n)
847 
848  inline uint32
850  { return Members::getSendersCount(); }
851 
852  static const size_t defaultMembersHashSize;
853  static const uint32 SEQNUMMOD;
854 
855 private:
857 
859  operator=(const MembershipBookkeeping &o);
860 
865  void
866  endMembers();
867 
868  // Hash table with sources of RTP and RTCP packets
869  uint32 sourceBucketsNum;
870  SyncSourceLink** sourceLinks;
871  // List of sources, ordered from older to newer
872  SyncSourceLink* first, * last;
873 };
874 
882  protected MembershipBookkeeping
883 {
884 public:
// Forward iterator that walks a chain of SyncSourceLink nodes through
// getNext() and exposes each node's SyncSource as const. An iterator whose
// link is NULL is the past-the-end value (the default constructor produces
// one).
// NOTE(review): the class header and the declarator lines of the copy
// constructor and both operator++ overloads are absent from this listing.
// The name SyncSourcesIterator is taken from the constructor below.
891  {
892  public:
    // Standard iterator member typedefs; pointer and reference are
    // const-qualified, so the source cannot be modified through the
    // iterator.
893  typedef std::forward_iterator_tag iterator_category;
895  typedef std::ptrdiff_t difference_type;
896  typedef const SyncSource* pointer;
897  typedef const SyncSource& reference;
898 
    // Wraps the given link; with no argument the iterator holds NULL.
899  SyncSourcesIterator(SyncSourceLink* l = NULL) :
900  link(l)
901  { }
902 
    // Copies the link pointer from `si` (remainder of a constructor whose
    // first line is missing; presumably the copy constructor).
904  link(si.link)
905  { }
906 
    // Both accessors dereference `link` without a null check, so they must
    // not be used on a past-the-end iterator.
907  reference operator*() const
908  { return *(link->getSource()); }
909 
910  pointer operator->() const
911  { return link->getSource(); }
912 
    // Body of what is presumably pre-increment: steps to the next link and
    // returns *this. `link` is dereferenced unchecked.
914  link = link->getNext();
915  return *this;
916  }
917 
    // Body of what is presumably post-increment: copies the iterator,
    // advances *this via pre-increment, and returns the copy.
919  SyncSourcesIterator result(*this);
920  ++(*this);
921  return result;
922  }
923 
    // Two iterators are equal when they hold the same link pointer.
924  friend bool operator==(const SyncSourcesIterator& l,
925  const SyncSourcesIterator& r)
926  { return l.link == r.link; }
927 
928  friend bool operator!=(const SyncSourcesIterator& l,
929  const SyncSourcesIterator& r)
930  { return l.link != r.link; }
931 
932  private:
    // Current position; NULL for a past-the-end iterator.
933  SyncSourceLink *link;
934  };
935 
938 
940  { return SyncSourcesIterator(NULL); }
941 
951  const AppDataUnit*
952  getData(uint32 stamp, const SyncSource* src = NULL);
953 
954 
961  bool
962  isWaiting(const SyncSource* src = NULL) const;
963 
970  uint32
971  getFirstTimestamp(const SyncSource* src = NULL) const;
972 
995  void
996  setMinValidPacketSequence(uint8 packets)
997  { minValidPacketSequence = packets; }
998 
999  uint8
1000  getDefaultMinValidPacketSequence() const
1001  { return defaultMinValidPacketSequence; }
1002 
1007  uint8
1008  getMinValidPacketSequence() const
1009  { return minValidPacketSequence; }
1010 
1011  void
1012  setMaxPacketMisorder(uint16 packets)
1013  { maxPacketMisorder = packets; }
1014 
1015  uint16
1016  getDefaultMaxPacketMisorder() const
1017  { return defaultMaxPacketMisorder; }
1018 
1019  uint16
1020  getMaxPacketMisorder() const
1021  { return maxPacketMisorder; }
1022 
1028  void
1029  setMaxPacketDropout(uint16 packets) // default: 3000.
1030  { maxPacketDropout = packets; }
1031 
1032  uint16
1033  getDefaultMaxPacketDropout() const
1034  { return defaultMaxPacketDropout; }
1035 
1036  uint16
1037  getMaxPacketDropout() const
1038  { return maxPacketDropout; }
1039 
1040  // default value for constructors that allow specifying the
1041  // members table size
1042  inline static size_t
1043  getDefaultMembersSize()
1044  { return defaultMembersSize; }
1045 
1054  void
1055  setInQueueCryptoContext(CryptoContext* cc);
1056 
1067  void
1068  removeInQueueCryptoContext(CryptoContext* cc);
1069 
1077  CryptoContext*
1078  getInQueueCryptoContext(uint32 ssrc);
1079 
1080 protected:
1084  IncomingDataQueue(uint32 size);
1085 
1087  { }
1088 
1101  bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
1102  bool is_new, InetAddress& na,
1103  tpport_t tp);
1104 
1120  void setSourceExpirationPeriod(uint8 intervals)
1121  { sourceExpirationPeriod = intervals; }
1122 
1129  virtual size_t
1130  takeInDataPacket();
1131 
1132  void renewLocalSSRC();
1133 
1144  getWaiting(uint32 timestamp, const SyncSource *src = NULL);
1145 
1161  bool
1162  recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt,
1163  const timeval recvtime);
1164 
1171  void
1172  recordExtraction(const IncomingRTPPkt& pkt);
1173 
1174  void purgeIncomingQueue();
1175 
// Virtual hook receiving a SyncSource by const reference. This default
// implementation ignores its argument and does nothing.
// NOTE(review): presumably invoked when a previously unseen source is
// detected -- confirm at the call site.
1182  inline virtual void
1183  onNewSyncSource(const SyncSource&)
1184  { }
1185 
1186 protected:
// Virtual hook receiving an incoming RTP packet by reference. This default
// implementation ignores the packet and returns true.
// NOTE(review): presumably a false return makes the caller discard the
// packet -- confirm at the call site.
1203  inline virtual bool
1204  onRTPPacketRecv(IncomingRTPPkt&)
1205  { return true; }
1206 
// Virtual hook receiving an incoming RTP packet by reference. This default
// implementation ignores the packet and returns immediately.
// NOTE(review): presumably called for packets dropped as expired --
// confirm at the call site.
1215  inline virtual void onExpireRecv(IncomingRTPPkt&)
1216  { return; }
1217 
// Virtual hook receiving the offending packet and an integer error code.
// This default implementation uses neither parameter and returns false.
// NOTE(review): the meaning of the error codes and of the return value is
// not visible here -- confirm at the call site.
1231  inline virtual bool
1232  onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode)
1233  { return false; }
1234 
// Virtual predicate taking a packet link by reference. This default
// implementation ignores the link and always returns false.
// NOTE(review): presumably overridden to report packets that exceed an
// end-to-end delay bound -- confirm at the call site.
1235  inline virtual bool
1236  end2EndDelayed(IncomingRTPPktLink&)
1237  { return false; }
1238 
1254  bool
1255  insertRecvPacket(IncomingRTPPktLink* packetLink);
1256 
1268  virtual size_t
1269  recvData(unsigned char* buffer, size_t length,
1270  InetHostAddress& host, tpport_t& port) = 0;
1271 
1272  virtual size_t
1273  getNextDataPacketSize() const = 0;
1274 
1275  mutable ThreadLock recvLock;
1276  // reception queue
1277  IncomingRTPPktLink* recvFirst, * recvLast;
1278  // values for packet validation.
1279  static const uint8 defaultMinValidPacketSequence;
1280  static const uint16 defaultMaxPacketMisorder;
1281  static const uint16 defaultMaxPacketDropout;
1285  static const size_t defaultMembersSize;
1287  mutable Mutex cryptoMutex;
1288  std::list<CryptoContext *> cryptoContexts;
1289 };
1290  // iqueue
1292 
1293 END_NAMESPACE
1294 
1295 #endif //CCXX_RTP_IQUEUE_H_
1296