Package flumotion :: Package common :: Module worker
[hide private]

Source Code for Module flumotion.common.worker

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_common_worker -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """objects related to the state of workers. 
 19  """ 
 20   
 21  import os 
 22  import signal 
 23   
 24  from twisted.spread import pb 
 25  from twisted.internet import protocol 
 26   
 27  from flumotion.common import log, errors, messages 
 28  from flumotion.common.i18n import N_, gettexter 
 29  from flumotion.twisted import flavors 
 30   
 31  __version__ = "$Rev$" 
 32  T_ = gettexter() 
 33   
 34   
35 -class ProcessProtocol(protocol.ProcessProtocol):
36
37 - def __init__(self, loggable, avatarId, processType, where):
38 self.loggable = loggable 39 self.avatarId = avatarId 40 self.processType = processType # e.g., 'component' 41 self.where = where # e.g., 'worker 1' 42 43 self.setPid(None)
44
45 - def setPid(self, pid):
46 self.pid = pid
47
48 - def sendMessage(self, message):
49 raise NotImplementedError
50
51 - def processEnded(self, status):
52 # vmethod implementation 53 # status is an instance of failure.Failure 54 # status.value is a twisted.internet.error.ProcessTerminated 55 # status.value.status is the os.WAIT-like status value 56 message = None 57 obj = self.loggable 58 pid = None 59 # if we have a pid, then set pid to string value of pid 60 # otherwise set to "unknown" 61 if self.pid: 62 pid = str(self.pid) 63 else: 64 pid = "unknown" 65 if status.value.exitCode is not None: 66 obj.info("Reaped child with pid %s, exit value %d.", 67 pid, status.value.exitCode) 68 signum = status.value.signal 69 70 # SIGKILL is an explicit kill, and never generates a core dump. 71 # For any other signal we want to see if there is a core dump, 72 # and warn if not. 73 if signum is not None: 74 if signum == signal.SIGKILL: 75 obj.warning("Child with pid %s killed.", pid) 76 message = messages.Error(T_(N_("The %s was killed.\n"), 77 self.processType)) 78 else: 79 message = messages.Error(T_(N_("The %s crashed.\n"), 80 self.processType), 81 debug='Terminated with signal number %d' % signum) 82 83 # use some custom logging depending on signal 84 if signum == signal.SIGSEGV: 85 obj.warning("Child with pid %s segfaulted.", pid) 86 elif signum == signal.SIGTRAP: 87 # SIGTRAP occurs when registry is corrupt 88 obj.warning("Child with pid %s received a SIGTRAP.", 89 pid) 90 else: 91 # if we find any of these, possibly special-case them too 92 obj.info("Reaped child with pid %s signaled by " 93 "signal %d.", pid, signum) 94 95 if not os.WCOREDUMP(status.value.status): 96 obj.warning("No core dump generated. " 97 "Were core dumps enabled at the start ?") 98 message.add(T_(N_( 99 "However, no core dump was generated. " 100 "You may need to configure the environment " 101 "if you want to further debug this problem."))) 102 #message.description = T_(N_( 103 # "Learn how to enable core dumps.")) 104 else: 105 obj.info("Core dumped.") 106 corepath = os.path.join(os.getcwd(), 'core.%s' % pid) 107 if os.path.exists(corepath): 108 obj.info("Core file is probably '%s'." % corepath) 109 message.add(T_(N_( 110 "The core dump is '%s' on the host running '%s'."), 111 corepath, self.where)) 112 # FIXME: add an action that runs gdb and produces a 113 # backtrace; or produce it here and attach to the 114 # message as debug info. 115 message.description = T_(N_( 116 "Learn how to analyze core dumps.")) 117 message.section = 'chapter-debug' 118 message.anchor = 'section-os-analyze-core-dumps' 119 120 if message: 121 obj.debug('sending message to manager/admin') 122 self.sendMessage(message) 123 124 self.setPid(None)
125 126
127 -class PortSet(log.Loggable):
128 """ 129 A list of ports that keeps track of which are available for use on a 130 given machine. 131 """ 132 # not very efficient mkay 133
134 - def __init__(self, logName, ports, randomPorts=False):
135 self.logName = logName 136 self.ports = ports 137 self.used = [0] * len(ports) 138 self.random = randomPorts
139
140 - def reservePorts(self, numPorts):
141 ret = [] 142 while numPorts > 0: 143 if self.random: 144 ret.append(0) 145 numPorts -= 1 146 continue 147 if not 0 in self.used: 148 raise errors.ComponentStartError( 149 'could not allocate port on worker %s' % self.logName) 150 i = self.used.index(0) 151 ret.append(self.ports[i]) 152 self.used[i] = 1 153 numPorts -= 1 154 return ret
155
156 - def setPortsUsed(self, ports):
157 for port in ports: 158 try: 159 i = self.ports.index(port) 160 except ValueError: 161 self.warning('portset does not include port %d', port) 162 else: 163 if self.used[i]: 164 self.warning('port %d already in use!', port) 165 else: 166 self.used[i] = 1
167
168 - def releasePorts(self, ports):
169 """ 170 @param ports: list of ports to release 171 @type ports: list of int 172 """ 173 for p in ports: 174 try: 175 i = self.ports.index(p) 176 if self.used[i]: 177 self.used[i] = 0 178 else: 179 self.warning('releasing unallocated port: %d' % p) 180 except ValueError: 181 self.warning('releasing unknown port: %d' % p)
182
183 - def numFree(self):
184 return len(self.ports) - self.numUsed()
185
186 - def numUsed(self):
187 return len(filter(None, self.used))
188 189 # worker heaven state proxy objects 190 191
192 -class ManagerWorkerHeavenState(flavors.StateCacheable):
193 """ 194 I represent the state of the worker heaven on the manager. 195 196 I have the following keys: 197 198 - names (list): list of worker names that we have state for 199 - workers (list): list of L{ManagerWorkerState} 200 """ 201
202 - def __init__(self):
203 flavors.StateCacheable.__init__(self) 204 self.addListKey('names', []) 205 self.addListKey('workers', []) # should be a dict
206
207 - def __repr__(self):
208 return "%r" % self._dict
209 210
211 -class AdminWorkerHeavenState(flavors.StateRemoteCache):
212 """ 213 I represent the state of the worker heaven in the admin. 214 See L{ManagerWorkerHeavenState} 215 """ 216 pass
217 218 pb.setUnjellyableForClass(ManagerWorkerHeavenState, AdminWorkerHeavenState) 219 220
221 -class ManagerWorkerState(flavors.StateCacheable):
222 """ 223 I represent the state of a worker in the manager. 224 225 - name: name of the worker 226 - host: the IP address of the worker as seen by the manager 227 """ 228
229 - def __init__(self, **kwargs):
230 flavors.StateCacheable.__init__(self) 231 self.addKey('name') 232 self.addKey('host') 233 for k, v in kwargs.items(): 234 self.set(k, v)
235
236 - def __repr__(self):
237 return ("<ManagerWorkerState for %s on %s>" 238 % (self.get('name'), self.get('host')))
239 240
241 -class AdminWorkerState(flavors.StateRemoteCache):
242 """ 243 I represent the state of a worker in the admin. 244 245 See L{ManagerWorkerState} 246 """ 247 pass
248 249 pb.setUnjellyableForClass(ManagerWorkerState, AdminWorkerState) 250