Package flumotion :: Package worker :: Package checks :: Module gst010
[hide private]

Source Code for Module flumotion.worker.checks.gst010

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import gobject 
 19  import gst 
 20  import gst.interfaces 
 21  from twisted.internet.threads import deferToThread 
 22  from twisted.internet import defer 
 23   
 24  from flumotion.common import gstreamer, errors, log, messages 
 25  from flumotion.common.i18n import N_, gettexter 
 26  from flumotion.twisted import defer as fdefer 
 27  from flumotion.worker.checks import check 
 28   
 29  __version__ = "$Rev$" 
 30  T_ = gettexter() 
 31   
 32   
33 -class BusResolution(fdefer.Resolution):
34 pipeline = None 35 signal_id = None 36
37 - def cleanup(self):
38 if self.pipeline: 39 if self.signal_id: 40 self.pipeline.get_bus().remove_signal_watch() 41 self.pipeline.get_bus().disconnect(self.signal_id) 42 self.signal_id = None 43 self.pipeline.set_state(gst.STATE_NULL) 44 self.pipeline = None
45 46
47 -def do_element_check(pipeline_str, element_name, check_proc, state=None, 48 set_state_deferred=False):
49 """ 50 Parse the given pipeline and set it to the given state. 51 When the bin reaches that state, perform the given check function on the 52 element with the given name. 53 54 @param pipeline_str: description of the pipeline used to test 55 @param element_name: name of the element being checked 56 @param check_proc: a function to call with the GstElement as argument. 57 @param state: an unused keyword parameter that will be removed when 58 support for GStreamer 0.8 is dropped. 59 @param set_state_deferred: a flag to say whether the set_state is run in 60 a deferToThread 61 @type set_state_deferred: bool 62 @returns: a deferred that will fire with the result of check_proc, or 63 fail. 64 @rtype: L{twisted.internet.defer.Deferred} 65 """ 66 67 def run_check(pipeline, resolution): 68 element = pipeline.get_by_name(element_name) 69 try: 70 retval = check_proc(element) 71 resolution.callback(retval) 72 except check.CheckProcError, e: 73 log.debug('check', 'CheckProcError when running %r: %r', 74 check_proc, e.data) 75 resolution.errback(errors.RemoteRunError(e.data)) 76 except Exception, e: 77 log.debug('check', 'Unhandled exception while running %r: %r', 78 check_proc, e) 79 resolution.errback(errors.RemoteRunError( 80 log.getExceptionMessage(e))) 81 # set pipeline state to NULL so worker does not consume 82 # unnecessary resources 83 pipeline.set_state(gst.STATE_NULL)
84 85 def message_rcvd(bus, message, pipeline, resolution): 86 t = message.type 87 if t == gst.MESSAGE_STATE_CHANGED: 88 if message.src == pipeline: 89 old, new, pending = message.parse_state_changed() 90 if new == gst.STATE_PLAYING: 91 run_check(pipeline, resolution) 92 elif t == gst.MESSAGE_ERROR: 93 gerror, debug = message.parse_error() 94 # set pipeline state to NULL so worker does not consume 95 # unnecessary resources 96 pipeline.set_state(gst.STATE_NULL) 97 resolution.errback(errors.GStreamerGstError( 98 message.src, gerror, debug)) 99 elif t == gst.MESSAGE_EOS: 100 resolution.errback(errors.GStreamerError( 101 "Unexpected end of stream")) 102 else: 103 log.debug('check', 'message: %s: %s:' % ( 104 message.src.get_path_string(), 105 message.type.value_nicks[1])) 106 if message.structure: 107 log.debug('check', 'message: %s' % 108 message.structure.to_string()) 109 else: 110 log.debug('check', 'message: (no structure)') 111 return True 112 113 resolution = BusResolution() 114 115 log.debug('check', 'parsing pipeline %s' % pipeline_str) 116 try: 117 pipeline = gst.parse_launch(pipeline_str) 118 log.debug('check', 'parsed pipeline %s' % pipeline_str) 119 except gobject.GError, e: 120 resolution.errback(errors.GStreamerError(e.message)) 121 return resolution.d 122 123 bus = pipeline.get_bus() 124 bus.add_signal_watch() 125 signal_id = bus.connect('message', message_rcvd, pipeline, resolution) 126 127 resolution.signal_id = signal_id 128 resolution.pipeline = pipeline 129 log.debug('check', 'setting state to playing') 130 if set_state_deferred: 131 d = deferToThread(pipeline.set_state, gst.STATE_PLAYING) 132 133 def stateChanged(res): 134 return resolution.d 135 d.addCallback(stateChanged) 136 return d 137 else: 138 pipeline.set_state(gst.STATE_PLAYING) 139 return resolution.d 140 141
142 -def check1394(mid, guid):
143 """ 144 Probe the firewire device. 145 146 Return a deferred firing a result. 147 148 The result is either: 149 - succesful, with a None value: no device found 150 - succesful, with a dictionary of width, height, and par as a num/den pair 151 - failed 152 153 @param mid: the id to set on the message. 154 @param guid: the id of the selected device. 155 156 @rtype: L{twisted.internet.defer.Deferred} of 157 L{flumotion.common.messages.Result} 158 """ 159 result = messages.Result() 160 161 def do_check(demux): 162 pad = demux.get_pad('video') 163 164 if not pad or pad.get_negotiated_caps() == None: 165 raise errors.GStreamerError('Pipeline failed to negotiate?') 166 167 caps = pad.get_negotiated_caps() 168 s = caps.get_structure(0) 169 w = s['width'] 170 h = s['height'] 171 par = s['pixel-aspect-ratio'] 172 # FIXME: not a good idea to reuse the result name which 173 # also exists in the parent context. 174 # pychecker should warn; however it looks like 175 # the parent result doesn't get stored as name, 176 # but instead with STORE_DEREF 177 result = dict(width=w, height=h, par=(par.num, par.denom)) 178 log.debug('check', 'returning dict %r' % result) 179 return result
180 181 pipeline = \ 182 'dv1394src guid=%s ! dvdemux name=demux .video ! fakesink' % guid 183 184 d = do_element_check(pipeline, 'demux', do_check) 185 186 def errbackResult(failure): 187 log.debug('check', 'returning failed Result, %r' % failure) 188 m = None 189 if failure.check(errors.GStreamerGstError): 190 source, gerror, debug = failure.value.args 191 log.debug('check', 'GStreamer GError: %s (debug: %s)' % ( 192 gerror.message, debug)) 193 if gerror.domain == "gst-resource-error-quark": 194 if gerror.code == int(gst.RESOURCE_ERROR_NOT_FOUND): 195 # dv1394src was fixed after gst-plugins-good 0.10.2 196 # to distinguish NOT_FOUND and OPEN_READ 197 version = gstreamer.get_plugin_version('1394') 198 if version >= (0, 10, 0, 0) and version <= (0, 10, 2, 0): 199 m = messages.Error(T_( 200 N_("Could not find or open the Firewire device. " 201 "Check the device node and its permissions."))) 202 else: 203 m = messages.Error(T_( 204 N_("No Firewire device found."))) 205 elif gerror.code == int(gst.RESOURCE_ERROR_OPEN_READ): 206 m = messages.Error(T_( 207 N_("Could not open Firewire device for reading. " 208 "Check permissions on the device."))) 209 210 if not m: 211 m = check.handleGStreamerDeviceError(failure, 'Firewire', 212 mid=mid) 213 214 if not m: 215 m = messages.Error(T_(N_("Could not probe Firewire device.")), 216 debug=check.debugFailure(failure)) 217 218 m.id = mid 219 result.add(m) 220 return result 221 d.addCallback(check.callbackResult, result) 222 d.addErrback(errbackResult) 223 224 return d 225