Package flumotion :: Package component :: Package producers :: Package playlist :: Module playlist
[hide private]

Source Code for Module flumotion.component.producers.playlist.playlist

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import time 
 19   
 20  import gst 
 21  from twisted.internet import defer, reactor 
 22   
 23  from flumotion.common import messages, fxml, gstreamer, documentation 
 24  from flumotion.common.i18n import N_, gettexter 
 25  from flumotion.component import feedcomponent 
 26  from flumotion.component.base import watcher 
 27   
 28  import smartscale 
 29  import singledecodebin 
 30  import playlistparser 
 31   
 32  __version__ = "$Rev$" 
 33  T_ = gettexter() 
 34   
 35   
def _tsToString(ts):
    """
    Render a gstreamer timestamp value as a local-time string.

    @param ts: timestamp in gstreamer time units; it is divided by
               gst.SECOND to obtain seconds
    @returns: the string time.ctime() produces for that number of seconds
    """
    seconds = ts / gst.SECOND
    return time.ctime(seconds)
41 42
def videotest_gnl_src(name, start, duration, priority, pattern=None):
    """
    Build a gnlsource element that wraps a videotestsrc.

    @param name:     name given to the gnlsource element
    @param start:    value for the gnlsource 'start' property
    @param duration: value used for both the 'duration' and the
                     'media-duration' properties
    @param priority: value for the gnlsource 'priority' property
    @param pattern:  videotestsrc pattern to show; None selects pattern 2
                     (all black)
    @returns: the configured gnlsource containing the videotestsrc
    """
    src = gst.element_factory_make('videotestsrc')
    # Compare against None rather than relying on truthiness: 0 is a
    # legitimate pattern value and must not be replaced by the default.
    if pattern is None:
        # Set videotestsrc to all black.
        pattern = 2
    src.props.pattern = pattern

    gnlsrc = gst.element_factory_make('gnlsource', name)
    gnlsrc.props.start = start
    gnlsrc.props.duration = duration
    # Test sources have no media offset; play them from their beginning.
    gnlsrc.props.media_start = 0
    gnlsrc.props.media_duration = duration
    gnlsrc.props.priority = priority
    gnlsrc.add(src)

    return gnlsrc
59 60
def audiotest_gnl_src(name, start, duration, priority, wave=None):
    """
    Build a gnlsource element that wraps an audiotestsrc.

    @param name:     name given to the gnlsource element
    @param start:    value for the gnlsource 'start' property
    @param duration: value used for both the 'duration' and the
                     'media-duration' properties
    @param priority: value for the gnlsource 'priority' property
    @param wave:     audiotestsrc wave to produce; None selects wave 4
                     (silence)
    @returns: the configured gnlsource containing the audiotestsrc
    """
    src = gst.element_factory_make('audiotestsrc')
    # Compare against None rather than relying on truthiness: 0 is a
    # legitimate wave value and must not be replaced by the default.
    if wave is None:
        # Set audiotestsrc to use silence.
        wave = 4
    src.props.wave = wave

    gnlsrc = gst.element_factory_make('gnlsource', name)
    gnlsrc.props.start = start
    gnlsrc.props.duration = duration
    # Test sources have no media offset; play them from their beginning.
    gnlsrc.props.media_start = 0
    gnlsrc.props.media_duration = duration
    gnlsrc.props.priority = priority
    gnlsrc.add(src)

    return gnlsrc
77 78
def file_gnl_src(name, uri, caps, start, duration, offset, priority):
    """
    Build a gnlsource element that decodes one stream of the given uri.

    @param name:     name given to the gnlsource element
    @param uri:      uri handed to the SingleDecodeBin
    @param caps:     caps handed to the SingleDecodeBin and also set on the
                     gnlsource
    @param start:    value for the gnlsource 'start' property
    @param duration: value used for both the 'duration' and the
                     'media-duration' properties
    @param offset:   value for the gnlsource 'media-start' property
    @param priority: value for the gnlsource 'priority' property
    @returns: the configured gnlsource containing the decode bin
    """
    decoder = singledecodebin.SingleDecodeBin(caps, uri)
    gnlsrc = gst.element_factory_make('gnlsource', name)

    # Applied in this order, one property at a time.
    settings = (
        ('start', start),
        ('duration', duration),
        ('media-start', offset),
        ('media-duration', duration),
        ('priority', priority),
        ('caps', caps),
    )
    for propname, value in settings:
        gnlsrc.set_property(propname, value)

    gnlsrc.add(decoder)
    return gnlsrc
