Package flumotion :: Package component :: Package consumers :: Package disker :: Module disker
[hide private]

Source Code for Module flumotion.component.consumers.disker.disker

   1  # -*- Mode: Python -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3   
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
   6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
   7  # All rights reserved. 
   8  # 
   9  # This file may be distributed and/or modified under the terms of 
  10  # the GNU Lesser General Public License version 2.1 as published by 
  11  # the Free Software Foundation. 
  12  # This file is distributed without any warranty; without even the implied 
  13  # warranty of merchantability or fitness for a particular purpose. 
  14  # See "LICENSE.LGPL" in the source distribution for more information. 
  15  # 
  16  # Headers in this file shall remain intact. 
  17   
  18  import errno 
  19  import os 
  20  import time 
  21  import tempfile 
  22  import datetime as dt 
  23  import bisect 
  24   
  25  import gst 
  26   
  27  from twisted.internet import reactor 
  28   
  29  from flumotion.component import feedcomponent 
  30  from flumotion.common import log, gstreamer, messages,\ 
  31                               errors, common 
  32  from flumotion.common import documentation 
  33  from flumotion.common import format as formatting 
  34  from flumotion.common import eventcalendar, poller, tz 
  35  from flumotion.common.i18n import N_, gettexter 
  36  from flumotion.common.mimetypes import mimeTypeToExtention 
  37   
  38  #   the flumotion.twisted.flavors is not bundled, and as we only need it for 
  39  #   the interface, we can skip doing the import and thus not create 
  40  #   incompatibilities with workers running old versions of flavors that will be 
  41  #   asked to create diskers importing the IStateCacheableListener from that 
  42  #   module 
  43  # from flumotion.twisted.flavors import IStateCacheableListener 
  44   
  45  # proxy import 
  46  from flumotion.component.component import moods 
  47   
__all__ = ['Disker']
__version__ = "$Rev$"
# translator used for all user-visible messages in this module
T_ = gettexter()

# Disk Usage polling frequency
DISKPOLL_FREQ = 60

# Maximum number of information to store in the filelist
FILELIST_SIZE = 100
  57   
  58  """ 
  59  Disker has a property 'ical-schedule'. This allows an ical file to be 
  60  specified in the config and have recordings scheduled based on events. 
  61  This file will be monitored for changes and events reloaded if this 
  62  happens. 
  63   
  64  The filename of a recording started from an ical file will be produced 
  65  via passing the ical event summary through strftime, so that an archive 
  66  can encode the date and time that it was begun. 
  67   
  68  The time that will be given to strftime will be given in the timezone of 
  69  the ical event. In practice this will either be UTC or the local time of 
  70  the machine running the disker, as the ical scheduler does not 
  71  understand arbitrary timezones. 
  72  """ 
  73   
  74   
