1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.internal.client;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
20 import static java.util.Objects.requireNonNull;
21 import static java.util.concurrent.CompletableFuture.completedFuture;
22
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.Set;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.CompletionException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.TimeUnit;
33 import java.util.function.BiFunction;
34 import java.util.function.BiPredicate;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37
38 import javax.annotation.Nullable;
39
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.annotations.VisibleForTesting;
44 import com.google.common.collect.ImmutableList;
45 import com.spotify.futures.CompletableFutures;
46
47 import com.linecorp.centraldogma.client.AbstractCentralDogma;
48 import com.linecorp.centraldogma.client.CentralDogma;
49 import com.linecorp.centraldogma.client.CentralDogmaRepository;
50 import com.linecorp.centraldogma.client.RepositoryInfo;
51 import com.linecorp.centraldogma.common.Author;
52 import com.linecorp.centraldogma.common.Change;
53 import com.linecorp.centraldogma.common.Commit;
54 import com.linecorp.centraldogma.common.Entry;
55 import com.linecorp.centraldogma.common.EntryType;
56 import com.linecorp.centraldogma.common.Markup;
57 import com.linecorp.centraldogma.common.MergeQuery;
58 import com.linecorp.centraldogma.common.MergedEntry;
59 import com.linecorp.centraldogma.common.PathPattern;
60 import com.linecorp.centraldogma.common.PushResult;
61 import com.linecorp.centraldogma.common.Query;
62 import com.linecorp.centraldogma.common.Revision;
63 import com.linecorp.centraldogma.common.RevisionNotFoundException;
64
65 import io.micrometer.core.instrument.MeterRegistry;
66
67
68
69
70
71 public final class ReplicationLagTolerantCentralDogma extends AbstractCentralDogma {
72
// Logger for the retry and revision-tracking diagnostics emitted by this class.
private static final Logger logger =
        LoggerFactory.getLogger(ReplicationLagTolerantCentralDogma.class);

// The client every operation is ultimately forwarded to.
private final CentralDogma delegate;
// Upper bound on the number of retries; the constructor requires it to be greater than 0.
private final int maxRetries;
// Delay in milliseconds before a retry is scheduled; the constructor requires it to be >= 0.
private final long retryIntervalMillis;
// Supplies a value that is prepended to log messages as "[value]" whenever it is non-null.
private final Supplier<?> currentReplicaHintSupplier;

// Latest revision seen so far, keyed by project/repository.
// Every access made by this class happens inside synchronized (latestKnownRevisions).
@VisibleForTesting
final Map<RepoId, Revision> latestKnownRevisions = new LinkedHashMap<RepoId, Revision>() {
    private static final long serialVersionUID = 3587793379404809027L;

    @Override
    protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
        // Evict the eldest entry as soon as the map holds more than 8192 entries.
        return size() > 8192;
    }
};
91
/**
 * Creates a new instance that forwards operations to {@code delegate}.
 *
 * @param blockingTaskExecutor the executor handed to the superclass constructor
 * @param delegate the client that performs the actual operations
 * @param maxRetries the maximum number of retries; must be greater than 0
 * @param retryIntervalMillis the delay between retries in milliseconds; must not be negative
 * @param currentReplicaHintSupplier supplies the value used as a log message prefix
 * @param meterRegistry the registry handed to the superclass constructor, may be {@code null}
 *
 * @throws NullPointerException if {@code delegate} or {@code currentReplicaHintSupplier} is {@code null}
 * @throws IllegalArgumentException if {@code maxRetries} or {@code retryIntervalMillis} is out of range
 */
public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
                                          CentralDogma delegate, int maxRetries, long retryIntervalMillis,
                                          Supplier<?> currentReplicaHintSupplier,
                                          @Nullable MeterRegistry meterRegistry) {
    super(blockingTaskExecutor, meterRegistry);

    // Each argument is validated right before it is stored; the validation order is
    // delegate, maxRetries, retryIntervalMillis, currentReplicaHintSupplier.
    this.delegate = requireNonNull(delegate, "delegate");

    checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
    this.maxRetries = maxRetries;

    checkArgument(retryIntervalMillis >= 0,
                  "retryIntervalMillis: %s (expected: >= 0)", retryIntervalMillis);
    this.retryIntervalMillis = retryIntervalMillis;

    this.currentReplicaHintSupplier =
            requireNonNull(currentReplicaHintSupplier, "currentReplicaHintSupplier");
}
109
/**
 * Creates a project by forwarding the call to the delegate unchanged.
 * No retry or revision tracking is applied by this method.
 */
