1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 import stat
19 from cStringIO import StringIO
20 import time
21
22 from twisted.internet import defer, reactor, abstract
23
24 from flumotion.common import log
25
26 from flumotion.component.misc.httpserver import fileprovider
27 from flumotion.component.misc.httpserver import ourmimetypes
28 from flumotion.component.misc.httpserver import cachestats
29 from flumotion.component.misc.httpserver.httpcached import common
30 from flumotion.component.misc.httpserver.httpcached import resource_manager
31
# How often the strategy's expiration-time table is cleaned up
# (presumably seconds, used to schedule the periodic cleanup call --
# TODO confirm units against the scheduling code).
EXP_TABLE_CLEANUP_PERIOD = 30
# Maximum number of times a failed retrieval may be resumed before the
# operation is aborted (used by caching sessions, producers and the
# block requester via self._resumes countdowns).
MAX_RESUME_COUNT = 20



# Period of the producing reactor loop -- presumably seconds between
# production iterations; usage not visible in this chunk, TODO confirm.
PRODUCING_PERIOD = 0.08
38
39
41 """
42 Raised when a request used by a caching session
43 was using a conditional retrieval and it fails.
44 """
45
49
52
53
55 """
56 Base class for all caching strategies.
57
58 Handles the cache lookup, cache expiration checks,
    statistics gathering and caching sessions management.
60 """
61
62 logCategory = "base-caching"
63
    def __init__(self, cachemgr, reqmgr, ttl):
        """
        @param cachemgr: cache manager used to map URLs to identifiers,
                         open cached files and create temporary files
        @param reqmgr: request manager used to retrieve remote resources
        @param ttl: time-to-live applied to cached entries -- units not
                    visible in this chunk, presumably seconds; TODO confirm
        """
        self.cachemgr = cachemgr
        self.reqmgr = reqmgr
        self.ttl = ttl

        # identifier -> active caching session
        self._identifiers = {}
        # identifier -> expiration time (compared against time.time())
        self._etimes = {}

        # Handle of the scheduled expiration-table cleanup call;
        # None when no cleanup is currently scheduled.
        self._cleanupCall = None
