1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import errno
19 import os
20 import time
21 import tempfile
22 import datetime as dt
23 import bisect
24
25 import gst
26
27 from twisted.internet import reactor
28
29 from flumotion.component import feedcomponent
30 from flumotion.common import log, gstreamer, messages,\
31 errors, common
32 from flumotion.common import documentation
33 from flumotion.common import format as formatting
34 from flumotion.common import eventcalendar, poller, tz
35 from flumotion.common.i18n import N_, gettexter
36 from flumotion.common.mimetypes import mimeTypeToExtention
37
38
39
40
41
42
43
44
45
46 from flumotion.component.component import moods
47
__all__ = ['Disker']
__version__ = "$Rev$"
T_ = gettexter()


# Interval in seconds between polls of the recording directory's free disk
# space (presumably consumed by the disk poller setup — confirm at the
# poller construction, which is outside this view).
DISKPOLL_FREQ = 60


# Maximum number of finished recordings kept in the 'filelist' uiState
# list; the oldest entry is dropped once this limit is reached.
FILELIST_SIZE = 100
57
58 """
59 Disker has a property 'ical-schedule'. This allows an ical file to be
60 specified in the config and have recordings scheduled based on events.
61 This file will be monitored for changes and events reloaded if this
62 happens.
63
64 The filename of a recording started from an ical file will be produced
65 via passing the ical event summary through strftime, so that an archive
66 can encode the date and time that it was begun.
67
68 The time that will be given to strftime will be given in the timezone of
69 the ical event. In practice this will either be UTC or the local time of
70 the machine running the disker, as the ical scheduler does not
71 understand arbitrary timezones.
72 """
73
74
75 -def _openFile(loggable, component, location, mode):
89
90
91 -class Index(log.Loggable):
92 '''
    Creates an index of keyframes for a file, that can be used later for
    seeking in non indexed formats or without parsing the headers.
95
96 The format of the index is very similar to the AVI Index, but it can also
97 include information about the real time of each entry in UNIX time.
98 (see 'man aviindex')
99
100 If the index is for an indexed format, the offset of the first entry will
101 not start from 0. This offset is the size of the headers. '''
102
103
104
105
106
107
108
109
110
111 INDEX_HEADER = "FLUIDX1 #Flumotion\n"
112 INDEX_KEYS = ['CHK', 'POS', 'LEN', 'TS', 'DUR', 'KF', 'TDT', 'TDUR']
113 INDEX_EXTENSION = 'index'
114
115 logCategory = "index"
116
117 - def __init__(self, component=None, location=None):
122
123
124
131
    def addEntry(self, offset, timestamp, keyframe, tdt=0, writeIndex=True):
        '''
        Add a new entry to the index and write the previous (now complete)
        entry to disk if writeIndex is True.

        @param offset:     byte offset of the chunk in the stream
        @param timestamp:  timestamp of the chunk
        @param keyframe:   whether the chunk starts with a keyframe
        @param tdt:        UNIX time of the chunk taken from a 'tdt'
                           structure (0 when TDT sync is not used)
        @param writeIndex: when True and a location is set, append the
                           closed entry to the index file
        '''
        if len(self._index) > 0:
            # Refuse entries whose offset/timestamp/tdt would go backwards
            # with respect to the last entry.
            if not self._checkEntriesContinuity(offset, timestamp, tdt):
                return
            # The new entry closes the previous one: fill in its
            # length/duration fields.
            self._updateLastEntry(offset, timestamp, tdt)
            if writeIndex and self.location:
                f = _openFile(self, self.comp, self.location, 'a+')
                if not f:
                    return
                # Entry offsets are stored relative to the first entry,
                # compensating for the headers size.
                off = self._index[0]['offset'] - self._headers_size
                self._write_index_entry(f, self._index[-1], off,
                                        len(self._index)-1)

        self._index.append({'offset': offset, 'length': -1,
                            'timestamp': timestamp, 'duration': -1,
                            'keyframe': keyframe, 'tdt': tdt,
                            'tdt-duration': -1})

        self.debug("Added new entry to the index: offset=%s timestamp=%s "
                   "keyframe=%s tdt=%s", offset, timestamp, keyframe, tdt)
