1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Servicer object used in service scripts
20 """
21
22 import os
23 import glob
24 import time
25
26 from flumotion.configure import configure
27 from flumotion.common import errors, log
28 from flumotion.common.python import makedirs
29 from flumotion.common.process import checkPidRunning, deletePidFile, getPid, \
30 killPid, termPid, waitPidFile
31
# Revision keyword, presumably expanded by version-control keyword
# substitution.
__version__ = '$Rev$'
33
"""
Manage the managers and workers that a service script is responsible for.
"""

# Log category for this object's debug/info/warning messages (presumably
# consumed by flumotion.common.log).
logCategory = 'servicer'
41
def __init__(self, configDir=None, logDir=None, runDir=None):
    """
    @type configDir: string
    @param configDir: overridden path to the configuration directory;
                      when not given, configure.configdir is used.
    @type logDir: string
    @param logDir: overridden path to the log directory.
    @type runDir: string
    @param runDir: overridden path to the run directory.
    """
    # The configDir override used to be documented but silently ignored;
    # honour it when locating the managers/ and workers/ subdirectories.
    if configDir:
        baseDir = configDir
    else:
        baseDir = configure.configdir
    self.managersDir = os.path.join(baseDir, 'managers')
    self.workersDir = os.path.join(baseDir, 'workers')
    # Directory overrides turned into --logdir/--rundir command-line
    # options for spawned processes by _getDirOptions.
    self._overrideDir = {
        'logdir': logDir,
        'rundir': runDir,
    }
57
def _parseManagersWorkers(self, command, args):
    """
    Work out which managers and workers a service command applies to.

    With no arguments every configured manager and worker is selected.
    Otherwise args must be ['manager', name] or ['worker', name].

    @type  command: string
    @param command: the service command being run; only used in error
                    messages.
    @type  args: list of string
    @param args: the command-line arguments following the command.
    @returns: a (managers, workers) tuple of lists of names.
    @raises errors.FatalError: when the arguments do not name an existing
                               manager or worker.
    """
    if not args:
        # sorted() works on both the Python 2 list and the Python 3 view
        # returned by dict keys; calling .sort() on keys() does not.
        managers = sorted(self.getManagers())
        workers = sorted(self.getWorkers())
        return (managers, workers)

    which = args[0]
    if which not in ('manager', 'worker'):
        raise errors.FatalError('Please specify either manager or worker')

    if len(args) < 2:
        raise errors.FatalError('Please specify which %s to %s' % (
            which, command))

    name = args[1]
    if which == 'manager':
        if name not in self.getManagers():
            raise errors.FatalError('No manager "%s"' % name)
        return ([name], [])

    if name not in self.getWorkers():
        raise errors.FatalError('No worker with name %s' % name)
    return ([], [name])
def _getDirOptions(self):
    """
    Return the directory overrides as command-line options.

    Each override given to the constructor becomes a --key=value option;
    unset overrides are skipped. Options are sorted by key so the command
    line is deterministic, and values are shell-quoted because the
    resulting command is run through the shell (os.system).

    @rtype: string
    @returns: the options joined by single spaces, or an empty string.
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    args = []
    for key in sorted(self._overrideDir):
        value = self._overrideDir[key]
        if value:
            args.append('--%s=%s' % (key, quote(value)))
    return " ".join(args)
def getManagers(self):
    """
    Find the configured managers and the flows each one has.

    Every subdirectory of the managers directory is a manager; its flows
    are the *.xml files in its flows/ subdirectory.

    @rtype: dict of string -> list of string
    @returns: a dictionary of manager names -> sorted flow names
    """
    managers = {}

    self.log('getManagers()')
    if not os.path.exists(self.managersDir):
        return managers

    for managerDir in glob.glob(os.path.join(self.managersDir, '*')):
        # Stray files next to the manager directories are not managers.
        if not os.path.isdir(managerDir):
            continue
        flowsDir = os.path.join(managerDir, 'flows')
        # glob returns [] when flowsDir does not exist.
        flowFiles = glob.glob(os.path.join(flowsDir, '*.xml'))
        # splitext strips only the final extension, unlike split('.xml').
        flows = sorted([os.path.splitext(os.path.basename(f))[0]
                        for f in flowFiles])
        managerName = os.path.basename(managerDir)
        self.log('Adding flows %r to manager %s' % (flows, managerName))
        managers[managerName] = flows
    self.log('returning managers: %r' % managers)
    return managers