75 -def _openFile(loggable, component, location, mode):
76 # used by both Disker and Index 77 try: 78 handle = open(location, mode) 79 return handle 80 except IOError, e: 81 loggable.warning("Failed to open output file %s: %s", 82 location, log.getExceptionMessage(e)) 83 if component is not None: 84 m = messages.Error(T_(N_( 85 "Failed to open output file '%s' for writing. " 86 "Check permissions on the file."), location)) 87 component.addMessage(m) 88 return None
89 90
91 -class Index(log.Loggable):
92 ''' 93 Creates an index of keyframes for a file, than can be used later for 94 seeking in non indexed formats or whithout parsing the headers. 95 96 The format of the index is very similar to the AVI Index, but it can also 97 include information about the real time of each entry in UNIX time. 98 (see 'man aviindex') 99 100 If the index is for an indexed format, the offset of the first entry will 101 not start from 0. This offset is the size of the headers. ''' 102 103 # CHK: Chunk number starting from 0 104 # POS: Absolute byte position of the chunk in the file 105 # LEN: Length in bytes of the chunk 106 # TS: Timestamp of the chunk (ns) 107 # DUR: Duration of the chunk (ns) 108 # KF: Whether it starts with a keyframe or not 109 # TDT: Time and date using a UNIX timestamp (s) 110 # TDUR: Duration of the chunk in UNIX time (s) 111 INDEX_HEADER = "FLUIDX1 #Flumotion\n" 112 INDEX_KEYS = ['CHK', 'POS', 'LEN', 'TS', 'DUR', 'KF', 'TDT', 'TDUR'] 113 INDEX_EXTENSION = 'index' 114 115 logCategory = "index" 116
117 - def __init__(self, component=None, location=None):
118 self._index = [] 119 self._headers_size = 0 120 self.comp = component 121 self.location = location
122 123 ### Public methods ### 124
125 - def updateStart(self, timestamp):
126 ''' 127 Remove entries in the index older than this timestamp 128 ''' 129 self.debug("Removing entries older than %s", timestamp) 130 self._index = self._filter_index(timestamp) or []
131
132 - def addEntry(self, offset, timestamp, keyframe, tdt=0, writeIndex=True):
133 ''' 134 Add a new entry to the the index and writes it to disk if 135 writeIndex is True 136 ''' 137 if len(self._index) > 0: 138 # Check that new entries have increasing timestamp, offset and tdt 139 if not self._checkEntriesContinuity(offset, timestamp, tdt): 140 return 141 # And update the length and duration of the last entry 142 self._updateLastEntry(offset, timestamp, tdt) 143 # Then write the last updated index entry to disk 144 if writeIndex and self.location: 145 f = _openFile(self, self.comp, self.location, 'a+') 146 if not f: 147 return 148 off = self._index[0]['offset'] - self._headers_size 149 self._write_index_entry(f, self._index[-1], off, 150 len(self._index)-1) 151 152 self._index.append({'offset': offset, 'length': -1, 153 'timestamp': timestamp, 'duration': -1, 154 'keyframe': keyframe, 'tdt': tdt, 155 'tdt-duration': -1}) 156 157 self.debug("Added new entry to the index: offset=%s timestamp=%s " 158 "keyframe=%s tdt=%s", offset, timestamp, keyframe, tdt)
159
160 - def setLocation(self, location):
161 self.location = location
162
163 - def setHeadersSize(self, size):
164 ''' 165 Set the headers size in bytes. Multifdsink append the stream headers 166 to each client. This size is then used to adjust the offset of the 167 index entries 168 ''' 169 self._headers_size = size
170
171 - def getHeaders(self):
172 ''' 173 Return an index entry corresponding to the headers, which is a chunk 174 with 'offset' 0 and 'length' equals to the headers size 175 ''' 176 if self._headers_size == 0: 177 return None 178 return {'offset': 0, 'length': self._headers_size, 179 'timestamp': 0, 'duration': -1, 180 'keyframe': 0, 'tdt': 0, 'tdt-duration': -1}
181
182 - def getFirstTimestamp(self):
183 if len(self._index) == 0: 184 return -1 185 return self._index[0]['timestamp']
186
187 - def getFirstTDT(self):
188 if len(self._index) == 0: 189 return -1 190 return self._index[0]['tdt']
191
192 - def clipTimestamp(self, start, stop):
193 ''' 194 Clip the current index to a start and stop time, returning all the 195 entries matching the boundaries using the 'timestamp' 196 ''' 197 return self._clip('timestamp', 'duration', start, stop)
198
199 - def clipTDT(self, start, stop):
200 ''' 201 Clip the current index to a start and stop time, returning all the 202 entries matching the boundaries using the 'tdt' 203 ''' 204 return self._clip('tdt', 'tdt-duration', start, stop)
205
206 - def clear(self):
207 ''' 208 Clears the index 209 ''' 210 self._index = []
211
212 - def save(self, start=None, stop=None):
213 ''' 214 Saves the index in a file, using the entries from 'start' to 'stop' 215 ''' 216 if self.location is None: 217 self.warning("Couldn't save the index, the location is not set.") 218 return False 219 f = _openFile(self, self.comp, self.location, 'w+') 220 if not f: 221 return False 222 223 self._write_index_headers(f) 224 if len(self._index) == 0: 225 return True 226 227 self._write_index_entries(f, self._filter_index(start, stop)) 228 self.info("Index saved successfully. start=%s stop=%s location=%s ", 229 start, stop, self.location) 230 return True
231
232 - def loadIndexFile(self, location):
233 ''' 234 Loads the entries of the index from an index file 235 ''' 236 237 def invalidIndex(reason): 238 self.warning("This file is not a valid index: %s", reason) 239 return False
240 241 if not location.endswith(self.INDEX_EXTENSION): 242 return invalidIndex("the extension of this file is not '%s'" % 243 self.INDEX_EXTENSION) 244 try: 245 self.info("Loading index file %s", location) 246 handle = open(location, 'r') 247 indexString = handle.readlines() 248 handle.close() 249 except IOError, e: 250 return invalidIndex("error reading index file (%r)" % e) 251 # Check if the file is not empty 252 if len(indexString) == 0: 253 return invalidIndex("the file is empty") 254 # Check headers 255 if not indexString[0].startswith('FLUIDX1 #'): 256 return invalidIndex('header is not FLUIDX1') 257 # Check index keys declaration 258 keysStr = ' '.join(self.INDEX_KEYS) 259 if indexString[1].strip('\n') != keysStr: 260 return invalidIndex('keys definition is not: %s' % keysStr) 261 # Add entries 262 setHeaders = True 263 for entryLine in indexString[2:]: 264 e = entryLine.split(' ') 265 if len(e) < len(self.INDEX_KEYS): 266 return invalidIndex("one of the entries doesn't have enough " 267 "parameters (needed=%d, provided=%d)" % 268 (len(self.INDEX_KEYS), len(e))) 269 try: 270 self.addEntry(int(e[1]), int(e[3]), common.strToBool(e[5]), 271 int(e[6])) 272 except Exception, e: 273 return invalidIndex("could not parse one of the entries: %r" 274 % e) 275 if setHeaders: 276 self._headers_size = int(e[1]) 277 setHeaders = False 278 self.info("Index parsed successfully") 279 return True
280 281 ### Private methods ### 282
283 - def _updateLastEntry(self, offset, timestamp, tdt):
284 last = self._index[-1] 285 last['length'] = offset - last['offset'] 286 last['duration'] = timestamp - last['timestamp'] 287 last['tdt-duration'] = tdt - last['tdt']
288
289 - def _checkEntriesContinuity(self, offset, timestamp, tdt):
290 last = self._index[-1] 291 for key, value in [('offset', offset), ('timestamp', timestamp), 292 ('tdt', tdt)]: 293 if value < last[key]: 294 self.warning("Could not add entries with a decreasing %s " 295 "(last=%s, new=%s)", key, last[key], value) 296 return False 297 return True
298
299 - def _clip(self, keyTS, keyDur, start, stop):
300 ''' 301 Clip the index to a start and stop time. For an index with 10 302 entries of 10 seconds starting from 0, cliping from 15 to 35 will 303 return the entries 1, 2, and 3. 304 ''' 305 if start >= stop: 306 return None 307 308 keys = [e[keyTS] for e in self._index] 309 310 # If the last entry has a duration we add a new entry in the TS list 311 # with the stop time 312 lastEntry = self._index[-1] 313 if lastEntry[keyDur] != -1: 314 keys.append(lastEntry[keyTS] + lastEntry[keyDur]) 315 316 # Return if the start and stop time are not inside the boundaries 317 if stop <= keys[0] or start >= keys[-1]: 318 return None 319 320 # Set the start and stop time to match the boundaries so that we don't 321 # get indexes outside the array boundaries 322 if start <= keys[0]: 323 start = keys[0] 324 if stop >= keys[-1]: 325 stop = keys[-1] - 1 326 327 # Do the bisection 328 i_start = bisect.bisect_right(keys, start) - 1 329 i_stop = bisect.bisect_right(keys, stop) 330 331 return self._index[i_start:i_stop]
332
333 - def _filter_index(self, start=None, stop=None):
334 ''' 335 Filter the index with a start and stop time. 336 FIXME: Check performance difference with clipping 337 ''' 338 if len(self._index) == 0: 339 return 340 if not start and not stop: 341 return self._index 342 if not start: 343 start = self._index[0]['timestamp'] 344 if not stop: 345 last_entry = self._index[len(self._index)-1] 346 stop = last_entry['timestamp'] + 1 347 return [x for x in self._index if (x['timestamp'] >= start and\ 348 x['timestamp'] <= stop)]
349
350 - def _write_index_headers(self, file):
351 file.write("%s" % self.INDEX_HEADER) 352 file.write("%s\n" % ' '.join(self.INDEX_KEYS))
353
354 - def _write_index_entry(self, file, entry, offset, count):
355 frmt = "%s\n" % " ".join(['%s'] * len(self.INDEX_KEYS)) 356 file.write(frmt % (count, entry['offset'] - offset, 357 entry['length'], 358 entry['timestamp'], 359 entry['duration'], 360 entry['keyframe'], 361 entry['tdt'], 362 entry['tdt-duration']))
363
364 - def _write_index_entries(self, file, entries):
365 offset =self._index[0]['offset'] - self._headers_size 366 count = 0 367 368 for entry in self._index: 369 self._write_index_entry(file, entry, offset, count) 370 count += 1
371 372
class DiskerMedium(feedcomponent.FeedComponentMedium):
    """
    Remote interface of the Disker component, exposed to the admin UI.
    """

    def remote_stopRecording(self):
        """Stop the current recording; call changeFilename to restart."""
        self.comp.stopRecording()

    def remote_changeFilename(self, filenameTemplate=None):
        """
        Change the output filename; this starts recording if the disker
        isn't currently writing to disk.
        """
        self.comp.changeFilename(filenameTemplate)

    def remote_scheduleRecordings(self, icalData):
        """
        Replace the recording schedule with the given iCalendar data.
        """
        tmp = tempfile.TemporaryFile()
        tmp.write(icalData)
        tmp.seek(0)

        self.comp.stopRecording()
        self.comp.scheduleRecordings(tmp)
        tmp.close()

    def remote_notifyState(self):
        """Push updated state (current filename info) to the admin UI."""
        self.comp.update_ui_state()
