Package flumotion :: Package worker :: Module medium
[hide private]

Source Code for Module flumotion.worker.medium

  1  # -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  worker-side objects to handle worker clients 
 20  """ 
 21   
 22  import signal 
 23   
 24  from twisted.internet import reactor, error 
 25  from twisted.spread import flavors 
 26  from zope.interface import implements 
 27   
 28  from flumotion.common import errors, interfaces, debug 
 29  from flumotion.common import medium 
 30  from flumotion.common.vfs import listDirectory, registerVFSJelly 
 31  from flumotion.twisted.pb import ReconnectingFPBClientFactory 
 32   
 33  __version__ = "$Rev$" 
 34  JOB_SHUTDOWN_TIMEOUT = 5 
 35   
 36   
37 -class WorkerClientFactory(ReconnectingFPBClientFactory):
38 """ 39 I am a client factory for the worker to log in to the manager. 40 """ 41 logCategory = 'worker' 42 perspectiveInterface = interfaces.IWorkerMedium 43
44 - def __init__(self, medium, host, port):
45 """ 46 @type medium: L{flumotion.worker.medium.WorkerMedium} 47 @type host: str 48 @type port: int 49 """ 50 self._managerHost = host 51 self._managerPort = port 52 self.medium = medium 53 # doing this as a class method triggers a doc error 54 ReconnectingFPBClientFactory.__init__(self) 55 # maximum 10 second delay for workers to attempt to log in again 56 self.maxDelay = 10
57
58 - def clientConnectionFailed(self, connector, reason):
59 """ 60 @param reason: L{twisted.spread.pb.failure.Failure} 61 """ 62 # this method exists so that we log the failure 63 ReconnectingFPBClientFactory.clientConnectionFailed(self, 64 connector, reason) 65 # delay is now updated 66 self.debug("failed to connect, will try to reconnect in %f seconds" % 67 self.delay)
68 69 ### ReconnectingPBClientFactory methods 70
71 - def gotDeferredLogin(self, d):
72 # the deferred from the login is now available 73 # add some of our own to it 74 75 def remoteDisconnected(remoteReference): 76 if reactor.killed: 77 self.log('Connection to manager lost due to shutdown') 78 else: 79 self.warning('Lost connection to manager, ' 80 'will attempt to reconnect')
81 82 def loginCallback(reference): 83 self.info("Logged in to manager") 84 self.debug("remote reference %r" % reference) 85 86 self.medium.setRemoteReference(reference) 87 reference.notifyOnDisconnect(remoteDisconnected)
88 89 def alreadyConnectedErrback(failure): 90 failure.trap(errors.AlreadyConnectedError) 91 self.warning('A worker with the name "%s" is already connected.' % 92 failure.value) 93 94 def accessDeniedErrback(failure): 95 failure.trap(errors.NotAuthenticatedError) 96 self.warning('Access denied.') 97 98 def connectionRefusedErrback(failure): 99 failure.trap(error.ConnectionRefusedError) 100 self.warning('Connection to %s:%d refused.' % (self._managerHost, 101 self._managerPort)) 102 103 def NoSuchMethodErrback(failure): 104 failure.trap(flavors.NoSuchMethod) 105 # failure.value is a str 106 if failure.value.find('remote_getKeycardClasses') > -1: 107 self.warning( 108 "Manager %s:%d is older than version 0.3.0. " 109 "Please upgrade." % (self._managerHost, self._managerPort)) 110 return 111 112 return failure 113 114 def loginFailedErrback(failure): 115 self.warning('Login failed, reason: %s' % str(failure)) 116 117 d.addCallback(loginCallback) 118 d.addErrback(accessDeniedErrback) 119 d.addErrback(connectionRefusedErrback) 120 d.addErrback(alreadyConnectedErrback) 121 d.addErrback(NoSuchMethodErrback) 122 d.addErrback(loginFailedErrback) 123 124
125 -class WorkerMedium(medium.PingingMedium):
126 """ 127 I am a medium interfacing with the manager-side WorkerAvatar. 128 129 @ivar brain: the worker brain 130 @type brain: L{worker.WorkerBrain} 131 @ivar factory: the worker client factory 132 @type factory: L{WorkerClientFactory} 133 """ 134 135 logCategory = 'workermedium' 136 137 implements(interfaces.IWorkerMedium) 138
139 - def __init__(self, brain):
140 """ 141 @type brain: L{worker.WorkerBrain} 142 """ 143 self.brain = brain 144 self.factory = None 145 registerVFSJelly()
146
147 - def startConnecting(self, connectionInfo):
148 info = connectionInfo 149 150 self.factory = WorkerClientFactory(self, info.host, info.port) 151 self.factory.startLogin(info.authenticator) 152 153 if info.use_ssl: 154 from flumotion.common import common 155 common.assertSSLAvailable() 156 from twisted.internet import ssl 157 reactor.connectSSL(info.host, info.port, self.factory, 158 ssl.ClientContextFactory()) 159 else: 160 reactor.connectTCP(info.host, info.port, self.factory)
161
162 - def stopConnecting(self):
163 # only called by test suites 164 self.factory.disconnect() 165 self.factory.stopTrying()
166 167 ### pb.Referenceable method for the manager's WorkerAvatar 168
169 - def remote_getPorts(self):
170 """ 171 Gets the set of TCP ports that this worker is configured to use. 172 173 @rtype: 2-tuple: (list of int, bool) 174 @return: list of ports, and a boolean if we allocate ports 175 randomly 176 """ 177 return self.brain.getPorts()
178
179 - def remote_getFeedServerPort(self):
180 """ 181 Return the TCP port the Feed Server is listening on. 182 183 @rtype: int, or NoneType 184 @return: TCP port number, or None if there is no feed server 185 """ 186 return self.brain.getFeedServerPort()
187
188 - def remote_create(self, avatarId, type, moduleName, methodName, 189 nice, conf):
190 """ 191 Start a component of the given type with the given nice level. 192 Will spawn a new job process to run the component in. 193 194 @param avatarId: avatar identification string 195 @type avatarId: str 196 @param type: type of the component to create 197 @type type: str 198 @param moduleName: name of the module to create the component from 199 @type moduleName: str 200 @param methodName: the factory method to use to create the component 201 @type methodName: str 202 @param nice: nice level 203 @type nice: int 204 @param conf: component config 205 @type conf: dict 206 207 @returns: a deferred fired when the process has started and created 208 the component 209 """ 210 return self.brain.create(avatarId, type, moduleName, methodName, 211 nice, conf)
212
213 - def remote_checkElements(self, elementNames):
214 """ 215 Checks if one or more GStreamer elements are present and can be 216 instantiated. 217 218 @param elementNames: names of the Gstreamer elements 219 @type elementNames: list of str 220 221 @rtype: list of str 222 @returns: a list of instantiatable element names 223 """ 224 return self.brain.runCheck('flumotion.worker.checks.check', 225 'checkElements', elementNames)
226
227 - def remote_checkImport(self, moduleName):
228 """ 229 Checks if the given module can be imported. 230 231 @param moduleName: name of the module to check 232 @type moduleName: str 233 234 @returns: None or Failure 235 """ 236 return self.brain.runCheck( 237 'flumotion.worker.checks.check', 'checkImport', 238 moduleName)
239
240 - def remote_runCheck(self, module, function, *args, **kwargs):
241 """ 242 Runs the given function in the given module with the given arguments. 243 244 @param module: module the function lives in 245 @type module: str 246 @param function: function to run 247 @type function: str 248 249 @returns: the return value of the given function in the module. 250 """ 251 return self.brain.runCheck(module, function, *args, **kwargs)
252 remote_runFunction = remote_runCheck 253
254 - def remote_getComponents(self):
255 """ 256 I return a list of componentAvatarIds, I have. I am called by the 257 manager soon after I attach to it. This is needed on reconnects 258 so that the manager knows what components it needs to start on me. 259 260 @returns: a list of componentAvatarIds 261 """ 262 return self.brain.getComponents()
263
264 - def remote_killJob(self, avatarId, signum=signal.SIGKILL):
265 """Kill one of the worker's jobs. 266 267 This method is intended for exceptional purposes only; a normal 268 component shutdown is performed by the manager via calling 269 remote_stop() on the component avatar. 270 271 Raises L{flumotion.common.errors.UnknownComponentError} if the 272 job is unknown. 273 274 @param avatarId: the avatar Id of the component, e.g. 275 '/default/audio-encoder' 276 @type avatarId: string 277 @param signum: Signal to send, optional. Defaults to SIGKILL. 278 @type signum: int 279 """ 280 self.brain.killJob(avatarId, signum)
281
282 - def remote_getVersions(self):
283 return debug.getVersions()
284
285 - def remote_listDirectory(self, directoryName):
286 """List the directory called path. 287 288 Raises L{flumotion.common.errors.NotDirectoryError} if directoryName is 289 not a directory. 290 291 @param directoryName: the name of the directory to list 292 @type directoryName: string 293 @returns: the directory 294 @rtype: deferred that will fire an object implementing L{IDirectory} 295 """ 296 return listDirectory(directoryName)
297