159
162
164 '''
        Set the headers size in bytes. Multifdsink appends the stream
        headers to each client. This size is then used to adjust the offset
        of the index entries.
168 '''
169 self._headers_size = size
170
172 '''
173 Return an index entry corresponding to the headers, which is a chunk
174 with 'offset' 0 and 'length' equals to the headers size
175 '''
176 if self._headers_size == 0:
177 return None
178 return {'offset': 0, 'length': self._headers_size,
179 'timestamp': 0, 'duration': -1,
180 'keyframe': 0, 'tdt': 0, 'tdt-duration': -1}
181
183 if len(self._index) == 0:
184 return -1
185 return self._index[0]['timestamp']
186
188 if len(self._index) == 0:
189 return -1
190 return self._index[0]['tdt']
191
193 '''
194 Clip the current index to a start and stop time, returning all the
195 entries matching the boundaries using the 'timestamp'
196 '''
197 return self._clip('timestamp', 'duration', start, stop)
198
200 '''
201 Clip the current index to a start and stop time, returning all the
202 entries matching the boundaries using the 'tdt'
203 '''
204 return self._clip('tdt', 'tdt-duration', start, stop)
205
207 '''
208 Clears the index
209 '''
210 self._index = []
211
212 - def save(self, start=None, stop=None):
213 '''
214 Saves the index in a file, using the entries from 'start' to 'stop'
215 '''
216 if self.location is None:
217 self.warning("Couldn't save the index, the location is not set.")
218 return False
219 f = _openFile(self, self.comp, self.location, 'w+')
220 if not f:
221 return False
222
223 self._write_index_headers(f)
224 if len(self._index) == 0:
225 return True
226
227 self._write_index_entries(f, self._filter_index(start, stop))
228 self.info("Index saved successfully. start=%s stop=%s location=%s ",
229 start, stop, self.location)
230 return True
231
233 '''
234 Loads the entries of the index from an index file
235 '''
236
237 def invalidIndex(reason):
238 self.warning("This file is not a valid index: %s", reason)
239 return False
240
241 if not location.endswith(self.INDEX_EXTENSION):
242 return invalidIndex("the extension of this file is not '%s'" %
243 self.INDEX_EXTENSION)
244 try:
245 self.info("Loading index file %s", location)
246 handle = open(location, 'r')
247 indexString = handle.readlines()
248 handle.close()
249 except IOError, e:
250 return invalidIndex("error reading index file (%r)" % e)
251
252 if len(indexString) == 0:
253 return invalidIndex("the file is empty")
254
255 if not indexString[0].startswith('FLUIDX1 #'):
256 return invalidIndex('header is not FLUIDX1')
257
258 keysStr = ' '.join(self.INDEX_KEYS)
259 if indexString[1].strip('\n') != keysStr:
260 return invalidIndex('keys definition is not: %s' % keysStr)
261
262 setHeaders = True
263 for entryLine in indexString[2:]:
264 e = entryLine.split(' ')
265 if len(e) < len(self.INDEX_KEYS):
266 return invalidIndex("one of the entries doesn't have enough "
267 "parameters (needed=%d, provided=%d)" %
268 (len(self.INDEX_KEYS), len(e)))
269 try:
270 self.addEntry(int(e[1]), int(e[3]), common.strToBool(e[5]),
271 int(e[6]))
272 except Exception, e:
273 return invalidIndex("could not parse one of the entries: %r"
274 % e)
275 if setHeaders:
276 self._headers_size = int(e[1])
277 setHeaders = False
278 self.info("Index parsed successfully")
279 return True
280
281
282
283 - def _updateLastEntry(self, offset, timestamp, tdt):
284 last = self._index[-1]
285 last['length'] = offset - last['offset']
286 last['duration'] = timestamp - last['timestamp']
287 last['tdt-duration'] = tdt - last['tdt']
288
290 last = self._index[-1]
291 for key, value in [('offset', offset), ('timestamp', timestamp),
292 ('tdt', tdt)]:
293 if value < last[key]:
294 self.warning("Could not add entries with a decreasing %s "
295 "(last=%s, new=%s)", key, last[key], value)
296 return False
297 return True
298
299 - def _clip(self, keyTS, keyDur, start, stop):
300 '''
301 Clip the index to a start and stop time. For an index with 10
302 entries of 10 seconds starting from 0, cliping from 15 to 35 will
303 return the entries 1, 2, and 3.
304 '''
305 if start >= stop:
306 return None
307
308 keys = [e[keyTS] for e in self._index]
309
310
311
312 lastEntry = self._index[-1]
313 if lastEntry[keyDur] != -1:
314 keys.append(lastEntry[keyTS] + lastEntry[keyDur])
315
316
317 if stop <= keys[0] or start >= keys[-1]:
318 return None
319
320
321
322 if start <= keys[0]:
323 start = keys[0]
324 if stop >= keys[-1]:
325 stop = keys[-1] - 1
326
327
328 i_start = bisect.bisect_right(keys, start) - 1
329 i_stop = bisect.bisect_right(keys, stop)
330
331 return self._index[i_start:i_stop]
332
334 '''
335 Filter the index with a start and stop time.
336 FIXME: Check performance difference with clipping
337 '''
338 if len(self._index) == 0:
339 return
340 if not start and not stop:
341 return self._index
342 if not start:
343 start = self._index[0]['timestamp']
344 if not stop:
345 last_entry = self._index[len(self._index)-1]
346 stop = last_entry['timestamp'] + 1
347 return [x for x in self._index if (x['timestamp'] >= start and\
348 x['timestamp'] <= stop)]
349
353
354 - def _write_index_entry(self, file, entry, offset, count):
355 frmt = "%s\n" % " ".join(['%s'] * len(self.INDEX_KEYS))
356 file.write(frmt % (count, entry['offset'] - offset,
357 entry['length'],
358 entry['timestamp'],
359 entry['duration'],
360 entry['keyframe'],
361 entry['tdt'],
362 entry['tdt-duration']))
363
371
372
374
375
376
379
380
381
382
385
395
396
397
400
401
class Disker(feedcomponent.ParseLaunchComponent, log.Loggable):
    logCategory = "disker"

    componentMediumClass = DiskerMedium
    checkOffset = True
    pipe_template = 'multifdsink name=fdsink sync-method=2 mode=1 sync=false'
    # Open file object of the current recording, its directory and full path.
    file = None
    directory = None
    location = None
    # Current stream caps; used to derive the mime type / file extension.
    caps = None
    # time.time() at which the current recording was started.
    last_tstamp = None
    indexLocation = None
    # Configuration flags, filled in from the component properties:
    # 'write-index', 'sync-on-tdt', 'time-overlap' (seconds the previous
    # recording keeps running after a new one starts) and
    # 'react-to-stream-markers'.
    writeIndex = False
    syncOnTdt = False
    timeOverlap = 0
    reactToMarks = False

    # Running byte offset of the buffers seen on the sink pad probe.
    _offset = 0L
    # Accumulated size of the stream header (IN_CAPS) buffers.
    _headers_size = 0
    _index = None
    # True when the next buffer must be flagged as a keyframe (tdt sync).
    _nextIsKF = False
    # UNIX time parsed from the last 'tdt' structure seen in the stream.
    _lastTdt = None
    # Last (offset, timestamp, keyframe, tdt) tuple added to the index.
    _lastEntry = None
    # Maps client fd -> (Index, synced flag) for multifdsink clients.
    # NOTE(review): mutable class-level default, shared across instances —
    # confirm it is rebound per instance before being mutated.
    _clients = {}
    _startFilenameTemplate = None
    _startTime = None
    _rotateTimeDelayedCall = None
    # Delayed-call handle of the periodic free-disk-space poll.
    _pollDiskDC = None
    # Optional symlink destinations kept pointing at the last finished and
    # the currently open recording, respectively.
    _symlinkToLastRecording = None
    _symlinkToCurrentRecording = None
