1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """a data structure to manage asynchronous avatar starts and shutdowns
19 """
20
21 from twisted.internet import defer
22
23 from flumotion.common import log
24
25 __version__ = "$Rev$"
26
27
28
29
30
31
32
33
35
36 - def __init__(self, avatarLoggedIn, alreadyStartingError,
37 alreadyRunningError):
38 """Create a StartSet, a data structure for managing starts and
39 stops of remote processes, for example jobs in a jobheaven.
40
41 @param avatarLoggedIn: a procedure of type avatarId->boolean;
42 should return True if the avatarId is logged in and "ready", and
43 False otherwise. An avatarId is ready if avatarStarted() could
44 have been called on it. This interface is made this way because
45 it is assumed that whatever code instantiates a StartSet keeps
46 track of "ready" remote processes, and this way we prevent data
47 duplication.
48 @param alreadyStartingError: An exception class to raise if
49 createStart() is called, but there is already a create deferred
50 registered for that avatarId.
51 @param alreadyRunningError: An exception class to raise if
52 createStart() is called, but there is already a "ready" process
53 with that avatarId.
54 """
55 self._avatarLoggedIn = avatarLoggedIn
56 self._alreadyStartingError = alreadyStartingError
57 self._alreadyRunningError = alreadyRunningError
58
59 self._createDeferreds = {}
60 self._shutdownDeferreds = {}
61
63 """
64 Create and register a deferred for starting a given process.
65 The deferred will be fired when the process is ready, as
66 triggered by a call to createSuccess().
67
68 @param avatarId: the id of the remote process, for example the
69 avatarId of the job
70
71 @rtype: L{twisted.internet.defer.Deferred}
72 """
73 self.debug('making create deferred for %s', avatarId)
74
75 d = defer.Deferred()
76
77
78
79
80
81 if avatarId in self._createDeferreds:
82
83
84 self.info('already have a create deferred for %s', avatarId)
85 raise self._alreadyStartingError(avatarId)
86 elif avatarId in self._shutdownDeferreds:
87
88
89 self.debug('waiting for previous %s to shut down like it '
90 'said it would', avatarId)
91
92
93 def ensureShutdown(res,
94 shutdown=self._shutdownDeferreds[avatarId]):
95 shutdown.addCallback(lambda _: res)
96 return shutdown
97 d.addCallback(ensureShutdown)
98 elif self._avatarLoggedIn(avatarId):
99
100 self.info('avatar named %s already running', avatarId)
101 raise self._alreadyRunningError(avatarId)
102 else:
103
104 pass
105
106 self.debug('registering deferredCreate for %s', avatarId)
107 self._createDeferreds[avatarId] = d
108 return d
109
111 """
112 Trigger a deferred start previously registerd via createStart().
113 For example, a JobHeaven might call this method when a job has
114 logged in and been told to start a component.
115
116 @param avatarId: the id of the remote process, for example the
117 avatarId of the job
118 """
119 self.debug('triggering create deferred for %s', avatarId)
120 if not avatarId in self._createDeferreds:
121 self.warning('No create deferred registered for %s', avatarId)
122 return
123
124 d = self._createDeferreds[avatarId]
125 del self._createDeferreds[avatarId]
126
127 d.callback(avatarId)
128
130 """
131 Notify the caller that a create has failed, and remove the create
132 from the list of pending creates.
133
134 @param avatarId: the id of the remote process, for example the
135 avatarId of the job
136 @param exception: either an exception or a failure describing
137 why the create failed.
138 """
139 self.debug('create deferred failed for %s', avatarId)
140 if not avatarId in self._createDeferreds:
141 self.warning('No create deferred registered for %s', avatarId)
142 return
143
144 d = self._createDeferreds[avatarId]
145 del self._createDeferreds[avatarId]
146 d.errback(exception)
147
149 """
150 Check if a deferred create has been registered for the given avatarId.
151
152 @param avatarId: the id of the remote process, for example the
153 avatarId of the job
154
155 @returns: The deferred create, if one has been registered.
156 Otherwise None.
157 """
158 return self._createDeferreds.get(avatarId, None)
159
161 """
162 Create and register a deferred that will be fired when a process
163 has shut down cleanly.
164
165 @param avatarId: the id of the remote process, for example the
166 avatarId of the job
167
168 @rtype: L{twisted.internet.defer.Deferred}
169 """
170 self.debug('making shutdown deferred for %s', avatarId)
171
172 if avatarId in self._shutdownDeferreds:
173 self.warning('already have a shutdown deferred for %s',
174 avatarId)
175 return self._shutdownDeferreds[avatarId]
176 else:
177 self.debug('registering shutdown for %s', avatarId)
178 d = defer.Deferred()
179 self._shutdownDeferreds[avatarId] = d
180 return d
181
183 """
184 Trigger a callback on a deferred previously registered via
185 shutdownStart(). For example, a JobHeaven would call this when a
186 job for which shutdownStart() was called is reaped.
187
188 @param avatarId: the id of the remote process, for example the
189 avatarId of the job
190 """
191 self.debug('triggering shutdown deferred for %s', avatarId)
192 if not avatarId in self._shutdownDeferreds:
193 self.warning('No shutdown deferred registered for %s', avatarId)
194 return
195
196 d = self._shutdownDeferreds.pop(avatarId)
197 d.callback(avatarId)
198
200 """
201 Check if a deferred shutdown has been registered for the given
202 avatarId.
203
204 @param avatarId: the id of the remote process, for example the
205 avatarId of the job
206
207 @returns: True if a deferred shutdown has been registered for
208 this object, False otherwise
209 """
210 return avatarId in self._shutdownDeferreds
211
213 """
214 Notify the startset that an avatar has started. If there was a
215 create deferred registered for this avatar, this will cause
216 createSuccess() to be called.
217
218 @param avatarId: the id of the remote process, for example the
219 avatarId of the job
220 """
221 if avatarId in self._createDeferreds:
222 self.createSuccess(avatarId)
223 else:
224 self.log('avatar %s started, but we were not waiting for'
225 ' it', avatarId)
226
228 """
229 Notify the startset that an avatar has stopped. If there was a
230 shutdown deferred registered for this avatar, this will cause
231 shutdownSuccess() to be called.
232
233 On the other hand, if there was a create deferred still pending,
234 we will call createFailed with the result of calling getFailure.
235
236 If no start or create was registered, we do nothing.
237
238 @param avatarId: the id of the remote process, for example the
239 avatarId of the job
240 @param getFailure: procedure of type avatarId -> Failure. The
241 returned failure should describe the reason that the job failed.
242 """
243 if avatarId in self._createDeferreds:
244 self.createFailed(avatarId, getFailure(avatarId))
245 elif avatarId in self._shutdownDeferreds:
246 self.shutdownSuccess(avatarId)
247 else:
248 self.debug('unknown avatar %s logged out', avatarId)
249