1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 worker-side objects for components
20 """
21
22 import os
23 import time
24 import socket
25
26 from twisted.internet import reactor, error, defer
27 from twisted.spread import pb
28 from twisted.python import reflect
29 from zope.interface import implements
30
31 from flumotion.common import interfaces, errors, log, planet, medium
32 from flumotion.common import componentui, common, messages
33 from flumotion.common import interfaces, reflectcall, debug
34 from flumotion.common.i18n import N_, gettexter
35 from flumotion.common.planet import moods
36 from flumotion.common.poller import Poller
37 from flumotion.twisted import pb as fpb
38 from flumotion.twisted.flavors import IStateCacheableListener
39
40
41 __version__ = "$Rev$"
42 T_ = gettexter()
43
44
46 """
47 I am a client factory for a component logging in to the manager.
48 """
49 logCategory = 'component'
50 perspectiveInterface = interfaces.IComponentMedium
51
70
74
75
76
78
79 def remoteDisconnected(remoteReference):
80 if reactor.killed:
81 self.log('Connection to manager lost due to shutdown')
82 else:
83 self.warning('Lost connection to manager, '
84 'will attempt to reconnect')
85
86 def loginCallback(reference):
87 self.info("Logged in to manager")
88 self.debug("remote reference %r" % reference)
89
90 self.medium.setRemoteReference(reference)
91 reference.notifyOnDisconnect(remoteDisconnected)
92
93 def loginFailedDisconnect(failure):
94
95
96 self.debug('Login failed, reason: %s, disconnecting', failure)
97 self.disconnect()
98 return failure
99
100 def accessDeniedErrback(failure):
101 failure.trap(errors.NotAuthenticatedError)
102 self.warning('Access denied.')
103
104 def connectionRefusedErrback(failure):
105 failure.trap(error.ConnectionRefusedError)
106 self.warning('Connection to manager refused.')
107
108 def alreadyLoggedInErrback(failure):
109 failure.trap(errors.AlreadyConnectedError)
110 self.warning('Component with id %s is already logged in.',
111 self.medium.authenticator.avatarId)
112
113 def loginFailedErrback(failure):
114 self.warning('Login failed, reason: %s' % failure)
115
116 d.addCallback(loginCallback)
117 d.addErrback(loginFailedDisconnect)
118 d.addErrback(accessDeniedErrback)
119 d.addErrback(connectionRefusedErrback)
120 d.addErrback(alreadyLoggedInErrback)
121 d.addErrback(loginFailedErrback)
122
123
124
128
129
def _maybeDeferredChain(procs, *args, **kwargs):
    """
    Build a single deferred that runs every procedure in C{procs} in order.

    Each procedure is invoked with the same C{args} and C{kwargs}.  Only
    the result of the last procedure is passed on; the results of the
    earlier ones are discarded.

    A failure raised by any procedure prevents the remaining procedures
    from being called and should be handled by errbacks attached to the
    deferred returned here.

    @param procs: the callables to invoke, in order
    @rtype: L{twisted.internet.defer.Deferred}
    """

    def invoke(_previousResult, proc):
        # the previous result is deliberately ignored
        log.debug('', 'calling %r', proc)
        return proc(*args, **kwargs)

    if not procs:
        # nothing to run: hand back an already-fired deferred
        return defer.succeed(None)

    chain = defer.maybeDeferred(invoke, None, procs[0])
    for proc in procs[1:]:
        chain.addCallback(invoke, proc)
    return chain
154
155
156
157
159 """
160 I am a medium interfacing with a manager-side avatar.
161 I implement a Referenceable for the manager's avatar to call on me.
162 I have a remote reference to the manager's avatar to call upon.
163 I am created by the L{ComponentClientFactory}.
164
165 @cvar authenticator: the authenticator used to log in to manager
166 @type authenticator: L{flumotion.twisted.pb.Authenticator}
167 """
168
169 implements(interfaces.IComponentMedium)
170 logCategory = 'basecompmed'
171
173 """
174 @param component: L{flumotion.component.component.BaseComponent}
175 """
176 self.comp = component
177 self.authenticator = None
178 self.broker = None
179
183
184
185
186 - def setup(self, config):
188
def getManagerIP(self):
    """
    Return the manager IP as seen by us.

    The peer address of the broker's transport is looked up and its host
    part is resolved with C{socket.gethostbyname}.

    @returns: the resolved address of the manager
    """
    assert self.remote or self.broker
    if self.broker:
        broker = self.broker
    else:
        broker = self.remote.broker

    peerAddress = broker.transport.getPeer()
    try:
        peerHost = peerAddress.host
    except AttributeError:
        # address object without a 'host' attribute: index it instead
        peerHost = peerAddress[1]

    managerIP = socket.gethostbyname(peerHost)
    self.debug("getManagerIP(): we think the manager's IP is %r" % managerIP)
    return managerIP
204
def getIP(self):
    """
    Return the IP of this component based on connection to the manager.

    Note: this is insufficient in general, and should be replaced by
    network mapping stuff later.

    @returns: the C{host} attribute of the local address of the
              transport used to talk to the manager
    """
    assert self.remote, "%r does not have a remote connection" % self
    transport = self.remote.broker.transport
    localAddress = transport.getHost()
    self.debug("getIP(): using %r as our IP", localAddress.host)
    return localAddress.host
216
218 """
219 Set the authenticator the client factory has used to log in to the
220 manager. Can be reused by the component's medium to make
221 feed connections which also get authenticated by the manager's
222 bouncer.
223
224 @type authenticator: L{flumotion.twisted.pb.Authenticator}
225 """
226 self.authenticator = authenticator
227
228
229
230
232 """
233 Return the state of the component, which will be serialized to a
234 L{flumotion.common.planet.ManagerJobState} object.
235
236 @rtype: L{flumotion.common.planet.WorkerJobState}
237 @returns: state of component
238 """
239
240
241 self.comp.state.set('manager-ip', self.getManagerIP())
242 return self.comp.state
243
245 """
246 Return the configuration of the component.
247
248 @rtype: dict
249 @returns: component's current configuration
250 """
251 return self.comp.config
252
254 self.info('Stopping component')
255 return self.comp.stop()
256
261
263 """Get a WorkerComponentUIState containing details needed to
264 present an admin-side UI state
265 """
266 return self.comp.uiState
267
269 """Get mood of the component
270 """
271 return self.comp.getMood()
272
274 """
275 Base implementation of getMasterClockInfo, can be overridden by
276 subclasses. By default, just returns None.
277 """
278 return None
279
282
284 """
285 Sets the Flumotion debugging levels based on the passed debug string.
286
287 @since: 0.6.0
288 """
289 self.debug('Setting Flumotion debug level to %s' % debug)
290 self.comp.uiState.set('flu-debug', debug)
291 log.setDebug(debug)
292
294 """
295 Modifies a component property on the fly
296
297 @since: 0.9.0
298 """
299 self.info('Modifying property %s to %s', property, value)
300 return self.comp.modifyProperty(property, value)
301
302
304 """
305 I am the base class for all Flumotion components.
306
307 @ivar name: the name of the component
308 @type name: string
309 @ivar medium: the component's medium
310 @type medium: L{BaseComponentMedium}
311 @ivar uiState: state of the component to be shown in a UI.
312 Contains at least the following keys.
313 - cpu-percent: percentage of CPU use in last interval
314 - start-time: time when component was started, in epoch
315 seconds
316 - current-time: current time in epoch seconds, as seen on
317 component's machine, which might be out of
318 sync
319 - virtual-size: virtual memory size in bytes
320 Subclasses can add additional keys for their respective UI.
321 @type uiState: L{componentui.WorkerComponentUIState}
322
323 @cvar componentMediumClass: the medium class to use for this component
324 @type componentMediumClass: child class of L{BaseComponentMedium}
325 """
326
327 logCategory = 'basecomp'
328 componentMediumClass = BaseComponentMedium
329
330 implements(IStateCacheableListener)
331
def __init__(self, config, haveError=None):
    """
    Store the configuration and the error callback, run the mixin
    initialisation and then set the component up.

    Subclasses should not override __init__ at all.  Instead, they
    should implement init(), which will be called by this
    implementation automatically.

    L{flumotion.common.common.InitMixin} for more details.

    @param config:    the component configuration
    @param haveError: optional callable; when set it is invoked with
                      every error message added to the component
    """
    self.debug("initializing %r with config %r", type(self), config)
    self._haveError = haveError
    self.config = config

    common.InitMixin.__init__(self)

    # setup() relies on the state created during initialisation above
    self.setup()
349
350
351
def init(self):
    """
    A subclass should do as little as possible in its init method.
    In particular, it should not try to access resources.

    Failures during init are marshalled back to the manager through
    the worker's remote_create method, since there is no component state
    proxied to the manager yet at the time of init.

    This base implementation creates the job state and the UI state,
    and prepares the (not yet started) CPU and memory pollers.
    """
    self.state = planet.WorkerJobState()

    self.name = self.config['name']

    self.state.set('pid', os.getpid())
    self.setMood(moods.waking)

    self.medium = None

    self.uiState = componentui.WorkerComponentUIState()
    uiKeys = ('cpu-percent', 'start-time', 'current-time', 'virtual-size',
              'total-memory', 'num-cpus', 'flu-debug', 'properties')
    for uiKey in uiKeys:
        self.uiState.addKey(uiKey)

    # register ourselves as a hook on the UI state
    self.uiState.addHook(self)

    self.plugs = {}

    self._happyWaits = []

    # baseline samples for the CPU usage computed by _pollCPU
    self._lastTime = time.time()
    self._lastClock = time.clock()
    # NOTE(review): poll intervals 5 and 60 are presumably seconds - confirm
    self._cpuPoller = Poller(self._pollCPU, 5, start=False)
    self._memoryPoller = Poller(self._pollMemory, 60, start=False)
    self._cpuPollerDC = None
    self._memoryPollerDC = None
    self._shutdownHook = None
394
395
396
398 """
399 Triggered when a uiState observer was added.
400
401 Default implementation is to start the memory and cpu pollers.
402
403 Note:
404 Subclasses can override me but should chain me up to start these
405 pollers
406 """
407 self.debug("observer has started watching us, starting pollers")
408 if not self._cpuPoller.running and not self._cpuPollerDC:
409 self._cpuPollerDC = reactor.callLater(0,
410 self._cpuPoller.start,
411 immediately=True)
412 if not self._memoryPoller.running and not self._memoryPollerDC:
413 self._memoryPollerDC = reactor.callLater(0,
414 self._memoryPoller.start,
415 immediately=True)
416
418 """
419 Triggered when a uiState observer has left.
420
421 Default implementation is to stop the memory and cpu pollers
422 when the total number of observers denoted by the 'num'
423 argument becomes zero.
424
425 Note:
426 Subclasses can override me but should chain me up to stop these
427 pollers
428 """
429 if num == 0:
430 self.debug("no more observers left, shutting down pollers")
431
432 if self._cpuPollerDC:
433 self._cpuPollerDC.cancel()
434 self._cpuPollerDC = None
435 if self._memoryPollerDC:
436 self._memoryPollerDC.cancel()
437 self._memoryPollerDC = None
438
439 if self._cpuPoller:
440 self._cpuPoller.stop()
441 if self._memoryPoller:
442 self._memoryPoller.stop()
443
def do_check(self):
    """
    Subclasses can implement me to run any checks before the component
    performs setup.

    Messages can be added to the component state's 'messages' list key.
    Any error messages added will trigger the component going to sad,
    with L{flumotion.common.errors.ComponentSetupError} being raised
    before getting to setup stage; do_setup() will not be called.

    In the event of a fatal problem that can't be expressed through an
    error message, this method should raise an exception or return a
    failure.

    It is not necessary to chain up in this function. The return
    value may be a deferred.

    This base implementation runs check_properties() on the configured
    properties.
    """
    properties = self.config['properties']
    return defer.maybeDeferred(self.check_properties, properties,
                               self.addMessage)
464
def check_properties(self, properties, addMessage):
    """
    BaseComponent convenience vmethod for running checks.

    A component implementation can override this method to run any
    checks that it needs to. Typically, a check_properties
    implementation will call the provided addMessage() callback to
    note warnings or errors. For errors, addMessage() will set
    component's mood to sad, which will abort the init process
    before getting to do_setup().

    The base implementation checks nothing.

    @param properties: The component's properties
    @type  properties: dict of string => object
    @param addMessage: Thunk to add a message to the component
                       state. Will raise an exception if the
                       message is of level ERROR.
    @type  addMessage: L{flumotion.common.messages.Message} -> None
    """
    return None
484
def do_setup(self):
    """
    Subclasses can implement me to set up the component before it is
    started.  It should set up the component, possibly opening files
    and resources.
    Non-programming errors should not be raised, but returned as a
    failing deferred.

    This base implementation instantiates the configured plugs, then
    chains their start methods followed by every do_check method and a
    final step that fails if the checks made the component sad.

    The return value may be a deferred.
    """
    startProcs = []
    for socketName, plugConfigs in self.config['plugs'].items():
        instances = []
        self.plugs[socketName] = instances
        for plugConfig in plugConfigs:
            entry = plugConfig['entries']['default']
            moduleName = entry['module-name']
            functionName = entry['function-name']
            instance = reflectcall.reflectCall(moduleName, functionName,
                                               plugConfig)
            instances.append(instance)
            self.debug('Starting plug %r on socket %s',
                       instance, socketName)
            startProcs.append(instance.start)

    checkProcs = common.get_all_methods(self, 'do_check', False)

    def checkErrorCallback(result):
        # runs after all checks: abort setup if one of them added an
        # error message (which turns the component sad)
        if self.state.get('mood') == moods.sad.value:
            self.warning('Running checks made the component sad.')
            raise errors.ComponentSetupHandledError()

    checkProcs.append(checkErrorCallback)

    return _maybeDeferredChain(startProcs + checkProcs, self)
525
def do_stop(self):
    """
    BaseComponent vmethod for stopping.
    The component should do any cleanup it needs, but must not set the
    component's mood to sleeping.

    This base implementation removes all messages from the component
    state, cancels and stops the CPU and memory pollers, fires the
    shutdown hook when one is set and chains the stop methods of all
    plugs.

    @rtype: L{twisted.internet.defer.Deferred}
    """
    plug_stops = []
    for socketName, plugs in self.plugs.items():
        for plug in plugs:
            self.debug('Stopping plug %r on socket %s', plug, socketName)
            plug_stops.append(plug.stop)

    # Iterate over a snapshot: removing entries from the very list that
    # is being iterated makes the loop skip every other message.
    for message in list(self.state.get('messages')):
        self.state.remove('messages', message)

    # pending delayed calls that would start the pollers
    if self._cpuPollerDC:
        self._cpuPollerDC.cancel()
        self._cpuPollerDC = None
    if self._memoryPollerDC:
        self._memoryPollerDC.cancel()
        self._memoryPollerDC = None

    if self._cpuPoller:
        self._cpuPoller.stop()
        self._cpuPoller = None
    if self._memoryPoller:
        self._memoryPoller.stop()
        self._memoryPoller = None

    if self._shutdownHook:
        self.debug('_stoppedCallback: firing shutdown hook')
        self._shutdownHook()

    return _maybeDeferredChain(plug_stops, self)
564
565
566
def setup(self):
    """
    Sets up the component.  Called during __init__, so be sure not
    to raise exceptions, instead adding messages to the component
    state.

    Runs every do_setup method in a deferred chain.  On success
    setup_completed() is called; on failure an error message is added
    to the component, unless the failure is a
    L{flumotion.common.errors.ComponentSetupHandledError}.
    """

    def onSuccess(_result):
        self.debug('setup completed')
        self.setup_completed()

    def onFailure(failure):
        txt = log.getFailureMessage(failure)
        self.debug('got_error: %s', txt)
        if failure.check(errors.ComponentSetupHandledError):
            # already reported by whoever raised it
            return None
        self.warning('Setup failed: %s', txt)
        m = messages.Error(T_(N_("Could not setup component.")),
                           debug=txt,
                           mid="component-setup-%s" % self.name)
        self.addMessage(m)
        # the failure is swallowed here
        return None

    self.setMood(moods.waking)
    self.uiState.set('start-time', time.time())

    self.uiState.set('total-memory', self._getTotalMemory())
    self.uiState.set('num-cpus', self._getNumberOfCPUs())
    self.uiState.set('flu-debug', log.getDebug())

    setups = common.get_all_methods(self, 'do_setup', False)
    d = _maybeDeferredChain(setups, self)
    d.addCallbacks(onSuccess, onFailure)
605
606
def setup_completed(self):
    """
    Called by setup() once all do_setup methods have finished
    successfully; turns the component happy.
    """
    self.debug('turning happy')
    happy = moods.happy
    self.setMood(happy)
610
612 """
613 Set the shutdown hook for this component (replacing any previous hook).
614 When a component is stopped, then this hook will be fired.
615 """
616 self._shutdownHook = shutdownHook
617
def stop(self):
    """
    Tell the component to stop.
    The connection to the manager will be closed.
    The job process will also finish.

    @returns: the deferred chain running all do_stop methods
    """
    self.debug('BaseComponent.stop')

    self.setMood(moods.waking)

    return _maybeDeferredChain(
        common.get_all_methods(self, 'do_stop', True), self)
632
633
634
637
639 self.state.set('workerName', workerName)
640
643
651
def setMood(self, mood):
    """
    Set the given mood on the component if it's different from the current
    one.

    A component that is already sad keeps that mood.  Deferreds waiting
    in C{_happyWaits} are called back when the mood becomes happy and
    errbacked with L{flumotion.common.errors.ComponentStartError} when
    it becomes sad.

    @param mood: the mood to switch to; its C{value} is what gets
                 stored in the state
    """
    current = self.state.get('mood')

    if current == mood.value:
        self.log('already in mood %r' % mood)
        return
    if current == moods.sad.value:
        self.info('tried to set mood to %r, but already sad :-(' % mood)
        return

    self.doLog(log.DEBUG, -2, 'MOOD changed to %r by caller', mood)
    self.state.set('mood', mood.value)

    if mood == moods.happy:
        def fire(waiter):
            waiter.callback(None)
    elif mood == moods.sad:
        def fire(waiter):
            waiter.errback(errors.ComponentStartError())
    else:
        # any other mood leaves the waiters untouched
        return

    while self._happyWaits:
        fire(self._happyWaits.pop(0))
675
def getMood(self):
    """
    Gets the mood on the component, as stored under the 'mood' key of
    the component state.

    @rtype: int
    """
    currentMood = self.state.get('mood')
    return currentMood
683
694
def addMessage(self, message):
    """
    Add a message to the component.
    If any of the messages is an error, the component will turn sad
    and the C{haveError} callback given at construction time, if any,
    is invoked with the message.

    @type  message: L{flumotion.common.messages.Message}
    """
    self.state.append('messages', message)
    if message.level != messages.ERROR:
        return

    self.debug('error message, turning sad')
    self.setMood(moods.sad)
    if self._haveError:
        self._haveError(message)
708
def removeMessage(self, mid):
    """
    Remove a message with a given id from the component.

    Every message whose C{id} equals C{mid} is removed.

    @type  mid: str
    """
    # Work on a snapshot: removing from the list that is being iterated
    # would skip the element following each removed message.
    for msg in list(self.state.get('messages', [])):
        if msg.id == mid:
            self.state.remove('messages', msg)
717
719 """
720 Add a warning messages for deprecated properties.
721
722 @param list: list of property names.
723 @type list: list of str
724 """
725 msg = ("Your configuration uses deprecated properties. "
726 "Please update your configuration and correct them.\n")
727 m = messages.Warning(T_(N_(msg)), mid="deprecated")
728 for prop in list:
729 m.add(T_(N_(
730 "Please remove '%s' property.\n"), prop))
731 self.addMessage(m)
732 self.warning(msg)
733
735 """
736 Fix properties that have been renamed from a previous version,
737 and add a warning for them.
738
739 @param properties: properties; will be modified as a result.
740 @type properties: dict
741 @param list: list of (old, new) tuples of property names.
742 @type list: list of tuple of (str, str)
743 """
744 found = []
745 for old, new in list:
746 if old in properties:
747 found.append((old, new))
748
749 if found:
750 m = messages.Warning(T_(N_(
751 "Your configuration uses deprecated properties. "
752 "Please update your configuration and correct them.\n")),
753 mid="deprecated")
754 for old, new in found:
755 m.add(T_(N_(
756 "Please rename '%s' to '%s'.\n"),
757 old, new))
758 self.debug("Setting new property '%s' to %r", new,
759 properties[old])
760 properties[new] = properties[old]
761 del properties[old]
762 self.addMessage(m)
763
def adminCallRemote(self, methodName, *args, **kwargs):
    """
    Call a remote method on all admin client views on this component.

    This gets serialized through the manager and multiplexed to all
    admin clients, and from there on to all views connected to each
    admin client model.

    Because there can be any number of admin clients that this call
    will go out to, it does not make sense to have one return value.
    This function will return None always.

    @param methodName: name of the method to call on the admin views
    """
    if not self.medium:
        self.debug('asked to adminCallRemote(%s, *%r, **%r), but '
                   'no manager.'
                   % (methodName, args, kwargs))
        return

    # the result of the remote call is intentionally not returned
    self.medium.callRemote("adminCallRemote", methodName,
                           *args, **kwargs)
783
def modifyProperty(self, property_name, value):
    """
    Modifies a property of the component.

    Components with modifiable properties (properties that can be changed
    on the fly) should implement modify_property_(propertyName) to receive
    the call.  The method name suffix is built from the dash-separated
    property name with every part title-cased, e.g. 'foo-bar' is handled
    by modify_property_FooBar.

    @param property_name: Name of the property to change
    @type  property_name: str
    @param value:         Value to set
    @returns: True if the handler accepted the value and the
              configuration was updated, False otherwise
    @raises flumotion.common.errors.PropertyNotModifiableError: if the
            component has no handler for the property
    """
    parts = property_name.split('-')
    suffix = ''.join([part.title() for part in parts])
    method_name = "modify_property_%s" % suffix
    if not hasattr(self, method_name):
        raise errors.PropertyNotModifiableError("%s" % (property_name))

    handler = getattr(self, method_name)
    accepted = handler(value)
    if not accepted:
        return False

    properties = self.config['properties']
    properties[property_name] = value
    # publish the updated properties to the UI state
    self.uiState.set('properties', self.config['properties'])
    return True
808
810 """
811 Check that the value to be set in a property is of the correct type
812
813 @returns: True if the value is of the correct type
814 """
815 if type(value) != allowed_type:
816 self.warning("Could not set the property %s in %s. "
817 "'value' must be of %s", property_name, self,
818 allowed_type)
819 return False
820 return True
821
823 try:
824 namespace = plug.get_namespace()
825 except AttributeError:
826 self.debug("Plug %r does not provide namespace, "
827 "its interface will not be exposed", plug)
828 return
829
830 self.debug("Exposing plug's %r interface in namespace %r",
831 plug, namespace)
832 for method in filter(callable,
833 [getattr(plug, m) for m in dir(plug)
834 if m.startswith('remote_')]):
835 if namespace:
836 name = "".join(("remote_", namespace, "_",
837 method.__name__[len("remote_"):]))
838 else:
839 name = method.__name__
840 self.debug("Exposing method %r as %r in %r", method, name, medium)
841 setattr(medium, name, method)
842
def _pollCPU(self):
    """
    Sample CPU usage since the previous poll and publish it.

    The ratio between elapsed CPU time (C{time.clock()}) and elapsed
    wall-clock time (C{time.time()}) is stored under the 'cpu-percent'
    key of the UI state; the current wall-clock time is always stored
    under 'current-time'.
    """
    # the delayed call that scheduled us (if any) has fired by now
    self._cpuPollerDC = None

    nowTime = time.time()
    nowClock = time.clock()
    deltaTime = nowTime - self._lastTime
    deltaClock = nowClock - self._lastClock
    self._lastTime = nowTime
    self._lastClock = nowClock

    # A negative deltaClock presumably means time.clock() wrapped around.
    # deltaTime is zero when polled twice within the timer resolution and
    # negative when the system clock was stepped back; dividing by it
    # would raise ZeroDivisionError or publish a nonsensical value, so
    # such samples are skipped.
    if deltaClock >= 0 and deltaTime > 0:
        CPU = deltaClock / deltaTime
        self.log('latest CPU use: %r', CPU)
        self.uiState.set('cpu-percent', CPU)

    self.uiState.set('current-time', nowTime)
859
def _pollMemory(self):
    """
    Read the virtual memory size of this process from /proc and publish
    it under the 'virtual-size' key of the UI state.
    """
    # the delayed call that scheduled us (if any) has fired by now
    self._memoryPollerDC = None

    handle = open('/proc/%d/stat' % os.getpid())
    try:
        line = handle.read()
    finally:
        # close the file even when reading fails
        handle.close()

    # The second field (the command name) is enclosed in parentheses and
    # may itself contain spaces, so only the text after the last ')' is
    # split.  That remainder starts at field 3, which puts vsize
    # (field 23 of the whole line) at index 20.
    fields = line[line.rfind(')') + 1:].split()

    vsize = int(fields[20])
    self.log('vsize is %d', vsize)
    self.uiState.set('virtual-size', vsize)
873
def _getTotalMemory(self):
    """
    Return the amount of memory reported on the first line of
    /proc/meminfo, converted from kilobytes to bytes.

    @rtype: int
    """
    f = open("/proc/meminfo")
    try:
        memtotal = f.readline()
    finally:
        # close the file even when reading fails
        f.close()

    # The line looks like 'MemTotal:  16384 kB'.  Strip the unit by name
    # rather than by position so a missing trailing newline or extra
    # whitespace cannot cut digits off the number.
    amount = memtotal[memtotal.index(":") + 1:].replace('kB', '').strip()
    return int(amount) * 1024
879
def _getNumberOfCPUs(self):
    """
    Return the number of 'processor' entries listed in /proc/cpuinfo.

    Falls back to 1 when the file cannot be read or lists no such
    entries.

    @rtype: int
    """
    try:
        handle = open('/proc/cpuinfo')
        try:
            count = handle.read().count('processor\t:')
        finally:
            # do not leave the file open until garbage collection
            handle.close()
    except IOError:
        count = 0

    if count < 1:
        # unreadable file or a layout without 'processor' lines: a
        # machine always has at least one CPU
        self.debug('Can not determine number of CPUs on this system')
        return 1
    return count
886