1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import os
19 from collections import deque
20
21 from Crypto.Cipher import AES
22
23 from twisted.internet import reactor
24 from flumotion.component.common.streamer.fragmentedresource import\
25 FragmentNotAvailable, FragmentNotFound, PlaylistNotFound, KeyNotFound
26
27
29 """
30 I write HTTP Live Streaming playlists based on added fragments.
31 """
32
# Initialise the playlist settings and the bookkeeping state to empty values.
# NOTE(review): the enclosing `def` line is not visible in this listing;
# presumably this is the body of Playlister.__init__ (the ring constructor
# calls Playlister.__init__(self)) -- confirm.
34 self._hostname = ''
35 self.mainPlaylist = ''
36 self.streamPlaylist = ''
37 self.streamBitrate = 0
38 self.title = ''
39 self.fragmentPrefix = ''
# A tolerance of 0 disables the scheduling of self._autoUpdate when a
# playlist fragment is added.
40 self.newFragmentTolerance = 0
# Maximum number of entries kept in self._fragments.
41 self.window = 0
42 self.keysURI = ''
# Extension used when building fragment file names.
43 self.filenameExt = 'webm'
44
# Rendered as #EXT-X-ALLOW-CACHE:YES/NO in the stream playlist.
45 self.allowCache = True
# Playlist entries, oldest first, stored as
# (sequenceNumber, duration, encrypted, discontinuity) tuples.
46 self._fragments = []
# Fragment names for which a fragment request raises FragmentNotAvailable
# instead of FragmentNotFound; the code that fills this list is not visible
# in this listing.
47 self._dummyFragments = []
# Set to "last added sequence number + 1" each time an entry is appended.
48 self._counter = 0
# Read and cleared when scheduling the next auto update; the code that sets
# it to True is not visible in this listing.
49 self._isAutoUpdate = False
50
# Normalise `hostname` into a base URL and store it in self._hostname:
# drop one leading '/', make sure it ends with '/', and prefix 'http://'
# when it does not already start with it.
# NOTE(review): the `def` line is not visible in this listing.
# NOTE(review): a value starting with 'https://' also gets 'http://'
# prepended by the last check -- confirm whether HTTPS hosts must be
# supported.
52 if hostname.startswith('/'):
53 hostname = hostname[1:]
54 if not hostname.endswith('/'):
55 hostname = hostname + '/'
56 if not hostname.startswith('http://'):
57 hostname = 'http://' + hostname
58 self._hostname = hostname
59
# Store the flag that the stream playlist renders as
# #EXT-X-ALLOW-CACHE:YES (truthy) or NO (falsy).
# NOTE(review): the `def` line is not visible in this listing.
61 self.allowCache = allowed
62
# Build the file name of a fragment:
# '<fragmentPrefix>-<sequenceNumber>.<filenameExt>'.
64 return '%s-%s.%s' % (self.fragmentPrefix, sequenceNumber,
65 self.filenameExt)
66
# Return the value written as #EXT-X-TARGETDURATION: a fragment duration
# truncated to an integer. Raises IndexError when self._fragments is empty.
# NOTE(review): the entries are sorted by duration in ascending order and
# index 0 is taken, so this is the *shortest* duration in the window. HLS
# requires the target duration to be at least the longest segment duration
# -- confirm whether the last element (or max()) was intended.
68 sorted_list = sorted(self._fragments, key=lambda fragment: fragment[1])
69 return int(sorted_list[0][1])
70
76
78
# Add an entry for `sequenceNumber` to the playlist window, trim the window
# and return the fragment's file name.
# NOTE(review): indentation is not preserved in this listing, so it cannot
# be told from here which of the statements below run only when the
# sequence number is new.
# Sequence numbers that are already listed are not appended again.
79 if not sequenceNumber in [frag[0] for frag in self._fragments]:
80
# The fourth tuple field flags a discontinuity: the sequence number is not
# the expected one (self._counter) and this is not the first entry ever
# added (self._counter is still 0 in that case).
81 self._fragments.append((sequenceNumber, duration, encrypted,
82 sequenceNumber != self._counter and self._counter != 0))
83 self._counter = sequenceNumber + 1
84
# Drop the oldest entries until at most `window` remain, forgetting their
# names in self._dummyFragments as well.
85 while len(self._fragments) > self.window:
86
87 fragName = self._getFragmentName(self._fragments[0][0])
88 if fragName in self._dummyFragments:
89 self._dummyFragments.remove(fragName)
90 del self._fragments[0]
91
92
93
# With a non-zero tolerance, schedule self._autoUpdate with the next
# expected sequence number. The delay is `duration` when self._isAutoUpdate
# is set, otherwise duration * (1 + newFragmentTolerance).
# NOTE(review): with the `and ... or` idiom a duration of 0 falls through
# to the second operand (which is then 0 as well).
94 if self.newFragmentTolerance != 0:
95 reactor.callLater(self._isAutoUpdate and
96 duration or duration * (1 + self.newFragmentTolerance),
97 self._autoUpdate, self._counter)
98 self._isAutoUpdate= False
99
100 return self._getFragmentName(sequenceNumber)
101
# Render the request arguments as a URL query string ('?k=v&k2=v2'), or ''
# when there is nothing to render.
# The 'FLUREQID' key is removed from the caller's `args` in place.
103 if 'FLUREQID' in args:
104 del args['FLUREQID']
105 if len(args) == 0:
106 return ''
107
# Only the first value of each argument is used and values are inserted
# without URL-encoding.
# NOTE(review): dict.iteritems() exists on Python 2 only.
108 return '?' + '&'.join(["%s=%s" % (k, v[0]) for k, v in
109 args.iteritems()])
110
def _renderMainPlaylist(self, args):
    """
    Render the main playlist.

    The result is the #EXTM3U header followed by a single
    #EXT-X-STREAM-INF entry (program 1, bandwidth C{streamBitrate})
    whose URI is the stream playlist under C{_hostname}, with the
    request arguments appended as rendered by C{renderArgs}.

    @param args: request arguments, handed over to C{renderArgs}
    @type  args: dict

    @return: the playlist text, one entry per line, newline terminated
    @rtype:  str
    """
    streamInfo = ("#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=%s" %
                  self.streamBitrate)
    streamURI = "".join((self._hostname, self.streamPlaylist,
                         self.renderArgs(args)))
    # The trailing empty entry makes the playlist end with a newline.
    return "\n".join(("#EXTM3U", streamInfo, streamURI, ""))