def getWorkers(self):
    """
    Find the configured workers, including disabled ones.

    Workers are the *.xml files in the workers directory. A disabled
    worker's file carries a '-disabled' suffix, which is stripped so that
    both forms report the same worker name.

    @rtype: list of string
    @returns: a sorted list of unique worker names
    """
    if not os.path.exists(self.workersDir):
        return []

    suffix = '-disabled'
    names = set()
    for workerFile in glob.glob(os.path.join(self.workersDir, '*.xml')):
        name = os.path.splitext(os.path.basename(workerFile))[0]
        # Strip only a trailing suffix: split('-disabled') truncated names
        # such as 'a-disabled-b'. The set avoids listing a worker twice
        # when both its enabled and disabled files exist.
        if name.endswith(suffix):
            name = name[:-len(suffix)]
        names.add(name)
    return sorted(names)
def start(self, args):
    """
    Start the processes selected by args.

    With no args every manager and worker is started; with
    ['manager', name] or ['worker', name] only that one is.
    Managers are started before workers.

    @returns: the number of processes that failed to start, usable as an
              exit value
    """
    managers, workers = self._parseManagersWorkers('start', args)
    self.debug("Start managers %r and workers %r" % (managers, workers))
    flowsByManager = self.getManagers()

    failures = 0
    for managerName in managers:
        started = self.startManager(managerName, flowsByManager[managerName])
        if not started:
            failures += 1
    for workerName in workers:
        started = self.startWorker(workerName)
        if not started:
            failures += 1
    return failures
171
def stop(self, args):
    """
    Stop the processes selected by args.

    With no args every manager and worker is stopped; with
    ['manager', name] or ['worker', name] only that one is.
    Workers are stopped before managers.

    @returns: the number of processes that failed to stop, usable as an
              exit value
    """
    managers, workers = self._parseManagersWorkers('stop', args)
    self.debug("Stop managers %r and workers %r" % (managers, workers))

    failures = 0
    for stopper, names in ((self.stopWorker, workers),
                           (self.stopManager, managers)):
        for processName in names:
            if not stopper(processName):
                failures += 1
    return failures
def status(self, args):
    """
    Print the state of each process selected by args.

    Each process is reported as running (with its pid), dead with a stale
    pid, disabled, or not running.
    """
    managers, workers = self._parseManagersWorkers('status', args)
    self.debug("Status managers %r and workers %r" % (managers, workers))
    selected = [('manager', n) for n in managers] + \
               [('worker', n) for n in workers]
    for kind, name in selected:
        pid = getPid(kind, name)
        if pid and checkPidRunning(pid):
            print("%s %s is running with pid %d" % (kind, name, pid))
        elif pid:
            print("%s %s dead (stale pid %d)" % (kind, name, pid))
        elif self.checkDisabled(kind, name):
            print("%s %s is disabled" % (kind, name))
        else:
            print("%s %s not running" % (kind, name))
def clean(self, args):
    """
    Remove pid files left behind by the processes selected by args.

    A pid file is removed when getPid() yields no pid (treated as bogus)
    or when the recorded pid is no longer running. Failing to delete a
    pid file is reported and otherwise ignored.
    """
    managers, workers = self._parseManagersWorkers('clean', args)
    self.debug("Clean managers %r and workers %r" % (managers, workers))
    for kind, names in [('manager', managers), ('worker', workers)]:
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                try:
                    deletePidFile(kind, name)
                    print("deleted bogus pid file for %s %s" % (kind, name))
                except OSError:
                    print("failed to delete pid file for %s %s "
                          "- ignoring" % (kind, name))
                continue
            if checkPidRunning(pid):
                continue
            self.debug("Cleaning up stale pid %d for %s %s" % (
                pid, kind, name))
            print("deleting stale pid file for %s %s" % (kind, name))
            # Same best-effort policy as for bogus pid files: one file that
            # cannot be deleted must not abort cleaning the others.
            try:
                deletePidFile(kind, name)
            except OSError:
                print("failed to delete pid file for %s %s "
                      "- ignoring" % (kind, name))
