1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.internal.client;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
20 import static java.util.Objects.requireNonNull;
21 import static java.util.concurrent.CompletableFuture.completedFuture;
22
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.Set;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.CompletionException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.TimeUnit;
33 import java.util.function.BiFunction;
34 import java.util.function.BiPredicate;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37
38 import org.jspecify.annotations.Nullable;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import com.google.common.annotations.VisibleForTesting;
43 import com.google.common.collect.ImmutableList;
44 import com.spotify.futures.CompletableFutures;
45
46 import com.linecorp.centraldogma.client.AbstractCentralDogma;
47 import com.linecorp.centraldogma.client.CentralDogma;
48 import com.linecorp.centraldogma.client.CentralDogmaRepository;
49 import com.linecorp.centraldogma.client.RepositoryInfo;
50 import com.linecorp.centraldogma.common.Author;
51 import com.linecorp.centraldogma.common.Change;
52 import com.linecorp.centraldogma.common.Commit;
53 import com.linecorp.centraldogma.common.Entry;
54 import com.linecorp.centraldogma.common.EntryType;
55 import com.linecorp.centraldogma.common.Markup;
56 import com.linecorp.centraldogma.common.MergeQuery;
57 import com.linecorp.centraldogma.common.MergedEntry;
58 import com.linecorp.centraldogma.common.PathPattern;
59 import com.linecorp.centraldogma.common.PushResult;
60 import com.linecorp.centraldogma.common.Query;
61 import com.linecorp.centraldogma.common.Revision;
62 import com.linecorp.centraldogma.common.RevisionNotFoundException;
63
64 import io.micrometer.core.instrument.MeterRegistry;
65
66
67
68
69
70 public final class ReplicationLagTolerantCentralDogma extends AbstractCentralDogma {
71
72 private static final Logger logger =
73 LoggerFactory.getLogger(ReplicationLagTolerantCentralDogma.class);
74
75 private final CentralDogma delegate;
76 private final int maxRetries;
77 private final long retryIntervalMillis;
78 private final Supplier<?> currentReplicaHintSupplier;
79
80 @VisibleForTesting
81 final Map<RepoId, Revision> latestKnownRevisions = new LinkedHashMap<RepoId, Revision>() {
82 private static final long serialVersionUID = 3587793379404809027L;
83
84 @Override
85 protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
86
87 return size() > 8192;
88 }
89 };
90
91 public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
92 CentralDogma delegate, int maxRetries, long retryIntervalMillis,
93 Supplier<?> currentReplicaHintSupplier,
94 @Nullable MeterRegistry meterRegistry) {
95 super(blockingTaskExecutor, meterRegistry);
96
97 requireNonNull(delegate, "delegate");
98 checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
99 checkArgument(retryIntervalMillis >= 0,
100 "retryIntervalMillis: %s (expected: >= 0)", retryIntervalMillis);
101 requireNonNull(currentReplicaHintSupplier, "currentReplicaHintSupplier");
102
103 this.delegate = delegate;
104 this.maxRetries = maxRetries;
105 this.retryIntervalMillis = retryIntervalMillis;
106 this.currentReplicaHintSupplier = currentReplicaHintSupplier;
107 }
108
109 @Override
110 public CompletableFuture<Void> createProject(String projectName) {
111 return delegate.createProject(projectName);
112 }
113
114 @Override
115 public CompletableFuture<Void> removeProject(String projectName) {
116 return delegate.removeProject(projectName).thenAccept(unused -> {
117 synchronized (latestKnownRevisions) {
118 latestKnownRevisions.entrySet().removeIf(e -> e.getKey().projectName.equals(projectName));
119 }
120 });
121 }
122
123 @Override
124 public CompletableFuture<Void> purgeProject(String projectName) {
125 return delegate.purgeProject(projectName);
126 }
127
128 @Override
129 public CompletableFuture<Void> unremoveProject(String projectName) {
130 return delegate.unremoveProject(projectName);
131 }
132
133 @Override
134 public CompletableFuture<Set<String>> listProjects() {
135 return delegate.listProjects();
136 }
137
138 @Override
139 public CompletableFuture<Set<String>> listRemovedProjects() {
140 return delegate.listRemovedProjects();
141 }
142
143 @Override
144 public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
145 String repositoryName) {
146 return delegate.createRepository(projectName, repositoryName);
147 }
148
149 @Override
150 public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
151 return delegate.removeRepository(projectName, repositoryName).thenAccept(unused -> {
152 synchronized (latestKnownRevisions) {
153 latestKnownRevisions.remove(new RepoId(projectName, repositoryName));
154 }
155 });
156 }
157
158 @Override
159 public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
160 return delegate.purgeRepository(projectName, repositoryName);
161 }
162
163 @Override
164 public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
165 String repositoryName) {
166 return delegate.unremoveRepository(projectName, repositoryName);
167 }
168
169 @Override
170 public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
171 return executeWithRetries(
172 new Supplier<CompletableFuture<Map<String, RepositoryInfo>>>() {
173 @Override
174 public CompletableFuture<Map<String, RepositoryInfo>> get() {
175 return delegate.listRepositories(projectName);
176 }
177
178 @Override
179 public String toString() {
180 return "listRepositories(" + projectName + ')';
181 }
182 },
183 (res, cause) -> {
184 if (res != null) {
185 for (RepositoryInfo info : res.values()) {
186 if (!updateLatestKnownRevision(projectName, info.name(), info.headRevision())) {
187 return true;
188 }
189 }
190 }
191 return false;
192 });
193 }
194
195 @Override
196 public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
197 return delegate.listRemovedRepositories(projectName);
198 }
199
200 @Override
201 public CompletableFuture<Revision> normalizeRevision(
202 String projectName, String repositoryName, Revision revision) {
203 return executeWithRetries(
204 new Supplier<CompletableFuture<Revision>>() {
205 @Override
206 public CompletableFuture<Revision> get() {
207 return delegate.normalizeRevision(projectName, repositoryName, revision);
208 }
209
210 @Override
211 public String toString() {
212 return "normalizeRevision(" + projectName + ", " + repositoryName + ", " +
213 revision + ')';
214 }
215 },
216 (res, cause) -> {
217 if (cause != null) {
218 return handleRevisionNotFound(projectName, repositoryName, revision, cause);
219 }
220
221 if (revision.isRelative()) {
222 final Revision headRevision = res.forward(-(revision.major() + 1));
223 return !updateLatestKnownRevision(projectName, repositoryName, headRevision);
224 }
225
226 updateLatestKnownRevision(projectName, repositoryName, revision);
227 return false;
228 });
229 }
230
231 @Override
232 public CompletableFuture<Map<String, EntryType>> listFiles(
233 String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
234 return normalizeRevisionAndExecuteWithRetries(
235 projectName, repositoryName, revision,
236 new Function<Revision, CompletableFuture<Map<String, EntryType>>>() {
237 @Override
238 public CompletableFuture<Map<String, EntryType>> apply(Revision normRev) {
239 return delegate.listFiles(projectName, repositoryName, normRev, pathPattern);
240 }
241
242 @Override
243 public String toString() {
244 return "listFiles(" + projectName + ", " + repositoryName + ", " +
245 revision + ", " + pathPattern + ')';
246 }
247 });
248 }
249
250 @Override
251 public <T> CompletableFuture<Entry<T>> getFile(
252 String projectName, String repositoryName, Revision revision, Query<T> query,
253 boolean viewRaw, boolean renderTemplate, @Nullable String variableFile) {
254 return normalizeRevisionAndExecuteWithRetries(
255 projectName, repositoryName, revision,
256 new Function<>() {
257 @Override
258 public CompletableFuture<Entry<T>> apply(Revision normRev) {
259 return delegate.getFile(projectName, repositoryName, normRev, query,
260 viewRaw, renderTemplate, variableFile);
261 }
262
263 @Override
264 public String toString() {
265 return "getFile(" + projectName + ", " + repositoryName + ", " +
266 revision + ", " + query + ", " + viewRaw + ", " + renderTemplate + ", " +
267 variableFile + ')';
268 }
269 });
270 }
271
272 @Override
273 public CompletableFuture<Map<String, Entry<?>>> getFiles(
274 String projectName, String repositoryName, Revision revision, PathPattern pathPattern,
275 boolean viewRaw, boolean renderTemplate, @Nullable String variableFile) {
276 return normalizeRevisionAndExecuteWithRetries(
277 projectName, repositoryName, revision,
278 new Function<>() {
279 @Override
280 public CompletableFuture<Map<String, Entry<?>>> apply(Revision normRev) {
281 return delegate.getFiles(projectName, repositoryName, normRev, pathPattern,
282 viewRaw, renderTemplate, variableFile);
283 }
284
285 @Override
286 public String toString() {
287 return "getFiles(" + projectName + ", " + repositoryName + ", " +
288 revision + ", " + pathPattern + ", " + viewRaw + ", " + renderTemplate + ", " +
289 variableFile + ')';
290 }
291 });
292 }
293
294 @Override
295 public <T> CompletableFuture<MergedEntry<T>> mergeFiles(
296 String projectName, String repositoryName, Revision revision,
297 MergeQuery<T> mergeQuery) {
298 return normalizeRevisionAndExecuteWithRetries(
299 projectName, repositoryName, revision,
300 new Function<>() {
301 @Override
302 public CompletableFuture<MergedEntry<T>> apply(Revision normRev) {
303 return delegate.mergeFiles(projectName, repositoryName, normRev, mergeQuery);
304 }
305
306 @Override
307 public String toString() {
308 return "mergeFiles(" + projectName + ", " + repositoryName + ", " +
309 revision + ", " + mergeQuery + ')';
310 }
311 });
312 }
313
314 @Override
315 public CompletableFuture<List<Commit>> getHistory(
316 String projectName, String repositoryName, Revision from,
317 Revision to, PathPattern pathPattern, int maxCommits) {
318 return normalizeRevisionsAndExecuteWithRetries(
319 projectName, repositoryName, from, to,
320 new BiFunction<>() {
321 @Override
322 public CompletableFuture<List<Commit>> apply(Revision normFromRev, Revision normToRev) {
323 return delegate.getHistory(projectName, repositoryName,
324 normFromRev, normToRev, pathPattern, maxCommits);
325 }
326
327 @Override
328 public String toString() {
329 return "getHistory(" + projectName + ", " + repositoryName + ", " +
330 from + ", " + to + ", " + pathPattern + ", " + maxCommits + ')';
331 }
332 });
333 }
334
335 @Override
336 public <T> CompletableFuture<Change<T>> getDiff(
337 String projectName, String repositoryName, Revision from, Revision to, Query<T> query) {
338 return normalizeRevisionsAndExecuteWithRetries(
339 projectName, repositoryName, from, to,
340 new BiFunction<Revision, Revision, CompletableFuture<Change<T>>>() {
341 @Override
342 public CompletableFuture<Change<T>> apply(Revision normFromRev, Revision normToRev) {
343 return delegate.getDiff(projectName, repositoryName,
344 normFromRev, normToRev, query);
345 }
346
347 @Override
348 public String toString() {
349 return "getDiff(" + projectName + ", " + repositoryName + ", " +
350 from + ", " + to + ", " + query + ')';
351 }
352 });
353 }
354
355 @Override
356 public CompletableFuture<List<Change<?>>> getDiff(
357 String projectName, String repositoryName, Revision from, Revision to, PathPattern pathPattern) {
358 return normalizeRevisionsAndExecuteWithRetries(
359 projectName, repositoryName, from, to,
360 new BiFunction<Revision, Revision, CompletableFuture<List<Change<?>>>>() {
361 @Override
362 public CompletableFuture<List<Change<?>>> apply(Revision normFromRev, Revision normToRev) {
363 return delegate.getDiff(projectName, repositoryName,
364 normFromRev, normToRev, pathPattern);
365 }
366
367 @Override
368 public String toString() {
369 return "getDiffs(" + projectName + ", " + repositoryName + ", " +
370 from + ", " + to + ", " + pathPattern + ')';
371 }
372 });
373 }
374
375 @Override
376 public CompletableFuture<List<Change<?>>> getPreviewDiffs(
377 String projectName, String repositoryName, Revision baseRevision,
378 Iterable<? extends Change<?>> changes) {
379 return normalizeRevisionAndExecuteWithRetries(
380 projectName, repositoryName, baseRevision,
381 new Function<Revision, CompletableFuture<List<Change<?>>>>() {
382 @Override
383 public CompletableFuture<List<Change<?>>> apply(Revision normBaseRev) {
384 return delegate.getPreviewDiffs(projectName, repositoryName, normBaseRev, changes);
385 }
386
387 @Override
388 public String toString() {
389 return "getPreviewDiffs(" + projectName + ", " + repositoryName + ", " +
390 baseRevision + ", ...)";
391 }
392 });
393 }
394
395 @Override
396 public CompletableFuture<PushResult> push(
397 String projectName, String repositoryName, Revision baseRevision,
398 String summary, String detail, Markup markup, Iterable<? extends Change<?>> changes) {
399 return executeWithRetries(
400 new Supplier<CompletableFuture<PushResult>>() {
401 @Override
402 public CompletableFuture<PushResult> get() {
403 return delegate.push(projectName, repositoryName, baseRevision,
404 summary, detail, markup, changes);
405 }
406
407 @Override
408 public String toString() {
409 return "push(" + projectName + ", " + repositoryName + ", " +
410 baseRevision + ", " + summary + ", ...)";
411 }
412 },
413 pushRetryPredicate(projectName, repositoryName, baseRevision));
414 }
415
416 @Override
417 public CompletableFuture<PushResult> push(
418 String projectName, String repositoryName, Revision baseRevision,
419 Author author, String summary, String detail, Markup markup,
420 Iterable<? extends Change<?>> changes) {
421 return executeWithRetries(
422 new Supplier<CompletableFuture<PushResult>>() {
423 @Override
424 public CompletableFuture<PushResult> get() {
425 return delegate.push(projectName, repositoryName, baseRevision,
426 author, summary, detail, markup, changes);
427 }
428
429 @Override
430 public String toString() {
431 return "push(" + projectName + ", " + repositoryName + ", " +
432 baseRevision + ", " + summary + ", ...)";
433 }
434 },
435 pushRetryPredicate(projectName, repositoryName, baseRevision));
436 }
437
438 private BiPredicate<PushResult, Throwable> pushRetryPredicate(
439 String projectName, String repositoryName, Revision baseRevision) {
440
441 return (res, cause) -> {
442 if (cause != null) {
443 return handleRevisionNotFound(projectName, repositoryName, baseRevision, cause);
444 }
445
446 updateLatestKnownRevision(projectName, repositoryName, res.revision());
447 return false;
448 };
449 }
450
451 @Override
452 public CompletableFuture<Revision> watchRepository(
453 String projectName, String repositoryName, Revision lastKnownRevision,
454 PathPattern pathPattern, long timeoutMillis, boolean errorOnEntryNotFound) {
455
456 return normalizeRevisionAndExecuteWithRetries(
457 projectName, repositoryName, lastKnownRevision,
458 new Function<Revision, CompletableFuture<Revision>>() {
459 @Override
460 public CompletableFuture<Revision> apply(Revision normLastKnownRevision) {
461 return delegate.watchRepository(projectName, repositoryName, normLastKnownRevision,
462 pathPattern, timeoutMillis, errorOnEntryNotFound)
463 .thenApply(newLastKnownRevision -> {
464 if (newLastKnownRevision != null) {
465 updateLatestKnownRevision(projectName, repositoryName,
466 newLastKnownRevision);
467 }
468 return newLastKnownRevision;
469 });
470 }
471
472 @Override
473 public String toString() {
474 return "watchRepository(" + projectName + ", " + repositoryName + ", " +
475 lastKnownRevision + ", " + pathPattern + ", " + timeoutMillis + ", " +
476 errorOnEntryNotFound + ')';
477 }
478 });
479 }
480
481 @Override
482 public <T> CompletableFuture<Entry<T>> watchFile(
483 String projectName, String repositoryName, Revision lastKnownRevision,
484 Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound,
485 boolean viewRaw, boolean renderTemplate, @Nullable String variableFile,
486 @Nullable Revision templateRevision) {
487
488 return normalizeRevisionAndExecuteWithRetries(
489 projectName, repositoryName, lastKnownRevision,
490 new Function<>() {
491 @Override
492 public CompletableFuture<Entry<T>> apply(Revision normLastKnownRevision) {
493 return delegate.watchFile(projectName, repositoryName, normLastKnownRevision,
494 query, timeoutMillis, errorOnEntryNotFound,
495 viewRaw, renderTemplate, variableFile, templateRevision)
496 .thenApply(entry -> {
497 if (entry != null) {
498 updateLatestKnownRevision(projectName, repositoryName,
499 entry.revision());
500 }
501 return entry;
502 });
503 }
504
505 @Override
506 public String toString() {
507 return "watchFile(" + projectName + ", " + repositoryName + ", " +
508 lastKnownRevision + ", " + query + ", " + timeoutMillis + ", " +
509 errorOnEntryNotFound + ", " + viewRaw + ", " + renderTemplate + ", " +
510 variableFile + ", " + templateRevision + ')';
511 }
512 });
513 }
514
515 @Override
516 public CompletableFuture<Void> whenEndpointReady() {
517 return delegate.whenEndpointReady();
518 }
519
520
521
522
523
524
525 private <T> CompletableFuture<T> normalizeRevisionAndExecuteWithRetries(
526 String projectName, String repositoryName, Revision revision,
527 Function<Revision, CompletableFuture<T>> taskRunner) {
528 return normalizeRevision(projectName, repositoryName, revision)
529 .thenCompose(normRev -> executeWithRetries(
530 new Supplier<>() {
531 @Override
532 public CompletableFuture<T> get() {
533 return taskRunner.apply(normRev);
534 }
535
536 @Override
537 public String toString() {
538 return taskRunner + " with " + normRev;
539 }
540 },
541 (res, cause) -> cause != null &&
542 handleRevisionNotFound(projectName, repositoryName, normRev, cause)));
543 }
544
545
546
547
548
549
550
551 private <T> CompletableFuture<T> normalizeRevisionsAndExecuteWithRetries(
552 String projectName, String repositoryName, Revision from, Revision to,
553 BiFunction<Revision, Revision, CompletableFuture<T>> taskRunner) {
554
555 if (to == null) {
556 return exceptionallyCompletedFuture(new NullPointerException("to"));
557 }
558
559 if (from == null) {
560 return exceptionallyCompletedFuture(new NullPointerException("from"));
561 }
562
563 if (from.isRelative() && to.isRelative() ||
564 !from.isRelative() && !to.isRelative()) {
565
566
567
568 final int distance = to.major() - from.major();
569 final Revision baseRevision = to.compareTo(from) >= 0 ? to : from;
570
571 return normalizeRevision(projectName, repositoryName, baseRevision).thenCompose(normBaseRev -> {
572 final Revision normFromRev;
573 final Revision normToRev;
574 if (distance >= 0) {
575 normToRev = normBaseRev;
576 normFromRev = normBaseRev.backward(distance);
577 } else {
578 normFromRev = normBaseRev;
579 normToRev = normBaseRev.backward(-distance);
580 }
581
582 return executeWithRetries(
583 new Supplier<CompletableFuture<T>>() {
584 @Override
585 public CompletableFuture<T> get() {
586 return taskRunner.apply(normFromRev, normToRev);
587 }
588
589 @Override
590 public String toString() {
591 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
592 }
593 },
594 (res, cause) -> {
595 if (cause == null) {
596 return false;
597 }
598 return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
599 });
600 });
601 } else {
602
603
604 return CompletableFutures.allAsList(ImmutableList.of(
605 normalizeRevision(projectName, repositoryName, from),
606 normalizeRevision(projectName, repositoryName, to))).thenCompose(normRevs -> {
607 final Revision normFromRev = normRevs.get(0);
608 final Revision normToRev = normRevs.get(1);
609 return executeWithRetries(
610 new Supplier<CompletableFuture<T>>() {
611 @Override
612 public CompletableFuture<T> get() {
613 return taskRunner.apply(normFromRev, normToRev);
614 }
615
616 @Override
617 public String toString() {
618 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
619 }
620 },
621 (res, cause) -> {
622 if (cause == null) {
623 return false;
624 }
625
626 final Revision normBaseRev = normFromRev.compareTo(normToRev) > 0 ? normFromRev
627 : normToRev;
628 return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
629 });
630 });
631 }
632 }
633
634
635
636
637
638
639 private <T> CompletableFuture<T> executeWithRetries(
640 Supplier<CompletableFuture<T>> taskRunner,
641 BiPredicate<T, Throwable> retryPredicate) {
642 return executeWithRetries(taskRunner, retryPredicate, 0);
643 }
644
645 private <T> CompletableFuture<T> executeWithRetries(
646 Supplier<CompletableFuture<T>> taskRunner,
647 BiPredicate<T, Throwable> retryPredicate,
648 int attemptsSoFar) {
649
650 return CompletableFutures.handleCompose(taskRunner.get(), (res, cause) -> {
651 final Object currentReplicaHint = currentReplicaHintSupplier.get();
652 final int nextAttemptsSoFar = attemptsSoFar + 1;
653 final boolean retryRequired = retryPredicate.test(res, cause);
654 if (!retryRequired || nextAttemptsSoFar > maxRetries) {
655 if (retryRequired) {
656 if (currentReplicaHint != null) {
657 logger.warn("[{}] Failed to retrieve the up-to-date data from Central Dogma " +
658 "after {} retries: {} => {}",
659 currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
660 } else {
661 logger.warn("Failed to retrieve the up-to-date data from Central Dogma " +
662 "after {} retries: {} => {}",
663 attemptsSoFar, taskRunner, resultOrCause(res, cause));
664 }
665 } else if (logger.isDebugEnabled()) {
666 if (currentReplicaHint != null) {
667 logger.debug("[{}] Retrieved the up-to-date data after {} retries: {} => {}",
668 currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
669 } else {
670 logger.debug("Retrieved the up-to-date data after {} retries: {} => {}",
671 attemptsSoFar, taskRunner, resultOrCause(res, cause));
672 }
673 }
674
675 if (cause == null) {
676 return completedFuture(res);
677 } else {
678 return exceptionallyCompletedFuture(cause);
679 }
680 }
681
682 if (logger.isDebugEnabled()) {
683 if (currentReplicaHint != null) {
684 logger.debug("[{}] Got the out-of-date data ({} attempt(s) so far): {} => {}",
685 currentReplicaHint, nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
686 } else {
687 logger.debug("Got the out-of-date data ({} attempt(s) so far): {} => {}",
688 nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
689 }
690 }
691
692 final CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();
693 executor().schedule(() -> {
694 try {
695 executeWithRetries(taskRunner, retryPredicate,
696 nextAttemptsSoFar).handle((newRes, newCause) -> {
697 if (newCause != null) {
698 nextAttemptFuture.completeExceptionally(newCause);
699 } else {
700 nextAttemptFuture.complete(newRes);
701 }
702 return null;
703 });
704 } catch (Throwable t) {
705 nextAttemptFuture.completeExceptionally(t);
706 }
707 }, retryIntervalMillis, TimeUnit.MILLISECONDS);
708
709 return nextAttemptFuture;
710 }).toCompletableFuture();
711 }
712
713
714
715
716
717 private static Throwable peel(Throwable throwable) {
718 Throwable cause = throwable.getCause();
719 while (cause != null && cause != throwable &&
720 (throwable instanceof CompletionException || throwable instanceof ExecutionException)) {
721 throwable = cause;
722 cause = throwable.getCause();
723 }
724 return throwable;
725 }
726
727
728
729
730
731 private boolean handleRevisionNotFound(
732 String projectName, String repositoryName, Revision revision, Throwable cause) {
733 requireNonNull(cause, "cause");
734 cause = peel(cause);
735 if (!(cause instanceof RevisionNotFoundException)) {
736 return false;
737 }
738
739 final Revision latestKnownRevision = latestKnownRevision(projectName, repositoryName);
740 if (latestKnownRevision == null) {
741 return false;
742 }
743
744 if (revision.isRelative()) {
745 return revision.major() + latestKnownRevision.major() >= 0;
746 } else {
747 return revision.major() <= latestKnownRevision.major();
748 }
749 }
750
751 @Nullable
752 @VisibleForTesting
753 Revision latestKnownRevision(String projectName, String repositoryName) {
754 synchronized (latestKnownRevisions) {
755 return latestKnownRevisions.get(new RepoId(projectName, repositoryName));
756 }
757 }
758
759
760
761
762
763
764
765 private boolean updateLatestKnownRevision(String projectName, String repositoryName, Revision newRevision) {
766 final Object currentReplicaHint = currentReplicaHintSupplier.get();
767 final RepoId id = new RepoId(projectName, repositoryName);
768 synchronized (latestKnownRevisions) {
769 final Revision oldRevision = latestKnownRevisions.get(id);
770 if (oldRevision == null) {
771 if (currentReplicaHint != null) {
772 logger.debug("[{}] Updating the latest known revision for {}/{} from <unknown> to: {}",
773 currentReplicaHint, projectName, repositoryName, newRevision);
774 } else {
775 logger.debug("Updating the latest known revision for {}/{} from <unknown> to: {}",
776 projectName, repositoryName, newRevision);
777 }
778 latestKnownRevisions.put(id, newRevision);
779 return true;
780 }
781
782 final int comparison = oldRevision.compareTo(newRevision);
783 if (comparison < 0) {
784 if (currentReplicaHint != null) {
785 logger.debug("[{}] Updating the latest known revision for {}/{} from {} to: {}",
786 currentReplicaHint, projectName, repositoryName, oldRevision, newRevision);
787 } else {
788 logger.debug("Updating the latest known revision for {}/{} from {} to: {}",
789 projectName, repositoryName, oldRevision, newRevision);
790 }
791 latestKnownRevisions.put(id, newRevision);
792 return true;
793 }
794
795 if (comparison == 0) {
796 if (currentReplicaHint != null) {
797 logger.debug("[{}] The latest known revision for {}/{} stays unchanged at: {}",
798 currentReplicaHint, projectName, repositoryName, newRevision);
799 } else {
800 logger.debug("The latest known revision for {}/{} stays unchanged at: {}",
801 projectName, repositoryName, newRevision);
802 }
803 return true;
804 }
805
806 if (currentReplicaHint != null) {
807 logger.debug("[{}] An out-of-date latest known revision for {}/{}: {}",
808 currentReplicaHint, projectName, repositoryName, newRevision);
809 } else {
810 logger.debug("An out-of-date latest known revision for {}/{}: {}",
811 projectName, repositoryName, newRevision);
812 }
813 return false;
814 }
815 }
816
817 @Nullable
818 private static Object resultOrCause(@Nullable Object res, @Nullable Throwable cause) {
819 return res != null ? res : cause;
820 }
821
822 @Override
823 public void close() throws Exception {
824 delegate.close();
825 }
826
827 @VisibleForTesting
828 static final class RepoId {
829 final String projectName;
830 final String repositoryName;
831
832 RepoId(String projectName, String repositoryName) {
833 this.projectName = projectName;
834 this.repositoryName = repositoryName;
835 }
836
837 @Override
838 public boolean equals(Object o) {
839 if (this == o) {
840 return true;
841 }
842 if (!(o instanceof RepoId)) {
843 return false;
844 }
845 final RepoId that = (RepoId) o;
846 return projectName.equals(that.projectName) && repositoryName.equals(that.repositoryName);
847 }
848
849 @Override
850 public int hashCode() {
851 return Objects.hash(projectName, repositoryName);
852 }
853
854 @Override
855 public String toString() {
856 return projectName + '/' + repositoryName;
857 }
858 }
859 }