1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import os
19 import string
20 import time
21
22 from twisted.web import server, http
23 from twisted.web.resource import Resource
24 from twisted.internet import defer, reactor, error
25 from twisted.cred import credentials
26 from zope.interface import implements
27
28 from flumotion.common import log, messages, errors, netutils, interfaces
29 from flumotion.common.i18n import N_, gettexter
30 from flumotion.component import component
31 from flumotion.component.base import http as httpbase
32 from flumotion.component.component import moods
33 from flumotion.component.misc.httpserver import httpfile, \
34 localprovider, localpath
35 from flumotion.component.misc.httpserver import serverstats
36 from flumotion.component.misc.porter import porterclient
37 from flumotion.twisted import fdserver
38
39 __version__ = "$Rev$"
40 T_ = gettexter()
41
42 UPTIME_UPDATE_INTERVAL = 5
43
44 FILEPROVIDER_SOCKET = 'flumotion.component.misc.httpserver' \
45 '.fileprovider.FileProviderPlug'
46
47
49
51 server.Request.__init__(self, channel, queued)
52 now = time.time()
53 self.lastTimeWritten = now
54
55
56
57 self.fd = self.transport.fileno()
58
59 self._component = channel.factory.component
60 self._transfer = None
61 self._provider = None
62 self._startTime = now
63 self._completionTime = None
64 self._rangeFirstByte = None
65 self._rangeLastByte = None
66 self._resourceSize = None
67 self._bytesWritten = 0L
68
69
70 self.stats = serverstats.RequestStatistics(self._component.stats)
71
72 self._component.requestStarted(self)
73
75 self._rangeFirstByte = first
76 self._rangeLastByte = last
77 self._resourceSize = size
78
86
95
101
110
112 headers = self.getAllHeaders()
113 duration = (self._completionTime or time.time()) - self._startTime
114 requestFields = {'ip': self.getClientIP(),
115 'method': self.method,
116 'uri': self.uri,
117 'get-parameters': self.args,
118 'clientproto': self.clientproto,
119 'response': self.code,
120 'bytes-sent': self._bytesWritten,
121 'referer': headers.get('referer', None),
122 'user-agent': headers.get('user-agent', None),
123 'time-connected': duration,
124 'resource-size': self._resourceSize,
125 'range-first': self._rangeFirstByte,
126 'range-last': self._rangeLastByte}
127 if self._provider:
128
129 providerFields = self._provider.getLogFields()
130 providerFields.update(requestFields)
131 requestFields = providerFields
132 return requestFields
133
134
135 -class Site(server.Site):
142
143
145 """
146 I wrap a statistics ui state entry, to allow updates.
147 """
148
152
153 - def update(self, name, value):
156
157
159
165
167 """
168 @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
169 """
170 return self.callRemote('authenticate', bouncerName, keycard)
171
172 - def keepAlive(self, bouncerName, issuerName, ttl):
173 """
174 @rtype: L{twisted.internet.defer.Deferred}
175 """
176 return self.callRemote('keepAlive', bouncerName, issuerName, ttl)
177
179 """
180 @rtype: L{twisted.internet.defer.Deferred}
181 """
182 return self.callRemote('removeKeycardId', bouncerName, keycardId)
183
186
189
192
195
198
201
205
206
208 implements(interfaces.IStreamingComponent)
209
210 componentMediumClass = HTTPFileMedium
211
212 REQUEST_TIMEOUT = 30
213
214
216 self.mountPoint = None
217 self.type = None
218 self.port = None
219 self.hostname = None
220 self.stats = None
221 self._rateControlPlug = None
222 self._fileProviderPlug = None
223 self._metadataProviderPlug = None
224 self._loggers = []
225 self._requestModifiers = []
226 self._logfilter = None
227 self.httpauth = None
228 self._startTime = time.time()
229 self._uptimeCallId = None
230 self._allowBrowsing = False
231
232 self._description = 'On-Demand Flumotion Stream'
233
234 self._singleFile = False
235 self._connected_clients = {}
236 self._total_bytes_written = 0
237
238 self._pbclient = None
239
240 self._twistedPort = None
241 self._timeoutRequestsCallLater = None
242
243 self._pendingDisconnects = {}
244 self._rootResource = None
245
246
247
248 self._mimeToResource = {
249 'video/x-flv': httpfile.FLVFile,
250 'video/mp4': httpfile.MP4File,
251 }
252
253 self.uiState.addKey('stream-url', None)
254 self.uiState.addKey('server-uptime', 0)
255 self.uiState.addKey('file-provider', None)
256 self.uiState.addKey('allow-browsing', False)
257 self.uiState.addDictKey('request-statistics')
258 self.uiState.addDictKey('provider-statistics')
259
261 props = self.config['properties']
262 self.fixRenamedProperties(props, [
263 ('issuer', 'issuer-class'),
264 ('porter_socket_path', 'porter-socket-path'),
265 ('porter_username', 'porter-username'),
266 ('porter_password', 'porter-password'),
267 ('mount_point', 'mount-point')])
268
269 path = props.get('path', None)
270 plugs = self.plugs.get(FILEPROVIDER_SOCKET, [])
271 if plugs:
272 if path:
273 self.warning("The component property 'path' should not be used"
274 " in conjunction with a file provider plug.")
275
276
277
278
279
280
281 if props.get('type', 'master') == 'slave':
282 for k in 'socket-path', 'username', 'password':
283 if not 'porter-' + k in props:
284 msg = 'slave mode, missing required property porter-%s' % k
285 return defer.fail(errors.ConfigError(msg))
286 if plugs or not path:
287 return
288 if os.path.isfile(path):
289 self._singleFile = True
290 elif os.path.isdir(path):
291 self._singleFile = False
292 else:
293 msg = "the file or directory specified in 'path': %s does " \
294 "not exist or is neither a file nor directory" % path
295 return defer.fail(errors.ConfigError(msg))
296
298 desc = props.get('description', None)
299 if desc:
300 self._description = desc
301
302
303 mountPoint = props.get('mount-point', '/')
304 if not mountPoint.startswith('/'):
305 mountPoint = '/' + mountPoint
306 self.mountPoint = mountPoint
307 self.hostname = props.get('hostname', None)
308 if not self.hostname:
309 self.hostname = netutils.guess_public_hostname()
310
311 self.type = props.get('type', 'master')
312 self.port = props.get('port', 8801)
313 self._allowBrowsing = props.get('allow-browsing', False)
314 if self.type == 'slave':
315
316 self._porterPath = props['porter-socket-path']
317 self._porterUsername = props['porter-username']
318 self._porterPassword = props['porter-password']
319 socket = 'flumotion.component.plugs.request.RequestLoggerPlug'
320 self._loggers = self.plugs.get(socket, [])
321 socket = \
322 'flumotion.component.plugs.requestmodifier.RequestModifierPlug'
323 self._requestModifiers = self.plugs.get(socket, [])
324
325 self.httpauth = httpbase.HTTPAuthentication(self)
326 if 'avatarId' in self.config:
327 self.httpauth.setRequesterId(self.config['avatarId'])
328 if 'bouncer' in props:
329 self.httpauth.setBouncerName(props['bouncer'])
330 if 'issuer-class' in props:
331 self.warning("The component property 'issuer-class' has been"
332 "deprecated.")
333 msg = messages.Warning(T_(N_(
334 "The component property 'issuer-class' has "
335 "been deprecated.")))
336 self.addMessage(msg)
337
338 if 'allow-default' in props:
339 self.httpauth.setAllowDefault(props['allow-default'])
340 if 'ip-filter' in props:
341 logFilter = http.LogFilter()
342 for f in props['ip-filter']:
343 logFilter.addIPFilter(f)
344 self._logfilter = logFilter
345 socket = \
346 'flumotion.component.misc.httpserver.ratecontrol.RateControllerPlug'
347 plugs = self.plugs.get(socket, [])
348 if plugs:
349
350 self._rateControlPlug = self.plugs[socket][-1]
351
352 plugs = self.plugs.get(FILEPROVIDER_SOCKET, [])
353 if plugs:
354
355 self._fileProviderPlug = plugs[-1]
356 else:
357
358
359 plugProps = {"properties": {"path": props.get('path', None)}}
360 self._fileProviderPlug = localprovider.FileProviderLocalPlug(
361 plugProps)
362
363 socket = ('flumotion.component.misc.httpserver'
364 '.metadataprovider.MetadataProviderPlug')
365 plugs = self.plugs.get(socket, [])
366 if plugs:
367 self._metadataProviderPlug = plugs[-1]
368
369
370 self.uiState.set('stream-url', self.getUrl())
371 self.uiState.set('allow-browsing', self._allowBrowsing)
372
374 self.have_properties(self.config['properties'])
375
376 root = self._rootResource
377 if root is None:
378 root = self._getDefaultRootResource()
379
380 if root is None:
381 raise errors.WrongStateError(
382 "a resource or path property must be set")
383
384 site = Site(root, self)
385 self._timeoutRequestsCallLater = reactor.callLater(
386 self.REQUEST_TIMEOUT, self._timeoutRequests)
387
388
389 self.stats = serverstats.ServerStatistics()
390 updater = StatisticsUpdater(self.uiState, "request-statistics")
391 self.stats.startUpdates(updater)
392 updater = StatisticsUpdater(self.uiState, "provider-statistics")
393 self._fileProviderPlug.startStatsUpdates(updater)
394 self._updateUptime()
395
396 d = defer.Deferred()
397 if self.type == 'slave':
398
399 if self._singleFile:
400 self._pbclient = porterclient.HTTPPorterClientFactory(
401 site, [self.mountPoint], d)
402 else:
403 self._pbclient = porterclient.HTTPPorterClientFactory(
404 site, [], d,
405 prefixes=[self.mountPoint])
406 creds = credentials.UsernamePassword(self._porterUsername,
407 self._porterPassword)
408 self._pbclient.startLogin(creds, self._pbclient.medium)
409 self.info("Logging to porter on socketPath %s", self._porterPath)
410
411 reactor.connectWith(fdserver.FDConnector, self._porterPath,
412 self._pbclient, 10, checkPID=False)
413 else:
414
415 try:
416 self.debug('Going to listen on port %d' % self.port)
417 iface = ""
418
419
420 self._twistedPort = reactor.listenTCP(self.port,
421 site, interface=iface)
422 self.port = self._twistedPort.getHost().port
423 self.info('Listening on interface %r on port %d',
424 iface, self.port)
425 except error.CannotListenError:
426 t = 'Port %d is not available.' % self.port
427 self.warning(t)
428 m = messages.Error(T_(N_(
429 "Network error: TCP port %d is not available."),
430 self.port))
431 self.addMessage(m)
432 self.setMood(moods.sad)
433 return defer.fail(errors.ComponentSetupHandledError(t))
434
435 d.callback(None)
436
437
438 def setComponentHappy(result):
439 self.httpauth.scheduleKeepAlive()
440 self.setMood(moods.happy)
441 return result
442 d.addCallback(setComponentHappy)
443 return d
444
446 if self.stats:
447 self.stats.stopUpdates()
448 if self._fileProviderPlug:
449 self._fileProviderPlug.stopStatsUpdates()
450 if self.httpauth:
451 self.httpauth.stopKeepAlive()
452 if self._timeoutRequestsCallLater:
453 self._timeoutRequestsCallLater.cancel()
454 self._timeoutRequestsCallLater = None
455 if self._uptimeCallId:
456 self._uptimeCallId.cancel()
457 self._uptimeCallId = None
458 if self._twistedPort:
459 self._twistedPort.stopListening()
460
461 l = [self.remove_all_clients()]
462 if self.type == 'slave' and self._pbclient:
463 if self._singleFile:
464 l.append(self._pbclient.deregisterPath(self.mountPoint))
465 else:
466 l.append(self._pbclient.deregisterPrefix(self.mountPoint))
467 return defer.DeferredList(l)
468
470 """
471 Provide a new set of porter login information, for when we're in slave
472 mode and the porter changes.
473 If we're currently connected, this won't disconnect - it'll just change
474 the information so that next time we try and connect we'll use the
475 new ones
476 @param path: new path
477 @param username: new username
478 @param password: new password
479 """
480 if self.type != 'slave':
481 raise errors.WrongStateError(
482 "Can't specify porter details in master mode")
483
484 self._porterUsername = username
485 self._porterPassword = password
486
487 creds = credentials.UsernamePassword(self._porterUsername,
488 self._porterPassword)
489 self._pbclient.startLogin(creds, self.medium)
490
491 self._updatePath(path)
492
494
495 if path == self._porterPath:
496 return
497 self._porterPath = path
498
499
500 self._pbclient.stopTrying()
501
502 self._pbclient.resetDelay()
503 reactor.connectWith(fdserver.FDConnector, self._porterPath,
504 self._pbclient, 10, checkPID=False)
505
527
529 node = self._fileProviderPlug.getRootPath()
530 if node is None:
531 return None
532
533 self.debug('Starting with mount point "%s"' % self.mountPoint)
534 factory = httpfile.MimedFileFactory(self.httpauth,
535 mimeToResource=self._mimeToResource,
536 rateController=self._rateControlPlug,
537 requestModifiers=self._requestModifiers,
538 metadataProvider=self._metadataProviderPlug)
539
540 root = factory.create(node)
541 if self.mountPoint != '/':
542 root = self._createRootResourceForPath(self.mountPoint, root)
543
544 return root
545
547 if path.endswith('/'):
548 path = path[:-1]
549
550 root = Resource()
551 children = string.split(path[1:], '/')
552 parent = root
553 for child in children[:-1]:
554 resource = Resource()
555 self.debug("Putting Resource at %s", child)
556 parent.putChild(child, resource)
557 parent = resource
558 self.debug("Putting resource %r at %r", fileResource, children[-1])
559 parent.putChild(children[-1], fileResource)
560 return root
561
563 """
564 Remove a client when requested.
565
566 Used by keycard expiry.
567 """
568 if fd in self._connected_clients:
569 request = self._connected_clients[fd]
570 self.debug("Removing client for fd %d", fd)
571 request.unregisterProducer()
572 request.channel.transport.loseConnection()
573 else:
574 self.debug("No client with fd %d found", fd)
575
589
591
592 fd = request.transport.fileno()
593 self._connected_clients[fd] = request
594 self.debug("[fd %5d] (ts %f) request %r started",
595 fd, time.time(), request)
596
598
599
600 self.debug('[fd %5d] (ts %f) finishing request %r',
601 request.transport.fileno(), time.time(), request)
602
603 self.httpauth.cleanupAuth(fd)
604 ip = request.getClientIP()
605 if not self._logfilter or not self._logfilter.isInRange(ip):
606 fields = request.getLogFields()
607 fields.update({'time': time.gmtime(),
608 'username': '-'})
609 l = []
610 for logger in self._loggers:
611 l.append(defer.maybeDeferred(
612 logger.event, 'http_session_completed', fields))
613 d = defer.DeferredList(l)
614 else:
615 d = defer.succeed(None)
616
617 del self._connected_clients[fd]
618
619 self._total_bytes_written += bytesWritten
620
621 def firePendingDisconnect(_):
622 self.debug("Logging completed")
623 if fd in self._pendingDisconnects:
624 pending = self._pendingDisconnects.pop(fd)
625 self.debug("Firing pending disconnect deferred")
626 pending.callback(None)
627
628
629 self.debug('[fd %5d] (ts %f) finished request %r',
630 fd, time.time(), request)
631
632 d.addCallback(firePendingDisconnect)
633
635 return self._description
636
638 port = self.port
639
640 if self.type == 'slave' and self._pbclient:
641 if not self._pbclient.remote_port:
642 return ""
643 port = self._pbclient.remote_port
644
645 if (not port) or (port == 80):
646 port_str = ""
647 else:
648 port_str = ":%d" % port
649
650 return "http://%s%s%s" % (self.hostname, port_str, self.mountPoint)
651
653 socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug'
654 if self.plugs[socket]:
655 plug = self.plugs[socket][-1]
656 return plug.getStreamData()
657 else:
658 return {'protocol': 'HTTP',
659 'description': self._description,
660 'url': self.getUrl()}
661
663 """
664 Return the number of connected clients
665 """
666 return len(self._connected_clients)
667
669 """
670 Current Bandwidth
671 """
672 bytesTransferred = self._total_bytes_written
673 for request in self._connected_clients.values():
674 if request._transfer:
675 bytesTransferred += request._transfer.bytesWritten
676 return bytesTransferred
677
679 """
680 Return a tuple (deltaadded, deltaremoved, bytes_transferred,
681 current_clients, current_load) of our current bandwidth and
682 user values. The deltas and current_load are NOT currently
683 implemented here, we set them as zero.
684 """
685 return (0, 0, self.getBytesSent(), self.getClients(), 0)
686
688 """
689 Close the logfile, then reopen using the previous logfilename
690 """
691 for logger in self._loggers:
692 self.debug('rotating logger %r' % logger)
693 logger.rotate()
694
696 """Attaches a root resource to this component. The root resource is the
697 once which will be used when accessing the mount point.
698 This is normally called from a plugs start() method.
699 @param resource: root resource
700 @type resource: L{twisted.web.resource.Resource}
701 """
702 rootResource = self._createRootResourceForPath(
703 self.getMountPoint(), resource)
704
705 self._rootResource = rootResource
706
708 """Get the mount point of this component
709 @returns: the mount point
710 """
711
712 return self.config['properties'].get('mount-point')
713
719