@Override
public CompletableFuture<Void> createProject(String projectName) {
    return delegate.createProject(projectName);
}
114
115 @Override
116 public CompletableFuture<Void> removeProject(String projectName) {
117 return delegate.removeProject(projectName).thenAccept(unused -> {
118 synchronized (latestKnownRevisions) {
119 latestKnownRevisions.entrySet().removeIf(e -> e.getKey().projectName.equals(projectName));
120 }
121 });
122 }
123
/**
 * Purges a project by forwarding the call to the delegate unchanged.
 * No retry or revision tracking is applied by this method.
 */
@Override
public CompletableFuture<Void> purgeProject(String projectName) {
    return delegate.purgeProject(projectName);
}
128
/**
 * Restores a removed project by forwarding the call to the delegate unchanged.
 * No retry or revision tracking is applied by this method.
 */
@Override
public CompletableFuture<Void> unremoveProject(String projectName) {
    return delegate.unremoveProject(projectName);
}
133
/**
 * Lists the projects by forwarding the call to the delegate unchanged.
 * No retry is applied by this method.
 */
@Override
public CompletableFuture<Set<String>> listProjects() {
    return delegate.listProjects();
}
138
/**
 * Lists the removed projects by forwarding the call to the delegate unchanged.
 * No retry is applied by this method.
 */
@Override
public CompletableFuture<Set<String>> listRemovedProjects() {
    return delegate.listRemovedProjects();
}
143
/**
 * Creates a repository by forwarding the call to the delegate unchanged.
 * No retry or revision tracking is applied by this method.
 */
@Override
public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
                                                                  String repositoryName) {
    return delegate.createRepository(projectName, repositoryName);
}
149
150 @Override
151 public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
152 return delegate.removeRepository(projectName, repositoryName).thenAccept(unused -> {
153 synchronized (latestKnownRevisions) {
154 latestKnownRevisions.remove(new RepoId(projectName, repositoryName));
155 }
156 });
157 }
158
/**
 * Purges a repository by forwarding the call to the delegate unchanged.
 * No retry or revision tracking is applied by this method.
 */
@Override
public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
    return delegate.purgeRepository(projectName, repositoryName);
}
163
/**
 * Restores a removed repository by forwarding the call to the delegate unchanged.
 * No retry or revision tracking is applied by this method.
 */
@Override
public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
                                                                    String repositoryName) {
    return delegate.unremoveRepository(projectName, repositoryName);
}
169
170 @Override
171 public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
172 return executeWithRetries(
173 new Supplier<CompletableFuture<Map<String, RepositoryInfo>>>() {
174 @Override
175 public CompletableFuture<Map<String, RepositoryInfo>> get() {
176 return delegate.listRepositories(projectName);
177 }
178
179 @Override
180 public String toString() {
181 return "listRepositories(" + projectName + ')';
182 }
183 },
184 (res, cause) -> {
185 if (res != null) {
186 for (RepositoryInfo info : res.values()) {
187 if (!updateLatestKnownRevision(projectName, info.name(), info.headRevision())) {
188 return true;
189 }
190 }
191 }
192 return false;
193 });
194 }
195
/**
 * Lists the removed repositories of a project by forwarding the call to the delegate unchanged.
 * No retry is applied by this method.
 */
