1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 import operator
20 import random
21 import socket
22
23 from twisted.internet import base, defer, threads, reactor
24 from twisted.python import threadpool
25 from flumotion.common import log
26
# Server priority constant; priorities are served in increasing order.
# NOTE(review): presumably the default for a priority argument whose
# definition is not visible here -- confirm.
27 DEFAULT_PRIORITY = 1.0
# NOTE(review): presumably the default refresh interval; the unit is not
# shown in this view (likely seconds) -- confirm.
28 DEFAULT_REFRESH_TIMEOUT = 300
29
# Value assigned to the logCategory class attribute further down.
30 LOG_CATEGORY = "server-selector"
31
32
34
38
40 if timeout:
41 timeoutDelay = reduce(operator.add, timeout)
42 else:
43 timeoutDelay = 60
44 userDeferred = defer.Deferred()
45
46
47 lookupDeferred = threads.deferToThread(
48 self.socket.gethostbyname_ex, name)
49 cancelCall = self.reactor.callLater(
50 timeoutDelay, self._cleanup, name, lookupDeferred)
51 self._runningQueries[lookupDeferred] = (userDeferred, cancelCall)
52 lookupDeferred.addBoth(self._checkTimeout, name, lookupDeferred)
53 return userDeferred
54
55
57
# NOTE(review): presumably the category flumotion's logging machinery files
# this class's messages under -- confirm against flumotion.common.log.
58 logCategory = LOG_CATEGORY
59
# Bookkeeping for the selector.
self.servers = {}    # priority -> list of Server
self.hostnames = {}  # hostname -> (ip list, priority, port)
self.timeout = timeout
# Keep the socket implementation that was injected as `sk`.  The previous
# code stored the module-level `socket` here, silently ignoring the same
# `sk` that is handed to the resolver just below.
self.socket = sk

# All hostname lookups go through this resolver.
self._resolver = ThreadedResolver(reactor, sk)
# No refresh is pending until one gets scheduled.
self._refresh = None
68
# `h` is a gethostbyname_ex-style result; index 2 holds the address list.
addresses = h[2]
for address in addresses:
    candidate = Server(address, port, priority)
    known = self.servers[priority]
    if candidate in known:
        # Already registered for this priority: do not add it twice.
        continue
    known.append(candidate)

# Remember what this hostname resolved to, with its priority and port.
self.hostnames[hostname] = (addresses, priority, port)
77
82
84 """
85 Add a hostname to the list of servers, with a priority. (in
86 increasing order, 1 comes before 2).
87
88 @return None
89 """
90 self.hostnames[hostname] = ([], priority, port)
91 if priority not in self.servers:
92 self.servers[priority] = []
93
94 d = self._resolver.getHostByNameEx(hostname)
95 d.addCallbacks(self._addCallback,
96 self._addErrback,
97 callbackArgs=(hostname, port, priority))
98 return d
99
101 """
102 Order the looked up servers by priority, and return them.
103
104 @return a generator of Server
105 """
106 priorities = self.servers.keys()
107 priorities.sort()
108 for p in priorities:
109 servers = self.servers[p]
110 random.shuffle(servers)
111 for s in servers:
112 yield s
113
115
# Reconcile the servers known for `hostname` with a fresh lookup result.
# `host` is a gethostbyname_ex-style result; index 2 holds the address list.
new_ips = host[2]
old_ips, priority, port = self.hostnames[hostname]
to_be_added = [ip for ip in new_ips if ip not in old_ips]
to_be_removed = [ip for ip in old_ips if ip not in new_ips]
servers = self.servers[priority]

for ip in to_be_added:
    servers.append(Server(ip, port, priority))
    # old_ips is the very list stored in self.hostnames[hostname][0].
    old_ips.append(ip)

# Drop servers whose address disappeared.  The list is rebuilt in place
# instead of calling remove() while looping over it: removing during
# iteration skips the element that follows each removed one, so some stale
# servers could survive.
servers[:] = [s for s in servers if s.ip not in to_be_removed]
for ip in to_be_removed:
    old_ips.remove(ip)

self.servers[priority] = servers
130
142
146
149
152
# Cancel the outstanding refresh, if there is one, and forget it.
# NOTE(review): self._refresh starts out as None; when set it is presumably a
# scheduled-call handle (it exposes cancel()) -- confirm where it is assigned.
154 if self._refresh:
155 self._refresh.cancel()
156 self._refresh = None
157
158
160
def __init__(self, ip, port, priority):
    """
    Store the address and priority that describe one server.

    @param ip:       address of the server
    @param port:     port of the server
    @param priority: priority the server is grouped under
    """
    self.ip, self.port, self.priority = ip, port, priority
165
168
171
# Value comparison: true when both objects carry exactly the same instance
# attributes.  `other` must expose __dict__, otherwise AttributeError is
# raised.
173 return self.__dict__ == other.__dict__
174