class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
    logCategory = "disker"

    componentMediumClass = DiskerMedium
    checkOffset = True
    pipe_template = 'multifdsink name=fdsink sync-method=2 mode=1 sync=false'
    file = None # open file object of the current recording
    directory = None # target directory, from the 'directory' property
    location = None # full path of the current recording
    caps = None # negotiated caps of the stream
    last_tstamp = None # time.time() when the current recording started
    indexLocation = None # NOTE(review): set nowhere in this file — confirm use
    writeIndex = False
    syncOnTdt = False
    timeOverlap = 0 # seconds to keep the old file open after rotating
    reactToMarks = False

    _offset = 0L # running byte offset of the stream
    _headers_size = 0 # accumulated size of IN_CAPS (header) buffers
    _index = None
    _nextIsKF = False # next buffer is to be flagged as keyframe (tdt sync)
    _lastTdt = None
    _lastEntry = None # last KF/TDT
    # NOTE(review): mutable class attribute shared across instances — safe
    # only if a single Disker exists per process; confirm
    _clients = {} # dict of clients {fd: (index, synced)}
    _startFilenameTemplate = None # template to use when starting off recording
    _startTime = None # time of event when starting
    _rotateTimeDelayedCall = None
    _pollDiskDC = None # _pollDisk delayed calls
    _symlinkToLastRecording = None
    _symlinkToCurrentRecording = None


    # see the commented out import statement for IStateCacheableListener at
    # the beginning of this file
    # implements(IStateCacheableListener)

    ### BaseComponent methods
    def init(self):
        """
        BaseComponent initialization: set up the UI state keys and the
        (not yet started) disk-usage poller.
        """
        # scheduling requires both the icalendar and dateutil modules
        self._can_schedule = (eventcalendar.HAS_ICALENDAR and
                              eventcalendar.HAS_DATEUTIL)
        self.uiState.addKey('filename', None)
        self.uiState.addKey('recording', False)
        self.uiState.addKey('can-schedule', self._can_schedule)
        self.uiState.addKey('has-schedule', False)
        self.uiState.addKey('rotate-type', None)
        self.uiState.addKey('disk-free', None)
        # list of (dt (in UTC, without tzinfo), which, content)
        self.uiState.addListKey('next-points')
        self.uiState.addListKey('filelist')

        # started by observerAppend once someone is watching
        self._diskPoller = poller.Poller(self._pollDisk,
                                         DISKPOLL_FREQ,
                                         start=False)
