1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import gobject
19 import gst
20 import gst.interfaces
21 from twisted.internet.threads import deferToThread
22 from twisted.internet import defer
23
24 from flumotion.common import gstreamer, errors, log, messages
25 from flumotion.common.i18n import N_, gettexter
26 from flumotion.twisted import defer as fdefer
27 from flumotion.worker.checks import check
28
29 __version__ = "$Rev$"
30 T_ = gettexter()
31
32
45
46
47 -def do_element_check(pipeline_str, element_name, check_proc, state=None,
48 set_state_deferred=False):
49 """
50 Parse the given pipeline and set it to the given state.
51 When the bin reaches that state, perform the given check function on the
52 element with the given name.
53
54 @param pipeline_str: description of the pipeline used to test
55 @param element_name: name of the element being checked
56 @param check_proc: a function to call with the GstElement as argument.
57 @param state: an unused keyword parameter that will be removed when
58 support for GStreamer 0.8 is dropped.
59 @param set_state_deferred: a flag to say whether the set_state is run in
60 a deferToThread
61 @type set_state_deferred: bool
62 @returns: a deferred that will fire with the result of check_proc, or
63 fail.
64 @rtype: L{twisted.internet.defer.Deferred}
65 """
66
67 def run_check(pipeline, resolution):
68 element = pipeline.get_by_name(element_name)
69 try:
70 retval = check_proc(element)
71 resolution.callback(retval)
72 except check.CheckProcError, e:
73 log.debug('check', 'CheckProcError when running %r: %r',
74 check_proc, e.data)
75 resolution.errback(errors.RemoteRunError(e.data))
76 except Exception, e:
77 log.debug('check', 'Unhandled exception while running %r: %r',
78 check_proc, e)
79 resolution.errback(errors.RemoteRunError(
80 log.getExceptionMessage(e)))
81
82
83 pipeline.set_state(gst.STATE_NULL)
84
85 def message_rcvd(bus, message, pipeline, resolution):
86 t = message.type
87 if t == gst.MESSAGE_STATE_CHANGED:
88 if message.src == pipeline:
89 old, new, pending = message.parse_state_changed()
90 if new == gst.STATE_PLAYING:
91 run_check(pipeline, resolution)
92 elif t == gst.MESSAGE_ERROR:
93 gerror, debug = message.parse_error()
94
95
96 pipeline.set_state(gst.STATE_NULL)
97 resolution.errback(errors.GStreamerGstError(
98 message.src, gerror, debug))
99 elif t == gst.MESSAGE_EOS:
100 resolution.errback(errors.GStreamerError(
101 "Unexpected end of stream"))
102 else:
103 log.debug('check', 'message: %s: %s:' % (
104 message.src.get_path_string(),
105 message.type.value_nicks[1]))
106 if message.structure:
107 log.debug('check', 'message: %s' %
108 message.structure.to_string())
109 else:
110 log.debug('check', 'message: (no structure)')
111 return True
112
113 resolution = BusResolution()
114
115 log.debug('check', 'parsing pipeline %s' % pipeline_str)
116 try:
117 pipeline = gst.parse_launch(pipeline_str)
118 log.debug('check', 'parsed pipeline %s' % pipeline_str)
119 except gobject.GError, e:
120 resolution.errback(errors.GStreamerError(e.message))
121 return resolution.d
122
123 bus = pipeline.get_bus()
124 bus.add_signal_watch()
125 signal_id = bus.connect('message', message_rcvd, pipeline, resolution)
126
127 resolution.signal_id = signal_id
128 resolution.pipeline = pipeline
129 log.debug('check', 'setting state to playing')
130 if set_state_deferred:
131 d = deferToThread(pipeline.set_state, gst.STATE_PLAYING)
132
133 def stateChanged(res):
134 return resolution.d
135 d.addCallback(stateChanged)
136 return d
137 else:
138 pipeline.set_state(gst.STATE_PLAYING)
139 return resolution.d
140
141
143 """
144 Probe the firewire device.
145
146 Return a deferred firing a result.
147
148 The result is either:
149 - successful, with a None value: no device found
150 - successful, with a dictionary of width, height, and par as a num/den pair
151 - failed
152
153 @param mid: the id to set on the message.
154 @param guid: the id of the selected device.
155
156 @rtype: L{twisted.internet.defer.Deferred} of
157 L{flumotion.common.messages.Result}
158 """
159 result = messages.Result()
160
def do_check(demux):
    """
    Read the negotiated video format from the 'video' pad of the demuxer.

    @param demux: the element whose 'video' pad is inspected.

    @returns: a dict with the keys 'width', 'height' and 'par'; 'par' is a
              (numerator, denominator) tuple.
    @raises errors.GStreamerError: when there is no 'video' pad, or the
                                   pad has no negotiated caps.
    """
    pad = demux.get_pad('video')
    if not pad:
        raise errors.GStreamerError('Pipeline failed to negotiate?')

    # Query the caps a single time and test identity against None; this
    # avoids a second query and never goes through the caps object's own
    # equality operator.
    caps = pad.get_negotiated_caps()
    if caps is None:
        raise errors.GStreamerError('Pipeline failed to negotiate?')

    s = caps.get_structure(0)
    w = s['width']
    h = s['height']
    par = s['pixel-aspect-ratio']

    # Deliberately not called `result`: that name also exists in the
    # enclosing scope and reusing it here would shadow it.
    video_format = dict(width=w, height=h, par=(par.num, par.denom))
    log.debug('check', 'returning dict %r' % (video_format, ))
    return video_format
