Package flumotion :: Package component :: Package misc :: Package httpserver :: Package httpcached :: Module server_selection
[hide private]

Source Code for Module flumotion.component.misc.httpserver.httpcached.server_selection

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3   
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L. 
  6  # Copyright (C) 2010,2011 Flumotion Services, S.A. 
  7  # All rights reserved. 
  8  # 
  9  # This file may be distributed and/or modified under the terms of 
 10  # the GNU Lesser General Public License version 2.1 as published by 
 11  # the Free Software Foundation. 
 12  # This file is distributed without any warranty; without even the implied 
 13  # warranty of merchantability or fitness for a particular purpose. 
 14  # See "LICENSE.LGPL" in the source distribution for more information. 
 15  # 
 16  # Headers in this file shall remain intact. 
 17   
 18   
 19  import operator 
 20  import random 
 21  import socket 
 22   
 23  from twisted.internet import base, defer, threads, reactor 
 24  from twisted.python import threadpool 
 25  from flumotion.common import log 
 26   
 27  DEFAULT_PRIORITY = 1.0 
 28  DEFAULT_REFRESH_TIMEOUT = 300 
 29   
 30  LOG_CATEGORY = "server-selector" 
 31   
 32   
33 -class ThreadedResolver(base.ThreadedResolver):
34
35 - def __init__(self, reactor, sk=socket):
36 base.ThreadedResolver.__init__(self, reactor) 37 self.socket = sk
38
39 - def getHostByNameEx(self, name, timeout = (1, 3, 11, 45)):
40 if timeout: 41 timeoutDelay = reduce(operator.add, timeout) 42 else: 43 timeoutDelay = 60 44 userDeferred = defer.Deferred() 45 # lookupDeferred = threads.deferToThreadPool( 46 # self.reactor, self.reactor.getThreadPool(), 47 lookupDeferred = threads.deferToThread( 48 self.socket.gethostbyname_ex, name) 49 cancelCall = self.reactor.callLater( 50 timeoutDelay, self._cleanup, name, lookupDeferred) 51 self._runningQueries[lookupDeferred] = (userDeferred, cancelCall) 52 lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred) 53 return userDeferred
54 55
56 -class ServerSelector(log.Loggable):
57 58 logCategory = LOG_CATEGORY 59
60 - def __init__(self, timeout=DEFAULT_REFRESH_TIMEOUT, sk=socket):
61 self.servers = {} 62 self.hostnames = {} 63 self.timeout = timeout 64 self.socket = socket 65 66 self._resolver = ThreadedResolver(reactor, sk) 67 self._refresh = None
68
69 - def _addCallback(self, h, hostname, port, priority):
70 ip_list = h[2] 71 for ip in ip_list: 72 s = Server(ip, port, priority) 73 if s not in self.servers[priority]: 74 self.servers[priority].append(s) 75 76 self.hostnames[hostname] = (ip_list, priority, port)
77
78 - def _addErrback(self, err):
79 self.warning("Could not resolve host %s", 80 log.getFailureMessage(err)) 81 return
82
83 - def addServer(self, hostname, port, priority=DEFAULT_PRIORITY):
84 """ 85 Add a hostname to the list of servers, with a priority. (in 86 increasing order, 1 comes before 2). 87 88 @return None 89 """ 90 self.hostnames[hostname] = ([], priority, port) 91 if priority not in self.servers: 92 self.servers[priority] = [] 93 94 d = self._resolver.getHostByNameEx(hostname) 95 d.addCallbacks(self._addCallback, 96 self._addErrback, 97 callbackArgs=(hostname, port, priority)) 98 return d
99
100 - def getServers(self):
101 """ 102 Order the looked up servers by priority, and return them. 103 104 @return a generator of Server 105 """ 106 priorities = self.servers.keys() 107 priorities.sort() 108 for p in priorities: 109 servers = self.servers[p] 110 random.shuffle(servers) 111 for s in servers: 112 yield s
113
114 - def _refreshCallback(self, host, hostname):
115 # FIXME: improve me, avoid data duplication, Server info loss.. 116 new_ips = host[2] 117 old_ips, priority, port = self.hostnames[hostname] 118 to_be_added = [ip for ip in new_ips if ip not in old_ips] 119 to_be_removed = [ip for ip in old_ips if ip not in new_ips] 120 servers = self.servers[priority] 121 for ip in to_be_added: 122 servers.append(Server(ip, port, priority)) 123 self.hostnames[hostname][0].append(ip) 124 for ip in to_be_removed: 125 for s in servers: 126 if s.ip == ip: 127 servers.remove(s) 128 self.hostnames[hostname][0].remove(ip) 129 self.servers[priority] = servers
130
131 - def refreshServers(self):
132 dl = [] 133 for h in self.hostnames.keys(): 134 d = self._resolver.getHostByNameEx(h) 135 d.addCallbacks(self._refreshCallback, self._addErrback, 136 callbackArgs=(h, )) 137 dl.append(d) 138 self._resetRefresh() 139 d = defer.DeferredList(dl) 140 d.addCallback(lambda _: self) 141 return d
142
143 - def _resetRefresh(self):
144 if self.timeout: 145 self._refresh = reactor.callLater(self.timeout, self._onRefresh)
146
147 - def _onRefresh(self):
148 self.refreshServers()
149
150 - def setup(self):
151 return self.refreshServers()
152
153 - def cleanup(self):
154 if self._refresh: 155 self._refresh.cancel() 156 self._refresh = None
157 158
159 -class Server(object):
160
161 - def __init__(self, ip, port, priority):
162 self.ip = ip 163 self.port = port 164 self.priority = priority
165
166 - def reportError(self, code):
167 pass
168
169 - def __repr__(self):
170 return "<%s: %s:%d>" % (type(self).__name__, self.ip, self.port)
171
172 - def __eq__(self, other):
173 return self.__dict__ == other.__dict__
174