1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.internal.client;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
20  import static java.util.Objects.requireNonNull;
21  import static java.util.concurrent.CompletableFuture.completedFuture;
22  
23  import java.util.LinkedHashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Objects;
27  import java.util.Set;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.concurrent.CompletionException;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ScheduledExecutorService;
32  import java.util.concurrent.TimeUnit;
33  import java.util.function.BiFunction;
34  import java.util.function.BiPredicate;
35  import java.util.function.Function;
36  import java.util.function.Supplier;
37  
38  import javax.annotation.Nullable;
39  
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import com.google.common.annotations.VisibleForTesting;
44  import com.google.common.collect.ImmutableList;
45  import com.spotify.futures.CompletableFutures;
46  
47  import com.linecorp.centraldogma.client.AbstractCentralDogma;
48  import com.linecorp.centraldogma.client.CentralDogma;
49  import com.linecorp.centraldogma.client.CentralDogmaRepository;
50  import com.linecorp.centraldogma.client.RepositoryInfo;
51  import com.linecorp.centraldogma.common.Author;
52  import com.linecorp.centraldogma.common.Change;
53  import com.linecorp.centraldogma.common.Commit;
54  import com.linecorp.centraldogma.common.Entry;
55  import com.linecorp.centraldogma.common.EntryType;
56  import com.linecorp.centraldogma.common.Markup;
57  import com.linecorp.centraldogma.common.MergeQuery;
58  import com.linecorp.centraldogma.common.MergedEntry;
59  import com.linecorp.centraldogma.common.PathPattern;
60  import com.linecorp.centraldogma.common.PushResult;
61  import com.linecorp.centraldogma.common.Query;
62  import com.linecorp.centraldogma.common.Revision;
63  import com.linecorp.centraldogma.common.RevisionNotFoundException;
64  
65  /**
66   * A {@link CentralDogma} client that retries the request automatically when a {@link RevisionNotFoundException}
67   * was raised but it is certain that a given {@link Revision} exists.
68   */
69  public final class ReplicationLagTolerantCentralDogma extends AbstractCentralDogma {
70  
71      private static final Logger logger =
72              LoggerFactory.getLogger(ReplicationLagTolerantCentralDogma.class);
73  
74      private final CentralDogma delegate;
75      private final int maxRetries;
76      private final long retryIntervalMillis;
77      private final Supplier<?> currentReplicaHintSupplier;
78      private final Map<RepoId, Revision> latestKnownRevisions = new LinkedHashMap<RepoId, Revision>() {
79          private static final long serialVersionUID = 3587793379404809027L;
80  
81          @Override
82          protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
83              // Keep only up to 8192 repositories, which should be enough for almost all cases.
84              return size() > 8192;
85          }
86      };
87  
/**
 * Creates a new instance that wraps the specified {@code delegate}.
 *
 * @param blockingTaskExecutor the executor handed to the superclass constructor
 * @param delegate the client that performs the actual requests
 * @param maxRetries the maximum number of retries; must be greater than {@code 0}
 * @param retryIntervalMillis the retry interval in milliseconds; must not be negative
 * @param currentReplicaHintSupplier supplies a hint which is prepended to log messages when non-null
 *
 * @throws NullPointerException if {@code delegate} or {@code currentReplicaHintSupplier} is {@code null}
 * @throws IllegalArgumentException if {@code maxRetries} or {@code retryIntervalMillis} is out of range
 */
