1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.internal.client;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
20 import static java.util.Objects.requireNonNull;
21 import static java.util.concurrent.CompletableFuture.completedFuture;
22
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Objects;
27 import java.util.Set;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.CompletionException;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.concurrent.TimeUnit;
33 import java.util.function.BiFunction;
34 import java.util.function.BiPredicate;
35 import java.util.function.Function;
36 import java.util.function.Supplier;
37
38 import javax.annotation.Nullable;
39
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.google.common.annotations.VisibleForTesting;
44 import com.google.common.collect.ImmutableList;
45 import com.spotify.futures.CompletableFutures;
46
47 import com.linecorp.centraldogma.client.AbstractCentralDogma;
48 import com.linecorp.centraldogma.client.CentralDogma;
49 import com.linecorp.centraldogma.client.CentralDogmaRepository;
50 import com.linecorp.centraldogma.client.RepositoryInfo;
51 import com.linecorp.centraldogma.common.Author;
52 import com.linecorp.centraldogma.common.Change;
53 import com.linecorp.centraldogma.common.Commit;
54 import com.linecorp.centraldogma.common.Entry;
55 import com.linecorp.centraldogma.common.EntryType;
56 import com.linecorp.centraldogma.common.Markup;
57 import com.linecorp.centraldogma.common.MergeQuery;
58 import com.linecorp.centraldogma.common.MergedEntry;
59 import com.linecorp.centraldogma.common.PathPattern;
60 import com.linecorp.centraldogma.common.PushResult;
61 import com.linecorp.centraldogma.common.Query;
62 import com.linecorp.centraldogma.common.Revision;
63 import com.linecorp.centraldogma.common.RevisionNotFoundException;
64
65 import io.micrometer.core.instrument.MeterRegistry;
66
67
68
69
70
71 public final class ReplicationLagTolerantCentralDogma extends AbstractCentralDogma {
72
73 private static final Logger logger =
74 LoggerFactory.getLogger(ReplicationLagTolerantCentralDogma.class);
75
76 private final CentralDogma delegate;
77 private final int maxRetries;
78 private final long retryIntervalMillis;
79 private final Supplier<?> currentReplicaHintSupplier;
80 private final Map<RepoId, Revision> latestKnownRevisions = new LinkedHashMap<RepoId, Revision>() {
81 private static final long serialVersionUID = 3587793379404809027L;
82
83 @Override
84 protected boolean removeEldestEntry(Map.Entry<RepoId, Revision> eldest) {
85
86 return size() > 8192;
87 }
88 };
89
90 public ReplicationLagTolerantCentralDogma(ScheduledExecutorService blockingTaskExecutor,
91 CentralDogma delegate, int maxRetries, long retryIntervalMillis,
92 Supplier<?> currentReplicaHintSupplier,
93 @Nullable MeterRegistry meterRegistry) {
94 super(blockingTaskExecutor, meterRegistry);
95
96 requireNonNull(delegate, "delegate");
97 checkArgument(maxRetries > 0, "maxRetries: %s (expected: > 0)", maxRetries);
98 checkArgument(retryIntervalMillis >= 0,
99 "retryIntervalMillis: %s (expected: >= 0)", retryIntervalMillis);
100 requireNonNull(currentReplicaHintSupplier, "currentReplicaHintSupplier");
101
102 this.delegate = delegate;
103 this.maxRetries = maxRetries;
104 this.retryIntervalMillis = retryIntervalMillis;
105 this.currentReplicaHintSupplier = currentReplicaHintSupplier;
106 }
107
108 @Override
109 public CompletableFuture<Void> createProject(String projectName) {
110 return delegate.createProject(projectName);
111 }
112
113 @Override
114 public CompletableFuture<Void> removeProject(String projectName) {
115 return delegate.removeProject(projectName);
116 }
117
118 @Override
119 public CompletableFuture<Void> purgeProject(String projectName) {
120 return delegate.purgeProject(projectName);
121 }
122
123 @Override
124 public CompletableFuture<Void> unremoveProject(String projectName) {
125 return delegate.unremoveProject(projectName);
126 }
127
128 @Override
129 public CompletableFuture<Set<String>> listProjects() {
130 return delegate.listProjects();
131 }
132
133 @Override
134 public CompletableFuture<Set<String>> listRemovedProjects() {
135 return delegate.listRemovedProjects();
136 }
137
138 @Override
139 public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
140 String repositoryName) {
141 return delegate.createRepository(projectName, repositoryName);
142 }
143
144 @Override
145 public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
146 return delegate.removeRepository(projectName, repositoryName);
147 }
148
149 @Override
150 public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
151 return delegate.purgeRepository(projectName, repositoryName);
152 }
153
154 @Override
155 public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
156 String repositoryName) {
157 return delegate.unremoveRepository(projectName, repositoryName);
158 }
159
160 @Override
161 public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
162 return executeWithRetries(
163 new Supplier<CompletableFuture<Map<String, RepositoryInfo>>>() {
164 @Override
165 public CompletableFuture<Map<String, RepositoryInfo>> get() {
166 return delegate.listRepositories(projectName);
167 }
168
169 @Override
170 public String toString() {
171 return "listRepositories(" + projectName + ')';
172 }
173 },
174 (res, cause) -> {
175 if (res != null) {
176 for (RepositoryInfo info : res.values()) {
177 if (!updateLatestKnownRevision(projectName, info.name(), info.headRevision())) {
178 return true;
179 }
180 }
181 }
182 return false;
183 });
184 }
185
186 @Override
187 public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
188 return delegate.listRemovedRepositories(projectName);
189 }
190
191 @Override
192 public CompletableFuture<Revision> normalizeRevision(
193 String projectName, String repositoryName, Revision revision) {
194 return executeWithRetries(
195 new Supplier<CompletableFuture<Revision>>() {
196 @Override
197 public CompletableFuture<Revision> get() {
198 return delegate.normalizeRevision(projectName, repositoryName, revision);
199 }
200
201 @Override
202 public String toString() {
203 return "normalizeRevision(" + projectName + ", " + repositoryName + ", " +
204 revision + ')';
205 }
206 },
207 (res, cause) -> {
208 if (cause != null) {
209 return handleRevisionNotFound(projectName, repositoryName, revision, cause);
210 }
211
212 if (revision.isRelative()) {
213 final Revision headRevision = res.forward(-(revision.major() + 1));
214 return !updateLatestKnownRevision(projectName, repositoryName, headRevision);
215 }
216
217 updateLatestKnownRevision(projectName, repositoryName, revision);
218 return false;
219 });
220 }
221
222 @Override
223 public CompletableFuture<Map<String, EntryType>> listFiles(
224 String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
225 return normalizeRevisionAndExecuteWithRetries(
226 projectName, repositoryName, revision,
227 new Function<Revision, CompletableFuture<Map<String, EntryType>>>() {
228 @Override
229 public CompletableFuture<Map<String, EntryType>> apply(Revision normRev) {
230 return delegate.listFiles(projectName, repositoryName, normRev, pathPattern);
231 }
232
233 @Override
234 public String toString() {
235 return "listFiles(" + projectName + ", " + repositoryName + ", " +
236 revision + ", " + pathPattern + ')';
237 }
238 });
239 }
240
241 @Override
242 public <T> CompletableFuture<Entry<T>> getFile(
243 String projectName, String repositoryName, Revision revision, Query<T> query) {
244 return normalizeRevisionAndExecuteWithRetries(
245 projectName, repositoryName, revision,
246 new Function<Revision, CompletableFuture<Entry<T>>>() {
247 @Override
248 public CompletableFuture<Entry<T>> apply(Revision normRev) {
249 return delegate.getFile(projectName, repositoryName, normRev, query);
250 }
251
252 @Override
253 public String toString() {
254 return "getFile(" + projectName + ", " + repositoryName + ", " +
255 revision + ", " + query + ')';
256 }
257 });
258 }
259
260 @Override
261 public CompletableFuture<Map<String, Entry<?>>> getFiles(
262 String projectName, String repositoryName, Revision revision, PathPattern pathPattern) {
263 return normalizeRevisionAndExecuteWithRetries(
264 projectName, repositoryName, revision,
265 new Function<Revision, CompletableFuture<Map<String, Entry<?>>>>() {
266 @Override
267 public CompletableFuture<Map<String, Entry<?>>> apply(Revision normRev) {
268 return delegate.getFiles(projectName, repositoryName, normRev, pathPattern);
269 }
270
271 @Override
272 public String toString() {
273 return "getFiles(" + projectName + ", " + repositoryName + ", " +
274 revision + ", " + pathPattern + ')';
275 }
276 });
277 }
278
279 @Override
280 public <T> CompletableFuture<MergedEntry<T>> mergeFiles(
281 String projectName, String repositoryName, Revision revision,
282 MergeQuery<T> mergeQuery) {
283 return normalizeRevisionAndExecuteWithRetries(
284 projectName, repositoryName, revision,
285 new Function<Revision, CompletableFuture<MergedEntry<T>>>() {
286 @Override
287 public CompletableFuture<MergedEntry<T>> apply(Revision normRev) {
288 return delegate.mergeFiles(projectName, repositoryName, normRev, mergeQuery);
289 }
290
291 @Override
292 public String toString() {
293 return "mergeFiles(" + projectName + ", " + repositoryName + ", " +
294 revision + ", " + mergeQuery + ')';
295 }
296 });
297 }
298
299 @Override
300 public CompletableFuture<List<Commit>> getHistory(
301 String projectName, String repositoryName, Revision from,
302 Revision to, PathPattern pathPattern, int maxCommits) {
303 return normalizeRevisionsAndExecuteWithRetries(
304 projectName, repositoryName, from, to,
305 new BiFunction<Revision, Revision, CompletableFuture<List<Commit>>>() {
306 @Override
307 public CompletableFuture<List<Commit>> apply(Revision normFromRev, Revision normToRev) {
308 return delegate.getHistory(projectName, repositoryName,
309 normFromRev, normToRev, pathPattern, maxCommits);
310 }
311
312 @Override
313 public String toString() {
314 return "getHistory(" + projectName + ", " + repositoryName + ", " +
315 from + ", " + to + ", " + pathPattern + ", " + maxCommits + ')';
316 }
317 });
318 }
319
320 @Override
321 public <T> CompletableFuture<Change<T>> getDiff(
322 String projectName, String repositoryName, Revision from, Revision to, Query<T> query) {
323 return normalizeRevisionsAndExecuteWithRetries(
324 projectName, repositoryName, from, to,
325 new BiFunction<Revision, Revision, CompletableFuture<Change<T>>>() {
326 @Override
327 public CompletableFuture<Change<T>> apply(Revision normFromRev, Revision normToRev) {
328 return delegate.getDiff(projectName, repositoryName,
329 normFromRev, normToRev, query);
330 }
331
332 @Override
333 public String toString() {
334 return "getDiff(" + projectName + ", " + repositoryName + ", " +
335 from + ", " + to + ", " + query + ')';
336 }
337 });
338 }
339
340 @Override
341 public CompletableFuture<List<Change<?>>> getDiff(
342 String projectName, String repositoryName, Revision from, Revision to, PathPattern pathPattern) {
343 return normalizeRevisionsAndExecuteWithRetries(
344 projectName, repositoryName, from, to,
345 new BiFunction<Revision, Revision, CompletableFuture<List<Change<?>>>>() {
346 @Override
347 public CompletableFuture<List<Change<?>>> apply(Revision normFromRev, Revision normToRev) {
348 return delegate.getDiff(projectName, repositoryName,
349 normFromRev, normToRev, pathPattern);
350 }
351
352 @Override
353 public String toString() {
354 return "getDiffs(" + projectName + ", " + repositoryName + ", " +
355 from + ", " + to + ", " + pathPattern + ')';
356 }
357 });
358 }
359
360 @Override
361 public CompletableFuture<List<Change<?>>> getPreviewDiffs(
362 String projectName, String repositoryName, Revision baseRevision,
363 Iterable<? extends Change<?>> changes) {
364 return normalizeRevisionAndExecuteWithRetries(
365 projectName, repositoryName, baseRevision,
366 new Function<Revision, CompletableFuture<List<Change<?>>>>() {
367 @Override
368 public CompletableFuture<List<Change<?>>> apply(Revision normBaseRev) {
369 return delegate.getPreviewDiffs(projectName, repositoryName, normBaseRev, changes);
370 }
371
372 @Override
373 public String toString() {
374 return "getPreviewDiffs(" + projectName + ", " + repositoryName + ", " +
375 baseRevision + ", ...)";
376 }
377 });
378 }
379
380 @Override
381 public CompletableFuture<PushResult> push(
382 String projectName, String repositoryName, Revision baseRevision,
383 String summary, String detail, Markup markup, Iterable<? extends Change<?>> changes) {
384 return executeWithRetries(
385 new Supplier<CompletableFuture<PushResult>>() {
386 @Override
387 public CompletableFuture<PushResult> get() {
388 return delegate.push(projectName, repositoryName, baseRevision,
389 summary, detail, markup, changes);
390 }
391
392 @Override
393 public String toString() {
394 return "push(" + projectName + ", " + repositoryName + ", " +
395 baseRevision + ", " + summary + ", ...)";
396 }
397 },
398 pushRetryPredicate(projectName, repositoryName, baseRevision));
399 }
400
401 @Override
402 public CompletableFuture<PushResult> push(
403 String projectName, String repositoryName, Revision baseRevision,
404 Author author, String summary, String detail, Markup markup,
405 Iterable<? extends Change<?>> changes) {
406 return executeWithRetries(
407 new Supplier<CompletableFuture<PushResult>>() {
408 @Override
409 public CompletableFuture<PushResult> get() {
410 return delegate.push(projectName, repositoryName, baseRevision,
411 author, summary, detail, markup, changes);
412 }
413
414 @Override
415 public String toString() {
416 return "push(" + projectName + ", " + repositoryName + ", " +
417 baseRevision + ", " + summary + ", ...)";
418 }
419 },
420 pushRetryPredicate(projectName, repositoryName, baseRevision));
421 }
422
423 private BiPredicate<PushResult, Throwable> pushRetryPredicate(
424 String projectName, String repositoryName, Revision baseRevision) {
425
426 return (res, cause) -> {
427 if (cause != null) {
428 return handleRevisionNotFound(projectName, repositoryName, baseRevision, cause);
429 }
430
431 updateLatestKnownRevision(projectName, repositoryName, res.revision());
432 return false;
433 };
434 }
435
436 @Override
437 public CompletableFuture<Revision> watchRepository(
438 String projectName, String repositoryName, Revision lastKnownRevision,
439 PathPattern pathPattern, long timeoutMillis, boolean errorOnEntryNotFound) {
440
441 return normalizeRevisionAndExecuteWithRetries(
442 projectName, repositoryName, lastKnownRevision,
443 new Function<Revision, CompletableFuture<Revision>>() {
444 @Override
445 public CompletableFuture<Revision> apply(Revision normLastKnownRevision) {
446 return delegate.watchRepository(projectName, repositoryName, normLastKnownRevision,
447 pathPattern, timeoutMillis, errorOnEntryNotFound)
448 .thenApply(newLastKnownRevision -> {
449 if (newLastKnownRevision != null) {
450 updateLatestKnownRevision(projectName, repositoryName,
451 newLastKnownRevision);
452 }
453 return newLastKnownRevision;
454 });
455 }
456
457 @Override
458 public String toString() {
459 return "watchRepository(" + projectName + ", " + repositoryName + ", " +
460 lastKnownRevision + ", " + pathPattern + ", " + timeoutMillis + ", " +
461 errorOnEntryNotFound + ')';
462 }
463 });
464 }
465
466 @Override
467 public <T> CompletableFuture<Entry<T>> watchFile(
468 String projectName, String repositoryName, Revision lastKnownRevision,
469 Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound) {
470
471 return normalizeRevisionAndExecuteWithRetries(
472 projectName, repositoryName, lastKnownRevision,
473 new Function<Revision, CompletableFuture<Entry<T>>>() {
474 @Override
475 public CompletableFuture<Entry<T>> apply(Revision normLastKnownRevision) {
476 return delegate.watchFile(projectName, repositoryName, normLastKnownRevision,
477 query, timeoutMillis, errorOnEntryNotFound)
478 .thenApply(entry -> {
479 if (entry != null) {
480 updateLatestKnownRevision(projectName, repositoryName,
481 entry.revision());
482 }
483 return entry;
484 });
485 }
486
487 @Override
488 public String toString() {
489 return "watchFile(" + projectName + ", " + repositoryName + ", " +
490 lastKnownRevision + ", " + query + ", " + timeoutMillis + ", " +
491 errorOnEntryNotFound + ')';
492 }
493 });
494 }
495
496 @Override
497 public CompletableFuture<Void> whenEndpointReady() {
498 return delegate.whenEndpointReady();
499 }
500
501
502
503
504
505
506 private <T> CompletableFuture<T> normalizeRevisionAndExecuteWithRetries(
507 String projectName, String repositoryName, Revision revision,
508 Function<Revision, CompletableFuture<T>> taskRunner) {
509 return normalizeRevision(projectName, repositoryName, revision)
510 .thenCompose(normRev -> executeWithRetries(
511 new Supplier<CompletableFuture<T>>() {
512 @Override
513 public CompletableFuture<T> get() {
514 return taskRunner.apply(normRev);
515 }
516
517 @Override
518 public String toString() {
519 return taskRunner + " with " + normRev;
520 }
521 },
522 (res, cause) -> cause != null &&
523 handleRevisionNotFound(projectName, repositoryName, normRev, cause)));
524 }
525
526
527
528
529
530
531
532 private <T> CompletableFuture<T> normalizeRevisionsAndExecuteWithRetries(
533 String projectName, String repositoryName, Revision from, Revision to,
534 BiFunction<Revision, Revision, CompletableFuture<T>> taskRunner) {
535
536 if (to == null) {
537 return exceptionallyCompletedFuture(new NullPointerException("to"));
538 }
539
540 if (from == null) {
541 return exceptionallyCompletedFuture(new NullPointerException("from"));
542 }
543
544 if (from.isRelative() && to.isRelative() ||
545 !from.isRelative() && !to.isRelative()) {
546
547
548
549 final int distance = to.major() - from.major();
550 final Revision baseRevision = to.compareTo(from) >= 0 ? to : from;
551
552 return normalizeRevision(projectName, repositoryName, baseRevision).thenCompose(normBaseRev -> {
553 final Revision normFromRev;
554 final Revision normToRev;
555 if (distance >= 0) {
556 normToRev = normBaseRev;
557 normFromRev = normBaseRev.backward(distance);
558 } else {
559 normFromRev = normBaseRev;
560 normToRev = normBaseRev.backward(-distance);
561 }
562
563 return executeWithRetries(
564 new Supplier<CompletableFuture<T>>() {
565 @Override
566 public CompletableFuture<T> get() {
567 return taskRunner.apply(normFromRev, normToRev);
568 }
569
570 @Override
571 public String toString() {
572 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
573 }
574 },
575 (res, cause) -> {
576 if (cause == null) {
577 return false;
578 }
579 return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
580 });
581 });
582 } else {
583
584
585 return CompletableFutures.allAsList(ImmutableList.of(
586 normalizeRevision(projectName, repositoryName, from),
587 normalizeRevision(projectName, repositoryName, to))).thenCompose(normRevs -> {
588 final Revision normFromRev = normRevs.get(0);
589 final Revision normToRev = normRevs.get(1);
590 return executeWithRetries(
591 new Supplier<CompletableFuture<T>>() {
592 @Override
593 public CompletableFuture<T> get() {
594 return taskRunner.apply(normFromRev, normToRev);
595 }
596
597 @Override
598 public String toString() {
599 return taskRunner + " with [" + normFromRev + ", " + normToRev + ']';
600 }
601 },
602 (res, cause) -> {
603 if (cause == null) {
604 return false;
605 }
606
607 final Revision normBaseRev = normFromRev.compareTo(normToRev) > 0 ? normFromRev
608 : normToRev;
609 return handleRevisionNotFound(projectName, repositoryName, normBaseRev, cause);
610 });
611 });
612 }
613 }
614
615
616
617
618
619
620 private <T> CompletableFuture<T> executeWithRetries(
621 Supplier<CompletableFuture<T>> taskRunner,
622 BiPredicate<T, Throwable> retryPredicate) {
623 return executeWithRetries(taskRunner, retryPredicate, 0);
624 }
625
626 private <T> CompletableFuture<T> executeWithRetries(
627 Supplier<CompletableFuture<T>> taskRunner,
628 BiPredicate<T, Throwable> retryPredicate,
629 int attemptsSoFar) {
630
631 return CompletableFutures.handleCompose(taskRunner.get(), (res, cause) -> {
632 final Object currentReplicaHint = currentReplicaHintSupplier.get();
633 final int nextAttemptsSoFar = attemptsSoFar + 1;
634 final boolean retryRequired = retryPredicate.test(res, cause);
635 if (!retryRequired || nextAttemptsSoFar > maxRetries) {
636 if (retryRequired) {
637 if (currentReplicaHint != null) {
638 logger.warn("[{}] Failed to retrieve the up-to-date data from Central Dogma " +
639 "after {} retries: {} => {}",
640 currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
641 } else {
642 logger.warn("Failed to retrieve the up-to-date data from Central Dogma " +
643 "after {} retries: {} => {}",
644 attemptsSoFar, taskRunner, resultOrCause(res, cause));
645 }
646 } else if (logger.isDebugEnabled()) {
647 if (currentReplicaHint != null) {
648 logger.debug("[{}] Retrieved the up-to-date data after {} retries: {} => {}",
649 currentReplicaHint, attemptsSoFar, taskRunner, resultOrCause(res, cause));
650 } else {
651 logger.debug("Retrieved the up-to-date data after {} retries: {} => {}",
652 attemptsSoFar, taskRunner, resultOrCause(res, cause));
653 }
654 }
655
656 if (cause == null) {
657 return completedFuture(res);
658 } else {
659 return exceptionallyCompletedFuture(cause);
660 }
661 }
662
663 if (logger.isDebugEnabled()) {
664 if (currentReplicaHint != null) {
665 logger.debug("[{}] Got the out-of-date data ({} attempt(s) so far): {} => {}",
666 currentReplicaHint, nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
667 } else {
668 logger.debug("Got the out-of-date data ({} attempt(s) so far): {} => {}",
669 nextAttemptsSoFar, taskRunner, resultOrCause(res, cause));
670 }
671 }
672
673 final CompletableFuture<T> nextAttemptFuture = new CompletableFuture<>();
674 executor().schedule(() -> {
675 try {
676 executeWithRetries(taskRunner, retryPredicate,
677 nextAttemptsSoFar).handle((newRes, newCause) -> {
678 if (newCause != null) {
679 nextAttemptFuture.completeExceptionally(newCause);
680 } else {
681 nextAttemptFuture.complete(newRes);
682 }
683 return null;
684 });
685 } catch (Throwable t) {
686 nextAttemptFuture.completeExceptionally(t);
687 }
688 }, retryIntervalMillis, TimeUnit.MILLISECONDS);
689
690 return nextAttemptFuture;
691 }).toCompletableFuture();
692 }
693
694
695
696
697
698 private static Throwable peel(Throwable throwable) {
699 Throwable cause = throwable.getCause();
700 while (cause != null && cause != throwable &&
701 (throwable instanceof CompletionException || throwable instanceof ExecutionException)) {
702 throwable = cause;
703 cause = throwable.getCause();
704 }
705 return throwable;
706 }
707
708
709
710
711
712 private boolean handleRevisionNotFound(
713 String projectName, String repositoryName, Revision revision, Throwable cause) {
714 requireNonNull(cause, "cause");
715 cause = peel(cause);
716 if (!(cause instanceof RevisionNotFoundException)) {
717 return false;
718 }
719
720 final Revision latestKnownRevision = latestKnownRevision(projectName, repositoryName);
721 if (latestKnownRevision == null) {
722 return false;
723 }
724
725 if (revision.isRelative()) {
726 return revision.major() + latestKnownRevision.major() >= 0;
727 } else {
728 return revision.major() <= latestKnownRevision.major();
729 }
730 }
731
732 @Nullable
733 @VisibleForTesting
734 Revision latestKnownRevision(String projectName, String repositoryName) {
735 synchronized (latestKnownRevisions) {
736 return latestKnownRevisions.get(new RepoId(projectName, repositoryName));
737 }
738 }
739
740
741
742
743
744
745
746 private boolean updateLatestKnownRevision(String projectName, String repositoryName, Revision newRevision) {
747 final Object currentReplicaHint = currentReplicaHintSupplier.get();
748 final RepoId id = new RepoId(projectName, repositoryName);
749 synchronized (latestKnownRevisions) {
750 final Revision oldRevision = latestKnownRevisions.get(id);
751 if (oldRevision == null) {
752 if (currentReplicaHint != null) {
753 logger.debug("[{}] Updating the latest known revision for {}/{} from <unknown> to: {}",
754 currentReplicaHint, projectName, repositoryName, newRevision);
755 } else {
756 logger.debug("Updating the latest known revision for {}/{} from <unknown> to: {}",
757 projectName, repositoryName, newRevision);
758 }
759 latestKnownRevisions.put(id, newRevision);
760 return true;
761 }
762
763 final int comparison = oldRevision.compareTo(newRevision);
764 if (comparison < 0) {
765 if (currentReplicaHint != null) {
766 logger.debug("[{}] Updating the latest known revision for {}/{} from {} to: {}",
767 currentReplicaHint, projectName, repositoryName, oldRevision, newRevision);
768 } else {
769 logger.debug("Updating the latest known revision for {}/{} from {} to: {}",
770 projectName, repositoryName, oldRevision, newRevision);
771 }
772 latestKnownRevisions.put(id, newRevision);
773 return true;
774 }
775
776 if (comparison == 0) {
777 if (currentReplicaHint != null) {
778 logger.debug("[{}] The latest known revision for {}/{} stays unchanged at: {}",
779 currentReplicaHint, projectName, repositoryName, newRevision);
780 } else {
781 logger.debug("The latest known revision for {}/{} stays unchanged at: {}",
782 projectName, repositoryName, newRevision);
783 }
784 return true;
785 }
786
787 if (currentReplicaHint != null) {
788 logger.debug("[{}] An out-of-date latest known revision for {}/{}: {}",
789 currentReplicaHint, projectName, repositoryName, newRevision);
790 } else {
791 logger.debug("An out-of-date latest known revision for {}/{}: {}",
792 projectName, repositoryName, newRevision);
793 }
794 return false;
795 }
796 }
797
798 @Nullable
799 private static Object resultOrCause(@Nullable Object res, @Nullable Throwable cause) {
800 return res != null ? res : cause;
801 }
802
803 @Override
804 public void close() throws Exception {
805 delegate.close();
806 }
807
808 private static final class RepoId {
809 private final String projectName;
810 private final String repositoryName;
811
812 RepoId(String projectName, String repositoryName) {
813 this.projectName = projectName;
814 this.repositoryName = repositoryName;
815 }
816
817 @Override
818 public boolean equals(Object o) {
819 if (this == o) {
820 return true;
821 }
822 if (!(o instanceof RepoId)) {
823 return false;
824 }
825 final RepoId that = (RepoId) o;
826 return projectName.equals(that.projectName) && repositoryName.equals(that.repositoryName);
827 }
828
829 @Override
830 public int hashCode() {
831 return Objects.hash(projectName, repositoryName);
832 }
833
834 @Override
835 public String toString() {
836 return projectName + '/' + repositoryName;
837 }
838 }
839 }