1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Flumotion-launch: A gst-launch analog for Flumotion.
20
21 The goal of flumotion-launch is to provide an easy way for testing
22 flumotion components, without involving much of Flumotion's core code.
23
24 Flumotion-launch takes a terse gst-launch-like syntax, translates that
25 into a component graph, and starts the components. An example would be::
26
27 flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer
28
29 You can also set properties::
30
31 flumotion-launch videotest framerate=15/2
32
33 You can link specific feeders as well::
34
35 flumotion-launch firewire .audio ! vorbis-encoder
36 flumotion-launch firewire firewire0.audio ! vorbis-encoder
37
38 Components can be backreferenced using their names::
39
40 flumotion-launch videotest audiotest videotest0. ! ogg-muxer \
41 audiotest0. ! ogg-muxer0.
42
43 In addition, components can have plugs::
44
45 flumotion-launch http-streamer /requestlogger-file,logfile=/dev/stdout
46
47 Compound properties can be specified with::
48
49 propname=[subname1=value1,subname2=[subsubname1=subsubvalue1]]
50
51 Characters '\', '[' and ']' can be escaped with '\'
52 to remove their special meaning.
53
54 Flumotion-launch explicitly avoids much of Flumotion's core logic. It
55 does not import flumotion.manager, flumotion.admin, or flumotion.worker.
56 There is no depgraph, no feed server, no job process. Although it might
57 be useful in the future to add a way to use the standard interfaces to
58 start components via admin, manager, worker, and job instances, this
59 low-level interface is useful in debugging problems and should be kept.
60 """
61
62
63 import os
64 import sys
65
66 from twisted.python import reflect
67 from twisted.internet import reactor, defer
68
69 from flumotion.common import log, common, registry, errors, messages
70 from flumotion.common import i18n
71 from flumotion.common.options import OptionParser
72 from flumotion.configure import configure
73
74 from flumotion.launch import parse
75
76 from gettext import gettext as _
77
78 __version__ = "$Rev$"
# Heading text for each message severity level.  The strings are passed
# through gettext when this module is imported.
_headings = {
    messages.ERROR: _('Error'),
    messages.WARNING: _('Warning'),
    messages.INFO: _('Note'),
}
83
84
88
89
131
136
139
142
146
147 - def eatFromFD(self, eaterAlias, feedId, fd):
151
152
def make_pipes(wrappers):
    """Create one OS pipe for every feed that some component eats.

    Each wrapper's ``config['eater']`` mapping (eater name -> iterable of
    ``(feedId, eaterAlias)`` pairs) is walked, and a pipe is opened for
    every ``feedId`` found.

    wrappers: iterable of component wrappers; each must expose ``name``,
        ``config`` and ``feedToFD(feedName, fd)``.

    Returns a dict mapping ``feedId`` to ``(read_fd, start)``, where
    ``read_fd`` is the reading end of the pipe and ``start`` is a
    zero-argument callable that asks the feeding component to write its
    feed to the other end.

    Raises KeyError if a feedId names a component that is not among
    ``wrappers``.
    """
    wrappersByName = dict((wrapper.name, wrapper) for wrapper in wrappers)

    def starter(feeder, feedName, write):
        # A helper rather than an inline lambda, so that every callable
        # captures its own feeder/feedName/write instead of the loop's
        # final values.
        return lambda: feeder.feedToFD(feedName, write)

    # feedId -> (read fd, start callable)
    fds = {}
    for wrapper in wrappers:
        eaters = wrapper.config.get('eater', {})
        for eaterName in eaters:
            for feedId, eaterAlias in eaters[eaterName]:
                compName, feederName = common.parseFeedId(feedId)
                # Resolve the feeding component *before* opening the pipe:
                # an unknown component name then raises KeyError without
                # leaving two freshly opened descriptors behind.
                feeder = wrappersByName[compName]
                read, write = os.pipe()
                log.debug('launch', '%s: read from fd %d, write to fd %d',
                          feedId, read, write)
                # NOTE(review): if the same feedId shows up under more than
                # one eater, this entry replaces the earlier one and the
                # earlier pipe is never closed -- confirm whether the parser
                # can produce that.
                fds[feedId] = (read, starter(feeder, feederName, write))
    return fds
171
172
174
175
def start_components(wrappers, fds):
    """Instantiate every component, sort out clocking, then link and start.

    wrappers: sequence of component wrappers, started in this order.
    fds: mapping of feedId -> (read fd, start callable), used to connect
        each eater to its feed.

    Returns a Deferred.  If any wrapper's ``instantiate()`` returns a false
    value, an already-failed Deferred carrying ComponentStartError is
    returned and nothing is started.  If a later step fails, every wrapper
    is stopped and the failure is passed on.
    """

    def provide_clock():
        # Fires with (slaves, clocking): the wrappers that need the master
        # clock and whatever provideMasterClock() produced, or (None, None)
        # when no component asks for a clock master.
        slaves = [w for w in wrappers if w.config['clock-master']]
        if not slaves:
            return defer.succeed((None, None))

        # The master is the component whose clock-master is itself.
        master = None
        for candidate in slaves:
            conf = candidate.config
            if conf['clock-master'] == conf['avatarId']:
                master = candidate
                break
        assert master
        slaves.remove(master)

        # presumably a hard-coded port number -- TODO confirm against
        # provideMasterClock()
        clock_d = master.provideMasterClock(7600 - 1)
        clock_d.addCallback(lambda clocking: (slaves, clocking))
        return clock_d

    def do_start(synchronization, wrapper):
        # Hook up each of this component's eaters to the read end of its
        # pipe, then tell the feeding side to start writing.
        slaves, clocking = synchronization

        eaters = wrapper.config.get('eater', {})
        for name in eaters:
            for feedId, eaterAlias in eaters[name]:
                read_fd, start_feed = fds[feedId]
                wrapper.eatFromFD(eaterAlias, feedId, read_fd)
                start_feed()

        # Only components that asked for synchronisation get clocking data.
        if slaves and wrapper in slaves and clocking:
            wrapper.set_master_clock(*clocking)

        # Pass the same tuple on to the next do_start in the chain.
        return synchronization

    def do_stop(failure):
        # Something went wrong: stop everything and propagate the failure.
        for component in wrappers:
            component.stop()
        return failure

    for wrapper in wrappers:
        if not wrapper.instantiate():
            return defer.fail(errors.ComponentStartError(wrapper))

    d = provide_clock()
    for wrapper in wrappers:
        d.addCallback(do_start, wrapper)
    d.addErrback(do_stop)
    return d
228
229
def main(args):
    """Run flumotion-launch for the given command line.

    args: full argument list.  After option parsing, everything past the
        first remaining element (presumably the program name -- TODO
        confirm) is handed to the launch-syntax parser.

    Returns 0 on success (or after printing the version) and 1 if starting
    the components failed.
    """
    # The package path has to be set up before the imports below it.
    from flumotion.common import setup
    setup.setupPackagePath()
    from flumotion.configure import configure
    log.debug('launch', 'Running Flumotion version %s' %
              configure.version)
    import twisted.copyright
    log.debug('launch', 'Running against Twisted version %s' %
              twisted.copyright.version)
    from flumotion.project import project
    for name in project.list():
        log.debug('launch', 'Registered project %s version %s' % (
            name, project.get(name, 'version')))

    parser = OptionParser(domain="flumotion-launch")

    log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args))
    options, args = parser.parse_args(args)

    i18n.installGettext()

    if options.verbose:
        log.setFluDebug("*:3")

    if options.version:
        print(common.version("flumotion-launch"))
        return 0

    if options.debug:
        log.setFluDebug(options.debug)

    # Command line -> component configs -> wrappers -> connecting pipes.
    configs = parse.parse_args(args[1:])
    wrappers = [ComponentWrapper(config) for config in configs]
    fds = make_pipes(wrappers)

    # Two ad-hoc flags stored on the reactor object: `running` flips to
    # True from a zero-delay call (so only once the reactor is processing
    # calls), and `failure` records that startup went wrong.
    reactor.running = False
    reactor.failure = False
    reactor.callLater(0, setattr, reactor, 'running', True)

    d = start_components(wrappers, fds)

    def on_failure(failure):
        log.debug('launch', log.getFailureMessage(failure))
        print("Error occurred: %s" % failure.getErrorMessage())
        failure.printDetailedTraceback()
        reactor.failure = True
        if reactor.running:
            print("Stopping reactor.")
            reactor.stop()
    d.addErrback(on_failure)

    # If startup already failed by this point, the reactor is not run.
    if not reactor.failure:
        print('Running the reactor. Press Ctrl-C to exit.')

        log.debug('launch', 'Starting reactor')
        reactor.run()

        log.debug('launch', 'Reactor stopped')

    if reactor.failure:
        return 1
    return 0
300