1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.internal.client;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
20 import static java.util.Objects.requireNonNull;
21 import static java.util.concurrent.CompletableFuture.completedFuture;
22
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.Set;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.CompletionException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.TimeUnit;
33 import java.util.function.BiFunction;
34 import java.util.function.BiPredicate;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37
38 import javax.annotation.Nullable;
39
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.annotations.VisibleForTesting;
44 import com.google.common.collect.ImmutableList;
45 import com.spotify.futures.CompletableFutures;
46
47 import com.linecorp.centraldogma.client.AbstractCentralDogma;
48 import com.linecorp.centraldogma.client.CentralDogma;
49 import com.linecorp.centraldogma.client.CentralDogmaRepository;
50 import com.linecorp.centraldogma.client.RepositoryInfo;
51 import com.linecorp.centraldogma.common.Author;
52 import com.linecorp.centraldogma.common.Change;
53 import com.linecorp.centraldogma.common.Commit;
54 import com.linecorp.centraldogma.common.Entry;
55 import com.linecorp.centraldogma.common.EntryType;
56 import com.linecorp.centraldogma.common.Markup;
57 import com.linecorp.centraldogma.common.MergeQuery;
58 import com.linecorp.centraldogma.common.MergedEntry;
59 import com.linecorp.centraldogma.common.PathPattern;
60 import com.linecorp.centraldogma.common.PushResult;
61 import com.linecorp.centraldogma.common.Query;
62 import com.linecorp.centraldogma.common.Revision;
63 import com.linecorp.centraldogma.common.RevisionNotFoundException;
64
65 import io.micrometer.core.instrument.MeterRegistry;
66
67
68
69
70
71 public final class ReplicationLagTolerantCentralDogma extends AbstractCentralDogma {
72
73 private static final Logger logger =
74 LoggerFactory.getLogger(ReplicationLagTolerantCentralDogma.class);
75
76 private final CentralDogma delegate;
77 private final int maxRetries;
78 private final long retryIntervalMillis;
79 private final Supplier<?> currentReplicaHintSupplier;
80
81 @VisibleForTesting
82 final Map<RepoId, Revision> latestKnownRevisions = new LinkedHashMap<RepoId, Revision>() {
83 private static final long serialVersionUID = 3587793379404809027L;
84
85 @Override
86 protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
87
88 return size() > 8192;
89 }
90 };
91
92 public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
93 CentralDogma delegate, int maxRetries, long retryIntervalMillis,
94 Supplier<?> currentReplicaHintSupplier,
95 @Nullable MeterRegistry meterRegistry) {
96 super(blockingTaskExecutor, meterRegistry);
97
98 requireNonNull(delegate, "delegate");
99 checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
100 checkArgument(retryIntervalMillis >= 0,
101 "retryIntervalMillis: %s (expected: >= 0)", retryIntervalMillis);
102 requireNonNull(currentReplicaHintSupplier, "currentReplicaHintSupplier");
103
104 this.delegate = delegate;
105 this.maxRetries = maxRetries;
106 this.retryIntervalMillis = retryIntervalMillis;
107 this.currentReplicaHintSupplier = currentReplicaHintSupplier;
108 }
109
110 @Override
111 public CompletableFuture<Void> createProject(String projectName) {
112 return delegate.createProject(projectName);
113 }
114
115 @Override
116 public CompletableFuture<Void> removeProject(String projectName) {
117 return delegate.removeProject(projectName).thenAccept(unused -> {
118 synchronized (latestKnownRevisions) {
119 latestKnownRevisions.entrySet().removeIf(e -> e.getKey().projectName.equals(projectName));
120 }
121 });
122 }
123
124 @Override
125 public CompletableFuture<Void> purgeProject(String projectName) {
126 return delegate.purgeProject(projectName);
127 }
128
129 @Override
130 public CompletableFuture<Void> unremoveProject(String projectName) {
131 return delegate.unremoveProject(projectName);
132 }
133
134 @Override
135 public CompletableFuture<Set<String>> listProjects() {
136 return delegate.listProjects();
137 }
138
139 @Override
140 public CompletableFuture<Set<String>> listRemovedProjects() {
141 return delegate.listRemovedProjects();
142 }
143
144 @Override
145 public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
146 String repositoryName) {
147 return delegate.createRepository(projectName, repositoryName);
148 }
149
150 @Override
151 public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
152 return delegate.removeRepository(projectName, repositoryName).thenAccept(unused -> {
153 synchronized (latestKnownRevisions) {
154 latestKnownRevisions.remove(new RepoId(projectName, repositoryName));
155 }
156 });
157 }
158
159 @Override
160 public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
161 return delegate.purgeRepository(projectName, repositoryName);
162 }
163
164 @Override
165 public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
166 String repositoryName) {
167 return delegate.unremoveRepository(projectName, repositoryName);
168 }
169
170 @Override
171 public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
172 return executeWithRetries(
173 new Supplier<CompletableFuture<Map<String, RepositoryInfo>>>() {
174 @Override
175 public CompletableFuture<Map<String, RepositoryInfo>> get() {
176 return delegate.listRepositories(projectName);
177 }
178
179 @Override
180 public String toString() {
181 return "listRepositories(" + projectName + ')';
182 }
183 },
184 (res, cause) -> {
185 if (res != null) {
186 for (RepositoryInfo info : res.values()) {
187 if (!updateLatestKnownRevision(projectName, info.name(), info.headRevision())) {
188 return true;
189 }
190 }
191 }
192 return false;
193 });
194 }
195
196 @Override
197 public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
198 return delegate.listRemovedRepositories(projectName);
199 }
200
201 @Override
202 public CompletableFuture<Revision> normalizeRevision(
203 String projectName, String repositoryName, Revision revision) {
204 return executeWithRetries(
205 new Supplier<CompletableFuture<Revision>>() {
206 @Override
207 public CompletableFuture<Revision> get() {
208 return delegate.normalizeRevision(projectName, repositoryName, revision);
209 }
210
211 @Override
212 public String toString() {
213 return "normalizeRevision(" + projectName + ", " + repositoryName + ", " +
214 revision + ')';
215 }
216 },
217 (res, cause) -> {
218 if (cause != null) {
219 return handleRevisionNotFound(projectName, repositoryName, revision, cause);
220 }
221
222 if (revision.isRelative()) {
223 final Revision headRevision = res.forward(-(revision.major() + 1));
224 return !updateLatestKnownRevision(projectName, repositoryName, headRevision);
225 }
226
227 updateLatestKnownRevision(projectName, repositoryName, revision);
228 return false;
229 });
230 }
231
232 @Override
233 public CompletableFuture<Map<String, EntryType>> listFiles(
234 String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
235 return normalizeRevisionAndExecuteWithRetries(
236 projectName, repositoryName, revision,
237 new Function<Revision, CompletableFuture<Map<String, EntryType>>>() {
238 @Override
239 public CompletableFuture<Map<String, EntryType>> apply(Revision normRev) {
240 return delegate.listFiles(projectName, repositoryName, normRev, pathPattern);
241 }
242
243 @Override
244 public String toString() {
245 return "listFiles(" + projectName + ", " + repositoryName + ", " +
246 revision + ", " + pathPattern + ')';
247 }
248 });
249 }
250
251 @Override
252 public <T> CompletableFuture<Entry<T>> getFile(
253 String projectName, String repositoryName, Revision revision, Query<T> query, boolean viewRaw) {
254 return normalizeRevisionAndExecuteWithRetries(
255 projectName, repositoryName, revision,
256 new Function<Revision, CompletableFuture<Entry<T>>>() {
257 @Override
258 public CompletableFuture<Entry<T>> apply(Revision normRev) {
259 return delegate.getFile(projectName, repositoryName, normRev, query, viewRaw);
260 }
261
262 @Override
263 public String toString() {
264 return "getFile(" + projectName + ", " + repositoryName + ", " +
265 revision + ", " + query + ", " + viewRaw + ')';
266 }
267 });
268 }
269
270 @Override
271 public CompletableFuture<Map<String, Entry<?>>> getFiles(
272 String projectName, String repositoryName, Revision revision, PathPattern pathPattern,
273 boolean viewRaw) {
274 return normalizeRevisionAndExecuteWithRetries(
275 projectName, repositoryName, revision,
276 new Function<Revision, CompletableFuture<Map<String, Entry<?>>>>() {
277 @Override
278 public CompletableFuture<Map<String, Entry<?>>> apply(Revision normRev) {
279 return delegate.getFiles(projectName, repositoryName, normRev, pathPattern, viewRaw);
280 }
281
282 @Override
283 public String toString() {
284 return "getFiles(" + projectName + ", " + repositoryName + ", " +
285 revision + ", " + pathPattern + ", " + viewRaw + ')';
286 }
287 });
288 }
289
290 @Override
291 public <T> CompletableFuture<MergedEntry<T>> mergeFiles(
292 String projectName, String repositoryName, Revision revision,
293 MergeQuery<T> mergeQuery) {
294 return normalizeRevisionAndExecuteWithRetries(
295 projectName, repositoryName, revision,
296 new Function<Revision, CompletableFuture<MergedEntry<T>>>() {
297 @Override
298 public CompletableFuture<MergedEntry<T>> apply(Revision normRev) {
299 return delegate.mergeFiles(projectName, repositoryName, normRev, mergeQuery);
300 }
301
302 @Override
303 public String toString() {
304 return "mergeFiles(" + projectName + ", " + repositoryName + ", " +
305 revision + ", " + mergeQuery + ')';
306 }
307 });
308 }
309
310 @Override
311 public CompletableFuture<List<Commit>> getHistory(
312 String projectName, String repositoryName, Revision from,
313 Revision to, PathPattern pathPattern, int maxCommits) {
314 return normalizeRevisionsAndExecuteWithRetries(
315 projectName, repositoryName, from, to,
316 new BiFunction<Revision, Revision, CompletableFuture<List<Commit>>>() {
317 @Override
318 public CompletableFuture<List<Commit>> apply(Revision normFromRev, Revision normToRev) {
319 return delegate.getHistory(projectName, repositoryName,
320 normFromRev, normToRev, pathPattern, maxCommits);
321 }
322
323 @Override
324 public String toString() {
325 return "getHistory(" + projectName + ", " + repositoryName + ", " +
326 from + ", " + to + ", " + pathPattern + ", " + maxCommits + ')';
327 }
328 });
329 }
330
331 @Override
332 public <T> CompletableFuture<Change<T>> getDiff(
333 String projectName, String repositoryName, Revision from, Revision to, Query<T> query) {
334 return normalizeRevisionsAndExecuteWithRetries(
335 projectName, repositoryName, from, to,
336 new BiFunction<Revision, Revision, CompletableFuture<Change<T>>>() {
337 @Override
338 public CompletableFuture<Change<T>> apply(Revision normFromRev, Revision normToRev) {
339 return delegate.getDiff(projectName, repositoryName,
340 normFromRev, normToRev, query);
341 }
342
343 @Override
344 public String toString() {
345 return "getDiff(" + projectName + ", " + repositoryName + ", " +
346 from + ", " + to + ", " + query + ')';
347 }
348 });
349 }
350
351 @Override
352 public CompletableFuture<List<Change<?>>> getDiff(
353 String projectName, String repositoryName, Revision from, Revision to, PathPattern pathPattern) {
354 return normalizeRevisionsAndExecuteWithRetries(
355 projectName, repositoryName, from, to,
356 new BiFunction<Revision, Revision, CompletableFuture<List<Change<?>>>>() {
357 @Override
358 public CompletableFuture<List<Change<?>>> apply(Revision normFromRev, Revision normToRev) {
359 return delegate.getDiff(projectName, repositoryName,
360 normFromRev, normToRev, pathPattern);
361 }
362
363 @Override
364 public String toString() {
365 return "getDiffs(" + projectName + ", " + repositoryName + ", " +
366 from + ", " + to + ", " + pathPattern + ')';
367 }
368 });
369 }
370
371 @Override
372 public CompletableFuture<List<Change<?>>> getPreviewDiffs(
373 String projectName, String repositoryName, Revision baseRevision,
374 Iterable<? extends Change<?>> changes) {
375 return normalizeRevisionAndExecuteWithRetries(
376 projectName, repositoryName, baseRevision,
377 new Function<Revision, CompletableFuture<List<Change<?>>>>() {
378 @Override
379 public CompletableFuture<List<Change<?>>> apply(Revision normBaseRev) {
380 return delegate.getPreviewDiffs(projectName, repositoryName, normBaseRev, changes);
381 }
382
383 @Override
384 public String toString() {
385 return "getPreviewDiffs(" + projectName + ", " + repositoryName + ", " +
386 baseRevision + ", ...)";
387 }
388 });
389 }
390
391 @Override
392 public CompletableFuture<PushResult> push(
393 String projectName, String repositoryName, Revision baseRevision,
394 String summary, String detail, Markup markup, Iterable<? extends Change<?>> changes) {
395 return executeWithRetries(
396 new Supplier<CompletableFuture<PushResult>>() {
397 @Override
398 public CompletableFuture<PushResult> get() {
399 return delegate.push(projectName, repositoryName, baseRevision,
400 summary, detail, markup, changes);
401 }
402
403 @Override
404 public String toString() {
405 return "push(" + projectName + ", " + repositoryName + ", " +
406 baseRevision + ", " + summary + ", ...)";
407 }
408 },
409 pushRetryPredicate(projectName, repositoryName, baseRevision));
410 }
411
412 @Override
413 public CompletableFuture<PushResult> push(
414 String projectName, String repositoryName, Revision baseRevision,
415 Author author, String summary, String detail, Markup markup,
416 Iterable<? extends Change<?>> changes) {
417 return executeWithRetries(
418 new Supplier<CompletableFuture<PushResult>>() {
419 @Override
420 public CompletableFuture<PushResult> get() {
421 return delegate.push(projectName, repositoryName, baseRevision,
422 author, summary, detail, markup, changes);
423 }
424
425 @Override
426 public String toString() {
427 return "push(" + projectName + ", " + repositoryName + ", " +
428 baseRevision + ", " + summary + ", ...)";
429 }
430 },
431 pushRetryPredicate(projectName, repositoryName, baseRevision));
432 }
433
434 private BiPredicate<PushResult, Throwable> pushRetryPredicate(
435 String projectName, String repositoryName, Revision baseRevision) {
436
437 return (res, cause) -> {
438 if (cause != null) {
439 return handleRevisionNotFound(projectName, repositoryName, baseRevision, cause);
440 }
441
442 updateLatestKnownRevision(projectName, repositoryName, res.revision());
443 return false;
444 };
445 }
446
447 @Override
448 public CompletableFuture<Revision> watchRepository(
449 String projectName, String repositoryName, Revision lastKnownRevision,
450 PathPattern pathPattern, long timeoutMillis, boolean errorOnEntryNotFound) {
451
452 return normalizeRevisionAndExecuteWithRetries(
453 projectName, repositoryName, lastKnownRevision,
454 new Function<Revision, CompletableFuture<Revision>>() {
455 @Override
456 public CompletableFuture<Revision> apply(Revision normLastKnownRevision) {
457 return delegate.watchRepository(projectName, repositoryName, normLastKnownRevision,
458 pathPattern, timeoutMillis, errorOnEntryNotFound)
459 .thenApply(newLastKnownRevision -> {
460 if (newLastKnownRevision != null) {
461 updateLatestKnownRevision(projectName, repositoryName,
462 newLastKnownRevision);
463 }
464 return newLastKnownRevision;
465 });
466 }
467
468 @Override
469 public String toString() {
470 return "watchRepository(" + projectName + ", " + repositoryName + ", " +
471 lastKnownRevision + ", " + pathPattern + ", " + timeoutMillis + ", " +
472 errorOnEntryNotFound + ')';
473 }
474 });
475 }
476
477 @Override
478 public <T> CompletableFuture<Entry<T>> watchFile(
479 String projectName, String repositoryName, Revision lastKnownRevision,
480 Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound, boolean viewRaw) {
481
482 return normalizeRevisionAndExecuteWithRetries(
483 projectName, repositoryName, lastKnownRevision,
484 new Function<Revision, CompletableFuture<Entry<T>>>() {
485 @Override
486 public CompletableFuture<Entry<T>> apply(Revision normLastKnownRevision) {
487 return delegate.watchFile(projectName, repositoryName, normLastKnownRevision,
488 query, timeoutMillis, errorOnEntryNotFound, viewRaw)
489 .thenApply(entry -> {
490 if (entry != null) {
491 updateLatestKnownRevision(projectName, repositoryName,
492 entry.revision());
493 }
494 return entry;
495 });
496 }
497
498 @Override
499 public String toString() {
500 return "watchFile(" + projectName + ", " + repositoryName + ", " +
501 lastKnownRevision + ", " + query + ", " + timeoutMillis + ", " +
502 errorOnEntryNotFound + ')';
503 }
504 });
505 }
506
507 @Override
508 public CompletableFuture<Void> whenEndpointReady() {
509 return delegate.whenEndpointReady();
510 }
511
512
513
514
515
516
517 private <T> CompletableFuture<T> normalizeRevisionAndExecuteWithRetries(
518 String projectName, String repositoryName, Revision revision,
519 Function<Revision, CompletableFuture<T>> taskRunner) {
520 return normalizeRevision(projectName, repositoryName, revision)
521 .thenCompose(normRev -> executeWithRetries(
522 new Supplier<CompletableFuture<T>>() {
523 @Override
524 public CompletableFuture<T> get() {
525 return taskRunner.apply(normRev);
526 }
527
528 @Override
529 public String toString() {
530 return taskRunner + " with " + normRev;
531 }
532 },
533 (res, cause) -> cause != null &&
534 handleRevisionNotFound(projectName, repositoryName, normRev, cause)));
535 }
536
537
538
539
540
541
542
543 private <T> CompletableFuture<T> normalizeRevisionsAndExecuteWithRetries(
544 String projectName, String repositoryName, Revision from, Revision to,
545 BiFunction<Revision, Revision, CompletableFuture<T>> taskRunner) {
546
547 if (to == null) {
548 return exceptionallyCompletedFuture(new NullPointerException("to"));
549 }
550
551 if (from == null) {
552 return exceptionallyCompletedFuture(new NullPointerException("from"));
553 }
554
555 if (from.isRelative() && to.isRelative() ||
556 !from.isRelative() && !to.isRelative()) {
557
558
559
560 final int distance = to.major() - from.major();
561 final Revision baseRevision = to.compareTo(from) >= 0 ? to : from;
562
563 return normalizeRevision(projectName, repositoryName, baseRevision).thenCompose(normBaseRev -> {
564 final Revision normFromRev;
565 final Revision normToRev;
566 if (distance >= 0) {
567 normToRev = normBaseRev;
568 normFromRev = normBaseRev.backward(distance);
569 } else {
570 normFromRev = normBaseRev;
571 normToRev = normBaseRev.backward(-distance);
572 }
573
574 return executeWithRetries(
575 new Supplier<CompletableFuture<T>>() {
576 @Override
577 public CompletableFuture<T> get() {
578 return taskRunner.apply(normFromRev, normToRev);
579 }
580
581 @Override
582 public String toString() {
583 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
584 }
585 },
586 (res, cause) -> {
587 if (cause == null) {
588 return false;
589 }
590 return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
591 });
592 });
593 } else {
594
595
596 return CompletableFutures.allAsList(ImmutableList.of(
597 normalizeRevision(projectName, repositoryName, from),
598 normalizeRevision(projectName, repositoryName, to))).thenCompose(normRevs -> {
599 final Revision normFromRev = normRevs.get(0);
600 final Revision normToRev = normRevs.get(1);
601 return executeWithRetries(
602 new Supplier<CompletableFuture<T>>() {
603 @Override
604 public CompletableFuture<T> get() {
605 return taskRunner.apply(normFromRev, normToRev);
606 }
607
608 @Override
609 public String toString() {
610 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
611 }
612 },
613 (res, cause) -> {
614 if (cause == null) {
615 return false;
616 }
617
618 final Revision normBaseRev = normFromRev.compareTo(normToRev) > 0 ? normFromRev
619 : normToRev;
620 return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
621 });
622 });
623 }
624 }
625
626
627
628
629
630
631 private <T> CompletableFuture<T> executeWithRetries(
632 Supplier<CompletableFuture<T>> taskRunner,
633 BiPredicate<T, Throwable> retryPredicate) {
634 return executeWithRetries(taskRunner, retryPredicate, 0);
635 }
636
637 private <T> CompletableFuture<T> executeWithRetries(
638 Supplier<CompletableFuture<T>> taskRunner,
639 BiPredicate<T, Throwable> retryPredicate,
640 int attemptsSoFar) {
641
642 return CompletableFutures.handleCompose(taskRunner.get(), (res, cause) -> {
643 final Object currentReplicaHint = currentReplicaHintSupplier.get();
644 final int nextAttemptsSoFar = attemptsSoFar + 1;
645 final boolean retryRequired = retryPredicate.test(res, cause);
646 if (!retryRequired || nextAttemptsSoFar > maxRetries) {
647 if (retryRequired) {
648 if (currentReplicaHint != null) {
649 logger.warn("[{}] Failed to retrieve the up-to-date data from Central Dogma " +
650 "after {} retries: {} => {}",
651 currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
652 } else {
653 logger.warn("Failed to retrieve the up-to-date data from Central Dogma " +
654 "after {} retries: {} => {}",
655 attemptsSoFar, taskRunner, resultOrCause(res, cause));
656 }
657 } else if (logger.isDebugEnabled()) {
658 if (currentReplicaHint != null) {
659 logger.debug("[{}] Retrieved the up-to-date data after {} retries: {} => {}",
660 currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
661 } else {
662 logger.debug("Retrieved the up-to-date data after {} retries: {} => {}",
663 attemptsSoFar, taskRunner, resultOrCause(res, cause));
664 }
665 }
666
667 if (cause == null) {
668 return completedFuture(res);
669 } else {
670 return exceptionallyCompletedFuture(cause);
671 }
672 }
673
674 if (logger.isDebugEnabled()) {
675 if (currentReplicaHint != null) {
676 logger.debug("[{}] Got the out-of-date data ({} attempt(s) so far): {} => {}",
677 currentReplicaHint, nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
678 } else {
679 logger.debug("Got the out-of-date data ({} attempt(s) so far): {} => {}",
680 nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
681 }
682 }
683
684 final CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();
685 executor().schedule(() -> {
686 try {
687 executeWithRetries(taskRunner, retryPredicate,
688 nextAttemptsSoFar).handle((newRes, newCause) -> {
689 if (newCause != null) {
690 nextAttemptFuture.completeExceptionally(newCause);
691 } else {
692 nextAttemptFuture.complete(newRes);
693 }
694 return null;
695 });
696 } catch (Throwable t) {
697 nextAttemptFuture.completeExceptionally(t);
698 }
699 }, retryIntervalMillis, TimeUnit.MILLISECONDS);
700
701 return nextAttemptFuture;
702 }).toCompletableFuture();
703 }
704
705
706
707
708
709 private static Throwable peel(Throwable throwable) {
710 Throwable cause = throwable.getCause();
711 while (cause != null && cause != throwable &&
712 (throwable instanceof CompletionException || throwable instanceof ExecutionException)) {
713 throwable = cause;
714 cause = throwable.getCause();
715 }
716 return throwable;
717 }
718
719
720
721
722
723 private boolean handleRevisionNotFound(
724 String projectName, String repositoryName, Revision revision, Throwable cause) {
725 requireNonNull(cause, "cause");
726 cause = peel(cause);
727 if (!(cause instanceof RevisionNotFoundException)) {
728 return false;
729 }
730
731 final Revision latestKnownRevision = latestKnownRevision(projectName, repositoryName);
732 if (latestKnownRevision == null) {
733 return false;
734 }
735
736 if (revision.isRelative()) {
737 return revision.major() + latestKnownRevision.major() >= 0;
738 } else {
739 return revision.major() <= latestKnownRevision.major();
740 }
741 }
742
743 @Nullable
744 @VisibleForTesting
745 Revision latestKnownRevision(String projectName, String repositoryName) {
746 synchronized (latestKnownRevisions) {
747 return latestKnownRevisions.get(new RepoId(projectName, repositoryName));
748 }
749 }
750
751
752
753
754
755
756
757 private boolean updateLatestKnownRevision(String projectName, String repositoryName, Revision newRevision) {
758 final Object currentReplicaHint = currentReplicaHintSupplier.get();
759 final RepoId id = new RepoId(projectName, repositoryName);
760 synchronized (latestKnownRevisions) {
761 final Revision oldRevision = latestKnownRevisions.get(id);
762 if (oldRevision == null) {
763 if (currentReplicaHint != null) {
764 logger.debug("[{}] Updating the latest known revision for {}/{} from <unknown> to: {}",
765 currentReplicaHint, projectName, repositoryName, newRevision);
766 } else {
767 logger.debug("Updating the latest known revision for {}/{} from <unknown> to: {}",
768 projectName, repositoryName, newRevision);
769 }
770 latestKnownRevisions.put(id, newRevision);
771 return true;
772 }
773
774 final int comparison = oldRevision.compareTo(newRevision);
775 if (comparison < 0) {
776 if (currentReplicaHint != null) {
777 logger.debug("[{}] Updating the latest known revision for {}/{} from {} to: {}",
778 currentReplicaHint, projectName, repositoryName, oldRevision, newRevision);
779 } else {
780 logger.debug("Updating the latest known revision for {}/{} from {} to: {}",
781 projectName, repositoryName, oldRevision, newRevision);
782 }
783 latestKnownRevisions.put(id, newRevision);
784 return true;
785 }
786
787 if (comparison == 0) {
788 if (currentReplicaHint != null) {
789 logger.debug("[{}] The latest known revision for {}/{} stays unchanged at: {}",
790 currentReplicaHint, projectName, repositoryName, newRevision);
791 } else {
792 logger.debug("The latest known revision for {}/{} stays unchanged at: {}",
793 projectName, repositoryName, newRevision);
794 }
795 return true;
796 }
797
798 if (currentReplicaHint != null) {
799 logger.debug("[{}] An out-of-date latest known revision for {}/{}: {}",
800 currentReplicaHint, projectName, repositoryName, newRevision);
801 } else {
802 logger.debug("An out-of-date latest known revision for {}/{}: {}",
803 projectName, repositoryName, newRevision);
804 }
805 return false;
806 }
807 }
808
809 @Nullable
810 private static Object resultOrCause(@Nullable Object res, @Nullable Throwable cause) {
811 return res != null ? res : cause;
812 }
813
814 @Override
815 public void close() throws Exception {
816 delegate.close();
817 }
818
819 @VisibleForTesting
820 static final class RepoId {
821 final String projectName;
822 final String repositoryName;
823
824 RepoId(String projectName, String repositoryName) {
825 this.projectName = projectName;
826 this.repositoryName = repositoryName;
827 }
828
829 @Override
830 public boolean equals(Object o) {
831 if (this == o) {
832 return true;
833 }
834 if (!(o instanceof RepoId)) {
835 return false;
836 }
837 final RepoId that = (RepoId) o;
838 return projectName.equals(that.projectName) && repositoryName.equals(that.repositoryName);
839 }
840
841 @Override
842 public int hashCode() {
843 return Objects.hash(projectName, repositoryName);
844 }
845
846 @Override
847 public String toString() {
848 return projectName + '/' + repositoryName;
849 }
850 }
851 }