Package flumotion :: Package component :: Package producers :: Package looper :: Module looper
[hide private]

Source Code for Module flumotion.component.producers.looper.looper

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gst 
 19  import gobject 
 20   
 21  from flumotion.common import errors, messages 
 22  from flumotion.common.i18n import N_, gettexter 
 23  from flumotion.component import feedcomponent 
 24  from flumotion.component.common.avproducer import avproducer 
 25   
 26  __version__ = "$Rev$" 
 27  T_ = gettexter() 
 28   
 29   
class LooperMedium(feedcomponent.FeedComponentMedium):
    """
    Medium for the looper component.

    Every remote_* method simply forwards to the component held in
    self.comp.

    NOTE(review): the listing this class was recovered from had lost two
    lines (the body of __init__ and the header of the last method).  Both
    are reconstructed below and marked; confirm them against the
    repository before relying on them.
    """

    def __init__(self, comp):
        # NOTE(review): reconstructed body -- presumably a plain
        # delegation to the base class; confirm.
        feedcomponent.FeedComponentMedium.__init__(self, comp)

    def remote_restartLoop(self):
        """
        Restart the loop with a non-flushing seek.

        Returns whatever the component's do_seek(False) returns.
        """
        return self.comp.do_seek(False)

    def remote_getNbIterations(self):
        """
        Return the component's current iteration counter.
        """
        return self.comp.nbiterations

    # NOTE(review): the method name is reconstructed by analogy with
    # remote_getNbIterations; only the return statement survived in the
    # listing.  Confirm the name.
    def remote_getFileInformation(self):
        """
        Return the component's fileinformation attribute.
        """
        return self.comp.fileinformation


# How to start the first segment:
# 1) Make your pipeline, but don't link the sinks
# 2) Block the source pads of what would be the sinks' peers
# 3) When both block functions fire, link the pads, then do a segment seek
# 4) Then you can unblock pads and the sinks will receive exactly one
#    new segment with all gst versions
#
# To loop a segment, when you get the segment_done message
# asynchronously, just do a new segment seek.


