1   /*
2    * Copyright 2018 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import static com.google.common.base.Preconditions.checkState;
19  import static com.google.common.math.LongMath.saturatedAdd;
20  import static java.util.Objects.requireNonNull;
21  
22  import java.util.AbstractMap.SimpleImmutableEntry;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Random;
26  import java.util.concurrent.CancellationException;
27  import java.util.concurrent.CompletableFuture;
28  import java.util.concurrent.CompletionException;
29  import java.util.concurrent.CopyOnWriteArrayList;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.ScheduledExecutorService;
32  import java.util.concurrent.ScheduledFuture;
33  import java.util.concurrent.ThreadLocalRandom;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicReference;
36  import java.util.function.BiConsumer;
37  
38  import javax.annotation.Nullable;
39  
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import com.google.common.base.MoreObjects;
44  
45  import com.linecorp.centraldogma.common.CentralDogmaException;
46  import com.linecorp.centraldogma.common.EntryNotFoundException;
47  import com.linecorp.centraldogma.common.PathPattern;
48  import com.linecorp.centraldogma.common.Query;
49  import com.linecorp.centraldogma.common.RepositoryNotFoundException;
50  import com.linecorp.centraldogma.common.Revision;
51  
52  abstract class AbstractWatcher<T> implements Watcher<T> {
53  
54      private static final Logger logger = LoggerFactory.getLogger(AbstractWatcher.class);
55  
56      private enum State {
57          INIT,
58          STARTED,
59          STOPPED
60      }
61  
62      private final ScheduledExecutorService watchScheduler;
63      private final String projectName;
64      private final String repositoryName;
65      private final String pathPattern;
66      private final boolean errorOnEntryNotFound;
67      private final long delayOnSuccessMillis;
68      private final long initialDelayMillis;
69      private final long maxDelayMillis;
70      private final double multiplier;
71      private final double jitterRate;
72  
73      private final List<Map.Entry<BiConsumer<? super Revision, ? super T>, Executor>> updateListeners =
74              new CopyOnWriteArrayList<>();
75      private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
76      private final CompletableFuture<Latest<T>> initialValueFuture = new CompletableFuture<>();
77  
78      @Nullable
79      private volatile Latest<T> latest;
80      @Nullable
81      private volatile ScheduledFuture<?> currentScheduleFuture;
82      @Nullable
83      private volatile CompletableFuture<?> currentWatchFuture;
84  
85      AbstractWatcher(ScheduledExecutorService watchScheduler, String projectName, String repositoryName,
86                      String pathPattern, boolean errorOnEntryNotFound, long delayOnSuccessMillis,
87                      long initialDelayMillis, long maxDelayMillis, double multiplier, double jitterRate) {
88          this.watchScheduler = watchScheduler;
89          this.projectName = projectName;
90          this.repositoryName = repositoryName;
91          this.pathPattern = pathPattern;
92          this.errorOnEntryNotFound = errorOnEntryNotFound;
93          this.delayOnSuccessMillis = delayOnSuccessMillis;
94          this.initialDelayMillis = initialDelayMillis;
95          this.maxDelayMillis = maxDelayMillis;
96          this.multiplier = multiplier;
97          this.jitterRate = jitterRate;
98      }
99  
100     @Override
101     public ScheduledExecutorService watchScheduler() {
102         return watchScheduler;
103     }
104 
105     @Override
106     public CompletableFuture<Latest<T>> initialValueFuture() {
107         return initialValueFuture;
108     }
109 
110     @Override
111     public Latest<T> latest() {
112         final Latest<T> latest = this.latest;
113         if (latest == null) {
114             throw new IllegalStateException("value not available yet");
115         }
116         return latest;
117     }
118 
119     /**
120      * Starts to watch the file specified in the {@link Query} or the {@link PathPattern}
121      * given with the constructor.
122      */
123     void start() {
124         if (state.compareAndSet(State.INIT, State.STARTED)) {
125             scheduleWatch(0);
126         }
127     }
128 
129     @Override
130     public void close() {
131         state.set(State.STOPPED);
132         if (!initialValueFuture.isDone()) {
133             initialValueFuture.cancel(false);
134         }
135 
136         // Cancel any scheduled operations.
137         final ScheduledFuture<?> currentScheduleFuture = this.currentScheduleFuture;
138         if (currentScheduleFuture != null && !currentScheduleFuture.isDone()) {
139             currentScheduleFuture.cancel(false);
140         }
141         final CompletableFuture<?> currentWatchFuture = this.currentWatchFuture;
142         if (currentWatchFuture != null && !currentWatchFuture.isDone()) {
143             currentWatchFuture.cancel(false);
144         }
145     }
146 
147     private boolean isStopped() {
148         return state.get() == State.STOPPED;
149     }
150 
151     @Override
152     public void watch(BiConsumer<? super Revision, ? super T> listener) {
153         watch(listener, watchScheduler);
154     }
155 
156     @Override
157     public void watch(BiConsumer<? super Revision, ? super T> listener, Executor executor) {
158         requireNonNull(listener, "listener");
159         checkState(!isStopped(), "watcher closed");
160         updateListeners.add(new SimpleImmutableEntry<>(listener, executor));
161 
162         final Latest<T> latest = this.latest;
163         if (latest != null) {
164             // There's a chance that listener.accept(...) is called twice for the same value
165             // if this watch method is called:
166             // - after " this.latest = newLatest;" is invoked.
167             // - and before notifyListener() is called.
168             // However, it's such a rare case and we usually call `watch` method right after creating a Watcher,
169             // which means latest is probably not set yet, so we don't use a lock to guarantee
170             // the atomicity.
171             executor.execute(() -> listener.accept(latest.revision(), latest.value()));
172         }
173     }
174 
175     private void scheduleWatch(int numAttemptsSoFar) {
176         if (isStopped()) {
177             return;
178         }
179 
180         final long delay;
181         if (numAttemptsSoFar == 0) {
182             delay = latest != null ? delayOnSuccessMillis : 0;
183         } else {
184             delay = nextDelayMillis(numAttemptsSoFar);
185         }
186 
187         currentScheduleFuture = watchScheduler.schedule(() -> {
188             currentScheduleFuture = null;
189             doWatch(numAttemptsSoFar);
190         }, delay, TimeUnit.MILLISECONDS);
191     }
192 
193     private long nextDelayMillis(int numAttemptsSoFar) {
194         final long nextDelayMillis;
195         if (numAttemptsSoFar == 1) {
196             nextDelayMillis = initialDelayMillis;
197         } else {
198             nextDelayMillis =
199                     Math.min(saturatedMultiply(initialDelayMillis, Math.pow(multiplier, numAttemptsSoFar - 1)),
200                              maxDelayMillis);
201         }
202 
203         final long minJitter = (long) (nextDelayMillis * (1 - jitterRate));
204         final long maxJitter = (long) (nextDelayMillis * (1 + jitterRate));
205         final long bound = maxJitter - minJitter + 1;
206         final long millis = random(bound);
207         return Math.max(0, saturatedAdd(minJitter, millis));
208     }
209 
210     private static long saturatedMultiply(long left, double right) {
211         final double result = left * right;
212         return result >= Long.MAX_VALUE ? Long.MAX_VALUE : (long) result;
213     }
214 
215     private static long random(long bound) {
216         assert bound > 0;
217         final long mask = bound - 1;
218         final Random random = ThreadLocalRandom.current();
219         long result = random.nextLong();
220 
221         if ((bound & mask) == 0L) {
222             // power of two
223             result &= mask;
224         } else { // reject over-represented candidates
225             for (long u = result >>> 1; u + mask - (result = u % bound) < 0L; u = random.nextLong() >>> 1) {
226                 continue;
227             }
228         }
229 
230         return result;
231     }
232 
233     private void doWatch(int numAttemptsSoFar) {
234         if (isStopped()) {
235             return;
236         }
237 
238         final Latest<T> latest = this.latest;
239         final Revision lastKnownRevision = latest != null ? latest.revision() : Revision.INIT;
240         final CompletableFuture<Latest<T>> f = doWatch(lastKnownRevision);
241 
242         currentWatchFuture = f;
243         f.thenAccept(newLatest -> {
244              currentWatchFuture = null;
245              if (newLatest != null) {
246                  this.latest = newLatest;
247                  logger.debug("watcher noticed updated file {}/{}{}: rev={}",
248                               projectName, repositoryName, pathPattern, newLatest.revision());
249                  notifyListeners(newLatest);
250                  if (!initialValueFuture.isDone()) {
251                      initialValueFuture.complete(newLatest);
252                  }
253              }
254 
255              // Watch again for the next change.
256              scheduleWatch(0);
257          })
258          .exceptionally(thrown -> {
259              currentWatchFuture = null;
260              try {
261                  final Throwable cause = thrown instanceof CompletionException ? thrown.getCause() : thrown;
262                  boolean logged = false;
263                  if (cause instanceof CentralDogmaException) {
264                      if (cause instanceof EntryNotFoundException) {
265                          if (!initialValueFuture.isDone() && errorOnEntryNotFound) {
266                              initialValueFuture.completeExceptionally(thrown);
267                              close();
268                              return null;
269                          }
270                          logger.info("{}/{}{} does not exist yet; trying again",
271                                      projectName, repositoryName, pathPattern);
272                          logged = true;
273                      } else if (cause instanceof RepositoryNotFoundException) {
274                          logger.info("{}/{} does not exist yet; trying again",
275                                      projectName, repositoryName);
276                          logged = true;
277                      }
278                  }
279 
280                  if (cause instanceof CancellationException) {
281                      // Cancelled by close()
282                      return null;
283                  }
284 
285                  if (!logged) {
286                      logger.warn("Failed to watch a file ({}/{}{}) at Central Dogma; trying again",
287                                  projectName, repositoryName, pathPattern, cause);
288                  }
289 
290                  scheduleWatch(numAttemptsSoFar + 1);
291              } catch (Throwable t) {
292                  logger.error("Unexpected exception while watching a file at Central Dogma:", t);
293              }
294              return null;
295          });
296     }
297 
298     abstract CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision);
299 
300     private void notifyListeners(Latest<T> latest) {
301         if (isStopped()) {
302             // Do not notify after stopped.
303             return;
304         }
305 
306         for (Map.Entry<BiConsumer<? super Revision, ? super T>, Executor> entry : updateListeners) {
307             final BiConsumer<? super Revision, ? super T> listener = entry.getKey();
308             final Executor executor = entry.getValue();
309             executor.execute(() -> {
310                 try {
311                     listener.accept(latest.revision(), latest.value());
312                 } catch (Exception e) {
313                     logger.warn("Exception thrown for watcher ({}/{}{}): rev={}",
314                                 projectName, repositoryName, pathPattern, latest.revision(), e);
315                 }
316             });
317         }
318     }
319 
320     @Override
321     public String toString() {
322         return MoreObjects.toStringHelper(this).omitNullValues()
323                           .add("watchScheduler", watchScheduler)
324                           .add("projectName", projectName)
325                           .add("repositoryName", repositoryName)
326                           .add("pathPattern", pathPattern)
327                           .add("errorOnEntryNotFound", errorOnEntryNotFound)
328                           .add("delayOnSuccessMillis", delayOnSuccessMillis)
329                           .add("initialDelayMillis", initialDelayMillis)
330                           .add("maxDelayMillis", maxDelayMillis)
331                           .add("multiplier", multiplier)
332                           .add("jitterRate", jitterRate)
333                           .add("latest", latest)
334                           .toString();
335     }
336 }