1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import gst
19 import gobject
20
21 import os
22 import time
23
24 from twisted.internet import reactor, defer
25
26 from flumotion.common import common, errors, pygobject, messages, log
27 from flumotion.common import gstreamer
28 from flumotion.common.i18n import N_, gettexter
29 from flumotion.common.planet import moods
30 from flumotion.component import component as basecomponent
31 from flumotion.component import padmonitor
32 from flumotion.component.feeder import Feeder
33 from flumotion.component.eater import Eater
34
35 __version__ = "$Rev$"
36 T_ = gettexter()
37
38
40 """
41 I am a base class for all Flumotion feed components.
42
43 @type eaters: dict of str -> L{Eater}
44 @type feeders: dict of str -> L{Feeder}
45 """
46
47
48 FEEDER_STATS_UPDATE_FREQUENCY = 12.5
49 dropStreamHeaders = True
50 swallowNewSegment = True
51
52 logCategory = 'feedcomponent'
53
54
55
57
58 self.feeders = {}
59 self.eaters = {}
60 self.uiState.addListKey('feeders')
61 self.uiState.addListKey('eaters')
62 self.uiState.addKey('gst-debug')
63
64 self.pipeline = None
65 self.pipeline_signals = []
66 self.bus_signal_id = None
67 self.effects = {}
68 self._feeder_probe_cl = None
69
70 self._pad_monitors = padmonitor.PadMonitorSet(
71 lambda: self.setMood(moods.happy),
72 lambda: self.setMood(moods.hungry))
73
74 self._clock_slaved = False
75 self.clock_provider = None
76 self._master_clock_info = None
77
78
79 self._change_monitor = gstreamer.StateChangeMonitor()
80
81
82 self._get_stats_supported = (gstreamer.get_plugin_version('tcp')
83 >= (0, 10, 11, 0))
84
86 """
87 Sets up component.
88
89 Invokes the L{create_pipeline} and L{set_pipeline} vmethods,
90 which subclasses can provide.
91 """
92 config = self.config
93 eater_config = config.get('eater', {})
94 feeder_config = config.get('feed', [])
95 source_config = config.get('source', [])
96
97 self.debug("FeedComponent.do_setup(): eater_config %r", eater_config)
98 self.debug("FeedComponent.do_setup(): feeder_config %r", feeder_config)
99 self.debug("FeedComponent.do_setup(): source_config %r", source_config)
100
101
102
103 if eater_config == {} and source_config != []:
104 eater_config = {'default': [(x, 'default') for x in source_config]}
105
106 for eaterName in eater_config:
107 for feedId, eaterAlias in eater_config[eaterName]:
108 self.eaters[eaterAlias] = Eater(eaterAlias, eaterName)
109 self.uiState.append('eaters', self.eaters[eaterAlias].uiState)
110
111 for feederName in feeder_config:
112 self.feeders[feederName] = Feeder(feederName)
113 self.uiState.append('feeders',
114 self.feeders[feederName].uiState)
115
116 clockMaster = config.get('clock-master', None)
117 if clockMaster:
118 self._clock_slaved = clockMaster != config['avatarId']
119 else:
120 self._clock_slaved = False
121
122 pipeline = self.create_pipeline()
123 self.connect_feeders(pipeline)
124 self.set_pipeline(pipeline)
125
126 self.uiState.set('gst-debug', os.environ.get('GST_DEBUG', '*:0'))
127 self.debug("FeedComponent.do_setup(): setup finished")
128
129 self.try_start_pipeline()
130
131
132 d = self._change_monitor.add(gst.STATE_CHANGE_PAUSED_TO_PLAYING)
133 d.addCallback(lambda x: self.do_pipeline_playing())
134
136
137
138 self.debug("Setup completed")
139
140
141
143 """
144 Subclasses have to implement this method.
145
146 @rtype: L{gst.Pipeline}
147 """
148 raise NotImplementedError(
149 "subclass must implement create_pipeline")
150
160
162 elementName = self.feeders[feederName].payName
163 element = self.pipeline.get_by_name(elementName)
164 if not element:
165 raise errors.ComponentError("No such feeder %s" % feederName)
166
167 pad = element.get_pad('src')
168 self._pad_monitors.attach(pad, "%s:%s" % (self.name, elementName))
169
181
182
183
187
189
190
191
192 def client_fd_removed(sink, fd, feeder):
193
194
195
196
197 self.debug("cleaning up fd %d", fd)
198 feeder.clientDisconnected(fd)
199
200 for feeder in self.feeders.values():
201 element = pipeline.get_by_name(feeder.elementName)
202 if element:
203 element.connect('client-fd-removed', client_fd_removed,
204 feeder)
205 self.debug("Connected to client-fd-removed on %r", feeder)
206 else:
207 self.warning("No feeder %s in pipeline", feeder.elementName)
208
211
213 """
214 Invoked when the pipeline has changed the state to playing.
215 The default implementation sets the component's mood to HAPPY.
216 """
217 self.setMood(moods.happy)
218
220 """Make a flumotion error message to show to the user.
221
222 This method may be overridden by components that have special
223 knowledge about potential errors. If the component does not know
224 about the error, it can chain up to this implementation, which
225 will make a generic message.
226
227 @param gerror: The GError from the error message posted on the
228 GStreamer message bus.
229 @type gerror: L{gst.GError}
230 @param debug: A string with debugging information.
231 @type debug: str
232
233 @returns: A L{flumotion.common.messages.Message} to show to the
234 user.
235 """
236
237 mid = "%s-%s-%d" % (self.name, gerror.domain, gerror.code)
238 m = messages.Error(T_(N_(
239 "Internal GStreamer error.")),
240 debug="%s\n%s: %d\n%s" % (
241 gerror.message, gerror.domain, gerror.code, debug),
242 mid=mid, priority=40)
243 return m
244
255
256 def error():
257 gerror, debug = message.parse_error()
258 self.warning('element %s error %s %s',
259 src.get_path_string(), gerror, debug)
260 self.setMood(moods.sad)
261
262
263 try:
264 m = self.make_message_for_gstreamer_error(gerror, debug)
265 except Exception, e:
266 msg = log.getExceptionMessage(e)
267 m = messages.Error(T_(N_(
268 "Programming error in component.")),
269 debug="Bug in %r.make_message_for_gstreamer_error: %s" % (
270 self.__class__, msg))
271
272 self.state.append('messages', m)
273 self._change_monitor.have_error(self.pipeline.get_state(),
274 message)
275
276 def eos():
277 name = src.get_name()
278 if name in self._pad_monitors:
279 self.info('End of stream in element %s', name)
280 self._pad_monitors[name].setInactive()
281 else:
282 self.info("We got an eos from %s", name)
283
284 def default():
285 self.log('message received: %r', message)
286
287 handlers = {gst.MESSAGE_STATE_CHANGED: state_changed,
288 gst.MESSAGE_ERROR: error,
289 gst.MESSAGE_EOS: eos}
290 t = message.type
291 src = message.src
292 handlers.get(t, default)()
293 return True
294
296 """Watch a set of elements for discontinuity messages.
297
298 @param eaterWatchElements: the set of elements to watch for
299 discontinuities.
300 @type eaterWatchElements: Dict of elementName => Eater.
301 """
302
303 def on_element_message(bus, message):
304 src = message.src
305 name = src.get_name()
306 if name in eaterWatchElements:
307 eater = eaterWatchElements[name]
308 s = message.structure
309
310 def timestampDiscont():
311 prevTs = s["prev-timestamp"]
312 prevDuration = s["prev-duration"]
313 curTs = s["cur-timestamp"]
314
315 if prevTs == gst.CLOCK_TIME_NONE:
316 self.debug("no previous timestamp")
317 return
318 if prevDuration == gst.CLOCK_TIME_NONE:
319 self.debug("no previous duration")
320 return
321 if curTs == gst.CLOCK_TIME_NONE:
322 self.debug("no current timestamp")
323 return
324
325 discont = curTs - (prevTs + prevDuration)
326 dSeconds = discont / float(gst.SECOND)
327 self.debug("we have a discont on eater %s of %.9f s "
328 "between %s and %s ", eater.eaterAlias,
329 dSeconds,
330 gst.TIME_ARGS(prevTs + prevDuration),
331 gst.TIME_ARGS(curTs))
332
333 eater.timestampDiscont(dSeconds,
334 float(curTs) / float(gst.SECOND))
335
336 def offsetDiscont():
337 prevOffsetEnd = s["prev-offset-end"]
338 curOffset = s["cur-offset"]
339 discont = curOffset - prevOffsetEnd
340 self.debug("we have a discont on eater %s of %d "
341 "units between %d and %d ",
342 eater.eaterAlias, discont, prevOffsetEnd,
343 curOffset)
344 eater.offsetDiscont(discont, curOffset)
345
346 handlers = {'imperfect-timestamp': timestampDiscont,
347 'imperfect-offset': offsetDiscont}
348 if s.get_name() in handlers:
349 handlers[s.get_name()]()
350
351
352 bus = self.pipeline.get_bus()
353
354 bus.connect("message::element", on_element_message)
355
357
358 def fdsrc_event(pad, event):
359
360
361 if event.type == gst.EVENT_EOS:
362 self.info('End of stream for eater %s, disconnect will be '
363 'triggered', eater.eaterAlias)
364
365
366
367 return False
368 return True
369
370 def depay_event(pad, event):
371
372
373
374 if event.type == gst.EVENT_NEWSEGMENT:
375
376
377
378
379
380
381
382 if getattr(eater, '_gotFirstNewSegment', False):
383 self.info("Subsequent new segment event received on "
384 "depay on eater %s", eater.eaterAlias)
385
386 eater.streamheader = []
387 if self.swallowNewSegment:
388 return False
389 else:
390 eater._gotFirstNewSegment = True
391 return True
392
393 self.debug('adding event probe for eater %s', eater.eaterAlias)
394 fdsrc = self.get_element(eater.elementName)
395 fdsrc.get_pad("src").add_event_probe(fdsrc_event)
396 depay = self.get_element(eater.depayName)
397 depay.get_pad("src").add_event_probe(depay_event)
398
400 self.debug('setup_pipeline()')
401 assert self.bus_signal_id == None
402
403 self.pipeline.set_name('pipeline-' + self.getName())
404 bus = self.pipeline.get_bus()
405 bus.add_signal_watch()
406 self.bus_signal_id = bus.connect('message',
407 self.bus_message_received_cb)
408 sig_id = self.pipeline.connect('deep-notify',
409 gstreamer.verbose_deep_notify_cb, self)
410 self.pipeline_signals.append(sig_id)
411
412
413
414 self.pipeline.set_state(gst.STATE_READY)
415
416
417 if self._get_stats_supported:
418 self._feeder_probe_cl = reactor.callLater(
419 self.FEEDER_STATS_UPDATE_FREQUENCY,
420 self._feeder_probe_calllater)
421 else:
422 self.warning("Feeder statistics unavailable, your "
423 "gst-plugins-base is too old")
424 m = messages.Warning(T_(N_(
425 "Your gst-plugins-base is too old, so "
426 "feeder statistics will be unavailable.")),
427 mid='multifdsink')
428 m.add(T_(N_(
429 "Please upgrade '%s' to version %s."), 'gst-plugins-base',
430 '0.10.11'))
431 self.addMessage(m)
432
433 for eater in self.eaters.values():
434 self.install_eater_event_probes(eater)
435 pad = self.get_element(eater.elementName).get_pad('src')
436 name = "%s:%s" % (self.name, eater.elementName)
437 self._pad_monitors.attach(pad, name,
438 padmonitor.EaterPadMonitor,
439 self.reconnectEater,
440 eater.eaterAlias)
441 eater.setPadMonitor(self._pad_monitors[name])
442
444 if not self.pipeline:
445 return
446
447 if self.clock_provider:
448 self.clock_provider.set_property('active', False)
449 self.clock_provider = None
450 retval = self.pipeline.set_state(gst.STATE_NULL)
451 if retval != gst.STATE_CHANGE_SUCCESS:
452 self.warning('Setting pipeline to NULL failed')
453
455 self.debug("cleaning up")
456
457 assert self.pipeline != None
458
459 self.stop_pipeline()
460
461 map(self.pipeline.disconnect, self.pipeline_signals)
462 self.pipeline_signals = []
463 if self.bus_signal_id:
464 self.pipeline.get_bus().disconnect(self.bus_signal_id)
465 self.pipeline.get_bus().remove_signal_watch()
466 self.bus_signal_id = None
467 self.pipeline = None
468
469 if self._feeder_probe_cl:
470 self._feeder_probe_cl.cancel()
471 self._feeder_probe_cl = None
472
473
474 for eater in self.eaters.values():
475 self._pad_monitors.remove("%s:%s" % (self.name, eater.elementName))
476 eater.setPadMonitor(None)
477
484
486 self.debug("Master clock set to %s:%d with base_time %s", ip, port,
487 gst.TIME_ARGS(base_time))
488
489 assert self._clock_slaved
490 if self._master_clock_info == (ip, port, base_time):
491 self.debug("Same master clock info, returning directly")
492 return defer.succeed(None)
493 elif self._master_clock_info:
494 self.stop_pipeline()
495
496 self._master_clock_info = ip, port, base_time
497
498 clock = gst.NetClientClock(None, ip, port, base_time)
499
500
501 self.pipeline.set_new_stream_time(gst.CLOCK_TIME_NONE)
502 self.pipeline.set_base_time(base_time)
503 self.pipeline.use_clock(clock)
504
505 self.try_start_pipeline()
506
508 """
509 Return the connection details for the network clock provided by
510 this component, if any.
511 """
512 if self.clock_provider:
513 ip, port, base_time = self._master_clock_info
514 return ip, port, base_time
515 else:
516 return None
517
519 """
520 Tell the component to provide a master clock on the given port.
521
522 @returns: a deferred firing a (ip, port, base_time) triple.
523 """
524
525 def pipelinePaused(r):
526 clock = self.pipeline.get_clock()
527
528 self.pipeline.use_clock(clock)
529
530 self.clock_provider = gst.NetTimeProvider(clock, None, port)
531 realport = self.clock_provider.get_property('port')
532
533 base_time = self.pipeline.get_base_time()
534
535 self.debug('provided master clock from %r, base time %s',
536 clock, gst.TIME_ARGS(base_time))
537
538 if self.medium:
539
540
541
542 ip = self.medium.getIP()
543 else:
544 ip = "127.0.0.1"
545
546 self._master_clock_info = (ip, realport, base_time)
547 return self.get_master_clock()
548
549 assert self.pipeline
550 assert not self._clock_slaved
551 (ret, state, pending) = self.pipeline.get_state(0)
552 if state != gst.STATE_PAUSED and state != gst.STATE_PLAYING:
553 self.debug("pipeline still spinning up: %r", state)
554 d = self._change_monitor.add(gst.STATE_CHANGE_READY_TO_PAUSED)
555 d.addCallback(pipelinePaused)
556 return d
557 elif self.clock_provider:
558 self.debug("returning existing master clock info")
559 return defer.succeed(self.get_master_clock())
560 else:
561 return defer.maybeDeferred(pipelinePaused, None)
562
564 """
565 Dumps a graphviz dot file of the pipeline's current state to disk.
566 This will only actually do anything if the environment variable
567 GST_DEBUG_DUMP_DOT_DIR is set.
568
569 @param filename: filename to store
570 @param with_timestamp: if True, then timestamp will be prepended to
571 filename
572 """
573 if hasattr(gst, "DEBUG_BIN_TO_DOT_FILE"):
574 method = gst.DEBUG_BIN_TO_DOT_FILE
575 if with_timestamp:
576 method = gst.DEBUG_BIN_TO_DOT_FILE_WITH_TS
577 method(self.pipeline, gst.DEBUG_GRAPH_SHOW_ALL, filename)
578
579
580
582 """
583 Tell the component to start.
584 Whatever is using the component is responsible for making sure all
585 eaters have received their file descriptor to eat from.
586 """
587 (ret, state, pending) = self.pipeline.get_state(0)
588 if state == gst.STATE_PLAYING:
589 self.log('already PLAYING')
590 if not force:
591 return
592 self.debug('pipeline PLAYING, but starting anyway as requested')
593
594 if self._clock_slaved and not self._master_clock_info:
595 self.debug("Missing master clock info, deferring set to PLAYING")
596 return
597
598 for eater in self.eaters.values():
599 if not eater.fd:
600 self.debug('eater %s not yet connected, deferring set to '
601 'PLAYING', eater.eaterAlias)
602 return
603
604 self.debug("Setting pipeline %r to GST_STATE_PLAYING", self.pipeline)
605 self.pipeline.set_state(gst.STATE_PLAYING)
606
629
631 """
632 After this function returns, the stream lock for this eater must have
633 been released. If your component needs to do something here, override
634 this method.
635 """
636 pass
637
639 """Get an element out of the pipeline.
640
641 If it is possible that the component has not yet been set up,
642 the caller needs to check if self.pipeline is actually set.
643 """
644 assert self.pipeline
645 self.log('Looking up element %r in pipeline %r',
646 element_name, self.pipeline)
647 element = self.pipeline.get_by_name(element_name)
648 if not element:
649 self.warning("No element named %r in pipeline", element_name)
650 return element
651
653 'Gets a property of an element in the GStreamer pipeline.'
654 self.debug("%s: getting property %s of element %s" % (
655 self.getName(), property, element_name))
656 element = self.get_element(element_name)
657 if not element:
658 msg = "Element '%s' does not exist" % element_name
659 self.warning(msg)
660 raise errors.PropertyError(msg)
661
662 self.debug('getting property %s on element %s' % (
663 property, element_name))
664 try:
665 value = element.get_property(property)
666 except (ValueError, TypeError):
667 msg = "Property '%s' on element '%s' does not exist" % (
668 property, element_name)
669 self.warning(msg)
670 raise errors.PropertyError(msg)
671
672
673 if isinstance(value, gobject.GEnum):
674 value = int(value)
675
676 return value
677
678 - def modify_element_property(self, element_name, property_name, value,
679 mutable_state=gst.STATE_READY,
680 needs_reset=False):
681 '''
682 Sets a property on the fly on a gstreamer element
683
684 @param element_name: Name of the gstreamer element
685 @type element_name: str
686 @param property_name: Name of the property to change
687 @type property_name: str
688 @param value: Value to set
689 @param mutable_state: Minimum state required to set the property
690 @type mutable_state: L{gst.Enum}
691 @param needs_reset: Whether setting this property requires sending a
692 'flumotion-reset' event
693 @type needs_reset: bool
694 '''
695
696 def drop_stream_headers(pad, buf):
697 if buf.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
698 return False
699 pad.remove_buffer_probe(probes[pad])
700 return True
701
702 probes = {}
703 element = self.get_element(element_name)
704 if not element:
705 self.warning("The property %s cannot be set because the "
706 "element %s could not be found",
707 property_name, element_name)
708 return
709
710 state = self.pipeline.get_state(0)[1]
711
712
713 sink_pads = [p.get_peer() for p in element.pads()
714 if p.get_direction() == gst.PAD_SINK]
715 src_pads = [p for p in element.pads()
716 if p.get_direction() == gst.PAD_SRC]
717
718
719 for pad in sink_pads:
720 pad.set_blocked(True)
721
722
723
724
725
726 if state > mutable_state:
727 element.set_state(mutable_state)
728 element.get_state(0)
729
730 element.set_property(property_name, value)
731
732
733
734
735 if needs_reset:
736 for pad in src_pads:
737 pad.push_event(gstreamer.flumotion_reset_event())
738
739
740
741
742
743 else:
744 for pad in src_pads:
745 probes[pad] = pad.add_buffer_probe(drop_stream_headers)
746
747 if state > mutable_state:
748 element.set_state(state)
749
750
751 for pad in sink_pads:
752 pad.set_blocked(False)
753
755 'Sets a property on an element in the GStreamer pipeline.'
756 self.debug("%s: setting property %s of element %s to %s" % (
757 self.getName(), property, element_name, value))
758 element = self.get_element(element_name)
759 if not element:
760 msg = "Element '%s' does not exist" % element_name
761 self.warning(msg)
762 raise errors.PropertyError(msg)
763
764 self.debug('setting property %s on element %r to %s' %
765 (property, element_name, value))
766 pygobject.gobject_set_property(element, property, value)
767
768
769
771 if not self.medium:
772 self.debug("Can't reconnect eater %s, running "
773 "without a medium", eaterAlias)
774 return
775
776 self.eaters[eaterAlias].disconnected()
777 self.medium.connectEater(eaterAlias)
778
779 - def feedToFD(self, feedName, fd, cleanup, eaterId=None):
780 """
781 @param feedName: name of the feed to feed to the given fd.
782 @type feedName: str
783 @param fd: the file descriptor to feed to
784 @type fd: int
785 @param cleanup: the function to call when the FD is no longer feeding
786 @type cleanup: callable
787 """
788 self.debug('FeedToFD(%s, %d)', feedName, fd)
789
790
791
792 if (not self.pipeline or
793 self.pipeline.get_state(0)[1] == gst.STATE_NULL):
794 self.warning('told to feed %s to fd %d, but pipeline not '
795 'running yet', feedName, fd)
796 cleanup(fd)
797
798
799 return
800
801 if feedName not in self.feeders:
802 msg = "Cannot find feeder named '%s'" % feedName
803 mid = "feedToFD-%s" % feedName
804 m = messages.Warning(T_(N_("Internal Flumotion error.")),
805 debug=msg, mid=mid, priority=40)
806 self.state.append('messages', m)
807 self.warning(msg)
808 cleanup(fd)
809 return False
810
811 feeder = self.feeders[feedName]
812 element = self.get_element(feeder.elementName)
813 assert element
814 clientId = eaterId or ('client-%d' % fd)
815 element.emit('add', fd)
816 feeder.clientConnected(clientId, fd, cleanup)
817
818 - def eatFromFD(self, eaterAlias, feedId, fd):
819 """
820 Tell the component to eat the given feedId from the given fd.
821 The component takes over the ownership of the fd, closing it when
822 no longer eating.
823
824 @param eaterAlias: the alias of the eater
825 @type eaterAlias: str
826 @param feedId: feed id (componentName:feedName) to eat from through
827 the given fd
828 @type feedId: str
829 @param fd: the file descriptor to eat from
830 @type fd: int
831 """
832 self.debug('EatFromFD(%s, %s, %d)', eaterAlias, feedId, fd)
833
834 if not self.pipeline:
835 self.warning('told to eat %s from fd %d, but pipeline not '
836 'running yet', feedId, fd)
837
838
839 os.close(fd)
840 return
841
842 if eaterAlias not in self.eaters:
843 self.warning('Unknown eater alias: %s', eaterAlias)
844 os.close(fd)
845 return
846
847 eater = self.eaters[eaterAlias]
848 element = self.get_element(eater.elementName)
849 if not element:
850 self.warning('Eater element %s not found', eater.elementName)
851 os.close(fd)
852 return
853
854
855 (result, current, pending) = element.get_state(0L)
856 pipeline_playing = current not in [gst.STATE_NULL, gst.STATE_READY]
857 if pipeline_playing:
858 self.debug('eater %s in state %r, kidnapping it',
859 eaterAlias, current)
860
861
862
863
864
865
866
867 srcpad = element.get_pad('src')
868
869 def _block_cb(pad, blocked):
870 pass
871 srcpad.set_blocked_async(True, _block_cb)
872
873
874 depay = self.get_element(eater.depayName)
875
876 def remove_in_caps_buffers(pad, buffer, eater):
877 if buffer.flag_is_set(gst.BUFFER_FLAG_IN_CAPS):
878 self.info("We got streamheader buffer which we are "
879 "dropping because we do not want this just "
880 "after a reconnect because it breaks "
881 "everything ")
882 return False
883
884
885 self.log("We got buffer with no in caps flag set on "
886 "eater %r", eater)
887
888 if eater.streamheaderBufferProbeHandler:
889 self.log("Removing buffer probe on depay src pad on "
890 "eater %r", eater)
891 pad.remove_buffer_probe(
892 eater.streamheaderBufferProbeHandler)
893 eater.streamheaderBufferProbeHandler = None
894 else:
895 self.warning("buffer probe handler is None, bad news on "
896 "eater %r", eater)
897
898 return True
899
900 if not eater.streamheaderBufferProbeHandler:
901 if self.dropStreamHeaders:
902 self.log("Adding buffer probe on depay src pad on "
903 "eater %r", eater)
904 eater.streamheaderBufferProbeHandler = \
905 depay.get_pad("src").add_buffer_probe(
906 remove_in_caps_buffers, eater)
907
908 self.unblock_eater(eaterAlias)
909
910
911 sinkpad = srcpad.get_peer()
912 srcpad.unlink(sinkpad)
913 parent = element.get_parent()
914 parent.remove(element)
915 self.log("setting to ready")
916 element.set_state(gst.STATE_READY)
917 self.log("setting to ready complete!!!")
918 old = element.get_property('fd')
919 self.log("Closing old fd %d", old)
920 os.close(old)
921 element.set_property('fd', fd)
922 parent.add(element)
923 srcpad.link(sinkpad)
924 element.set_state(gst.STATE_PLAYING)
925
926 srcpad.set_blocked_async(False, _block_cb)
927 else:
928 element.set_property('fd', fd)
929
930
931
932 eater.connected(fd, feedId)
933
934 if not pipeline_playing:
935 self.try_start_pipeline()
936