1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 worker-side objects for components
20 """
21
22 import os
23 import time
24 import socket
25
26 from twisted.internet import reactor, error, defer
27 from twisted.spread import pb
28 from twisted.python import reflect
29 from zope.interface import implements
30
31 from flumotion.common import interfaces, errors, log, planet, medium
32 from flumotion.common import componentui, common, messages
33 from flumotion.common import interfaces, reflectcall, debug
34 from flumotion.common.i18n import N_, gettexter
35 from flumotion.common.planet import moods
36 from flumotion.common.poller import Poller
37 from flumotion.twisted import pb as fpb
38 from flumotion.twisted.flavors import IStateCacheableListener
39
40
41 __version__ = "$Rev$"
42 T_ = gettexter()
43
44
46 """
47 I am a client factory for a component logging in to the manager.
48 """
49 logCategory = 'component'
50 perspectiveInterface = interfaces.IComponentMedium
51
70
74
75
76
78
79 def remoteDisconnected(remoteReference):
80 if reactor.killed:
81 self.log('Connection to manager lost due to shutdown')
82 else:
83 self.warning('Lost connection to manager, '
84 'will attempt to reconnect')
85
86 def loginCallback(reference):
87 self.info("Logged in to manager")
88 self.debug("remote reference %r" % reference)
89
90 self.medium.setRemoteReference(reference)
91 reference.notifyOnDisconnect(remoteDisconnected)
92
93 def loginFailedDisconnect(failure):
94
95
96 self.debug('Login failed, reason: %s, disconnecting', failure)
97 self.disconnect()
98 return failure
99
100 def accessDeniedErrback(failure):
101 failure.trap(errors.NotAuthenticatedError)
102 self.warning('Access denied.')
103
104 def connectionRefusedErrback(failure):
105 failure.trap(error.ConnectionRefusedError)
106 self.warning('Connection to manager refused.')
107
108 def alreadyLoggedInErrback(failure):
109 failure.trap(errors.AlreadyConnectedError)
110 self.warning('Component with id %s is already logged in.',
111 self.medium.authenticator.avatarId)
112
113 def loginFailedErrback(failure):
114 self.warning('Login failed, reason: %s' % failure)
115
116 d.addCallback(loginCallback)
117 d.addErrback(loginFailedDisconnect)
118 d.addErrback(accessDeniedErrback)
119 d.addErrback(connectionRefusedErrback)
120 d.addErrback(alreadyLoggedInErrback)
121 d.addErrback(loginFailedErrback)
122
123
124
128
129
131 """
132 Creates a deferred chain created by chaining calls to the given
133 procedures, each of them made with the given args and kwargs.
134 Only the result of the last procedure is returned; results for the
135 other procedures are discarded.
136
137 Failures triggered during any of the procedure short-circuit execution
138 of the other procedures and should be handled by the errbacks attached
139 to the deferred returned here.
140
141 @rtype: L{twisted.internet.defer.Deferred}
142 """
143
144 def call_proc(_, p):
145 log.debug('', 'calling %r', p)
146 return p(*args, **kwargs)
147 if not procs:
148 return defer.succeed(None)
149 p, procs = procs[0], procs[1:]
150 d = defer.maybeDeferred(call_proc, None, p)
151 for p in procs:
152 d.addCallback(call_proc, p)
153 return d
154
155
156
157
159 """
160 I am a medium interfacing with a manager-side avatar.
161 I implement a Referenceable for the manager's avatar to call on me.
162 I have a remote reference to the manager's avatar to call upon.
163 I am created by the L{ComponentClientFactory}.
164
165 @cvar authenticator: the authenticator used to log in to manager
166 @type authenticator: L{flumotion.twisted.pb.Authenticator}
167 """
168
169 implements(interfaces.IComponentMedium)
170 logCategory = 'basecompmed'
171
173 """
174 @param component: L{flumotion.component.component.BaseComponent}
175 """
176 self.comp = component
177 self.authenticator = None
178 self.broker = None
179
183
184
185
186 - def setup(self, config):
188
190 """
191 Return the manager IP as seen by us.
192 """
193 assert self.remote or self.broker
194 broker = self.broker or self.remote.broker
195 peer = broker.transport.getPeer()
196 try:
197 host = peer.host
198 except AttributeError:
199 host = peer[1]
200
201 res = socket.gethostbyname(host)
202 self.debug("getManagerIP(): we think the manager's IP is %r" % res)
203 return res
204
206 """
207 Return the IP of this component based on connection to the manager.
208
209 Note: this is insufficient in general, and should be replaced by
210 network mapping stuff later.
211 """
212 assert self.remote, "%r does not have a remote connection" % self
213 host = self.remote.broker.transport.getHost()
214 self.debug("getIP(): using %r as our IP", host.host)
215 return host.host
216
218 """
219 Set the authenticator the client factory has used to log in to the
220 manager. Can be reused by the component's medium to make
221 feed connections which also get authenticated by the manager's
222 bouncer.
223
224 @type authenticator: L{flumotion.twisted.pb.Authenticator}
225 """
226 self.authenticator = authenticator
227
228
229
230
232 """
233 Return the state of the component, which will be serialized to a
234 L{flumotion.common.planet.ManagerJobState} object.
235
236 @rtype: L{flumotion.common.planet.WorkerJobState}
237 @returns: state of component
238 """
239
240
241 self.comp.state.set('manager-ip', self.getManagerIP())
242 return self.comp.state
243
245 """
246 Return the configuration of the component.
247
248 @rtype: dict
249 @returns: component's current configuration
250 """
251 return self.comp.config
252
254 self.info('Stopping component')
255 return self.comp.stop()
256
261
263 """Get a WorkerComponentUIState containing details needed to
264 present an admin-side UI state
265 """
266 return self.comp.uiState
267
269 """Get mood of the component
270 """
271 return self.comp.getMood()
272
274 """
275 Base implementation of getMasterClockInfo, can be overridden by
276 subclasses. By default, just returns None.
277 """
278 return None
279
282
284 """
285 Sets the Flumotion debugging levels based on the passed debug string.
286
287 @since: 0.6.0
288 """
289 self.debug('Setting Flumotion debug level to %s' % debug)
290 self.comp.uiState.set('flu-debug', debug)
291 log.setDebug(debug)
292
294 """
295 Modifies a component property on the fly
296
297 @since: 0.9.0
298 """
299 self.info('Modifying property %s to %s', property, value)
300 return self.comp.modifyProperty(property, value)
301
302
304 """
305 I am the base class for all Flumotion components.
306
307 @ivar name: the name of the component
308 @type name: string
309 @ivar medium: the component's medium
310 @type medium: L{BaseComponentMedium}
311 @ivar uiState: state of the component to be shown in a UI.
312 Contains at least the following keys.
313 - cpu-percent: percentage of CPU use in last interval
314 - start-time: time when component was started, in epoch
315 seconds
316 - current-time: current time in epoch seconds, as seen on
317 component's machine, which might be out of
318 sync
319 - virtual-size: virtual memory size in bytes
320 Subclasses can add additional keys for their respective UI.
321 @type uiState: L{componentui.WorkerComponentUIState}
322
323 @cvar componentMediumClass: the medium class to use for this component
324 @type componentMediumClass: child class of L{BaseComponentMedium}
325 """
326
327 logCategory = 'basecomp'
328 componentMediumClass = BaseComponentMedium
329
330 implements(IStateCacheableListener)
331
332 - def __init__(self, config, haveError=None):
333 """
334 Subclasses should not override __init__ at all.
335
336 Instead, they should implement init(), which will be called
337 by this implementation automatically.
338
339 L{flumotion.common.common.InitMixin} for more details.
340 """
341 self.debug("initializing %r with config %r", type(self), config)
342 self.config = config
343 self._haveError = haveError
344
345
346 common.InitMixin.__init__(self)
347
348 self.setup()
349
350
351
353 """
354 A subclass should do as little as possible in its init method.
355 In particular, it should not try to access resources.
356
357 Failures during init are marshalled back to the manager through
358 the worker's remote_create method, since there is no component state
359 proxied to the manager yet at the time of init.
360 """
361 self.state = planet.WorkerJobState()
362
363 self.name = self.config['name']
364
365 self.state.set('pid', os.getpid())
366 self.setMood(moods.waking)
367
368 self.medium = None
369
370 self.uiState = componentui.WorkerComponentUIState()
371 self.uiState.addKey('cpu-percent')
372 self.uiState.addKey('start-time')
373 self.uiState.addKey('current-time')
374 self.uiState.addKey('virtual-size')
375 self.uiState.addKey('total-memory')
376 self.uiState.addKey('num-cpus')
377 self.uiState.addKey('flu-debug')
378 self.uiState.addKey('properties')
379
380 self.uiState.addHook(self)
381
382 self.plugs = {}
383
384 self._happyWaits = []
385
386
387 self._lastTime = time.time()
388 self._lastClock = time.clock()
389 self._cpuPoller = Poller(self._pollCPU, 5, start=False)
390 self._memoryPoller = Poller(self._pollMemory, 60, start=False)
391 self._cpuPollerDC = None
392 self._memoryPollerDC = None
393 self._shutdownHook = None
394
395
396
398 """
399 Triggered when a uiState observer was added.
400
401 Default implementation is to start the memory and cpu pollers.
402
403 Note:
404 Subclasses can override me but should chain me up to start these
405 pollers
406 """
407 self.debug("observer has started watching us, starting pollers")
408 if not self._cpuPoller.running and not self._cpuPollerDC:
409 self._cpuPollerDC = reactor.callLater(0,
410 self._cpuPoller.start,
411 immediately=True)
412 if not self._memoryPoller.running and not self._memoryPollerDC:
413 self._memoryPollerDC = reactor.callLater(0,
414 self._memoryPoller.start,
415 immediately=True)
416
418 """
419 Triggered when a uiState observer has left.
420
421 Default implementation is to stop the memory and cpu pollers
422 when the total number of observers denoted by the 'num'
423 argument becomes zero.
424
425 Note:
426 Subclasses can override me but should chain me up to stop these
427 pollers
428 """
429 if num == 0:
430 self.debug("no more observers left, shutting down pollers")
431
432 if self._cpuPollerDC:
433 self._cpuPollerDC.cancel()
434 self._cpuPollerDC = None
435 if self._memoryPollerDC:
436 self._memoryPollerDC.cancel()
437 self._memoryPollerDC = None
438
439 if self._cpuPoller:
440 self._cpuPoller.stop()
441 if self._memoryPoller:
442 self._memoryPoller.stop()
443
445 """
446 Subclasses can implement me to run any checks before the component
447 performs setup.
448
449 Messages can be added to the component state's 'messages' list key.
450 Any error messages added will trigger the component going to sad,
451 with L{flumotion.common.errors.ComponentSetupError} being raised
452 before getting to setup stage; do_setup() will not be called.
453
454 In the event of a fatal problem that can't be expressed through an
455 error message, this method should raise an exception or return a
456 failure.
457
458 It is not necessary to chain up in this function. The return
459 value may be a deferred.
460 """
461 return defer.maybeDeferred(self.check_properties,
462 self.config['properties'],
463 self.addMessage)
464
466 """
467 BaseComponent convenience vmethod for running checks.
468
469 A component implementation can override this method to run any
470 checks that it needs to. Typically, a check_properties
471 implementation will call the provided addMessage() callback to
472 note warnings or errors. For errors, addMessage() will set
473 component's mood to sad, which will abort the init process
474 before getting to do_setup().
475
476 @param properties: The component's properties
477 @type properties: dict of string => object
478 @param addMessage: Thunk to add a message to the component
479 state. Will raise an exception if the
480 message is of level ERROR.
481 @type addMessage: L{flumotion.common.messages.Message} -> None
482 """
483 pass
484
486 """
487 Subclasses can implement me to set up the component before it is
488 started. It should set up the component, possibly opening files
489 and resources.
490 Non-programming errors should not be raised, but returned as a
491 failing deferred.
492
493 The return value may be a deferred.
494 """
495 plug_starts = []
496 for socket, plugs in self.config['plugs'].items():
497 self.plugs[socket] = []
498 for plug in plugs:
499 entry = plug['entries']['default']
500 instance = reflectcall.reflectCall(entry['module-name'],
501 entry['function-name'],
502 plug)
503 self.plugs[socket].append(instance)
504 self.debug('Starting plug %r on socket %s',
505 instance, socket)
506 plug_starts.append(instance.start)
507
508
509
510 checks = common.get_all_methods(self, 'do_check', False)
511
512 def checkErrorCallback(result):
513
514
515
516
517 current = self.state.get('mood')
518 if current == moods.sad.value:
519 self.warning('Running checks made the component sad.')
520 raise errors.ComponentSetupHandledError()
521
522 checks.append(checkErrorCallback)
523
524 return _maybeDeferredChain(plug_starts + checks, self)
525
527 """
528 BaseComponent vmethod for stopping.
529 The component should do any cleanup it needs, but must not set the
530 component's mood to sleeping.
531
532 @Returns: L{twisted.internet.defer.Deferred}
533 """
534 plug_stops = []
535 for socket, plugs in self.plugs.items():
536 for plug in plugs:
537 self.debug('Stopping plug %r on socket %s', plug, socket)
538 plug_stops.append(plug.stop)
539
540 for message in self.state.get('messages'):
541
542 self.state.remove('messages', message)
543
544
545 if self._cpuPollerDC:
546 self._cpuPollerDC.cancel()
547 self._cpuPollerDC = None
548 if self._memoryPollerDC:
549 self._memoryPollerDC.cancel()
550 self._memoryPollerDC = None
551
552 if self._cpuPoller:
553 self._cpuPoller.stop()
554 self._cpuPoller = None
555 if self._memoryPoller:
556 self._memoryPoller.stop()
557 self._memoryPoller = None
558
559 if self._shutdownHook:
560 self.debug('_stoppedCallback: firing shutdown hook')
561 self._shutdownHook()
562
563 return _maybeDeferredChain(plug_stops, self)
564
565
566
568 """
569 Sets up the component. Called during __init__, so be sure not
570 to raise exceptions, instead adding messages to the component
571 state.
572 """
573
574 def run_setups():
575 setups = common.get_all_methods(self, 'do_setup', False)
576 return _maybeDeferredChain(setups, self)
577
578 def setup_complete(_):
579 self.debug('setup completed')
580 self.setup_completed()
581
582 def got_error(failure):
583 txt = log.getFailureMessage(failure)
584 self.debug('got_error: %s', txt)
585 if not failure.check(errors.ComponentSetupHandledError):
586 self.warning('Setup failed: %s', txt)
587 m = messages.Error(T_(N_("Could not setup component.")),
588 debug=txt,
589 mid="component-setup-%s" % self.name)
590
591 self.addMessage(m)
592
593
594 return None
595
596 self.setMood(moods.waking)
597 self.uiState.set('start-time', time.time())
598
599 self.uiState.set('total-memory', self._getTotalMemory())
600 self.uiState.set('num-cpus', self._getNumberOfCPUs())
601 self.uiState.set('flu-debug', log.getDebug())
602
603 d = run_setups()
604 d.addCallbacks(setup_complete, got_error)
605
606
608 self.debug('turning happy')
609 self.setMood(moods.happy)
610
612 """
613 Set the shutdown hook for this component (replacing any previous hook).
614 When a component is stopped, then this hook will be fired.
615 """
616 self._shutdownHook = shutdownHook
617
619 """
620 Tell the component to stop.
621 The connection to the manager will be closed.
622 The job process will also finish.
623 """
624 self.debug('BaseComponent.stop')
625
626
627 self.setMood(moods.waking)
628
629
630 stops = common.get_all_methods(self, 'do_stop', True)
631 return _maybeDeferredChain(stops, self)
632
633
634
637
639 self.state.set('workerName', workerName)
640
643
651
653 """
654 Set the given mood on the component if it's different from the current
655 one.
656 """
657 current = self.state.get('mood')
658
659 if current == mood.value:
660 self.log('already in mood %r' % mood)
661 return
662 elif current == moods.sad.value:
663 self.info('tried to set mood to %r, but already sad :-(' % mood)
664 return
665
666 self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
667 self.state.set('mood', mood.value)
668
669 if mood == moods.happy:
670 while self._happyWaits:
671 self._happyWaits.pop(0).callback(None)
672 elif mood == moods.sad:
673 while self._happyWaits:
674 self._happyWaits.pop(0).errback(errors.ComponentStartError())
675
677 """
678 Gets the mood on the component.
679
680 @rtype: int
681 """
682 return self.state.get('mood')
683
694
696 """
697 Add a message to the component.
698 If any of the messages is an error, the component will turn sad.
699
700 @type message: L{flumotion.common.messages.Message}
701 """
702 self.removeMessage(message.id)
703 self.state.append('messages', message)
704 if message.level == messages.ERROR:
705 self.debug('error message, turning sad')
706 self.setMood(moods.sad)
707 if self._haveError:
708 self._haveError(message)
709
711 """
712 Remove a message with a given id from the component.
713 @type mid: str
714 """
715 for msg in self.state.get('messages', []):
716 if msg.id == mid:
717 self.state.remove('messages', msg)
718
720 """
721 Add a warning messages for deprecated properties.
722
723 @param list: list of property names.
724 @type list: list of str
725 """
726 msg = ("Your configuration uses deprecated properties. "
727 "Please update your configuration and correct them.\n")
728 m = messages.Warning(T_(N_(msg)), mid="deprecated")
729 for prop in list:
730 m.add(T_(N_(
731 "Please remove '%s' property.\n"), prop))
732 self.addMessage(m)
733 self.warning(msg)
734
736 """
737 Fix properties that have been renamed from a previous version,
738 and add a warning for them.
739
740 @param properties: properties; will be modified as a result.
741 @type properties: dict
742 @param list: list of (old, new) tuples of property names.
743 @type list: list of tuple of (str, str)
744 """
745 found = []
746 for old, new in list:
747 if old in properties:
748 found.append((old, new))
749
750 if found:
751 m = messages.Warning(T_(N_(
752 "Your configuration uses deprecated properties. "
753 "Please update your configuration and correct them.\n")),
754 mid="deprecated")
755 for old, new in found:
756 m.add(T_(N_(
757 "Please rename '%s' to '%s'.\n"),
758 old, new))
759 self.debug("Setting new property '%s' to %r", new,
760 properties[old])
761 properties[new] = properties[old]
762 del properties[old]
763 self.addMessage(m)
764
766 """
767 Call a remote method on all admin client views on this component.
768
769 This gets serialized through the manager and multiplexed to all
770 admin clients, and from there on to all views connected to each
771 admin client model.
772
773 Because there can be any number of admin clients that this call
774 will go out do, it does not make sense to have one return value.
775 This function will return None always.
776 """
777 if self.medium:
778 self.medium.callRemote("adminCallRemote", methodName,
779 *args, **kwargs)
780 else:
781 self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
782 'no manager.'
783 % (methodName, args, kwargs))
784
786 """
787 Modifies a property of the compoment.
788
789 Components with modifiable properties (properties that can be changed
790 on the fly) should implement modify_property_(propertyName) to receive
791 the call
792
793 @param property_name: Name of the property to change
794 @type property_name: str
795 @param value: Value to set
796 """
797
798
799 p = ''.join([t.title() for t in property_name.split('-')])
800 method_name = "modify_property_%s" % p
801 if not hasattr(self, method_name):
802 raise errors.PropertyNotModifiableError("%s" % (property_name))
803 method = getattr(self, method_name)
804 if not method(value):
805 return False
806 self.config['properties'][property_name] = value
807 self.uiState.set('properties', self.config['properties'])
808 return True
809
811 """
812 Check that the value to be set in a property is of the correct type
813
814 @returns: True if the value is of the correct type
815 """
816 if type(value) != allowed_type:
817 self.warning("Could not set the property %s in %s. "
818 "'value' must be of %s", property_name, self,
819 allowed_type)
820 return False
821 return True
822
824 try:
825 namespace = plug.get_namespace()
826 except AttributeError:
827 self.debug("Plug %r does not provide namespace, "
828 "its interface will not be exposed", plug)
829 return
830
831 self.debug("Exposing plug's %r interface in namespace %r",
832 plug, namespace)
833 for method in filter(callable,
834 [getattr(plug, m) for m in dir(plug)
835 if m.startswith('remote_')]):
836 if namespace:
837 name = "".join(("remote_", namespace, "_",
838 method.__name__[len("remote_"):]))
839 else:
840 name = method.__name__
841 self.debug("Exposing method %r as %r in %r", method, name, medium)
842 setattr(medium, name, method)
843
845 self._cpuPollerDC = None
846
847 nowTime = time.time()
848 nowClock = time.clock()
849 deltaTime = nowTime - self._lastTime
850 deltaClock = nowClock - self._lastClock
851 self._lastTime = nowTime
852 self._lastClock = nowClock
853
854 if deltaClock >= 0:
855 CPU = deltaClock/deltaTime
856 self.log('latest CPU use: %r', CPU)
857 self.uiState.set('cpu-percent', CPU)
858
859 self.uiState.set('current-time', nowTime)
860
862 self._memoryPollerDC = None
863
864
865 handle = open('/proc/%d/stat' % os.getpid())
866 line = handle.read()
867 handle.close()
868 fields = line.split()
869
870
871 vsize = int(fields[22])
872 self.log('vsize is %d', vsize)
873 self.uiState.set('virtual-size', vsize)
874
876 f = open("/proc/meminfo")
877 memtotal = f.readline()
878 f.close()
879 return int(memtotal[memtotal.index(":") + 1: -3]) * 1024
880
882 try:
883 return open('/proc/cpuinfo').read().count('processor\t:')
884 except IOError:
885 self.debug('Can not determine number of CPUs on this system')
886 return 1
887