Package flumotion :: Package component :: Package consumers :: Package hlsstreamer :: Module hlsring
[hide private]

Source Code for Module flumotion.component.consumers.hlsstreamer.hlsring

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import os 
 19  from collections import deque 
 20   
 21  from Crypto.Cipher import AES 
 22   
 23  from twisted.internet import reactor 
 24  from flumotion.component.common.streamer.fragmentedresource import\ 
 25      FragmentNotAvailable, FragmentNotFound, PlaylistNotFound, KeyNotFound 
 26   
 27   
class Playlister:
    """
    I write HTTP Live Streaming playlists based on added fragments.

    Each entry of C{self._fragments} is a tuple
    C{(sequenceNumber, duration, encrypted, discontinuity)}.
    """

    def __init__(self):
        self._hostname = ''
        self.mainPlaylist = ''
        self.streamPlaylist = ''
        self.streamBitrate = 0
        self.title = ''
        self.fragmentPrefix = ''
        self.newFragmentTolerance = 0
        self.window = 0
        self.keysURI = ''
        self.filenameExt = 'webm'
        #FIXME: Make it a property
        self.allowCache = True
        self._fragments = []
        self._dummyFragments = []
        self._counter = 0
        self._isAutoUpdate = False
        # Duration of the last fragment added to the playlist; reused by
        # _autoUpdate() for the dummy fragments it inserts.
        self._duration = 0

    def setHostname(self, hostname):
        '''
        Normalizes and stores the base URL prepended to every playlist entry.

        A leading '/' is stripped, a trailing '/' is enforced and 'http://'
        is prepended unless a 'http://' or 'https://' scheme is already there.

        @param hostname: host name or base URL of the streamer
        @type  hostname: str
        '''
        if hostname.startswith('/'):
            hostname = hostname[1:]
        if not hostname.endswith('/'):
            hostname = hostname + '/'
        # Accept both schemes so that 'https://host' is not turned into
        # 'http://https://host/'.
        if not hostname.startswith(('http://', 'https://')):
            hostname = 'http://' + hostname
        self._hostname = hostname

    def setAllowCache(self, allowed):
        '''
        @param allowed: whether EXT-X-ALLOW-CACHE is rendered as YES or NO
        @type  allowed: bool
        '''
        self.allowCache = allowed

    def _getFragmentName(self, sequenceNumber):
        '''
        Returns the resource name of a fragment:
        "<fragmentPrefix>-<sequenceNumber>.<filenameExt>"
        '''
        return '%s-%s.%s' % (self.fragmentPrefix, sequenceNumber,
                             self.filenameExt)

    def _getTargetDuration(self):
        '''
        Returns the longest fragment duration in the playlist, truncated to
        an integer, or 0 if the playlist holds no fragments.
        '''
        if not self._fragments:
            return 0
        # EXT-X-TARGETDURATION is an upper bound for the segment durations,
        # so the maximum (not the minimum) has to be advertised.
        return int(max(frag[1] for frag in self._fragments))

    def _autoUpdate(self, count):
        '''
        Delayed call scheduled by _addPlaylistFragment(). If no fragment was
        added since the call was scheduled (the counter still equals
        C{count}), registers a dummy fragment with the duration of the last
        added fragment.
        '''
        if self._counter == count:
            self._isAutoUpdate = True
            self._dummyFragments.append(self._getFragmentName(count))
            self._addPlaylistFragment(count, self._duration, False)

    def _addPlaylistFragment(self, sequenceNumber, duration, encrypted):
        '''
        Adds a fragment to the playlist unless its sequence number is
        already listed, drops the fragments falling out of the window and,
        if C{newFragmentTolerance} is not 0, schedules an automatic update.

        @return: the name used in the playlist for the fragment
        @rtype:  str
        '''
        known = [frag[0] for frag in self._fragments]
        # Add the fragment to the playlist if it wasn't added before
        if sequenceNumber not in known:
            # Add a discontinuity if the sequenceNumber is not the expected
            discontinuity = (sequenceNumber != self._counter
                             and self._counter != 0)
            self._fragments.append((sequenceNumber, duration, encrypted,
                                    discontinuity))
            self._counter = sequenceNumber + 1
            self._duration = duration
            # Remove fragments that are out of the window
            while len(self._fragments) > self.window:
                # If it's a dummy fragment, remove it from the list too
                fragName = self._getFragmentName(self._fragments[0][0])
                if fragName in self._dummyFragments:
                    self._dummyFragments.remove(fragName)
                del self._fragments[0]

            # Auto update the playlist when the next fragment was not added
            # If the fragment was automatically added update again after
            # 'duration'
            if self.newFragmentTolerance != 0:
                if self._isAutoUpdate:
                    delay = duration
                else:
                    delay = duration * (1 + self.newFragmentTolerance)
                reactor.callLater(delay, self._autoUpdate, self._counter)
                self._isAutoUpdate = False

        return self._getFragmentName(sequenceNumber)

    def renderArgs(self, args):
        '''
        Renders request arguments as a query string.

        The 'FLUREQID' argument is removed from C{args} (the dictionary is
        modified in place) and only the first value of each argument is used.

        @param args: request arguments, mapping names to lists of values
        @type  args: dict

        @return: '?k1=v1&k2=v2...' or '' if there are no arguments left
        @rtype:  str
        '''
        if 'FLUREQID' in args:
            del args['FLUREQID']
        if len(args) == 0:
            return ''

        # items() instead of iteritems(): works on Python 2 and Python 3
        return '?' + '&'.join(["%s=%s" % (k, v[0]) for k, v in
            args.items()])

    def _renderMainPlaylist(self, args):
        '''
        Renders the main playlist, which points to the stream playlist.
        '''
        lines = []

        lines.append("#EXTM3U")
        #The bandwith value is not significant for single bitrate
        lines.append("#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=%s" %
                     self.streamBitrate)
        lines.append("".join([self._hostname, self.streamPlaylist,
                              self.renderArgs(args)]))
        lines.append("")

        return "\n".join(lines)

    def _renderStreamPlaylist(self, args):
        '''
        Renders the stream playlist listing the fragments in the window.
        Raises IndexError if no fragment was added yet.
        '''
        lines = []

        lines.append("#EXTM3U")
        lines.append("#EXT-X-ALLOW-CACHE:%s" %
                     (self.allowCache and 'YES' or 'NO'))
        lines.append("#EXT-X-TARGETDURATION:%d" % self._getTargetDuration())
        lines.append("#EXT-X-MEDIA-SEQUENCE:%s" % self._fragments[0][0])

        # The query string is the same for every entry: render it once
        query = self.renderArgs(args)
        for sequenceNumber, duration, encrypted, discon in self._fragments:
            fragmentName = self._getFragmentName(sequenceNumber)
            if discon:
                lines.append("#EXT-X-DISCONTINUITY")
            # FIXME: Not fully implemented yet
            if encrypted:
                # Keys are looked up by fragment name
                lines.append('#EXT-X-KEY:METHOD=AES-128,URI="%s?key=%s"' %
                             (self.keysURI, fragmentName))
            lines.append("#EXTINF:%d,%s" % (duration, self.title))
            lines.append(''.join([self._hostname, fragmentName, query]))

        lines.append("")

        return "\n".join(lines)

    def renderPlaylist(self, playlist, args):
        '''
        Returns a string representation of the requested playlist or raise
        an Exception if the playlist is not found

        @param playlist: resource name of the requested playlist
        @type  playlist: str
        @param args:     request arguments appended to the rendered URLs
        @type  args:     dict

        @raise PlaylistNotFound: if C{playlist} is neither the main nor the
                                 stream playlist
        '''
        if playlist == self.mainPlaylist:
            return self._renderMainPlaylist(args)
        elif playlist == self.streamPlaylist:
            return self._renderStreamPlaylist(args)
        raise PlaylistNotFound()
