1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.internal.client;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
20  import static java.util.Objects.requireNonNull;
21  import static java.util.concurrent.CompletableFuture.completedFuture;
22  
23  import java.util.LinkedHashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Objects;
27  import java.util.Set;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.concurrent.CompletionException;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ScheduledExecutorService;
32  import java.util.concurrent.TimeUnit;
33  import java.util.function.BiFunction;
34  import java.util.function.BiPredicate;
35  import java.util.function.Function;
36  import java.util.function.Supplier;
37  
38  import javax.annotation.Nullable;
39  
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import com.google.common.annotations.VisibleForTesting;
44  import com.google.common.collect.ImmutableList;
45  import com.spotify.futures.CompletableFutures;
46  
47  import com.linecorp.centraldogma.client.AbstractCentralDogma;
48  import com.linecorp.centraldogma.client.CentralDogma;
49  import com.linecorp.centraldogma.client.CentralDogmaRepository;
50  import com.linecorp.centraldogma.client.RepositoryInfo;
51  import com.linecorp.centraldogma.common.Author;
52  import com.linecorp.centraldogma.common.Change;
53  import com.linecorp.centraldogma.common.Commit;
54  import com.linecorp.centraldogma.common.Entry;
55  import com.linecorp.centraldogma.common.EntryType;
56  import com.linecorp.centraldogma.common.Markup;
57  import com.linecorp.centraldogma.common.MergeQuery;
58  import com.linecorp.centraldogma.common.MergedEntry;
59  import com.linecorp.centraldogma.common.PathPattern;
60  import com.linecorp.centraldogma.common.PushResult;
61  import com.linecorp.centraldogma.common.Query;
62  import com.linecorp.centraldogma.common.Revision;
63  import com.linecorp.centraldogma.common.RevisionNotFoundException;
64  
65  import io.micrometer.core.instrument.MeterRegistry;
66  
67  /**
68   * A {@link CentralDogma} client that retries the request automatically when a {@link RevisionNotFoundException}
69   * was raised but it is certain that a given {@link Revision} exists.
70   */
71  public final class ReplicationLagTolerantCentralDogma extends AbstractCentralDogma {
72  
    // Logger used for the retry diagnostics emitted by this class.
    private static final Logger logger =
            LoggerFactory.getLogger(ReplicationLagTolerantCentralDogma.class);

    // The client every request is forwarded to.
    private final CentralDogma delegate;
    // Maximum number of retries per request; validated to be > 0 by the constructor.
    private final int maxRetries;
    // Retry interval in milliseconds; validated to be >= 0 by the constructor.
    private final long retryIntervalMillis;
    // Supplies an optional hint object which is prepended to the log messages when it is not null.
    private final Supplier<?> currentReplicaHintSupplier;
    // Remembers a revision per repository. The map keeps insertion order and drops its eldest entry
    // whenever a put grows it beyond 8192 entries.
    // NOTE(review): LinkedHashMap is not thread-safe and the code accessing this map is not visible here;
    //               confirm that every access is properly synchronized.
    private final Map<RepoId, Revision> latestKnownRevisions = new LinkedHashMap<RepoId, Revision>() {
        private static final long serialVersionUID = 3587793379404809027L;

        @Override
        protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
            // Keep only up to 8192 repositories, which should be enough for almost all cases.
            return size() > 8192;
        }
    };
