1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.client;
17
18 import static com.google.common.base.Preconditions.checkState;
19 import static com.google.common.math.LongMath.saturatedAdd;
20 import static java.util.Objects.requireNonNull;
21
22 import java.util.AbstractMap.SimpleImmutableEntry;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Random;
26 import java.util.concurrent.CancellationException;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.CompletionException;
29 import java.util.concurrent.CopyOnWriteArrayList;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.ScheduledFuture;
33 import java.util.concurrent.ThreadLocalRandom;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.atomic.AtomicReference;
36 import java.util.function.BiConsumer;
37
38 import javax.annotation.Nullable;
39
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.base.MoreObjects;
44
45 import com.linecorp.centraldogma.common.CentralDogmaException;
46 import com.linecorp.centraldogma.common.EntryNotFoundException;
47 import com.linecorp.centraldogma.common.PathPattern;
48 import com.linecorp.centraldogma.common.Query;
49 import com.linecorp.centraldogma.common.RepositoryNotFoundException;
50 import com.linecorp.centraldogma.common.Revision;
51
52 abstract class AbstractWatcher<T> implements Watcher<T> {
53
54 private static final Logger logger = LoggerFactory.getLogger(AbstractWatcher.class);
55
56 private enum State {
57 INIT,
58 STARTED,
59 STOPPED
60 }
61
62 private final ScheduledExecutorService watchScheduler;
63 private final String projectName;
64 private final String repositoryName;
65 private final String pathPattern;
66 private final boolean errorOnEntryNotFound;
67 private final long delayOnSuccessMillis;
68 private final long initialDelayMillis;
69 private final long maxDelayMillis;
70 private final double multiplier;
71 private final double jitterRate;
72
73 private final List<Map.Entry<BiConsumer<? super Revision, ? super T>, Executor>> updateListeners =
74 new CopyOnWriteArrayList<>();
75 private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
76 private final CompletableFuture<Latest<T>> initialValueFuture = new CompletableFuture<>();
77
78 @Nullable
79 private volatile Latest<T> latest;
80 @Nullable
81 private volatile ScheduledFuture<?> currentScheduleFuture;
82 @Nullable
83 private volatile CompletableFuture<?> currentWatchFuture;
84
85 AbstractWatcher(ScheduledExecutorService watchScheduler, String projectName, String repositoryName,
86 String pathPattern, boolean errorOnEntryNotFound, long delayOnSuccessMillis,
87 long initialDelayMillis, long maxDelayMillis, double multiplier, double jitterRate) {
88 this.watchScheduler = watchScheduler;
89 this.projectName = projectName;
90 this.repositoryName = repositoryName;
91 this.pathPattern = pathPattern;
92 this.errorOnEntryNotFound = errorOnEntryNotFound;
93 this.delayOnSuccessMillis = delayOnSuccessMillis;
94 this.initialDelayMillis = initialDelayMillis;
95 this.maxDelayMillis = maxDelayMillis;
96 this.multiplier = multiplier;
97 this.jitterRate = jitterRate;
98 }
99
100 @Override
101 public ScheduledExecutorService watchScheduler() {
102 return watchScheduler;
103 }
104
105 @Override
106 public CompletableFuture<Latest<T>> initialValueFuture() {
107 return initialValueFuture;
108 }
109
110 @Override
111 public Latest<T> latest() {
112 final Latest<T> latest = this.latest;
113 if (latest == null) {
114 throw new IllegalStateException("value not available yet");
115 }
116 return latest;
117 }
118
119
120
121
122
123 void start() {
124 if (state.compareAndSet(State.INIT, State.STARTED)) {
125 scheduleWatch(0);
126 }
127 }
128
129 @Override
130 public void close() {
131 state.set(State.STOPPED);
132 if (!initialValueFuture.isDone()) {
133 initialValueFuture.cancel(false);
134 }
135
136
137 final ScheduledFuture<?> currentScheduleFuture = this.currentScheduleFuture;
138 if (currentScheduleFuture != null && !currentScheduleFuture.isDone()) {
139 currentScheduleFuture.cancel(false);
140 }
141 final CompletableFuture<?> currentWatchFuture = this.currentWatchFuture;
142 if (currentWatchFuture != null && !currentWatchFuture.isDone()) {
143 currentWatchFuture.cancel(false);
144 }
145 }
146
147 private boolean isStopped() {
148 return state.get() == State.STOPPED;
149 }
150
151 @Override
152 public void watch(BiConsumer<? super Revision, ? super T> listener) {
153 watch(listener, watchScheduler);
154 }
155
156 @Override
157 public void watch(BiConsumer<? super Revision, ? super T> listener, Executor executor) {
158 requireNonNull(listener, "listener");
159 checkState(!isStopped(), "watcher closed");
160 updateListeners.add(new SimpleImmutableEntry<>(listener, executor));
161
162 final Latest<T> latest = this.latest;
163 if (latest != null) {
164
165
166
167
168
169
170
171 executor.execute(() -> listener.accept(latest.revision(), latest.value()));
172 }
173 }
174
175 private void scheduleWatch(int numAttemptsSoFar) {
176 if (isStopped()) {
177 return;
178 }
179
180 final long delay;
181 if (numAttemptsSoFar == 0) {
182 delay = latest != null ? delayOnSuccessMillis : 0;
183 } else {
184 delay = nextDelayMillis(numAttemptsSoFar);
185 }
186
187 currentScheduleFuture = watchScheduler.schedule(() -> {
188 currentScheduleFuture = null;
189 doWatch(numAttemptsSoFar);
190 }, delay, TimeUnit.MILLISECONDS);
191 }
192
193 private long nextDelayMillis(int numAttemptsSoFar) {
194 final long nextDelayMillis;
195 if (numAttemptsSoFar == 1) {
196 nextDelayMillis = initialDelayMillis;
197 } else {
198 nextDelayMillis =
199 Math.min(saturatedMultiply(initialDelayMillis, Math.pow(multiplier, numAttemptsSoFar - 1)),
200 maxDelayMillis);
201 }
202
203 final long minJitter = (long) (nextDelayMillis * (1 - jitterRate));
204 final long maxJitter = (long) (nextDelayMillis * (1 + jitterRate));
205 final long bound = maxJitter - minJitter + 1;
206 final long millis = random(bound);
207 return Math.max(0, saturatedAdd(minJitter, millis));
208 }
209
210 private static long saturatedMultiply(long left, double right) {
211 final double result = left * right;
212 return result >= Long.MAX_VALUE ? Long.MAX_VALUE : (long) result;
213 }
214
215 private static long random(long bound) {
216 assert bound > 0;
217 final long mask = bound - 1;
218 final Random random = ThreadLocalRandom.current();
219 long result = random.nextLong();
220
221 if ((bound & mask) == 0L) {
222
223 result &= mask;
224 } else {
225 for (long u = result >>> 1; u + mask - (result = u % bound) < 0L; u = random.nextLong() >>> 1) {
226 continue;
227 }
228 }
229
230 return result;
231 }
232
233 private void doWatch(int numAttemptsSoFar) {
234 if (isStopped()) {
235 return;
236 }
237
238 final Latest<T> latest = this.latest;
239 final Revision lastKnownRevision = latest != null ? latest.revision() : Revision.INIT;
240 final CompletableFuture<Latest<T>> f = doWatch(lastKnownRevision);
241
242 currentWatchFuture = f;
243 f.thenAccept(newLatest -> {
244 currentWatchFuture = null;
245 if (newLatest != null) {
246 this.latest = newLatest;
247 logger.debug("watcher noticed updated file {}/{}{}: rev={}",
248 projectName, repositoryName, pathPattern, newLatest.revision());
249 notifyListeners(newLatest);
250 if (!initialValueFuture.isDone()) {
251 initialValueFuture.complete(newLatest);
252 }
253 }
254
255
256 scheduleWatch(0);
257 })
258 .exceptionally(thrown -> {
259 currentWatchFuture = null;
260 try {
261 final Throwable cause = thrown instanceof CompletionException ? thrown.getCause() : thrown;
262 boolean logged = false;
263 if (cause instanceof CentralDogmaException) {
264 if (cause instanceof EntryNotFoundException) {
265 if (!initialValueFuture.isDone() && errorOnEntryNotFound) {
266 initialValueFuture.completeExceptionally(thrown);
267 close();
268 return null;
269 }
270 logger.info("{}/{}{} does not exist yet; trying again",
271 projectName, repositoryName, pathPattern);
272 logged = true;
273 } else if (cause instanceof RepositoryNotFoundException) {
274 logger.info("{}/{} does not exist yet; trying again",
275 projectName, repositoryName);
276 logged = true;
277 }
278 }
279
280 if (cause instanceof CancellationException) {
281
282 return null;
283 }
284
285 if (!logged) {
286 logger.warn("Failed to watch a file ({}/{}{}) at Central Dogma; trying again",
287 projectName, repositoryName, pathPattern, cause);
288 }
289
290 scheduleWatch(numAttemptsSoFar + 1);
291 } catch (Throwable t) {
292 logger.error("Unexpected exception while watching a file at Central Dogma:", t);
293 }
294 return null;
295 });
296 }
297
298 abstract CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision);
299
300 private void notifyListeners(Latest<T> latest) {
301 if (isStopped()) {
302
303 return;
304 }
305
306 for (Map.Entry<BiConsumer<? super Revision, ? super T>, Executor> entry : updateListeners) {
307 final BiConsumer<? super Revision, ? super T> listener = entry.getKey();
308 final Executor executor = entry.getValue();
309 executor.execute(() -> {
310 try {
311 listener.accept(latest.revision(), latest.value());
312 } catch (Exception e) {
313 logger.warn("Exception thrown for watcher ({}/{}{}): rev={}",
314 projectName, repositoryName, pathPattern, latest.revision(), e);
315 }
316 });
317 }
318 }
319
320 @Override
321 public String toString() {
322 return MoreObjects.toStringHelper(this).omitNullValues()
323 .add("watchScheduler", watchScheduler)
324 .add("projectName", projectName)
325 .add("repositoryName", repositoryName)
326 .add("pathPattern", pathPattern)
327 .add("errorOnEntryNotFound", errorOnEntryNotFound)
328 .add("delayOnSuccessMillis", delayOnSuccessMillis)
329 .add("initialDelayMillis", initialDelayMillis)
330 .add("maxDelayMillis", maxDelayMillis)
331 .add("multiplier", multiplier)
332 .add("jitterRate", jitterRate)
333 .add("latest", latest)
334 .toString();
335 }
336 }