77
84
86 identifier = self.cachemgr.getIdentifier(url.path)
87 session = self._identifiers.get(identifier, None)
88 if session is not None and not session.checkModified:
89 self.debug("Caching session found for '%s'", url)
90
91 if (session.getState() in
92 (CachingSession.DETACHED, CachingSession.CACHED)):
93 stats.onStarted(session.size, cachestats.CACHE_HIT)
94 elif (session.getState() in
95 (CachingSession.REQUESTING, CachingSession.BUFFERING,
96 CachingSession.CACHING)):
97 stats.onStarted(session.size, cachestats.TEMP_HIT)
98 else:
99 stats.onStarted(session.size, cachestats.CACHE_MISS)
100
101
102 d = session.waitInfo()
103 d.addCallback(RemoteSource, stats)
104 return d
105
106 self.log("Looking for cached file for '%s'", url)
107 d = defer.Deferred()
108 d.addCallback(self.cachemgr.openCacheFile)
109 d.addErrback(self._cachedFileError, url)
110 d.addCallback(self._gotCachedFile, url, identifier, stats)
111
112 d.callback(url.path)
113
114 return d
115
116 - def requestData(self, url, offset=None, size=None, mtime=None):
119
121 return self._identifiers.values()
122
125
126
127
129 raise NotImplementedError()
130
132 raise NotImplementedError()
133
134
135
140
142 if self._cleanupCall:
143 self._cleanupCall.cancel()
144 self._cleanupCall = None
145
150
152 now = time.time()
153 expired = [i for i, e in self._etimes.items() if e < now]
154 for ident in expired:
155 del self._etimes[ident]
156
163
167
171
174
180
182 if cachedFile is not None:
183 self.log("Opened cached file '%s'", cachedFile.name)
184 etime = self._etimes.get(identifier, None)
185 session = self._identifiers.get(identifier, None)
186 if (etime and (etime > time.time()) or
187 (session and session.checkModified)):
188 stats.onStarted(cachedFile.stat[stat.ST_SIZE],
189 cachestats.CACHE_HIT)
190 return CachedSource(identifier, url, cachedFile, stats)
191 self.debug("Cached file may have expired '%s'", cachedFile.name)
192 return self._onCacheOutdated(url, identifier, cachedFile, stats)
193 self.debug("Resource not cached '%s'", url)
194 return self._onCacheMiss(url, stats)
195
196
198 """
    Data source that reads data directly from a locally cached file.
200 """
201
202 mimetypes = ourmimetypes.MimeTypes()
203
204 - def __init__(self, ident, url, cachedFile, stats):
215
216 - def produce(self, consumer, offset):
220
221 - def read(self, offset, size):
229
234
235
237 """
238 Base class for resource not yet cached.
239 It offers a push producer, it delegates read operations
240 to the session and start a block pipelining if the session
241 cannot serve the requested data.
242 Updates the cache statistics.
243 """
244
245 strategy = None
246 session = None
247 stats = None
248
249 - def produce(self, consumer, offset):
251
252 - def read(self, offset, size):
268
273
277
278
280 """
281 Simple remote source.
282 """
283
296
301
302
304 """
305 Base class of caching sessions.
306 Just an interface to be implemented or inherited
307 by all caching sessions.
308 """
309
310 strategy = None
311 url = None
312 size = 0
313 mtime = None
314 mimeType = None
315
316 - def read(self, offset, size):
318
320 raise NotImplementedError()
321
323 raise NotImplementedError()
324
326 raise NotImplementedError()
327
328
330 """
331 Caches a stream locally in a temporary file.
332 The already cached data can be read from the session.
333
334 Can be canceled, meaning the session is not valid anymore.
335
336 Can be aborted, meaning the session will stop caching locally
337 but is still valid.
338
339 The caching operation can be started at any moment, but the
    session has to receive the stream info before it can be used
341 with a RemoteSource instance.
342
343 It can recover request failures up to MAX_RESUME_COUNT times.
344 """
345
346 logCategory = "caching-session"
347
348 (PIPELINING,
349 REQUESTING,
350 BUFFERING,
351 CACHING,
352 CACHED,
353 DETACHED,
354 CLOSED,
355 CANCELED,
356 ABORTED,
357 ERROR) = range(10)
358
359 mimetypes = ourmimetypes.MimeTypes()
360
361 - def __init__(self, strategy, url, cache_stats, ifModifiedSince=None):
391
394
397
411
420
429
438
439 - def read(self, offset, size):
455
483
510
513
515 self._refcount -= 1
516 if self._refcount == 0:
517 if self._state == self.DETACHED:
518
519 self.log("Detached session not referenced anymore")
520 self._close()
521
523 return self._refcount > 0
524
525
526
528 self.warning("Session request error %s (%s) for %s using %s:%s",
529 message, code, self.url, getter.host, getter.port)
530 if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
531 if self._resumes > 0:
532 self._resumes -= 1
533 if self._state > self.REQUESTING:
534
535 offset = self._bytes
536 size = self.size - self._bytes
537 self.debug("Resuming retrieval from offset %d with "
538 "size %d of %s (%d tries left)", offset, size,
539 self.url, self._resumes)
540
541 self._resumeRetrieve(offset, size)
542 return
543 else:
544
545 self.debug("Resuming retrieval from start of %s "
546 "(%d tries left)", self.url, self._resumes)
547 self._firstRetrieve()
548 return
549 self.debug("Too much resuming intents, stopping "
550 "after %d of %s bytes of %s",
551 self._bytes, self.size, self.url)
552 self._close()
553 self._error(fileprovider.UnavailableError(message))
554
564
574
    def onInfo(self, getter, info):
        """
        Request-manager callback delivering the stream info of the
        current retrieval.

        Validates the announced size against what was already received,
        records the stream metadata, requests a temporary cache file and
        notifies the info/started waiters.  The request is paused until
        the temporary file is available.

        @param getter: the request object that produced the info
        @param info: stream info carrying size, length and mtime
        """
        if self._state == self.BUFFERING:
            # Info received while already buffering (e.g. from a resumed
            # request): just pause it until the temporary file is ready.
            self._request.pause()
            return

        if self._state != self.REQUESTING:
            # Late or unexpected info in any other state is ignored.
            return

        if info.size != (info.length - self._bytes):
            # The remaining size must equal the total length minus the
            # bytes already received by a previous (resumed) request.
            self.log("Unexpected stream size: %s / %s bytes "
                     "(Already got %s bytes)",
                     info.size, info.length, self._bytes)
            self._close()
            msg = "Unexpected resource size: %d" % info.size
            self._error(fileprovider.FileError(msg))
            return

        self._state = self.BUFFERING

        # Record stream metadata; mime type is deduced from the URL path.
        self.mimeType = self.mimetypes.fromPath(self.url.path)
        self.mtime = info.mtime
        self.size = info.size

        self.log("Caching session with type %s, size %s, mtime %s for %s",
                 self.mimeType, self.size, self.mtime, self.url)

        # Buffer in memory until the temporary cache file is available;
        # _gotTempFile copies this buffer into the real file later.
        self._file = StringIO()

        self.log("Requesting temporary file for %s", self.url)
        d = self.strategy.cachemgr.newTempFile(self.url.path, info.size,
                                               info.mtime)

        # Pause the request so no data arrives before the temporary
        # file is ready (_gotTempFile resumes it).
        self._request.pause()

        # Wake up callers waiting on waitInfo()/started notifications.
        self._fireInfo(self)
        self._fireStarted(self)

        self.debug("Start buffering %s", self.url)
        d.addCallback(self._gotTempFile)
