1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.internal.client;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
20  import static java.util.Objects.requireNonNull;
21  import static java.util.concurrent.CompletableFuture.completedFuture;
22  
23  import java.util.LinkedHashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Objects;
27  import java.util.Set;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.concurrent.CompletionException;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ScheduledExecutorService;
32  import java.util.concurrent.TimeUnit;
33  import java.util.function.BiFunction;
34  import java.util.function.BiPredicate;
35  import java.util.function.Function;
36  import java.util.function.Supplier;
37  
38  import javax.annotation.Nullable;
39  
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import com.google.common.annotations.VisibleForTesting;
44  import com.google.common.collect.ImmutableList;
45  import com.spotify.futures.CompletableFutures;
46  
47  import com.linecorp.centraldogma.client.AbstractCentralDogma;
48  import com.linecorp.centraldogma.client.CentralDogma;
49  import com.linecorp.centraldogma.client.CentralDogmaRepository;
50  import com.linecorp.centraldogma.client.RepositoryInfo;
51  import com.linecorp.centraldogma.common.Author;
52  import com.linecorp.centraldogma.common.Change;
53  import com.linecorp.centraldogma.common.Commit;
54  import com.linecorp.centraldogma.common.Entry;
55  import com.linecorp.centraldogma.common.EntryType;
56  import com.linecorp.centraldogma.common.Markup;
57  import com.linecorp.centraldogma.common.MergeQuery;
58  import com.linecorp.centraldogma.common.MergedEntry;
59  import com.linecorp.centraldogma.common.PathPattern;
60  import com.linecorp.centraldogma.common.PushResult;
61  import com.linecorp.centraldogma.common.Query;
62  import com.linecorp.centraldogma.common.Revision;
63  import com.linecorp.centraldogma.common.RevisionNotFoundException;
64  
65  import io.micrometer.core.instrument.MeterRegistry;
66  
67  /**
68   * A {@link CentralDogma} client that retries the request automatically when a {@link RevisionNotFoundException}
69   * was raised but it is certain that a given {@link Revision} exists.
70   */
71  public final class ReplicationLagTolerantCentralDogma extends AbstractCentralDogma {
72  
73      private static final Logger logger =
74              LoggerFactory.getLogger(ReplicationLagTolerantCentralDogma.class);
75  
76      private final CentralDogma delegate;
77      private final int maxRetries;
78      private final long retryIntervalMillis;
79      private final Supplier<?> currentReplicaHintSupplier;
80  
81      @VisibleForTesting
82      final Map<RepoId, Revision> latestKnownRevisions = new LinkedHashMap<RepoId, Revision>() {
83          private static final long serialVersionUID = 3587793379404809027L;
84  
85          @Override
86          protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
87              // Keep only up to 8192 repositories, which should be enough for almost all cases.
88              return size() > 8192;
89          }
90      };
91  
92      public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
93                                                CentralDogma delegate, int maxRetries, long retryIntervalMillis,
94                                                Supplier<?> currentReplicaHintSupplier,
95                                                @Nullable MeterRegistry meterRegistry) {
96          super(blockingTaskExecutor, meterRegistry);
97  
98          requireNonNull(delegate, "delegate");
99          checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
100         checkArgument(retryIntervalMillis >= 0,
101                       "retryIntervalMillis: %s (expected: >= 0)", retryIntervalMillis);
102         requireNonNull(currentReplicaHintSupplier, "currentReplicaHintSupplier");
103 
104         this.delegate = delegate;
105         this.maxRetries = maxRetries;
106         this.retryIntervalMillis = retryIntervalMillis;
107         this.currentReplicaHintSupplier = currentReplicaHintSupplier;
108     }
109 
110     @Override
111     public CompletableFuture<Void> createProject(String projectName) {
112         return delegate.createProject(projectName);
113     }
114 
115     @Override
116     public CompletableFuture<Void> removeProject(String projectName) {
117         return delegate.removeProject(projectName).thenAccept(unused -> {
118             synchronized (latestKnownRevisions) {
119                 latestKnownRevisions.entrySet().removeIf(e -> e.getKey().projectName.equals(projectName));
120             }
121         });
122     }
123 
124     @Override
125     public CompletableFuture<Void> purgeProject(String projectName) {
126         return delegate.purgeProject(projectName);
127     }
128 
129     @Override
130     public CompletableFuture<Void> unremoveProject(String projectName) {
131         return delegate.unremoveProject(projectName);
132     }
133 
134     @Override
135     public CompletableFuture<Set<String>> listProjects() {
136         return delegate.listProjects();
137     }
138 
139     @Override
140     public CompletableFuture<Set<String>> listRemovedProjects() {
141         return delegate.listRemovedProjects();
142     }
143 
144     @Override
145     public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
146                                                                       String repositoryName) {
147         return delegate.createRepository(projectName, repositoryName);
148     }
149 
150     @Override
151     public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
152         return delegate.removeRepository(projectName, repositoryName).thenAccept(unused -> {
153             synchronized (latestKnownRevisions) {
154                 latestKnownRevisions.remove(new RepoId(projectName, repositoryName));
155             }
156         });
157     }
158 
159     @Override
160     public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
161         return delegate.purgeRepository(projectName, repositoryName);
162     }
163 
164     @Override
165     public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
166                                                                         String repositoryName) {
167         return delegate.unremoveRepository(projectName, repositoryName);
168     }
169 
170     @Override
171     public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
172         return executeWithRetries(
173                 new Supplier<CompletableFuture<Map<String, RepositoryInfo>>>() {
174                     @Override
175                     public CompletableFuture<Map<String, RepositoryInfo>> get() {
176                         return delegate.listRepositories(projectName);
177                     }
178 
179                     @Override
180                     public String toString() {
181                         return "listRepositories(" + projectName + ')';
182                     }
183                 },
184                 (res, cause) -> {
185                     if (res != null) {
186                         for (RepositoryInfo info : res.values()) {
187                             if (!updateLatestKnownRevision(projectName, info.name(), info.headRevision())) {
188                                 return true;
189                             }
190                         }
191                     }
192                     return false;
193                 });
194     }
195 
196     @Override
197     public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
198         return delegate.listRemovedRepositories(projectName);
199     }
200 
201     @Override
202     public CompletableFuture<Revision> normalizeRevision(
203             String projectName, String repositoryName, Revision revision) {
204         return executeWithRetries(
205                 new Supplier<CompletableFuture<Revision>>() {
206                     @Override
207                     public CompletableFuture<Revision> get() {
208                         return delegate.normalizeRevision(projectName, repositoryName, revision);
209                     }
210 
211                     @Override
212                     public String toString() {
213                         return "normalizeRevision(" + projectName + ", " + repositoryName + ", " +
214                                revision + ')';
215                     }
216                 },
217                 (res, cause) -> {
218                     if (cause != null) {
219                         return handleRevisionNotFound(projectName, repositoryName, revision, cause);
220                     }
221 
222                     if (revision.isRelative()) {
223                         final Revision headRevision = res.forward(-(revision.major() + 1));
224                         return !updateLatestKnownRevision(projectName, repositoryName, headRevision);
225                     }
226 
227                     updateLatestKnownRevision(projectName, repositoryName, revision);
228                     return false;
229                 });
230     }
231 
232     @Override
233     public CompletableFuture<Map<String, EntryType>> listFiles(
234             String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
235         return normalizeRevisionAndExecuteWithRetries(
236                 projectName, repositoryName, revision,
237                 new Function<Revision, CompletableFuture<Map<String, EntryType>>>() {
238                     @Override
239                     public CompletableFuture<Map<String, EntryType>> apply(Revision normRev) {
240                         return delegate.listFiles(projectName, repositoryName, normRev, pathPattern);
241                     }
242 
243                     @Override
244                     public String toString() {
245                         return "listFiles(" + projectName + ", " + repositoryName + ", " +
246                                revision + ", " + pathPattern + ')';
247                     }
248                 });
249     }
250 
251     @Override
252     public <T> CompletableFuture<Entry<T>> getFile(
253             String projectName, String repositoryName, Revision revision, Query<T> query, boolean viewRaw) {
254         return normalizeRevisionAndExecuteWithRetries(
255                 projectName, repositoryName, revision,
256                 new Function<Revision, CompletableFuture<Entry<T>>>() {
257                     @Override
258                     public CompletableFuture<Entry<T>> apply(Revision normRev) {
259                         return delegate.getFile(projectName, repositoryName, normRev, query, viewRaw);
260                     }
261 
262                     @Override
263                     public String toString() {
264                         return "getFile(" + projectName + ", " + repositoryName + ", " +
265                                revision + ", " + query + ", " + viewRaw + ')';
266                     }
267                 });
268     }
269 
270     @Override
271     public CompletableFuture<Map<String, Entry<?>>> getFiles(
272             String projectName, String repositoryName, Revision revision, PathPattern pathPattern,
273             boolean viewRaw) {
274         return normalizeRevisionAndExecuteWithRetries(
275                 projectName, repositoryName, revision,
276                 new Function<Revision, CompletableFuture<Map<String, Entry<?>>>>() {
277                     @Override
278                     public CompletableFuture<Map<String, Entry<?>>> apply(Revision normRev) {
279                         return delegate.getFiles(projectName, repositoryName, normRev, pathPattern, viewRaw);
280                     }
281 
282                     @Override
283                     public String toString() {
284                         return "getFiles(" + projectName + ", " + repositoryName + ", " +
285                                revision + ", " + pathPattern + ", " + viewRaw + ')';
286                     }
287                 });
288     }
289 
290     @Override
291     public <T> CompletableFuture<MergedEntry<T>> mergeFiles(
292             String projectName, String repositoryName, Revision revision,
293             MergeQuery<T> mergeQuery) {
294         return normalizeRevisionAndExecuteWithRetries(
295                 projectName, repositoryName, revision,
296                 new Function<Revision, CompletableFuture<MergedEntry<T>>>() {
297                     @Override
298                     public CompletableFuture<MergedEntry<T>> apply(Revision normRev) {
299                         return delegate.mergeFiles(projectName, repositoryName, normRev, mergeQuery);
300                     }
301 
302                     @Override
303                     public String toString() {
304                         return "mergeFiles(" + projectName + ", " + repositoryName + ", " +
305                                revision + ", " + mergeQuery + ')';
306                     }
307                 });
308     }
309 
310     @Override
311     public CompletableFuture<List<Commit>> getHistory(
312             String projectName, String repositoryName, Revision from,
313             Revision to, PathPattern pathPattern, int maxCommits) {
314         return normalizeRevisionsAndExecuteWithRetries(
315                 projectName, repositoryName, from, to,
316                 new BiFunction<Revision, Revision, CompletableFuture<List<Commit>>>() {
317                     @Override
318                     public CompletableFuture<List<Commit>> apply(Revision normFromRev, Revision normToRev) {
319                         return delegate.getHistory(projectName, repositoryName,
320                                                    normFromRev, normToRev, pathPattern, maxCommits);
321                     }
322 
323                     @Override
324                     public String toString() {
325                         return "getHistory(" + projectName + ", " + repositoryName + ", " +
326                                from + ", " + to + ", " + pathPattern + ", " + maxCommits + ')';
327                     }
328                 });
329     }
330 
331     @Override
332     public <T> CompletableFuture<Change<T>> getDiff(
333             String projectName, String repositoryName, Revision from, Revision to, Query<T> query) {
334         return normalizeRevisionsAndExecuteWithRetries(
335                 projectName, repositoryName, from, to,
336                 new BiFunction<Revision, Revision, CompletableFuture<Change<T>>>() {
337                     @Override
338                     public CompletableFuture<Change<T>> apply(Revision normFromRev, Revision normToRev) {
339                         return delegate.getDiff(projectName, repositoryName,
340                                                 normFromRev, normToRev, query);
341                     }
342 
343                     @Override
344                     public String toString() {
345                         return "getDiff(" + projectName + ", " + repositoryName + ", " +
346                                from + ", " + to + ", " + query + ')';
347                     }
348                 });
349     }
350 
351     @Override
352     public CompletableFuture<List<Change<?>>> getDiff(
353             String projectName, String repositoryName, Revision from, Revision to, PathPattern pathPattern) {
354         return normalizeRevisionsAndExecuteWithRetries(
355                 projectName, repositoryName, from, to,
356                 new BiFunction<Revision, Revision, CompletableFuture<List<Change<?>>>>() {
357                     @Override
358                     public CompletableFuture<List<Change<?>>> apply(Revision normFromRev, Revision normToRev) {
359                         return delegate.getDiff(projectName, repositoryName,
360                                                 normFromRev, normToRev, pathPattern);
361                     }
362 
363                     @Override
364                     public String toString() {
365                         return "getDiffs(" + projectName + ", " + repositoryName + ", " +
366                                from + ", " + to + ", " + pathPattern + ')';
367                     }
368                 });
369     }
370 
371     @Override
372     public CompletableFuture<List<Change<?>>> getPreviewDiffs(
373             String projectName, String repositoryName, Revision baseRevision,
374             Iterable<? extends Change<?>> changes) {
375         return normalizeRevisionAndExecuteWithRetries(
376                 projectName, repositoryName, baseRevision,
377                 new Function<Revision, CompletableFuture<List<Change<?>>>>() {
378                     @Override
379                     public CompletableFuture<List<Change<?>>> apply(Revision normBaseRev) {
380                         return delegate.getPreviewDiffs(projectName, repositoryName, normBaseRev, changes);
381                     }
382 
383                     @Override
384                     public String toString() {
385                         return "getPreviewDiffs(" + projectName + ", " + repositoryName + ", " +
386                                baseRevision + ", ...)";
387                     }
388                 });
389     }
390 
391     @Override
392     public CompletableFuture<PushResult> push(
393             String projectName, String repositoryName, Revision baseRevision,
394             String summary, String detail, Markup markup, Iterable<? extends Change<?>> changes) {
395         return executeWithRetries(
396                 new Supplier<CompletableFuture<PushResult>>() {
397                     @Override
398                     public CompletableFuture<PushResult> get() {
399                         return delegate.push(projectName, repositoryName, baseRevision,
400                                              summary, detail, markup, changes);
401                     }
402 
403                     @Override
404                     public String toString() {
405                         return "push(" + projectName + ", " + repositoryName + ", " +
406                                baseRevision + ", " + summary + ", ...)";
407                     }
408                 },
409                 pushRetryPredicate(projectName, repositoryName, baseRevision));
410     }
411 
412     @Override
413     public CompletableFuture<PushResult> push(
414             String projectName, String repositoryName, Revision baseRevision,
415             Author author, String summary, String detail, Markup markup,
416             Iterable<? extends Change<?>> changes) {
417         return executeWithRetries(
418                 new Supplier<CompletableFuture<PushResult>>() {
419                     @Override
420                     public CompletableFuture<PushResult> get() {
421                         return delegate.push(projectName, repositoryName, baseRevision,
422                                              author, summary, detail, markup, changes);
423                     }
424 
425                     @Override
426                     public String toString() {
427                         return "push(" + projectName + ", " + repositoryName + ", " +
428                                baseRevision + ", " + summary + ", ...)";
429                     }
430                 },
431                 pushRetryPredicate(projectName, repositoryName, baseRevision));
432     }
433 
434     private BiPredicate<PushResult, Throwable> pushRetryPredicate(
435             String projectName, String repositoryName, Revision baseRevision) {
436 
437         return (res, cause) -> {
438             if (cause != null) {
439                 return handleRevisionNotFound(projectName, repositoryName, baseRevision, cause);
440             }
441 
442             updateLatestKnownRevision(projectName, repositoryName, res.revision());
443             return false;
444         };
445     }
446 
447     @Override
448     public CompletableFuture<Revision> watchRepository(
449             String projectName, String repositoryName, Revision lastKnownRevision,
450             PathPattern pathPattern, long timeoutMillis, boolean errorOnEntryNotFound) {
451 
452         return normalizeRevisionAndExecuteWithRetries(
453                 projectName, repositoryName, lastKnownRevision,
454                 new Function<Revision, CompletableFuture<Revision>>() {
455                     @Override
456                     public CompletableFuture<Revision> apply(Revision normLastKnownRevision) {
457                         return delegate.watchRepository(projectName, repositoryName, normLastKnownRevision,
458                                                         pathPattern, timeoutMillis, errorOnEntryNotFound)
459                                        .thenApply(newLastKnownRevision -> {
460                                            if (newLastKnownRevision != null) {
461                                                updateLatestKnownRevision(projectName, repositoryName,
462                                                                          newLastKnownRevision);
463                                            }
464                                            return newLastKnownRevision;
465                                        });
466                     }
467 
468                     @Override
469                     public String toString() {
470                         return "watchRepository(" + projectName + ", " + repositoryName + ", " +
471                                lastKnownRevision + ", " + pathPattern + ", " + timeoutMillis + ", " +
472                                errorOnEntryNotFound + ')';
473                     }
474                 });
475     }
476 
477     @Override
478     public <T> CompletableFuture<Entry<T>> watchFile(
479             String projectName, String repositoryName, Revision lastKnownRevision,
480             Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound, boolean viewRaw) {
481 
482         return normalizeRevisionAndExecuteWithRetries(
483                 projectName, repositoryName, lastKnownRevision,
484                 new Function<Revision, CompletableFuture<Entry<T>>>() {
485                     @Override
486                     public CompletableFuture<Entry<T>> apply(Revision normLastKnownRevision) {
487                         return delegate.watchFile(projectName, repositoryName, normLastKnownRevision,
488                                                   query, timeoutMillis, errorOnEntryNotFound, viewRaw)
489                                        .thenApply(entry -> {
490                                            if (entry != null) {
491                                                updateLatestKnownRevision(projectName, repositoryName,
492                                                                          entry.revision());
493                                            }
494                                            return entry;
495                                        });
496                     }
497 
498                     @Override
499                     public String toString() {
500                         return "watchFile(" + projectName + ", " + repositoryName + ", " +
501                                lastKnownRevision + ", " + query + ", " + timeoutMillis + ", " +
502                                errorOnEntryNotFound + ')';
503                     }
504                 });
505     }
506 
507     @Override
508     public CompletableFuture<Void> whenEndpointReady() {
509         return delegate.whenEndpointReady();
510     }
511 
512     /**
513      * Normalizes the given {@link Revision} and then executes the task by calling {@code taskRunner.apply()}
514      * with the normalized {@link Revision}. The task can be executed repetitively when the task failed with
515      * a {@link RevisionNotFoundException} for the {@link Revision} which is known to exist.
516      */
517     private <T> CompletableFuture<T> normalizeRevisionAndExecuteWithRetries(
518             String projectName, String repositoryName, Revision revision,
519             Function<Revision, CompletableFuture<T>> taskRunner) {
520         return normalizeRevision(projectName, repositoryName, revision)
521                 .thenCompose(normRev -> executeWithRetries(
522                         new Supplier<CompletableFuture<T>>() {
523                             @Override
524                             public CompletableFuture<T> get() {
525                                 return taskRunner.apply(normRev);
526                             }
527 
528                             @Override
529                             public String toString() {
530                                 return taskRunner + " with " + normRev;
531                             }
532                         },
533                         (res, cause) -> cause != null &&
534                                         handleRevisionNotFound(projectName, repositoryName, normRev, cause)));
535     }
536 
537     /**
538      * Normalizes the given {@link Revision} range and then executes the task by calling
539      * {@code taskRunner.apply()} with the normalized {@link Revision} range. The task can be executed
540      * repetitively when the task failed with a {@link RevisionNotFoundException} for the {@link Revision}
541      * range which is known to exist.
542      */
543     private <T> CompletableFuture<T> normalizeRevisionsAndExecuteWithRetries(
544             String projectName, String repositoryName, Revision from, Revision to,
545             BiFunction<Revision, Revision, CompletableFuture<T>> taskRunner) {
546 
547         if (to == null) {
548             return exceptionallyCompletedFuture(new NullPointerException("to"));
549         }
550 
551         if (from == null) {
552             return exceptionallyCompletedFuture(new NullPointerException("from"));
553         }
554 
555         if (from.isRelative() && to.isRelative() ||
556             !from.isRelative() && !to.isRelative()) {
557 
558             // When both revisions are absolute or both revisions are relative,
559             // we can call normalizeRevision() only once and guess the other revision from the distance.
560             final int distance = to.major() - from.major();
561             final Revision baseRevision = to.compareTo(from) >= 0 ? to : from;
562 
563             return normalizeRevision(projectName, repositoryName, baseRevision).thenCompose(normBaseRev -> {
564                 final Revision normFromRev;
565                 final Revision normToRev;
566                 if (distance >= 0) {
567                     normToRev = normBaseRev;
568                     normFromRev = normBaseRev.backward(distance);
569                 } else {
570                     normFromRev = normBaseRev;
571                     normToRev = normBaseRev.backward(-distance);
572                 }
573 
574                 return executeWithRetries(
575                         new Supplier<CompletableFuture<T>>() {
576                             @Override
577                             public CompletableFuture<T> get() {
578                                 return taskRunner.apply(normFromRev, normToRev);
579                             }
580 
581                             @Override
582                             public String toString() {
583                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
584                             }
585                         },
586                         (res, cause) -> {
587                             if (cause == null) {
588                                 return false;
589                             }
590                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
591                         });
592             });
593         } else {
594             // When one revision is absolute and the other is relative, we have to normalize both revisions
595             // because it is impossible to know the distance between them and which is a newer revision.
596             return CompletableFutures.allAsList(ImmutableList.of(
597                     normalizeRevision(projectName, repositoryName, from),
598                     normalizeRevision(projectName, repositoryName, to))).thenCompose(normRevs -> {
599                 final Revision normFromRev = normRevs.get(0);
600                 final Revision normToRev = normRevs.get(1);
601                 return executeWithRetries(
602                         new Supplier<CompletableFuture<T>>() {
603                             @Override
604                             public CompletableFuture<T> get() {
605                                 return taskRunner.apply(normFromRev, normToRev);
606                             }
607 
608                             @Override
609                             public String toString() {
610                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
611                             }
612                         },
613                         (res, cause) -> {
614                             if (cause == null) {
615                                 return false;
616                             }
617 
618                             final Revision normBaseRev = normFromRev.compareTo(normToRev) > 0 ? normFromRev
619                                                                                               : normToRev;
620                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
621                         });
622             });
623         }
624     }
625 
626     /**
627      * Executes the task by calling {@code taskRunner.get()} and re-executes the task if {@code retryPredicated}
628      * returns {@code true}. This method is used as a building block for sending a request repetitively
629      * when the request has failed due to replication lag.
630      */
631     private <T> CompletableFuture<T> executeWithRetries(
632             Supplier<CompletableFuture<T>> taskRunner,
633             BiPredicate<T, Throwable> retryPredicate) {
634         return executeWithRetries(taskRunner, retryPredicate, 0);
635     }
636 
637     private <T> CompletableFuture<T> executeWithRetries(
638             Supplier<CompletableFuture<T>> taskRunner,
639             BiPredicate<T, Throwable> retryPredicate,
640             int attemptsSoFar) {
641 
642         return CompletableFutures.handleCompose(taskRunner.get(), (res, cause) -> {
643             final Object currentReplicaHint = currentReplicaHintSupplier.get();
644             final int nextAttemptsSoFar = attemptsSoFar + 1;
645             final boolean retryRequired = retryPredicate.test(res, cause);
646             if (!retryRequired || nextAttemptsSoFar > maxRetries) {
647                 if (retryRequired) {
648                     if (currentReplicaHint != null) {
649                         logger.warn("[{}] Failed to retrieve the up-to-date data from Central Dogma " +
650                                     "after {} retries: {} => {}",
651                                     currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
652                     } else {
653                         logger.warn("Failed to retrieve the up-to-date data from Central Dogma " +
654                                     "after {} retries: {} => {}",
655                                     attemptsSoFar, taskRunner, resultOrCause(res, cause));
656                     }
657                 } else if (logger.isDebugEnabled()) {
658                     if (currentReplicaHint != null) {
659                         logger.debug("[{}] Retrieved the up-to-date data after {} retries: {} => {}",
660                                      currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
661                     } else {
662                         logger.debug("Retrieved the up-to-date data after {} retries: {} => {}",
663                                      attemptsSoFar, taskRunner, resultOrCause(res, cause));
664                     }
665                 }
666 
667                 if (cause == null) {
668                     return completedFuture(res);
669                 } else {
670                     return exceptionallyCompletedFuture(cause);
671                 }
672             }
673 
674             if (logger.isDebugEnabled()) {
675                 if (currentReplicaHint != null) {
676                     logger.debug("[{}] Got the out-of-date data ({} attempt(s) so far): {} => {}",
677                                  currentReplicaHint, nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
678                 } else {
679                     logger.debug("Got the out-of-date data ({} attempt(s) so far): {} => {}",
680                                  nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
681                 }
682             }
683 
684             final CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();
685             executor().schedule(() -> {
686                 try {
687                     executeWithRetries(taskRunner, retryPredicate,
688                                        nextAttemptsSoFar).handle((newRes, newCause) -> {
689                         if (newCause != null) {
690                             nextAttemptFuture.completeExceptionally(newCause);
691                         } else {
692                             nextAttemptFuture.complete(newRes);
693                         }
694                         return null;
695                     });
696                 } catch (Throwable t) {
697                     nextAttemptFuture.completeExceptionally(t);
698                 }
699             }, retryIntervalMillis, TimeUnit.MILLISECONDS);
700 
701             return nextAttemptFuture;
702         }).toCompletableFuture();
703     }
704 
705     /**
706      * Returns the cause of the specified {@code throwable} peeling it recursively, if it is one of the
707      * {@link CompletionException}, {@link ExecutionException}. Otherwise returns the {@code throwable}.
708      */
709     private static Throwable peel(Throwable throwable) {
710         Throwable cause = throwable.getCause();
711         while (cause != null && cause != throwable &&
712                (throwable instanceof CompletionException || throwable instanceof ExecutionException)) {
713             throwable = cause;
714             cause = throwable.getCause();
715         }
716         return throwable;
717     }
718 
719     /**
720      * Returns {@code true} to indicate that the request must be retried if {@code cause} is
721      * a {@link RevisionNotFoundException} and the specified {@link Revision} is supposed to exist.
722      */
723     private boolean handleRevisionNotFound(
724             String projectName, String repositoryName, Revision revision, Throwable cause) {
725         requireNonNull(cause, "cause");
726         cause = peel(cause);
727         if (!(cause instanceof RevisionNotFoundException)) {
728             return false;
729         }
730 
731         final Revision latestKnownRevision = latestKnownRevision(projectName, repositoryName);
732         if (latestKnownRevision == null) {
733             return false;
734         }
735 
736         if (revision.isRelative()) {
737             return revision.major() + latestKnownRevision.major() >= 0;
738         } else {
739             return revision.major() <= latestKnownRevision.major();
740         }
741     }
742 
743     @Nullable
744     @VisibleForTesting
745     Revision latestKnownRevision(String projectName, String repositoryName) {
746         synchronized (latestKnownRevisions) {
747             return latestKnownRevisions.get(new RepoId(projectName, repositoryName));
748         }
749     }
750 
751     /**
752      * Updates the latest known revision for the specified repository.
753      *
754      * @return {@code true} if the latest known revision has been updated to the specified {@link Revision} or
755      *         the latest known revision is equal to the specified {@link Revision}. {@code false} otherwise.
756      */
757     private boolean updateLatestKnownRevision(String projectName, String repositoryName, Revision newRevision) {
758         final Object currentReplicaHint = currentReplicaHintSupplier.get();
759         final RepoId id = new RepoId(projectName, repositoryName);
760         synchronized (latestKnownRevisions) {
761             final Revision oldRevision = latestKnownRevisions.get(id);
762             if (oldRevision == null) {
763                 if (currentReplicaHint != null) {
764                     logger.debug("[{}] Updating the latest known revision for {}/{} from <unknown> to: {}",
765                                  currentReplicaHint, projectName, repositoryName, newRevision);
766                 } else {
767                     logger.debug("Updating the latest known revision for {}/{} from <unknown> to: {}",
768                                  projectName, repositoryName, newRevision);
769                 }
770                 latestKnownRevisions.put(id, newRevision);
771                 return true;
772             }
773 
774             final int comparison = oldRevision.compareTo(newRevision);
775             if (comparison < 0) {
776                 if (currentReplicaHint != null) {
777                     logger.debug("[{}] Updating the latest known revision for {}/{} from {} to: {}",
778                                  currentReplicaHint, projectName, repositoryName, oldRevision, newRevision);
779                 } else {
780                     logger.debug("Updating the latest known revision for {}/{} from {} to: {}",
781                                  projectName, repositoryName, oldRevision, newRevision);
782                 }
783                 latestKnownRevisions.put(id, newRevision);
784                 return true;
785             }
786 
787             if (comparison == 0) {
788                 if (currentReplicaHint != null) {
789                     logger.debug("[{}] The latest known revision for {}/{} stays unchanged at: {}",
790                                  currentReplicaHint, projectName, repositoryName, newRevision);
791                 } else {
792                     logger.debug("The latest known revision for {}/{} stays unchanged at: {}",
793                                  projectName, repositoryName, newRevision);
794                 }
795                 return true;
796             }
797 
798             if (currentReplicaHint != null) {
799                 logger.debug("[{}] An out-of-date latest known revision for {}/{}: {}",
800                              currentReplicaHint, projectName, repositoryName, newRevision);
801             } else {
802                 logger.debug("An out-of-date latest known revision for {}/{}: {}",
803                              projectName, repositoryName, newRevision);
804             }
805             return false;
806         }
807     }
808 
809     @Nullable
810     private static Object resultOrCause(@Nullable Object res, @Nullable Throwable cause) {
811         return res != null ? res : cause;
812     }
813 
814     @Override
815     public void close() throws Exception {
816         delegate.close();
817     }
818 
819     @VisibleForTesting
820     static final class RepoId {
821         final String projectName;
822         final String repositoryName;
823 
824         RepoId(String projectName, String repositoryName) {
825             this.projectName = projectName;
826             this.repositoryName = repositoryName;
827         }
828 
829         @Override
830         public boolean equals(Object o) {
831             if (this == o) {
832                 return true;
833             }
834             if (!(o instanceof RepoId)) {
835                 return false;
836             }
837             final RepoId that = (RepoId) o;
838             return projectName.equals(that.projectName) && repositoryName.equals(that.repositoryName);
839         }
840 
841         @Override
842         public int hashCode() {
843             return Objects.hash(projectName, repositoryName);
844         }
845 
846         @Override
847         public String toString() {
848             return projectName + '/' + repositoryName;
849         }
850     }
851 }