@Override
public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
    return delegate.listRemovedRepositories(projectName);
}
200
201 @Override
202 public CompletableFuture<Revision> normalizeRevision(
203 String projectName, String repositoryName, Revision revision) {
204 return executeWithRetries(
205 new Supplier<CompletableFuture<Revision>>() {
206 @Override
207 public CompletableFuture<Revision> get() {
208 return delegate.normalizeRevision(projectName, repositoryName, revision);
209 }
210
211 @Override
212 public String toString() {
213 return "normalizeRevision(" + projectName + ", " + repositoryName + ", " +
214 revision + ')';
215 }
216 },
217 (res, cause) -> {
218 if (cause != null) {
219 return handleRevisionNotFound(projectName, repositoryName, revision, cause);
220 }
221
222 if (revision.isRelative()) {
223 final Revision headRevision = res.forward(-(revision.major() + 1));
224 return !updateLatestKnownRevision(projectName, repositoryName, headRevision);
225 }
226
227 updateLatestKnownRevision(projectName, repositoryName, revision);
228 return false;
229 });
230 }
231
232 @Override
233 public CompletableFuture<Map<String, EntryType>> listFiles(
234 String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
235 return normalizeRevisionAndExecuteWithRetries(
236 projectName, repositoryName, revision,
237 new Function<Revision, CompletableFuture<Map<String, EntryType>>>() {
238 @Override
239 public CompletableFuture<Map<String, EntryType>> apply(Revision normRev) {
240 return delegate.listFiles(projectName, repositoryName, normRev, pathPattern);
241 }
242
243 @Override
244 public String toString() {
245 return "listFiles(" + projectName + ", " + repositoryName + ", " +
246 revision + ", " + pathPattern + ')';
247 }
248 });
249 }
250
251 @Override
252 public <T> CompletableFuture<Entry<T>> getFile(
253 String projectName, String repositoryName, Revision revision, Query<T> query) {
254 return normalizeRevisionAndExecuteWithRetries(
255 projectName, repositoryName, revision,
256 new Function<Revision, CompletableFuture<Entry<T>>>() {
257 @Override
258 public CompletableFuture<Entry<T>> apply(Revision normRev) {
259 return delegate.getFile(projectName, repositoryName, normRev, query);
260 }
261
262 @Override
263 public String toString() {
264 return "getFile(" + projectName + ", " + repositoryName + ", " +
265 revision + ", " + query + ')';
266 }
267 });
268 }
269
270 @Override
271 public CompletableFuture<Map<String, Entry<?>>> getFiles(
272 String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
273 return normalizeRevisionAndExecuteWithRetries(
274 projectName, repositoryName, revision,
275 new Function<Revision, CompletableFuture<Map<String, Entry<?>>>>() {
276 @Override
277 public CompletableFuture<Map<String, Entry<?>>> apply(Revision normRev) {
278 return delegate.getFiles(projectName, repositoryName, normRev, pathPattern);
279 }
280
281 @Override
282 public String toString() {
283 return "getFiles(" + projectName + ", " + repositoryName + ", " +
284 revision + ", " + pathPattern + ')';
285 }
286 });
287 }
288
289 @Override
290 public <T> CompletableFuture<MergedEntry<T>> mergeFiles(
291 String projectName, String repositoryName, Revision revision,
292 MergeQuery<T> mergeQuery) {
293 return normalizeRevisionAndExecuteWithRetries(
294 projectName, repositoryName, revision,
295 new Function<Revision, CompletableFuture<MergedEntry<T>>>() {
296 @Override
297 public CompletableFuture<MergedEntry<T>> apply(Revision normRev) {
298 return delegate.mergeFiles(projectName, repositoryName, normRev, mergeQuery);
299 }
300
301 @Override
302 public String toString() {
303 return "mergeFiles(" + projectName + ", " + repositoryName + ", " +
304 revision + ", " + mergeQuery + ')';
305 }
306 });
307 }
308
309 @Override
310 public CompletableFuture<List<Commit>> getHistory(
311 String projectName, String repositoryName, Revision from,
312 Revision to, PathPattern pathPattern, int maxCommits) {
313 return normalizeRevisionsAndExecuteWithRetries(
314 projectName, repositoryName, from, to,
315 new BiFunction<Revision, Revision, CompletableFuture<List<Commit>>>() {
316 @Override
317 public CompletableFuture<List<Commit>> apply(Revision normFromRev, Revision normToRev) {
318 return delegate.getHistory(projectName, repositoryName,
319 normFromRev, normToRev, pathPattern, maxCommits);
320 }
321
322 @Override
323 public String toString() {
324 return "getHistory(" + projectName + ", " + repositoryName + ", " +
325 from + ", " + to + ", " + pathPattern + ", " + maxCommits + ')';
326 }
327 });
328 }
329
330 @Override
331 public <T> CompletableFuture<Change<T>> getDiff(
332 String projectName, String repositoryName, Revision from, Revision to, Query<T> query) {
333 return normalizeRevisionsAndExecuteWithRetries(
334 projectName, repositoryName, from, to,
335 new BiFunction<Revision, Revision, CompletableFuture<Change<T>>>() {
336 @Override
337 public CompletableFuture<Change<T>> apply(Revision normFromRev, Revision normToRev) {
338 return delegate.getDiff(projectName, repositoryName,
339 normFromRev, normToRev, query);
340 }
341
342 @Override
343 public String toString() {
344 return "getDiff(" + projectName + ", " + repositoryName + ", " +
345 from + ", " + to + ", " + query + ')';
346 }
347 });
348 }
349
350 @Override
351 public CompletableFuture<List<Change<?>>> getDiff(
352 String projectName, String repositoryName, Revision from, Revision to, PathPattern pathPattern) {
353 return normalizeRevisionsAndExecuteWithRetries(
354 projectName, repositoryName, from, to,
355 new BiFunction<Revision, Revision, CompletableFuture<List<Change<?>>>>() {
356 @Override
357 public CompletableFuture<List<Change<?>>> apply(Revision normFromRev, Revision normToRev) {
358 return delegate.getDiff(projectName, repositoryName,
359 normFromRev, normToRev, pathPattern);
360 }
361
362 @Override
363 public String toString() {
364 return "getDiffs(" + projectName + ", " + repositoryName + ", " +
365 from + ", " + to + ", " + pathPattern + ')';
366 }
367 });
368 }
369
370 @Override
371 public CompletableFuture<List<Change<?>>> getPreviewDiffs(
372 String projectName, String repositoryName, Revision baseRevision,
373 Iterable<? extends Change<?>> changes) {
374 return normalizeRevisionAndExecuteWithRetries(
375 projectName, repositoryName, baseRevision,
376 new Function<Revision, CompletableFuture<List<Change<?>>>>() {
377 @Override
378 public CompletableFuture<List<Change<?>>> apply(Revision normBaseRev) {
379 return delegate.getPreviewDiffs(projectName, repositoryName, normBaseRev, changes);
380 }
381
382 @Override
383 public String toString() {
384 return "getPreviewDiffs(" + projectName + ", " + repositoryName + ", " +
385 baseRevision + ", ...)";
386 }
387 });
388 }
389
390 @Override
391 public CompletableFuture<PushResult> push(
392 String projectName, String repositoryName, Revision baseRevision,
393 String summary, String detail, Markup markup, Iterable<? extends Change<?>> changes) {
394 return executeWithRetries(
395 new Supplier<CompletableFuture<PushResult>>() {
396 @Override
397 public CompletableFuture<PushResult> get() {
398 return delegate.push(projectName, repositoryName, baseRevision,
399 summary, detail, markup, changes);
400 }
401
402 @Override
403 public String toString() {
404 return "push(" + projectName + ", " + repositoryName + ", " +
405 baseRevision + ", " + summary + ", ...)";
406 }
407 },
408 pushRetryPredicate(projectName, repositoryName, baseRevision));
409 }
410
411 @Override
412 public CompletableFuture<PushResult> push(
413 String projectName, String repositoryName, Revision baseRevision,
414 Author author, String summary, String detail, Markup markup,
415 Iterable<? extends Change<?>> changes) {
416 return executeWithRetries(
417 new Supplier<CompletableFuture<PushResult>>() {
418 @Override
419 public CompletableFuture<PushResult> get() {
420 return delegate.push(projectName, repositoryName, baseRevision,
421 author, summary, detail, markup, changes);
422 }
423
424 @Override
425 public String toString() {
426 return "push(" + projectName + ", " + repositoryName + ", " +
427 baseRevision + ", " + summary + ", ...)";
428 }
429 },
430 pushRetryPredicate(projectName, repositoryName, baseRevision));
431 }
432
433 private BiPredicate<PushResult, Throwable> pushRetryPredicate(
434 String projectName, String repositoryName, Revision baseRevision) {
435
436 return (res, cause) -> {
437 if (cause != null) {
438 return handleRevisionNotFound(projectName, repositoryName, baseRevision, cause);
439 }
440
441 updateLatestKnownRevision(projectName, repositoryName, res.revision());
442 return false;
443 };
444 }
445
446 @Override
447 public CompletableFuture<Revision> watchRepository(
448 String projectName, String repositoryName, Revision lastKnownRevision,
449 PathPattern pathPattern, long timeoutMillis, boolean errorOnEntryNotFound) {
450
451 return normalizeRevisionAndExecuteWithRetries(
452 projectName, repositoryName, lastKnownRevision,
453 new Function<Revision, CompletableFuture<Revision>>() {
454 @Override
455 public CompletableFuture<Revision> apply(Revision normLastKnownRevision) {
456 return delegate.watchRepository(projectName, repositoryName, normLastKnownRevision,
457 pathPattern, timeoutMillis, errorOnEntryNotFound)
458 .thenApply(newLastKnownRevision -> {
459 if (newLastKnownRevision != null) {
460 updateLatestKnownRevision(projectName, repositoryName,
461 newLastKnownRevision);
462 }
463 return newLastKnownRevision;
464 });
465 }
466
467 @Override
468 public String toString() {
469 return "watchRepository(" + projectName + ", " + repositoryName + ", " +
470 lastKnownRevision + ", " + pathPattern + ", " + timeoutMillis + ", " +
471 errorOnEntryNotFound + ')';
472 }
473 });
474 }
475
476 @Override
477 public <T> CompletableFuture<Entry<T>> watchFile(
478 String projectName, String repositoryName, Revision lastKnownRevision,
479 Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound) {
480
481 return normalizeRevisionAndExecuteWithRetries(
482 projectName, repositoryName, lastKnownRevision,
483 new Function<Revision, CompletableFuture<Entry<T>>>() {
484 @Override
485 public CompletableFuture<Entry<T>> apply(Revision normLastKnownRevision) {
486 return delegate.watchFile(projectName, repositoryName, normLastKnownRevision,
487 query, timeoutMillis, errorOnEntryNotFound)
488 .thenApply(entry -> {
489 if (entry != null) {
490 updateLatestKnownRevision(projectName, repositoryName,
491 entry.revision());
492 }
493 return entry;
494 });
495 }
496
497 @Override
498 public String toString() {
499 return "watchFile(" + projectName + ", " + repositoryName + ", " +
500 lastKnownRevision + ", " + query + ", " + timeoutMillis + ", " +
501 errorOnEntryNotFound + ')';
502 }
503 });
504 }
505
/**
 * Returns the delegate's endpoint-readiness future unchanged.
 */