class Looper(avproducer.AVProducerBase):
    """
    Producer that demuxes and decodes an Ogg file (Theora video, Vorbis
    audio) and replays it over and over using segment seeks.

    The number of completed iterations, the playback position and some
    information about the file are published through self.uiState.
    """

    componentMediumClass = LooperMedium

    def init(self):
        """
        Set up the instance attributes and register the uiState keys.
        """
        self.initial_seek = False
        self.nbiterations = 0
        self.fileinformation = None
        # id of the gobject timeout polling the position; 0 means "none"
        self.timeoutid = 0
        # source pads for which the block callback has not fired yet
        self.pads_awaiting_block = []
        # (src, sink) pairs to link once every pad is blocked
        self.pads_to_link = []
        self.bus = None

        self.uiState.addKey('info-location', '')
        self.uiState.addKey('info-duration', 0)
        self.uiState.addKey('info-audio', None)
        self.uiState.addKey('info-video', None)
        self.uiState.addKey('num-iterations', 0)
        self.uiState.addKey('position', 0)

    def _do_extra_checks(self):
        """
        Log a warning for gst-python versions known to segfault with this
        component and return the list of extra checks to run.
        """
        from flumotion.component.producers import checks

        version = checks.get_pygst_version(gst)
        if (0, 10, 11, 0) <= version < (0, 10, 14, 0):
            # if it's going to segfault it won't have time to deliver
            # messages to manager, otherwise we don't need to show it!
            # but we can add a log message
            self.warning('the version of gst-python you are using is known to '
                         'cause segfault in the looper component, please '
                         'update to the latest release')
            self.warning('... just so you know, in case it crashes')

        return [checks.checkTicket349()]

    def get_raw_video_element(self):
        """
        Return the pipeline element named 'vident'.
        """
        return self.pipeline.get_by_name('vident')

    def get_pipeline_template(self):
        """
        Return the pipeline template string for self.filelocation.

        The location is interpolated into the template as is.
        """
        template = (
            'filesrc location=%(location)s'
            ' ! oggdemux name=demux'
            ' demux. ! queue ! theoradec name=theoradec'
            ' ! identity name=vident single-segment=true sync=true '
            ' silent=true'
            ' ! @feeder:video@'
            ' demux. ! queue ! vorbisdec name=vorbisdec'
            ' ! volume name=setvolume'
            ' ! level name=volumelevel message=true '
            ' ! identity name=aident single-segment=true sync=true '
            ' silent=true'
            ' ! @feeder:audio@'
            % dict(location=self.filelocation))

        return template

    def make_message_for_gstreamer_error(self, gerror, debug):
        """
        Build the message to show for a GStreamer error.

        Resource errors get a dedicated "could not open file" message;
        everything else is delegated to
        feedcomponent.ParseLaunchComponent.
        """
        if gerror.domain == 'gst-resource-error-quark':
            return messages.Error(T_(N_(
                "Could not open file '%s' for reading."), self.filelocation),
                debug='%s\n%s' % (gerror.message, debug),
                mid=gerror.domain, priority=40)
        base = feedcomponent.ParseLaunchComponent
        return base.make_message_for_gstreamer_error(self, gerror, debug)

    def run_discoverer(self):
        """
        Run a discoverer on self.filelocation and publish what it finds
        (location, duration, audio and video description) in uiState.
        """

        def discovered(disc, ismedia):
            # ismedia is ignored: the keys are filled from whatever the
            # discoverer reports.
            self.uiState.set('info-location', self.filelocation)
            self.uiState.set('info-duration',
                             max(disc.audiolength, disc.videolength))
            if disc.is_audio:
                self.uiState.set('info-audio',
                                 "%d channel(s) %dHz" % (disc.audiochannels,
                                                         disc.audiorate))
            if disc.is_video:
                self.uiState.set('info-video',
                                 "%d x %d at %d/%d fps" % (
                                     disc.videowidth,
                                     disc.videoheight,
                                     disc.videorate.num,
                                     disc.videorate.denom))

        from gst.extend import discoverer

        d = discoverer.Discoverer(self.filelocation)
        d.connect('discovered', discovered)
        d.discover()

    def on_segment_done(self):
        """
        Start the next iteration with a non-flushing segment seek and
        bump the iteration counter.
        """
        self.do_seek(False)
        self.nbiterations += 1
        self.uiState.set('num-iterations', self.nbiterations)

    def on_pads_blocked(self):
        """
        Link the pads that were left unlinked, do the initial flushing
        segment seek, unblock the pads and reset the iteration counter.
        """
        for src, sink in self.pads_to_link:
            src.link(sink)
        self.do_seek(True)
        for src, _sink in self.pads_to_link:
            src.set_blocked_async(False, lambda *x: None)
        self.pads_to_link = []
        self.nbiterations = 0
        self.uiState.set('num-iterations', self.nbiterations)

    def configure_pipeline(self, pipeline, properties):
        """
        Prepare the pipeline for segment looping.

        The source pads of 'aident' and 'vident' are unlinked from their
        peers and asked to block; once both are blocked an application
        message is posted on the pipeline, which the bus watch turns into
        a call to on_pads_blocked.
        """
        avproducer.AVProducerBase.configure_pipeline(self, pipeline,
                                                     properties)

        # Built once; only messages coming from the pipeline itself with
        # one of these two types are acted upon.
        handlers = {
            (pipeline, gst.MESSAGE_SEGMENT_DONE): self.on_segment_done,
            (pipeline, gst.MESSAGE_APPLICATION): self.on_pads_blocked,
        }

        def on_message(bus, message):
            handler = handlers.get((message.src, message.type))
            if handler is not None:
                handler()

        def blocked(pad, is_blocked):
            if pad not in self.pads_awaiting_block:
                return
            self.pads_awaiting_block.remove(pad)
            if not self.pads_awaiting_block:
                s = gst.Structure('pads-blocked')
                m = gst.message_new_application(pipeline, s)
                # marshal to the main thread
                pipeline.post_message(m)

        self.oggdemux = pipeline.get_by_name("demux")

        for name in ('aident', 'vident'):
            element = pipeline.get_by_name(name)
            src = element.get_pad('src')
            sink = src.get_peer()
            src.unlink(sink)
            src.set_blocked_async(True, blocked)
            self.pads_awaiting_block.append(src)
            self.pads_to_link.append((src, sink))

        self.bus = pipeline.get_bus()
        self.bus.add_signal_watch()

        self.bus.connect('message', on_message)

    def do_seek(self, flushing):
        """
        Restarts the looping.

        A segment seek over the whole file is sent to the demuxer; when
        flushing is true the seek is also a flushing one.

        Returns True if the seeking was accepted,
        Returns False otherwise
        """
        self.debug("restarting looping")
        flags = gst.SEEK_FLAG_SEGMENT
        if flushing:
            flags |= gst.SEEK_FLAG_FLUSH
        return self.oggdemux.seek(1.0, gst.FORMAT_TIME, flags,
                                  gst.SEEK_TYPE_SET, 0, gst.SEEK_TYPE_END, 0)

    def do_setup(self):
        """
        Install a 500 ms timeout that publishes the pipeline position in
        uiState, unless one is already installed.
        """

        def check_time():
            self.log("checking position")
            try:
                pos, _fmt = self.pipeline.query_position(gst.FORMAT_TIME)
            except Exception:
                # Best effort: a failed query is only logged.  Unlike a
                # bare except this lets SystemExit and KeyboardInterrupt
                # through.
                self.debug("position query didn't succeed")
            else:
                self.uiState.set('position', pos)
            # returning True keeps the timeout installed
            return True

        if not self.timeoutid:
            self.timeoutid = gobject.timeout_add(500, check_time)

    def do_stop(self):
        """
        Remove the bus watch and the position timeout, and reset the
        iteration counter.
        """
        if self.bus:
            self.bus.remove_signal_watch()
            self.bus = None

        if self.timeoutid:
            gobject.source_remove(self.timeoutid)
            self.timeoutid = 0

        self.nbiterations = 0

    def _parse_aditional_properties(self, properties):
        """
        Read the 'location' property and start discovering the file.
        """
        # setup the properties
        self.bus = None
        self.filelocation = properties.get('location')
        self.run_discoverer()
239