621
623 if self._state not in (self.BUFFERING, self.CACHED):
624
625 if tempFile:
626 tempFile.close()
627 return
628
629 if tempFile is None:
630 self.warning("Temporary file creation failed, "
631 "aborting caching of %s", self.url)
632 self.abort()
633 return
634
635 self.log("Got temporary file for %s", self.url)
636
637 self.debug("Start caching %s", self.url)
638
639 data = self._file.getvalue()
640 self._file = tempFile
641 tempFile.write(data)
642
643 if self._request is not None:
644
645 self._request.resume()
646
647 if self._state == self.CACHED:
648
649 self._real_complete()
650 else:
651 self._state = self.CACHING
652
653 - def onData(self, getter, data):
667
672
685
687 defers = list(self._infoDefers)
688
689 self._infoDefers = []
690 for d in defers:
691 d.callback(value)
692
694 defers = list(self._startedDefers)
695
696 self._startedDefers = []
697 for d in defers:
698 d.callback(value)
699
701 defers = list(self._finishedDefers)
702
703 self._finishedDefers = []
704 for d in defers:
705 d.callback(value)
706
708 self._errorValue = error
709 defers = list(self._infoDefers)
710 defers.extend(self._startedDefers)
711 defers.extend(self._finishedDefers)
712
713 self._infoDefers = []
714 self._startedDefers = []
715 self._finishedDefers = []
716 for d in defers:
717 d.errback(error)
718
730
740
758
760 since = self.ifModifiedSince
761 self._request = self.strategy.reqmgr.retrieve(self, self.url,
762 ifModifiedSince=since)
763 self.log("Retrieving data using %s", self._request.logName)
764
772
773
775 """
776 Offers a IPushProducer interface to a caching session.
777 It starts producing data from the specified point.
778
779 If the data is already cached by the session,
780 it produce data with a reactor loop reading the data
781 from the session by block.
782
783 If the data is not yet cached, it starts a request
784 using the request manager and pipeline the data
785 to the specified consumer.
786
787 It can recover request failures up to MAX_RESUME_COUNT times.
788
789 It's not used yet in the context of http-server.
790 Until now, the simulations show that using a producer with
791 long-lived HTTP requests instead of short lived block request
    is less efficient and produces higher latency for the clients.
793 At least when used with HTTP proxies.
794 """
795
796 logCategory = "pipe-producer"
797
798 - def __init__(self, consumer, session, offset, stats):
821
822
823
825 if self.consumer is None:
826
827 return
828
829 self._paused = False
830
831 if self._pipelining:
832
833 if self._request:
834
835 self._request.resume()
836 else:
837
838 self._pipeline()
839 else:
840
841 self._produce()
842
844 if self.consumer is None:
845
846 return
847
848 self._paused = True
849
850 if self._pipelining:
851
852 if self._request:
853 self._request.pause()
854 else:
855
856 self._stop()
857
861
862
863
865 if self._request is None:
866
867 return
868 self._request = None
869
870 if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
871 self.warning("Producer request error %s (%s) for %s "
872 "(%s tries left)", message, code,
873 self.session.url, self._resumes)
874
875 if self._resumes > 0:
876 self._resumes -= 1
877 if self._paused:
878 self.log("Producer paused, waiting to recover pipelining "
879 "(%d tries left)", self._resumes)
880 else:
881 self.log("Recovering pipelining (%d tries left)",
882 self._resumes)
883 self._pipeline()
884 return
885
886 self.debug("Too much resuming intents, stopping "
887 "after %d of %s", self._bytes, self.size)
888
889 self._terminate()
890
892 if self._request is None:
893
894 return
895 self._request = None
896 self.warning("Modifications detected while producing %s",
897 self.session.url)
898 self._terminate()
899
901 if self._request is None:
902
903 return
904 self._request = None
905 self.warning("%s detected while producing %s",
906 message, self.session.url)
907 self._terminate()
908
909 - def onData(self, getter, data):
910 if self._request is None:
911
912 return
913 self._write(data)
914
916 if self._request is None:
917
918 return
919 self.log("Pipelining finished")
920 self._terminate()
921
922
923
947
952
954 if self._call is not None:
955 self._call.cancel()
956 self._call = None
957
959 if not self.session.isActive():
960 self.log("Session %s not active anymore (%s), "
961 "aborting production of %s",
962 self.session.logName,
963 self.session._state,
964 self.session.url)
965 self._terminate()
966 return
967
968 self._pipelining = True
969
970 offset = self.offset + self._produced
971 size = self.session.size - offset
972 mtime = self.session.mtime
973
974 if size == 0:
975 self.log("No more data to be retrieved, pipelining finished")
976 self._terminate()
977 return
978
979 self.debug("Producing %s bytes from offset %d of %s",
980 size, offset, self.session.url)
981
982 self._request = self.reqmgr.retrieve(self, self.session.url,
983 start=offset, size=size,
984 ifUnmodifiedSince=mtime)
985 self.log("Retrieving data using %s", self._request.logName)
986
988 if self._request:
989
990 self._request.cancel()
991 self._request = None
992
993 self._stop()
994
995 expected = self.session.size - self.offset
996 if self._produced != expected:
997 self.warning("Only produced %s of the %s bytes "
998 "starting at %s of %s",
999 self._produced, expected,
1000 self.offset, self.session.url)
1001 else:
1002 self.log("Finished producing %s bytes starting at %s of %s",
1003 self._produced, self.offset, self.session.url)
1004
1005 self.consumer.unregisterProducer()
1006 self.consumer.finish()
1007 self.consumer = None
1008
1009 self.session.delref()
1010 self.session = None
1011
1012
1014 """
1015 Retrieves a block of data using a range request.
1016 A modification time can be specified for the retrieval to
1017 fail if the requested file modification time changed.
1018
1019 The data is returned as a block by triggering the deferred
1020 returned by calling the retrieve method.
1021
1022 It can recover request failures up to MAX_RESUME_COUNT times.
1023 """
1024
1025 logCategory = "block-requester"
1026
    def __init__(self, reqmgr, url, mtime=None):
        """
        @param reqmgr: request manager used to perform the range request
        @param url: URL of the resource to retrieve blocks from
        @param mtime: optional modification time; when given the
                      retrieval is expected to fail if the remote file
                      was modified (see class docstring)
        """
        self.reqmgr = reqmgr
        self._url = url
        self._mtime = mtime
        # List accumulating the received chunks of the retrieval in
        # progress; None when no retrieval is active.
        self._data = None
        # Deferred fired with the retrieved block; None when idle.
        self._deferred = None
        # Range currently being retrieved (set by the retrieve call).
        self._offset = None
        self._size = None
        # Remaining number of times a failed request may be resumed.
        self._resumes = MAX_RESUME_COUNT

        self.logName = common.log_id(self)
1040 assert self._deferred is None, "Already retrieving"
1041 self._deferred = defer.Deferred()
1042 self._data = []
1043 self._offset = offset
1044 self._size = size
1045 self._curr = 0
1046
1047 self._retrieve()
1048
1049 return self._deferred
1050
1052 assert self._deferred is not None, "Not retrieving anything"
1053 if code == common.RANGE_NOT_SATISFIABLE:
1054
1055 self._deferred.callback("")
1056 self._cleanup()
1057 return
1058 if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
1059 self.warning("Block request error: %s (%s)", message, code)
1060 if self._resumes > 0:
1061 self._resumes -= 1
1062 self.debug("Resuming block retrieval from offset %d "
1063 "with size %d (%d tries left)",
1064 self._offset, self._size, self._resumes)
1065
1066 self._retrieve()
1067 return
1068 self.debug("Too much resuming intents, stopping "
1069 "after %d of %d", self._offset, self._size)
1070 self._deferred.errback(fileprovider.FileError(message))
1071 self._cleanup()
1072
1077
1083
1084 - def onData(self, getter, data):
1089
1094
1098
1100 self._deferred = None
1101 self._data = None
1102