1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.internal.client;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
20  import static java.util.Objects.requireNonNull;
21  import static java.util.concurrent.CompletableFuture.completedFuture;
22  
23  import java.util.LinkedHashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Objects;
27  import java.util.Set;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.concurrent.CompletionException;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ScheduledExecutorService;
32  import java.util.concurrent.TimeUnit;
33  import java.util.function.BiFunction;
34  import java.util.function.BiPredicate;
35  import java.util.function.Function;
36  import java.util.function.Supplier;
37  
38  import org.jspecify.annotations.Nullable;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import com.google.common.annotations.VisibleForTesting;
43  import com.google.common.collect.ImmutableList;
44  import com.spotify.futures.CompletableFutures;
45  
46  import com.linecorp.centraldogma.client.AbstractCentralDogma;
47  import com.linecorp.centraldogma.client.CentralDogma;
48  import com.linecorp.centraldogma.client.CentralDogmaRepository;
49  import com.linecorp.centraldogma.client.RepositoryInfo;
50  import com.linecorp.centraldogma.common.Author;
51  import com.linecorp.centraldogma.common.Change;
52  import com.linecorp.centraldogma.common.Commit;
53  import com.linecorp.centraldogma.common.Entry;
54  import com.linecorp.centraldogma.common.EntryType;
55  import com.linecorp.centraldogma.common.Markup;
56  import com.linecorp.centraldogma.common.MergeQuery;
57  import com.linecorp.centraldogma.common.MergedEntry;
58  import com.linecorp.centraldogma.common.PathPattern;
59  import com.linecorp.centraldogma.common.PushResult;
60  import com.linecorp.centraldogma.common.Query;
61  import com.linecorp.centraldogma.common.Revision;
62  import com.linecorp.centraldogma.common.RevisionNotFoundException;
63  
64  import io.micrometer.core.instrument.MeterRegistry;
65  
66  /**
67   * A {@link CentralDogma} client that retries the request automatically when a {@link RevisionNotFoundException}
68   * was raised but it is certain that a given {@link Revision} exists.
69   */
70  public final class ReplicationLagTolerantCentralDogma extends AbstractCentralDogma {
71  
72      private static final Logger logger =
73              LoggerFactory.getLogger(ReplicationLagTolerantCentralDogma.class);
74  
75      private final CentralDogma delegate;
76      private final int maxRetries;
77      private final long retryIntervalMillis;
78      private final Supplier<?> currentReplicaHintSupplier;
79  
80      @VisibleForTesting
81      final Map<RepoId, Revision> latestKnownRevisions = new LinkedHashMap<RepoId, Revision>() {
82          private static final long serialVersionUID = 3587793379404809027L;
83  
84          @Override
85          protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
86              // Keep only up to 8192 repositories, which should be enough for almost all cases.
87              return size() > 8192;
88          }
89      };
90  
91      public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
92                                                CentralDogma delegate, int maxRetries, long retryIntervalMillis,
93                                                Supplier<?> currentReplicaHintSupplier,
94                                                @Nullable MeterRegistry meterRegistry) {
95          super(blockingTaskExecutor, meterRegistry);
96  
97          requireNonNull(delegate, "delegate");
98          checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
99          checkArgument(retryIntervalMillis >= 0,
100                       "retryIntervalMillis: %s (expected: >= 0)", retryIntervalMillis);
101         requireNonNull(currentReplicaHintSupplier, "currentReplicaHintSupplier");
102 
103         this.delegate = delegate;
104         this.maxRetries = maxRetries;
105         this.retryIntervalMillis = retryIntervalMillis;
106         this.currentReplicaHintSupplier = currentReplicaHintSupplier;
107     }
108 
109     @Override
110     public CompletableFuture<Void> createProject(String projectName) {
111         return delegate.createProject(projectName);
112     }
113 
114     @Override
115     public CompletableFuture<Void> removeProject(String projectName) {
116         return delegate.removeProject(projectName).thenAccept(unused -> {
117             synchronized (latestKnownRevisions) {
118                 latestKnownRevisions.entrySet().removeIf(e -> e.getKey().projectName.equals(projectName));
119             }
120         });
121     }
122 
123     @Override
124     public CompletableFuture<Void> purgeProject(String projectName) {
125         return delegate.purgeProject(projectName);
126     }
127 
128     @Override
129     public CompletableFuture<Void> unremoveProject(String projectName) {
130         return delegate.unremoveProject(projectName);
131     }
132 
133     @Override
134     public CompletableFuture<Set<String>> listProjects() {
135         return delegate.listProjects();
136     }
137 
138     @Override
139     public CompletableFuture<Set<String>> listRemovedProjects() {
140         return delegate.listRemovedProjects();
141     }
142 
143     @Override
144     public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
145                                                                       String repositoryName) {
146         return delegate.createRepository(projectName, repositoryName);
147     }
148 
149     @Override
150     public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
151         return delegate.removeRepository(projectName, repositoryName).thenAccept(unused -> {
152             synchronized (latestKnownRevisions) {
153                 latestKnownRevisions.remove(new RepoId(projectName, repositoryName));
154             }
155         });
156     }
157 
158     @Override
159     public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
160         return delegate.purgeRepository(projectName, repositoryName);
161     }
162 
163     @Override
164     public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
165                                                                         String repositoryName) {
166         return delegate.unremoveRepository(projectName, repositoryName);
167     }
168 
169     @Override
170     public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
171         return executeWithRetries(
172                 new Supplier<CompletableFuture<Map<String, RepositoryInfo>>>() {
173                     @Override
174                     public CompletableFuture<Map<String, RepositoryInfo>> get() {
175                         return delegate.listRepositories(projectName);
176                     }
177 
178                     @Override
179                     public String toString() {
180                         return "listRepositories(" + projectName + ')';
181                     }
182                 },
183                 (res, cause) -> {
184                     if (res != null) {
185                         for (RepositoryInfo info : res.values()) {
186                             if (!updateLatestKnownRevision(projectName, info.name(), info.headRevision())) {
187                                 return true;
188                             }
189                         }
190                     }
191                     return false;
192                 });
193     }
194 
195     @Override
196     public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
197         return delegate.listRemovedRepositories(projectName);
198     }
199 
200     @Override
201     public CompletableFuture<Revision> normalizeRevision(
202             String projectName, String repositoryName, Revision revision) {
203         return executeWithRetries(
204                 new Supplier<CompletableFuture<Revision>>() {
205                     @Override
206                     public CompletableFuture<Revision> get() {
207                         return delegate.normalizeRevision(projectName, repositoryName, revision);
208                     }
209 
210                     @Override
211                     public String toString() {
212                         return "normalizeRevision(" + projectName + ", " + repositoryName + ", " +
213                                revision + ')';
214                     }
215                 },
216                 (res, cause) -> {
217                     if (cause != null) {
218                         return handleRevisionNotFound(projectName, repositoryName, revision, cause);
219                     }
220 
221                     if (revision.isRelative()) {
222                         final Revision headRevision = res.forward(-(revision.major() + 1));
223                         return !updateLatestKnownRevision(projectName, repositoryName, headRevision);
224                     }
225 
226                     updateLatestKnownRevision(projectName, repositoryName, revision);
227                     return false;
228                 });
229     }
230 
231     @Override
232     public CompletableFuture<Map<String, EntryType>> listFiles(
233             String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
234         return normalizeRevisionAndExecuteWithRetries(
235                 projectName, repositoryName, revision,
236                 new Function<Revision, CompletableFuture<Map<String, EntryType>>>() {
237                     @Override
238                     public CompletableFuture<Map<String, EntryType>> apply(Revision normRev) {
239                         return delegate.listFiles(projectName, repositoryName, normRev, pathPattern);
240                     }
241 
242                     @Override
243                     public String toString() {
244                         return "listFiles(" + projectName + ", " + repositoryName + ", " +
245                                revision + ", " + pathPattern + ')';
246                     }
247                 });
248     }
249 
250     @Override
251     public <T> CompletableFuture<Entry<T>> getFile(
252             String projectName, String repositoryName, Revision revision, Query<T> query,
253             boolean viewRaw, boolean renderTemplate, @Nullable String variableFile) {
254         return normalizeRevisionAndExecuteWithRetries(
255                 projectName, repositoryName, revision,
256                 new Function<>() {
257                     @Override
258                     public CompletableFuture<Entry<T>> apply(Revision normRev) {
259                         return delegate.getFile(projectName, repositoryName, normRev, query,
260                                                 viewRaw, renderTemplate, variableFile);
261                     }
262 
263                     @Override
264                     public String toString() {
265                         return "getFile(" + projectName + ", " + repositoryName + ", " +
266                                revision + ", " + query + ", " + viewRaw + ", " + renderTemplate + ", " +
267                                variableFile + ')';
268                     }
269                 });
270     }
271 
272     @Override
273     public CompletableFuture<Map<String, Entry<?>>> getFiles(
274             String projectName, String repositoryName, Revision revision, PathPattern pathPattern,
275             boolean viewRaw, boolean renderTemplate, @Nullable String variableFile) {
276         return normalizeRevisionAndExecuteWithRetries(
277                 projectName, repositoryName, revision,
278                 new Function<>() {
279                     @Override
280                     public CompletableFuture<Map<String, Entry<?>>> apply(Revision normRev) {
281                         return delegate.getFiles(projectName, repositoryName, normRev, pathPattern,
282                                                  viewRaw, renderTemplate, variableFile);
283                     }
284 
285                     @Override
286                     public String toString() {
287                         return "getFiles(" + projectName + ", " + repositoryName + ", " +
288                                revision + ", " + pathPattern + ", " + viewRaw + ", " + renderTemplate + ", " +
289                                variableFile + ')';
290                     }
291                 });
292     }
293 
294     @Override
295     public <T> CompletableFuture<MergedEntry<T>> mergeFiles(
296             String projectName, String repositoryName, Revision revision,
297             MergeQuery<T> mergeQuery) {
298         return normalizeRevisionAndExecuteWithRetries(
299                 projectName, repositoryName, revision,
300                 new Function<>() {
301                     @Override
302                     public CompletableFuture<MergedEntry<T>> apply(Revision normRev) {
303                         return delegate.mergeFiles(projectName, repositoryName, normRev, mergeQuery);
304                     }
305 
306                     @Override
307                     public String toString() {
308                         return "mergeFiles(" + projectName + ", " + repositoryName + ", " +
309                                revision + ", " + mergeQuery + ')';
310                     }
311                 });
312     }
313 
314     @Override
315     public CompletableFuture<List<Commit>> getHistory(
316             String projectName, String repositoryName, Revision from,
317             Revision to, PathPattern pathPattern, int maxCommits) {
318         return normalizeRevisionsAndExecuteWithRetries(
319                 projectName, repositoryName, from, to,
320                 new BiFunction<>() {
321                     @Override
322                     public CompletableFuture<List<Commit>> apply(Revision normFromRev, Revision normToRev) {
323                         return delegate.getHistory(projectName, repositoryName,
324                                                    normFromRev, normToRev, pathPattern, maxCommits);
325                     }
326 
327                     @Override
328                     public String toString() {
329                         return "getHistory(" + projectName + ", " + repositoryName + ", " +
330                                from + ", " + to + ", " + pathPattern + ", " + maxCommits + ')';
331                     }
332                 });
333     }
334 
335     @Override
336     public <T> CompletableFuture<Change<T>> getDiff(
337             String projectName, String repositoryName, Revision from, Revision to, Query<T> query) {
338         return normalizeRevisionsAndExecuteWithRetries(
339                 projectName, repositoryName, from, to,
340                 new BiFunction<Revision, Revision, CompletableFuture<Change<T>>>() {
341                     @Override
342                     public CompletableFuture<Change<T>> apply(Revision normFromRev, Revision normToRev) {
343                         return delegate.getDiff(projectName, repositoryName,
344                                                 normFromRev, normToRev, query);
345                     }
346 
347                     @Override
348                     public String toString() {
349                         return "getDiff(" + projectName + ", " + repositoryName + ", " +
350                                from + ", " + to + ", " + query + ')';
351                     }
352                 });
353     }
354 
355     @Override
356     public CompletableFuture<List<Change<?>>> getDiff(
357             String projectName, String repositoryName, Revision from, Revision to, PathPattern pathPattern) {
358         return normalizeRevisionsAndExecuteWithRetries(
359                 projectName, repositoryName, from, to,
360                 new BiFunction<Revision, Revision, CompletableFuture<List<Change<?>>>>() {
361                     @Override
362                     public CompletableFuture<List<Change<?>>> apply(Revision normFromRev, Revision normToRev) {
363                         return delegate.getDiff(projectName, repositoryName,
364                                                 normFromRev, normToRev, pathPattern);
365                     }
366 
367                     @Override
368                     public String toString() {
369                         return "getDiffs(" + projectName + ", " + repositoryName + ", " +
370                                from + ", " + to + ", " + pathPattern + ')';
371                     }
372                 });
373     }
374 
375     @Override
376     public CompletableFuture<List<Change<?>>> getPreviewDiffs(
377             String projectName, String repositoryName, Revision baseRevision,
378             Iterable<? extends Change<?>> changes) {
379         return normalizeRevisionAndExecuteWithRetries(
380                 projectName, repositoryName, baseRevision,
381                 new Function<Revision, CompletableFuture<List<Change<?>>>>() {
382                     @Override
383                     public CompletableFuture<List<Change<?>>> apply(Revision normBaseRev) {
384                         return delegate.getPreviewDiffs(projectName, repositoryName, normBaseRev, changes);
385                     }
386 
387                     @Override
388                     public String toString() {
389                         return "getPreviewDiffs(" + projectName + ", " + repositoryName + ", " +
390                                baseRevision + ", ...)";
391                     }
392                 });
393     }
394 
395     @Override
396     public CompletableFuture<PushResult> push(
397             String projectName, String repositoryName, Revision baseRevision,
398             String summary, String detail, Markup markup, Iterable<? extends Change<?>> changes) {
399         return executeWithRetries(
400                 new Supplier<CompletableFuture<PushResult>>() {
401                     @Override
402                     public CompletableFuture<PushResult> get() {
403                         return delegate.push(projectName, repositoryName, baseRevision,
404                                              summary, detail, markup, changes);
405                     }
406 
407                     @Override
408                     public String toString() {
409                         return "push(" + projectName + ", " + repositoryName + ", " +
410                                baseRevision + ", " + summary + ", ...)";
411                     }
412                 },
413                 pushRetryPredicate(projectName, repositoryName, baseRevision));
414     }
415 
416     @Override
417     public CompletableFuture<PushResult> push(
418             String projectName, String repositoryName, Revision baseRevision,
419             Author author, String summary, String detail, Markup markup,
420             Iterable<? extends Change<?>> changes) {
421         return executeWithRetries(
422                 new Supplier<CompletableFuture<PushResult>>() {
423                     @Override
424                     public CompletableFuture<PushResult> get() {
425                         return delegate.push(projectName, repositoryName, baseRevision,
426                                              author, summary, detail, markup, changes);
427                     }
428 
429                     @Override
430                     public String toString() {
431                         return "push(" + projectName + ", " + repositoryName + ", " +
432                                baseRevision + ", " + summary + ", ...)";
433                     }
434                 },
435                 pushRetryPredicate(projectName, repositoryName, baseRevision));
436     }
437 
438     private BiPredicate<PushResult, Throwable> pushRetryPredicate(
439             String projectName, String repositoryName, Revision baseRevision) {
440 
441         return (res, cause) -> {
442             if (cause != null) {
443                 return handleRevisionNotFound(projectName, repositoryName, baseRevision, cause);
444             }
445 
446             updateLatestKnownRevision(projectName, repositoryName, res.revision());
447             return false;
448         };
449     }
450 
451     @Override
452     public CompletableFuture<Revision> watchRepository(
453             String projectName, String repositoryName, Revision lastKnownRevision,
454             PathPattern pathPattern, long timeoutMillis, boolean errorOnEntryNotFound) {
455 
456         return normalizeRevisionAndExecuteWithRetries(
457                 projectName, repositoryName, lastKnownRevision,
458                 new Function<Revision, CompletableFuture<Revision>>() {
459                     @Override
460                     public CompletableFuture<Revision> apply(Revision normLastKnownRevision) {
461                         return delegate.watchRepository(projectName, repositoryName, normLastKnownRevision,
462                                                         pathPattern, timeoutMillis, errorOnEntryNotFound)
463                                        .thenApply(newLastKnownRevision -> {
464                                            if (newLastKnownRevision != null) {
465                                                updateLatestKnownRevision(projectName, repositoryName,
466                                                                          newLastKnownRevision);
467                                            }
468                                            return newLastKnownRevision;
469                                        });
470                     }
471 
472                     @Override
473                     public String toString() {
474                         return "watchRepository(" + projectName + ", " + repositoryName + ", " +
475                                lastKnownRevision + ", " + pathPattern + ", " + timeoutMillis + ", " +
476                                errorOnEntryNotFound + ')';
477                     }
478                 });
479     }
480 
481     @Override
482     public <T> CompletableFuture<Entry<T>> watchFile(
483             String projectName, String repositoryName, Revision lastKnownRevision,
484             Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound,
485             boolean viewRaw, boolean renderTemplate, @Nullable String variableFile,
486             @Nullable Revision templateRevision) {
487 
488         return normalizeRevisionAndExecuteWithRetries(
489                 projectName, repositoryName, lastKnownRevision,
490                 new Function<>() {
491                     @Override
492                     public CompletableFuture<Entry<T>> apply(Revision normLastKnownRevision) {
493                         return delegate.watchFile(projectName, repositoryName, normLastKnownRevision,
494                                                   query, timeoutMillis, errorOnEntryNotFound,
495                                                   viewRaw, renderTemplate, variableFile, templateRevision)
496                                        .thenApply(entry -> {
497                                            if (entry != null) {
498                                                updateLatestKnownRevision(projectName, repositoryName,
499                                                                          entry.revision());
500                                            }
501                                            return entry;
502                                        });
503                     }
504 
505                     @Override
506                     public String toString() {
507                         return "watchFile(" + projectName + ", " + repositoryName + ", " +
508                                lastKnownRevision + ", " + query + ", " + timeoutMillis + ", " +
509                                errorOnEntryNotFound + ", " + viewRaw + ", " + renderTemplate + ", " +
510                                variableFile + ", " + templateRevision + ')';
511                     }
512                 });
513     }
514 
515     @Override
516     public CompletableFuture<Void> whenEndpointReady() {
517         return delegate.whenEndpointReady();
518     }
519 
520     /**
521      * Normalizes the given {@link Revision} and then executes the task by calling {@code taskRunner.apply()}
522      * with the normalized {@link Revision}. The task can be executed repetitively when the task failed with
523      * a {@link RevisionNotFoundException} for the {@link Revision} which is known to exist.
524      */
525     private <T> CompletableFuture<T> normalizeRevisionAndExecuteWithRetries(
526             String projectName, String repositoryName, Revision revision,
527             Function<Revision, CompletableFuture<T>> taskRunner) {
528         return normalizeRevision(projectName, repositoryName, revision)
529                 .thenCompose(normRev -> executeWithRetries(
530                         new Supplier<>() {
531                             @Override
532                             public CompletableFuture<T> get() {
533                                 return taskRunner.apply(normRev);
534                             }
535 
536                             @Override
537                             public String toString() {
538                                 return taskRunner + " with " + normRev;
539                             }
540                         },
541                         (res, cause) -> cause != null &&
542                                         handleRevisionNotFound(projectName, repositoryName, normRev, cause)));
543     }
544 
545     /**
546      * Normalizes the given {@link Revision} range and then executes the task by calling
547      * {@code taskRunner.apply()} with the normalized {@link Revision} range. The task can be executed
548      * repetitively when the task failed with a {@link RevisionNotFoundException} for the {@link Revision}
549      * range which is known to exist.
550      */
551     private <T> CompletableFuture<T> normalizeRevisionsAndExecuteWithRetries(
552             String projectName, String repositoryName, Revision from, Revision to,
553             BiFunction<Revision, Revision, CompletableFuture<T>> taskRunner) {
554 
555         if (to == null) {
556             return exceptionallyCompletedFuture(new NullPointerException("to"));
557         }
558 
559         if (from == null) {
560             return exceptionallyCompletedFuture(new NullPointerException("from"));
561         }
562 
563         if (from.isRelative() && to.isRelative() ||
564             !from.isRelative() && !to.isRelative()) {
565 
566             // When both revisions are absolute or both revisions are relative,
567             // we can call normalizeRevision() only once and guess the other revision from the distance.
568             final int distance = to.major() - from.major();
569             final Revision baseRevision = to.compareTo(from) >= 0 ? to : from;
570 
571             return normalizeRevision(projectName, repositoryName, baseRevision).thenCompose(normBaseRev -> {
572                 final Revision normFromRev;
573                 final Revision normToRev;
574                 if (distance >= 0) {
575                     normToRev = normBaseRev;
576                     normFromRev = normBaseRev.backward(distance);
577                 } else {
578                     normFromRev = normBaseRev;
579                     normToRev = normBaseRev.backward(-distance);
580                 }
581 
582                 return executeWithRetries(
583                         new Supplier<CompletableFuture<T>>() {
584                             @Override
585                             public CompletableFuture<T> get() {
586                                 return taskRunner.apply(normFromRev, normToRev);
587                             }
588 
589                             @Override
590                             public String toString() {
591                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
592                             }
593                         },
594                         (res, cause) -> {
595                             if (cause == null) {
596                                 return false;
597                             }
598                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
599                         });
600             });
601         } else {
602             // When one revision is absolute and the other is relative, we have to normalize both revisions
603             // because it is impossible to know the distance between them and which is a newer revision.
604             return CompletableFutures.allAsList(ImmutableList.of(
605                     normalizeRevision(projectName, repositoryName, from),
606                     normalizeRevision(projectName, repositoryName, to))).thenCompose(normRevs -> {
607                 final Revision normFromRev = normRevs.get(0);
608                 final Revision normToRev = normRevs.get(1);
609                 return executeWithRetries(
610                         new Supplier<CompletableFuture<T>>() {
611                             @Override
612                             public CompletableFuture<T> get() {
613                                 return taskRunner.apply(normFromRev, normToRev);
614                             }
615 
616                             @Override
617                             public String toString() {
618                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
619                             }
620                         },
621                         (res, cause) -> {
622                             if (cause == null) {
623                                 return false;
624                             }
625 
626                             final Revision normBaseRev = normFromRev.compareTo(normToRev) > 0 ? normFromRev
627                                                                                               : normToRev;
628                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
629                         });
630             });
631         }
632     }
633 
634     /**
635      * Executes the task by calling {@code taskRunner.get()} and re-executes the task if {@code retryPredicated}
636      * returns {@code true}. This method is used as a building block for sending a request repetitively
637      * when the request has failed due to replication lag.
638      */
639     private <T> CompletableFuture<T> executeWithRetries(
640             Supplier<CompletableFuture<T>> taskRunner,
641             BiPredicate<T, Throwable> retryPredicate) {
642         return executeWithRetries(taskRunner, retryPredicate, 0);
643     }
644 
645     private <T> CompletableFuture<T> executeWithRetries(
646             Supplier<CompletableFuture<T>> taskRunner,
647             BiPredicate<T, Throwable> retryPredicate,
648             int attemptsSoFar) {
649 
650         return CompletableFutures.handleCompose(taskRunner.get(), (res, cause) -> {
651             final Object currentReplicaHint = currentReplicaHintSupplier.get();
652             final int nextAttemptsSoFar = attemptsSoFar + 1;
653             final boolean retryRequired = retryPredicate.test(res, cause);
654             if (!retryRequired || nextAttemptsSoFar > maxRetries) {
655                 if (retryRequired) {
656                     if (currentReplicaHint != null) {
657                         logger.warn("[{}] Failed to retrieve the up-to-date data from Central Dogma " +
658                                     "after {} retries: {} => {}",
659                                     currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
660                     } else {
661                         logger.warn("Failed to retrieve the up-to-date data from Central Dogma " +
662                                     "after {} retries: {} => {}",
663                                     attemptsSoFar, taskRunner, resultOrCause(res, cause));
664                     }
665                 } else if (logger.isDebugEnabled()) {
666                     if (currentReplicaHint != null) {
667                         logger.debug("[{}] Retrieved the up-to-date data after {} retries: {} => {}",
668                                      currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
669                     } else {
670                         logger.debug("Retrieved the up-to-date data after {} retries: {} => {}",
671                                      attemptsSoFar, taskRunner, resultOrCause(res, cause));
672                     }
673                 }
674 
675                 if (cause == null) {
676                     return completedFuture(res);
677                 } else {
678                     return exceptionallyCompletedFuture(cause);
679                 }
680             }
681 
682             if (logger.isDebugEnabled()) {
683                 if (currentReplicaHint != null) {
684                     logger.debug("[{}] Got the out-of-date data ({} attempt(s) so far): {} => {}",
685                                  currentReplicaHint, nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
686                 } else {
687                     logger.debug("Got the out-of-date data ({} attempt(s) so far): {} => {}",
688                                  nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
689                 }
690             }
691 
692             final CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();
693             executor().schedule(() -> {
694                 try {
695                     executeWithRetries(taskRunner, retryPredicate,
696                                        nextAttemptsSoFar).handle((newRes, newCause) -> {
697                         if (newCause != null) {
698                             nextAttemptFuture.completeExceptionally(newCause);
699                         } else {
700                             nextAttemptFuture.complete(newRes);
701                         }
702                         return null;
703                     });
704                 } catch (Throwable t) {
705                     nextAttemptFuture.completeExceptionally(t);
706                 }
707             }, retryIntervalMillis, TimeUnit.MILLISECONDS);
708 
709             return nextAttemptFuture;
710         }).toCompletableFuture();
711     }
712 
713     /**
714      * Returns the cause of the specified {@code throwable} peeling it recursively, if it is one of the
715      * {@link CompletionException}, {@link ExecutionException}. Otherwise returns the {@code throwable}.
716      */
717     private static Throwable peel(Throwable throwable) {
718         Throwable cause = throwable.getCause();
719         while (cause != null && cause != throwable &&
720                (throwable instanceof CompletionException || throwable instanceof ExecutionException)) {
721             throwable = cause;
722             cause = throwable.getCause();
723         }
724         return throwable;
725     }
726 
727     /**
728      * Returns {@code true} to indicate that the request must be retried if {@code cause} is
729      * a {@link RevisionNotFoundException} and the specified {@link Revision} is supposed to exist.
730      */
731     private boolean handleRevisionNotFound(
732             String projectName, String repositoryName, Revision revision, Throwable cause) {
733         requireNonNull(cause, "cause");
734         cause = peel(cause);
735         if (!(cause instanceof RevisionNotFoundException)) {
736             return false;
737         }
738 
739         final Revision latestKnownRevision = latestKnownRevision(projectName, repositoryName);
740         if (latestKnownRevision == null) {
741             return false;
742         }
743 
744         if (revision.isRelative()) {
745             return revision.major() + latestKnownRevision.major() >= 0;
746         } else {
747             return revision.major() <= latestKnownRevision.major();
748         }
749     }
750 
751     @Nullable
752     @VisibleForTesting
753     Revision latestKnownRevision(String projectName, String repositoryName) {
754         synchronized (latestKnownRevisions) {
755             return latestKnownRevisions.get(new RepoId(projectName, repositoryName));
756         }
757     }
758 
759     /**
760      * Updates the latest known revision for the specified repository.
761      *
762      * @return {@code true} if the latest known revision has been updated to the specified {@link Revision} or
763      *         the latest known revision is equal to the specified {@link Revision}. {@code false} otherwise.
764      */
765     private boolean updateLatestKnownRevision(String projectName, String repositoryName, Revision newRevision) {
766         final Object currentReplicaHint = currentReplicaHintSupplier.get();
767         final RepoId id = new RepoId(projectName, repositoryName);
768         synchronized (latestKnownRevisions) {
769             final Revision oldRevision = latestKnownRevisions.get(id);
770             if (oldRevision == null) {
771                 if (currentReplicaHint != null) {
772                     logger.debug("[{}] Updating the latest known revision for {}/{} from <unknown> to: {}",
773                                  currentReplicaHint, projectName, repositoryName, newRevision);
774                 } else {
775                     logger.debug("Updating the latest known revision for {}/{} from <unknown> to: {}",
776                                  projectName, repositoryName, newRevision);
777                 }
778                 latestKnownRevisions.put(id, newRevision);
779                 return true;
780             }
781 
782             final int comparison = oldRevision.compareTo(newRevision);
783             if (comparison < 0) {
784                 if (currentReplicaHint != null) {
785                     logger.debug("[{}] Updating the latest known revision for {}/{} from {} to: {}",
786                                  currentReplicaHint, projectName, repositoryName, oldRevision, newRevision);
787                 } else {
788                     logger.debug("Updating the latest known revision for {}/{} from {} to: {}",
789                                  projectName, repositoryName, oldRevision, newRevision);
790                 }
791                 latestKnownRevisions.put(id, newRevision);
792                 return true;
793             }
794 
795             if (comparison == 0) {
796                 if (currentReplicaHint != null) {
797                     logger.debug("[{}] The latest known revision for {}/{} stays unchanged at: {}",
798                                  currentReplicaHint, projectName, repositoryName, newRevision);
799                 } else {
800                     logger.debug("The latest known revision for {}/{} stays unchanged at: {}",
801                                  projectName, repositoryName, newRevision);
802                 }
803                 return true;
804             }
805 
806             if (currentReplicaHint != null) {
807                 logger.debug("[{}] An out-of-date latest known revision for {}/{}: {}",
808                              currentReplicaHint, projectName, repositoryName, newRevision);
809             } else {
810                 logger.debug("An out-of-date latest known revision for {}/{}: {}",
811                              projectName, repositoryName, newRevision);
812             }
813             return false;
814         }
815     }
816 
817     @Nullable
818     private static Object resultOrCause(@Nullable Object res, @Nullable Throwable cause) {
819         return res != null ? res : cause;
820     }
821 
822     @Override
823     public void close() throws Exception {
824         delegate.close();
825     }
826 
827     @VisibleForTesting
828     static final class RepoId {
829         final String projectName;
830         final String repositoryName;
831 
832         RepoId(String projectName, String repositoryName) {
833             this.projectName = projectName;
834             this.repositoryName = repositoryName;
835         }
836 
837         @Override
838         public boolean equals(Object o) {
839             if (this == o) {
840                 return true;
841             }
842             if (!(o instanceof RepoId)) {
843                 return false;
844             }
845             final RepoId that = (RepoId) o;
846             return projectName.equals(that.projectName) && repositoryName.equals(that.repositoryName);
847         }
848 
849         @Override
850         public int hashCode() {
851             return Objects.hash(projectName, repositoryName);
852         }
853 
854         @Override
855         public String toString() {
856             return projectName + '/' + repositoryName;
857         }
858     }
859 }