1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import os
20 import time
21 import datetime
22
23 from twisted.internet import reactor
24
25 from flumotion.common import log, eventcalendar, tz
26 from flumotion.component.base import watcher
27
28 __version__ = "$Rev$"
29
30
32 return max(td.days * 24 * 60 * 60 + td.seconds + td.microseconds / 1e6, 0)
33
34
36 """
37 I provide notifications when events start and end.
38 I use a L{eventcalendar.Calendar} for scheduling.
39
40 @cvar windowSize: how much time to look ahead when scheduling
41 @type windowSize: L{datetime.timedelta}
42 """
43 windowSize = datetime.timedelta(days=1)
44
46 self._delayedCall = None
47 self._subscribeId = 0
48 self._subscribers = {}
49 self._nextStart = 0
50 self._calendar = None
51
52
53
55 """
56 Return the calendar used for scheduling.
57
58 @rtype: L{eventcalendar.Calendar}
59 """
60 return self._calendar
61
63 """
64 Set the given calendar to use for scheduling.
65
66 This function will send start notifications for all new events that
67 should currently be in progress, if they were not registered in
68 the old calendar or if there was no old calendar.
69
70 If the scheduler previously had a calendar, it will send end
71 notifications for all events currently in progress that are not in the
72 new calendar.
73
74 @param calendar: the new calendar to set
75 @type calendar: L{eventcalendar.Calendar}
76 @param when: the time at which to consider the calendar to be set;
77 defaults to now
78 @type when: L{datetime.datetime}
79 """
80 if not self._calendar:
81 self.debug('Setting new calendar %r', calendar)
82 else:
83 self.debug('Replacing existing calendar %r with new %r',
84 self._calendar, calendar)
85
86
87
88 if not when:
89 when = datetime.datetime.now(tz.UTC)
90
91
92
93 oldInstances = []
94 if self._calendar:
95 oldInstances = self._calendar.getActiveEventInstances(when)
96 oldInstancesContent = [i.event.content for i in oldInstances]
97
98 newInstances = calendar.getActiveEventInstances(when)
99 newInstancesContent = [i.event.content for i in newInstances]
100
101
102
103
104 self._calendar = calendar
105 for instance in oldInstances:
106 if instance.event.content not in newInstancesContent:
107 self.debug(
108 'old active %r for %r not in new calendar, ending',
109 instance, instance.event.content)
110 self._eventInstanceEnded(instance)
111
112 for instance in newInstances:
113 if instance.event.content not in oldInstancesContent:
114 self.debug(
115 'new active %r for %r not in old calendar, starting',
116 instance, instance.event.content)
117 self._eventInstanceStarted(instance)
118
119 self._reschedule()
120
122 """
123 Get all points on this scheduler's event horizon.
124 """
125 if not when:
126 when = datetime.datetime.now(tz.LOCAL)
127
128 self.debug('getPoints at %s', str(when))
129
130 points = self._calendar.getPoints(when, self.windowSize)
131
132 self.debug('%d points in given windowsize %s',
133 len(points), str(self.windowSize))
134
135 return points
136
138 """
139 Clean up all resources used by this scheduler.
140
141 This cancels all pending scheduling calls.
142 """
143 self._cancelScheduledCalls()
144
145
146
147 - def subscribe(self, eventInstanceStarted, eventInstanceEnded):
148 """
149 Subscribe to event happenings in the scheduler.
150
151 @param eventInstanceStarted: function that will be called when an
152 event instance starts
153 @type eventInstanceStarted: function with signature L{EventInstance}
154 @param eventInstanceEnded: function that will be called when an
155 event instance ends
156 @type eventInstanceEnded: function with signature L{EventInstance}
157
158 @rtype: int
159 @returns: A subscription ID that can later be passed to
160 unsubscribe().
161 """
162 sid = self._subscribeId
163 self._subscribeId += 1
164 self._subscribers[sid] = (eventInstanceStarted, eventInstanceEnded)
165 return sid
166
168 """
169 Unsubscribe from event happenings in the scheduler.
170
171 @type id: int
172 @param id: Subscription ID received from subscribe()
173 """
174 del self._subscribers[id]
175
177 self.debug('notifying %d subscribers of start of instance %r',
178 len(self._subscribers), eventInstance)
179 for started, _ in self._subscribers.values():
180 started(eventInstance)
181
183 self.debug('notifying %d subscribers of end of instance %r',
184 len(self._subscribers), eventInstance)
185 for _, ended in self._subscribers.values():
186 ended(eventInstance)
187
188
189
191
192 start = time.time()
193
194 self.debug("reschedule events")
195 self._cancelScheduledCalls()
196
197 now = datetime.datetime.now(tz.LOCAL)
198
199 def _getNextPoints():
200
201
202 self.debug('_getNextPoints at %s', str(now))
203 result = []
204
205 points = self.getPoints(now)
206
207 if not points:
208 return result
209
210 earliest = points[0].dt
211 for point in points:
212 if point.dt > earliest:
213 break
214 result.append(point)
215
216 if result:
217 self.debug('%d points at %s, first point is for %r',
218 len(result), str(result[0].dt),
219 result[0].eventInstance.event.content)
220
221 return result
222
223 def _handlePoints(points):
224 for point in points:
225 self.debug(
226 "handle %s event %r in %s at %s",
227 point.which,
228 point.eventInstance.event.content,
229 str(point.dt - now),
230 point.dt)
231 if point.which == 'start':
232 self._eventInstanceStarted(point.eventInstance)
233 elif point.which == 'end':
234 self._eventInstanceEnded(point.eventInstance)
235
236 self._reschedule()
237
238 points = _getNextPoints()
239
240 if points:
241 seconds = _timedeltaToSeconds(points[0].dt - now)
242 self.debug(
243 "schedule next point at %s in %.2f seconds",
244 str(points[0].dt), seconds)
245 dc = reactor.callLater(seconds, _handlePoints, points)
246
247 else:
248 self.debug(
249 "schedule rescheduling in %s", str(self.windowSize / 2))
250 seconds = _timedeltaToSeconds(self.windowSize / 2)
251 dc = reactor.callLater(seconds, self._reschedule)
252 self._nextStart = seconds
253 self._delayedCall = dc
254
255 delta = time.time() - start
256 if delta < 0.5:
257 self.debug('_reschedule took %.3f seconds', delta)
258 else:
259 self.warning('Rescheduling took more than half a second')
260
262 if self._delayedCall:
263 if self._delayedCall.active():
264 self._delayedCall.cancel()
265 self._delayedCall = None
266
267
308
310 """
311 Stop watching the ical file.
312 """
313 if self.watcher:
314 self.watcher.stop()
315
319
323