456 457 ### uiState observer triggers 458
    def observerAppend(self, observer, num):
        """
        uiState observer trigger: an admin UI started watching us, so
        start polling the disk usage.
        """
        # PB may not have finished setting up its state and doing a
        # remoteCall immediately here may cause some problems to the other
        # side. For us to send the initial disk usage value with no
        # noticeable delay, we will do it in a callLater with a timeout
        # value of 0
        self.debug("observer has started watching us, starting disk polling")
        if not self._diskPoller.running and not self._pollDiskDC:
            self._pollDiskDC = reactor.callLater(0,
                                                 self._diskPoller.start,
                                                 immediately=True)
        # Start the BaseComponent pollers
        feedcomponent.ParseLaunchComponent.observerAppend(self, observer, num)
472
    def observerRemove(self, observer, num):
        """
        uiState observer trigger: an admin UI stopped watching us; stop
        polling the disk once the last one leaves.
        """
        if num == 0:
            # cancel delayed _pollDisk calls if there's any
            if self._pollDiskDC:
                self._pollDiskDC.cancel()
                self._pollDiskDC = None

            self.debug("no more observers left, shutting down disk polling")
            self._diskPoller.stop()
        # Stop the BaseComponent pollers
        feedcomponent.ParseLaunchComponent.observerRemove(self, observer, num)
484
485 - def check_properties(self, props, addMessage):
486 props = self.config['properties'] 487 rotateType = props.get('rotate-type', 'none') 488 489 if not rotateType in ['none', 'size', 'time']: 490 msg = messages.Error(T_(N_( 491 "The configuration property 'rotate-type' should be set to " 492 "'size', time', or 'none', not '%s'. " 493 "Please fix the configuration."), 494 rotateType), mid='rotate-type') 495 addMessage(msg) 496 raise errors.ConfigError(msg) 497 498 if rotateType in ['size', 'time']: 499 if rotateType not in props.keys(): 500 msg = messages.Error(T_(N_( 501 "The configuration property '%s' should be set. " 502 "Please fix the configuration."), 503 rotateType), mid='rotate-type') 504 addMessage(msg) 505 raise errors.ConfigError(msg) 506 507 if props[rotateType] == 0: 508 msg = messages.Error(T_(N_("Configuration error: " \ 509 "'rotate-type' %s value cannot be set to 0."), 510 rotateType), mid='rotate-type') 511 addMessage(msg) 512 raise errors.ConfigError(msg)
513 514 ### ParseLaunchComponent methods 515
    def get_pipeline_string(self, properties):
        """
        ParseLaunchComponent hook: record the target directory, arm the
        size/time rotation and return the multifdsink pipeline template.
        """
        directory = properties['directory']

        self.directory = directory

        self.fixRenamedProperties(properties, [('rotateType', 'rotate-type')])

        rotateType = properties.get('rotate-type', 'none')

        # now act on the properties
        if rotateType == 'size':
            self.setSizeRotate(properties['size'])
            self.uiState.set('rotate-type',
                             'every %sB' % \
                             formatting.formatStorage(properties['size']))
        elif rotateType == 'time':
            self.setTimeRotate(properties['time'])
            self.uiState.set('rotate-type',
                             'every %s' % \
                             formatting.formatTime(properties['time']))
        else:
            self.uiState.set('rotate-type', 'disabled')
        # FIXME: should add a way of saying "do first cycle at this time"

        return self.pipe_template