432
433
434
435
436
437
438
439
456
457
458
472
484
486 props = self.config['properties']
487 rotateType = props.get('rotate-type', 'none')
488
489 if not rotateType in ['none', 'size', 'time']:
490 msg = messages.Error(T_(N_(
491 "The configuration property 'rotate-type' should be set to "
492 "'size', time', or 'none', not '%s'. "
493 "Please fix the configuration."),
494 rotateType), mid='rotate-type')
495 addMessage(msg)
496 raise errors.ConfigError(msg)
497
498 if rotateType in ['size', 'time']:
499 if rotateType not in props.keys():
500 msg = messages.Error(T_(N_(
501 "The configuration property '%s' should be set. "
502 "Please fix the configuration."),
503 rotateType), mid='rotate-type')
504 addMessage(msg)
505 raise errors.ConfigError(msg)
506
507 if props[rotateType] == 0:
508 msg = messages.Error(T_(N_("Configuration error: " \
509 "'rotate-type' %s value cannot be set to 0."),
510 rotateType), mid='rotate-type')
511 addMessage(msg)
512 raise errors.ConfigError(msg)
513
514
515
541
567
568 if not eventcalendar.HAS_ICALENDAR:
569 missingModule('icalendar')
570 if not eventcalendar.HAS_DATEUTIL:
571 missingModule('dateutil')
572
573 raise errors.ComponentSetupHandledError()
574
575 self.writeIndex = properties.get('write-index', False)
576 self.reactToMarks = properties.get('react-to-stream-markers', False)
577 self.syncOnTdt = properties.get('sync-on-tdt', False)
578 self.timeOverlap = properties.get('time-overlap', 0)
579
580 sink = self.get_element('fdsink')
581
582 if gstreamer.element_factory_has_property('multifdsink',
583 'resend-streamheader'):
584 sink.set_property('resend-streamheader', False)
585 else:
586 self.debug("resend-streamheader property not available, "
587 "resending streamheader when it changes in the caps")
588 sink.get_pad('sink').connect('notify::caps', self._notify_caps_cb)
589
590 sink.connect('client-removed', self._client_removed_cb)
591
592 if self.writeIndex:
593 sink.connect('client-added', self._client_added_cb)
594
595 if self.reactToMarks:
596 pfx = properties.get('stream-marker-filename-prefix', '%03d.')
597 self._markerPrefix = pfx
598
599 if self.reactToMarks or self.writeIndex or self.syncOnTdt:
600 sink.get_pad("sink").add_data_probe(self._src_pad_probe)
601
602
603
604
606 '''
        TDT events contain a structure representing the UTC time of the
        stream with the fields:
        'year', 'month', 'day', 'hour', 'minute', 'second'

        Can raise an Exception if the structure doesn't contain all the
        required fields. Protect with try-except.
613 '''
614 if s.get_name() != 'tdt':
615 return None
616 t = dt.datetime(s['year'], s['month'], s['day'], s['hour'],
617 s['minute'], s['second'])
618 return time.mktime(t.timetuple())
619
621 sink = self.get_element('fdsink')
622 stats = sink.emit('get-stats', fd)
623 if len(stats) <= 6:
624 self.warning("The current version of multifdsink doesn't "
625 "include the timestamp of the first and last "
626 "buffers sent: the indexing will be disabled.")
627 m = messages.Warning(
628 T_(N_("Versions up to and including %s of the '%s' "
629 "GStreamer plug-in can't be used to write index "
630 "files.\n"),
631 '0.10.30', 'multifdsink'))
632 self.addMessage(m)
633 self.writeIndex = False
634 return None
635 return stats
636
    def _updateIndex(self, offset, timestamp, isKeyframe, tdt=0):
        '''
        Record a new chunk (offset/timestamp/keyframe/tdt) in the index of
        every connected multifdsink client, first syncing unsynced clients
        to the timestamp of the first buffer that was sent to them.
        '''
        for fd, val in self._clients.items():
            index, synced = val
            if not synced:
                stats = self._getStats(fd)
                # _getStats returns None when multifdsink cannot report
                # per-client buffer timestamps; it already disabled
                # indexing, so stop updating indexes altogether.
                if not stats:
                    return
                # stats[6] is the timestamp of the first buffer sent to
                # this client; CLOCK_TIME_NONE means nothing was sent yet,
                # so record the entry without writing the index file and
                # retry the sync on the next chunk.
                if stats[6] == gst.CLOCK_TIME_NONE:
                    index.addEntry(offset, timestamp, isKeyframe, tdt, False)
                    continue

                index.updateStart(stats[6])
                self._clients[fd] = (index, True)

            index.addEntry(offset, timestamp, isKeyframe, tdt, True)
        self._lastEntry = (offset, timestamp, isKeyframe, tdt)
