Package flumotion :: Package launch :: Module main
[hide private]

Source Code for Module flumotion.launch.main

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  Flumotion-launch: A gst-launch analog for Flumotion. 
 20   
 21  The goal of flumotion-launch is to provide an easy way for testing 
 22  flumotion components, without involving much of Flumotion's core code. 
 23   
 24  Flumotion-launch takes a terse gst-launch-like syntax, translates that 
 25  into a component graph, and starts the components. An example would be:: 
 26   
 27    flumotion-launch videotest ! theora-encoder ! ogg-muxer ! http-streamer 
 28   
 29  You can also set properties:: 
 30   
 31    flumotion-launch videotest framerate=15/2 
 32   
 33  You can link specific feeders as well:: 
 34   
 35    flumotion-launch firewire .audio ! vorbis-encoder 
 36    flumotion-launch firewire firewire0.audio ! vorbis-encoder 
 37   
 38  Components can be backreferenced using their names:: 
 39   
 40    flumotion-launch videotest audiotest videotest0. ! ogg-muxer \ 
 41                     audiotest0. ! ogg-muxer0. 
 42   
 43  In addition, components can have plugs:: 
 44   
 45    flumotion-launch http-streamer /requestlogger-file,logfile=/dev/stdout 
 46   
 47  Compound properties can be specified with:: 
 48   
 49    propname=[subname1=value1,subname2=[subsubname1=subsubvalue1]] 
 50   
 51  Characters '\', '[' and ']' can be escaped with '\' 
 52  to remove their special meaning. 
 53   
 54  Flumotion-launch explicitly avoids much of Flumotion's core logic. It 
 55  does not import flumotion.manager, flumotion.admin, or flumotion.worker. 
 56  There is no depgraph, no feed server, no job process. Although it might 
 57  be useful in the future to add a way to use the standard interfaces to 
 58  start components via admin, manager, worker, and job instances, this 
 59  low-level interface is useful in debugging problems and should be kept. 
 60  """ 
 61   
 62   
 63  import os 
 64  import sys 
 65   
 66  from twisted.python import reflect 
 67  from twisted.internet import reactor, defer 
 68   
 69  from flumotion.common import log, common, registry, errors, messages 
 70  from flumotion.common import i18n 
 71  from flumotion.common.options import OptionParser 
 72  from flumotion.configure import configure 
 73   
 74  from flumotion.launch import parse 
 75   
 76  from gettext import gettext as _ 
 77   
 78  __version__ = "$Rev$" 
 79  _headings = { 
 80      messages.ERROR: _('Error'), 
 81      messages.WARNING: _('Warning'), 
 82      messages.INFO: _('Note')} 
 83   
 84   
def err(x):
    """
    Report a fatal error and terminate.

    Writes the message followed by a newline to standard error, then
    raises SystemExit with exit code 1. This function never returns.

    @param x: the error message to write
    @type  x: str

    @raises SystemExit: always, with code 1
    """
    sys.stderr.write(x + '\n')
    raise SystemExit(1)


class ComponentWrapper(object, log.Loggable):
    """
    Thin wrapper around a single component started by flumotion-launch.

    On construction the wrapper resolves, through the registry, the
    callable that creates the component. L{instantiate} then calls it,
    and the remaining methods forward to the created component.

    @ivar name:      the component name, taken from config['name']
    @ivar config:    the component configuration dictionary
    @ivar procedure: the callable used to create the component
    @ivar component: the created component, or None before
                     L{instantiate} has been called
    """
    logCategory = "compwrapper"

    def __init__(self, config):
        """
        @param config: component configuration; the 'name' and 'type'
                       keys are read here
        @type  config: dict
        """
        self.name = config['name']
        self.config = config
        self.procedure = self._getProcedure(config['type'])
        self.component = None

    def _getProcedure(self, type):
        """
        Look up the callable that creates a component of the given type.

        The registry's 'component' entry names a module and a function;
        the module is imported and the function is returned. If the
        entry is missing or the module cannot be imported, the process
        exits through err().

        @param type: the component type as known to the registry (the
                     name shadows the builtin; kept for compatibility)

        @returns: the attribute of the imported module named by the
                  entry's function
        """
        r = registry.getRegistry()
        c = r.getComponent(type)
        try:
            entry = c.getEntryByType('component')
        except KeyError:
            # err() raises SystemExit, so 'entry' is always bound below
            err('Component %s has no component entry' % self.name)
        importname = entry.getModuleName(c.getBase())
        try:
            module = reflect.namedAny(importname)
        except Exception as e:
            # any import-time failure is fatal for the launcher
            err('Could not load module %s for component %s: %s'
                % (importname, self.name, e))
        return getattr(module, entry.getFunction())

    def instantiate(self):
        """
        Create the component by calling the procedure with the config.

        Every message the component reports through the haveError
        callback is printed to standard output, translated and prefixed
        with a heading for its level, and remembered.

        @returns: True if no message was reported, False otherwise
        @rtype:   bool
        """
        # deliberately not called 'errors': that would shadow the
        # flumotion.common.errors module imported by this file
        reported = []

        def haveError(value):
            translator = i18n.Translator()
            localedir = os.path.join(configure.localedatadir, 'locale')
            # FIXME: add locales as messages from domains come in
            translator.addLocaleDir(configure.PACKAGE, localedir)
            # single-argument print(...) gives the same output as the
            # print statement on Python 2
            print("%s: %s" % (_headings[value.level],
                              translator.translate(value)))
            if value.debug:
                print("Debug information: %s" % (value.debug, ))
            reported.append(value)

        self.component = self.procedure(self.config,
                                        haveError=haveError)
        return not reported

    def provideMasterClock(self, port):
        """
        Ask the component to provide a master clock on the given port.

        @returns: the result of the component's provide_master_clock()
        @rtype:   L{defer.Deferred}
        """
        return self.component.provide_master_clock(port)

    def set_master_clock(self, ip, port, base_time):
        """
        Forward the master clock information to the component.

        @returns: whatever the component's set_master_clock() returns
        """
        return self.component.set_master_clock(ip, port, base_time)

    def stop(self):
        """
        Stop the component.

        @returns: whatever the component's stop() returns
        """
        return self.component.stop()

    def feedToFD(self, feedName, fd):
        """
        Tell the component to feed the named feed to a file descriptor.

        os.close is passed as the third argument; presumably the
        component uses it to close the descriptor when it is done with
        it -- TODO confirm against the component API.

        @param feedName: name of the feed to send
        @param fd:       file descriptor to write to
        @type  fd:       int
        """
        # lazy log arguments, consistent with eatFromFD
        self.debug('feedToFD(feedName=%s, %d)', feedName, fd)
        return self.component.feedToFD(feedName, fd, os.close)

    def eatFromFD(self, eaterAlias, feedId, fd):
        """
        Tell the component to eat the given feed from a file descriptor.

        @param eaterAlias: alias of the eater that receives the feed
        @param feedId:     id of the feed being eaten
        @param fd:         file descriptor to read from
        @type  fd:         int
        """
        self.debug('eatFromFD(eaterAlias=%s, feedId=%s, %d)',
                   eaterAlias, feedId, fd)
        return self.component.eatFromFD(eaterAlias, feedId, fd)


def make_pipes(wrappers):
    """
    Create one OS pipe for every feed eaten by one of the wrappers.

    @param wrappers: the component wrappers; each has a 'name' and a
                     'config' dict whose optional 'eater' entry maps an
                     eater name to a list of (feedId, eaterAlias) pairs

    @returns: dict mapping feedId to (read fd, start), where start is a
              callable taking no arguments that makes the feeding
              component write the feed to the pipe's write end
    @rtype:   dict
    """
    # NOTE(review): the dict is keyed on feedId only, so a feed that is
    # eaten more than once overwrites its entry and the earlier pipe is
    # never used or closed -- confirm the parser cannot produce this.
    fds = {} # feedcompname:feeder => (fd, start())
    wrappersByName = dict((wrapper.name, wrapper) for wrapper in wrappers)

    def starter(wrapper, feedName, write):
        # bind the arguments now; a bare lambda in the loop below would
        # see only the values of the last iteration
        return lambda: wrapper.feedToFD(feedName, write)

    for wrapper in wrappers:
        eaters = wrapper.config.get('eater', {})
        for links in eaters.values():
            for feedId, eaterAlias in links:
                compName, feederName = common.parseFeedId(feedId)
                read, write = os.pipe()
                log.debug('launch', '%s: read from fd %d, write to fd %d',
                          feedId, read, write)
                start = starter(wrappersByName[compName], feederName, write)
                fds[feedId] = (read, start)
    return fds


def start_components(wrappers, fds):
    """
    Instantiate all components, set up clocking and link the feeds.

    All wrappers are instantiated first. If one of them reports a
    problem, a failed deferred is returned straight away. Otherwise the
    clock master (if any) is asked to provide its clock, and then each
    wrapper in turn is connected to the pipes of the feeds it eats and
    given the master clock if it needs one. If anything in that chain
    fails, every wrapper is stopped and the failure is passed on.

    @param wrappers: the component wrappers, in start order
    @param fds:      dict mapping feedId to (read fd, start), as
                     returned by make_pipes()

    @rtype: L{defer.Deferred}
    """
    # figure out the links and start the components

    def provide_clock():
        # second phase: clocking
        need_sync = [x for x in wrappers if x.config['clock-master']]

        if not need_sync:
            return defer.succeed((None, None))

        # the master is the component that names itself as clock master
        master = None
        for x in need_sync:
            if x.config['clock-master'] == x.config['avatarId']:
                master = x
                break
        assert master, 'no component is its own clock master'
        # the master provides the clock, it does not slave to it
        need_sync.remove(master)
        d = master.provideMasterClock(7600 - 1) # hack!

        def addNeedSync(clocking):
            return need_sync, clocking
        d.addCallback(addNeedSync)
        return d

    def do_start(synchronization, wrapper):
        need_sync, clocking = synchronization

        # start it up, with clocking data only if it needs it
        eaters = wrapper.config.get('eater', {})
        for eaterName in eaters:
            for feedId, eaterAlias in eaters[eaterName]:
                read, start = fds[feedId]
                wrapper.eatFromFD(eaterAlias, feedId, read)
                start()
        # need_sync is None when nothing is synchronized; the first
        # test keeps the membership test from running in that case
        if need_sync and wrapper in need_sync and clocking:
            wrapper.set_master_clock(*clocking)
        # hand the same tuple on to the next do_start in the chain
        return synchronization

    def do_stop(failure):
        for wrapper in wrappers:
            wrapper.stop()
        return failure

    for wrapper in wrappers:
        if not wrapper.instantiate():
            # we don't have a ComponentState, so we cheat and give the
            # exception a wrapper
            return defer.fail(errors.ComponentStartError(wrapper))
    d = provide_clock()
    for wrapper in wrappers:
        d.addCallback(do_start, wrapper)
    d.addErrback(do_stop)
    return d


def main(args):
    """
    Entry point of flumotion-launch.

    Parses the options, turns the rest of the command line into
    component configurations, wraps and links the components, and runs
    the reactor until it is stopped.

    @param args: the command line; the first element left after option
                 parsing is skipped (presumably the program name --
                 TODO confirm) and the rest is parsed as the launch line
    @type  args: list of str

    @returns: 0 if the version was printed or the reactor ran without a
              start failure, 1 if starting the components failed
    @rtype:   int
    """
    from flumotion.common import setup
    setup.setupPackagePath()
    from flumotion.configure import configure
    log.debug('launch', 'Running Flumotion version %s' %
              configure.version)
    import twisted.copyright
    log.debug('launch', 'Running against Twisted version %s' %
              twisted.copyright.version)
    from flumotion.project import project
    for p in project.list():
        log.debug('launch', 'Registered project %s version %s' % (
            p, project.get(p, 'version')))

    parser = OptionParser(domain="flumotion-launch")

    log.debug('launch', 'Parsing arguments (%r)' % ', '.join(args))
    options, args = parser.parse_args(args)

    i18n.installGettext()

    # verbose overrides --debug
    if options.verbose:
        log.setFluDebug("*:3")

    # handle all options
    if options.version:
        print(common.version("flumotion-launch"))
        return 0

    # only honour --debug when --verbose was not given; applying it
    # unconditionally here would override the verbose setting above
    if options.debug and not options.verbose:
        log.setFluDebug(options.debug)

    # note parser versus parse
    configs = parse.parse_args(args[1:])

    # load the modules, make the component
    wrappers = [ComponentWrapper(config) for config in configs]

    # make socket pairs
    fds = make_pipes(wrappers)

    # ad-hoc flags stored on the reactor: 'running' becomes True once
    # the reactor has started processing calls, 'failure' is set by the
    # errback below
    reactor.running = False
    reactor.failure = False
    reactor.callLater(0, lambda: setattr(reactor, 'running', True))

    d = start_components(wrappers, fds)

    def errback(failure):
        log.debug('launch', log.getFailureMessage(failure))
        print("Error occurred: %s" % failure.getErrorMessage())
        failure.printDetailedTraceback()
        reactor.failure = True
        # a failure before reactor.run() must not try to stop it
        if reactor.running:
            print("Stopping reactor.")
            reactor.stop()
    d.addErrback(errback)

    # if starting already failed there is nothing to run
    if not reactor.failure:
        print('Running the reactor. Press Ctrl-C to exit.')

        log.debug('launch', 'Starting reactor')
        reactor.run()

        log.debug('launch', 'Reactor stopped')

    if reactor.failure:
        return 1
    else:
        return 0