541
    def configure_pipeline(self, pipeline, properties):
        """
        ParseLaunchComponent hook: read recording/symlink/index options,
        set up iCal scheduling (or report the missing modules) and hook
        the multifdsink signals and data probe.
        """
        self.debug('configure_pipeline for disker')
        self._clock = pipeline.get_clock()
        self._symlinkToLastRecording = \
            properties.get('symlink-to-last-recording', None)
        self._symlinkToCurrentRecording = \
            properties.get('symlink-to-current-recording', None)
        self._recordAtStart = properties.get('start-recording', True)
        self._defaultFilenameTemplate = properties.get('filename',
            '%s.%%Y%%m%%d-%%H%%M%%S' % self.getName())
        self._startFilenameTemplate = self._defaultFilenameTemplate
        icalfn = properties.get('ical-schedule')
        if self._can_schedule and icalfn:
            self.scheduleRecordings(open(icalfn, 'r'))
        elif icalfn:
            # ical schedule is set, but self._can_schedule is False

            def missingModule(moduleName):
                # post an error message pointing at the missing dependency
                m = messages.Error(T_(N_(
                    "An iCal file has been specified for scheduling, "
                    "but the '%s' module is not installed.\n"), moduleName),
                    mid='error-python-%s' % moduleName)
                documentation.messageAddPythonInstall(m, moduleName)
                self.debug(m)
                self.addMessage(m)

            if not eventcalendar.HAS_ICALENDAR:
                missingModule('icalendar')
            if not eventcalendar.HAS_DATEUTIL:
                missingModule('dateutil')
            # self._can_schedule is False, so one of the above surely happened
            raise errors.ComponentSetupHandledError()

        self.writeIndex = properties.get('write-index', False)
        self.reactToMarks = properties.get('react-to-stream-markers', False)
        self.syncOnTdt = properties.get('sync-on-tdt', False)
        self.timeOverlap = properties.get('time-overlap', 0)

        sink = self.get_element('fdsink')

        if gstreamer.element_factory_has_property('multifdsink',
                                                  'resend-streamheader'):
            sink.set_property('resend-streamheader', False)
        else:
            self.debug("resend-streamheader property not available, "
                       "resending streamheader when it changes in the caps")
            sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb)
        # connect to client-removed so we can detect errors in file writing
        sink.connect('client-removed', self._client_removed_cb)

        if self.writeIndex:
            sink.connect('client-added', self._client_added_cb)

        if self.reactToMarks:
            pfx = properties.get('stream-marker-filename-prefix', '%03d.')
            self._markerPrefix = pfx

        # the data probe is only needed when one of these features is on
        if self.reactToMarks or self.writeIndex or self.syncOnTdt:
            sink.get_pad("sink").add_data_probe(self._src_pad_probe)
601 602 603 ### our methods 604
    def _tdt_to_datetime(self, s):
        '''
        TDT events contain a structure representing the UTC time of the
        stream with the fields:
        'year', 'month', 'day', 'hour', 'minute', 'second'

        Can raise an Exception if the structure doesn't contain all the
        required fields. Protect with try-except.
        '''
        if s.get_name() != 'tdt':
            return None
        t = dt.datetime(s['year'], s['month'], s['day'], s['hour'],
                        s['minute'], s['second'])
        # NOTE(review): mktime() interprets the tuple as *local* time even
        # though TDT carries UTC; calendar.timegm() may have been intended —
        # confirm before changing, existing index files depend on this
        return time.mktime(t.timetuple())
619
620 - def _getStats(self, fd):
621 sink = self.get_element('fdsink') 622 stats = sink.emit('get-stats', fd) 623 if len(stats) <= 6: 624 self.warning("The current version of multifdsink doesn't " 625 "include the timestamp of the first and last " 626 "buffers sent: the indexing will be disabled.") 627 m = messages.Warning( 628 T_(N_("Versions up to and including %s of the '%s' " 629 "GStreamer plug-in can't be used to write index " 630 "files.\n"), 631 '0.10.30', 'multifdsink')) 632 self.addMessage(m) 633 self.writeIndex = False 634 return None 635 return stats
636
    def _updateIndex(self, offset, timestamp, isKeyframe, tdt=0):
        """
        Add an entry to every client's index (runs in the reactor thread,
        scheduled from the streaming-thread buffer handler).

        On the first pass for a not-yet-synced client, trims its index to
        the timestamp multifdsink reports as the client's sync point.
        """
        for fd, val in self._clients.items():
            index, synced = val
            if not synced:
                stats = self._getStats(fd)
                # Check if multifdsink can be used for indexing
                if not stats:
                    return
                # Very unlikely, but if we are not synced yet,
                # add this entry to the index because it's going
                # to be the sync point, and continue
                if stats[6] == gst.CLOCK_TIME_NONE:
                    index.addEntry(offset, timestamp, isKeyframe, tdt, False)
                    continue
                # if we know when the client was synced, trim the index.
                index.updateStart(stats[6])
                self._clients[fd] = (index, True)
                # At this point we should have only one entry in the index
                # which will be written to file after getting the next sync
                # buffer and we can update its duration and length.

            index.addEntry(offset, timestamp, isKeyframe, tdt, True)
        # remembered so newly added clients can seed their index with it
        self._lastEntry = (offset, timestamp, isKeyframe, tdt)
660
661 - def _pollDisk(self):
662 # Figure out the remaining disk space where the disker is saving 663 # files to 664 self._pollDiskDC = None 665 s = None 666 try: 667 s = os.statvfs(self.directory) 668 except Exception, e: 669 self.debug('failed to figure out disk space: %s', 670 log.getExceptionMessage(e)) 671 672 if not s: 673 free = None 674 else: 675 free = formatting.formatStorage(s.f_frsize * s.f_bavail) 676 677 if self.uiState.get('disk-free') != free: 678 self.debug("disk usage changed, reporting to observers") 679 self.uiState.set('disk-free', free)
680
    def setTimeRotate(self, time):
        """
        Arm (or re-arm) time-based file rotation.

        @param time: duration of file (in seconds)
        """
        # NOTE(review): the parameter shadows the 'time' module inside this
        # method; harmless here but easy to trip over when editing
        if self._rotateTimeDelayedCall:
            self._rotateTimeDelayedCall.cancel()
        self._rotateTimeDelayedCall = reactor.callLater(
            time, self._rotateTimeCallLater, time)
689
    def setSizeRotate(self, size):
        """
        Arm size-based file rotation.

        @param size: size of file (in bytes)
        """
        # first check is in 5 seconds; _rotateSizeCallLater reschedules
        # itself from then on
        reactor.callLater(5, self._rotateSizeCallLater, size)
