00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00044 #ifndef CCXX_RTP_IQUEUE_H_
00045 #define CCXX_RTP_IQUEUE_H_
00046
00047 #include <ccrtp/queuebase.h>
00048 #include <ccrtp/CryptoContext.h>
00049
00050 #include <list>
00051
00052 NAMESPACE_COMMONCPP
00053
00068 class __EXPORT Members
00069 {
00070 public:
00071 inline void
00072 setMembersCount(uint32 n)
00073 { members = n; }
00074
00075 inline void
00076 increaseMembersCount()
00077 { members++; }
00078
00079 inline void
00080 decreaseMembersCount()
00081 { members--; }
00082
00083 inline uint32
00084 getMembersCount() const
00085 { return members; }
00086
00087 inline void
00088 setSendersCount(uint32 n)
00089 { activeSenders = n; }
00090
00091 inline void
00092 increaseSendersCount()
00093 { activeSenders++; }
00094
00095 inline void
00096 decreaseSendersCount()
00097 { activeSenders--; }
00098
00099 inline uint32
00100 getSendersCount() const
00101 { return activeSenders; }
00102
00103 protected:
00104 Members() :
00105 members(0),
00106 activeSenders(0)
00107 { }
00108
00109 inline virtual ~Members()
00110 { }
00111
00112 private:
00114 uint32 members;
00116 uint32 activeSenders;
00117 };
00118
00125 class __EXPORT SyncSourceHandler
00126 {
00127 public:
00134 inline void*
00135 getLink(const SyncSource& source) const
00136 { return source.getLink(); }
00137
00138 inline void
00139 setLink(SyncSource& source, void* link)
00140 { source.setLink(link); }
00141
00142 inline void
00143 setParticipant(SyncSource& source, Participant& p)
00144 { source.setParticipant(p); }
00145
00146 inline void
00147 setState(SyncSource& source, SyncSource::State ns)
00148 { source.setState(ns); }
00149
00150 inline void
00151 setSender(SyncSource& source, bool active)
00152 { source.setSender(active); }
00153
00154 inline void
00155 setDataTransportPort(SyncSource& source, tpport_t p)
00156 { source.setDataTransportPort(p); }
00157
00158 inline void
00159 setControlTransportPort(SyncSource& source, tpport_t p)
00160 { source.setControlTransportPort(p); }
00161
00162 inline void
00163 setNetworkAddress(SyncSource& source, InetAddress addr)
00164 { source.setNetworkAddress(addr); }
00165
00166 protected:
00167 SyncSourceHandler()
00168 { }
00169
00170 inline virtual ~SyncSourceHandler()
00171 { }
00172 };
00173
00180 class __EXPORT ParticipantHandler
00181 {
00182 public:
00183 inline void
00184 setSDESItem(Participant* part, SDESItemType item,
00185 const std::string& val)
00186 { part->setSDESItem(item,val); }
00187
00188 inline void
00189 setPRIVPrefix(Participant* part, const std::string val)
00190 { part->setPRIVPrefix(val); }
00191
00192 protected:
00193 ParticipantHandler()
00194 { }
00195
00196 inline virtual ~ParticipantHandler()
00197 { }
00198 };
00199
00206 class __EXPORT ApplicationHandler
00207 {
00208 public:
00209 inline void
00210 addParticipant(RTPApplication& app, Participant& part)
00211 { app.addParticipant(part); }
00212
00213 inline void
00214 removeParticipant(RTPApplication& app,
00215 RTPApplication::ParticipantLink* pl)
00216 { app.removeParticipant(pl); }
00217
00218 protected:
00219 ApplicationHandler()
00220 { }
00221
00222 inline virtual ~ApplicationHandler()
00223 { }
00224 };
00225
00233 class __EXPORT ConflictHandler
00234 {
00235 public:
00236 struct ConflictingTransportAddress
00237 {
00238 ConflictingTransportAddress(InetAddress na,
00239 tpport_t dtp, tpport_t ctp);
00240
00241 void setNext(ConflictingTransportAddress* nc)
00242 { next = nc; }
00243
00244 inline const InetAddress& getNetworkAddress( ) const
00245 { return networkAddress; }
00246
00247 inline tpport_t getDataTransportPort() const
00248 { return dataTransportPort; }
00249
00250 inline tpport_t getControlTransportPort() const
00251 { return controlTransportPort; }
00252
00253 InetAddress networkAddress;
00254 tpport_t dataTransportPort;
00255 tpport_t controlTransportPort;
00256 ConflictingTransportAddress* next;
00257
00258 timeval lastPacketTime;
00259 };
00260
00265 ConflictingTransportAddress* searchDataConflict(InetAddress na,
00266 tpport_t dtp);
00271 ConflictingTransportAddress* searchControlConflict(InetAddress na,
00272 tpport_t ctp);
00273
00274 void updateConflict(ConflictingTransportAddress& ca)
00275 { SysTime::gettimeofday(&(ca.lastPacketTime),NULL); }
00276
00277 void addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp);
00278
00279 protected:
00280 ConflictHandler()
00281 { firstConflict = lastConflict = NULL; }
00282
00283 inline virtual ~ConflictHandler()
00284 { }
00285
00286 ConflictingTransportAddress* firstConflict, * lastConflict;
00287 };
00288
00299 class __EXPORT MembershipBookkeeping :
00300 public SyncSourceHandler,
00301 public ParticipantHandler,
00302 public ApplicationHandler,
00303 public ConflictHandler,
00304 private Members
00305 {
00306 public:
00307 inline size_t getDefaultMembersHashSize()
00308 { return defaultMembersHashSize; }
00309
00310 protected:
00311
00325 MembershipBookkeeping(uint32 initialSize = defaultMembersHashSize);
00326
00331 inline virtual
00332 ~MembershipBookkeeping()
00333 { endMembers(); }
00334
00335 struct SyncSourceLink;
00336
00337 inline SyncSourceLink* getLink(const SyncSource& source) const
00338 { return static_cast<SyncSourceLink*>(SyncSourceHandler::getLink(source)); }
00343 inline bool isMine(const SyncSource& source) const
00344 { return getLink(source)->getMembership() == this; }
00345
00352 struct IncomingRTPPktLink
00353 {
00354 IncomingRTPPktLink(IncomingRTPPkt* pkt, SyncSourceLink* sLink,
00355 struct timeval& recv_ts,
00356 uint32 shifted_ts,
00357 IncomingRTPPktLink* sp,
00358 IncomingRTPPktLink* sn,
00359 IncomingRTPPktLink* p,
00360 IncomingRTPPktLink* n) :
00361 packet(pkt),
00362 sourceLink(sLink),
00363 prev(p), next(n),
00364 srcPrev(sp), srcNext(sn),
00365 receptionTime(recv_ts),
00366 shiftedTimestamp(shifted_ts)
00367 { }
00368
00369 ~IncomingRTPPktLink()
00370 { }
00371
00372 inline SyncSourceLink* getSourceLink() const
00373 { return sourceLink; }
00374
00375 inline void setSourceLink(SyncSourceLink* src)
00376 { sourceLink = src; }
00377
00378 inline IncomingRTPPktLink* getNext() const
00379 { return next; }
00380
00381 inline void setNext(IncomingRTPPktLink* nl)
00382 { next = nl; }
00383
00384 inline IncomingRTPPktLink* getPrev() const
00385 { return prev; }
00386
00387 inline void setPrev(IncomingRTPPktLink* pl)
00388 { prev = pl; }
00389
00390 inline IncomingRTPPktLink* getSrcNext() const
00391 { return srcNext; }
00392
00393 inline void setSrcNext(IncomingRTPPktLink* sn)
00394 { srcNext = sn; }
00395
00396 inline IncomingRTPPktLink* getSrcPrev() const
00397 { return srcPrev; }
00398
00399 inline void setSrcPrev(IncomingRTPPktLink* sp)
00400 { srcPrev = sp; }
00401
00402 inline IncomingRTPPkt* getPacket() const
00403 { return packet; }
00404
00405 inline void setPacket(IncomingRTPPkt* pkt)
00406 { packet = pkt; }
00407
00415 inline void setRecvTime(const timeval &t)
00416 { receptionTime = t; }
00417
00421 inline timeval getRecvTime() const
00422 { return receptionTime; }
00423
00432 inline uint32 getTimestamp() const
00433 { return shiftedTimestamp; }
00434
00435 inline void setTimestamp(uint32 ts)
00436 { shiftedTimestamp = ts;}
00437
00438
00439 IncomingRTPPkt* packet;
00440
00441 SyncSourceLink* sourceLink;
00442
00443 IncomingRTPPktLink* prev, * next;
00444
00445 IncomingRTPPktLink* srcPrev, * srcNext;
00446
00447 struct timeval receptionTime;
00448
00449
00450
00451 uint32 shiftedTimestamp;
00452 };
00453
00470 struct SyncSourceLink
00471 {
00472
00473 static const uint32 SEQNUMMOD;
00474
00475 SyncSourceLink(MembershipBookkeeping* m,
00476 SyncSource* s,
00477 IncomingRTPPktLink* fp = NULL,
00478 IncomingRTPPktLink* lp = NULL,
00479 SyncSourceLink* ps = NULL,
00480 SyncSourceLink* ns = NULL,
00481 SyncSourceLink* ncollis = NULL) :
00482 membership(m), source(s), first(fp), last(lp),
00483 prev(ps), next(ns), nextCollis(ncollis),
00484 prevConflict(NULL)
00485 { m->setLink(*s,this);
00486 initStats();
00487 }
00488
00492 ~SyncSourceLink();
00493
00494 inline MembershipBookkeeping* getMembership()
00495 { return membership; }
00496
00501 inline SyncSource* getSource() { return source; }
00502
00507 inline IncomingRTPPktLink* getFirst()
00508 { return first; }
00509
00510 inline void setFirst(IncomingRTPPktLink* fp)
00511 { first = fp; }
00512
00517 inline IncomingRTPPktLink* getLast()
00518 { return last; }
00519
00520 inline void setLast(IncomingRTPPktLink* lp)
00521 { last = lp; }
00522
00526 inline SyncSourceLink* getPrev()
00527 { return prev; }
00528
00529 inline void setPrev(SyncSourceLink* ps)
00530 { prev = ps; }
00531
00535 inline SyncSourceLink* getNext()
00536 { return next; }
00537
00538 inline void setNext(SyncSourceLink *ns)
00539 { next = ns; }
00540
00547 inline SyncSourceLink* getNextCollis()
00548 { return nextCollis; }
00549
00550 inline void setNextCollis(SyncSourceLink* ns)
00551 { nextCollis = ns; }
00552
00553 inline ConflictingTransportAddress* getPrevConflict() const
00554 { return prevConflict; }
00555
00559 void setPrevConflict(InetAddress& addr, tpport_t dataPort,
00560 tpport_t controlPort);
00561
00562 unsigned char* getSenderInfo()
00563 { return senderInfo; }
00564
00565 void setSenderInfo(unsigned char* si);
00566
00567 unsigned char* getReceiverInfo()
00568 { return receiverInfo; }
00569
00570 void setReceiverInfo(unsigned char* ri);
00571
00572 inline timeval getLastPacketTime() const
00573 { return lastPacketTime; }
00574
00575 inline timeval getLastRTCPPacketTime() const
00576 { return lastRTCPPacketTime; }
00577
00578 inline timeval getLastRTCPSRTime() const
00579 { return lastRTCPSRTime; }
00580
00585 inline uint32 getObservedPacketCount() const
00586 { return obsPacketCount; }
00587
00588 inline void incObservedPacketCount()
00589 { obsPacketCount++; }
00590
00595 inline uint32 getObservedOctetCount() const
00596 { return obsOctetCount; }
00597
00598 inline void incObservedOctetCount(uint32 n)
00599 { obsOctetCount += n; }
00600
00604 uint16
00605 getMaxSeqNum() const
00606 { return maxSeqNum; }
00607
00612 void
00613 setMaxSeqNum(uint16 max)
00614 { maxSeqNum = max; }
00615
00616 inline uint32
00617 getExtendedMaxSeqNum() const
00618 { return extendedMaxSeqNum; }
00619
00620 inline void
00621 setExtendedMaxSeqNum(uint32 seq)
00622 { extendedMaxSeqNum = seq; }
00623
00624 inline uint32 getCumulativePacketLost() const
00625 { return cumulativePacketLost; }
00626
00627 inline void setCumulativePacketLost(uint32 pl)
00628 { cumulativePacketLost = pl; }
00629
00630 inline uint8 getFractionLost() const
00631 { return fractionLost; }
00632
00633 inline void setFractionLost(uint8 fl)
00634 { fractionLost = fl; }
00635
00636 inline uint32 getLastPacketTransitTime()
00637 { return lastPacketTransitTime; }
00638
00639 inline void setLastPacketTransitTime(uint32 time)
00640 { lastPacketTransitTime = time; }
00641
00642 inline float getJitter() const
00643 { return jitter; }
00644
00645 inline void setJitter(float j)
00646 { jitter = j; }
00647
00648 inline uint32 getInitialDataTimestamp() const
00649 { return initialDataTimestamp; }
00650
00651 inline void setInitialDataTimestamp(uint32 ts)
00652 { initialDataTimestamp = ts; }
00653
00654 inline timeval getInitialDataTime() const
00655 { return initialDataTime; }
00656
00657 inline void setInitialDataTime(timeval it)
00658 { initialDataTime = it; }
00659
00667 bool getGoodbye()
00668 {
00669 if(!flag)
00670 return false;
00671 flag = false;
00672 return true;
00673 }
00674
00681 bool getHello() {
00682 if(flag)
00683 return false;
00684 flag = true;
00685 return true;
00686 }
00687
00688 inline uint32 getBadSeqNum() const
00689 { return badSeqNum; }
00690
00691 inline void setBadSeqNum(uint32 seq)
00692 { badSeqNum = seq; }
00693
00694 uint8 getProbation() const
00695 { return probation; }
00696
00697 inline void setProbation(uint8 p)
00698 { probation = p; }
00699
00700 inline void decProbation()
00701 { --probation; }
00702
00703 bool isValid() const
00704 { return 0 == probation; }
00705
00706 inline uint16 getBaseSeqNum() const
00707 { return baseSeqNum; }
00708
00709 inline uint32 getSeqNumAccum() const
00710 { return seqNumAccum; }
00711
00712 inline void incSeqNumAccum()
00713 { seqNumAccum += SEQNUMMOD; }
00714
00718 inline void initSequence(uint16 seqnum)
00719 { maxSeqNum = seqNumAccum = seqnum; }
00720
00731 void recordInsertion(const IncomingRTPPktLink& pl);
00732
00733 void initStats();
00734
00739 void computeStats();
00740
00741 MembershipBookkeeping* membership;
00742
00743 SyncSource* source;
00744
00745 IncomingRTPPktLink* first, * last;
00746
00747
00748 SyncSourceLink* prev, * next;
00749
00750 SyncSourceLink* nextCollis;
00751 ConflictingTransportAddress* prevConflict;
00752 unsigned char* senderInfo;
00753 unsigned char* receiverInfo;
00754
00755
00756 timeval lastPacketTime;
00757
00758 timeval lastRTCPPacketTime;
00759
00760
00761 timeval lastRTCPSRTime;
00762
00763
00764
00765 uint32 obsPacketCount;
00766
00767 uint32 obsOctetCount;
00768
00769 uint16 maxSeqNum;
00770 uint32 extendedMaxSeqNum;
00771 uint32 cumulativePacketLost;
00772 uint8 fractionLost;
00773
00774 uint32 lastPacketTransitTime;
00775
00776 float jitter;
00777 uint32 initialDataTimestamp;
00778 timeval initialDataTime;
00779
00780
00781
00782 bool flag;
00783
00784
00785 uint32 badSeqNum;
00786 uint8 probation;
00787 uint16 baseSeqNum;
00788 uint32 expectedPrior;
00789 uint32 receivedPrior;
00790 uint32 seqNumAccum;
00791 };
00792
00797 bool
00798 isRegistered(uint32 ssrc);
00799
00808 SyncSourceLink*
00809 getSourceBySSRC(uint32 ssrc, bool& created);
00810
00821 bool
00822 BYESource(uint32 ssrc);
00823
00831 bool
00832 removeSource(uint32 ssrc);
00833
00834 inline SyncSourceLink* getFirst()
00835 { return first; }
00836
00837 inline SyncSourceLink* getLast()
00838 { return last; }
00839
00840 inline uint32
00841 getMembersCount()
00842 { return Members::getMembersCount(); }
00843
00844 inline void
00845 setMembersCount(uint32 n)
00846 { Members::setMembersCount(n); }
00847
00848 inline uint32
00849 getSendersCount()
00850 { return Members::getSendersCount(); }
00851
00852 static const size_t defaultMembersHashSize;
00853 static const uint32 SEQNUMMOD;
00854
00855 private:
00856 MembershipBookkeeping(const MembershipBookkeeping &o);
00857
00858 MembershipBookkeeping&
00859 operator=(const MembershipBookkeeping &o);
00860
00865 void
00866 endMembers();
00867
00868
00869 uint32 sourceBucketsNum;
00870 SyncSourceLink** sourceLinks;
00871
00872 SyncSourceLink* first, * last;
00873 };
00874
00881 class __EXPORT IncomingDataQueue: public IncomingDataQueueBase,
00882 protected MembershipBookkeeping
00883 {
00884 public:
00890 class SyncSourcesIterator
00891 {
00892 public:
00893 typedef std::forward_iterator_tag iterator_category;
00894 typedef SyncSource value_type;
00895 typedef std::ptrdiff_t difference_type;
00896 typedef const SyncSource* pointer;
00897 typedef const SyncSource& reference;
00898
00899 SyncSourcesIterator(SyncSourceLink* l = NULL) :
00900 link(l)
00901 { }
00902
00903 SyncSourcesIterator(const SyncSourcesIterator& si) :
00904 link(si.link)
00905 { }
00906
00907 reference operator*() const
00908 { return *(link->getSource()); }
00909
00910 pointer operator->() const
00911 { return link->getSource(); }
00912
00913 SyncSourcesIterator& operator++() {
00914 link = link->getNext();
00915 return *this;
00916 }
00917
00918 SyncSourcesIterator operator++(int) {
00919 SyncSourcesIterator result(*this);
00920 ++(*this);
00921 return result;
00922 }
00923
00924 friend bool operator==(const SyncSourcesIterator& l,
00925 const SyncSourcesIterator& r)
00926 { return l.link == r.link; }
00927
00928 friend bool operator!=(const SyncSourcesIterator& l,
00929 const SyncSourcesIterator& r)
00930 { return l.link != r.link; }
00931
00932 private:
00933 SyncSourceLink *link;
00934 };
00935
00936 SyncSourcesIterator begin()
00937 { return SyncSourcesIterator(MembershipBookkeeping::getFirst()); }
00938
00939 SyncSourcesIterator end()
00940 { return SyncSourcesIterator(NULL); }
00941
00951 const AppDataUnit*
00952 getData(uint32 stamp, const SyncSource* src = NULL);
00953
00954
00961 bool
00962 isWaiting(const SyncSource* src = NULL) const;
00963
00970 uint32
00971 getFirstTimestamp(const SyncSource* src = NULL) const;
00972
00995 void
00996 setMinValidPacketSequence(uint8 packets)
00997 { minValidPacketSequence = packets; }
00998
00999 uint8
01000 getDefaultMinValidPacketSequence() const
01001 { return defaultMinValidPacketSequence; }
01002
01007 uint8
01008 getMinValidPacketSequence() const
01009 { return minValidPacketSequence; }
01010
01011 void
01012 setMaxPacketMisorder(uint16 packets)
01013 { maxPacketMisorder = packets; }
01014
01015 uint16
01016 getDefaultMaxPacketMisorder() const
01017 { return defaultMaxPacketMisorder; }
01018
01019 uint16
01020 getMaxPacketMisorder() const
01021 { return maxPacketMisorder; }
01022
01028 void
01029 setMaxPacketDropout(uint16 packets)
01030 { maxPacketDropout = packets; }
01031
01032 uint16
01033 getDefaultMaxPacketDropout() const
01034 { return defaultMaxPacketDropout; }
01035
01036 uint16
01037 getMaxPacketDropout() const
01038 { return maxPacketDropout; }
01039
01040
01041
01042 inline static size_t
01043 getDefaultMembersSize()
01044 { return defaultMembersSize; }
01045
01054 void
01055 setInQueueCryptoContext(CryptoContext* cc);
01056
01067 void
01068 removeInQueueCryptoContext(CryptoContext* cc);
01069
01077 CryptoContext*
01078 getInQueueCryptoContext(uint32 ssrc);
01079
01080 protected:
01084 IncomingDataQueue(uint32 size);
01085
01086 virtual ~IncomingDataQueue()
01087 { }
01088
01101 bool checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink,
01102 bool is_new, InetAddress& na,
01103 tpport_t tp);
01104
01120 void setSourceExpirationPeriod(uint8 intervals)
01121 { sourceExpirationPeriod = intervals; }
01122
01129 virtual size_t
01130 takeInDataPacket();
01131
01132 void renewLocalSSRC();
01133
01143 IncomingDataQueue::IncomingRTPPktLink*
01144 getWaiting(uint32 timestamp, const SyncSource *src = NULL);
01145
01161 bool
01162 recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt,
01163 const timeval recvtime);
01164
01171 void
01172 recordExtraction(const IncomingRTPPkt& pkt);
01173
01174 void purgeIncomingQueue();
01175
01182 inline virtual void
01183 onNewSyncSource(const SyncSource&)
01184 { }
01185
01186 protected:
01203 inline virtual bool
01204 onRTPPacketRecv(IncomingRTPPkt&)
01205 { return true; }
01206
01215 inline virtual void onExpireRecv(IncomingRTPPkt&)
01216 { return; }
01217
01231 inline virtual bool
01232 onSRTPPacketError(IncomingRTPPkt& pkt, int32 errorCode)
01233 { return false; }
01234
01235 inline virtual bool
01236 end2EndDelayed(IncomingRTPPktLink&)
01237 { return false; }
01238
01254 bool
01255 insertRecvPacket(IncomingRTPPktLink* packetLink);
01256
01268 virtual size_t
01269 recvData(unsigned char* buffer, size_t length,
01270 InetHostAddress& host, tpport_t& port) = 0;
01271
01272 virtual size_t
01273 getNextDataPacketSize() const = 0;
01274
01275 mutable ThreadLock recvLock;
01276
01277 IncomingRTPPktLink* recvFirst, * recvLast;
01278
01279 static const uint8 defaultMinValidPacketSequence;
01280 static const uint16 defaultMaxPacketMisorder;
01281 static const uint16 defaultMaxPacketDropout;
01282 uint8 minValidPacketSequence;
01283 uint16 maxPacketMisorder;
01284 uint16 maxPacketDropout;
01285 static const size_t defaultMembersSize;
01286 uint8 sourceExpirationPeriod;
01287 mutable Mutex cryptoMutex;
01288 std::list<CryptoContext *> cryptoContexts;
01289 };
01290
01292
01293 END_NAMESPACE
01294
01295 #endif //CCXX_RTP_IQUEUE_H_
01296