1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.client;
17
18 import java.util.concurrent.CompletableFuture;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.ScheduledExecutorService;
21 import java.util.function.Function;
22
23 import javax.annotation.Nullable;
24
25 import com.linecorp.centraldogma.common.Entry;
26 import com.linecorp.centraldogma.common.Query;
27 import com.linecorp.centraldogma.common.Revision;
28
29 import io.micrometer.core.instrument.MeterRegistry;
30
31 final class FileWatcher<T> extends AbstractWatcher<T> {
32
33 private final CentralDogma centralDogma;
34 private final String projectName;
35 private final String repositoryName;
36 private final Query<T> query;
37 private final long timeoutMillis;
38 private final boolean errorOnEntryNotFound;
39 @Nullable
40 private final Function<Object, ? extends T> mapper;
41 @Nullable
42 private final Executor mapperExecutor;
43
44 FileWatcher(CentralDogma centralDogma, ScheduledExecutorService watchScheduler, String projectName,
45 String repositoryName, Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound,
46 @Nullable Function<Object, ? extends T> mapper, Executor mapperExecutor,
47 long delayOnSuccessMillis, long initialDelayMillis, long maxDelayMillis, double multiplier,
48 double jitterRate, @Nullable MeterRegistry meterRegistry) {
49 super(watchScheduler, projectName, repositoryName, query.path(), errorOnEntryNotFound,
50 delayOnSuccessMillis, initialDelayMillis, maxDelayMillis, multiplier, jitterRate, meterRegistry);
51 this.centralDogma = centralDogma;
52 this.projectName = projectName;
53 this.repositoryName = repositoryName;
54 this.query = query;
55 this.timeoutMillis = timeoutMillis;
56 this.errorOnEntryNotFound = errorOnEntryNotFound;
57 this.mapper = mapper;
58 this.mapperExecutor = mapperExecutor;
59 }
60
61 @Override
62 CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision) {
63 final CompletableFuture<Entry<T>> future = centralDogma.watchFile(projectName, repositoryName,
64 lastKnownRevision, query,
65 timeoutMillis, errorOnEntryNotFound);
66 if (mapper == null) {
67 return future.thenApply(entry -> {
68 if (entry == null) {
69 return null;
70 }
71 return new Latest<>(entry.revision(), entry.content());
72 });
73 }
74 return future.thenApplyAsync(entry -> {
75 if (entry == null) {
76 return null;
77 }
78 return new Latest<>(entry.revision(), mapper.apply(entry.content()));
79 }, mapperExecutor);
80 }
81 }