
Source Code for Module flumotion.component.misc.httpserver.httpcached.strategy_base

# -*- Mode: Python; test-case-name: flumotion.test.test_component_providers -*-
# vi:si:et:sw=4:sts=4:ts=4

# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008,2009 Fluendo, S.L.
# Copyright (C) 2010,2011 Flumotion Services, S.A.
# All rights reserved.
#
# This file may be distributed and/or modified under the terms of
# the GNU Lesser General Public License version 2.1 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.LGPL" in the source distribution for more information.
#
# Headers in this file shall remain intact.

import stat
from cStringIO import StringIO
import time

from twisted.internet import defer, reactor, abstract

from flumotion.common import log

from flumotion.component.misc.httpserver import fileprovider
from flumotion.component.misc.httpserver import ourmimetypes
from flumotion.component.misc.httpserver import cachestats
from flumotion.component.misc.httpserver.httpcached import common
from flumotion.component.misc.httpserver.httpcached import resource_manager

EXP_TABLE_CLEANUP_PERIOD = 30
MAX_RESUME_COUNT = 20

# A RemoteProducer will not be able to
# produce faster than 6.25 Mibit/s (6.55 Mbit/s)
PRODUCING_PERIOD = 0.08

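# Where the figure above comes from (illustrative note, not part of the
# original module): a RemoteProducer writes at most one block of
# abstract.FileDescriptor.bufferSize bytes (65536 in Twisted) every
# PRODUCING_PERIOD seconds:
#
#     65536 B / 0.08 s = 819200 B/s = 6553600 bit/s
#                      = 6.5536 Mbit/s = 6.25 Mibit/s
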
class ConditionError(Exception):
    """
    Raised when a request used by a caching session
    performed a conditional retrieval and the condition failed.
    """

    def __init__(self, *args, **kwargs):
        self.code = kwargs.pop("code", None)
        Exception.__init__(self, *args, **kwargs)

    def __str__(self):
        return "<%s: %s>" % (type(self).__name__, repr(self.code))

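# Illustrative sketch (not part of the original module): how a caller might
# handle ConditionError in an errback chain; the deferred and the handler
# names are hypothetical.
#
#     def _onRetrievalFailed(failure):
#         if failure.check(ConditionError):
#             # The conditional retrieval failed; failure.value.code holds
#             # the condition code reported by the getter.
#             return handleConditionFailure(failure.value.code)
#         return failure
#
#     d.addErrback(_onRetrievalFailed)
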
class CachingStrategy(log.Loggable):
    """
    Base class for all caching strategies.

    Handles the cache lookup, cache expiration checks,
    statistics gathering and caching session management.
    """

    logCategory = "base-caching"

    def __init__(self, cachemgr, reqmgr, ttl):
        self.cachemgr = cachemgr
        self.reqmgr = reqmgr
        self.ttl = ttl

        self._identifiers = {}  # {IDENTIFIER: CachingSession}
        self._etimes = {}  # {IDENTIFIER: EXPIRATION_TIME}

        self._cleanupCall = None
    def setup(self):
        self._startCleanupLoop()
        return self.reqmgr.setup()

    def cleanup(self):
        self._stopCleanupLoop()
        self.reqmgr.cleanup()
        for session in self._identifiers.values():
            session.cancel()
        return self

    def getSourceFor(self, url, stats):
        identifier = self.cachemgr.getIdentifier(url.path)
        session = self._identifiers.get(identifier, None)
        if session is not None and not session.checkModified:
            self.debug("Caching session found for '%s'", url)

            if (session.getState() in
                (CachingSession.DETACHED, CachingSession.CACHED)):
                stats.onStarted(session.size, cachestats.CACHE_HIT)
            elif (session.getState() in
                  (CachingSession.REQUESTING, CachingSession.BUFFERING,
                   CachingSession.CACHING)):
                stats.onStarted(session.size, cachestats.TEMP_HIT)
            else:
                stats.onStarted(session.size, cachestats.CACHE_MISS)

            # Wait to know session info like mtime and size
            d = session.waitInfo()
            d.addCallback(RemoteSource, stats)
            return d

        self.log("Looking for cached file for '%s'", url)
        d = defer.Deferred()
        d.addCallback(self.cachemgr.openCacheFile)
        d.addErrback(self._cachedFileError, url)
        d.addCallback(self._gotCachedFile, url, identifier, stats)

        d.callback(url.path)

        return d
    def requestData(self, url, offset=None, size=None, mtime=None):
        requester = BlockRequester(self.reqmgr, url, mtime)
        return requester.retrieve(offset, size)

    def getSessions(self):
        return self._identifiers.values()

    def keepCacheAlive(self, identifier, ttl=None):
        self._etimes[identifier] = time.time() + (ttl or self.ttl)

    ### To Be Overridden ###

    def _onCacheMiss(self, url, stats):
        raise NotImplementedError()

    def _onCacheOutdated(self, url, identifier, cachedFile, stats):
        raise NotImplementedError()

    ### Protected Methods ###
    def _startCleanupLoop(self):
        assert self._cleanupCall is None, "Already started"
        self._cleanupCall = reactor.callLater(EXP_TABLE_CLEANUP_PERIOD,
                                              self._cleanupLoop)

    def _stopCleanupLoop(self):
        if self._cleanupCall:
            self._cleanupCall.cancel()
            self._cleanupCall = None

    def _cleanupLoop(self):
        self._cleanupCall = None
        self._cleanupExpirationTable()
        self._startCleanupLoop()

    def _cleanupExpirationTable(self):
        now = time.time()
        expired = [i for i, e in self._etimes.items() if e < now]
        for ident in expired:
            del self._etimes[ident]

    def _onNewSession(self, session):
        identifier = session.identifier
        old = self._identifiers.get(identifier, None)
        if old is not None:
            old.cancel()
        self._identifiers[session.identifier] = session

    def _onSessionCanceled(self, session):
        if self._identifiers[session.identifier] == session:
            del self._identifiers[session.identifier]

    def _onResourceCached(self, session):
        self.keepCacheAlive(session.identifier)
        del self._identifiers[session.identifier]

    def _onResourceError(self, session, error):
        del self._identifiers[session.identifier]
    def _cachedFileError(self, failure, url):
        if failure.check(fileprovider.FileError):
            self.debug("Error looking for cached file for '%s'", url)
            return None
        return failure

    def _gotCachedFile(self, cachedFile, url, identifier, stats):
        if cachedFile is not None:
            self.log("Opened cached file '%s'", cachedFile.name)
            etime = self._etimes.get(identifier, None)
            session = self._identifiers.get(identifier, None)
            if (etime and (etime > time.time()) or
                (session and session.checkModified)):
                stats.onStarted(cachedFile.stat[stat.ST_SIZE],
                                cachestats.CACHE_HIT)
                return CachedSource(identifier, url, cachedFile, stats)
            self.debug("Cached file may have expired '%s'", cachedFile.name)
            return self._onCacheOutdated(url, identifier, cachedFile, stats)
        self.debug("Resource not cached '%s'", url)
        return self._onCacheMiss(url, stats)

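# Illustrative sketch (not part of the original module): the two hooks above
# are what concrete strategies must provide. A minimal hypothetical subclass
# could always re-fetch, starting a caching session and wrapping it in a
# RemoteSource once the stream info is known. The real strategies may wire
# statistics and session setup differently.
#
#     class AlwaysFetchStrategy(CachingStrategy):
#
#         def __init__(self, cachemgr, reqmgr, ttl, cache_stats):
#             CachingStrategy.__init__(self, cachemgr, reqmgr, ttl)
#             self.cache_stats = cache_stats  # hypothetical wiring
#
#         def _onCacheMiss(self, url, stats):
#             session = CachingSession(self, url, self.cache_stats)
#             session.cache()
#             d = session.waitInfo()
#             d.addCallback(RemoteSource, stats)
#             return d
#
#         def _onCacheOutdated(self, url, identifier, cachedFile, stats):
#             cachedFile.close()
#             return self._onCacheMiss(url, stats)
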
class CachedSource(resource_manager.DataSource):
    """
    Data source that reads data directly from a locally cached file.
    """

    mimetypes = ourmimetypes.MimeTypes()

    def __init__(self, ident, url, cachedFile, stats):
        self.identifier = ident
        self.url = url
        self._file = cachedFile
        self.stats = stats

        self.mimeType = self.mimetypes.fromPath(url.path)
        self.mtime = cachedFile.stat[stat.ST_MTIME]
        self.size = cachedFile.stat[stat.ST_SIZE]

        self._current = cachedFile.tell()
215
216 - def produce(self, consumer, offset):
217 # A producer for a cached file is not really convenient 218 # because it's better used pulling than pushing. 219 return None
220
221 - def read(self, offset, size):
222 if offset != self._current: 223 self._file.seek(offset) 224 data = self._file.read(size) 225 size = len(data) 226 self.stats.onBytesRead(0, size, 0) 227 self._current = offset + size 228 return data
229
230 - def close(self):
231 self.stats.onClosed() 232 self._file.close() 233 self._file = None
234 235
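# Illustrative sketch (not part of the original module): since produce()
# returns None, a CachedSource is consumed by pulling blocks through read().
# The helper below is hypothetical.
#
#     def readAll(source, blockSize=65536):
#         offset, blocks = 0, []
#         while True:
#             data = source.read(offset, blockSize)
#             if not data:
#                 return "".join(blocks)
#             blocks.append(data)
#             offset += len(data)
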
class BaseRemoteSource(resource_manager.DataSource):
    """
    Base class for resources not yet cached.
    It offers a push producer and delegates read operations
    to the session, starting block pipelining if the session
    cannot serve the requested data.
    Updates the cache statistics.
    """

    strategy = None
    session = None
    stats = None

    def produce(self, consumer, offset):
        return RemoteProducer(consumer, self.session, offset, self.stats)

    def read(self, offset, size):
        if offset >= self.size:
            return ""  # EOF
        data = self.session.read(offset, size)
        if data is not None:
            # Adjust the cache/source values to take the copy into account
            # FIXME: ask sebastien if he is on crack or LSD
            size = len(data)
            diff = min(self.session._correction, size)
            self.session._correction -= diff
            self.stats.onBytesRead(0, size, diff)  # from cache
            return data
        d = self.strategy.requestData(self.url, offset, size, self.mtime)
        d.addCallback(self._requestDataCb)
        d.addErrback(self._requestDataFailed)
        return d

    def _requestDataFailed(self, failure):
        if failure.check(fileprovider.FileOutOfDate):
            self.session.cancel()
        return failure

    def _requestDataCb(self, data):
        self.stats.onBytesRead(len(data), 0, 0)  # from remote source
        return data

class RemoteSource(BaseRemoteSource):
    """
    Simple remote source.
    """

    def __init__(self, session, stats):
        self.session = session
        self.stats = stats

        self.strategy = session.strategy
        self.identifier = session.identifier
        self.url = session.url
        self.mimeType = session.mimeType
        self.mtime = session.mtime
        self.size = session.size

        session.addref()

    def close(self):
        self.stats.onClosed()
        self.session.delref()
        self.session = None

class BaseCachingSession(object):
    """
    Base class of caching sessions.
    Just an interface to be implemented or inherited
    by all caching sessions.
    """

    strategy = None
    url = None
    size = 0
    mtime = None
    mimeType = None

    def read(self, offset, size):
        return None

    def cancel(self):
        raise NotImplementedError()

    def addref(self):
        raise NotImplementedError()

    def delref(self):
        raise NotImplementedError()

class CachingSession(BaseCachingSession, log.Loggable):
    """
    Caches a stream locally in a temporary file.
    The already cached data can be read from the session.

    Can be canceled, meaning the session is not valid anymore.

    Can be aborted, meaning the session will stop caching locally
    but is still valid.

    The caching operation can be started at any moment, but the
    session has to receive the stream info before it can be used
    with a RemoteSource instance.

    It can recover from request failures up to MAX_RESUME_COUNT times.
    """

    logCategory = "caching-session"

    (PIPELINING,
     REQUESTING,
     BUFFERING,
     CACHING,
     CACHED,
     DETACHED,
     CLOSED,
     CANCELED,
     ABORTED,
     ERROR) = range(10)

    mimetypes = ourmimetypes.MimeTypes()

    def __init__(self, strategy, url, cache_stats, ifModifiedSince=None):
        self.strategy = strategy
        self.url = url
        self.identifier = strategy.cachemgr.getIdentifier(url.path)

        self.ifModifiedSince = ifModifiedSince
        self.cache_stats = cache_stats

        self._refcount = 0
        self._state = self.PIPELINING
        self._request = None

        self.checkModified = False

        self._infoDefers = []
        self._startedDefers = []
        self._finishedDefers = []
        self._errorValue = None

        self._file = None
        self._bytes = 0
        self._correction = 0

        self._resumes = MAX_RESUME_COUNT

        self.logName = common.log_id(self)  # To be able to track the instance

        self.strategy._onNewSession(self)

        self.log("Caching session created for %s", url)
    def isActive(self):
        return (self._state < self.CLOSED) or (self._state == self.ABORTED)

    def getState(self):
        return self._state

    def cache(self):
        """
        Starts caching the remote resource locally.
        """
        if self._state != self.PIPELINING:
            return

        self._state = self.REQUESTING

        self.debug("Caching requested for %s", self.url)
        self.cache_stats.onCopyStarted()

        self._firstRetrieve()

    def waitInfo(self):
        if self._state < self.BUFFERING:
            d = defer.Deferred()
            self._infoDefers.append(d)
            return d
        if self._state <= self.CLOSED:
            return defer.succeed(self)
        return defer.fail(self._errorValue)

    def waitStarted(self):
        if self._state <= self.REQUESTING:
            d = defer.Deferred()
            self._startedDefers.append(d)
            return d
        if self._state <= self.CLOSED:
            return defer.succeed(self)
        return defer.fail(self._errorValue)

    def waitFinished(self):
        if self._state < self.DETACHED:
            d = defer.Deferred()
            self._finishedDefers.append(d)
            return d
        if self._state <= self.CLOSED:
            return defer.succeed(self)
        return defer.fail(self._errorValue)

    def read(self, offset, size):
        if self._state == self.CANCELED:
            raise fileprovider.FileOutOfDate("File out of date")
        if self._state == self.ABORTED:
            return None
        if self._state >= self.CLOSED:
            raise fileprovider.FileClosedError("Session Closed")

        if self._file is None:
            return None

        if min(self.size, offset + size) > self._bytes:
            return None

        self._file.seek(offset)
        return self._file.read(size)

    def cancel(self):
        """
        After calling this method the session cannot be used anymore.
        """
        if self._state < self.REQUESTING or self._state >= self.CACHED:
            return

        self.log("Canceling caching session for %s", self.url)

        self.strategy._onSessionCanceled(self)
        self.cache_stats.onCopyCancelled(self.size, self._bytes)

        self._close()

        error = fileprovider.FileOutOfDate("File out of date")
        self._fireError(error)

        if self._request:
            self.debug("Caching canceled for %s (%d/%d Bytes ~ %d %%)",
                       self.url, self._bytes, self.size,
                       self.size and int(self._bytes * 100 / self.size))
            self._request.cancel()
            self._request = None
        else:
            self.debug("Caching canceled before starting to cache")

        self._state = self.CANCELED

    def abort(self):
        """
        After calling this method the session will just stop caching
        and return None when trying to read. Used when pipelining is wanted.
        """
        if self._state < self.REQUESTING or self._state >= self.CACHED:
            return

        self.log("Aborting caching session for %s", self.url)

        self.strategy._onSessionCanceled(self)
        self.cache_stats.onCopyCancelled(self.size, self._bytes)

        self._close()

        error = fileprovider.FileError("Caching aborted")
        self._fireError(error)

        if self._request:
            self.debug("Caching aborted for %s", self.url)
            self._request.cancel()
            self._request = None
        else:
            self.debug("Caching aborted before starting to cache")

        self._state = self.ABORTED

    def addref(self):
        self._refcount += 1
    def delref(self):
        self._refcount -= 1
        if self._refcount == 0:
            if self._state == self.DETACHED:
                # not referenced anymore, so now we can close the file
                self.log("Detached session not referenced anymore")
                self._close()
    def isref(self):
        return self._refcount > 0

    ### StreamConsumer ###
    def serverError(self, getter, code, message):
        self.warning("Session request error %s (%s) for %s using %s:%s",
                     message, code, self.url, getter.host, getter.port)
        if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
            if self._resumes > 0:
                self._resumes -= 1
                if self._state > self.REQUESTING:
                    # We already have the request info
                    offset = self._bytes
                    size = self.size - self._bytes
                    self.debug("Resuming retrieval from offset %d with "
                               "size %d of %s (%d tries left)", offset, size,
                               self.url, self._resumes)
                    self._resumeRetrieve(offset, size)
                    return
                else:
                    # We don't have any info, we must retry from scratch
                    self.debug("Resuming retrieval from start of %s "
                               "(%d tries left)", self.url, self._resumes)
                    self._firstRetrieve()
                    return
            self.debug("Too many resume attempts, stopping "
                       "after %d of %s bytes of %s",
                       self._bytes, self.size, self.url)
        self._close()
        self._error(fileprovider.UnavailableError(message))
    def conditionFail(self, getter, code, message):
        if code == common.STREAM_MODIFIED:
            # Modified file detected during recovery
            self.log("Modifications detected during recovery of %s", self.url)
            self.cancel()
            return
        self.log("Unexpected HTTP condition failed: %s", message)
        self._close()
        self._error(ConditionError(message, code=code))

    def streamNotAvailable(self, getter, code, message):
        self.log("Stream to be cached is not available: %s", message)
        self._close()
        if code == common.STREAM_NOTFOUND:
            self._error(fileprovider.NotFoundError(message))
        elif code == common.STREAM_FORBIDDEN:
            self._error(fileprovider.AccessError(message))
        else:
            self._error(fileprovider.FileError(message))
    def onInfo(self, getter, info):
        if self._state == self.BUFFERING:
            # We are resuming while waiting for a temporary file,
            # so we still don't want to accumulate data
            self._request.pause()
            return

        if self._state != self.REQUESTING:
            # Already canceled, or recovering from disconnection
            return

        if info.size != (info.length - self._bytes):
            self.log("Unexpected stream size: %s / %s bytes "
                     "(Already got %s bytes)",
                     info.size, info.length, self._bytes)
            self._close()
            msg = "Unexpected resource size: %d" % info.size
            self._error(fileprovider.FileError(msg))
            return

        self._state = self.BUFFERING

        self.mimeType = self.mimetypes.fromPath(self.url.path)
        self.mtime = info.mtime
        self.size = info.size

        self.log("Caching session with type %s, size %s, mtime %s for %s",
                 self.mimeType, self.size, self.mtime, self.url)

        self._file = StringIO()  # To wait until we get the real one

        self.log("Requesting temporary file for %s", self.url)
        d = self.strategy.cachemgr.newTempFile(self.url.path, info.size,
                                               info.mtime)

        # We don't want to accumulate data, but a small amount may still
        # be received even after calling pause(), so we need buffering.
        self._request.pause()

        # We have got the meta data, so fire the callbacks
        self._fireInfo(self)
        self._fireStarted(self)

        self.debug("Start buffering %s", self.url)
        d.addCallback(self._gotTempFile)
    def _gotTempFile(self, tempFile):
        if self._state not in (self.BUFFERING, self.CACHED):
            # Already canceled
            if tempFile:
                tempFile.close()
            return

        if tempFile is None:
            self.warning("Temporary file creation failed, "
                         "aborting caching of %s", self.url)
            self.abort()
            return

        self.log("Got temporary file for %s", self.url)

        self.debug("Start caching %s", self.url)

        data = self._file.getvalue()
        self._file = tempFile
        tempFile.write(data)

        if self._request is not None:
            # We still have a request, so we want more data from it
            self._request.resume()

        if self._state == self.CACHED:
            # Already got all the data
            self._real_complete()
        else:
            self._state = self.CACHING

    def onData(self, getter, data):
        assert self._state in (self.BUFFERING, self.CACHING), "Not caching"
        self._file.seek(self._bytes)
        size = len(data)
        try:
            self._file.write(data)
        except Exception, e:
            self.warning("Error writing in temporary file: %s", e)
            self.debug("Got %s / %s bytes, would be %s with %s more",
                       self._bytes, self.size, self._bytes + size, size)
            self.abort()
        else:
            self._bytes += size
            self._correction += size

    def streamDone(self, getter):
        assert self._state in (self.BUFFERING, self.CACHING), "Not caching"
        self._request = None
        self._complete()

    def _error(self, error):
        assert self._state < self.CANCELED, "Wrong state for errors"

        self.log("Caching error for %s: %s", self.url, error)

        self._state = self.ERROR

        self.strategy._onResourceError(self, error)
        self.strategy = None
        self._request = None

        self._fireError(error)
    def _fireInfo(self, value):
        defers = list(self._infoDefers)
        # Prevent multiple deferred firing due to reentrance
        self._infoDefers = []
        for d in defers:
            d.callback(value)

    def _fireStarted(self, value):
        defers = list(self._startedDefers)
        # Prevent multiple deferred firing due to reentrance
        self._startedDefers = []
        for d in defers:
            d.callback(value)

    def _fireFinished(self, value):
        defers = list(self._finishedDefers)
        # Prevent multiple deferred firing due to reentrance
        self._finishedDefers = []
        for d in defers:
            d.callback(value)

    def _fireError(self, error):
        self._errorValue = error
        defers = list(self._infoDefers)
        defers.extend(self._startedDefers)
        defers.extend(self._finishedDefers)
        # Prevent multiple deferred firing due to reentrance
        self._infoDefers = []
        self._startedDefers = []
        self._finishedDefers = []
        for d in defers:
            d.errback(error)
    def _close(self):
        if self._state >= self.CLOSED:
            return

        self.log("Closing caching session for %s", self.url)

        if self._state >= self.BUFFERING:
            self._file.close()
            self._file = None

        self._state = self.CLOSED

    def _complete(self):
        assert self._state in (self.CACHING, self.BUFFERING), "Not caching"
        self.debug("Finished caching %s (%d Bytes)", self.url, self.size)

        oldstate = self._state
        self._state = self.CACHED

        if oldstate != self.BUFFERING:
            self._real_complete()

    def _real_complete(self):
        assert self._state == self.CACHED, "Not cached"
        self._state = self.DETACHED
        self.log("Caching session detached for %s", self.url)

        self._file.complete()

        self.strategy._onResourceCached(self)
        self.strategy = None

        if not self.isref():
            # Not referenced anymore by sources, so close the session
            self.log("Caching session not referenced, it can be closed")
            self._close()

        self.cache_stats.onCopyFinished(self.size)
        self._fireFinished(self)

    def _firstRetrieve(self):
        since = self.ifModifiedSince
        self._request = self.strategy.reqmgr.retrieve(self, self.url,
                                                      ifModifiedSince=since)
        self.log("Retrieving data using %s", self._request.logName)

    def _resumeRetrieve(self, offset, size):
        reqmgr = self.strategy.reqmgr
        req = reqmgr.retrieve(self, self.url,
                              ifUnmodifiedSince=self.mtime,
                              start=offset, size=size)
        self._request = req
        self.log("Retrieving data using %s", self._request.logName)

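# Illustrative sketch (not part of the original module): the typical
# lifecycle of a CachingSession from a strategy's point of view; the
# variable names are hypothetical.
#
#     session = CachingSession(strategy, url, cache_stats)
#     session.cache()                     # PIPELINING -> REQUESTING
#
#     d = session.waitInfo()              # fires once size/mtime are known
#     d.addCallback(RemoteSource, stats)  # serve data while caching goes on
#
#     d2 = session.waitFinished()         # fires once fully cached
#     d2.addCallback(lambda s: s.size)    # s is the session, now DETACHED
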
class RemoteProducer(common.StreamConsumer, log.Loggable):
    """
    Offers an IPushProducer interface to a caching session.
    It starts producing data from the specified offset.

    If the data is already cached by the session,
    it produces data in a reactor loop, reading from the
    session block by block.

    If the data is not yet cached, it starts a request
    using the request manager and pipelines the data
    to the specified consumer.

    It can recover from request failures up to MAX_RESUME_COUNT times.

    It is not used yet in the context of the HTTP server.
    So far, simulations show that using a producer with
    long-lived HTTP requests instead of short-lived block requests
    is less efficient and produces higher latency for the clients,
    at least when used with HTTP proxies.
    """

    logCategory = "pipe-producer"

    def __init__(self, consumer, session, offset, stats):
        self.consumer = consumer
        self.offset = offset
        self.session = session
        self.stats = stats
        self.reqmgr = session.strategy.reqmgr

        self.logName = common.log_id(self)  # To be able to track the instance

        self._pipelining = False
        self._paused = False
        self._request = None
        self._produced = 0
        self._resumes = MAX_RESUME_COUNT
        self._call = None

        session.addref()

        self.log("Starting producing data with session %s from %s",
                 self.session.logName, self.session.url)

        consumer.registerProducer(self, True)  # Push producer
        self._produce()
    ### IPushProducer Methods ###

    def resumeProducing(self):
        if self.consumer is None:
            # Already stopped
            return

        self._paused = False

        if self._pipelining:
            # Doing pipelining
            if self._request:
                # Just resuming the current request
                self._request.resume()
            else:
                # Start a new one
                self._pipeline()
        else:
            # Producing from session
            self._produce()

    def pauseProducing(self):
        if self.consumer is None:
            # Already stopped
            return

        self._paused = True

        if self._pipelining:
            # Doing pipelining
            if self._request:
                self._request.pause()
        else:
            # Producing from session
            self._stop()

    def stopProducing(self):
        self.log("Asked to stop producing %s", self.session.url)
        self._terminate()

    ### common.StreamConsumer Methods ###
    def serverError(self, getter, code, message):
        if self._request is None:
            # Already terminated
            return
        self._request = None

        if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
            self.warning("Producer request error %s (%s) for %s "
                         "(%s tries left)", message, code,
                         self.session.url, self._resumes)

            if self._resumes > 0:
                self._resumes -= 1
                if self._paused:
                    self.log("Producer paused, waiting to recover pipelining "
                             "(%d tries left)", self._resumes)
                else:
                    self.log("Recovering pipelining (%d tries left)",
                             self._resumes)
                    self._pipeline()
                return

            self.debug("Too many resume attempts, stopping "
                       "after %d of %s bytes",
                       self._produced, self.session.size)

        self._terminate()
    def conditionFail(self, getter, code, message):
        if self._request is None:
            # Already terminated
            return
        self._request = None
        self.warning("Modifications detected while producing %s",
                     self.session.url)
        self._terminate()

    def streamNotAvailable(self, getter, code, message):
        if self._request is None:
            # Already terminated
            return
        self._request = None
        self.warning("%s detected while producing %s",
                     message, self.session.url)
        self._terminate()

    def onData(self, getter, data):
        if self._request is None:
            # Already terminated
            return
        self._write(data)

    def streamDone(self, getter):
        if self._request is None:
            # Already terminated
            return
        self.log("Pipelining finished")
        self._terminate()

    ### Private Methods ###
    def _produce(self):
        self._call = None
        if self.consumer is None:
            # Already terminated
            return

        data = self.session.read(self.offset + self._produced,
                                 abstract.FileDescriptor.bufferSize)

        if data is None:
            # The session can't serve the data, start pipelining
            self._pipeline()
            return

        if data == "":
            # No more data
            self.log("All data served from session")
            self._terminate()
            return

        self._write(data)

        self._call = reactor.callLater(PRODUCING_PERIOD, self._produce)

    def _write(self, data):
        size = len(data)
        self._produced += size
        self.consumer.write(data)

    def _stop(self):
        if self._call is not None:
            self._call.cancel()
            self._call = None

    def _pipeline(self):
        if not self.session.isActive():
            self.log("Session %s not active anymore (%s), "
                     "aborting production of %s",
                     self.session.logName,
                     self.session._state,
                     self.session.url)
            self._terminate()
            return

        self._pipelining = True

        offset = self.offset + self._produced
        size = self.session.size - offset
        mtime = self.session.mtime

        if size == 0:
            self.log("No more data to be retrieved, pipelining finished")
            self._terminate()
            return

        self.debug("Producing %s bytes from offset %d of %s",
                   size, offset, self.session.url)

        self._request = self.reqmgr.retrieve(self, self.session.url,
                                             start=offset, size=size,
                                             ifUnmodifiedSince=mtime)
        self.log("Retrieving data using %s", self._request.logName)

    def _terminate(self):
        if self._request:
            # Doing pipelining
            self._request.cancel()
            self._request = None

        self._stop()  # Stop producing from session

        expected = self.session.size - self.offset
        if self._produced != expected:
            self.warning("Only produced %s of the %s bytes "
                         "starting at %s of %s",
                         self._produced, expected,
                         self.offset, self.session.url)
        else:
            self.log("Finished producing %s bytes starting at %s of %s",
                     self._produced, self.offset, self.session.url)

        self.consumer.unregisterProducer()
        self.consumer.finish()
        self.consumer = None

        self.session.delref()
        self.session = None

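# Illustrative sketch (not part of the original module): the consumer
# interface RemoteProducer drives. On top of the usual IConsumer methods
# it also calls finish() once production ends. This hypothetical consumer
# just counts the bytes written to it.
#
#     class CountingConsumer(object):
#
#         def __init__(self):
#             self.producer = None
#             self.count = 0
#
#         def registerProducer(self, producer, streaming):
#             self.producer = producer
#
#         def unregisterProducer(self):
#             self.producer = None
#
#         def write(self, data):
#             self.count += len(data)
#
#         def finish(self):
#             print "Produced %d bytes" % self.count
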
class BlockRequester(common.StreamConsumer, log.Loggable):
    """
    Retrieves a block of data using a range request.
    A modification time can be specified for the retrieval to
    fail if the requested file's modification time changed.

    The data is returned as a single block through the deferred
    returned by the retrieve method.

    It can recover from request failures up to MAX_RESUME_COUNT times.
    """

    logCategory = "block-requester"

    def __init__(self, reqmgr, url, mtime=None):
        self.reqmgr = reqmgr
        self._url = url
        self._mtime = mtime
        self._data = None
        self._deferred = None
        self._offset = None
        self._size = None
        self._resumes = MAX_RESUME_COUNT

        self.logName = common.log_id(self)  # To be able to track the instance
    def retrieve(self, offset, size):
        assert self._deferred is None, "Already retrieving"
        self._deferred = defer.Deferred()
        self._data = []
        self._offset = offset
        self._size = size
        self._curr = 0

        self._retrieve()

        return self._deferred
    def serverError(self, getter, code, message):
        assert self._deferred is not None, "Not retrieving anything"
        if code == common.RANGE_NOT_SATISFIABLE:
            # Simulate EOF
            self._deferred.callback("")
            self._cleanup()
            return
        if code in (common.SERVER_DISCONNECTED, common.SERVER_TIMEOUT):
            self.warning("Block request error: %s (%s)", message, code)
            if self._resumes > 0:
                self._resumes -= 1
                self.debug("Resuming block retrieval from offset %d "
                           "with size %d (%d tries left)",
                           self._offset, self._size, self._resumes)
                self._retrieve()
                return
            self.debug("Too many resume attempts, stopping "
                       "after %d of %d", self._offset, self._size)
        self._deferred.errback(fileprovider.FileError(message))
        self._cleanup()
    def conditionFail(self, getter, code, message):
        assert self._deferred is not None, "Not retrieving anything"
        self._deferred.errback(fileprovider.FileOutOfDate(message))
        self._cleanup()

    def streamNotAvailable(self, getter, code, message):
        assert self._deferred is not None, "Not retrieving anything"
        error = fileprovider.FileOutOfDate(message)
        self._deferred.errback(error)
        self._cleanup()

    def onData(self, getter, data):
        size = len(data)
        self._offset += size
        self._size -= size
        self._data.append(data)

    def streamDone(self, getter):
        data = "".join(self._data)
        self._deferred.callback(data)
        self._cleanup()

    def _retrieve(self):
        self.reqmgr.retrieve(self, self._url, start=self._offset,
                             size=self._size, ifUnmodifiedSince=self._mtime)

    def _cleanup(self):
        self._deferred = None
        self._data = None
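
# Illustrative sketch (not part of the original module): fetching one block
# through CachingStrategy.requestData(), which wraps a BlockRequester; the
# variable names are hypothetical.
#
#     d = strategy.requestData(url, offset=0, size=65536, mtime=mtime)
#
#     def gotBlock(data):
#         # The whole requested range joined into one string; "" simulates
#         # EOF when the server reports the range as not satisfiable.
#         return data
#
#     def requestFailed(failure):
#         # FileOutOfDate if the condition failed or the stream vanished,
#         # FileError if all resume attempts were exhausted.
#         return failure
#
#     d.addCallbacks(gotBlock, requestFailed)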