660
662
663
664 self._pollDiskDC = None
665 s = None
666 try:
667 s = os.statvfs(self.directory)
668 except Exception, e:
669 self.debug('failed to figure out disk space: %s',
670 log.getExceptionMessage(e))
671
672 if not s:
673 free = None
674 else:
675 free = formatting.formatStorage(s.f_frsize * s.f_bavail)
676
677 if self.uiState.get('disk-free') != free:
678 self.debug("disk usage changed, reporting to observers")
679 self.uiState.set('disk-free', free)
680
689
695
702
712
714 if self.caps:
715 return self.caps.get_structure(0).get_name()
716
748
750 """
751 @param filenameTemplate: strftime format string to decide filename
752 @param time: an aware datetime used for the filename and
753 to compare if an existing file needs to be
754 overwritten. defaulting to datetime.now().
755 """
756 mime = self.getMime()
757 ext = mimeTypeToExtention(mime)
758
759
760
761
762
763 tm = datetime or dt.datetime.now()
764 tmutc = datetime or dt.datetime.utcnow()
765
766
767
768
769
770
771
772
773 reactor.callLater(self.timeOverlap, self._stopRecordingFull, self.file,
774 self.location, self.last_tstamp, True)
775
776 sink = self.get_element('fdsink')
777 if sink.get_state() == gst.STATE_NULL:
778 sink.set_state(gst.STATE_READY)
779
780 filename = ""
781 if not filenameTemplate:
782 filenameTemplate = self._defaultFilenameTemplate
783 filename = "%s.%s" % (formatting.strftime(filenameTemplate,
784
785 tm.timetuple()), ext)
786 self.location = os.path.join(self.directory, filename)
787
788
789
790 location = self.location
791 i = 1
792 while os.path.exists(location):
793 mtimeTuple = time.gmtime(os.stat(location).st_mtime)
794
795
796 if mtimeTuple <= tmutc.utctimetuple():
797 self.info(
798 "Existing recording %s from previous event, overwriting",
799 location)
800 break
801
802 self.info(
803 "Existing recording %s from current event, changing name",
804 location)
805 location = self.location + '.' + str(i)
806 i += 1
807 self.location = location
808
809 self.info("Changing filename to %s", self.location)
810 self.file = _openFile(self, self, self.location, 'wb')
811 if self.file is None:
812 return
813 self._recordingStarted(self.file, self.location)
814 sink.emit('add', self.file.fileno())
815 self.last_tstamp = time.time()
816 self.uiState.set('filename', self.location)
817 self.uiState.set('recording', True)
818
819 if self._symlinkToCurrentRecording:
820 self._updateSymlink(self.location,
821 self._symlinkToCurrentRecording)
822
824 if not dest.startswith('/'):
825 dest = os.path.join(self.directory, dest)
826
827
828
829 self.debug("updating symbolic link %s to point to %s", dest, src)
830 try:
831 try:
832 os.symlink(src, dest)
833 except OSError, e:
834 if e.errno == errno.EEXIST and os.path.islink(dest):
835 os.unlink(dest)
836 os.symlink(src, dest)
837 else:
838 raise
839 except Exception, e:
840 self.info("Failed to update link %s: %s", dest,
841 log.getExceptionMessage(e))
842 m = messages.Warning(T_(N_("Failed to update symbolic link "
843 "'%s'. Check your permissions."), dest),
844 debug=log.getExceptionMessage(e))
845 self.addMessage(m)
846
850
852 sink = self.get_element('fdsink')
853 if sink.get_state() == gst.STATE_NULL:
854 sink.set_state(gst.STATE_READY)
855
856 if handle:
857 handle.flush()
858 sink.emit('remove', handle.fileno())
859 self._recordingStopped(handle, location)
860 handle = None
861 if not delayedStop:
862 self.uiState.set('filename', None)
863 self.uiState.set('recording', False)
864 try:
865 size = formatting.formatStorage(os.stat(location).st_size)
866 except EnvironmentError, e:
867 self.debug("Failed to stat %s: %s", location,
868 log.getExceptionMessage(e))
869
870 size = "unknown"
871
872
873 fl = self.uiState.get('filelist', otherwise=[])
874 if FILELIST_SIZE == len(fl):
875 self.uiState.remove('filelist', fl[0])
876
877 self.uiState.append('filelist', (lastTstamp,
878 location,
879 size))
880
881 if not delayedStop and self._symlinkToLastRecording:
882 self._updateSymlink(location,
883 self._symlinkToLastRecording)
884
888
889
890
891
911
927
938
940 if event.type != gst.EVENT_CUSTOM_DOWNSTREAM:
941 return True
942
943 struct = event.get_structure()
944 struct_name = struct.get_name()
945 if struct_name == 'FluStreamMark' and self.reactToMarks:
946 if struct['action'] == 'start':
947 self._onMarkerStart(struct['prog_id'])
948 elif struct['action'] == 'stop':
949 self._onMarkerStop()
950 elif struct_name == 'tdt' and self.syncOnTdt:
951 try:
952 if self._lastTdt == None:
953 self._firstTdt = True
954 self._lastTdt = self._tdt_to_datetime(struct)
955 self._nextIsKF = True
956 except KeyError, e:
957 self.warning("Error parsing tdt event: %r", e)
958 return True
959
961
962 if buf.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
963 self._headers_size += buf.size
964 reactor.callFromThread(self._updateHeadersSize)
965 return True
966
967
968
969 if buf.timestamp == gst.CLOCK_TIME_NONE:
970 buf.timestamp = self._clock.get_time()
971
972 if self.syncOnTdt:
973 if self._nextIsKF:
974
975
976 buf.flag_unset(gst.BUFFER_FLAG_DELTA_UNIT)
977 self._nextIsKF = False
978 reactor.callFromThread(self._updateIndex, self._offset,
979 buf.timestamp, False, int(self._lastTdt))
980 if self._recordAtStart and self._firstTdt:
981 reactor.callLater(0, self.changeFilename,
982 self._startFilenameTemplate, self._startTime)
983 self._firstTdt = False
984 else:
985 buf.flag_set(gst.BUFFER_FLAG_DELTA_UNIT)
986
987 elif not buf.flag_is_set(gst.BUFFER_FLAG_DELTA_UNIT):
988 reactor.callFromThread(self._updateIndex,
989 self._offset, buf.timestamp, True)
990 self._offset += buf.size
991 return True
992
1002
1003
1004
1015
1021
1026
1028
1029
1030
1031 current = self.uiState.get('next-points')[:]
1032 points = self.icalScheduler.getPoints()
1033 new = []
1034
1035
1036
1037
1038 def _utcAndStripTZ(dt):
1039 return dt.astimezone(tz.UTC).replace(tzinfo=None)
1040
1041 for p in points:
1042 dtUTC = _utcAndStripTZ(p.dt)
1043 dtStart = p.eventInstance.start.replace(tzinfo=None)
1044 new.append((dtUTC, p.which,
1045 formatting.strftime(p.eventInstance.event.content,
1046 dtStart.timetuple())))
1047
1048 for t in current:
1049 if t not in new:
1050 self.debug('removing tuple %r from next-points', t)
1051 self.uiState.remove('next-points', t)
1052
1053 for t in new:
1054 if t not in current:
1055 self.debug('appending tuple %r to next-points', t)
1056 self.uiState.append('next-points', t)
1057
1059 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
1060
1061 if socket not in self.plugs:
1062 return
1063 for plug in self.plugs[socket]:
1064 self.debug('invoking recordingStarted on '
1065 'plug %r on socket %s', plug, socket)
1066 plug.recordingStarted(handle, location)
1067
1069 socket = 'flumotion.component.consumers.disker.disker_plug.DiskerPlug'
1070
1071 if socket not in self.plugs:
1072 return
1073 for plug in self.plugs[socket]:
1074 self.debug('invoking recordingStopped on '
1075 'plug %r on socket %s', plug, socket)
1076 plug.recordingStopped(handle, location)
1077
1078
1079
1082
1084 tmpl = self._defaultFilenameTemplate
1085 if self._markerPrefix:
1086 try:
1087 tmpl = '%s%s' % (self._markerPrefix % data,
1088 self._defaultFilenameTemplate)
1089 except TypeError, err:
1090 m = messages.Warning(T_(N_('Failed expanding filename prefix: '
1091 '%r <-- %r.'),
1092 self._markerPrefix, data),
1093 mid='expand-marker-prefix')
1094 self.addMessage(m)
1095 self.warning('Failed expanding filename prefix: '
1096 '%r <-- %r; %r' %
1097 (self._markerPrefix, data, err))
1098 self.changeFilename(tmpl)
1099
1105