@Override
public CompletableFuture<Void> whenEndpointReady() {
    return delegate.whenEndpointReady();
}
510
511
512
513
514
515
516 private <T> CompletableFuture<T> normalizeRevisionAndExecuteWithRetries(
517 String projectName, String repositoryName, Revision revision,
518 Function<Revision, CompletableFuture<T>> taskRunner) {
519 return normalizeRevision(projectName, repositoryName, revision)
520 .thenCompose(normRev -> executeWithRetries(
521 new Supplier<CompletableFuture<T>>() {
522 @Override
523 public CompletableFuture<T> get() {
524 return taskRunner.apply(normRev);
525 }
526
527 @Override
528 public String toString() {
529 return taskRunner + " with " + normRev;
530 }
531 },
532 (res, cause) -> cause != null &&
533 handleRevisionNotFound(projectName, repositoryName, normRev, cause)));
534 }
535
536
537
538
539
540
541
542 private <T> CompletableFuture<T> normalizeRevisionsAndExecuteWithRetries(
543 String projectName, String repositoryName, Revision from, Revision to,
544 BiFunction<Revision, Revision, CompletableFuture<T>> taskRunner) {
545
546 if (to == null) {
547 return exceptionallyCompletedFuture(new NullPointerException("to"));
548 }
549
550 if (from == null) {
551 return exceptionallyCompletedFuture(new NullPointerException("from"));
552 }
553
554 if (from.isRelative() && to.isRelative() ||
555 !from.isRelative() && !to.isRelative()) {
556
557
558
559 final int distance = to.major() - from.major();
560 final Revision baseRevision = to.compareTo(from) >= 0 ? to : from;
561
562 return normalizeRevision(projectName, repositoryName, baseRevision).thenCompose(normBaseRev -> {
563 final Revision normFromRev;
564 final Revision normToRev;
565 if (distance >= 0) {
566 normToRev = normBaseRev;
567 normFromRev = normBaseRev.backward(distance);
568 } else {
569 normFromRev = normBaseRev;
570 normToRev = normBaseRev.backward(-distance);
571 }
572
573 return executeWithRetries(
574 new Supplier<CompletableFuture<T>>() {
575 @Override
576 public CompletableFuture<T> get() {
577 return taskRunner.apply(normFromRev, normToRev);
578 }
579
580 @Override
581 public String toString() {
582 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
583 }
584 },
585 (res, cause) -> {
586 if (cause == null) {
587 return false;
588 }
589 return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
590 });
591 });
592 } else {
593
594
595 return CompletableFutures.allAsList(ImmutableList.of(
596 normalizeRevision(projectName, repositoryName, from),
597 normalizeRevision(projectName, repositoryName, to))).thenCompose(normRevs -> {
598 final Revision normFromRev = normRevs.get(0);
599 final Revision normToRev = normRevs.get(1);
600 return executeWithRetries(
601 new Supplier<CompletableFuture<T>>() {
602 @Override
603 public CompletableFuture<T> get() {
604 return taskRunner.apply(normFromRev, normToRev);
605 }
606
607 @Override
608 public String toString() {
609 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
610 }
611 },
612 (res, cause) -> {
613 if (cause == null) {
614 return false;
615 }
616
617 final Revision normBaseRev = normFromRev.compareTo(normToRev) > 0 ? normFromRev
618 : normToRev;
619 return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
620 });
621 });
622 }
623 }
624
625
626
627
628
629
/**
 * Runs {@code taskRunner} with retries, starting from an attempt count of 0.
 *
 * @param taskRunner produces the future of each attempt
 * @param retryPredicate receives the result and the cause of an attempt and returns
 *                       {@code true} when another attempt is wanted
 */