695
    def _rotateTimeCallLater(self, time):
        """
        Timed rotation: switch to a new file and reschedule ourselves.
        """
        self.changeFilename()

        # reschedule ourselves indefinitely
        self._rotateTimeDelayedCall = reactor.callLater(
            time, self._rotateTimeCallLater, time)
702
703 - def _rotateSizeCallLater(self, size):
704 if not self.location: 705 self.warning('Cannot rotate file, no file location set') 706 else: 707 if os.stat(self.location).st_size > size: 708 self.changeFilename() 709 710 # Add a new one 711 reactor.callLater(5, self._rotateSizeCallLater, size)
712
713 - def getMime(self):
714 if self.caps: 715 return self.caps.get_structure(0).get_name()
716
    def scheduleRecordings(self, icalFile):
        """
        Parse an iCalendar file and schedule recordings from its events;
        if an event is already in progress, arm the initial recording
        with that event's template and start time.
        """
        self.uiState.set('has-schedule', True)
        self.debug('Parsing iCalendar file %s' % icalFile)
        # imported lazily: the scheduler needs the optional ical modules
        from flumotion.component.base import scheduler
        try:
            self.icalScheduler = scheduler.ICalScheduler(icalFile)
            self.icalScheduler.subscribe(self.eventInstanceStarted,
                                         self.eventInstanceEnded)
            # FIXME: this should be handled through the subscription
            # handlers; for that, we should subscribe before the calendar
            # gets added
            cal = self.icalScheduler.getCalendar()
            eventInstances = cal.getActiveEventInstances()
            if eventInstances:
                instance = eventInstances[0]
                content = instance.event.content
                self.info('Event %s is in progress, start recording' %
                          content)
                self._startFilenameTemplate = content
                self._startTime = instance.start
                self._recordAtStart = True
            else:
                self.info('No events in progress')
                self._recordAtStart = False
            self._updateNextPoints()
        except (ValueError, IndexError, KeyError), e:
            m = messages.Warning(T_(N_(
                "Error parsing ical file %s, so not scheduling any"
                " events." % icalFile)),
                debug=log.getExceptionMessage(e), mid="error-parsing-ical")
            self.addMessage(m)
748
    def changeFilename(self, filenameTemplate=None, datetime=None):
        """
        Start recording to a new file, scheduling the delayed stop of the
        previous one so there is no gap between them.

        @param filenameTemplate: strftime format string to decide filename
        @param datetime: an aware datetime used for the filename and
                         to compare if an existing file needs to be
                         overwritten. defaulting to datetime.now().
        """
        mime = self.getMime()
        ext = mimeTypeToExtention(mime)

        # if the events comes from the calendar, datetime is aware and we can
        # deduce from it both the local and utc time.
        # in case datetime is None datetime.now() doesn't return an aware
        # datetime, so we need to get both the local time and the utc time.
        tm = datetime or dt.datetime.now()
        tmutc = datetime or dt.datetime.utcnow()

        # delay the stop of the current recording to ensure there are no gaps
        # in the recorded files. We could think that emitting first the signal
        # to add a new client before the one to remove the client and syncing
        # with the latest keyframe should be enough, but it doesn't ensure the
        # stream continuity if it's done close to a keyframe because when
        # multifdsink looks internally for the latest keyframe it's already to
        # late and a gap is introduced.
        reactor.callLater(self.timeOverlap, self._stopRecordingFull, self.file,
                          self.location, self.last_tstamp, True)

        sink = self.get_element('fdsink')
        if sink.get_state() == gst.STATE_NULL:
            sink.set_state(gst.STATE_READY)

        filename = ""
        if not filenameTemplate:
            filenameTemplate = self._defaultFilenameTemplate
        filename = "%s.%s" % (formatting.strftime(filenameTemplate,
            # for the filename we want to use the local time
            tm.timetuple()), ext)
        self.location = os.path.join(self.directory, filename)

        # only overwrite existing files if it was last changed before the
        # start of this event; ie. if it is a recording of a previous event
        location = self.location
        i = 1
        while os.path.exists(location):
            mtimeTuple = time.gmtime(os.stat(location).st_mtime)
            # time.gmtime returns a time tuple in utc, so we compare against
            # the utc timetuple of the datetime
            if mtimeTuple <= tmutc.utctimetuple():
                self.info(
                    "Existing recording %s from previous event, overwriting",
                    location)
                break

            self.info(
                "Existing recording %s from current event, changing name",
                location)
            location = self.location + '.' + str(i)
            i += 1
        self.location = location

        self.info("Changing filename to %s", self.location)
        self.file = _openFile(self, self, self.location, 'wb')
        if self.file is None:
            return
        self._recordingStarted(self.file, self.location)
        # hand the new file descriptor to multifdsink
        sink.emit('add', self.file.fileno())
        self.last_tstamp = time.time()
        self.uiState.set('filename', self.location)
        self.uiState.set('recording', True)

        if self._symlinkToCurrentRecording:
            self._updateSymlink(self.location,
                                self._symlinkToCurrentRecording)
822 846
    def stopRecording(self):
        """
        Stop the current recording immediately (not a delayed overlap
        stop).
        """
        self._stopRecordingFull(self.file, self.location,
                                self.last_tstamp, False)
