1   /*
2    * Copyright 2018 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import static com.google.common.base.Preconditions.checkState;
19  import static com.google.common.math.LongMath.saturatedAdd;
20  import static java.util.Objects.requireNonNull;
21  
22  import java.time.Instant;
23  import java.util.AbstractMap.SimpleImmutableEntry;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Random;
27  import java.util.concurrent.CancellationException;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.concurrent.CompletionException;
30  import java.util.concurrent.CopyOnWriteArrayList;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.ScheduledFuture;
34  import java.util.concurrent.ThreadLocalRandom;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicLong;
37  import java.util.concurrent.atomic.AtomicReference;
38  import java.util.function.BiConsumer;
39  
40  import javax.annotation.Nullable;
41  
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
45  import com.google.common.base.MoreObjects;
46  
47  import com.linecorp.centraldogma.common.CentralDogmaException;
48  import com.linecorp.centraldogma.common.EntryNotFoundException;
49  import com.linecorp.centraldogma.common.PathPattern;
50  import com.linecorp.centraldogma.common.Query;
51  import com.linecorp.centraldogma.common.RepositoryNotFoundException;
52  import com.linecorp.centraldogma.common.Revision;
53  
54  import io.micrometer.core.instrument.Meter.Id;
55  import io.micrometer.core.instrument.Meter.Type;
56  import io.micrometer.core.instrument.MeterRegistry;
57  import io.micrometer.core.instrument.Tags;
58  import io.micrometer.core.instrument.TimeGauge;
59  
60  abstract class AbstractWatcher<T> implements Watcher<T> {
61  
62      private static final Logger logger = LoggerFactory.getLogger(AbstractWatcher.class);
63      private static final String LATEST_REVISION_METER_NAME = "centraldogma.client.watcher.latest.revision";
64      private static final String LATEST_RECEIVED_TIME_METER_NAME =
65              "centraldogma.client.watcher.latest.received.time";
66      private static final AtomicLong WATCHER_ID = new AtomicLong();
67  
68      private enum State {
69          INIT,
70          STARTED,
71          STOPPED
72      }
73  
74      private final ScheduledExecutorService watchScheduler;
75      private final String projectName;
76      private final String repositoryName;
77      private final String pathPattern;
78      private final boolean errorOnEntryNotFound;
79      private final long delayOnSuccessMillis;
80      private final long initialDelayMillis;
81      private final long maxDelayMillis;
82      private final double multiplier;
83      private final double jitterRate;
84      @Nullable
85      private final MeterRegistry meterRegistry;
86      private final Tags tags;
87  
88      private final List<Map.Entry<BiConsumer<? super Revision, ? super T>, Executor>> updateListeners =
89              new CopyOnWriteArrayList<>();
90      private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
91      private final CompletableFuture<Latest<T>> initialValueFuture = new CompletableFuture<>();
92  
93      @Nullable
94      private volatile Latest<T> latest;
95      @Nullable
96      private volatile ScheduledFuture<?> currentScheduleFuture;
97      @Nullable
98      private volatile CompletableFuture<?> currentWatchFuture;
99  
100     // unix epoch seconds
101     // In Prometheus, it is common to handle data with second precision,
102     // so we intentionally use second precision.
103     // ref: https://prometheus.io/docs/prometheus/latest/querying/functions/#time
104     private volatile long latestReceivedTimeSeconds = -1;
105 
106     AbstractWatcher(ScheduledExecutorService watchScheduler, String projectName, String repositoryName,
107                     String pathPattern, boolean errorOnEntryNotFound, long delayOnSuccessMillis,
108                     long initialDelayMillis, long maxDelayMillis, double multiplier, double jitterRate,
109                     @Nullable MeterRegistry meterRegistry) {
110         this.watchScheduler = watchScheduler;
111         this.projectName = projectName;
112         this.repositoryName = repositoryName;
113         this.pathPattern = pathPattern;
114         this.errorOnEntryNotFound = errorOnEntryNotFound;
115         this.delayOnSuccessMillis = delayOnSuccessMillis;
116         this.initialDelayMillis = initialDelayMillis;
117         this.maxDelayMillis = maxDelayMillis;
118         this.multiplier = multiplier;
119         this.jitterRate = jitterRate;
120         this.meterRegistry = meterRegistry;
121         tags = Tags.of(
122                 "project", projectName,
123                 "repository", repositoryName,
124                 "path", pathPattern,
125                 // There is a possibility of using the same watcher for the same project, repo, and pathPattern.
126                 // Therefore, an auto-incremental name will be assigned automatically.
127                 // In the future, if there is a need, we could allow this name to be explicitly specified.
128                 "name", String.valueOf(WATCHER_ID.incrementAndGet())
129         );
130     }
131 
132     @Override
133     public ScheduledExecutorService watchScheduler() {
134         return watchScheduler;
135     }
136 
137     @Override
138     public CompletableFuture<Latest<T>> initialValueFuture() {
139         return initialValueFuture;
140     }
141 
142     @Override
143     public Latest<T> latest() {
144         final Latest<T> latest = this.latest;
145         if (latest == null) {
146             throw new IllegalStateException("value not available yet");
147         }
148         return latest;
149     }
150 
151     /**
152      * Starts to watch the file specified in the {@link Query} or the {@link PathPattern}
153      * given with the constructor.
154      */
155     void start() {
156         if (state.compareAndSet(State.INIT, State.STARTED)) {
157             scheduleWatch(0);
158         }
159         if (meterRegistry != null) {
160             meterRegistry.gauge(LATEST_REVISION_METER_NAME, tags, this,
161                                 watcher -> {
162                                     final Latest<T> latest = watcher.latest;
163                                     if (latest == null) {
164                                         return -1;
165                                     } else {
166                                         return latest.revision().major();
167                                     }
168                                 });
169             TimeGauge.builder(LATEST_RECEIVED_TIME_METER_NAME, this, TimeUnit.SECONDS,
170                               watcher -> watcher.latestReceivedTimeSeconds)
171                      .tags(tags)
172                      .register(meterRegistry);
173         }
174     }
175 
176     @Override
177     public void close() {
178         state.set(State.STOPPED);
179         if (!initialValueFuture.isDone()) {
180             initialValueFuture.cancel(false);
181         }
182 
183         // Cancel any scheduled operations.
184         final ScheduledFuture<?> currentScheduleFuture = this.currentScheduleFuture;
185         if (currentScheduleFuture != null && !currentScheduleFuture.isDone()) {
186             currentScheduleFuture.cancel(false);
187         }
188         final CompletableFuture<?> currentWatchFuture = this.currentWatchFuture;
189         if (currentWatchFuture != null && !currentWatchFuture.isDone()) {
190             currentWatchFuture.cancel(false);
191         }
192 
193         if (meterRegistry != null) {
194             meterRegistry.remove(new Id(LATEST_REVISION_METER_NAME, tags, null, null, Type.GAUGE));
195             meterRegistry.remove(
196                     new Id(LATEST_RECEIVED_TIME_METER_NAME, tags, null, null, Type.GAUGE));
197         }
198     }
199 
200     private boolean isStopped() {
201         return state.get() == State.STOPPED;
202     }
203 
204     @Override
205     public void watch(BiConsumer<? super Revision, ? super T> listener) {
206         watch(listener, watchScheduler);
207     }
208 
209     @Override
210     public void watch(BiConsumer<? super Revision, ? super T> listener, Executor executor) {
211         requireNonNull(listener, "listener");
212         checkState(!isStopped(), "watcher closed");
213         updateListeners.add(new SimpleImmutableEntry<>(listener, executor));
214 
215         final Latest<T> latest = this.latest;
216         if (latest != null) {
217             // There's a chance that listener.accept(...) is called twice for the same value
218             // if this watch method is called:
219             // - after " this.latest = newLatest;" is invoked.
220             // - and before notifyListener() is called.
221             // However, it's such a rare case and we usually call `watch` method right after creating a Watcher,
222             // which means latest is probably not set yet, so we don't use a lock to guarantee
223             // the atomicity.
224             executor.execute(() -> listener.accept(latest.revision(), latest.value()));
225         }
226     }
227 
228     private void scheduleWatch(int numAttemptsSoFar) {
229         if (isStopped()) {
230             return;
231         }
232 
233         final long delay;
234         if (numAttemptsSoFar == 0) {
235             delay = latest != null ? delayOnSuccessMillis : 0;
236         } else {
237             delay = nextDelayMillis(numAttemptsSoFar);
238         }
239 
240         currentScheduleFuture = watchScheduler.schedule(() -> {
241             currentScheduleFuture = null;
242             doWatch(numAttemptsSoFar);
243         }, delay, TimeUnit.MILLISECONDS);
244     }
245 
246     private long nextDelayMillis(int numAttemptsSoFar) {
247         final long nextDelayMillis;
248         if (numAttemptsSoFar == 1) {
249             nextDelayMillis = initialDelayMillis;
250         } else {
251             nextDelayMillis =
252                     Math.min(saturatedMultiply(initialDelayMillis, Math.pow(multiplier, numAttemptsSoFar - 1)),
253                              maxDelayMillis);
254         }
255 
256         final long minJitter = (long) (nextDelayMillis * (1 - jitterRate));
257         final long maxJitter = (long) (nextDelayMillis * (1 + jitterRate));
258         final long bound = maxJitter - minJitter + 1;
259         final long millis = random(bound);
260         return Math.max(0, saturatedAdd(minJitter, millis));
261     }
262 
263     private static long saturatedMultiply(long left, double right) {
264         final double result = left * right;
265         return result >= Long.MAX_VALUE ? Long.MAX_VALUE : (long) result;
266     }
267 
268     private static long random(long bound) {
269         assert bound > 0;
270         final long mask = bound - 1;
271         final Random random = ThreadLocalRandom.current();
272         long result = random.nextLong();
273 
274         if ((bound & mask) == 0L) {
275             // power of two
276             result &= mask;
277         } else { // reject over-represented candidates
278             for (long u = result >>> 1; u + mask - (result = u % bound) < 0L; u = random.nextLong() >>> 1) {
279                 continue;
280             }
281         }
282 
283         return result;
284     }
285 
286     private void doWatch(int numAttemptsSoFar) {
287         if (isStopped()) {
288             return;
289         }
290 
291         final Latest<T> latest = this.latest;
292         final Revision lastKnownRevision = latest != null ? latest.revision() : Revision.INIT;
293         final CompletableFuture<Latest<T>> f = doWatch(lastKnownRevision);
294 
295         currentWatchFuture = f;
296         f.thenAccept(newLatest -> {
297              currentWatchFuture = null;
298              if (newLatest != null) {
299                  this.latest = newLatest;
300                  logger.debug("watcher noticed updated file {}/{}{}: rev={}",
301                               projectName, repositoryName, pathPattern, newLatest.revision());
302                  notifyListeners(newLatest);
303                  latestReceivedTimeSeconds = Instant.now().getEpochSecond();
304                  if (!initialValueFuture.isDone()) {
305                      initialValueFuture.complete(newLatest);
306                  }
307              }
308 
309              // Watch again for the next change.
310              scheduleWatch(0);
311          })
312          .exceptionally(thrown -> {
313              currentWatchFuture = null;
314              try {
315                  final Throwable cause = thrown instanceof CompletionException ? thrown.getCause() : thrown;
316                  boolean logged = false;
317                  if (cause instanceof CentralDogmaException) {
318                      if (cause instanceof EntryNotFoundException) {
319                          if (!initialValueFuture.isDone() && errorOnEntryNotFound) {
320                              initialValueFuture.completeExceptionally(thrown);
321                              close();
322                              return null;
323                          }
324                          logger.info("{}/{}{} does not exist yet; trying again",
325                                      projectName, repositoryName, pathPattern);
326                          logged = true;
327                      } else if (cause instanceof RepositoryNotFoundException) {
328                          logger.info("{}/{} does not exist yet; trying again",
329                                      projectName, repositoryName);
330                          logged = true;
331                      }
332                  }
333 
334                  if (cause instanceof CancellationException) {
335                      // Cancelled by close()
336                      return null;
337                  }
338 
339                  if (!logged) {
340                      logger.warn("Failed to watch a file ({}/{}{}) at Central Dogma; trying again",
341                                  projectName, repositoryName, pathPattern, cause);
342                  }
343 
344                  scheduleWatch(numAttemptsSoFar + 1);
345              } catch (Throwable t) {
346                  logger.error("Unexpected exception while watching a file at Central Dogma:", t);
347              }
348              return null;
349          });
350     }
351 
352     abstract CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision);
353 
354     private void notifyListeners(Latest<T> latest) {
355         if (isStopped()) {
356             // Do not notify after stopped.
357             return;
358         }
359 
360         for (Map.Entry<BiConsumer<? super Revision, ? super T>, Executor> entry : updateListeners) {
361             final BiConsumer<? super Revision, ? super T> listener = entry.getKey();
362             final Executor executor = entry.getValue();
363             executor.execute(() -> {
364                 try {
365                     listener.accept(latest.revision(), latest.value());
366                 } catch (Exception e) {
367                     logger.warn("Exception thrown for watcher ({}/{}{}): rev={}",
368                                 projectName, repositoryName, pathPattern, latest.revision(), e);
369                 }
370             });
371         }
372     }
373 
374     @Override
375     public String toString() {
376         return MoreObjects.toStringHelper(this).omitNullValues()
377                           .add("watchScheduler", watchScheduler)
378                           .add("projectName", projectName)
379                           .add("repositoryName", repositoryName)
380                           .add("pathPattern", pathPattern)
381                           .add("errorOnEntryNotFound", errorOnEntryNotFound)
382                           .add("delayOnSuccessMillis", delayOnSuccessMillis)
383                           .add("initialDelayMillis", initialDelayMillis)
384                           .add("maxDelayMillis", maxDelayMillis)
385                           .add("multiplier", multiplier)
386                           .add("jitterRate", jitterRate)
387                           .add("latest", latest)
388                           .toString();
389     }
390 }