001// License: GPL. For details, see LICENSE file. 002package org.openstreetmap.josm.data.cache; 003 004import java.io.IOException; 005import java.net.URL; 006import java.util.Iterator; 007import java.util.Map; 008import java.util.concurrent.ConcurrentHashMap; 009import java.util.concurrent.LinkedBlockingDeque; 010import java.util.concurrent.Semaphore; 011import java.util.concurrent.TimeUnit; 012 013import org.openstreetmap.josm.Main; 014 015/** 016 * @author Wiktor Niesiobędzki 017 * 018 * Queue for ThreadPoolExecutor that implements per-host limit. It will acquire a semaphore for each task 019 * and it will set a runnable task with semaphore release, when job has finished. 020 * 021 * This implementation doesn't guarantee to have at most hostLimit connections per host[1], and it doesn't 022 * guarantee that all threads will be busy, when there is work for them[2]. 023 * 024 * [1] More connection per host may happen, when ThreadPoolExecutor is growing its pool, and thus 025 * tasks do not go through the Queue 026 * [2] If we have a queue, and for all hosts in queue we will fail to acquire semaphore, the thread 027 * take the first available job and wait for semaphore. It might be the case, that semaphore was released 028 * for some task further in queue, but this implementation doesn't try to detect such situation 029 * 030 * 031 */ 032public class HostLimitQueue extends LinkedBlockingDeque<Runnable> { 033 private static final long serialVersionUID = 1L; 034 035 private final Map<String, Semaphore> hostSemaphores = new ConcurrentHashMap<>(); 036 private final int hostLimit; 037 038 /** 039 * Creates an unbounded queue 040 * @param hostLimit how many parallel calls to host to allow 041 */ 042 public HostLimitQueue(int hostLimit) { 043 super(); // create unbounded queue 044 this.hostLimit = hostLimit; 045 } 046 047 private JCSCachedTileLoaderJob<?, ?> findJob() { 048 for (Iterator<Runnable> it = iterator(); it.hasNext();) { 049 Runnable r = it.next(); 050 if (r instanceof JCSCachedTileLoaderJob) { 051 JCSCachedTileLoaderJob<?, ?> job = (JCSCachedTileLoaderJob<?, ?>) r; 052 if (tryAcquireSemaphore(job)) { 053 if (remove(job)) { 054 return job; 055 } else { 056 // we have acquired the semaphore, but we didn't manage to remove job, as someone else did 057 // release the semaphore and look for another candidate 058 releaseSemaphore(job); 059 } 060 } else { 061 URL url = null; 062 try { 063 url = job.getUrl(); 064 } catch (IOException e) { 065 Main.debug(e); 066 } 067 Main.debug("TMS - Skipping job {0} because host limit reached", url); 068 } 069 } 070 } 071 return null; 072 } 073 074 @Override 075 public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { 076 Runnable job = findJob(); 077 if (job != null) { 078 return job; 079 } 080 job = pollFirst(timeout, unit); 081 if (job != null) { 082 acquireSemaphore(job); 083 } 084 return job; 085 } 086 087 @Override 088 public Runnable take() throws InterruptedException { 089 Runnable job = findJob(); 090 if (job != null) { 091 return job; 092 } 093 job = takeFirst(); 094 if (job != null) { 095 acquireSemaphore(job); 096 } 097 return job; 098 } 099 100 private Semaphore getSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 101 String host; 102 try { 103 host = job.getUrl().getHost(); 104 } catch (IOException e) { 105 // do not pass me illegal URL's 106 throw new IllegalArgumentException(e); 107 } 108 Semaphore limit = hostSemaphores.get(host); 109 if (limit == null) { 110 synchronized (hostSemaphores) { 111 limit = hostSemaphores.get(host); 112 if (limit == null) { 113 limit = new Semaphore(hostLimit); 114 hostSemaphores.put(host, limit); 115 } 116 } 117 } 118 return limit; 119 } 120 121 private void acquireSemaphore(Runnable job) throws InterruptedException { 122 if (job instanceof JCSCachedTileLoaderJob) { 123 final JCSCachedTileLoaderJob<?, ?> jcsJob = (JCSCachedTileLoaderJob<?, ?>) job; 124 Semaphore limit = getSemaphore(jcsJob); 125 if (limit != null) { 126 limit.acquire(); 127 jcsJob.setFinishedTask(new Runnable() { 128 @Override 129 public void run() { 130 releaseSemaphore(jcsJob); 131 } 132 }); 133 } 134 } 135 } 136 137 private boolean tryAcquireSemaphore(final JCSCachedTileLoaderJob<?, ?> job) { 138 boolean ret = true; 139 Semaphore limit = getSemaphore(job); 140 if (limit != null) { 141 ret = limit.tryAcquire(); 142 if (ret) { 143 job.setFinishedTask(new Runnable() { 144 @Override 145 public void run() { 146 releaseSemaphore(job); 147 } 148 }); 149 } 150 } 151 return ret; 152 } 153 154 private void releaseSemaphore(JCSCachedTileLoaderJob<?, ?> job) { 155 Semaphore limit = getSemaphore(job); 156 if (limit != null) { 157 limit.release(); 158 if (limit.availablePermits() > hostLimit) { 159 Main.warn("More permits than it should be"); 160 } 161 } 162 } 163}