Package flumotion :: Package worker :: Module feedserver
[hide private]

Source Code for Module flumotion.worker.feedserver

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_worker_feed -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18  """ 
 19  implementation of a PB Server through which other components can request 
 20  to eat from or feed to this worker's components. 
 21  """ 
 22   
 23  from twisted.internet import reactor 
 24  from twisted.spread import pb 
 25  from twisted.cred import portal 
 26  from zope.interface import implements 
 27   
 28  from flumotion.common import log, common 
 29  from flumotion.twisted import fdserver 
 30  from flumotion.twisted import portal as fportal 
 31  from flumotion.twisted import pb as fpb 
 32   
 33  __version__ = "$Rev$" 
 34   
 35   
36 -class FeedServer(log.Loggable):
37 """ 38 I am the feed server. PHEAR 39 """ 40 41 implements(portal.IRealm) 42 43 logCategory = 'dispatcher' 44
45 - def __init__(self, brain, bouncer, portNum):
46 """ 47 @param brain: L{flumotion.worker.worker.WorkerBrain} 48 """ 49 self._brain = brain 50 self._tport = None 51 self.listen(bouncer, portNum)
52
53 - def getPortNum(self):
54 if not self._tport: 55 self.warning('not listening!') 56 return 0 57 return self._tport.getHost().port
58
59 - def listen(self, bouncer, portNum, unsafeTracebacks=0):
60 portal = fportal.BouncerPortal(self, bouncer) 61 factory = pb.PBServerFactory(portal, 62 unsafeTracebacks=unsafeTracebacks) 63 64 tport = reactor.listenWith(fdserver.PassableServerPort, portNum, 65 factory) 66 67 self._tport = tport 68 self.debug('Listening for feed requests on TCP port %d', 69 self.getPortNum())
70
71 - def shutdown(self):
72 d = self._tport.stopListening() 73 self._tport = None 74 return d
75 76 ### IRealm method 77
78 - def requestAvatar(self, avatarId, keycard, mind, *ifaces):
79 avatar = FeedAvatar(self, avatarId, mind) 80 return (pb.IPerspective, avatar, 81 lambda: self.avatarLogout(avatar))
82
83 - def avatarLogout(self, avatar):
84 self.debug('feed avatar logged out: %s', avatar.avatarId)
85 86 ## proxy these to the brain 87
88 - def feedToFD(self, componentId, feedId, fd, eaterId):
89 return self._brain.feedToFD(componentId, feedId, fd, eaterId)
90
91 - def eatFromFD(self, componentId, eaterAlias, fd, feedId):
92 return self._brain.eatFromFD(componentId, eaterAlias, fd, feedId)
93 94
95 -class FeedAvatar(fpb.Avatar):
96 """ 97 I am an avatar in a FeedServer for components that log in and request 98 to eat from or feed to one of my components. 99 100 My mind is a reference to a L{flumotion.component.feed.FeedMedium} 101 """ 102 logCategory = "feedavatar" 103 remoteLogName = "feedmedium" 104
105 - def __init__(self, feedServer, avatarId, mind):
106 """ 107 """ 108 fpb.Avatar.__init__(self, avatarId) 109 self._transport = None 110 self.feedServer = feedServer 111 self.avatarId = avatarId 112 self.setMind(mind)
113
114 - def perspective_sendFeed(self, fullFeedId):
115 """ 116 Called when the PB client wants us to send them the given feed. 117 """ 118 # the PB message needs to be sent from the side that has the feeder 119 # for proper switching, so we call back as a reply 120 d = self.mindCallRemote('sendFeedReply', fullFeedId) 121 d.addCallback(self._sendFeedReplyCb, fullFeedId)
122
123 - def _sendFeedReplyCb(self, result, fullFeedId):
124 # compare with startStreaming in prototype 125 # Remove this from the reactor; we mustn't read or write from it from 126 # here on 127 t = self.mind.broker.transport 128 t.stopReading() 129 t.stopWriting() 130 131 # hand off the fd to the component 132 self.debug("Attempting to send FD: %d", t.fileno()) 133 134 (flowName, componentName, feedName) = common.parseFullFeedId( 135 fullFeedId) 136 componentId = common.componentId(flowName, componentName) 137 138 if self.feedServer.feedToFD(componentId, feedName, t.fileno(), 139 self.avatarId): 140 t.keepSocketAlive = True 141 142 # We removed the transport from the reactor before sending the 143 # FD; now we want the socket cleaned up. 144 t.loseConnection()
145
146 - def perspective_receiveFeed(self, fullFeedId):
147 """ 148 Called when the PB client wants to send the given feedId to the 149 given component 150 """ 151 # we need to make sure our result goes back, so only stop reading 152 t = self.mind.broker.transport 153 t.stopReading() 154 reactor.callLater(0, self._doReceiveFeed, fullFeedId)
155
156 - def _doReceiveFeed(self, fullFeedId):
157 t = self.mind.broker.transport 158 159 self.debug('flushing PB write queue') 160 t.doWrite() 161 self.debug('stop writing to transport') 162 t.stopWriting() 163 164 # hand off the fd to the component 165 self.debug("Attempting to send FD: %d", t.fileno()) 166 167 (flowName, componentName, eaterAlias) = common.parseFullFeedId( 168 fullFeedId) 169 componentId = common.componentId(flowName, componentName) 170 171 if self.feedServer.eatFromFD(componentId, eaterAlias, t.fileno(), 172 self.avatarId): 173 t.keepSocketAlive = True 174 175 t.loseConnection()
176