private <T> CompletableFuture<T> executeWithRetries(
        Supplier<CompletableFuture<T>> taskRunner,
        BiPredicate<T, Throwable> retryPredicate) {
    return executeWithRetries(taskRunner, retryPredicate, 0);
}
635
636 private <T> CompletableFuture<T> executeWithRetries(
637 Supplier<CompletableFuture<T>> taskRunner,
638 BiPredicate<T, Throwable> retryPredicate,
639 int attemptsSoFar) {
640
641 return CompletableFutures.handleCompose(taskRunner.get(), (res, cause) -> {
642 final Object currentReplicaHint = currentReplicaHintSupplier.get();
643 final int nextAttemptsSoFar = attemptsSoFar + 1;
644 final boolean retryRequired = retryPredicate.test(res, cause);
645 if (!retryRequired || nextAttemptsSoFar > maxRetries) {
646 if (retryRequired) {
647 if (currentReplicaHint != null) {
648 logger.warn("[{}] Failed to retrieve the up-to-date data from Central Dogma " +
649 "after {} retries: {} => {}",
650 currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
651 } else {
652 logger.warn("Failed to retrieve the up-to-date data from Central Dogma " +
653 "after {} retries: {} => {}",
654 attemptsSoFar, taskRunner, resultOrCause(res, cause));
655 }
656 } else if (logger.isDebugEnabled()) {
657 if (currentReplicaHint != null) {
658 logger.debug("[{}] Retrieved the up-to-date data after {} retries: {} => {}",
659 currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
660 } else {
661 logger.debug("Retrieved the up-to-date data after {} retries: {} => {}",
662 attemptsSoFar, taskRunner, resultOrCause(res, cause));
663 }
664 }
665
666 if (cause == null) {
667 return completedFuture(res);
668 } else {
669 return exceptionallyCompletedFuture(cause);
670 }
671 }
672
673 if (logger.isDebugEnabled()) {
674 if (currentReplicaHint != null) {
675 logger.debug("[{}] Got the out-of-date data ({} attempt(s) so far): {} => {}",
676 currentReplicaHint, nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
677 } else {
678 logger.debug("Got the out-of-date data ({} attempt(s) so far): {} => {}",
679 nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
680 }
681 }
682
683 final CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();
684 executor().schedule(() -> {
685 try {
686 executeWithRetries(taskRunner, retryPredicate,
687 nextAttemptsSoFar).handle((newRes, newCause) -> {
688 if (newCause != null) {
689 nextAttemptFuture.completeExceptionally(newCause);
690 } else {
691 nextAttemptFuture.complete(newRes);
692 }
693 return null;
694 });
695 } catch (Throwable t) {
696 nextAttemptFuture.completeExceptionally(t);
697 }
698 }, retryIntervalMillis, TimeUnit.MILLISECONDS);
699
700 return nextAttemptFuture;
701 }).toCompletableFuture();
702 }
703
704
705
706
707
708 private static Throwable peel(Throwable throwable) {
709 Throwable cause = throwable.getCause();
710 while (cause != null && cause != throwable &&
711 (throwable instanceof CompletionException || throwable instanceof ExecutionException)) {
712 throwable = cause;
713 cause = throwable.getCause();
714 }
715 return throwable;
716 }
717
718
719
720
721
722 private boolean handleRevisionNotFound(
723 String projectName, String repositoryName, Revision revision, Throwable cause) {
724 requireNonNull(cause, "cause");
725 cause = peel(cause);
726 if (!(cause instanceof RevisionNotFoundException)) {
727 return false;
728 }
729
730 final Revision latestKnownRevision = latestKnownRevision(projectName, repositoryName);
731 if (latestKnownRevision == null) {
732 return false;
733 }
734
735 if (revision.isRelative()) {
736 return revision.major() + latestKnownRevision.major() >= 0;
737 } else {
738 return revision.major() <= latestKnownRevision.major();
739 }
740 }
741
742 @Nullable
743 @VisibleForTesting
744 Revision latestKnownRevision(String projectName, String repositoryName) {
745 synchronized (latestKnownRevisions) {
746 return latestKnownRevisions.get(new RepoId(projectName, repositoryName));
747 }
748 }
749
750
751
752
753
754
755
756 private boolean updateLatestKnownRevision(String projectName, String repositoryName, Revision newRevision) {
757 final Object currentReplicaHint = currentReplicaHintSupplier.get();
758 final RepoId id = new RepoId(projectName, repositoryName);
759 synchronized (latestKnownRevisions) {
760 final Revision oldRevision = latestKnownRevisions.get(id);
761 if (oldRevision == null) {
762 if (currentReplicaHint != null) {
763 logger.debug("[{}] Updating the latest known revision for {}/{} from <unknown> to: {}",
764 currentReplicaHint, projectName, repositoryName, newRevision);
765 } else {
766 logger.debug("Updating the latest known revision for {}/{} from <unknown> to: {}",
767 projectName, repositoryName, newRevision);
768 }
769 latestKnownRevisions.put(id, newRevision);
770 return true;
771 }
772
773 final int comparison = oldRevision.compareTo(newRevision);
774 if (comparison < 0) {
775 if (currentReplicaHint != null) {
776 logger.debug("[{}] Updating the latest known revision for {}/{} from {} to: {}",
777 currentReplicaHint, projectName, repositoryName, oldRevision, newRevision);
778 } else {
779 logger.debug("Updating the latest known revision for {}/{} from {} to: {}",
780 projectName, repositoryName, oldRevision, newRevision);
781 }
782 latestKnownRevisions.put(id, newRevision);
783 return true;
784 }
785
786 if (comparison == 0) {
787 if (currentReplicaHint != null) {
788 logger.debug("[{}] The latest known revision for {}/{} stays unchanged at: {}",
789 currentReplicaHint, projectName, repositoryName, newRevision);
790 } else {
791 logger.debug("The latest known revision for {}/{} stays unchanged at: {}",
792 projectName, repositoryName, newRevision);
793 }
794 return true;
795 }
796
797 if (currentReplicaHint != null) {
798 logger.debug("[{}] An out-of-date latest known revision for {}/{}: {}",
799 currentReplicaHint, projectName, repositoryName, newRevision);
800 } else {
801 logger.debug("An out-of-date latest known revision for {}/{}: {}",
802 projectName, repositoryName, newRevision);
803 }
804 return false;
805 }
806 }
807
808 @Nullable
809 private static Object resultOrCause(@Nullable Object res, @Nullable Throwable cause) {
810 return res != null ? res : cause;
811 }
812
/**
 * Closes the delegate.
 *
 * @throws Exception whatever the delegate's {@code close()} throws
 */
@Override
public void close() throws Exception {
    delegate.close();
}
817
818 @VisibleForTesting
819 static final class RepoId {
820 final String projectName;
821 final String repositoryName;
822
823 RepoId(String projectName, String repositoryName) {
824 this.projectName = projectName;
825 this.repositoryName = repositoryName;
826 }
827
828 @Override
829 public boolean equals(Object o) {
830 if (this == o) {
831 return true;
832 }
833 if (!(o instanceof RepoId)) {
834 return false;
835 }
836 final RepoId that = (RepoId) o;
837 return projectName.equals(that.projectName) && repositoryName.equals(that.repositoryName);
838 }
839
840 @Override
841 public int hashCode() {
842 return Objects.hash(projectName, repositoryName);
843 }
844
845 @Override
846 public String toString() {
847 return projectName + '/' + repositoryName;
848 }
849 }
850 }