1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.client;
17
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.function.Function;
22
23 import javax.annotation.Nullable;
24
25 import com.linecorp.centraldogma.common.Entry;
26 import com.linecorp.centraldogma.common.Query;
27 import com.linecorp.centraldogma.common.Revision;
28
29 final class FileWatcher<T> extends AbstractWatcher<T> {
30
31 private final CentralDogma centralDogma;
32 private final String projectName;
33 private final String repositoryName;
34 private final Query<T> query;
35 private final long timeoutMillis;
36 private final boolean errorOnEntryNotFound;
37 @Nullable
38 private final Function<Object, ? extends T> mapper;
39 @Nullable
40 private final Executor mapperExecutor;
41
42 FileWatcher(CentralDogma centralDogma, ScheduledExecutorService watchScheduler,
43 String projectName, String repositoryName, Query<T> query,
44 long timeoutMillis, boolean errorOnEntryNotFound,
45 @Nullable Function<Object, ? extends T> mapper, Executor mapperExecutor,
46 long delayOnSuccessMillis, long initialDelayMillis, long maxDelayMillis,
47 double multiplier, double jitterRate) {
48 super(watchScheduler, projectName, repositoryName, query.path(), errorOnEntryNotFound,
49 delayOnSuccessMillis, initialDelayMillis, maxDelayMillis, multiplier, jitterRate);
50 this.centralDogma = centralDogma;
51 this.projectName = projectName;
52 this.repositoryName = repositoryName;
53 this.query = query;
54 this.timeoutMillis = timeoutMillis;
55 this.errorOnEntryNotFound = errorOnEntryNotFound;
56 this.mapper = mapper;
57 this.mapperExecutor = mapperExecutor;
58 }
59
60 @Override
61 CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision) {
62 final CompletableFuture<Entry<T>> future = centralDogma.watchFile(
63 projectName, repositoryName, lastKnownRevision, query, timeoutMillis, errorOnEntryNotFound);
64 if (mapper == null) {
65 return future.thenApply(entry -> {
66 if (entry == null) {
67 return null;
68 }
69 return new Latest<>(entry.revision(), entry.content());
70 });
71 }
72 return future.thenApplyAsync(entry -> {
73 if (entry == null) {
74 return null;
75 }
76 return new Latest<>(entry.revision(), mapper.apply(entry.content()));
77 }, mapperExecutor);
78 }
79 }