91 92
93 -class PlaylistProducerMedium(feedcomponent.FeedComponentMedium):
94
95 - def __init__(self, comp):
97
98 - def remote_add_playlist(self, data):
99 self.comp.addPlaylist(data)
100 101
class PlaylistProducer(feedcomponent.FeedComponent):
    """
    Feed component that plays playlist items through gnonlin compositions.

    One gnlcomposition is built per enabled media type (audio, video).
    Playlist items are added to the compositions as gnlsources, and test
    sources are added as defaults alongside them.
    """
    # Category under which this component's log output is filed.
    logCategory = 'playlist-prod'
    # Medium class to use for this component.
    componentMediumClass = PlaylistProducerMedium
106 - def init(self):
107 self.basetime = -1 108 109 self._hasAudio = True 110 self._hasVideo = True 111 112 # The gnlcompositions for audio and video 113 self.videocomp = None 114 self.audiocomp = None 115 116 self.videocaps = gst.Caps("video/x-raw-yuv;video/x-raw-rgb") 117 self.audiocaps = gst.Caps("audio/x-raw-int;audio/x-raw-float") 118 119 self._vsrcs = {} # { PlaylistItem -> gnlsource } 120 self._asrcs = {} # { PlaylistItem -> gnlsource } 121 122 self.uiState.addListKey("playlist")
123
124 - def _buildAudioPipeline(self, pipeline, src):
125 audiorate = gst.element_factory_make("audiorate") 126 audioconvert = gst.element_factory_make('audioconvert') 127 resampler = 'audioresample' 128 if gstreamer.element_factory_exists('legacyresample'): 129 resampler = 'legacyresample' 130 audioresample = gst.element_factory_make(resampler) 131 outcaps = gst.Caps( 132 "audio/x-raw-int,channels=%d,rate=%d,width=16,depth=16" % 133 (self._channels, self._samplerate)) 134 135 capsfilter = gst.element_factory_make("capsfilter") 136 capsfilter.props.caps = outcaps 137 138 pipeline.add(audiorate, audioconvert, audioresample, capsfilter) 139 src.link(audioconvert) 140 audioconvert.link(audioresample) 141 audioresample.link(audiorate) 142 audiorate.link(capsfilter) 143 144 return capsfilter.get_pad('src')
145
146 - def _buildVideoPipeline(self, pipeline, src):
147 outcaps = gst.Caps( 148 "video/x-raw-yuv,width=%d,height=%d,framerate=%d/%d," 149 "pixel-aspect-ratio=1/1" % 150 (self._width, self._height, self._framerate[0], 151 self._framerate[1])) 152 153 cspace = gst.element_factory_make("ffmpegcolorspace") 154 scaler = smartscale.SmartVideoScale() 155 scaler.set_caps(outcaps) 156 videorate = gst.element_factory_make("videorate") 157 capsfilter = gst.element_factory_make("capsfilter") 158 capsfilter.props.caps = outcaps 159 160 pipeline.add(cspace, scaler, videorate, capsfilter) 161 162 src.link(cspace) 163 cspace.link(scaler) 164 scaler.link(videorate) 165 videorate.link(capsfilter) 166 return capsfilter.get_pad('src')
167
168 - def _buildPipeline(self):
169 pipeline = gst.Pipeline() 170 171 for mediatype in ['audio', 'video']: 172 if (mediatype == 'audio' and not self._hasAudio) or ( 173 mediatype == 'video' and not self._hasVideo): 174 continue 175 176 # For each of audio, video, we build a pipeline that looks roughly 177 # like: 178 # 179 # gnlcomposition ! identity sync=true ! 180 # identity single-segment=true ! audio/video-elements ! sink 181 182 composition = gst.element_factory_make("gnlcomposition", 183 mediatype + "-composition") 184 185 segmentidentity = gst.element_factory_make("identity") 186 segmentidentity.set_property("single-segment", True) 187 segmentidentity.set_property("silent", True) 188 syncidentity = gst.element_factory_make("identity") 189 syncidentity.set_property("silent", True) 190 syncidentity.set_property("sync", True) 191 192 pipeline.add(composition, segmentidentity, syncidentity) 193 194 def _padAddedCb(element, pad, target): 195 self.debug("Pad added, linking") 196 pad.link(target)
197 composition.connect('pad-added', _padAddedCb, 198 syncidentity.get_pad("sink")) 199 syncidentity.link(segmentidentity) 200 201 if mediatype == 'audio': 202 self.audiocomp = composition 203 srcpad = self._buildAudioPipeline(pipeline, segmentidentity) 204 else: 205 self.videocomp = composition 206 srcpad = self._buildVideoPipeline(pipeline, segmentidentity) 207 208 feedername = self.feeders[mediatype].elementName 209 #FIXME: rethink how we expose the feeder pipeline strings 210 feederchunk = \ 211 feedcomponent.ParseLaunchComponent.FEEDER_TMPL \ 212 % {'name': feedername} 213 214 binstr = "bin.("+feederchunk+" )" 215 self.debug("Parse for media composition is %s", binstr) 216 217 bin = gst.parse_launch(binstr) 218 pad = bin.find_unconnected_pad(gst.PAD_SINK) 219 ghostpad = gst.GhostPad(mediatype + "-feederpad", pad) 220 bin.add_pad(ghostpad) 221 222 pipeline.add(bin) 223 srcpad.link(ghostpad) 224 225 return pipeline
226
227 - def _createDefaultSources(self, properties):
228 if self._hasVideo: 229 vsrc = videotest_gnl_src("videotestdefault", 0, 2**63 - 1, 230 2**31 - 1, properties.get('video-pattern', None)) 231 self.videocomp.add(vsrc) 232 233 if self._hasAudio: 234 asrc = audiotest_gnl_src("videotestdefault", 0, 2**63 - 1, 235 2**31 - 1, properties.get('audio-wave', None)) 236 self.audiocomp.add(asrc)
237
238 - def set_master_clock(self, ip, port, base_time):
239 raise NotImplementedError("Playlist producer doesn't support slaving")
240
241 - def provide_master_clock(self, port):
242 # Most of this copied from feedcomponent010, but changed in various 243 # ways. Refactor the base class? 244 if self.medium: 245 ip = self.medium.getIP() 246 else: 247 ip = "127.0.0.1" 248 249 clock = self.pipeline.get_clock() 250 self.clock_provider = gst.NetTimeProvider(clock, None, port) 251 # small window here but that's ok 252 self.clock_provider.set_property('active', False) 253 254 self._master_clock_info = (ip, port, self.basetime) 255 256 return defer.succeed(self._master_clock_info)
257
258 - def get_master_clock(self):
259 return self._master_clock_info
260
261 - def _setupClock(self, pipeline):
262 # Configure our pipeline to use a known basetime and clock. 263 clock = gst.system_clock_obtain() 264 clock.set_property('clock-type', 'realtime') 265 # It doesn't matter too much what this basetime is, so long as we know 266 # the value. 267 self.basetime = clock.get_time() 268 269 # We force usage of the system clock. 270 pipeline.use_clock(clock) 271 # Now we disable default basetime distribution 272 pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE) 273 # And we choose our own basetime... 274 self.debug("Setting basetime of %d", self.basetime) 275 pipeline.set_base_time(self.basetime)
276
277 - def timeReport(self):
278 ts = self.pipeline.get_clock().get_time() 279 self.debug("Pipeline clock is now at %d -> %s", ts, _tsToString(ts)) 280 reactor.callLater(10, self.timeReport)
281
282 - def getCurrentPosition(self):
283 return self.pipeline.query_position(gst.FORMAT_TIME)[0]
284
285 - def scheduleItem(self, item):
286 """ 287 Schedule a given playlist item in our playback compositions. 288 """ 289 start = item.timestamp - self.basetime 290 self.debug("Starting item %s at %d seconds from start: %s", item.uri, 291 start/gst.SECOND, _tsToString(item.timestamp)) 292 293 # If we schedule things to start before the current pipeline position, 294 # gnonlin will adjust this to start now. However, it does this 295 # separately for audio and video, so we start from different points, 296 # thus we're out of sync. 297 # So, always start slightly in the future... 5 seconds seems to work 298 # fine in practice. 299 now = self.getCurrentPosition() 300 neareststarttime = now + 5 * gst.SECOND 301 302 if start < neareststarttime: 303 if start + item.duration < neareststarttime: 304 self.debug("Item too late; skipping entirely") 305 return False 306 else: 307 change = neareststarttime - start 308 self.debug("Starting item with offset %d", change) 309 item.duration -= change 310 item.offset += change 311 start = neareststarttime 312 313 end = start + item.duration 314 timeuntilend = end - now 315 # After the end time, remove this item from the composition, otherwise 316 # it will continue to use huge gobs of memory and lots of threads. 
317 reactor.callLater(timeuntilend/gst.SECOND + 5, 318 self.unscheduleItem, item) 319 320 if self._hasVideo and item.hasVideo: 321 self.debug("Adding video source with start %d, duration %d, " 322 "offset %d", start, item.duration, item.offset) 323 vsrc = file_gnl_src(None, item.uri, self.videocaps, 324 start, item.duration, item.offset, 0) 325 self.videocomp.add(vsrc) 326 self._vsrcs[item] = vsrc 327 if self._hasAudio and item.hasAudio: 328 self.debug("Adding audio source with start %d, duration %d, " 329 "offset %d", start, item.duration, item.offset) 330 asrc = file_gnl_src(None, item.uri, self.audiocaps, 331 start, item.duration, item.offset, 0) 332 self.audiocomp.add(asrc) 333 self._asrcs[item] = asrc 334 self.debug("Done scheduling: start at %s, end at %s", 335 _tsToString(start + self.basetime), 336 _tsToString(start + self.basetime + item.duration)) 337 338 self.uiState.append("playlist", (item.timestamp, 339 item.uri, 340 item.duration, 341 item.offset, 342 item.hasAudio, 343 item.hasVideo)) 344 return True
345
346 - def unscheduleItem(self, item):
347 self.debug("Unscheduling item at uri %s", item.uri) 348 if self._hasVideo and item.hasVideo and item in self._vsrcs: 349 vsrc = self._vsrcs.pop(item) 350 self.videocomp.remove(vsrc) 351 vsrc.set_state(gst.STATE_NULL) 352 if self._hasAudio and item.hasAudio and item in self._asrcs: 353 asrc = self._asrcs.pop(item) 354 self.audiocomp.remove(asrc) 355 asrc.set_state(gst.STATE_NULL) 356 for entry in self.uiState.get("playlist"): 357 if entry[0] == item.timestamp: 358 self.uiState.remove("playlist", entry)
359
360 - def adjustItemScheduling(self, item):
361 if self._hasVideo and item.hasVideo: 362 vsrc = self._vsrcs[item] 363 vsrc.props.start = item.timestamp - self.basetime 364 vsrc.props.duration = item.duration 365 vsrc.props.media_duration = item.duration 366 if self._hasAudio and item.hasAudio: 367 asrc = self._asrcs[item] 368 asrc.props.start = item.timestamp - self.basetime 369 asrc.props.duration = item.duration 370 asrc.props.media_duration = item.duration
371
372 - def addPlaylist(self, data):
373 self.playlistparser.parseData(data)
374
375 - def create_pipeline(self):
376 props = self.config['properties'] 377 378 self._playlistfile = props.get('playlist', None) 379 self._playlistdirectory = props.get('playlist-directory', None) 380 self._baseDirectory = props.get('base-directory', None) 381 382 self._width = props.get('width', 320) 383 self._height = props.get('height', 240) 384 self._framerate = props.get('framerate', (15, 1)) 385 self._samplerate = props.get('samplerate', 44100) 386 self._channels = props.get('channels', 2) 387 388 self._hasAudio = props.get('audio', True) 389 self._hasVideo = props.get('video', True) 390 391 pipeline = self._buildPipeline() 392 self._setupClock(pipeline) 393 394 self._createDefaultSources(props) 395 396 return pipeline
397
398 - def _watchDirectory(self, dir):
399 self.debug("Watching directory %s", dir) 400 self._filesAdded = {} 401 402 self._directoryWatcher = watcher.DirectoryWatcher(dir) 403 self._directoryWatcher.subscribe(fileChanged=self._watchFileChanged, 404 fileDeleted=self._watchFileDeleted) 405 406 # in the start call watcher should find all the existing 407 # files, so we block discovery while the watcher starts 408 self.playlistparser.blockDiscovery() 409 try: 410 self._directoryWatcher.start() 411 finally: 412 self.playlistparser.unblockDiscovery()
413
414 - def _watchFileDeleted(self, file):
415 self.debug("File deleted: %s", file) 416 if file in self._filesAdded: 417 self.playlistparser.playlist.removeItems(file) 418 self._filesAdded.pop(file) 419 420 self._cleanMessage(file)
421
422 - def _cleanMessage(self, file):
423 # There's no message removal API! We have to do this instead. Ick? 424 msgid = ("playlist-parse-error", file) 425 for m in self.state.get('messages'): 426 if m.id == msgid: 427 self.state.remove('messages', m)
428
429 - def _watchFileChanged(self, file):
430 self.debug("File changed: %s", file) 431 if file in self._filesAdded: 432 self.debug("Removing existing items for changed playlist") 433 self.playlistparser.playlist.removeItems(file) 434 435 self._filesAdded[file] = None 436 self._cleanMessage(file) 437 try: 438 self.debug("Parsing file: %s", file) 439 self.playlistparser.parseFile(file, piid=file) 440 except fxml.ParserError, e: 441 self.warning("Failed to parse playlist file: %r", e) 442 # Since this isn't done directly via the remote method, add a 443 # message so people can find out that it failed... 444 # Use a tuple including the filename to identify the warning, so we 445 # can add/remove one per file 446 msgid = ("playlist-parse-error", file) 447 self.addMessage( 448 messages.Warning(T_(N_( 449 "Failed to parse a playlist from file %s: %s" % 450 (file, e))), mid=msgid))
451
452 - def do_check(self):
453 454 def check_gnl(element): 455 exists = gstreamer.element_factory_exists(element) 456 if not exists: 457 m = messages.Error(T_(N_( 458 "%s is missing. Make sure your gnonlin " 459 "installation is complete."), element)) 460 documentation.messageAddGStreamerInstall(m) 461 self.debug(m) 462 self.addMessage(m)
463 464 for el in ["gnlsource", "gnlcomposition"]: 465 check_gnl(el) 466
467 - def do_setup(self):
468 playlist = playlistparser.Playlist(self) 469 self.playlistparser = playlistparser.PlaylistXMLParser(playlist) 470 if self._baseDirectory: 471 self.playlistparser.setBaseDirectory(self._baseDirectory) 472 473 if self._playlistfile: 474 try: 475 self.playlistparser.parseFile(self._playlistfile) 476 except fxml.ParserError, e: 477 self.warning("Failed to parse playlist file: %r", e) 478 479 if self._playlistdirectory: 480 self._watchDirectory(self._playlistdirectory) 481 482 reactor.callLater(10, self.timeReport)
483