def condrestart(self, args):
    """
    Restart only those selected processes that are currently running.

    With no args every running manager and worker is considered; with
    ['manager', name] or ['worker', name] only that one is. Processes
    without a pid are skipped and stale pids are reported.

    @returns: the number of processes that failed to stop or start,
              usable as an exit value
    """
    managers, workers = self._parseManagersWorkers('condrestart', args)
    self.debug("condrestart managers %r and workers %r" % (
        managers, workers))
    flowsByManager = self.getManagers()
    failures = 0

    for kind, names in (('manager', managers), ('worker', workers)):
        for name in names:
            pid = getPid(kind, name)
            if not pid:
                continue
            if not checkPidRunning(pid):
                print("%s %s dead (stale pid %d)" % (kind, name, pid))
                continue
            # A failed stop skips the start and counts as one failure.
            if kind == 'manager':
                stopped = self.stopManager(name)
                restarted = stopped and self.startManager(
                    name, flowsByManager[name])
            else:
                stopped = self.stopWorker(name)
                restarted = stopped and self.startWorker(name)
            if not restarted:
                failures += 1
    return failures
280
def create(self, args):
    """
    Create a default manager or worker config.

    args is [kind, name] or [kind, name, port], where kind is 'manager'
    or 'worker'. For a manager, port is the port it listens on. For a
    worker, it is the manager port to connect to. port defaults to 7531.

    @raises errors.FatalError: on missing or invalid arguments.
    """
    if len(args) == 0:
        raise errors.FatalError(
            "Please specify 'manager' or 'worker' to create.")
    kind = args[0]
    # Validate the kind before asking for a name, so that "create foo"
    # does not answer with "Please specify name of foo to create."
    if kind not in ('manager', 'worker'):
        raise errors.FatalError(
            "Please specify 'manager' or 'worker' to create.")
    if len(args) == 1:
        raise errors.FatalError(
            "Please specify name of %s to create." % kind)
    name = args[1]

    port = 7531
    if len(args) >= 3:
        try:
            port = int(args[2])
        except ValueError:
            raise errors.FatalError("Invalid port number: %s" % args[2])

    if kind == 'manager':
        self.createManager(name, port)
    else:
        self.createWorker(name, managerPort=port, randomFeederports=True)
def createManager(self, name, port=7531):
    """
    Create a sample manager.

    Writes managers/<name>/planet.xml for a manager on localhost:port using
    the ssl transport. If configdir/default.pem does not exist yet, it is
    generated with the make-dummy-cert script. If that fails, the planet
    refers to the relative path 'default.pem' instead.

    @type  name: string
    @param name: name of the manager (its directory name).
    @type  port: int
    @param port: port the manager listens on.
    @returns: whether or not the config was created.
    @raises errors.FatalError: if the manager directory already exists.
    """
    import subprocess

    self.info("Creating manager %s" % name)
    managerDir = os.path.join(self.managersDir, name)
    if os.path.exists(managerDir):
        raise errors.FatalError(
            "Manager directory %s already exists" % managerDir)
    makedirs(managerDir)

    planetFile = os.path.join(managerDir, 'planet.xml')

    pemFile = os.path.join(configure.configdir, 'default.pem')
    if not os.path.exists(pemFile):
        # Pass the paths as separate arguments instead of building a shell
        # string, so that paths with spaces or shell characters work.
        script = os.path.join(configure.datadir, 'make-dummy-cert')
        try:
            retval = subprocess.call(['sh', script, pemFile])
        except OSError:
            # e.g. 'sh' could not be executed at all
            retval = -1
        if retval != 0:
            pemFile = 'default.pem'

    planet = """<planet>
  <manager>
    <debug>4</debug>
    <host>localhost</host>
    <port>%(port)d</port>
    <transport>ssl</transport>
    <!-- certificate path can be relative to $sysconfdir/flumotion,
         or absolute -->
    <certificate>%(pemFile)s</certificate>
    <component name="manager-bouncer" type="htpasswdcrypt-bouncer">
      <property name="data"><![CDATA[
user:PSfNpHTkpTx1M
]]></property>
    </component>
  </manager>
</planet>
""" % {'port': port, 'pemFile': pemFile}

    handle = open(planetFile, 'w')
    try:
        handle.write(planet)
    finally:
        # Close the file even if writing fails.
        handle.close()

    return True
