1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.client;
17
18 import static com.google.common.base.Preconditions.checkState;
19 import static com.google.common.math.LongMath.saturatedAdd;
20 import static java.util.Objects.requireNonNull;
21
22 import java.time.Instant;
23 import java.util.AbstractMap.SimpleImmutableEntry;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Random;
27 import java.util.concurrent.CancellationException;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.CompletionException;
30 import java.util.concurrent.CopyOnWriteArrayList;
31 import java.util.concurrent.Executor;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.concurrent.ScheduledFuture;
34 import java.util.concurrent.ThreadLocalRandom;
35 import java.util.concurrent.TimeUnit;
36 import java.util.concurrent.atomic.AtomicLong;
37 import java.util.concurrent.atomic.AtomicReference;
38 import java.util.function.BiConsumer;
39
40 import javax.annotation.Nullable;
41
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import com.google.common.base.MoreObjects;
46
47 import com.linecorp.centraldogma.common.CentralDogmaException;
48 import com.linecorp.centraldogma.common.EntryNotFoundException;
49 import com.linecorp.centraldogma.common.PathPattern;
50 import com.linecorp.centraldogma.common.Query;
51 import com.linecorp.centraldogma.common.RepositoryNotFoundException;
52 import com.linecorp.centraldogma.common.Revision;
53
54 import io.micrometer.core.instrument.Meter.Id;
55 import io.micrometer.core.instrument.Meter.Type;
56 import io.micrometer.core.instrument.MeterRegistry;
57 import io.micrometer.core.instrument.Tags;
58 import io.micrometer.core.instrument.TimeGauge;
59
60 abstract class AbstractWatcher<T> implements Watcher<T> {
61
private static final Logger logger = LoggerFactory.getLogger(AbstractWatcher.class);
// Name of the gauge that reports the major number of the latest known revision (-1 while unknown).
private static final String LATEST_REVISION_METER_NAME = "centraldogma.client.watcher.latest.revision";
// Name of the time gauge that reports when the latest value was received, in epoch seconds.
private static final String LATEST_RECEIVED_TIME_METER_NAME =
        "centraldogma.client.watcher.latest.received.time";
// Source of the per-instance sequence number used as the "name" meter tag.
private static final AtomicLong WATCHER_ID = new AtomicLong();
67
/**
 * Life cycle of a watcher. {@code start()} moves {@code INIT} to {@code STARTED};
 * {@code close()} sets {@code STOPPED} unconditionally.
 */
private enum State {
    // Constructed, but start() has not been called yet.
    INIT,
    // start() succeeded; watch attempts are being scheduled.
    STARTED,
    // close() was called; no further watches are scheduled and listeners are not notified.
    STOPPED
}
73
// Runs the scheduled watch attempts; also the default executor for listeners.
private final ScheduledExecutorService watchScheduler;
// The project / repository / path pattern are used here for log messages and meter tags.
private final String projectName;
private final String repositoryName;
private final String pathPattern;
// When true, an EntryNotFoundException seen before the initial value arrives fails
// initialValueFuture and closes this watcher instead of retrying.
private final boolean errorOnEntryNotFound;
// Delay before the next watch once a value is already known.
private final long delayOnSuccessMillis;
// Back-off parameters for retries after a failed watch: first delay, upper bound applied
// before jitter, growth factor per attempt and the relative jitter.
private final long initialDelayMillis;
private final long maxDelayMillis;
private final double multiplier;
private final double jitterRate;
// No meters are registered or removed when this is null.
@Nullable
private final MeterRegistry meterRegistry;
// Tags shared by both gauges of this watcher.
private final Tags tags;

// Registered listeners, each paired with the executor it must be invoked on.
private final List<Map.Entry<BiConsumer<? super Revision, ? super T>, Executor>> updateListeners =
        new CopyOnWriteArrayList<>();
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
// Completed with the first received value; cancelled by close() if still pending.
private final CompletableFuture<Latest<T>> initialValueFuture = new CompletableFuture<>();

// The most recently received value; null until the first watch succeeds.
@Nullable
private volatile Latest<T> latest;
// The pending scheduled watch task, kept so that close() can cancel it.
@Nullable
private volatile ScheduledFuture<?> currentScheduleFuture;
// The in-flight watch request, kept so that close() can cancel it.
@Nullable
private volatile CompletableFuture<?> currentWatchFuture;

// Epoch seconds at which the latest value was received; -1 until a value has been received.
// Exposed through the "latest received time" gauge.
private volatile long latestReceivedTimeSeconds = -1;
105
106 AbstractWatcher(ScheduledExecutorService watchScheduler, String projectName, String repositoryName,
107 String pathPattern, boolean errorOnEntryNotFound, long delayOnSuccessMillis,
108 long initialDelayMillis, long maxDelayMillis, double multiplier, double jitterRate,
109 @Nullable MeterRegistry meterRegistry) {
110 this.watchScheduler = watchScheduler;
111 this.projectName = projectName;
112 this.repositoryName = repositoryName;
113 this.pathPattern = pathPattern;
114 this.errorOnEntryNotFound = errorOnEntryNotFound;
115 this.delayOnSuccessMillis = delayOnSuccessMillis;
116 this.initialDelayMillis = initialDelayMillis;
117 this.maxDelayMillis = maxDelayMillis;
118 this.multiplier = multiplier;
119 this.jitterRate = jitterRate;
120 this.meterRegistry = meterRegistry;
121 tags = Tags.of(
122 "project", projectName,
123 "repository", repositoryName,
124 "path", pathPattern,
125
126
127
128 "name", String.valueOf(WATCHER_ID.incrementAndGet())
129 );
130 }
131
132 @Override
133 public ScheduledExecutorService watchScheduler() {
134 return watchScheduler;
135 }
136
137 @Override
138 public CompletableFuture<Latest<T>> initialValueFuture() {
139 return initialValueFuture;
140 }
141
142 @Override
143 public Latest<T> latest() {
144 final Latest<T> latest = this.latest;
145 if (latest == null) {
146 throw new IllegalStateException("value not available yet");
147 }
148 return latest;
149 }
150
151
152
153
154
155 void start() {
156 if (state.compareAndSet(State.INIT, State.STARTED)) {
157 scheduleWatch(0);
158 }
159 if (meterRegistry != null) {
160 meterRegistry.gauge(LATEST_REVISION_METER_NAME, tags, this,
161 watcher -> {
162 final Latest<T> latest = watcher.latest;
163 if (latest == null) {
164 return -1;
165 } else {
166 return latest.revision().major();
167 }
168 });
169 TimeGauge.builder(LATEST_RECEIVED_TIME_METER_NAME, this, TimeUnit.SECONDS,
170 watcher -> watcher.latestReceivedTimeSeconds)
171 .tags(tags)
172 .register(meterRegistry);
173 }
174 }
175
176 @Override
177 public void close() {
178 state.set(State.STOPPED);
179 if (!initialValueFuture.isDone()) {
180 initialValueFuture.cancel(false);
181 }
182
183
184 final ScheduledFuture<?> currentScheduleFuture = this.currentScheduleFuture;
185 if (currentScheduleFuture != null && !currentScheduleFuture.isDone()) {
186 currentScheduleFuture.cancel(false);
187 }
188 final CompletableFuture<?> currentWatchFuture = this.currentWatchFuture;
189 if (currentWatchFuture != null && !currentWatchFuture.isDone()) {
190 currentWatchFuture.cancel(false);
191 }
192
193 if (meterRegistry != null) {
194 meterRegistry.remove(new Id(LATEST_REVISION_METER_NAME, tags, null, null, Type.GAUGE));
195 meterRegistry.remove(
196 new Id(LATEST_RECEIVED_TIME_METER_NAME, tags, null, null, Type.GAUGE));
197 }
198 }
199
200 private boolean isStopped() {
201 return state.get() == State.STOPPED;
202 }
203
204 @Override
205 public void watch(BiConsumer<? super Revision, ? super T> listener) {
206 watch(listener, watchScheduler);
207 }
208
209 @Override
210 public void watch(BiConsumer<? super Revision, ? super T> listener, Executor executor) {
211 requireNonNull(listener, "listener");
212 checkState(!isStopped(), "watcher closed");
213 updateListeners.add(new SimpleImmutableEntry<>(listener, executor));
214
215 final Latest<T> latest = this.latest;
216 if (latest != null) {
217
218
219
220
221
222
223
224 executor.execute(() -> listener.accept(latest.revision(), latest.value()));
225 }
226 }
227
228 private void scheduleWatch(int numAttemptsSoFar) {
229 if (isStopped()) {
230 return;
231 }
232
233 final long delay;
234 if (numAttemptsSoFar == 0) {
235 delay = latest != null ? delayOnSuccessMillis : 0;
236 } else {
237 delay = nextDelayMillis(numAttemptsSoFar);
238 }
239
240 currentScheduleFuture = watchScheduler.schedule(() -> {
241 currentScheduleFuture = null;
242 doWatch(numAttemptsSoFar);
243 }, delay, TimeUnit.MILLISECONDS);
244 }
245
246 private long nextDelayMillis(int numAttemptsSoFar) {
247 final long nextDelayMillis;
248 if (numAttemptsSoFar == 1) {
249 nextDelayMillis = initialDelayMillis;
250 } else {
251 nextDelayMillis =
252 Math.min(saturatedMultiply(initialDelayMillis, Math.pow(multiplier, numAttemptsSoFar - 1)),
253 maxDelayMillis);
254 }
255
256 final long minJitter = (long) (nextDelayMillis * (1 - jitterRate));
257 final long maxJitter = (long) (nextDelayMillis * (1 + jitterRate));
258 final long bound = maxJitter - minJitter + 1;
259 final long millis = random(bound);
260 return Math.max(0, saturatedAdd(minJitter, millis));
261 }
262
263 private static long saturatedMultiply(long left, double right) {
264 final double result = left * right;
265 return result >= Long.MAX_VALUE ? Long.MAX_VALUE : (long) result;
266 }
267
268 private static long random(long bound) {
269 assert bound > 0;
270 final long mask = bound - 1;
271 final Random random = ThreadLocalRandom.current();
272 long result = random.nextLong();
273
274 if ((bound & mask) == 0L) {
275
276 result &= mask;
277 } else {
278 for (long u = result >>> 1; u + mask - (result = u % bound) < 0L; u = random.nextLong() >>> 1) {
279 continue;
280 }
281 }
282
283 return result;
284 }
285
286 private void doWatch(int numAttemptsSoFar) {
287 if (isStopped()) {
288 return;
289 }
290
291 final Latest<T> latest = this.latest;
292 final Revision lastKnownRevision = latest != null ? latest.revision() : Revision.INIT;
293 final CompletableFuture<Latest<T>> f = doWatch(lastKnownRevision);
294
295 currentWatchFuture = f;
296 f.thenAccept(newLatest -> {
297 currentWatchFuture = null;
298 if (newLatest != null) {
299 this.latest = newLatest;
300 logger.debug("watcher noticed updated file {}/{}{}: rev={}",
301 projectName, repositoryName, pathPattern, newLatest.revision());
302 notifyListeners(newLatest);
303 latestReceivedTimeSeconds = Instant.now().getEpochSecond();
304 if (!initialValueFuture.isDone()) {
305 initialValueFuture.complete(newLatest);
306 }
307 }
308
309
310 scheduleWatch(0);
311 })
312 .exceptionally(thrown -> {
313 currentWatchFuture = null;
314 try {
315 final Throwable cause = thrown instanceof CompletionException ? thrown.getCause() : thrown;
316 boolean logged = false;
317 if (cause instanceof CentralDogmaException) {
318 if (cause instanceof EntryNotFoundException) {
319 if (!initialValueFuture.isDone() && errorOnEntryNotFound) {
320 initialValueFuture.completeExceptionally(thrown);
321 close();
322 return null;
323 }
324 logger.info("{}/{}{} does not exist yet; trying again",
325 projectName, repositoryName, pathPattern);
326 logged = true;
327 } else if (cause instanceof RepositoryNotFoundException) {
328 logger.info("{}/{} does not exist yet; trying again",
329 projectName, repositoryName);
330 logged = true;
331 }
332 }
333
334 if (cause instanceof CancellationException) {
335
336 return null;
337 }
338
339 if (!logged) {
340 logger.warn("Failed to watch a file ({}/{}{}) at Central Dogma; trying again",
341 projectName, repositoryName, pathPattern, cause);
342 }
343
344 scheduleWatch(numAttemptsSoFar + 1);
345 } catch (Throwable t) {
346 logger.error("Unexpected exception while watching a file at Central Dogma:", t);
347 }
348 return null;
349 });
350 }
351
/**
 * Issues a single watch request for changes after {@code lastKnownRevision}.
 *
 * <p>The caller in this class treats a {@code null} result as "nothing new" and simply
 * schedules the next watch; an exceptional completion is logged and retried with back-off.
 *
 * @param lastKnownRevision the revision of the latest known value, or {@code Revision.INIT}
 *                          when no value is known yet
 * @return a future completed with the newly observed value
 */
abstract CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision);
353
354 private void notifyListeners(Latest<T> latest) {
355 if (isStopped()) {
356
357 return;
358 }
359
360 for (Map.Entry<BiConsumer<? super Revision, ? super T>, Executor> entry : updateListeners) {
361 final BiConsumer<? super Revision, ? super T> listener = entry.getKey();
362 final Executor executor = entry.getValue();
363 executor.execute(() -> {
364 try {
365 listener.accept(latest.revision(), latest.value());
366 } catch (Exception e) {
367 logger.warn("Exception thrown for watcher ({}/{}{}): rev={}",
368 projectName, repositoryName, pathPattern, latest.revision(), e);
369 }
370 });
371 }
372 }
373
374 @Override
375 public String toString() {
376 return MoreObjects.toStringHelper(this).omitNullValues()
377 .add("watchScheduler", watchScheduler)
378 .add("projectName", projectName)
379 .add("repositoryName", repositoryName)
380 .add("pathPattern", pathPattern)
381 .add("errorOnEntryNotFound", errorOnEntryNotFound)
382 .add("delayOnSuccessMillis", delayOnSuccessMillis)
383 .add("initialDelayMillis", initialDelayMillis)
384 .add("maxDelayMillis", maxDelayMillis)
385 .add("multiplier", multiplier)
386 .add("jitterRate", jitterRate)
387 .add("latest", latest)
388 .toString();
389 }
390 }