public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
                                          CentralDogma delegate, int maxRetries, long retryIntervalMillis,
                                          Supplier<?> currentReplicaHintSupplier) {
    super(blockingTaskExecutor);

    // Each argument is validated and stored in one step; the validation order is
    // delegate -> maxRetries -> retryIntervalMillis -> currentReplicaHintSupplier.
    this.delegate = requireNonNull(delegate, "delegate");

    checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
    this.maxRetries = maxRetries;

    checkArgument(retryIntervalMillis >= 0,
                  "retryIntervalMillis: %s (expected: >= 0)", retryIntervalMillis);
    this.retryIntervalMillis = retryIntervalMillis;

    this.currentReplicaHintSupplier =
            requireNonNull(currentReplicaHintSupplier, "currentReplicaHintSupplier");
}
104 
105     @Override
106     public CompletableFuture<Void> createProject(String projectName) {
107         return delegate.createProject(projectName);
108     }
109 
110     @Override
111     public CompletableFuture<Void> removeProject(String projectName) {
112         return delegate.removeProject(projectName);
113     }
114 
115     @Override
116     public CompletableFuture<Void> purgeProject(String projectName) {
117         return delegate.purgeProject(projectName);
118     }
119 
120     @Override
121     public CompletableFuture<Void> unremoveProject(String projectName) {
122         return delegate.unremoveProject(projectName);
123     }
124 
125     @Override
126     public CompletableFuture<Set<String>> listProjects() {
127         return delegate.listProjects();
128     }
129 
130     @Override
131     public CompletableFuture<Set<String>> listRemovedProjects() {
132         return delegate.listRemovedProjects();
133     }
134 
135     @Override
136     public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
137                                                                       String repositoryName) {
138         return delegate.createRepository(projectName, repositoryName);
139     }
140 
141     @Override
142     public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
143         return delegate.removeRepository(projectName, repositoryName);
144     }
145 
146     @Override
147     public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
148         return delegate.purgeRepository(projectName, repositoryName);
149     }
150 
151     @Override
152     public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
153                                                                         String repositoryName) {
154         return delegate.unremoveRepository(projectName, repositoryName);
155     }
156 
157     @Override
158     public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
159         return executeWithRetries(
160                 new Supplier<CompletableFuture<Map<String, RepositoryInfo>>>() {
161                     @Override
162                     public CompletableFuture<Map<String, RepositoryInfo>> get() {
163                         return delegate.listRepositories(projectName);
164                     }
165 
166                     @Override
167                     public String toString() {
168                         return "listRepositories(" + projectName + ')';
169                     }
170                 },
171                 (res, cause) -> {
172                     if (res != null) {
173                         for (RepositoryInfo info : res.values()) {
174                             if (!updateLatestKnownRevision(projectName, info.name(), info.headRevision())) {
175                                 return true;
176                             }
177                         }
178                     }
179                     return false;
180                 });
181     }
182 
183     @Override
184     public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
185         return delegate.listRemovedRepositories(projectName);
186     }
187 
188     @Override
189     public CompletableFuture<Revision> normalizeRevision(
190             String projectName, String repositoryName, Revision revision) {
191         return executeWithRetries(
192                 new Supplier<CompletableFuture<Revision>>() {
193                     @Override
194                     public CompletableFuture<Revision> get() {
195                         return delegate.normalizeRevision(projectName, repositoryName, revision);
196                     }
197 
198                     @Override
199                     public String toString() {
200                         return "normalizeRevision(" + projectName + ", " + repositoryName + ", " +
201                                revision + ')';
202                     }
203                 },
204                 (res, cause) -> {
205                     if (cause != null) {
206                         return handleRevisionNotFound(projectName, repositoryName, revision, cause);
207                     }
208 
209                     if (revision.isRelative()) {
210                         final Revision headRevision = res.forward(-(revision.major() + 1));
211                         return !updateLatestKnownRevision(projectName, repositoryName, headRevision);
212                     }
213 
214                     updateLatestKnownRevision(projectName, repositoryName, revision);
215                     return false;
216                 });
217     }
218 
219     @Override
220     public CompletableFuture<Map<String, EntryType>> listFiles(
221             String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
222         return normalizeRevisionAndExecuteWithRetries(
223                 projectName, repositoryName, revision,
224                 new Function<Revision, CompletableFuture<Map<String, EntryType>>>() {
225                     @Override
226                     public CompletableFuture<Map<String, EntryType>> apply(Revision normRev) {
227                         return delegate.listFiles(projectName, repositoryName, normRev, pathPattern);
228                     }
229 
230                     @Override
231                     public String toString() {
232                         return "listFiles(" + projectName + ", " + repositoryName + ", " +
233                                revision + ", " + pathPattern + ')';
234                     }
235                 });
236     }
237 
238     @Override
239     public <T> CompletableFuture<Entry<T>> getFile(
240             String projectName, String repositoryName, Revision revision, Query<T> query) {
241         return normalizeRevisionAndExecuteWithRetries(
242                 projectName, repositoryName, revision,
243                 new Function<Revision, CompletableFuture<Entry<T>>>() {
244                     @Override
245                     public CompletableFuture<Entry<T>> apply(Revision normRev) {
246                         return delegate.getFile(projectName, repositoryName, normRev, query);
247                     }
248 
249                     @Override
250                     public String toString() {
251                         return "getFile(" + projectName + ", " + repositoryName + ", " +
252                                revision + ", " + query + ')';
253                     }
254                 });
255     }
256 
257     @Override
258     public CompletableFuture<Map<String, Entry<?>>> getFiles(
259             String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
260         return normalizeRevisionAndExecuteWithRetries(
261                 projectName, repositoryName, revision,
262                 new Function<Revision, CompletableFuture<Map<String, Entry<?>>>>() {
263                     @Override
264                     public CompletableFuture<Map<String, Entry<?>>> apply(Revision normRev) {
265                         return delegate.getFiles(projectName, repositoryName, normRev, pathPattern);
266                     }
267 
268                     @Override
269                     public String toString() {
270                         return "getFiles(" + projectName + ", " + repositoryName + ", " +
271                                revision + ", " + pathPattern + ')';
272                     }
273                 });
274     }
275 
276     @Override
277     public <T> CompletableFuture<MergedEntry<T>> mergeFiles(
278             String projectName, String repositoryName, Revision revision,
279             MergeQuery<T> mergeQuery) {
280         return normalizeRevisionAndExecuteWithRetries(
281                 projectName, repositoryName, revision,
282                 new Function<Revision, CompletableFuture<MergedEntry<T>>>() {
283                     @Override
284                     public CompletableFuture<MergedEntry<T>> apply(Revision normRev) {
285                         return delegate.mergeFiles(projectName, repositoryName, normRev, mergeQuery);
286                     }
287 
288                     @Override
289                     public String toString() {
290                         return "mergeFiles(" + projectName + ", " + repositoryName + ", " +
291                                revision + ", " + mergeQuery + ')';
292                     }
293                 });
294     }
295 
296     @Override
297     public CompletableFuture<List<Commit>> getHistory(
298             String projectName, String repositoryName, Revision from,
299             Revision to, PathPattern pathPattern, int maxCommits) {
300         return normalizeRevisionsAndExecuteWithRetries(
301                 projectName, repositoryName, from, to,
302                 new BiFunction<Revision, Revision, CompletableFuture<List<Commit>>>() {
303                     @Override
304                     public CompletableFuture<List<Commit>> apply(Revision normFromRev, Revision normToRev) {
305                         return delegate.getHistory(projectName, repositoryName,
306                                                    normFromRev, normToRev, pathPattern, maxCommits);
307                     }
308 
309                     @Override
310                     public String toString() {
311                         return "getHistory(" + projectName + ", " + repositoryName + ", " +
312                                from + ", " + to + ", " + pathPattern + ", " + maxCommits + ')';
313                     }
314                 });
315     }
316 
317     @Override
318     public <T> CompletableFuture<Change<T>> getDiff(
319             String projectName, String repositoryName, Revision from, Revision to, Query<T> query) {
320         return normalizeRevisionsAndExecuteWithRetries(
321                 projectName, repositoryName, from, to,
322                 new BiFunction<Revision, Revision, CompletableFuture<Change<T>>>() {
323                     @Override
324                     public CompletableFuture<Change<T>> apply(Revision normFromRev, Revision normToRev) {
325                         return delegate.getDiff(projectName, repositoryName,
326                                                 normFromRev, normToRev, query);
327                     }
328 
329                     @Override
330                     public String toString() {
331                         return "getDiff(" + projectName + ", " + repositoryName + ", " +
332                                from + ", " + to + ", " + query + ')';
333                     }
334                 });
335     }
336 
337     @Override
338     public CompletableFuture<List<Change<?>>> getDiff(
339             String projectName, String repositoryName, Revision from, Revision to, PathPattern pathPattern) {
340         return normalizeRevisionsAndExecuteWithRetries(
341                 projectName, repositoryName, from, to,
342                 new BiFunction<Revision, Revision, CompletableFuture<List<Change<?>>>>() {
343                     @Override
344                     public CompletableFuture<List<Change<?>>> apply(Revision normFromRev, Revision normToRev) {
345                         return delegate.getDiff(projectName, repositoryName,
346                                                 normFromRev, normToRev, pathPattern);
347                     }
348 
349                     @Override
350                     public String toString() {
351                         return "getDiffs(" + projectName + ", " + repositoryName + ", " +
352                                from + ", " + to + ", " + pathPattern + ')';
353                     }
354                 });
355     }
356 
357     @Override
358     public CompletableFuture<List<Change<?>>> getPreviewDiffs(
359             String projectName, String repositoryName, Revision baseRevision,
360             Iterable<? extends Change<?>> changes) {
361         return normalizeRevisionAndExecuteWithRetries(
362                 projectName, repositoryName, baseRevision,
363                 new Function<Revision, CompletableFuture<List<Change<?>>>>() {
364                     @Override
365                     public CompletableFuture<List<Change<?>>> apply(Revision normBaseRev) {
366                         return delegate.getPreviewDiffs(projectName, repositoryName, normBaseRev, changes);
367                     }
368 
369                     @Override
370                     public String toString() {
371                         return "getPreviewDiffs(" + projectName + ", " + repositoryName + ", " +
372                                baseRevision + ", ...)";
373                     }
374                 });
375     }
376 
377     @Override
378     public CompletableFuture<PushResult> push(
379             String projectName, String repositoryName, Revision baseRevision,
380             String summary, String detail, Markup markup, Iterable<? extends Change<?>> changes) {
381         return executeWithRetries(
382                 new Supplier<CompletableFuture<PushResult>>() {
383                     @Override
384                     public CompletableFuture<PushResult> get() {
385                         return delegate.push(projectName, repositoryName, baseRevision,
386                                              summary, detail, markup, changes);
387                     }
388 
389                     @Override
390                     public String toString() {
391                         return "push(" + projectName + ", " + repositoryName + ", " +
392                                baseRevision + ", " + summary + ", ...)";
393                     }
394                 },
395                 pushRetryPredicate(projectName, repositoryName, baseRevision));
396     }
397 
398     @Override
399     public CompletableFuture<PushResult> push(
400             String projectName, String repositoryName, Revision baseRevision,
401             Author author, String summary, String detail, Markup markup,
402             Iterable<? extends Change<?>> changes) {
403         return executeWithRetries(
404                 new Supplier<CompletableFuture<PushResult>>() {
405                     @Override
406                     public CompletableFuture<PushResult> get() {
407                         return delegate.push(projectName, repositoryName, baseRevision,
408                                              author, summary, detail, markup, changes);
409                     }
410 
411                     @Override
412                     public String toString() {
413                         return "push(" + projectName + ", " + repositoryName + ", " +
414                                baseRevision + ", " + summary + ", ...)";
415                     }
416                 },
417                 pushRetryPredicate(projectName, repositoryName, baseRevision));
418     }
419 
420     private BiPredicate<PushResult, Throwable> pushRetryPredicate(
421             String projectName, String repositoryName, Revision baseRevision) {
422 
423         return (res, cause) -> {
424             if (cause != null) {
425                 return handleRevisionNotFound(projectName, repositoryName, baseRevision, cause);
426             }
427 
428             updateLatestKnownRevision(projectName, repositoryName, res.revision());
429             return false;
430         };
431     }
432 
433     @Override
434     public CompletableFuture<Revision> watchRepository(
435             String projectName, String repositoryName, Revision lastKnownRevision,
436             PathPattern pathPattern, long timeoutMillis, boolean errorOnEntryNotFound) {
437 
438         return normalizeRevisionAndExecuteWithRetries(
439                 projectName, repositoryName, lastKnownRevision,
440                 new Function<Revision, CompletableFuture<Revision>>() {
441                     @Override
442                     public CompletableFuture<Revision> apply(Revision normLastKnownRevision) {
443                         return delegate.watchRepository(projectName, repositoryName, normLastKnownRevision,
444                                                         pathPattern, timeoutMillis, errorOnEntryNotFound)
445                                        .thenApply(newLastKnownRevision -> {
446                                            if (newLastKnownRevision != null) {
447                                                updateLatestKnownRevision(projectName, repositoryName,
448                                                                          newLastKnownRevision);
449                                            }
450                                            return newLastKnownRevision;
451                                        });
452                     }
453 
454                     @Override
455                     public String toString() {
456                         return "watchRepository(" + projectName + ", " + repositoryName + ", " +
457                                lastKnownRevision + ", " + pathPattern + ", " + timeoutMillis + ", " +
458                                errorOnEntryNotFound + ')';
459                     }
460                 });
461     }
462 
463     @Override
464     public <T> CompletableFuture<Entry<T>> watchFile(
465             String projectName, String repositoryName, Revision lastKnownRevision,
466             Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound) {
467 
468         return normalizeRevisionAndExecuteWithRetries(
469                 projectName, repositoryName, lastKnownRevision,
470                 new Function<Revision, CompletableFuture<Entry<T>>>() {
471                     @Override
472                     public CompletableFuture<Entry<T>> apply(Revision normLastKnownRevision) {
473                         return delegate.watchFile(projectName, repositoryName, normLastKnownRevision,
474                                                   query, timeoutMillis, errorOnEntryNotFound)
475                                        .thenApply(entry -> {
476                                            if (entry != null) {
477                                                updateLatestKnownRevision(projectName, repositoryName,
478                                                                          entry.revision());
479                                            }
480                                            return entry;
481                                        });
482                     }
483 
484                     @Override
485                     public String toString() {
486                         return "watchFile(" + projectName + ", " + repositoryName + ", " +
487                                lastKnownRevision + ", " + query + ", " + timeoutMillis + ", " +
488                                errorOnEntryNotFound + ')';
489                     }
490                 });
491     }
492 
493     @Override
494     public CompletableFuture<Void> whenEndpointReady() {
495         return delegate.whenEndpointReady();
496     }
497 
498     /**
499      * Normalizes the given {@link Revision} and then executes the task by calling {@code taskRunner.apply()}
500      * with the normalized {@link Revision}. The task can be executed repetitively when the task failed with
501      * a {@link RevisionNotFoundException} for the {@link Revision} which is known to exist.
502      */
503     private <T> CompletableFuture<T> normalizeRevisionAndExecuteWithRetries(
504             String projectName, String repositoryName, Revision revision,
505             Function<Revision, CompletableFuture<T>> taskRunner) {
506         return normalizeRevision(projectName, repositoryName, revision)
507                 .thenCompose(normRev -> executeWithRetries(
508                         new Supplier<CompletableFuture<T>>() {
509                             @Override
510                             public CompletableFuture<T> get() {
511                                 return taskRunner.apply(normRev);
512                             }
513 
514                             @Override
515                             public String toString() {
516                                 return taskRunner + " with " + normRev;
517                             }
518                         },
519                         (res, cause) -> cause != null &&
520                                         handleRevisionNotFound(projectName, repositoryName, normRev, cause)));
521     }
522 
523     /**
524      * Normalizes the given {@link Revision} range and then executes the task by calling
525      * {@code taskRunner.apply()} with the normalized {@link Revision} range. The task can be executed
526      * repetitively when the task failed with a {@link RevisionNotFoundException} for the {@link Revision}
527      * range which is known to exist.
528      */
529     private <T> CompletableFuture<T> normalizeRevisionsAndExecuteWithRetries(
530             String projectName, String repositoryName, Revision from, Revision to,
531             BiFunction<Revision, Revision, CompletableFuture<T>> taskRunner) {
532 
533         if (to == null) {
534             return exceptionallyCompletedFuture(new NullPointerException("to"));
535         }
536 
537         if (from == null) {
538             return exceptionallyCompletedFuture(new NullPointerException("from"));
539         }
540 
541         if (from.isRelative() && to.isRelative() ||
542             !from.isRelative() && !to.isRelative()) {
543 
544             // When both revisions are absolute or both revisions are relative,
545             // we can call normalizeRevision() only once and guess the other revision from the distance.
546             final int distance = to.major() - from.major();
547             final Revision baseRevision = to.compareTo(from) >= 0 ? to : from;
548 
549             return normalizeRevision(projectName, repositoryName, baseRevision).thenCompose(normBaseRev -> {
550                 final Revision normFromRev;
551                 final Revision normToRev;
552                 if (distance >= 0) {
553                     normToRev = normBaseRev;
554                     normFromRev = normBaseRev.backward(distance);
555                 } else {
556                     normFromRev = normBaseRev;
557                     normToRev = normBaseRev.backward(-distance);
558                 }
559 
560                 return executeWithRetries(
561                         new Supplier<CompletableFuture<T>>() {
562                             @Override
563                             public CompletableFuture<T> get() {
564                                 return taskRunner.apply(normFromRev, normToRev);
565                             }
566 
567                             @Override
568                             public String toString() {
569                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
570                             }
571                         },
572                         (res, cause) -> {
573                             if (cause == null) {
574                                 return false;
575                             }
576                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
577                         });
578             });
579         } else {
580             // When one revision is absolute and the other is relative, we have to normalize both revisions
581             // because it is impossible to know the distance between them and which is a newer revision.
582             return CompletableFutures.allAsList(ImmutableList.of(
583                     normalizeRevision(projectName, repositoryName, from),
584                     normalizeRevision(projectName, repositoryName, to))).thenCompose(normRevs -> {
585                 final Revision normFromRev = normRevs.get(0);
586                 final Revision normToRev = normRevs.get(1);
587                 return executeWithRetries(
588                         new Supplier<CompletableFuture<T>>() {
589                             @Override
590                             public CompletableFuture<T> get() {
591                                 return taskRunner.apply(normFromRev, normToRev);
592                             }
593 
594                             @Override
595                             public String toString() {
596                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
597                             }
598                         },
599                         (res, cause) -> {
600                             if (cause == null) {
601                                 return false;
602                             }
603 
604                             final Revision normBaseRev = normFromRev.compareTo(normToRev) > 0 ? normFromRev
605                                                                                               : normToRev;
606                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
607                         });
608             });
609         }
610     }
611 
612     /**
613      * Executes the task by calling {@code taskRunner.get()} and re-executes the task if {@code retryPredicated}
614      * returns {@code true}. This method is used as a building block for sending a request repetitively
615      * when the request has failed due to replication lag.
616      */
617     private <T> CompletableFuture<T> executeWithRetries(
618             Supplier<CompletableFuture<T>> taskRunner,
619             BiPredicate<T, Throwable> retryPredicate) {
620         return executeWithRetries(taskRunner, retryPredicate, 0);
621     }
622 
623     private <T> CompletableFuture<T> executeWithRetries(
624             Supplier<CompletableFuture<T>> taskRunner,
625             BiPredicate<T, Throwable> retryPredicate,
626             int attemptsSoFar) {
627 
628         return CompletableFutures.handleCompose(taskRunner.get(), (res, cause) -> {
629             final Object currentReplicaHint = currentReplicaHintSupplier.get();
630             final int nextAttemptsSoFar = attemptsSoFar + 1;
631             final boolean retryRequired = retryPredicate.test(res, cause);
632             if (!retryRequired || nextAttemptsSoFar > maxRetries) {
633                 if (retryRequired) {
634                     if (currentReplicaHint != null) {
635                         logger.warn("[{}] Failed to retrieve the up-to-date data from Central Dogma " +
636                                     "after {} retries: {} => {}",
637                                     currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
638                     } else {
639                         logger.warn("Failed to retrieve the up-to-date data from Central Dogma " +
640                                     "after {} retries: {} => {}",
641                                     attemptsSoFar, taskRunner, resultOrCause(res, cause));
642                     }
643                 } else if (logger.isDebugEnabled()) {
644                     if (currentReplicaHint != null) {
645                         logger.debug("[{}] Retrieved the up-to-date data after {} retries: {} => {}",
646                                      currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
647                     } else {
648                         logger.debug("Retrieved the up-to-date data after {} retries: {} => {}",
649                                      attemptsSoFar, taskRunner, resultOrCause(res, cause));
650                     }
651                 }
652 
653                 if (cause == null) {
654                     return completedFuture(res);
655                 } else {
656                     return exceptionallyCompletedFuture(cause);
657                 }
658             }
659 
660             if (logger.isDebugEnabled()) {
661                 if (currentReplicaHint != null) {
662                     logger.debug("[{}] Got the out-of-date data ({} attempt(s) so far): {} => {}",
663                                  currentReplicaHint, nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
664                 } else {
665                     logger.debug("Got the out-of-date data ({} attempt(s) so far): {} => {}",
666                                  nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
667                 }
668             }
669 
670             final CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();
671             executor().schedule(() -> {
672                 try {
673                     executeWithRetries(taskRunner, retryPredicate,
674                                        nextAttemptsSoFar).handle((newRes, newCause) -> {
675                         if (newCause != null) {
676                             nextAttemptFuture.completeExceptionally(newCause);
677                         } else {
678                             nextAttemptFuture.complete(newRes);
679                         }
680                         return null;
681                     });
682                 } catch (Throwable t) {
683                     nextAttemptFuture.completeExceptionally(t);
684                 }
685             }, retryIntervalMillis, TimeUnit.MILLISECONDS);
686 
687             return nextAttemptFuture;
688         }).toCompletableFuture();
689     }
690 
691     /**
692      * Returns the cause of the specified {@code throwable} peeling it recursively, if it is one of the
693      * {@link CompletionException}, {@link ExecutionException}. Otherwise returns the {@code throwable}.
694      */
695     private static Throwable peel(Throwable throwable) {
696         Throwable cause = throwable.getCause();
697         while (cause != null && cause != throwable &&
698                (throwable instanceof CompletionException || throwable instanceof ExecutionException)) {
699             throwable = cause;
700             cause = throwable.getCause();
701         }
702         return throwable;
703     }
704 
705     /**
706      * Returns {@code true} to indicate that the request must be retried if {@code cause} is
707      * a {@link RevisionNotFoundException} and the specified {@link Revision} is supposed to exist.
708      */
709     private boolean handleRevisionNotFound(
710             String projectName, String repositoryName, Revision revision, Throwable cause) {
711         requireNonNull(cause, "cause");
712         cause = peel(cause);
713         if (!(cause instanceof RevisionNotFoundException)) {
714             return false;
715         }
716 
717         final Revision latestKnownRevision = latestKnownRevision(projectName, repositoryName);
718         if (latestKnownRevision == null) {
719             return false;
720         }
721 
722         if (revision.isRelative()) {
723             return revision.major() + latestKnownRevision.major() >= 0;
724         } else {
725             return revision.major() <= latestKnownRevision.major();
726         }
727     }
728 
729     @Nullable
730     @VisibleForTesting
731     Revision latestKnownRevision(String projectName, String repositoryName) {
732         synchronized (latestKnownRevisions) {
733             return latestKnownRevisions.get(new RepoId(projectName, repositoryName));
734         }
735     }
736 
737     /**
738      * Updates the latest known revision for the specified repository.
739      *
740      * @return {@code true} if the latest known revision has been updated to the specified {@link Revision} or
741      *         the latest known revision is equal to the specified {@link Revision}. {@code false} otherwise.
742      */
743     private boolean updateLatestKnownRevision(String projectName, String repositoryName, Revision newRevision) {
744         final Object currentReplicaHint = currentReplicaHintSupplier.get();
745         final RepoId id = new RepoId(projectName, repositoryName);
746         synchronized (latestKnownRevisions) {
747             final Revision oldRevision = latestKnownRevisions.get(id);
748             if (oldRevision == null) {
749                 if (currentReplicaHint != null) {
750                     logger.debug("[{}] Updating the latest known revision for {}/{} from <unknown> to: {}",
751                                  currentReplicaHint, projectName, repositoryName, newRevision);
752                 } else {
753                     logger.debug("Updating the latest known revision for {}/{} from <unknown> to: {}",
754                                  projectName, repositoryName, newRevision);
755                 }
756                 latestKnownRevisions.put(id, newRevision);
757                 return true;
758             }
759 
760             final int comparison = oldRevision.compareTo(newRevision);
761             if (comparison < 0) {
762                 if (currentReplicaHint != null) {
763                     logger.debug("[{}] Updating the latest known revision for {}/{} from {} to: {}",
764                                  currentReplicaHint, projectName, repositoryName, oldRevision, newRevision);
765                 } else {
766                     logger.debug("Updating the latest known revision for {}/{} from {} to: {}",
767                                  projectName, repositoryName, oldRevision, newRevision);
768                 }
769                 latestKnownRevisions.put(id, newRevision);
770                 return true;
771             }
772 
773             if (comparison == 0) {
774                 if (currentReplicaHint != null) {
775                     logger.debug("[{}] The latest known revision for {}/{} stays unchanged at: {}",
776                                  currentReplicaHint, projectName, repositoryName, newRevision);
777                 } else {
778                     logger.debug("The latest known revision for {}/{} stays unchanged at: {}",
779                                  projectName, repositoryName, newRevision);
780                 }
781                 return true;
782             }
783 
784             if (currentReplicaHint != null) {
785                 logger.debug("[{}] An out-of-date latest known revision for {}/{}: {}",
786                              currentReplicaHint, projectName, repositoryName, newRevision);
787             } else {
788                 logger.debug("An out-of-date latest known revision for {}/{}: {}",
789                              projectName, repositoryName, newRevision);
790             }
791             return false;
792         }
793     }
794 
795     @Nullable
796     private static Object resultOrCause(@Nullable Object res, @Nullable Throwable cause) {
797         return res != null ? res : cause;
798     }
799 
800     private static final class RepoId {
801         private final String projectName;
802         private final String repositoryName;
803 
804         RepoId(String projectName, String repositoryName) {
805             this.projectName = projectName;
806             this.repositoryName = repositoryName;
807         }
808 
809         @Override
810         public boolean equals(Object o) {
811             if (this == o) {
812                 return true;
813             }
814             if (!(o instanceof RepoId)) {
815                 return false;
816             }
817             final RepoId that = (RepoId) o;
818             return projectName.equals(that.projectName) && repositoryName.equals(that.repositoryName);
819         }
820 
821         @Override
822         public int hashCode() {
823             return Objects.hash(projectName, repositoryName);
824         }
825 
826         @Override
827         public String toString() {
828             return projectName + '/' + repositoryName;
829         }
830     }
831 }