Package flumotion :: Package component :: Package base :: Module scheduler
[hide private]

Source Code for Module flumotion.component.base.scheduler

  1  # -*- test-case-name: flumotion.test.test_component_base_scheduler -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18   
 19  import os 
 20  import time 
 21  import datetime 
 22   
 23  from twisted.internet import reactor 
 24   
 25  from flumotion.common import log, eventcalendar, tz 
 26  from flumotion.component.base import watcher 
 27   
 28  __version__ = "$Rev$" 
 29   
 30   
31 -def _timedeltaToSeconds(td):
32 return max(td.days * 24 * 60 * 60 + td.seconds + td.microseconds / 1e6, 0)
33 34
35 -class Scheduler(log.Loggable):
36 """ 37 I provide notifications when events start and end. 38 I use a L{eventcalendar.Calendar} for scheduling. 39 40 @cvar windowSize: how much time to look ahead when scheduling 41 @type windowSize: L{datetime.timedelta} 42 """ 43 windowSize = datetime.timedelta(days=1) 44
45 - def __init__(self):
46 self._delayedCall = None # tracks next call for scheduling 47 self._subscribeId = 0 # counter fo unique sid's 48 self._subscribers = {} # sid -> tuple of callable 49 self._nextStart = 0 # only used in testsuite 50 self._calendar = None # our currently active calendar
51 52 ### public API 53
54 - def getCalendar(self):
55 """ 56 Return the calendar used for scheduling. 57 58 @rtype: L{eventcalendar.Calendar} 59 """ 60 return self._calendar
61
62 - def setCalendar(self, calendar, when=None):
63 """ 64 Set the given calendar to use for scheduling. 65 66 This function will send start notifications for all new events that 67 should currently be in progress, if they were not registered in 68 the old calendar or if there was no old calendar. 69 70 If the scheduler previously had a calendar, it will send end 71 notifications for all events currently in progress that are not in the 72 new calendar. 73 74 @param calendar: the new calendar to set 75 @type calendar: L{eventcalendar.Calendar} 76 @param when: the time at which to consider the calendar to be set; 77 defaults to now 78 @type when: L{datetime.datetime} 79 """ 80 if not self._calendar: 81 self.debug('Setting new calendar %r', calendar) 82 else: 83 self.debug('Replacing existing calendar %r with new %r', 84 self._calendar, calendar) 85 86 # we want to make sure we use the same when for getting old and new 87 # instances if it wasn't specified 88 if not when: 89 when = datetime.datetime.now(tz.UTC) 90 91 # FIXME: convert Content lists to dicts to speed things up 92 # because they are used as a lookup inside loops 93 oldInstances = [] 94 if self._calendar: 95 oldInstances = self._calendar.getActiveEventInstances(when) 96 oldInstancesContent = [i.event.content for i in oldInstances] 97 98 newInstances = calendar.getActiveEventInstances(when) 99 newInstancesContent = [i.event.content for i in newInstances] 100 101 # we do comparison of instances by content, since, while the timing 102 # information may have changed, if the content is still the same, 103 # then the event is still considered 'active' 104 self._calendar = calendar 105 for instance in oldInstances: 106 if instance.event.content not in newInstancesContent: 107 self.debug( 108 'old active %r for %r not in new calendar, ending', 109 instance, instance.event.content) 110 self._eventInstanceEnded(instance) 111 112 for instance in newInstances: 113 if instance.event.content not in oldInstancesContent: 114 self.debug( 115 'new active %r for %r not in old calendar, starting', 116 instance, instance.event.content) 117 self._eventInstanceStarted(instance) 118 119 self._reschedule()
120
121 - def getPoints(self, when=None):
122 """ 123 Get all points on this scheduler's event horizon. 124 """ 125 if not when: 126 when = datetime.datetime.now(tz.LOCAL) 127 128 self.debug('getPoints at %s', str(when)) 129 130 points = self._calendar.getPoints(when, self.windowSize) 131 132 self.debug('%d points in given windowsize %s', 133 len(points), str(self.windowSize)) 134 135 return points
136
137 - def cleanup(self):
138 """ 139 Clean up all resources used by this scheduler. 140 141 This cancels all pending scheduling calls. 142 """ 143 self._cancelScheduledCalls()
144 145 ### subscription interface 146
147 - def subscribe(self, eventInstanceStarted, eventInstanceEnded):
148 """ 149 Subscribe to event happenings in the scheduler. 150 151 @param eventInstanceStarted: function that will be called when an 152 event instance starts 153 @type eventInstanceStarted: function with signature L{EventInstance} 154 @param eventInstanceEnded: function that will be called when an 155 event instance ends 156 @type eventInstanceEnded: function with signature L{EventInstance} 157 158 @rtype: int 159 @returns: A subscription ID that can later be passed to 160 unsubscribe(). 161 """ 162 sid = self._subscribeId 163 self._subscribeId += 1 164 self._subscribers[sid] = (eventInstanceStarted, eventInstanceEnded) 165 return sid
166
167 - def unsubscribe(self, id):
168 """ 169 Unsubscribe from event happenings in the scheduler. 170 171 @type id: int 172 @param id: Subscription ID received from subscribe() 173 """ 174 del self._subscribers[id]
175
176 - def _eventInstanceStarted(self, eventInstance):
177 self.debug('notifying %d subscribers of start of instance %r', 178 len(self._subscribers), eventInstance) 179 for started, _ in self._subscribers.values(): 180 started(eventInstance)
181
182 - def _eventInstanceEnded(self, eventInstance):
183 self.debug('notifying %d subscribers of end of instance %r', 184 len(self._subscribers), eventInstance) 185 for _, ended in self._subscribers.values(): 186 ended(eventInstance)
187 188 ### private API 189
190 - def _reschedule(self):
191 192 start = time.time() 193 194 self.debug("reschedule events") 195 self._cancelScheduledCalls() 196 197 now = datetime.datetime.now(tz.LOCAL) 198 199 def _getNextPoints(): 200 # get the next list of points in time that all start at the same 201 # time 202 self.debug('_getNextPoints at %s', str(now)) 203 result = [] 204 205 points = self.getPoints(now) 206 207 if not points: 208 return result 209 210 earliest = points[0].dt 211 for point in points: 212 if point.dt > earliest: 213 break 214 result.append(point) 215 216 if result: 217 self.debug('%d points at %s, first point is for %r', 218 len(result), str(result[0].dt), 219 result[0].eventInstance.event.content) 220 221 return result
222 223 def _handlePoints(points): 224 for point in points: 225 self.debug( 226 "handle %s event %r in %s at %s", 227 point.which, 228 point.eventInstance.event.content, 229 str(point.dt - now), 230 point.dt) 231 if point.which == 'start': 232 self._eventInstanceStarted(point.eventInstance) 233 elif point.which == 'end': 234 self._eventInstanceEnded(point.eventInstance) 235 236 self._reschedule()
237 238 points = _getNextPoints() 239 240 if points: 241 seconds = _timedeltaToSeconds(points[0].dt - now) 242 self.debug( 243 "schedule next point at %s in %.2f seconds", 244 str(points[0].dt), seconds) 245 dc = reactor.callLater(seconds, _handlePoints, points) 246 247 else: 248 self.debug( 249 "schedule rescheduling in %s", str(self.windowSize / 2)) 250 seconds = _timedeltaToSeconds(self.windowSize / 2) 251 dc = reactor.callLater(seconds, self._reschedule) 252 self._nextStart = seconds 253 self._delayedCall = dc 254 255 delta = time.time() - start 256 if delta < 0.5: 257 self.debug('_reschedule took %.3f seconds', delta) 258 else: 259 self.warning('Rescheduling took more than half a second') 260
261 - def _cancelScheduledCalls(self):
262 if self._delayedCall: 263 if self._delayedCall.active(): 264 self._delayedCall.cancel() 265 self._delayedCall = None
266 267
268 -class ICalScheduler(Scheduler):
269 270 watcher = None 271 272 # FIXME: having fileObj in the constructor causes events to be sent 273 # before anything can subscribe 274 # FIXME: this class should also be able to handle watching a URL 275 # and downloading it when it changes 276
277 - def __init__(self, fileObj):
278 """ 279 I am a scheduler that takes its data from an ical file and watches 280 that file every timeout. 281 282 @param fileObj: The fileObj. It must be already opened. 283 @type fileObj: file handle 284 """ 285 Scheduler.__init__(self) 286 287 self.watcher = None 288 289 if not fileObj: 290 return 291 292 self._parseFromFile(fileObj) 293 294 if hasattr(fileObj, 'name') and os.path.isfile(fileObj.name): 295 296 def fileChanged(filename): 297 self.info("ics file %s changed", filename) 298 try: 299 self._parseFromFile(open(filename, 'r')) 300 except: 301 self.warning("error parsing ics file %s", filename) 302 raise
303 304 self.watcher = watcher.FilesWatcher([fileObj.name]) 305 fileObj.close() 306 self.watcher.subscribe(fileChanged=fileChanged) 307 self.watcher.start()
308
309 - def stopWatchingIcalFile(self):
310 """ 311 Stop watching the ical file. 312 """ 313 if self.watcher: 314 self.watcher.stop()
315
316 - def cleanup(self):
317 Scheduler.cleanup(self) 318 self.stopWatchingIcalFile()
319
320 - def _parseFromFile(self, f):
321 calendar = eventcalendar.fromFile(f) 322 self.setCalendar(calendar)
323