1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.internal.client;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
20  import static java.util.Objects.requireNonNull;
21  import static java.util.concurrent.CompletableFuture.completedFuture;
22  
23  import java.util.LinkedHashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Objects;
27  import java.util.Set;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.concurrent.CompletionException;
30  import java.util.concurrent.ExecutionException;
31  import java.util.concurrent.ScheduledExecutorService;
32  import java.util.concurrent.TimeUnit;
33  import java.util.function.BiFunction;
34  import java.util.function.BiPredicate;
35  import java.util.function.Function;
36  import java.util.function.Supplier;
37  
38  import javax.annotation.Nullable;
39  
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import com.google.common.annotations.VisibleForTesting;
44  import com.google.common.collect.ImmutableList;
45  import com.spotify.futures.CompletableFutures;
46  
47  import com.linecorp.centraldogma.client.AbstractCentralDogma;
48  import com.linecorp.centraldogma.client.CentralDogma;
49  import com.linecorp.centraldogma.client.CentralDogmaRepository;
50  import com.linecorp.centraldogma.client.RepositoryInfo;
51  import com.linecorp.centraldogma.common.Author;
52  import com.linecorp.centraldogma.common.Change;
53  import com.linecorp.centraldogma.common.Commit;
54  import com.linecorp.centraldogma.common.Entry;
55  import com.linecorp.centraldogma.common.EntryType;
56  import com.linecorp.centraldogma.common.Markup;
57  import com.linecorp.centraldogma.common.MergeQuery;
58  import com.linecorp.centraldogma.common.MergedEntry;
59  import com.linecorp.centraldogma.common.PathPattern;
60  import com.linecorp.centraldogma.common.PushResult;
61  import com.linecorp.centraldogma.common.Query;
62  import com.linecorp.centraldogma.common.Revision;
63  import com.linecorp.centraldogma.common.RevisionNotFoundException;
64  
65  import io.micrometer.core.instrument.MeterRegistry;
66  
67  /**
68   * A {@link CentralDogma} client that retries the request automatically when a {@link RevisionNotFoundException}
69   * was raised but it is certain that a given {@link Revision} exists.
70   */
71  public final class ReplicationLagTolerantCentralDogma extends AbstractCentralDogma {
72  
73      private static final Logger logger =
74              LoggerFactory.getLogger(ReplicationLagTolerantCentralDogma.class);
75  
76      private final CentralDogma delegate;
77      private final int maxRetries;
78      private final long retryIntervalMillis;
79      private final Supplier<?> currentReplicaHintSupplier;
80  
81      @VisibleForTesting
82      final Map<RepoId, Revision> latestKnownRevisions = new LinkedHashMap<RepoId, Revision>() {
83          private static final long serialVersionUID = 3587793379404809027L;
84  
85          @Override
86          protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
87              // Keep only up to 8192 repositories, which should be enough for almost all cases.
88              return size() > 8192;
89          }
90      };
91  
92      public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
93                                                CentralDogma delegate, int maxRetries, long retryIntervalMillis,
94                                                Supplier<?> currentReplicaHintSupplier,
95                                                @Nullable MeterRegistry meterRegistry) {
96          super(blockingTaskExecutor, meterRegistry);
97  
98          requireNonNull(delegate, "delegate");
99          checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
100         checkArgument(retryIntervalMillis >= 0,
101                       "retryIntervalMillis: %s (expected: >= 0)", retryIntervalMillis);
102         requireNonNull(currentReplicaHintSupplier, "currentReplicaHintSupplier");
103 
104         this.delegate = delegate;
105         this.maxRetries = maxRetries;
106         this.retryIntervalMillis = retryIntervalMillis;
107         this.currentReplicaHintSupplier = currentReplicaHintSupplier;
108     }
109 
110     @Override
111     public CompletableFuture<Void> createProject(String projectName) {
112         return delegate.createProject(projectName);
113     }
114 
115     @Override
116     public CompletableFuture<Void> removeProject(String projectName) {
117         return delegate.removeProject(projectName).thenAccept(unused -> {
118             synchronized (latestKnownRevisions) {
119                 latestKnownRevisions.entrySet().removeIf(e -> e.getKey().projectName.equals(projectName));
120             }
121         });
122     }
123 
124     @Override
125     public CompletableFuture<Void> purgeProject(String projectName) {
126         return delegate.purgeProject(projectName);
127     }
128 
129     @Override
130     public CompletableFuture<Void> unremoveProject(String projectName) {
131         return delegate.unremoveProject(projectName);
132     }
133 
134     @Override
135     public CompletableFuture<Set<String>> listProjects() {
136         return delegate.listProjects();
137     }
138 
139     @Override
140     public CompletableFuture<Set<String>> listRemovedProjects() {
141         return delegate.listRemovedProjects();
142     }
143 
144     @Override
145     public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
146                                                                       String repositoryName) {
147         return delegate.createRepository(projectName, repositoryName);
148     }
149 
150     @Override
151     public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
152         return delegate.removeRepository(projectName, repositoryName).thenAccept(unused -> {
153             synchronized (latestKnownRevisions) {
154                 latestKnownRevisions.remove(new RepoId(projectName, repositoryName));
155             }
156         });
157     }
158 
159     @Override
160     public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
161         return delegate.purgeRepository(projectName, repositoryName);
162     }
163 
164     @Override
165     public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
166                                                                         String repositoryName) {
167         return delegate.unremoveRepository(projectName, repositoryName);
168     }
169 
170     @Override
171     public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
172         return executeWithRetries(
173                 new Supplier<CompletableFuture<Map<String, RepositoryInfo>>>() {
174                     @Override
175                     public CompletableFuture<Map<String, RepositoryInfo>> get() {
176                         return delegate.listRepositories(projectName);
177                     }
178 
179                     @Override
180                     public String toString() {
181                         return "listRepositories(" + projectName + ')';
182                     }
183                 },
184                 (res, cause) -> {
185                     if (res != null) {
186                         for (RepositoryInfo info : res.values()) {
187                             if (!updateLatestKnownRevision(projectName, info.name(), info.headRevision())) {
188                                 return true;
189                             }
190                         }
191                     }
192                     return false;
193                 });
194     }
195 
196     @Override
197     public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
198         return delegate.listRemovedRepositories(projectName);
199     }
200 
201     @Override
202     public CompletableFuture<Revision> normalizeRevision(
203             String projectName, String repositoryName, Revision revision) {
204         return executeWithRetries(
205                 new Supplier<CompletableFuture<Revision>>() {
206                     @Override
207                     public CompletableFuture<Revision> get() {
208                         return delegate.normalizeRevision(projectName, repositoryName, revision);
209                     }
210 
211                     @Override
212                     public String toString() {
213                         return "normalizeRevision(" + projectName + ", " + repositoryName + ", " +
214                                revision + ')';
215                     }
216                 },
217                 (res, cause) -> {
218                     if (cause != null) {
219                         return handleRevisionNotFound(projectName, repositoryName, revision, cause);
220                     }
221 
222                     if (revision.isRelative()) {
223                         final Revision headRevision = res.forward(-(revision.major() + 1));
224                         return !updateLatestKnownRevision(projectName, repositoryName, headRevision);
225                     }
226 
227                     updateLatestKnownRevision(projectName, repositoryName, revision);
228                     return false;
229                 });
230     }
231 
232     @Override
233     public CompletableFuture<Map<String, EntryType>> listFiles(
234             String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
235         return normalizeRevisionAndExecuteWithRetries(
236                 projectName, repositoryName, revision,
237                 new Function<Revision, CompletableFuture<Map<String, EntryType>>>() {
238                     @Override
239                     public CompletableFuture<Map<String, EntryType>> apply(Revision normRev) {
240                         return delegate.listFiles(projectName, repositoryName, normRev, pathPattern);
241                     }
242 
243                     @Override
244                     public String toString() {
245                         return "listFiles(" + projectName + ", " + repositoryName + ", " +
246                                revision + ", " + pathPattern + ')';
247                     }
248                 });
249     }
250 
251     @Override
252     public <T> CompletableFuture<Entry<T>> getFile(
253             String projectName, String repositoryName, Revision revision, Query<T> query) {
254         return normalizeRevisionAndExecuteWithRetries(
255                 projectName, repositoryName, revision,
256                 new Function<Revision, CompletableFuture<Entry<T>>>() {
257                     @Override
258                     public CompletableFuture<Entry<T>> apply(Revision normRev) {
259                         return delegate.getFile(projectName, repositoryName, normRev, query);
260                     }
261 
262                     @Override
263                     public String toString() {
264                         return "getFile(" + projectName + ", " + repositoryName + ", " +
265                                revision + ", " + query + ')';
266                     }
267                 });
268     }
269 
270     @Override
271     public CompletableFuture<Map<String, Entry<?>>> getFiles(
272             String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
273         return normalizeRevisionAndExecuteWithRetries(
274                 projectName, repositoryName, revision,
275                 new Function<Revision, CompletableFuture<Map<String, Entry<?>>>>() {
276                     @Override
277                     public CompletableFuture<Map<String, Entry<?>>> apply(Revision normRev) {
278                         return delegate.getFiles(projectName, repositoryName, normRev, pathPattern);
279                     }
280 
281                     @Override
282                     public String toString() {
283                         return "getFiles(" + projectName + ", " + repositoryName + ", " +
284                                revision + ", " + pathPattern + ')';
285                     }
286                 });
287     }
288 
289     @Override
290     public <T> CompletableFuture<MergedEntry<T>> mergeFiles(
291             String projectName, String repositoryName, Revision revision,
292             MergeQuery<T> mergeQuery) {
293         return normalizeRevisionAndExecuteWithRetries(
294                 projectName, repositoryName, revision,
295                 new Function<Revision, CompletableFuture<MergedEntry<T>>>() {
296                     @Override
297                     public CompletableFuture<MergedEntry<T>> apply(Revision normRev) {
298                         return delegate.mergeFiles(projectName, repositoryName, normRev, mergeQuery);
299                     }
300 
301                     @Override
302                     public String toString() {
303                         return "mergeFiles(" + projectName + ", " + repositoryName + ", " +
304                                revision + ", " + mergeQuery + ')';
305                     }
306                 });
307     }
308 
309     @Override
310     public CompletableFuture<List<Commit>> getHistory(
311             String projectName, String repositoryName, Revision from,
312             Revision to, PathPattern pathPattern, int maxCommits) {
313         return normalizeRevisionsAndExecuteWithRetries(
314                 projectName, repositoryName, from, to,
315                 new BiFunction<Revision, Revision, CompletableFuture<List<Commit>>>() {
316                     @Override
317                     public CompletableFuture<List<Commit>> apply(Revision normFromRev, Revision normToRev) {
318                         return delegate.getHistory(projectName, repositoryName,
319                                                    normFromRev, normToRev, pathPattern, maxCommits);
320                     }
321 
322                     @Override
323                     public String toString() {
324                         return "getHistory(" + projectName + ", " + repositoryName + ", " +
325                                from + ", " + to + ", " + pathPattern + ", " + maxCommits + ')';
326                     }
327                 });
328     }
329 
330     @Override
331     public <T> CompletableFuture<Change<T>> getDiff(
332             String projectName, String repositoryName, Revision from, Revision to, Query<T> query) {
333         return normalizeRevisionsAndExecuteWithRetries(
334                 projectName, repositoryName, from, to,
335                 new BiFunction<Revision, Revision, CompletableFuture<Change<T>>>() {
336                     @Override
337                     public CompletableFuture<Change<T>> apply(Revision normFromRev, Revision normToRev) {
338                         return delegate.getDiff(projectName, repositoryName,
339                                                 normFromRev, normToRev, query);
340                     }
341 
342                     @Override
343                     public String toString() {
344                         return "getDiff(" + projectName + ", " + repositoryName + ", " +
345                                from + ", " + to + ", " + query + ')';
346                     }
347                 });
348     }
349 
350     @Override
351     public CompletableFuture<List<Change<?>>> getDiff(
352             String projectName, String repositoryName, Revision from, Revision to, PathPattern pathPattern) {
353         return normalizeRevisionsAndExecuteWithRetries(
354                 projectName, repositoryName, from, to,
355                 new BiFunction<Revision, Revision, CompletableFuture<List<Change<?>>>>() {
356                     @Override
357                     public CompletableFuture<List<Change<?>>> apply(Revision normFromRev, Revision normToRev) {
358                         return delegate.getDiff(projectName, repositoryName,
359                                                 normFromRev, normToRev, pathPattern);
360                     }
361 
362                     @Override
363                     public String toString() {
364                         return "getDiffs(" + projectName + ", " + repositoryName + ", " +
365                                from + ", " + to + ", " + pathPattern + ')';
366                     }
367                 });
368     }
369 
370     @Override
371     public CompletableFuture<List<Change<?>>> getPreviewDiffs(
372             String projectName, String repositoryName, Revision baseRevision,
373             Iterable<? extends Change<?>> changes) {
374         return normalizeRevisionAndExecuteWithRetries(
375                 projectName, repositoryName, baseRevision,
376                 new Function<Revision, CompletableFuture<List<Change<?>>>>() {
377                     @Override
378                     public CompletableFuture<List<Change<?>>> apply(Revision normBaseRev) {
379                         return delegate.getPreviewDiffs(projectName, repositoryName, normBaseRev, changes);
380                     }
381 
382                     @Override
383                     public String toString() {
384                         return "getPreviewDiffs(" + projectName + ", " + repositoryName + ", " +
385                                baseRevision + ", ...)";
386                     }
387                 });
388     }
389 
390     @Override
391     public CompletableFuture<PushResult> push(
392             String projectName, String repositoryName, Revision baseRevision,
393             String summary, String detail, Markup markup, Iterable<? extends Change<?>> changes) {
394         return executeWithRetries(
395                 new Supplier<CompletableFuture<PushResult>>() {
396                     @Override
397                     public CompletableFuture<PushResult> get() {
398                         return delegate.push(projectName, repositoryName, baseRevision,
399                                              summary, detail, markup, changes);
400                     }
401 
402                     @Override
403                     public String toString() {
404                         return "push(" + projectName + ", " + repositoryName + ", " +
405                                baseRevision + ", " + summary + ", ...)";
406                     }
407                 },
408                 pushRetryPredicate(projectName, repositoryName, baseRevision));
409     }
410 
411     @Override
412     public CompletableFuture<PushResult> push(
413             String projectName, String repositoryName, Revision baseRevision,
414             Author author, String summary, String detail, Markup markup,
415             Iterable<? extends Change<?>> changes) {
416         return executeWithRetries(
417                 new Supplier<CompletableFuture<PushResult>>() {
418                     @Override
419                     public CompletableFuture<PushResult> get() {
420                         return delegate.push(projectName, repositoryName, baseRevision,
421                                              author, summary, detail, markup, changes);
422                     }
423 
424                     @Override
425                     public String toString() {
426                         return "push(" + projectName + ", " + repositoryName + ", " +
427                                baseRevision + ", " + summary + ", ...)";
428                     }
429                 },
430                 pushRetryPredicate(projectName, repositoryName, baseRevision));
431     }
432 
433     private BiPredicate<PushResult, Throwable> pushRetryPredicate(
434             String projectName, String repositoryName, Revision baseRevision) {
435 
436         return (res, cause) -> {
437             if (cause != null) {
438                 return handleRevisionNotFound(projectName, repositoryName, baseRevision, cause);
439             }
440 
441             updateLatestKnownRevision(projectName, repositoryName, res.revision());
442             return false;
443         };
444     }
445 
446     @Override
447     public CompletableFuture<Revision> watchRepository(
448             String projectName, String repositoryName, Revision lastKnownRevision,
449             PathPattern pathPattern, long timeoutMillis, boolean errorOnEntryNotFound) {
450 
451         return normalizeRevisionAndExecuteWithRetries(
452                 projectName, repositoryName, lastKnownRevision,
453                 new Function<Revision, CompletableFuture<Revision>>() {
454                     @Override
455                     public CompletableFuture<Revision> apply(Revision normLastKnownRevision) {
456                         return delegate.watchRepository(projectName, repositoryName, normLastKnownRevision,
457                                                         pathPattern, timeoutMillis, errorOnEntryNotFound)
458                                        .thenApply(newLastKnownRevision -> {
459                                            if (newLastKnownRevision != null) {
460                                                updateLatestKnownRevision(projectName, repositoryName,
461                                                                          newLastKnownRevision);
462                                            }
463                                            return newLastKnownRevision;
464                                        });
465                     }
466 
467                     @Override
468                     public String toString() {
469                         return "watchRepository(" + projectName + ", " + repositoryName + ", " +
470                                lastKnownRevision + ", " + pathPattern + ", " + timeoutMillis + ", " +
471                                errorOnEntryNotFound + ')';
472                     }
473                 });
474     }
475 
476     @Override
477     public <T> CompletableFuture<Entry<T>> watchFile(
478             String projectName, String repositoryName, Revision lastKnownRevision,
479             Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound) {
480 
481         return normalizeRevisionAndExecuteWithRetries(
482                 projectName, repositoryName, lastKnownRevision,
483                 new Function<Revision, CompletableFuture<Entry<T>>>() {
484                     @Override
485                     public CompletableFuture<Entry<T>> apply(Revision normLastKnownRevision) {
486                         return delegate.watchFile(projectName, repositoryName, normLastKnownRevision,
487                                                   query, timeoutMillis, errorOnEntryNotFound)
488                                        .thenApply(entry -> {
489                                            if (entry != null) {
490                                                updateLatestKnownRevision(projectName, repositoryName,
491                                                                          entry.revision());
492                                            }
493                                            return entry;
494                                        });
495                     }
496 
497                     @Override
498                     public String toString() {
499                         return "watchFile(" + projectName + ", " + repositoryName + ", " +
500                                lastKnownRevision + ", " + query + ", " + timeoutMillis + ", " +
501                                errorOnEntryNotFound + ')';
502                     }
503                 });
504     }
505 
506     @Override
507     public CompletableFuture<Void> whenEndpointReady() {
508         return delegate.whenEndpointReady();
509     }
510 
511     /**
512      * Normalizes the given {@link Revision} and then executes the task by calling {@code taskRunner.apply()}
513      * with the normalized {@link Revision}. The task can be executed repetitively when the task failed with
514      * a {@link RevisionNotFoundException} for the {@link Revision} which is known to exist.
515      */
516     private <T> CompletableFuture<T> normalizeRevisionAndExecuteWithRetries(
517             String projectName, String repositoryName, Revision revision,
518             Function<Revision, CompletableFuture<T>> taskRunner) {
519         return normalizeRevision(projectName, repositoryName, revision)
520                 .thenCompose(normRev -> executeWithRetries(
521                         new Supplier<CompletableFuture<T>>() {
522                             @Override
523                             public CompletableFuture<T> get() {
524                                 return taskRunner.apply(normRev);
525                             }
526 
527                             @Override
528                             public String toString() {
529                                 return taskRunner + " with " + normRev;
530                             }
531                         },
532                         (res, cause) -> cause != null &&
533                                         handleRevisionNotFound(projectName, repositoryName, normRev, cause)));
534     }
535 
536     /**
537      * Normalizes the given {@link Revision} range and then executes the task by calling
538      * {@code taskRunner.apply()} with the normalized {@link Revision} range. The task can be executed
539      * repetitively when the task failed with a {@link RevisionNotFoundException} for the {@link Revision}
540      * range which is known to exist.
541      */
542     private <T> CompletableFuture<T> normalizeRevisionsAndExecuteWithRetries(
543             String projectName, String repositoryName, Revision from, Revision to,
544             BiFunction<Revision, Revision, CompletableFuture<T>> taskRunner) {
545 
546         if (to == null) {
547             return exceptionallyCompletedFuture(new NullPointerException("to"));
548         }
549 
550         if (from == null) {
551             return exceptionallyCompletedFuture(new NullPointerException("from"));
552         }
553 
554         if (from.isRelative() && to.isRelative() ||
555             !from.isRelative() && !to.isRelative()) {
556 
557             // When both revisions are absolute or both revisions are relative,
558             // we can call normalizeRevision() only once and guess the other revision from the distance.
559             final int distance = to.major() - from.major();
560             final Revision baseRevision = to.compareTo(from) >= 0 ? to : from;
561 
562             return normalizeRevision(projectName, repositoryName, baseRevision).thenCompose(normBaseRev -> {
563                 final Revision normFromRev;
564                 final Revision normToRev;
565                 if (distance >= 0) {
566                     normToRev = normBaseRev;
567                     normFromRev = normBaseRev.backward(distance);
568                 } else {
569                     normFromRev = normBaseRev;
570                     normToRev = normBaseRev.backward(-distance);
571                 }
572 
573                 return executeWithRetries(
574                         new Supplier<CompletableFuture<T>>() {
575                             @Override
576                             public CompletableFuture<T> get() {
577                                 return taskRunner.apply(normFromRev, normToRev);
578                             }
579 
580                             @Override
581                             public String toString() {
582                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
583                             }
584                         },
585                         (res, cause) -> {
586                             if (cause == null) {
587                                 return false;
588                             }
589                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
590                         });
591             });
592         } else {
593             // When one revision is absolute and the other is relative, we have to normalize both revisions
594             // because it is impossible to know the distance between them and which is a newer revision.
595             return CompletableFutures.allAsList(ImmutableList.of(
596                     normalizeRevision(projectName, repositoryName, from),
597                     normalizeRevision(projectName, repositoryName, to))).thenCompose(normRevs -> {
598                 final Revision normFromRev = normRevs.get(0);
599                 final Revision normToRev = normRevs.get(1);
600                 return executeWithRetries(
601                         new Supplier<CompletableFuture<T>>() {
602                             @Override
603                             public CompletableFuture<T> get() {
604                                 return taskRunner.apply(normFromRev, normToRev);
605                             }
606 
607                             @Override
608                             public String toString() {
609                                 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
610                             }
611                         },
612                         (res, cause) -> {
613                             if (cause == null) {
614                                 return false;
615                             }
616 
617                             final Revision normBaseRev = normFromRev.compareTo(normToRev) > 0 ? normFromRev
618                                                                                               : normToRev;
619                             return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
620                         });
621             });
622         }
623     }
624 
625     /**
626      * Executes the task by calling {@code taskRunner.get()} and re-executes the task if {@code retryPredicated}
627      * returns {@code true}. This method is used as a building block for sending a request repetitively
628      * when the request has failed due to replication lag.
629      */
630     private <T> CompletableFuture<T> executeWithRetries(
631             Supplier<CompletableFuture<T>> taskRunner,
632             BiPredicate<T, Throwable> retryPredicate) {
633         return executeWithRetries(taskRunner, retryPredicate, 0);
634     }
635 
636     private <T> CompletableFuture<T> executeWithRetries(
637             Supplier<CompletableFuture<T>> taskRunner,
638             BiPredicate<T, Throwable> retryPredicate,
639             int attemptsSoFar) {
640 
641         return CompletableFutures.handleCompose(taskRunner.get(), (res, cause) -> {
642             final Object currentReplicaHint = currentReplicaHintSupplier.get();
643             final int nextAttemptsSoFar = attemptsSoFar + 1;
644             final boolean retryRequired = retryPredicate.test(res, cause);
645             if (!retryRequired || nextAttemptsSoFar > maxRetries) {
646                 if (retryRequired) {
647                     if (currentReplicaHint != null) {
648                         logger.warn("[{}] Failed to retrieve the up-to-date data from Central Dogma " +
649                                     "after {} retries: {} => {}",
650                                     currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
651                     } else {
652                         logger.warn("Failed to retrieve the up-to-date data from Central Dogma " +
653                                     "after {} retries: {} => {}",
654                                     attemptsSoFar, taskRunner, resultOrCause(res, cause));
655                     }
656                 } else if (logger.isDebugEnabled()) {
657                     if (currentReplicaHint != null) {
658                         logger.debug("[{}] Retrieved the up-to-date data after {} retries: {} => {}",
659                                      currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
660                     } else {
661                         logger.debug("Retrieved the up-to-date data after {} retries: {} => {}",
662                                      attemptsSoFar, taskRunner, resultOrCause(res, cause));
663                     }
664                 }
665 
666                 if (cause == null) {
667                     return completedFuture(res);
668                 } else {
669                     return exceptionallyCompletedFuture(cause);
670                 }
671             }
672 
673             if (logger.isDebugEnabled()) {
674                 if (currentReplicaHint != null) {
675                     logger.debug("[{}] Got the out-of-date data ({} attempt(s) so far): {} => {}",
676                                  currentReplicaHint, nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
677                 } else {
678                     logger.debug("Got the out-of-date data ({} attempt(s) so far): {} => {}",
679                                  nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
680                 }
681             }
682 
683             final CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();
684             executor().schedule(() -> {
685                 try {
686                     executeWithRetries(taskRunner, retryPredicate,
687                                        nextAttemptsSoFar).handle((newRes, newCause) -> {
688                         if (newCause != null) {
689                             nextAttemptFuture.completeExceptionally(newCause);
690                         } else {
691                             nextAttemptFuture.complete(newRes);
692                         }
693                         return null;
694                     });
695                 } catch (Throwable t) {
696                     nextAttemptFuture.completeExceptionally(t);
697                 }
698             }, retryIntervalMillis, TimeUnit.MILLISECONDS);
699 
700             return nextAttemptFuture;
701         }).toCompletableFuture();
702     }
703 
704     /**
     * Returns the cause of the specified {@code throwable}, peeling it recursively, if it is
     * a {@link CompletionException} or an {@link ExecutionException}. Otherwise returns the {@code throwable}.
707      */
708     private static Throwable peel(Throwable throwable) {
709         Throwable cause = throwable.getCause();
710         while (cause != null && cause != throwable &&
711                (throwable instanceof CompletionException || throwable instanceof ExecutionException)) {
712             throwable = cause;
713             cause = throwable.getCause();
714         }
715         return throwable;
716     }
717 
718     /**
719      * Returns {@code true} to indicate that the request must be retried if {@code cause} is
720      * a {@link RevisionNotFoundException} and the specified {@link Revision} is supposed to exist.
721      */
722     private boolean handleRevisionNotFound(
723             String projectName, String repositoryName, Revision revision, Throwable cause) {
724         requireNonNull(cause, "cause");
725         cause = peel(cause);
726         if (!(cause instanceof RevisionNotFoundException)) {
727             return false;
728         }
729 
730         final Revision latestKnownRevision = latestKnownRevision(projectName, repositoryName);
731         if (latestKnownRevision == null) {
732             return false;
733         }
734 
735         if (revision.isRelative()) {
736             return revision.major() + latestKnownRevision.major() >= 0;
737         } else {
738             return revision.major() <= latestKnownRevision.major();
739         }
740     }
741 
742     @Nullable
743     @VisibleForTesting
744     Revision latestKnownRevision(String projectName, String repositoryName) {
745         synchronized (latestKnownRevisions) {
746             return latestKnownRevisions.get(new RepoId(projectName, repositoryName));
747         }
748     }
749 
750     /**
751      * Updates the latest known revision for the specified repository.
752      *
753      * @return {@code true} if the latest known revision has been updated to the specified {@link Revision} or
754      *         the latest known revision is equal to the specified {@link Revision}. {@code false} otherwise.
755      */
756     private boolean updateLatestKnownRevision(String projectName, String repositoryName, Revision newRevision) {
757         final Object currentReplicaHint = currentReplicaHintSupplier.get();
758         final RepoId id = new RepoId(projectName, repositoryName);
759         synchronized (latestKnownRevisions) {
760             final Revision oldRevision = latestKnownRevisions.get(id);
761             if (oldRevision == null) {
762                 if (currentReplicaHint != null) {
763                     logger.debug("[{}] Updating the latest known revision for {}/{} from <unknown> to: {}",
764                                  currentReplicaHint, projectName, repositoryName, newRevision);
765                 } else {
766                     logger.debug("Updating the latest known revision for {}/{} from <unknown> to: {}",
767                                  projectName, repositoryName, newRevision);
768                 }
769                 latestKnownRevisions.put(id, newRevision);
770                 return true;
771             }
772 
773             final int comparison = oldRevision.compareTo(newRevision);
774             if (comparison < 0) {
775                 if (currentReplicaHint != null) {
776                     logger.debug("[{}] Updating the latest known revision for {}/{} from {} to: {}",
777                                  currentReplicaHint, projectName, repositoryName, oldRevision, newRevision);
778                 } else {
779                     logger.debug("Updating the latest known revision for {}/{} from {} to: {}",
780                                  projectName, repositoryName, oldRevision, newRevision);
781                 }
782                 latestKnownRevisions.put(id, newRevision);
783                 return true;
784             }
785 
786             if (comparison == 0) {
787                 if (currentReplicaHint != null) {
788                     logger.debug("[{}] The latest known revision for {}/{} stays unchanged at: {}",
789                                  currentReplicaHint, projectName, repositoryName, newRevision);
790                 } else {
791                     logger.debug("The latest known revision for {}/{} stays unchanged at: {}",
792                                  projectName, repositoryName, newRevision);
793                 }
794                 return true;
795             }
796 
797             if (currentReplicaHint != null) {
798                 logger.debug("[{}] An out-of-date latest known revision for {}/{}: {}",
799                              currentReplicaHint, projectName, repositoryName, newRevision);
800             } else {
801                 logger.debug("An out-of-date latest known revision for {}/{}: {}",
802                              projectName, repositoryName, newRevision);
803             }
804             return false;
805         }
806     }
807 
808     @Nullable
809     private static Object resultOrCause(@Nullable Object res, @Nullable Throwable cause) {
810         return res != null ? res : cause;
811     }
812 
    /**
     * Closes this instance by closing {@code delegate}. Any exception thrown by
     * {@code delegate.close()} propagates to the caller.
     */
    @Override
    public void close() throws Exception {
        delegate.close();
    }
817 
818     @VisibleForTesting
819     static final class RepoId {
820         final String projectName;
821         final String repositoryName;
822 
823         RepoId(String projectName, String repositoryName) {
824             this.projectName = projectName;
825             this.repositoryName = repositoryName;
826         }
827 
828         @Override
829         public boolean equals(Object o) {
830             if (this == o) {
831                 return true;
832             }
833             if (!(o instanceof RepoId)) {
834                 return false;
835             }
836             final RepoId that = (RepoId) o;
837             return projectName.equals(that.projectName) && repositoryName.equals(that.repositoryName);
838         }
839 
840         @Override
841         public int hashCode() {
842             return Objects.hash(projectName, repositoryName);
843         }
844 
845         @Override
846         public String toString() {
847             return projectName + '/' + repositoryName;
848         }
849     }
850 }