361
def createWorker(self, name, managerPort=7531, randomFeederports=False):
    """
    Create a sample worker.

    Writes workers/<name>.xml for a worker that connects to a manager on
    localhost:managerPort with plaintext user/test authentication.

    @type  name: string
    @param name: name of the worker (its config file name).
    @type  managerPort: int
    @param managerPort: port of the manager to connect to.
    @type  randomFeederports: bool
    @param randomFeederports: write <feederports random="True" />; when
                              false, an example feederports range is
                              written commented out.
    @returns: whether or not the config was created.
    @raises errors.FatalError: if the worker config file already exists.
    """
    makedirs(self.workersDir)
    self.info("Creating worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if os.path.exists(workerFile):
        raise errors.FatalError(
            "Worker file %s already exists." % workerFile)

    if randomFeederports:
        feederports = ' <feederports random="True" />'
    else:
        feederports = " <!-- <feederports>8600-8639</feederports> -->"

    config = """<worker>

  <debug>4</debug>

  <manager>
    <host>localhost</host>
    <port>%(managerPort)s</port>
  </manager>

  <authentication type="plaintext">
    <username>user</username>
    <password>test</password>
  </authentication>

%(feederports)s

</worker>
""" % {'managerPort': managerPort, 'feederports': feederports}

    handle = open(workerFile, 'w')
    try:
        handle.write(config)
    finally:
        # Close the file even if writing fails.
        handle.close()

    return True
def startManager(self, name, flowNames):
    """
    Start the manager as configured in the manager directory for the given
    manager name, together with the given flows.

    @type  name: string
    @param name: name of the manager.
    @type  flowNames: list of string
    @param flowNames: names of the flows (flows/<flow>.xml) to load.
    @returns: whether or not the manager daemon started; False when the
              manager is disabled.
    @raises errors.FatalError: if the planet or a flow file is missing, or
                               the manager is already running.
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    self.info("Starting manager %s" % name)

    if self.checkDisabled('manager', name):
        print("manager %s is disabled, cannot start" % name)
        # Explicit False rather than None, matching the documented boolean
        # result (callers count any falsy result as a failure).
        return False

    self.debug("Starting manager with flows %r" % flowNames)
    managerDir = os.path.join(self.managersDir, name)
    planetFile = os.path.join(managerDir, 'planet.xml')
    if not os.path.exists(planetFile):
        raise errors.FatalError(
            "Planet file %s does not exist" % planetFile)
    self.info("Loading planet %s" % planetFile)

    flowsDir = os.path.join(managerDir, 'flows')
    flowFiles = []
    for flowName in flowNames:
        flowFile = os.path.join(flowsDir, "%s.xml" % flowName)
        if not os.path.exists(flowFile):
            raise errors.FatalError(
                "Flow file %s does not exist" % flowFile)
        flowFiles.append(flowFile)
        self.info("Loading flow %s" % flowFile)

    pid = getPid('manager', name)
    if pid:
        if checkPidRunning(pid):
            raise errors.FatalError(
                "Manager %s is already running (with pid %d)" % (name, pid))
        self.warning("Removing stale pid file %d for manager %s",
                     pid, name)
        deletePidFile('manager', name)

    # The command is run through the shell by startProcess (os.system),
    # so quote every path and name interpolated into it.
    dirOptions = self._getDirOptions()
    command = "flumotion-manager %s -D --daemonize-to %s " \
              "--service-name %s %s %s" % (
                  dirOptions, quote(configure.daemondir), quote(name),
                  quote(planetFile),
                  " ".join([quote(f) for f in flowFiles]))
    self.debug("starting process %s" % command)
    retval = self.startProcess(command)

    if retval != 0:
        self.warning("manager %s could not start (return value %d)" % (
            name, retval))
        return False

    self.debug("Waiting for pid for manager %s" % name)
    pid = waitPidFile('manager', name)
    if not pid:
        self.warning("manager %s could not start" % name)
        return False
    self.info("Started manager %s with pid %d" % (name, pid))
    return True
def startWorker(self, name):
    """
    Start the worker as configured in the worker directory for the given
    worker name.

    @type  name: string
    @param name: name of the worker (workers/<name>.xml).
    @returns: whether or not the worker daemon started; False when the
              worker is disabled.
    @raises errors.FatalError: if the worker file is missing or the worker
                               is already running.
    """
    try:
        from shlex import quote
    except ImportError:  # Python 2
        from pipes import quote

    self.info("Starting worker %s" % name)

    if self.checkDisabled('worker', name):
        print("worker %s is disabled, cannot start" % name)
        # Explicit False rather than None, matching the documented boolean
        # result (callers count any falsy result as a failure).
        return False

    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    if not os.path.exists(workerFile):
        raise errors.FatalError(
            "Worker file %s does not exist" % workerFile)

    pid = getPid('worker', name)
    if pid:
        if checkPidRunning(pid):
            raise errors.FatalError(
                "Worker %s is already running (with pid %d)" % (name, pid))
        self.warning("Removing stale pid file %d for worker %s",
                     pid, name)
        deletePidFile('worker', name)

    self.info("Loading worker %s" % workerFile)

    # The command is run through the shell by startProcess (os.system),
    # so quote every path and name interpolated into it.
    dirOptions = self._getDirOptions()
    command = "flumotion-worker %s -D --daemonize-to %s " \
              "--service-name %s %s" % (
                  dirOptions, quote(configure.daemondir), quote(name),
                  quote(workerFile))
    self.debug("Running %s" % command)
    retval = self.startProcess(command)

    if retval != 0:
        self.warning("worker %s could not start (return value %d)" % (
            name, retval))
        return False

    self.debug("Waiting for pid for worker %s" % name)
    pid = waitPidFile('worker', name)
    if not pid:
        self.warning("worker %s could not start" % name)
        return False
    self.info("Started worker %s with pid %d" % (name, pid))
    return True
def startProcess(self, command):
    """
    Run command through the shell (os.system) and block until it ends.

    @returns: the exit status of the process, or -1 when it did not exit
              normally (os.WIFEXITED is false, e.g. killed by a signal).
    """
    waitStatus = os.system(command)
    if not os.WIFEXITED(waitStatus):
        return -1
    return os.WEXITSTATUS(waitStatus)
533
561
def stopProcess(self, pid):
    """
    Stop the process with the given pid.
    Wait until the pid has disappeared.

    The process is sent TERM first. If it is still running after
    configure.processTermWait seconds, it is sent KILL. If it is still
    running configure.processKillWait seconds after that, we give up.

    @type  pid: int
    @param pid: the process to stop.
    @returns: True when the process has gone away, False when there was no
              such process or it survived the KILL wait.
    """
    # time.clock() measured CPU time on Unix (and was removed in Python
    # 3.8); the deadlines must be wall-clock time.
    termDeadline = time.time() + configure.processTermWait
    killDeadline = termDeadline + configure.processKillWait

    self.debug('stopping process with pid %d' % pid)
    if not termPid(pid):
        self.warning('No process with pid %d' % pid)
        return False

    killed = False
    while checkPidRunning(pid):
        now = time.time()
        if not killed and now > termDeadline:
            self.warning("Process with pid %d has not responded to TERM "
                         "for %d seconds, killing" % (
                             pid, configure.processTermWait))
            killPid(pid)
            killed = True
        if now > killDeadline:
            self.warning("Process with pid %d has not responded to KILL "
                         "for %d seconds, stopping" % (
                             pid, configure.processKillWait))
            return False
        # Poll instead of spinning at full CPU while waiting.
        time.sleep(0.1)

    return True
624
649
def disable(self, args):
    """
    Disable the manager or worker named in args.

    args must be ['manager', name] or ['worker', name], and that process
    must not be running.

    @raises errors.FatalError: on missing or invalid arguments, an unknown
                               name, or a running process.
    """
    if len(args) < 1:
        raise errors.FatalError('Please specify what to disable')

    which = args[0]
    if which not in ['manager', 'worker']:
        raise errors.FatalError('Please specify either manager or worker')

    if len(args) < 2:
        # This message used to say 'enable' (a copy-and-paste slip).
        raise errors.FatalError('Please specify which %s to %s' % (
            which, 'disable'))

    name = args[1]
    if which == 'manager':
        if name not in self.getManagers():
            raise errors.FatalError('No manager "%s"' % name)
        pid = getPid('manager', name)
        if pid and checkPidRunning(pid):
            raise errors.FatalError("Manager %s is running" % name)
        self.disableManager(name)
    else:
        if name not in self.getWorkers():
            raise errors.FatalError('No worker with name %s' % name)
        pid = getPid('worker', name)
        if pid and checkPidRunning(pid):
            raise errors.FatalError("Worker %s is running" % name)
        self.disableWorker(name)
682
def enableManager(self, name):
    """
    Enable the named manager by renaming planet-disabled.xml back to
    planet.xml in its directory.

    @raises errors.FatalError: if neither planet file exists.
    """
    self.debug("Enabling manager %s" % name)
    managerDir = os.path.join(self.managersDir, name)
    planetDisabledFile = os.path.join(managerDir, 'planet-disabled.xml')
    planetFile = os.path.join(managerDir, 'planet.xml')
    if os.path.exists(planetDisabledFile):
        os.rename(planetDisabledFile, planetFile)
        print("manager %s enabled" % name)
        return
    if os.path.exists(planetFile):
        print("manager %s already enabled" % name)
        return
    raise errors.FatalError(
        "Planet file %s does not exist" % planetFile)
698
def enableWorker(self, name):
    """
    Enable the named worker by renaming <name>-disabled.xml back to
    <name>.xml in the workers directory.

    @raises errors.FatalError: if neither worker file exists.
    """
    self.debug("Enabling worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    disabledFile = os.path.join(self.workersDir, "%s-disabled.xml" % name)
    if os.path.exists(disabledFile):
        os.rename(disabledFile, workerFile)
        print("worker %s enabled" % name)
    elif os.path.exists(workerFile):
        print("worker %s already enabled" % name)
    else:
        raise errors.FatalError(
            "Worker file %s does not exist" % workerFile)
712
def disableManager(self, name):
    """
    Disable the named manager by renaming planet.xml to
    planet-disabled.xml in its directory.

    @raises errors.FatalError: if neither planet file exists.
    """
    self.debug("Disabling manager %s" % name)
    managerDir = os.path.join(self.managersDir, name)
    planetDisabledFile = os.path.join(managerDir, 'planet-disabled.xml')
    planetFile = os.path.join(managerDir, 'planet.xml')
    if os.path.exists(planetFile):
        os.rename(planetFile, planetDisabledFile)
        print("manager %s disabled" % name)
        return
    if os.path.exists(planetDisabledFile):
        print("manager %s already disabled" % name)
        return
    raise errors.FatalError(
        "Planet file %s does not exist" % planetFile)
728
def disableWorker(self, name):
    """
    Disable the named worker by renaming <name>.xml to <name>-disabled.xml
    in the workers directory.

    @raises errors.FatalError: if neither worker file exists.
    """
    self.debug("Disabling worker %s" % name)
    workerFile = os.path.join(self.workersDir, "%s.xml" % name)
    disabledFile = os.path.join(self.workersDir, "%s-disabled.xml" % name)
    if os.path.exists(workerFile):
        os.rename(workerFile, disabledFile)
        print("worker %s disabled" % name)
    elif os.path.exists(disabledFile):
        print("worker %s already disabled" % name)
    else:
        raise errors.FatalError(
            "Worker file %s does not exist" % workerFile)
742
def checkDisabled(self, type, name):
    """
    Tell whether the named manager or worker is disabled.

    It counts as disabled when its regular config file is absent and the
    '-disabled' variant exists.

    @type  type: string
    @param type: 'manager' or 'worker'.
    @returns: True if disabled, False if not; None for any other type.
    """
    if type == 'manager':
        managerDir = os.path.join(self.managersDir, name)
        enabledFile = os.path.join(managerDir, 'planet.xml')
        disabledFile = os.path.join(managerDir, 'planet-disabled.xml')
    elif type == 'worker':
        enabledFile = os.path.join(self.workersDir, "%s.xml" % name)
        disabledFile = os.path.join(self.workersDir,
                                    "%s-disabled.xml" % name)
    else:
        return None
    return (not os.path.exists(enabledFile)
            and os.path.exists(disabledFile))
def list(self):
    """
    Print every configured manager (followed by its flows), then every
    worker.
    """
    for managerName, flows in self.getManagers().items():
        print("manager %s" % managerName)
        for flow in flows or []:
            print(" flow %s" % flow)

    for workerName in self.getWorkers():
        print("worker %s" % workerName)
775