123
# Render the stream playlist: the header tags followed by one entry per
# element of self._fragments, newline separated and newline terminated.
# NOTE(review): the `def` line is not visible in this listing; the body
# reads `args`, so presumably it takes the request arguments like
# _renderMainPlaylist does -- confirm.
125 lines = []
126
127 lines.append("#EXTM3U")
128 lines.append("#EXT-X-ALLOW-CACHE:%s" %
129 (self.allowCache and 'YES' or 'NO'))
130 lines.append("#EXT-X-TARGETDURATION:%d" % self._getTargetDuration())
# The media sequence is the sequence number of the oldest listed fragment;
# this raises IndexError when self._fragments is empty.
131 lines.append("#EXT-X-MEDIA-SEQUENCE:%s" % self._fragments[0][0])
132
133 for sequenceNumber, duration, encrypted, discon in self._fragments:
134 if discon:
135 lines.append("#EXT-X-DISCONTINUITY")
136
# NOTE(review): `fragment` is not assigned anywhere in the visible body, so
# unless it is defined on a line missing from this listing, rendering an
# encrypted entry raises NameError. Keys are stored under the fragment name
# (self._keysDict[fragmentName]), so presumably
# self._getFragmentName(sequenceNumber) was intended -- confirm.
137 if encrypted:
138 lines.append('#EXT-X-KEY:METHOD=AES-128,URI="%s?key=%s"' %
139 (self.keysURI, fragment))
# %d truncates a fractional duration.
140 lines.append("#EXTINF:%d,%s" % (duration, self.title))
141 lines.append(''.join([self._hostname,
142 self._getFragmentName(sequenceNumber), self.renderArgs(args)]))
143
# The trailing empty entry makes the playlist end with a newline.
144 lines.append("")
145
146 return "\n".join(lines)
147
158
159
161 '''
162 I hold a ring with the fragments available in the playlist
163 and update the playlist according to this.
164 '''
165
# Size in bytes that fragment data and the IV are padded up to before
# encryption; also the length of the random keys that are generated.
166 BLOCK_SIZE = 16
# Character repeated to pad the fragment data (on the right) and the IV
# (on the left).
167 PADDING = '0'
168
def __init__(self, mainPlaylist, streamPlaylist,
             streamBitrate=300000, title='', fragmentPrefix='fragment',
             newFragTolerance=0, window=5, maxExtraBuffers=None,
             keyInterval=0, keysURI=None):
    '''
    Set up the playlist settings and an empty ring.

    @param mainPlaylist:     resource name of the main playlist
    @type  mainPlaylist:     str
    @param streamPlaylist:   resource name of the stream playlist
    @type  streamPlaylist:   str
    @param streamBitrate:    bitrate of the stream in bps
    @type  streamBitrate:    int
    @param title:            description of the stream
    @type  title:            str
    @param fragmentPrefix:   fragment name prefix
    @type  fragmentPrefix:   str
    @param newFragTolerance: tolerance used to automatically add new
                             fragments; 0 disables it
    @type  newFragTolerance: float
    @param window:           maximum number of fragments listed in the
                             playlist
    @type  window:           int
    @param maxExtraBuffers:  number of fragments kept in the ring on top
                             of C{window}; when None the ring holds
                             2 * window + 1 fragments
    @type  maxExtraBuffers:  int
    @param keyInterval:      number of fragments sharing the same
                             encryption key; 0 if not using encryption
    @type  keyInterval:      int
    @param keysURI:          URI used to retrieve the encryption keys;
                             when empty or None the current hostname is
                             used instead
    @type  keysURI:          str
    '''
    Playlister.__init__(self)

    # Playlist settings.
    self.mainPlaylist = mainPlaylist
    self.streamPlaylist = streamPlaylist
    self.streamBitrate = streamBitrate
    self.title = title
    self.fragmentPrefix = fragmentPrefix
    self.newFragmentTolerance = newFragTolerance
    self.window = window

    # Capacity of the ring (number of stored fragments).
    self.maxBuffers = (2 * window + 1 if maxExtraBuffers is None
                       else window + maxExtraBuffers)

    # Encryption settings: a key interval of 0 turns encryption off.
    self.keyInterval = keyInterval
    if keysURI:
        self.keysURI = keysURI
    else:
        self.keysURI = self._hostname
    self._encrypted = keyInterval != 0

    # Ring state: stored data and keys are indexed by fragment name,
    # _availableFragments lists the stored names from oldest to newest.
    self._fragmentsDict = {}
    self._keysDict = {}
    # Key currently in use, empty until the first one is generated.
    self._secret = ''
    self._availableFragments = deque()
    # Sequence number of the last fragment stored in the ring.
    self._lastSequence = None