class HLSRing(Playlister):
    '''
    I hold a ring with the fragments available in the playlist
    and update the playlist according to this.
    '''

    BLOCK_SIZE = 16
    # No longer used for padding (PKCS7 is used instead); kept so that
    # external code reading this attribute keeps working.
    PADDING = '0'

    def __init__(self, mainPlaylist, streamPlaylist,
            streamBitrate=300000, title='', fragmentPrefix='fragment',
            newFragTolerance = 0, window=5, maxExtraBuffers=None,
            keyInterval=0, keysURI=None):
        '''
        @param mainPlaylist:    resource name of the main playlist
        @type  mainPlaylist:    str
        @param streamPlaylist:  resource name of the stream playlist
        @type  streamPlaylist:  str
        @param streamBitrate:   Bitrate of the stream in bps
        @type  streamBitrate:   int
        @param title:           description of the stream
        @type  title:           str
        @param fragmentPrefix:  fragment name prefix
        @type  fragmentPrefix:  str
        @param newFragTolerance:Tolerance to automatically add new fragments.
        @type  newFragTolerance:float
        @param window:          maximum number of fragments listed in the
                                playlist
        @type  window:          int
        @param maxExtraBuffers: maximum number of extra fragments to buffer;
                                if None, 2 * window + 1 fragments are kept
        @type  maxExtraBuffers: int
        @param keyInterval:     number of fragments sharing the same
                                encryption key. 0 if not using encryption
        @type  keyInterval:     int
        @param keysURI:         URI used to retrieve the encryption keys
        @type  keysURI:         str
        '''

        Playlister.__init__(self)
        self.mainPlaylist = mainPlaylist
        self.streamPlaylist = streamPlaylist
        self.streamBitrate = streamBitrate
        self.title = title
        self.fragmentPrefix = fragmentPrefix
        self.newFragmentTolerance = newFragTolerance
        self.window = window
        if maxExtraBuffers is None:
            self.maxBuffers = 2 * window + 1
        else:
            self.maxBuffers = window + maxExtraBuffers
        self.keyInterval = keyInterval
        self.keysURI = keysURI or self._hostname
        self._encrypted = (keyInterval != 0)
        # fragment name -> fragment data
        self._fragmentsDict = {}
        # fragment name -> key used to encrypt that fragment
        self._keysDict = {}
        self._secret = ''
        # fragment names, oldest first
        self._availableFragments = deque()
        self._lastSequence = None

    def _encryptFragment(self, fragment, secret, IV):
        '''
        Encrypts a fragment with AES-128 in CBC mode.

        @param fragment: raw fragment data
        @type  fragment: str
        @param secret:   the 16 bytes encryption key
        @type  secret:   str
        @param IV:       sequence number of the fragment, used as
                         initialization vector
        @type  IV:       int

        @return: the encrypted fragment
        @rtype:  str
        '''
        # FIXME: Not tested
        import struct

        # PKCS7 padding, as required for HLS AES-128 segments: append N
        # bytes of value N (a full block if the data is already aligned).
        padLength = self.BLOCK_SIZE - len(fragment) % self.BLOCK_SIZE
        padded = fragment + struct.pack('B', padLength) * padLength

        # The playlist carries no IV attribute, so clients use the sequence
        # number as a 128 bits big-endian integer, not its decimal string.
        mask = 0xFFFFFFFFFFFFFFFF
        iv = struct.pack('>QQ', (IV >> 64) & mask, IV & mask)

        cipher = AES.new(secret, AES.MODE_CBC, iv)
        return cipher.encrypt(padded)

    def reset(self):
        '''
        Drops every stored fragment and key and clears the playlist.
        '''
        self._fragmentsDict = {}
        self._keysDict = {}
        self._secret = ''
        self._availableFragments = deque()
        self._fragments = []
        self._dummyFragments = []
        self._lastSequence = None
        self._counter = 0

    def addFragment(self, fragment, sequenceNumber, duration):
        '''
        Adds a fragment to the ring and updates the playlist.
        If the ring is full, removes the oldest fragment.

        @param fragment:        raw fragment data
        @type  fragment:        array
        @param sequenceNumber:  sequence number relative to the stream's start
        @type  sequenceNumber:  int
        @param duration:        duration of the the segment in seconds
        @type  duration:        int

        @return: the name used in the playlist for the fragment, or None if
                 a fragment with that name is already in the ring
        @rtype :  str
        '''

        # We only care about the name used in the playlist, we let the
        # playlister name it using an appropiate extension
        fragmentName = self._addPlaylistFragment(sequenceNumber, duration,
                                                 self._encrypted)
        # Don't add duplicated fragments
        if fragmentName in self._availableFragments:
            return
        self._lastSequence = sequenceNumber

        # Encrypt before touching the ring, so that a failure here doesn't
        # leave a name in the ring without its data.
        if self._encrypted:
            # Renew the key every keyInterval fragments, and also create one
            # if the first fragment doesn't fall on an interval boundary.
            if not self._secret or sequenceNumber % self.keyInterval == 0:
                self._secret = os.urandom(self.BLOCK_SIZE)
            fragment = self._encryptFragment(fragment, self._secret,
                                             sequenceNumber)

        # If the ring is full, delete the oldest segment.
        while len(self._fragmentsDict) >= self.maxBuffers:
            pop = self._availableFragments.popleft()
            del self._fragmentsDict[pop]
            if pop in self._keysDict:
                del self._keysDict[pop]

        self._availableFragments.append(fragmentName)
        if self._encrypted:
            self._keysDict[fragmentName] = self._secret
        self._fragmentsDict[fragmentName] = fragment
        return fragmentName

    def getFragment(self, fragmentName):
        '''
        Returns a fragment of the playlist or raises an Exception
        if the fragment is not found

        @param fragmentName: name of the fragment to retrieve
        @type  fragmentName: str

        @return: the raw fragment (encrypted if encryption is enabled)
        @rtype:  array

        @raise FragmentNotAvailable: if the name belongs to a dummy fragment
                                     added automatically to the playlist
        @raise FragmentNotFound:     if the name is unknown
        '''

        if fragmentName in self._fragmentsDict:
            return self._fragmentsDict[fragmentName]
        if fragmentName in self._dummyFragments:
            raise FragmentNotAvailable()
        raise FragmentNotFound()

    def getEncryptionKey(self, key):
        '''
        Returns an encryption key from the keys dict or raises an
        Exception if the key is not found

        @param key: name of the key to retrieve (the name of the fragment
                    it encrypts)
        @type  key: str

        @return: the encryption key
        @rtype:  str

        @raise KeyNotFound: if there is no key stored under that name
        '''

        if key in self._keysDict:
            return self._keysDict[key]
        raise KeyNotFound()