850
    def _stopRecordingFull(self, handle, location, lastTstamp, delayedStop):
        """
        Detach ``handle`` from multifdsink and book-keep the finished
        recording.

        @param handle: open file object of the recording (may be None)
        @param location: path of the recording file
        @param lastTstamp: time.time() at which the recording started
        @param delayedStop: True when invoked as the overlap stop
            scheduled by changeFilename; in that case the UI state and
            the last-recording symlink are left for the new recording
        """
        sink = self.get_element('fdsink')
        if sink.get_state() == gst.STATE_NULL:
            sink.set_state(gst.STATE_READY)

        if handle:
            handle.flush()
            sink.emit('remove', handle.fileno())
            self._recordingStopped(handle, location)
            handle = None
            if not delayedStop:
                self.uiState.set('filename', None)
                self.uiState.set('recording', False)
            try:
                size = formatting.formatStorage(os.stat(location).st_size)
            except EnvironmentError, e:
                self.debug("Failed to stat %s: %s", location,
                           log.getExceptionMessage(e))
                # catch File not found, permission denied, disk problems
                size = "unknown"

            # Limit number of entries on filelist, remove the oldest entry
            fl = self.uiState.get('filelist', otherwise=[])
            if FILELIST_SIZE == len(fl):
                self.uiState.remove('filelist', fl[0])

            self.uiState.append('filelist', (lastTstamp,
                                             location,
                                             size))

            if not delayedStop and self._symlinkToLastRecording:
                self._updateSymlink(location,
                                    self._symlinkToLastRecording)
884
885 - def _updateHeadersSize(self):
886 for index, a in self._clients.values(): 887 index.setHeadersSize(self._headers_size)
888 889 890 # START OF THREAD AWARE METHODS 891
892 - def _notify_caps_cb(self, pad, param):
893 caps = pad.get_negotiated_caps() 894 if caps == None: 895 return 896 897 caps_str = gstreamer.caps_repr(caps) 898 self.debug('Got caps: %s' % caps_str) 899 900 new = True 901 if not self.caps == None: 902 self.warning('Already had caps: %s, replacing' % caps_str) 903 new = False 904 905 self.debug('Storing caps: %s' % caps_str) 906 self.caps = caps 907 908 if new and self._recordAtStart and not self.syncOnTdt: 909 reactor.callLater(0, self.changeFilename, 910 self._startFilenameTemplate, self._startTime)
911
    def _client_added_cb(self, element, arg0):
        """
        multifdsink 'client-added' handler (streaming thread): create a
        per-client Index file next to the current recording.

        @param arg0: the client's file descriptor, used as key in
            self._clients
        """
        if not self.writeIndex:
            return

        indexLocation = '.'.join([self.location,
                                  Index.INDEX_EXTENSION])
        index = Index(self, indexLocation)
        index.setHeadersSize(self._headers_size)
        # Write the index headers
        index.save()
        # We will need the last entry because multifdsink syncs in
        # the last keyframe
        if self._lastEntry:
            index.addEntry(*(self._lastEntry + (False, )))
        # not synced yet; _updateIndex will flip the flag
        self._clients[arg0] = (index, False)
927
    def _client_removed_cb(self, element, arg0, client_status):
        """
        multifdsink 'client-removed' handler (streaming thread): report
        write errors and drop the client's index.

        @param arg0: the client's file descriptor
        @param client_status: multifdsink removal reason code
        """
        # treat as error if we were removed because of GST_CLIENT_STATUS_ERROR
        # FIXME: can we use the symbol instead of a numeric constant ?
        if client_status == 4:
            # since we get called from the streaming thread, hand off handling
            # to the reactor's thread
            reactor.callFromThread(self._client_error_cb)

        if self.writeIndex:
            del self._clients[arg0]
938
939 - def _handle_event(self, event):
940 if event.type != gst.EVENT_CUSTOM_DOWNSTREAM: 941 return True 942 943 struct = event.get_structure() 944 struct_name = struct.get_name() 945 if struct_name == 'FluStreamMark' and self.reactToMarks: 946 if struct['action'] == 'start': 947 self._onMarkerStart(struct['prog_id']) 948 elif struct['action'] == 'stop': 949 self._onMarkerStop() 950 elif struct_name == 'tdt' and self.syncOnTdt: 951 try: 952 if self._lastTdt == None: 953 self._firstTdt = True 954 self._lastTdt = self._tdt_to_datetime(struct) 955 self._nextIsKF = True 956 except KeyError, e: 957 self.warning("Error parsing tdt event: %r", e) 958 return True
959
    def _handle_buffer(self, buf):
        """Inspect a buffer from the src pad probe in the streaming thread.

        Tracks header size, re-timestamps untimestamped buffers, and
        records keyframe positions in the index (directly for regular
        streams, or driven by 'tdt' events when syncOnTdt is set).
        Always returns True so the buffer keeps flowing.
        """
        # IN_CAPS Buffers
        if buf.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
            self._headers_size += buf.size
            reactor.callFromThread(self._updateHeadersSize)
            return True

        # re-timestamp buffers without timestamp, so that we can get from
        # multifdsink's client stats the first and last buffer received
        if buf.timestamp == gst.CLOCK_TIME_NONE:
            buf.timestamp = self._clock.get_time()

        if self.syncOnTdt:
            if self._nextIsKF:
                # That's the first buffer after a 'tdt'. we mark it as a
                # keyframe and the sink will start streaming from it.
                buf.flag_unset(gst.BUFFER_FLAG_DELTA_UNIT)
                self._nextIsKF = False
                # NOTE(review): assumes _lastTdt (set in _handle_event from
                # _tdt_to_datetime) supports int() conversion — verify
                reactor.callFromThread(self._updateIndex, self._offset,
                                       buf.timestamp, False,
                                       int(self._lastTdt))
                # first tdt after start: kick off the initial recording
                if self._recordAtStart and self._firstTdt:
                    reactor.callLater(0, self.changeFilename,
                                      self._startFilenameTemplate,
                                      self._startTime)
                    self._firstTdt = False
            else:
                # everything between tdt's is treated as a delta unit
                buf.flag_set(gst.BUFFER_FLAG_DELTA_UNIT)
        # if we don't sync on TDT and this is a keyframe, add it to the index
        elif not buf.flag_is_set(gst.BUFFER_FLAG_DELTA_UNIT):
            reactor.callFromThread(self._updateIndex,
                                   self._offset, buf.timestamp, True)
        # byte offset of the *next* buffer in the output stream
        self._offset += buf.size
        return True