217
219
# Encrypt `fragment` with AES in CBC mode, using `secret` as the key and
# the left-padded string form of `IV` as the initialisation vector, and
# return the result of the cipher's encrypt() call.
# NOTE(review): RFC 8216 specifies PKCS7 padding and a 16-octet big-endian
# binary IV for AES-128 segments; the '0'-character padding of the data and
# of str(IV) used here differs from that -- confirm what clients expect.
# right_pad appends PADDING up to the next BLOCK_SIZE multiple; data that
# is already aligned gets one full extra block (BLOCK_SIZE - 0 characters).
220 right_pad = lambda s: s + (self.BLOCK_SIZE -len(s) % self.BLOCK_SIZE)\
221 * self.PADDING
# left_pad does the same, but puts the padding in front of the value.
222 left_pad = lambda s: (self.BLOCK_SIZE -len(s) % self.BLOCK_SIZE)\
223 * self.PADDING + s
224 EncodeAES = lambda c, s: c.encrypt(right_pad(s))
225
226 cipher = AES.new(secret, AES.MODE_CBC, left_pad(str(IV)))
227 return EncodeAES(cipher, fragment)
228
# Put the ring and the playlist back to their empty state: stored
# fragments, keys, the current secret, playlist entries, dummy fragment
# names, the last stored sequence number and the expected-sequence counter
# are all cleared. The configuration attributes are left untouched.
# NOTE(review): the `def` line is not visible in this listing.
230 self._fragmentsDict = {}
231 self._keysDict = {}
232 self._secret = ''
233 self._availableFragments = deque('')
234 self._fragments = []
235 self._dummyFragments = []
236 self._lastSequence = None
237 self._counter = 0
238
def addFragment(self, fragment, sequenceNumber, duration):
    '''
    Adds a fragment to the ring and updates the playlist.
    If the ring is full, removes the oldest fragments and their keys.

    When encryption is enabled the fragment is stored encrypted. A new
    random key is generated every C{keyInterval} sequence numbers, and
    also whenever no key exists yet, so that a stream whose first
    sequence number is not a multiple of the interval (fresh ring or
    ring that has just been reset) is never encrypted with an empty key.

    @param fragment:       raw fragment data
    @type  fragment:       array
    @param sequenceNumber: sequence number relative to the stream's start
    @type  sequenceNumber: int
    @param duration:       duration of the segment in seconds
    @type  duration:       int

    @return: the name used in the playlist for the fragment, or None
             when a fragment with that name is already in the ring
    @rtype:  str
    '''
    fragmentName = self._addPlaylistFragment(sequenceNumber, duration,
                                             self._encrypted)

    # Already stored: keep the existing data and key untouched.
    if fragmentName in self._availableFragments:
        return None
    self._lastSequence = sequenceNumber

    # Make room for the new fragment, dropping the oldest ones first.
    while len(self._fragmentsDict) >= self.maxBuffers:
        oldest = self._availableFragments.popleft()
        del self._fragmentsDict[oldest]
        self._keysDict.pop(oldest, None)

    self._availableFragments.append(fragmentName)
    if self._encrypted:
        # Rotate the key on interval boundaries, and create one if none
        # has been generated so far.
        if not self._secret or sequenceNumber % self.keyInterval == 0:
            self._secret = os.urandom(self.BLOCK_SIZE)
        fragment = self._encryptFragment(fragment, self._secret,
                                         sequenceNumber)
        # Keys are looked up by the name of the fragment they decrypt.
        self._keysDict[fragmentName] = self._secret
    self._fragmentsDict[fragmentName] = fragment
    return fragmentName
