Package flumotion :: Package component :: Package producers :: Package playlist :: Module playlistparser
[hide private]

Source Code for Module flumotion.component.producers.playlist.playlistparser

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gst 
 19  from gst.extend import discoverer 
 20   
 21  import time 
 22  import calendar 
 23  from StringIO import StringIO 
 24   
 25  from xml.dom import Node 
 26   
 27  from twisted.internet import reactor 
 28   
 29  from flumotion.common import log, fxml 
 30   
 31  __version__ = "$Rev$" 
 32   
 33   
34 -class PlaylistItem(object, log.Loggable):
35
36 - def __init__(self, piid, timestamp, uri, offset, duration):
37 self.id = piid 38 self.timestamp = timestamp 39 self.uri = uri 40 self.offset = offset 41 self.duration = duration 42 43 self.hasAudio = True 44 self.hasVideo = True 45 46 self.next = None 47 self.prev = None
48 49
50 -class Playlist(object, log.Loggable):
51 logCategory = 'playlist-list' 52
53 - def __init__(self, producer):
54 """ 55 Create an initially empty playlist 56 """ 57 self.items = None # PlaylistItem linked list 58 self._itemsById = {} 59 60 self.producer = producer
61
62 - def _findItem(self, timePosition):
63 # timePosition is the position in terms of the clock time 64 # Get the item that corresponds to timePosition, or None 65 cur = self.items 66 while cur: 67 if cur.timestamp < timePosition and \ 68 cur.timestamp + cur.duration > timePosition: 69 return cur 70 if cur.timestamp > timePosition: 71 return None # fail without having to iterate over everything 72 cur = cur.next 73 return None
74
75 - def _getCurrentItem(self):
76 position = self.producer.pipeline.get_clock().get_time() 77 item = self._findItem(position) 78 self.debug("Item %r found as current for playback position %d", 79 item, position) 80 return item
81
82 - def removeItems(self, piid):
83 current = self._getCurrentItem() 84 85 if piid not in self._itemsById: 86 return 87 88 items = self._itemsById[piid] 89 for item in items: 90 self.debug("removeItems: item %r ts: %d", item, item.timestamp) 91 if current: 92 self.debug("current ts: %d current dur: %d", 93 current.timestamp, current.duration) 94 if (current and item.timestamp < current.timestamp + 95 current.duration): 96 self.debug("Not removing current item!") 97 continue 98 self.unlinkItem(item) 99 self.producer.unscheduleItem(item) 100 101 del self._itemsById[piid]
102
103 - def addItem(self, piid, timestamp, uri, offset, duration, 104 hasAudio, hasVideo):
105 """ 106 Add an item to the playlist. 107 108 This may remove overlapping entries, or adjust timestamps/durations of 109 entries to make the new one fit. 110 """ 111 current = self._getCurrentItem() 112 if current and timestamp < current.timestamp + current.duration: 113 self.warning("New object at uri %s starts during current object, " 114 "cannot add") 115 return None 116 # We don't care about anything older than now; drop references to them 117 if current: 118 self.items = current 119 120 newitem = PlaylistItem(piid, timestamp, uri, offset, duration) 121 newitem.hasAudio = hasAudio 122 newitem.hasVideo = hasVideo 123 124 if piid in self._itemsById: 125 self._itemsById[piid].append(newitem) 126 else: 127 self._itemsById[piid] = [newitem] 128 129 # prev starts strictly before the new item 130 # next starts after the new item, and ends after the 131 # end of the new item 132 prevItem = nextItem = None 133 item = self.items 134 while item: 135 if item.timestamp < newitem.timestamp: 136 prevItem = item 137 else: 138 break 139 item = item.next 140 141 if prevItem: 142 item = prevItem.next 143 while item: 144 if (item.timestamp > newitem.timestamp and 145 item.timestamp + item.duration > 146 newitem.timestamp + newitem.duration): 147 nextItem = item 148 break 149 item = item.next 150 151 if prevItem: 152 # Then things between prev and next (next might be None) are to be 153 # deleted. Do so. 154 cur = prevItem.next 155 while cur != nextItem: 156 self._itemsById[cur.id].remove(cur) 157 if not self._itemsById[cur.id]: 158 del self._itemsById[cur.id] 159 self.producer.unscheduleItem(cur) 160 cur = cur.next 161 162 # update links. 163 if prevItem: 164 prevItem.next = newitem 165 newitem.prev = prevItem 166 else: 167 self.items = newitem 168 169 if nextItem: 170 newitem.next = nextItem 171 nextItem.prev = newitem 172 173 # Duration adjustments -> Reflect into gnonlin timeline 174 if prevItem and \ 175 prevItem.timestamp + prevItem.duration > newitem.timestamp: 176 self.debug("Changing duration of previous item from %d to %d", 177 prevItem.duration, newitem.timestamp - prevItem.timestamp) 178 prevItem.duration = newitem.timestamp - prevItem.timestamp 179 self.producer.adjustItemScheduling(prevItem) 180 181 if nextItem and \ 182 newitem.timestamp + newitem.duration > nextItem.timestamp: 183 self.debug("Changing timestamp of next item from %d to %d to fit", 184 newitem.timestamp, newitem.timestamp + newitem.duration) 185 ts = newitem.timestamp + newitem.duration 186 duration = nextItem.duration - (ts - nextItem.timestamp) 187 nextItem.duration = duration 188 nextItem.timestamp = ts 189 self.producer.adjustItemScheduling(nextItem) 190 191 # Then we need to actually add newitem into the gnonlin timeline 192 if not self.producer.scheduleItem(newitem): 193 self.debug("Failed to schedule item, unlinking") 194 # Failed to schedule it. 195 self.unlinkItem(newitem) 196 return None 197 198 return newitem
199
200 - def unlinkItem(self, item):
201 if item.prev: 202 item.prev.next = item.next 203 else: 204 self.items = item.next 205 206 if item.next: 207 item.next.prev = item.prev
208 209
210 -class PlaylistParser(object, log.Loggable):
211 logCategory = 'playlist-parse' 212
213 - def __init__(self, playlist):
214 self.playlist = playlist 215 216 self._pending_items = [] 217 self._discovering = False 218 self._discovering_blocked = 0 219 220 self._baseDirectory = None
221
222 - def setBaseDirectory(self, baseDir):
223 if not baseDir.endswith('/'): 224 baseDir = baseDir + '/' 225 self._baseDirectory = baseDir
226
227 - def blockDiscovery(self):
228 """ 229 Prevent playlist parser from running discoverer on any pending 230 playlist entries. Multiple subsequent invocations will require 231 the same corresponding number of calls to L{unblockDiscovery} 232 to resume discovery. 233 """ 234 self._discovering_blocked += 1 235 self.debug(' blocking discovery: %d' % self._discovering_blocked)
236
237 - def unblockDiscovery(self):
238 """ 239 Resume discovering of any pending playlist entries. If 240 L{blockDiscovery} was called multiple times multiple 241 invocations of unblockDiscovery will be required to unblock 242 the discoverer. 243 """ 244 if self._discovering_blocked > 0: 245 self._discovering_blocked -= 1 246 self.debug('unblocking discovery: %d' % self._discovering_blocked) 247 if self._discovering_blocked < 1: 248 self.startDiscovery()
249
250 - def startDiscovery(self, doSort=True):
251 """ 252 Initiate discovery of any pending playlist entries. 253 254 @param doSort: should the pending entries be ordered 255 chronologically before initiating discovery 256 @type doSort: bool 257 """ 258 self.log('startDiscovery: discovering: %s, block: %d, pending: %d' % 259 (self._discovering, self._discovering_blocked, 260 len(self._pending_items))) 261 if not self._discovering and self._discovering_blocked < 1 \ 262 and self._pending_items: 263 if doSort: 264 self._sortPending() 265 self._discoverPending()
266
267 - def _sortPending(self):
268 self.debug('sort pending: %d' % len(self._pending_items)) 269 if not self._pending_items: 270 return 271 sortlist = [(elt[1], elt) for elt in self._pending_items] 272 sortlist.sort() 273 self._pending_items = [elt for (ts, elt) in sortlist]
274
275 - def _discoverPending(self):
276 277 def _discovered(disc, is_media): 278 self.debug("Discovered! is media: %d mime type %s", is_media, 279 disc.mimetype) 280 reactor.callFromThread(_discoverer_done, disc, is_media)
281 282 def _discoverer_done(disc, is_media): 283 if is_media: 284 self.debug("Discovery complete, media found") 285 # FIXME: does item exist because it is popped below ? 286 # if so, that's ugly and non-obvious and should be fixed 287 uri = "file://" + item[0] 288 timestamp = item[1] 289 duration = item[2] 290 offset = item[3] 291 piid = item[4] 292 293 hasA = disc.is_audio 294 hasV = disc.is_video 295 durationDiscovered = 0 296 if hasA and hasV: 297 durationDiscovered = min(disc.audiolength, 298 disc.videolength) 299 elif hasA: 300 durationDiscovered = disc.audiolength 301 elif hasV: 302 durationDiscovered = disc.videolength 303 if not duration or duration > durationDiscovered: 304 duration = durationDiscovered 305 306 if duration + offset > durationDiscovered: 307 offset = 0 308 309 if duration > 0: 310 self.playlist.addItem(piid, timestamp, uri, 311 offset, duration, hasA, hasV) 312 else: 313 self.warning("Duration of item is zero, not adding") 314 else: 315 self.warning("Discover failed to find media in %s", item[0]) 316 317 # We don't want to burn too much cpu discovering all the files; 318 # this throttles the discovery rate to a reasonable level 319 self.debug("Continuing on to next file in one second") 320 reactor.callLater(1, self._discoverPending)
321 322 if not self._pending_items: 323 self.debug("No more files to discover") 324 self._discovering = False 325 return 326 327 if self._discovering_blocked > 0: 328 self.debug("Discovering blocked: %d" % self._discovering_blocked) 329 self._discovering = False 330 return 331 332 self._discovering = True 333 334 item = self._pending_items.pop(0) 335 336 self.debug("Discovering file %s", item[0]) 337 disc = discoverer.Discoverer(item[0]) 338 339 disc.connect('discovered', _discovered) 340 disc.discover() 341
342 - def addItemToPlaylist(self, filename, timestamp, duration, offset, piid):
343 # We only want to add it if it's plausibly schedulable. 344 end = timestamp 345 if duration is not None: 346 end += duration 347 if end < time.time() * gst.SECOND: 348 self.debug("Early-out: ignoring add for item in past") 349 return 350 351 if filename[0] != '/' and self._baseDirectory: 352 filename = self._baseDirectory + filename 353 354 self._pending_items.append((filename, timestamp, duration, offset, 355 piid)) 356 357 # Now launch the discoverer for any pending items 358 self.startDiscovery()
359 360
361 -class PlaylistXMLParser(PlaylistParser):
362 logCategory = 'playlist-xml' 363
364 - def parseData(self, data):
365 """ 366 Parse playlist XML document data 367 """ 368 fileHandle = StringIO(data) 369 self.parseFile(fileHandle)
370
371 - def replaceFile(self, file, piid):
372 self.playlist.removeItems(piid) 373 self.parseFile(file, piid)
374
375 - def parseFile(self, file, piid=None):
376 """ 377 Parse a playlist file. Adds the contents of the file to the existing 378 playlist, overwriting any existing entries for the same time period. 379 """ 380 parser = fxml.Parser() 381 382 root = parser.getRoot(file) 383 384 node = root.documentElement 385 self.debug("Parsing playlist from file %s", file) 386 if node.nodeName != 'playlist': 387 raise fxml.ParserError("Root node is not 'playlist'") 388 389 self.blockDiscovery() 390 try: 391 for child in node.childNodes: 392 if child.nodeType == Node.ELEMENT_NODE and \ 393 child.nodeName == 'entry': 394 self.debug("Parsing entry") 395 self._parsePlaylistEntry(parser, child, piid) 396 finally: 397 self.unblockDiscovery()
398 399 # A simplified private version of this code from fxml without the 400 # undesirable unicode->str conversions. 401
402 - def _parseAttributes(self, node, required, optional):
403 out = [] 404 for k in required: 405 if node.hasAttribute(k): 406 out.append(node.getAttribute(k)) 407 else: 408 raise fxml.ParserError("Missing required attribute %s" % k) 409 410 for k in optional: 411 if node.hasAttribute(k): 412 out.append(node.getAttribute(k)) 413 else: 414 out.append(None) 415 return out
416
417 - def _parsePlaylistEntry(self, parser, entry, piid):
418 mandatory = ['filename', 'time'] 419 optional = ['duration', 'offset'] 420 421 (filename, timestamp, duration, offset) = self._parseAttributes( 422 entry, mandatory, optional) 423 424 if duration is not None: 425 duration = int(float(duration) * gst.SECOND) 426 if offset is None: 427 offset = 0 428 offset = int(offset) * gst.SECOND 429 430 timestamp = self._parseTimestamp(timestamp) 431 432 # Assume UTF-8 filesystem. 433 filename = filename.encode("UTF-8") 434 435 self.addItemToPlaylist(filename, timestamp, duration, offset, piid)
436
437 - def _parseTimestamp(self, ts):
438 # Take TS in YYYY-MM-DDThh:mm:ss.ssZ format, return timestamp in 439 # nanoseconds since the epoch 440 441 # time.strptime() doesn't handle the fractional seconds part. We ignore 442 # it entirely, after verifying that it has the right format. 443 tsmain, trailing = ts[:-4], ts[-4:] 444 if trailing[0] != '.' or trailing[3] != 'Z' or \ 445 not trailing[1].isdigit() or not trailing[2].isdigit(): 446 raise fxml.ParserError("Invalid timestamp %s" % ts) 447 formatString = "%Y-%m-%dT%H:%M:%S" 448 449 try: 450 timestruct = time.strptime(tsmain, formatString) 451 return int(calendar.timegm(timestruct) * gst.SECOND) 452 except ValueError: 453 raise fxml.ParserError("Invalid timestamp %s" % ts)
454