89  
90      public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
91                                                CentralDogma delegate, int maxRetries, long retryIntervalMillis,
92                                                Supplier<?> currentReplicaHintSupplier,
93                                                @Nullable MeterRegistry meterRegistry) {
94          super(blockingTaskExecutor, meterRegistry);
95  
96          requireNonNull(delegate, "delegate");
97          checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
98          checkArgument(retryIntervalMillis >= 0,
99                        "retryIntervalMillis: %s (expected: >= 0)", retryIntervalMillis);
100         requireNonNull(currentReplicaHintSupplier, "currentReplicaHintSupplier");
101 
102         this.delegate = delegate;
103         this.maxRetries = maxRetries;
104         this.retryIntervalMillis = retryIntervalMillis;
105         this.currentReplicaHintSupplier = currentReplicaHintSupplier;
106     }
107 
108     @Override
109     public CompletableFuture<Void> createProject(String projectName) {
110         return delegate.createProject(projectName);
111     }
112 
113     @Override
114     public CompletableFuture<Void> removeProject(String projectName) {
115         return delegate.removeProject(projectName);
116     }
117 
118     @Override
119     public CompletableFuture<Void> purgeProject(String projectName) {
120         return delegate.purgeProject(projectName);
121     }
122 
123     @Override
124     public CompletableFuture<Void> unremoveProject(String projectName) {
125         return delegate.unremoveProject(projectName);
126     }
127 
128     @Override
129     public CompletableFuture<Set<String>> listProjects() {
130         return delegate.listProjects();
131     }
132 
133     @Override
134     public CompletableFuture<Set<String>> listRemovedProjects() {
135         return delegate.listRemovedProjects();
136     }
137 
138     @Override
139     public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
140                                                                       String repositoryName) {
141         return delegate.createRepository(projectName, repositoryName);
142     }
143 
144     @Override
145     public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
146         return delegate.removeRepository(projectName, repositoryName);
147     }
148 
149     @Override
150     public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
151         return delegate.purgeRepository(projectName, repositoryName);
152     }
153 
154     @Override
155     public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
156                                                                         String repositoryName) {
157         return delegate.unremoveRepository(projectName, repositoryName);
158     }
159 
160     @Override
161     public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
162         return executeWithRetries(
163                 new Supplier<CompletableFuture<Map<String, RepositoryInfo>>>() {
164                     @Override
165                     public CompletableFuture<Map<String, RepositoryInfo>> get() {
166                         return delegate.listRepositories(projectName);
167                     }
168 
169                     @Override
170                     public String toString() {
171                         return "listRepositories(" + projectName + ')';
172                     }
173                 },
174                 (res, cause) -> {
175                     if (res != null) {
176                         for (RepositoryInfo info : res.values()) {
177                             if (!updateLatestKnownRevision(projectName, info.name(), info.headRevision())) {
178                                 return true;
179                             }
180                         }
181                     }
182                     return false;
183                 });
184     }
185 
186     @Override
187     public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
188         return delegate.listRemovedRepositories(projectName);
189     }
190 
191     @Override
192     public CompletableFuture<Revision> normalizeRevision(
193             String projectName, String repositoryName, Revision revision) {
194         return executeWithRetries(
195                 new Supplier<CompletableFuture<Revision>>() {
196                     @Override
197                     public CompletableFuture<Revision> get() {
198                         return delegate.normalizeRevision(projectName, repositoryName, revision);
199                     }
200 
201                     @Override
202                     public String toString() {
203                         return "normalizeRevision(" + projectName + ", " + repositoryName + ", " +
204                                revision + ')';
205                     }
206                 },
207                 (res, cause) -> {
208                     if (cause != null) {
209                         return handleRevisionNotFound(projectName, repositoryName, revision, cause);
210                     }
211 
212                     if (revision.isRelative()) {
213                         final Revision headRevision = res.forward(-(revision.major() + 1));
214                         return !updateLatestKnownRevision(projectName, repositoryName, headRevision);
215                     }
216 
217                     updateLatestKnownRevision(projectName, repositoryName, revision);
218                     return false;
219                 });
220     }
221 
222     @Override
223     public CompletableFuture<Map<String, EntryType>> listFiles(
224             String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
225         return normalizeRevisionAndExecuteWithRetries(
226                 projectName, repositoryName, revision,
227                 new Function<Revision, CompletableFuture<Map<String, EntryType>>>() {
228                     @Override
229                     public CompletableFuture<Map<String, EntryType>> apply(Revision normRev) {
230                         return delegate.listFiles(projectName, repositoryName, normRev, pathPattern);
231                     }
232 
233                     @Override
234                     public String toString() {
235                         return "listFiles(" + projectName + ", " + repositoryName + ", " +
236                                revision + ", " + pathPattern + ')';
237                     }
238                 });
239     }
240 
241     @Override
242     public <T> CompletableFuture<Entry<T>> getFile(
243             String projectName, String repositoryName, Revision revision, Query<T> query) {
244         return normalizeRevisionAndExecuteWithRetries(
245                 projectName, repositoryName, revision,
246                 new Function<Revision, CompletableFuture<Entry<T>>>() {
247                     @Override
248                     public CompletableFuture<Entry<T>> apply(Revision normRev) {
249                         return delegate.getFile(projectName, repositoryName, normRev, query);
250                     }
251 
252                     @Override
253                     public String toString() {
254                         return "getFile(" + projectName + ", " + repositoryName + ", " +
255                                revision + ", " + query + ')';
256                     }
257                 });
258     }
259 
260     @Override
261     public CompletableFuture<Map<String, Entry<?>>> getFiles(
262             String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
263         return normalizeRevisionAndExecuteWithRetries(
264                 projectName, repositoryName, revision,
265                 new Function<Revision, CompletableFuture<Map<String, Entry<?>>>>() {
266                     @Override
267                     public CompletableFuture<Map<String, Entry<?>>> apply(Revision normRev) {
268                         return delegate.getFiles(projectName, repositoryName, normRev, pathPattern);
269                     }
270 
271                     @Override
272                     public String toString() {
273                         return "getFiles(" + projectName + ", " + repositoryName + ", " +
274                                revision + ", " + pathPattern + ')';
275                     }
276                 });
277     }
278 
279     @Override
280     public <T> CompletableFuture<MergedEntry<T>> mergeFiles(
281             String projectName, String repositoryName, Revision revision,
282             MergeQuery<T> mergeQuery) {
283         return normalizeRevisionAndExecuteWithRetries(
284                 projectName, repositoryName, revision,
285                 new Function<Revision, CompletableFuture<MergedEntry<T>>>() {
286                     @Override
287                     public CompletableFuture<MergedEntry<T>> apply(Revision normRev) {
288                         return delegate.mergeFiles(projectName, repositoryName, normRev, mergeQuery);
289                     }
290 
291                     @Override
292                     public String toString() {
293                         return "mergeFiles(" + projectName + ", " + repositoryName + ", " +
294                                revision + ", " + mergeQuery + ')';
295                     }
296                 });
297     }
298 
299     @Override
300     public CompletableFuture<List<Commit>> getHistory(
301             String projectName, String repositoryName, Revision from,
302             Revision to, PathPattern pathPattern, int maxCommits) {
303         return normalizeRevisionsAndExecuteWithRetries(
304                 projectName, repositoryName, from, to,
305                 new BiFunction<Revision, Revision, CompletableFuture<List<Commit>>>() {
306                     @Override
307                     public CompletableFuture<List<Commit>> apply(Revision normFromRev, Revision normToRev) {
308                         return delegate.getHistory(projectName, repositoryName,
309                                                    normFromRev, normToRev, pathPattern, maxCommits);
310                     }
311 
312                     @Override
313                     public String toString() {
314                         return "getHistory(" + projectName + ", " + repositoryName + ", " +
315                                from + ", " + to + ", " + pathPattern + ", " + maxCommits + ')';
316                     }
317                 });
318     }
319 
320     @Override
321     public <T> CompletableFuture<Change<T>> getDiff(
322             String projectName, String repositoryName, Revision from, Revision to, Query<T> query) {
323         return normalizeRevisionsAndExecuteWithRetries(
324                 projectName, repositoryName, from, to,
325                 new BiFunction<Revision, Revision, CompletableFuture<Change<T>>>() {
326                     @Override
327                     public CompletableFuture<Change<T>> apply(Revision normFromRev, Revision normToRev) {
328                         return delegate.getDiff(projectName, repositoryName,
329                                                 normFromRev, normToRev, query);
330                     }
331 
332                     @Override
333                     public String toString() {
334                         return "getDiff(" + projectName + ", " + repositoryName + ", " +
335                                from + ", " + to + ", " + query + ')';
336                     }
337                 });
338     }
339 
340     @Override
341     public CompletableFuture<List<Change<?>>> getDiff(
342             String projectName, String repositoryName, Revision from, Revision to, PathPattern pathPattern) {
343         return normalizeRevisionsAndExecuteWithRetries(
344                 projectName, repositoryName, from, to,
345                 new BiFunction<Revision, Revision, CompletableFuture<List<Change<?>>>>() {
346                     @Override
347                     public CompletableFuture<List<Change<?>>> apply(Revision normFromRev, Revision normToRev) {
348                         return delegate.getDiff(projectName, repositoryName,
349                                                 normFromRev, normToRev, pathPattern);
350                     }
351 
352                     @Override
353                     public String toString() {
354                         return "getDiffs(" + projectName + ", " + repositoryName + ", " +
355                                from + ", " + to + ", " + pathPattern + ')';
356                     }
357                 });
358     }
359 
360     @Override
361     public CompletableFuture<List<Change<?>>> getPreviewDiffs(
362             String projectName, String repositoryName, Revision baseRevision,
363             Iterable<? extends Change<?>> changes) {
364         return normalizeRevisionAndExecuteWithRetries(
365                 projectName, repositoryName, baseRevision,
366                 new Function<Revision, CompletableFuture<List<Change<?>>>>() {
367                     @Override
368                     public CompletableFuture<List<Change<?>>> apply(Revision normBaseRev) {
369                         return delegate.getPreviewDiffs(projectName, repositoryName, normBaseRev, changes);
370                     }
371 
372                     @Override
373                     public String toString() {
374                         return "getPreviewDiffs(" + projectName + ", " + repositoryName + ", " +
375                                baseRevision + ", ...)";
376                     }
377                 });
378     }
379 
380     @Override
381     public CompletableFuture<PushResult> push(
382             String projectName, String repositoryName, Revision baseRevision,
383             String summary, String detail, Markup markup, Iterable<? extends Change<?>> changes) {
384         return executeWithRetries(
385                 new Supplier<CompletableFuture<PushResult>>() {
386                     @Override
387                     public CompletableFuture<PushResult> get() {
388                         return delegate.push(projectName, repositoryName, baseRevision,
389                                              summary, detail, markup, changes);
390                     }
391 
392                     @Override
393                     public String toString() {
394                         return "push(" + projectName + ", " + repositoryName + ", " +
395                                baseRevision + ", " + summary + ", ...)";
396                     }
397                 },
398                 pushRetryPredicate(projectName, repositoryName, baseRevision));
399     }
400 
401     @Override
402     public CompletableFuture<PushResult> push(
403             String projectName, String repositoryName, Revision baseRevision,
404             Author author, String summary, String detail, Markup markup,
405             Iterable<? extends Change<?>> changes) {
406         return executeWithRetries(
407                 new Supplier<CompletableFuture<PushResult>>() {
408                     @Override
409                     public CompletableFuture<PushResult> get() {
410                         return delegate.push(projectName, repositoryName, baseRevision,
411                                              author, summary, detail, markup, changes);
412                     }
413 
414                     @Override
415                     public String toString() {
416                         return "push(" + projectName + ", " + repositoryName + ", " +
417                                baseRevision + ", " + summary + ", ...)";
418                     }
419                 },
420                 pushRetryPredicate(projectName, repositoryName, baseRevision));
421     }
422 
423     private BiPredicate<PushResult, Throwable> pushRetryPredicate(
424             String projectName, String repositoryName, Revision baseRevision) {
425 
426         return (res, cause) -> {
427             if (cause != null) {
428                 return handleRevisionNotFound(projectName, repositoryName, baseRevision, cause);
429             }
430 
431             updateLatestKnownRevision(projectName, repositoryName, res.revision());
432             return false;
433         };
434     }
435 
436     @Override
437     public CompletableFuture<Revision> watchRepository(
438             String projectName, String repositoryName, Revision lastKnownRevision,
439             PathPattern pathPattern, long timeoutMillis, boolean errorOnEntryNotFound) {
440 
441         return normalizeRevisionAndExecuteWithRetries(
442                 projectName, repositoryName, lastKnownRevision,
443                 new Function<Revision, CompletableFuture<Revision>>() {
444                     @Override
445                     public CompletableFuture<Revision> apply(Revision normLastKnownRevision) {
446                         return delegate.watchRepository(projectName, repositoryName, normLastKnownRevision,
447                                                         pathPattern, timeoutMillis, errorOnEntryNotFound)
448                                        .thenApply(newLastKnownRevision -> {
449                                            if (newLastKnownRevision != null) {
450                                                updateLatestKnownRevision(projectName, repositoryName,
451                                                                          newLastKnownRevision);
452                                            }
453                                            return newLastKnownRevision;
454                                        });
455                     }
456 
457                     @Override
458                     public String toString() {
459                         return "watchRepository(" + projectName + ", " + repositoryName + ", " +
460                                lastKnownRevision + ", " + pathPattern + ", " + timeoutMillis + ", " +
461                                errorOnEntryNotFound + ')';
462                     }
463                 });
464     }
465 
466     @Override
467     public <T> CompletableFuture<Entry<T>> watchFile(
468             String projectName, String repositoryName, Revision lastKnownRevision,
469             Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound) {
470 
471         return normalizeRevisionAndExecuteWithRetries(
472                 projectName, repositoryName, lastKnownRevision,
473                 new Function<Revision, CompletableFuture<Entry<T>>>() {
474                     @Override
475                     public CompletableFuture<Entry<T>> apply(Revision normLastKnownRevision) {
476                         return delegate.watchFile(projectName, repositoryName, normLastKnownRevision,
477                                                   query, timeoutMillis, errorOnEntryNotFound)
478                                        .thenApply(entry -> {
479                                            if (entry != null) {
480                                                updateLatestKnownRevision(projectName, repositoryName,
481                                                                          entry.revision());
482                                            }
483                                            return entry;
484                                        });
485                     }
486 
487                     @Override
488                     public String toString() {
489                         return "watchFile(" + projectName + ", " + repositoryName + ", " +
490                                lastKnownRevision + ", " + query + ", " + timeoutMillis + ", " +
491                                errorOnEntryNotFound + ')';
492                     }
493                 });
494     }
495 
496     @Override
497     public CompletableFuture<Void> whenEndpointReady() {
498         return delegate.whenEndpointReady();
499     }
500 
501     /**
502      * Normalizes the given {@link Revision} and then executes the task by calling {@code taskRunner.apply()}
503      * with the normalized {@link Revision}. The task can be executed repetitively when the task failed with
504      * a {@link RevisionNotFoundException} for the {@link Revision} which is known to exist.
505      */
506     private <T> CompletableFuture<T> normalizeRevisionAndExecuteWithRetries(
507             String projectName, String repositoryName, Revision revision,
508             Function<Revision, CompletableFuture<T>> taskRunner) {
509         return normalizeRevision(projectName, repositoryName, revision)
510                 .thenCompose(normRev -> executeWithRetries(
511                         new Supplier<CompletableFuture<T>>() {
512                             @Override
513                             public CompletableFuture<T> get() {
514                                 return taskRunner.apply(normRev);
515                             }
516 
517                             @Override
518                             public String toString() {
519                                 return taskRunner + " with " + normRev;
520                             }
521                         },
522                         (res, cause) -> cause != null &&
523                                         handleRevisionNotFound(projectName, repositoryName, normRev, cause)));
524     }
525 
526     /**
527      * Normalizes the given {@link Revision} range and then executes the task by calling
528      * {@code taskRunner.apply()} with the normalized {@link Revision} range. The task can be executed
529      * repetitively when the task failed with a {@link RevisionNotFoundException} for the {@link Revision}
530      * range which is known to exist.
531      */
532     private <T> CompletableFuture<T> normalizeRevisionsAndExecuteWithRetries(
533             String projectName, String repositoryName, Revision from, Revision to,
534             BiFunction<Revision, Revision, CompletableFuture<T>> taskRunner) {
535 
536         if (to == null) {
537             return exceptionallyCompletedFuture(new NullPointerException("to"));
538         }
539 
540         if (from == null) {
541             return exceptionallyCompletedFuture(new NullPointerException("from"));
542         }
543 
544         if (from.isRelative() && to.isRelative() ||
545             !from.isRelative() && !to.isRelative()) {
546 
547             // When both revisions are absolute or both revisions are relative,
548             // we can call normalizeRevision() only once and guess the other revision from the distance.
549             final int distance = to.major() - from.major();
550             final Revision baseRevision = to.compareTo(from) >= 0 ? to : from;
551 
552             return normalizeRevision(projectName, repositoryName, baseRevision).thenCompose(normBaseRev -> {
553                 final Revision normFromRev;
554                 final Revision normToRev;
555                 if (distance >= 0) {
556                     normToRev = normBaseRev;
557                     normFromRev = normBaseRev.backward(distance);
558                 } else {
559                     normFromRev = normBaseRev;
560                     normToRev = normBaseRev.backward(-distance);
561                 }
562 
563                 return executeWithRetries(
564                         new Supplier<CompletableFuture<T>>() {
565                             @Override
566                             public CompletableFuture<T> get() {
567                                 return taskRunner.apply(normFromRev, normToRev);
568                             }
569 
570                             @Override
571                             public String toString() {
572                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
573                             }
574                         },
575                         (res, cause) -> {
576                             if (cause == null) {
577                                 return false;
578                             }
579                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
580                         });
581             });
582         } else {
583             // When one revision is absolute and the other is relative, we have to normalize both revisions
584             // because it is impossible to know the distance between them and which is a newer revision.
585             return CompletableFutures.allAsList(ImmutableList.of(
586                     normalizeRevision(projectName, repositoryName, from),
587                     normalizeRevision(projectName, repositoryName, to))).thenCompose(normRevs -> {
588                 final Revision normFromRev = normRevs.get(0);
589                 final Revision normToRev = normRevs.get(1);
590                 return executeWithRetries(
591                         new Supplier<CompletableFuture<T>>() {
592                             @Override
593                             public CompletableFuture<T> get() {
594                                 return taskRunner.apply(normFromRev, normToRev);
595                             }
596 
597                             @Override
598                             public String toString() {
599                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
600                             }
601                         },
602                         (res, cause) -> {
603                             if (cause == null) {
604                                 return false;
605                             }
606 
607                             final Revision normBaseRev = normFromRev.compareTo(normToRev) > 0 ? normFromRev
608                                                                                               : normToRev;
609                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
610                         });
611             });
612         }
613     }
614 
615     /**
616      * Executes the task by calling {@code taskRunner.get()} and re-executes the task if {@code retryPredicated}
617      * returns {@code true}. This method is used as a building block for sending a request repetitively
618      * when the request has failed due to replication lag.
619      */
620     private <T> CompletableFuture<T> executeWithRetries(
621             Supplier<CompletableFuture<T>> taskRunner,
622             BiPredicate<T, Throwable> retryPredicate) {
623         return executeWithRetries(taskRunner, retryPredicate, 0);
624     }
625 
626     private <T> CompletableFuture<T> executeWithRetries(
627             Supplier<CompletableFuture<T>> taskRunner,
628             BiPredicate<T, Throwable> retryPredicate,
629             int attemptsSoFar) {
630 
631         return CompletableFutures.handleCompose(taskRunner.get(), (res, cause) -> {
632             final Object currentReplicaHint = currentReplicaHintSupplier.get();
633             final int nextAttemptsSoFar = attemptsSoFar + 1;
634             final boolean retryRequired = retryPredicate.test(res, cause);
635             if (!retryRequired || nextAttemptsSoFar > maxRetries) {
636                 if (retryRequired) {
637                     if (currentReplicaHint != null) {
638                         logger.warn("[{}] Failed to retrieve the up-to-date data from Central Dogma " +
639                                     "after {} retries: {} => {}",
640                                     currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
641                     } else {
642                         logger.warn("Failed to retrieve the up-to-date data from Central Dogma " +
643                                     "after {} retries: {} => {}",
644                                     attemptsSoFar, taskRunner, resultOrCause(res, cause));
645                     }
646                 } else if (logger.isDebugEnabled()) {
647                     if (currentReplicaHint != null) {
648                         logger.debug("[{}] Retrieved the up-to-date data after {} retries: {} => {}",
649                                      currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
650                     } else {
651                         logger.debug("Retrieved the up-to-date data after {} retries: {} => {}",
652                                      attemptsSoFar, taskRunner, resultOrCause(res, cause));
653                     }
654                 }
655 
656                 if (cause == null) {
657                     return completedFuture(res);
658                 } else {
659                     return exceptionallyCompletedFuture(cause);
660                 }
661             }
662 
663             if (logger.isDebugEnabled()) {
664                 if (currentReplicaHint != null) {
665                     logger.debug("[{}] Got the out-of-date data ({} attempt(s) so far): {} => {}",
666                                  currentReplicaHint, nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
667                 } else {
668                     logger.debug("Got the out-of-date data ({} attempt(s) so far): {} => {}",
669                                  nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
670                 }
671             }
672 
673             final CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();
674             executor().schedule(() -> {
675                 try {
676                     executeWithRetries(taskRunner, retryPredicate,
677                                        nextAttemptsSoFar).handle((newRes, newCause) -> {
678                         if (newCause != null) {
679                             nextAttemptFuture.completeExceptionally(newCause);
680                         } else {
681                             nextAttemptFuture.complete(newRes);
682                         }
683                         return null;
684                     });
685                 } catch (Throwable t) {
686                     nextAttemptFuture.completeExceptionally(t);
687                 }
688             }, retryIntervalMillis, TimeUnit.MILLISECONDS);
689 
690             return nextAttemptFuture;
691         }).toCompletableFuture();
692     }
693 
694     /**
695      * Returns the cause of the specified {@code throwable} peeling it recursively, if it is one of the
696      * {@link CompletionException}, {@link ExecutionException}. Otherwise returns the {@code throwable}.
697      */
698     private static Throwable peel(Throwable throwable) {
699         Throwable cause = throwable.getCause();
700         while (cause != null && cause != throwable &&
701                (throwable instanceof CompletionException || throwable instanceof ExecutionException)) {
702             throwable = cause;
703             cause = throwable.getCause();
704         }
705         return throwable;
706     }
707 
708     /**
709      * Returns {@code true} to indicate that the request must be retried if {@code cause} is
710      * a {@link RevisionNotFoundException} and the specified {@link Revision} is supposed to exist.
711      */
712     private boolean handleRevisionNotFound(
713             String projectName, String repositoryName, Revision revision, Throwable cause) {
714         requireNonNull(cause, "cause");
715         cause = peel(cause);
716         if (!(cause instanceof RevisionNotFoundException)) {
717             return false;
718         }
719 
720         final Revision latestKnownRevision = latestKnownRevision(projectName, repositoryName);
721         if (latestKnownRevision == null) {
722             return false;
723         }
724 
725         if (revision.isRelative()) {
726             return revision.major() + latestKnownRevision.major() >= 0;
727         } else {
728             return revision.major() <= latestKnownRevision.major();
729         }
730     }
731 
732     @Nullable
733     @VisibleForTesting
734     Revision latestKnownRevision(String projectName, String repositoryName) {
735         synchronized (latestKnownRevisions) {
736             return latestKnownRevisions.get(new RepoId(projectName, repositoryName));
737         }
738     }
739 
740     /**
741      * Updates the latest known revision for the specified repository.
742      *
743      * @return {@code true} if the latest known revision has been updated to the specified {@link Revision} or
744      *         the latest known revision is equal to the specified {@link Revision}. {@code false} otherwise.
745      */
746     private boolean updateLatestKnownRevision(String projectName, String repositoryName, Revision newRevision) {
747         final Object currentReplicaHint = currentReplicaHintSupplier.get();
748         final RepoId id = new RepoId(projectName, repositoryName);
749         synchronized (latestKnownRevisions) {
750             final Revision oldRevision = latestKnownRevisions.get(id);
751             if (oldRevision == null) {
752                 if (currentReplicaHint != null) {
753                     logger.debug("[{}] Updating the latest known revision for {}/{} from <unknown> to: {}",
754                                  currentReplicaHint, projectName, repositoryName, newRevision);
755                 } else {
756                     logger.debug("Updating the latest known revision for {}/{} from <unknown> to: {}",
757                                  projectName, repositoryName, newRevision);
758                 }
759                 latestKnownRevisions.put(id, newRevision);
760                 return true;
761             }
762 
763             final int comparison = oldRevision.compareTo(newRevision);
764             if (comparison < 0) {
765                 if (currentReplicaHint != null) {
766                     logger.debug("[{}] Updating the latest known revision for {}/{} from {} to: {}",
767                                  currentReplicaHint, projectName, repositoryName, oldRevision, newRevision);
768                 } else {
769                     logger.debug("Updating the latest known revision for {}/{} from {} to: {}",
770                                  projectName, repositoryName, oldRevision, newRevision);
771                 }
772                 latestKnownRevisions.put(id, newRevision);
773                 return true;
774             }
775 
776             if (comparison == 0) {
777                 if (currentReplicaHint != null) {
778                     logger.debug("[{}] The latest known revision for {}/{} stays unchanged at: {}",
779                                  currentReplicaHint, projectName, repositoryName, newRevision);
780                 } else {
781                     logger.debug("The latest known revision for {}/{} stays unchanged at: {}",
782                                  projectName, repositoryName, newRevision);
783                 }
784                 return true;
785             }
786 
787             if (currentReplicaHint != null) {
788                 logger.debug("[{}] An out-of-date latest known revision for {}/{}: {}",
789                              currentReplicaHint, projectName, repositoryName, newRevision);
790             } else {
791                 logger.debug("An out-of-date latest known revision for {}/{}: {}",
792                              projectName, repositoryName, newRevision);
793             }
794             return false;
795         }
796     }
797 
798     @Nullable
799     private static Object resultOrCause(@Nullable Object res, @Nullable Throwable cause) {
800         return res != null ? res : cause;
801     }
802 
    /**
     * Closes the underlying {@code delegate}.
     *
     * @throws Exception whatever {@code delegate.close()} throws
     */
    @Override
    public void close() throws Exception {
        delegate.close();
    }
807 
808     private static final class RepoId {
809         private final String projectName;
810         private final String repositoryName;
811 
812         RepoId(String projectName, String repositoryName) {
813             this.projectName = projectName;
814             this.repositoryName = repositoryName;
815         }
816 
817         @Override
818         public boolean equals(Object o) {
819             if (this == o) {
820                 return true;
821             }
822             if (!(o instanceof RepoId)) {
823                 return false;
824             }
825             final RepoId that = (RepoId) o;
826             return projectName.equals(that.projectName) && repositoryName.equals(that.repositoryName);
827         }
828 
829         @Override
830         public int hashCode() {
831             return Objects.hash(projectName, repositoryName);
832         }
833 
834         @Override
835         public String toString() {
836             return projectName + '/' + repositoryName;
837         }
838     }
839 }