180
181 pipeline = \
182 'dv1394src guid=%s ! dvdemux name=demux .video ! fakesink' % guid
183
184 d = do_element_check(pipeline, 'demux', do_check)
185
def errbackResult(failure):
    """
    Convert a failed probe into an error message added to the result.

    Specific Firewire messages are chosen for GStreamer resource errors;
    other GStreamer errors are passed to
    check.handleGStreamerDeviceError, and anything still without a message
    gets a generic "could not probe" error carrying the failure's debug
    information.

    @param failure: the failure the probe ended with.

    @returns: the result of the enclosing scope, with one error message
              (its id set to mid) added.
    """
    log.debug('check', 'returning failed Result, %r' % failure)
    msg = None

    if failure.check(errors.GStreamerGstError):
        source, gerror, debug = failure.value.args
        log.debug('check', 'GStreamer GError: %s (debug: %s)' % (
            gerror.message, debug))

        if gerror.domain == "gst-resource-error-quark":
            code = gerror.code
            if code == int(gst.RESOURCE_ERROR_NOT_FOUND):
                # For plugin versions 0.10.0 through 0.10.2 a broader
                # message is used (presumably those versions reported
                # NOT_FOUND also when the device could not be opened --
                # confirm).
                version = gstreamer.get_plugin_version('1394')
                if (0, 10, 0, 0) <= version <= (0, 10, 2, 0):
                    text = N_("Could not find or open the Firewire device. "
                              "Check the device node and its permissions.")
                else:
                    text = N_("No Firewire device found.")
                msg = messages.Error(T_(text))
            elif code == int(gst.RESOURCE_ERROR_OPEN_READ):
                text = N_("Could not open Firewire device for reading. "
                          "Check permissions on the device.")
                msg = messages.Error(T_(text))

        if not msg:
            # No Firewire-specific message applied; defer to the shared
            # device error handling.
            msg = check.handleGStreamerDeviceError(failure, 'Firewire',
                                                   mid=mid)

    if not msg:
        # Last resort for every failure not covered above.
        fallback = T_(N_("Could not probe Firewire device."))
        msg = messages.Error(fallback, debug=check.debugFailure(failure))

    msg.id = mid
    result.add(msg)
    return result
221 d.addCallback(check.callbackResult, result)
222 d.addErrback(errbackResult)
223
224 return d
225