280
# NOTE(review): the `def` line is not visible in this listing; the body
# reads a single `fragmentName` argument.
282 '''
283 Returns a fragment of the playlist or raises an Exception
284 if the fragment is not found
285
286 @param fragmentName: name of the fragment to retrieve
287 @type fragmentName: str
288
289 @return: an mpegts raw fragment
290 @rtype: array
291 '''
292
# Stored fragments are returned as kept in the ring (encrypted when
# encryption is enabled).
293 if fragmentName in self._fragmentsDict:
294 return self._fragmentsDict[fragmentName]
# Names listed as dummy fragments raise FragmentNotAvailable; every other
# unknown name raises FragmentNotFound.
295 if fragmentName in self._dummyFragments:
296 raise FragmentNotAvailable()
297 raise FragmentNotFound()
298
# NOTE(review): the `def` line is not visible in this listing; the body
# reads a single `key` argument.
300 '''
301 Returns an encryption key from the keys dict or raises an
302 Exception if the key is not found
303
304 @param key: name of the key to retrieve
305 @type key: str
306
307 @return: the encryption key
308 @rtype: str
309 '''
310
# Keys are stored under the name of the fragment they were used for; an
# unknown name raises KeyNotFound.
311 if key in self._keysDict:
312 return self._keysDict[key]
313 raise KeyNotFound()
314