992
993 - def _src_pad_probe(self, pad, data):
994 # Events 995 if type(data) is gst.Event: 996 if self.reactToMarks or self.syncOnTdt: 997 return self._handle_event(data) 998 # Buffers 999 elif self.writeIndex: 1000 return self._handle_buffer(data) 1001 return True
1002 1003 # END OF THREAD AWARE METHODS 1004
1005 - def _client_error_cb(self):
1006 self.file.close() 1007 self.file = None 1008 1009 self.setMood(moods.sad) 1010 messageId = "error-writing-%s" % self.location 1011 m = messages.Error(T_(N_( 1012 "Error writing to file '%s'."), self.location), 1013 mid=messageId, priority=40) 1014 self.addMessage(m)
1015
1016 - def eventInstanceStarted(self, eventInstance):
1017 self.debug('starting recording of %s', eventInstance.event.content) 1018 self.changeFilename(eventInstance.event.content, 1019 eventInstance.start) 1020 self._updateNextPoints()
1021
1022 - def eventInstanceEnded(self, eventInstance):
1023 self.debug('ending recording of %s', eventInstance.event.content) 1024 self.stopRecording() 1025 self._updateNextPoints()
1026
1027 - def _updateNextPoints(self):
1028 # query the scheduler for what the next points are in its window 1029 # and set it on the UI state 1030 1031 current = self.uiState.get('next-points')[:] 1032 points = self.icalScheduler.getPoints() 1033 new = [] 1034 1035 # twisted says 'Currently can't jelly datetime objects with tzinfo', 1036 # so convert all to UTC then remove tzinfo. 1037 1038 def _utcAndStripTZ(dt): 1039 return dt.astimezone(tz.UTC).replace(tzinfo=None)
1040 1041 for p in points: 1042 dtUTC = _utcAndStripTZ(p.dt) 1043 dtStart = p.eventInstance.start.replace(tzinfo=None) 1044 new.append((dtUTC, p.which, 1045 formatting.strftime(p.eventInstance.event.content, 1046 dtStart.timetuple()))) 1047 1048 for t in current: 1049 if t not in new: 1050 self.debug('removing tuple %r from next-points', t) 1051 self.uiState.remove('next-points', t) 1052 1053 for t in new: 1054 if t not in current: 1055 self.debug('appending tuple %r to next-points', t) 1056 self.uiState.append('next-points', t) 1057
1058 - def _recordingStarted(self, handle, location):
1059 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug' 1060 # make sure plugs are configured with our socket, see #732 1061 if socket not in self.plugs: 1062 return 1063 for plug in self.plugs[socket]: 1064 self.debug('invoking recordingStarted on ' 1065 'plug %r on socket %s', plug, socket) 1066 plug.recordingStarted(handle, location)
1067
1068 - def _recordingStopped(self, handle, location):
1069 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug' 1070 # make sure plugs are configured with our socket, see #732 1071 if socket not in self.plugs: 1072 return 1073 for plug in self.plugs[socket]: 1074 self.debug('invoking recordingStopped on ' 1075 'plug %r on socket %s', plug, socket) 1076 plug.recordingStopped(handle, location)
1077 1078 ### marker methods 1079
1080 - def _onMarkerStop(self):
1081 self.stopRecording()
1082
1083 - def _onMarkerStart(self, data):
1084 tmpl = self._defaultFilenameTemplate 1085 if self._markerPrefix: 1086 try: 1087 tmpl = '%s%s' % (self._markerPrefix % data, 1088 self._defaultFilenameTemplate) 1089 except TypeError, err: 1090 m = messages.Warning(T_(N_('Failed expanding filename prefix: ' 1091 '%r <-- %r.'), 1092 self._markerPrefix, data), 1093 mid='expand-marker-prefix') 1094 self.addMessage(m) 1095 self.warning('Failed expanding filename prefix: ' 1096 '%r <-- %r; %r' % 1097 (self._markerPrefix, data, err)) 1098 self.changeFilename(tmpl)
1099
1100 - def do_stop(self):
1101 if self._pollDiskDC: 1102 self._pollDiskDC.cancel() 1103 self._pollDiskDC = None 1104 self._diskPoller.stop()
1105