1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Feed components, participating in the stream
20 """
21
22 import os
23
24 import gst
25 import gst.interfaces
26 import gobject
27
28 from twisted.internet import reactor, defer
29 from twisted.spread import pb
30 from zope.interface import implements
31
32 from flumotion.component import component as basecomponent
33 from flumotion.component import feed
34 from flumotion.common import common, interfaces, errors, log, pygobject, \
35 messages
36 from flumotion.common import gstreamer
37 from flumotion.common.i18n import N_, gettexter
38 from flumotion.common.planet import moods
39 from flumotion.common.pygobject import gsignal
40
41 __version__ = "$Rev$"
42 T_ = gettexter()
43
44
46 """
47 I am a component-side medium for a FeedComponent to interface with
48 the manager-side ComponentAvatar.
49 """
50 implements(interfaces.IComponentMedium)
51 logCategory = 'feedcompmed'
52 remoteLogName = 'feedserver'
53
55 """
56 @param component: L{flumotion.component.feedcomponent.FeedComponent}
57 """
58 basecomponent.BaseComponentMedium.__init__(self, component)
59
60 self._feederFeedServer = {}
61
62 self._feederPendingConnections = {}
63 self._eaterFeedServer = {}
64
65 self._eaterPendingConnections = {}
66 self.logName = component.name
67
68
69
72
74 """
75 Sets the GStreamer debugging levels based on the passed debug string.
76
77 @since: 0.4.2
78 """
79 self.debug('Setting GStreamer debug level to %s' % debug)
80 if not debug:
81 return
82
83 for part in debug.split(','):
84 glob = None
85 value = None
86 pair = part.split(':')
87 if len(pair) == 1:
88
89 value = int(pair[0])
90 elif len(pair) == 2:
91 glob, value = pair
92 value = int(value)
93 else:
94 self.warning("Cannot parse GStreamer debug setting '%s'." %
95 part)
96 continue
97
98 if glob:
99 try:
100
101 gst.debug_set_threshold_for_name(glob, value)
102 except TypeError:
103 self.warning("Cannot set glob %s to value %s" % (
104 glob, value))
105 else:
106 gst.debug_set_default_threshold(value)
107
108 self.comp.uiState.set('gst-debug', debug)
109
111 """
112 Tell the component the host and port for the FeedServer through which
113 it can connect a local eater to a remote feeder to eat the given
114 fullFeedId.
115
116 Called on by the manager-side ComponentAvatar.
117 """
118 if self._feederFeedServer.get(eaterAlias):
119 if self._feederFeedServer[eaterAlias] == (fullFeedId, host, port):
120 self.debug("Feed:%r is the same as the current one. "\
121 "Request ignored.", (fullFeedId, host, port))
122 return
123 self._feederFeedServer[eaterAlias] = (fullFeedId, host, port)
124 return self.connectEater(eaterAlias)
125
138
140 """
141 Connect one of the medium's component's eaters to a remote feed.
142 Called by the component, both on initial connection and for
143 reconnecting.
144
145 @returns: deferred that will fire with a value of None
146 """
147
148
149 def gotFeed((feedId, fd)):
150 self._feederPendingConnections.pop(eaterAlias, None)
151 self.comp.eatFromFD(eaterAlias, feedId, fd)
152
153 if eaterAlias not in self._feederFeedServer:
154 self.debug("eatFrom() hasn't been called yet for eater %s",
155 eaterAlias)
156
157
158 return defer.succeed(None)
159
160 (fullFeedId, host, port) = self._feederFeedServer[eaterAlias]
161
162 cancel = self._feederPendingConnections.pop(eaterAlias, None)
163 if cancel:
164 self.debug('cancelling previous connection attempt on %s',
165 eaterAlias)
166 cancel()
167
168 client = feed.FeedMedium(logName=self.comp.name)
169
170 d = client.requestFeed(host, port,
171 self._getAuthenticatorForFeed(eaterAlias),
172 fullFeedId)
173 self._feederPendingConnections[eaterAlias] = client.stopConnecting
174 d.addCallback(gotFeed)
175 return d
176
178 """
179 Tell the component to feed the given feed to the receiving component
180 accessible through the FeedServer on the given host and port.
181
182 Called on by the manager-side ComponentAvatar.
183 """
184 self._eaterFeedServer[fullFeedId] = (host, port)
185 self.connectFeeder(feederName, fullFeedId)
186
188 """
189 Tell the component to feed the given feed to the receiving component
190 accessible through the FeedServer on the given host and port.
191
192 Called on by the manager-side ComponentAvatar.
193 """
194
195 def gotFeed((fullFeedId, fd)):
196 self._eaterPendingConnections.pop(feederName, None)
197 self.comp.feedToFD(feederName, fd, os.close, fullFeedId)
198
199 if fullFeedId not in self._eaterFeedServer:
200 self.debug("feedTo() hasn't been called yet for feeder %s",
201 feederName)
202
203
204 return defer.succeed(None)
205
206 host, port = self._eaterFeedServer[fullFeedId]
207
208
209 cancel = self._eaterPendingConnections.pop(fullFeedId, None)
210 if cancel:
211 self.debug('cancelling previous connection attempt on %s',
212 feederName)
213 cancel()
214
215 client = feed.FeedMedium(logName=self.comp.name)
216
217 d = client.sendFeed(host, port,
218 self._getAuthenticatorForFeed(feederName),
219 fullFeedId)
220 self._eaterPendingConnections[feederName] = client.stopConnecting
221 d.addCallback(gotFeed)
222 return d
223
225 """
226 Tells the component to start providing a master clock on the given
227 UDP port.
228 Can only be called if setup() has been called on the component.
229
230 The IP address returned is the local IP the clock is listening on.
231
232 @returns: (ip, port, base_time)
233 @rtype: tuple of (str, int, long)
234 """
235 self.debug('remote_provideMasterClock(port=%r)' % port)
236 return self.comp.provide_master_clock(port)
237
239 """
240 Return the clock master info created by a previous call
241 to provideMasterClock.
242
243 @returns: (ip, port, base_time)
244 @rtype: tuple of (str, int, long)
245 """
246 return self.comp.get_master_clock()
247
250
251 - def remote_effect(self, effectName, methodName, *args, **kwargs):
252 """
253 Invoke the given methodName on the given effectName in this component.
254 The effect should implement effect_(methodName) to receive the call.
255 """
256 self.debug("calling %s on effect %s" % (methodName, effectName))
257 if not effectName in self.comp.effects:
258 raise errors.UnknownEffectError(effectName)
259 effect = self.comp.effects[effectName]
260 if not hasattr(effect, "effect_%s" % methodName):
261 raise errors.NoMethodError("%s on effect %s" % (methodName,
262 effectName))
263 method = getattr(effect, "effect_%s" % methodName)
264 try:
265 result = method(*args, **kwargs)
266 except TypeError:
267 msg = "effect method %s did not accept %s and %s" % (
268 methodName, args, kwargs)
269 self.debug(msg)
270 raise errors.RemoteRunError(msg)
271 self.debug("effect: result: %r" % result)
272 return result
273
276
277 from feedcomponent010 import FeedComponent
278
279 FeedComponent.componentMediumClass = FeedComponentMedium
280
281
283 """A component using gst-launch syntax
284
285 @cvar checkTimestamp: whether to check continuity of timestamps for eaters
286 @cvar checkOffset: whether to check continuity of offsets for
287 eaters
288 """
289
290 DELIMITER = '@'
291
292
293 checkTimestamp = False
294 checkOffset = False
295
296
297 FDSRC_TMPL = 'fdsrc name=%(name)s'
298 DEPAY_TMPL = 'gdpdepay name=%(name)s-depay'
299 FEEDER_TMPL = 'gdppay name=%(name)s-pay ! multifdsink sync=false '\
300 'name=%(name)s buffers-max=500 buffers-soft-max=450 '\
301 'recover-policy=1'
302 EATER_TMPL = None
303
323
324
325
352
361
362
363
365 """
366 Method that must be implemented by subclasses to produce the
367 gstparse string for the component's pipeline. Subclasses should
368 not chain up; this method raises a NotImplemented error.
369
370 Returns: a new pipeline string representation.
371 """
372 raise NotImplementedError('subclasses should implement '
373 'get_pipeline_string')
374
384
385
386
397
399 """
400 Expand the given pipeline string representation by substituting
401 blocks between '@' with a filled-in template.
402
403 @param pipeline: a pipeline string representation with variables
404 @param templatizers: A dict of prefix => procedure. Template
405 blocks in the pipeline will be replaced
406 with the result of calling the procedure
407 with what is left of the template after
408 taking off the prefix.
409 @returns: a new pipeline string representation.
410 """
411 assert pipeline != ''
412
413
414 if pipeline.count(self.DELIMITER) % 2 != 0:
415 raise TypeError("'%s' contains an odd number of '%s'"
416 % (pipeline, self.DELIMITER))
417
418 out = []
419 for i, block in enumerate(pipeline.split(self.DELIMITER)):
420
421
422 if i % 2 == 0:
423 out.append(block)
424 else:
425 block = block.strip()
426 try:
427 pos = block.index(':')
428 except ValueError:
429 raise TypeError("Template %r has no colon" % (block, ))
430 prefix = block[:pos+1]
431 if prefix not in templatizers:
432 raise TypeError("Template %r has invalid prefix %r"
433 % (block, prefix))
434 out.append(templatizers[prefix](block[pos+1:]))
435 return ''.join(out)
436
465
467 queue = self.get_queue_string(eaterAlias)
468 elementName = self.eaters[eaterAlias].elementName
469
470 return self.EATER_TMPL % {'name': elementName, 'queue': queue}
471
473 elementName = self.feeders[feederName].elementName
474 return self.FEEDER_TMPL % {'name': elementName}
475
477 """
478 Return a parse-launch string to join the fdsrc eater element and
479 the depayer, for example '!' or '! queue !'. The string may have
480 no format strings.
481 """
482 return '!'
483
485 """
486 Method that returns the source pad of the final element in an eater.
487
488 @returns: the GStreamer source pad of the final element in an eater
489 @rtype: L{gst.Pad}
490 """
491 e = self.eaters[eaterAlias]
492 identity = self.get_element(e.elementName + '-identity')
493 depay = self.get_element(e.depayName)
494 srcpad = depay.get_pad("src")
495 if identity:
496 srcpad = identity.get_pad("src")
497 return srcpad
498
500 """
501 Method that returns the sink pad of the first element in a feeder
502
503 @returns: the GStreamer sink pad of the first element in a feeder
504 @rtype: L{gst.Pad}
505 """
506 e = self.feeders[feederAlias]
507 gdppay = self.get_element(e.elementName + '-pay')
508 return gdppay.get_static_pad("sink")
509
510
512 """
513 I am a part of a feed component for a specific group
514 of functionality.
515
516 @ivar name: name of the effect
517 @type name: string
518 @ivar component: component owning the effect
519 @type component: L{FeedComponent}
520 """
521 logCategory = "effect"
522
524 """
525 @param name: the name of the effect
526 """
527 self.name = name
528 self.setComponent(None)
529
531 """
532 Set the given component as the effect's owner.
533
534 @param component: the component to set as an owner of this effect
535 @type component: L{FeedComponent}
536 """
537 self.component = component
538 self.setUIState(component and component.uiState or None)
539
541 """
542 Set the given UI state on the effect. This method is ideal for
543 adding keys to the UI state.
544
545 @param state: the UI state for the component to use.
546 @type state: L{flumotion.common.componentui.WorkerComponentUIState}
547 """
548 self.uiState = state
549
551 """
552 Get the component owning this effect.
553
554 @rtype: L{FeedComponent}
555 """
556 return self.component
557
558
559 -class PostProcEffect (Effect):
560 """
561 I am an effect that is plugged in the pipeline to do a post processing
562 job and can be chained to other effect of the same class.
563
564 @ivar name: name of the effect
565 @type name: string
566 @ivar component: component owning the effect
567 @type component: L{FeedComponent}
568 @ivar sourcePad: pad of the source after which I'm plugged
569 @type sourcePad: L{GstPad}
570 @ivar effectBin: gstreamer bin doing the post processing effect
571 @type source: L{GstBin}
572 @ivar pipeline: pipeline holding the gstreamer elements
573 @type pipeline: L{GstPipeline}
574
575 """
576 logCategory = "effect"
577
578 - def __init__(self, name, sourcePad, effectBin, pipeline):
579 """
580 @param name: the name of the effect
581 @param sourcePad: pad of the source after which I'm plugged
582 @param effectBin: gstreamer bin doing the post processing effect
583 @param pipeline: pipeline holding the gstreamer elements
584 """
585 Effect.__init__(self, name)
586 self.sourcePad = sourcePad
587 self.effectBin = effectBin
588 self.pipeline = pipeline
589 self.plugged = False
590
592 """
593 Plug the effect in the pipeline unlinking the source element with it's
594 downstream peer
595 """
596 if self.plugged:
597 return
598
599
600 peerSinkPad = self.sourcePad
601 peerSrcPad = peerSinkPad.get_peer()
602 peerSinkPad.unlink(peerSrcPad)
603
604
605 self.effectBin.set_state(gst.STATE_PLAYING)
606 self.pipeline.add(self.effectBin)
607
608
609 peerSinkPad.link(self.effectBin.get_pad('sink'))
610 self.effectBin.get_pad('src').link(peerSrcPad)
611 self.plugged = True
612
613
687
688 signalid = queue.connect("underrun", _underrun_cb)
689
690
692
693 disconnectedPads = False
694 dropStreamHeaders = False
695
697 """Should be overrided by subclasses to provide the pipeline the
698 component uses.
699 """
700 return ""
701
703 self.EATER_TMPL += ' ! queue name=input-%(name)s'
704 self._reset_count = 0
705
706 self.uiState.addKey('reset-count', 0)
707 self.not_dropping = False
708
712
713
714
716 return [self.get_element(f.elementName + '-pay')
717 for f in self.feeders.values()]
718
722
724 raise NotImplementedError('Subclasses should implement '
725 'get_base_pipeline_string')
726
728 e = self.eaters[eaterAlias]
729 inputq = self.get_element('input-' + e.elementName)
730 return inputq.get_pad('src')
731
732
733
735 """
736 Add the event probes that will check for a caps change.
737
738 Those will trigger the pipeline's blocking and posterior reload
739 """
740
741
742 def output_reset_event(pad, event):
743 if event.type != gst.EVENT_FLUSH_START:
744 return True
745
746 self.debug('RESET: out reset event received on output pad %r', pad)
747
748
749
750
751
752
753 self._reset_count -= 1
754 if self._reset_count > 0:
755 return False
756
757 reactor.callFromThread(self._on_pipeline_drained)
758
759 return False
760
761 def got_new_caps(pad, args):
762 caps = pad.get_negotiated_caps()
763 if not caps:
764 self.debug("RESET: Caps unset! Looks like we're stopping")
765 return
766 self.debug("Got new caps at %s: %s",
767 pad.get_name(), caps.to_string())
768
769 if self.disconnectedPads:
770 return
771
772
773
774
775
776
777 self.debug('RESET: caps changed on input pad %r', pad)
778 self._reset_count = len(self.feeders)
779
780
781
782 self._block_eaters()
783
784 def got_new_buffer(pad, buff, element):
785 if self.disconnectedPads:
786 self.info("INCAPS: Got buffer but we're still disconnected.")
787 return True
788
789 if not buff.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
790 return True
791
792 self.info("INCAPS: Got buffer with caps of len %d", buff.size)
793 if buff.caps:
794 newcaps = buff.caps[0].copy()
795 resets = self.uiState.get('reset-count')
796 newcaps['count'] = resets
797 buff.set_caps(gst.Caps(newcaps))
798 return True
799
800 self.log('RESET: installing event probes for detecting changes')
801
802 for elem in self.get_input_elements():
803 self.debug('RESET: Add caps monitor for %s', elem.get_name())
804 sink = elem.get_pad('sink')
805 sink.get_peer().add_buffer_probe(got_new_buffer, elem)
806 sink.connect("notify::caps", got_new_caps)
807
808 for elem in self.get_output_elements():
809 self.debug('RESET: adding event probe for %s', elem.get_name())
810 elem.get_pad('sink').add_event_probe(output_reset_event)
811
813 """
814 Function that blocks all the identities of the eaters
815 """
816 for elem in self.get_input_elements():
817 pad = elem.get_pad('src')
818 self.debug("RESET: Blocking pad %s", pad)
819 pad.set_blocked_async(True, self._on_eater_blocked)
820
826
828 for pad in element.pads():
829 ppad = pad.get_peer()
830 if not ppad:
831 continue
832 if (pad.get_direction() in directions and
833 pad.get_direction() == gst.PAD_SINK):
834 self.debug('RESET: unlink %s with %s', pad, ppad)
835 ppad.unlink(pad)
836 elif (pad.get_direction() in directions and
837 pad.get_direction() == gst.PAD_SRC):
838 self.debug('RESET: unlink %s with %s', pad, ppad)
839 pad.unlink(ppad)
840
842 if done is None:
843 done = []
844 if not element:
845 return
846 if element in done:
847 return
848 if element in end:
849 return
850
851 for src in element.src_pads():
852 self.log('going to start by pad %r', src)
853 if not src.get_peer():
854 continue
855 peer = src.get_peer().get_parent()
856 self._remove_pipeline(pipeline, peer, end, done)
857 done.append(peer)
858 element.unlink(peer)
859
860 self.log("RESET: removing old element %s from pipeline", element)
861 element.set_state(gst.STATE_NULL)
862 pipeline.remove(element)
863
865
866
867
868
869 self.log('RESET: Going to rebuild the pipeline')
870
871 base_pipe = self._get_base_pipeline_string()
872
873
874
875 fake_pipeline = 'fakesrc name=start ! %s' % base_pipe
876 pipeline = gst.parse_launch(fake_pipeline)
877
878 def move_element(element, orig, dest):
879 if not element:
880 return
881 if element in done:
882 return
883
884 to_link = []
885 done.append(element)
886 self.log("RESET: going to remove %s", element)
887 for src in element.src_pads():
888 self.log("RESET: got src pad element %s", src)
889 if not src.get_peer():
890 continue
891 peer = src.get_peer().get_parent()
892 to_link.append(peer)
893
894 move_element(to_link[-1], orig, dest)
895
896 self._unlink_pads(element, [gst.PAD_SRC, gst.PAD_SINK])
897 orig.remove(element)
898 dest.add(element)
899
900 self.log("RESET: new element %s added to the pipeline", element)
901 for peer in to_link:
902 self.log("RESET: linking peers %s -> %s", element, peer)
903 element.link(peer)
904
905 done = []
906 start = pipeline.get_by_name('start').get_pad('src').get_peer()
907 move_element(start.get_parent(), pipeline, self.pipeline)
908
909
910
911
912
913
914 if len(self.get_input_elements()) == 1:
915 elem = self.get_input_elements()[0]
916 self.log("RESET: linking eater %r to %r", elem, done[0])
917 elem.link(done[0])
918
919
920 if len(self.get_output_elements()) == 1:
921 elem = self.get_output_elements()[0]
922 self.log("RESET: linking %r to feeder %r", done[-1], elem)
923 done[-1].link(elem)
924
925 self.configure_pipeline(self.pipeline, self.config['properties'])
926 self.pipeline.set_state(gst.STATE_PLAYING)
927 self._unblock_eaters()
928
929 resets = self.uiState.get('reset-count')
930 self.uiState.set('reset-count', resets+1)
931
932
933
935 self.log("RESET: Pad %s %s", pad,
936 (blocked and "blocked") or "unblocked")
937
939 self._on_pad_blocked(pad, blocked)
940 if blocked:
941 peer = pad.get_peer()
942 peer.send_event(gst.event_new_flush_start())
943
944
945
955
956
958 """
959 Component that is reconfigured when new changes arrive through the
960 flumotion-reset event (sent by the fms producer).
961 """
962 pass
963
964
966 """
967 This class provides for multi-input ParseLaunchComponents, such as muxers,
968 that handle flumotion-reset events for reconfiguration.
969 """
970
971 LINK_MUXER = False
972 dropAudioKuEvents = True
973
975 return muxer.get_compatible_pad(srcpad, caps)
976
978 pad = depay.get_pad("src")
979 caps = pad.get_negotiated_caps()
980 if not caps:
981 return False
982 srcpad_to_link = self.get_eater_srcpad(eaterAlias)
983 muxer = self.pipeline.get_by_name("muxer")
984 self.debug("Trying to get compatible pad for pad %r with caps %s",
985 srcpad_to_link, caps)
986 linkpad = self.get_link_pad(muxer, srcpad_to_link, caps)
987 if not linkpad:
988 m = messages.Error(T_(N_(
989 "The incoming data is not compatible with this muxer.")),
990 debug="Caps %s not compatible with this muxer." % (
991 caps.to_string()))
992 self.addMessage(m)
993
994
995 reactor.callLater(0, self.pipeline.set_state, gst.STATE_NULL)
996 return True
997 self.debug("Got link pad %r", linkpad)
998 srcpad_to_link.link(linkpad)
999 depay.get_pad("src").remove_buffer_probe(self._probes[eaterAlias])
1000 if srcpad_to_link.is_blocked():
1001 self.is_blocked_cb(srcpad_to_link, True)
1002 else:
1003 srcpad_to_link.set_blocked_async(True, self.is_blocked_cb)
1004 return True
1005
1007 caps = pad.get_negotiated_caps()
1008 if caps is None:
1009 return True
1010
1011 if 'audio' not in caps[0].to_string():
1012 depay.get_pad("src").remove_buffer_probe(self._eprobes[eaterAlias])
1013 if event.get_structure().get_name() == 'GstForceKeyUnit':
1014 return False
1015 return True
1016
1040
1042 if is_blocked:
1043 self.fired_eaters = self.fired_eaters + 1
1044 if self.fired_eaters == len(self.eaters):
1045 self.debug("All pads are now blocked")
1046 self.disconnectedPads = False
1047 for e in self.eaters:
1048 srcpad = self.get_eater_srcpad(e)
1049 srcpad.set_blocked_async(False, self.is_blocked_cb)
1050