Package flumotion :: Package component :: Package common :: Package streamer :: Module fragmentedstreamer
[hide private]

Source Code for Module flumotion.component.common.streamer.fragmentedstreamer

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  import time 
 19   
 20  from twisted.internet import reactor 
 21  from twisted.web import server 
 22   
 23  from flumotion.component.common.streamer.streamer import \ 
 24          Streamer, Stats as Statistics 
 25   
 26   
27 -class Stats(Statistics):
28
29 - def __init__(self, resource):
30 Statistics.__init__(self) 31 self.resource = resource
32
33 - def getBytesSent(self):
34 return self.resource.getBytesSent()
35
36 - def getBytesReceived(self):
37 return self.resource.getBytesReceived()
38 39
40 -class LoggableRequest(server.Request):
41
42 - def __init__(self, channel, queued):
43 server.Request.__init__(self, channel, queued) 44 now = time.time() 45 self._startTime = now 46 self._completionTime = now 47 self._bytesWritten = 0L
48
49 - def write(self, data):
50 server.Request.write(self, data) 51 size = len(data) 52 self._bytesWritten += size
53
54 - def requestCompleted(self, fd):
55 server.Request.requestCompleted(self, fd) 56 if self._completionTime is None: 57 self._completionTime = time.time()
58
59 - def getDuration(self):
60 return (self._completionTime or time.time()) - self._startTime
61
62 - def getBytesSent(self):
63 return self._bytesWritten
64 65
66 -class Site(server.Site):
67 requestFactory = LoggableRequest 68
69 - def __init__(self, resource):
70 server.Site.__init__(self, resource)
71 72
73 -class FragmentedStreamer(Streamer, Stats):
74 DEFAULT_MIN_WINDOW = 2 75 DEFAULT_MAX_WINDOW = 5 76 DEFAULT_SECRET_KEY = 'aR%$w34Y=&08gFm%&!s8080' 77 DEFAULT_SESSION_TIMEOUT = 30 78 79 logCategory = 'fragmented-streamer' 80 siteClass = Site 81 multi_files = True 82
83 - def init(self):
84 self.debug("HTTP live fragmented streamer initialising") 85 self._fragmentsCount = 0 86 self._ready = False
87
88 - def isReady(self):
89 return self._ready
90
91 - def do_pipeline_playing(self):
92 # The component must stay 'waiking' until it receives at least 93 # the number of segments defined by the min-window property 94 pass
95
96 - def configure_pipeline(self, pipeline, props):
97 self.secret_key = props.get('secret-key', self.DEFAULT_SECRET_KEY) 98 self.session_timeout = props.get('session-timeout', 99 self.DEFAULT_SESSION_TIMEOUT) 100 self._minWindow = props.get('min-window', self.DEFAULT_MIN_WINDOW) 101 self._maxWindow = props.get('max-window', self.DEFAULT_MAX_WINDOW) 102 103 self.sink = pipeline.get_by_name('sink') 104 self._configure_sink() 105 self._connect_sink_signals() 106 107 Streamer.configure_pipeline(self, pipeline, props) 108 Stats.__init__(self, self.resource) 109 self.resource.setMountPoint(self.mountPoint)
110
111 - def remove_client(self, session_id):
112 session = self._site.sessions.get(session_id, None) 113 if session is not None: 114 session.expire()
115
116 - def update_bytes_received(self, length):
117 self.resource.bytesReceived += length
118
119 - def __repr__(self):
120 return '<FragmentedStreamer (%s)>' % self.name
121
122 - def _get_root(self):
123 return self.resource
124
125 - def _configure_sink(self):
126 ''' 127 Configure sink properties. Can be used by subclasses to set 128 configuration parameters in the element 129 ''' 130 pass
131
132 - def _connect_sink_signals(self):
133 self.sink.get_pad("sink").add_buffer_probe(self._sink_pad_probe, None) 134 self.sink.connect('eos', self._eos)
135 136 ### START OF THREAD-AWARE CODE (called from non-reactor threads) 137
138 - def _sink_pad_probe(self, pad, buffer, none):
139 reactor.callFromThread(self.update_bytes_received, len(buffer.data)) 140 return True
141
142 - def _eos(self, appsink):
143 #FIXME: How do we handle this for live? 144 self.log('appsink received an eos')
145 146 147 ### END OF THREAD-AWARE CODE 148