1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.client;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static java.util.Objects.requireNonNull;
20
21 import java.time.Duration;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.TimeUnit;
25 import java.util.function.Function;
26
27 import javax.annotation.Nullable;
28
29 import com.linecorp.centraldogma.common.PathPattern;
30 import com.linecorp.centraldogma.common.Query;
31
32 import io.micrometer.core.instrument.MeterRegistry;
33
34
35
36
37 public final class WatcherRequest<T> extends WatchOptions {
38
39 private static final long DEFAULT_DELAY_ON_SUCCESS_MILLIS = TimeUnit.SECONDS.toMillis(1);
40 private static final long DEFAULT_MAX_DELAY_MILLIS = TimeUnit.MINUTES.toMillis(1);
41 private static final double DEFAULT_MULTIPLIER = 2.0;
42 private static final double DEFAULT_JITTER_RATE = 0.2;
43
44 private final CentralDogmaRepository centralDogmaRepo;
45 @Nullable
46 private final Query<T> query;
47 @Nullable
48 private final PathPattern pathPattern;
49 private final ScheduledExecutorService blockingTaskExecutor;
50
51 @Nullable
52 private Function<Object, ? extends T> mapper;
53 private Executor executor;
54
55 @Nullable
56 private final MeterRegistry meterRegistry;
57
58 private long delayOnSuccessMillis = DEFAULT_DELAY_ON_SUCCESS_MILLIS;
59 private long initialDelayMillis = DEFAULT_DELAY_ON_SUCCESS_MILLIS * 2;
60 private long maxDelayMillis = DEFAULT_MAX_DELAY_MILLIS;
61 private double multiplier = DEFAULT_MULTIPLIER;
62 private double jitterRate = DEFAULT_JITTER_RATE;
63
64 WatcherRequest(CentralDogmaRepository centralDogmaRepo, Query<T> query,
65 ScheduledExecutorService blockingTaskExecutor, @Nullable MeterRegistry meterRegistry) {
66 this(centralDogmaRepo, query, null, blockingTaskExecutor, meterRegistry);
67 }
68
69 WatcherRequest(CentralDogmaRepository centralDogmaRepo, PathPattern pathPattern,
70 ScheduledExecutorService blockingTaskExecutor, @Nullable MeterRegistry meterRegistry) {
71 this(centralDogmaRepo, null, pathPattern, blockingTaskExecutor, meterRegistry);
72 }
73
74 private WatcherRequest(CentralDogmaRepository centralDogmaRepo, @Nullable Query<T> query,
75 @Nullable PathPattern pathPattern, ScheduledExecutorService blockingTaskExecutor,
76 @Nullable MeterRegistry meterRegistry) {
77 this.centralDogmaRepo = centralDogmaRepo;
78 this.query = query;
79 this.pathPattern = pathPattern;
80 this.blockingTaskExecutor = blockingTaskExecutor;
81 executor = blockingTaskExecutor;
82 this.meterRegistry = meterRegistry;
83 }
84
85
86
87
88 @SuppressWarnings("unchecked")
89 public <U> WatcherRequest<U> map(Function<? super T, ? extends U> mapper) {
90 if (this.mapper == null) {
91 this.mapper = (Function<Object, ? extends T>) mapper;
92 } else {
93 this.mapper = (Function<Object, ? extends T>) this.mapper.andThen(mapper);
94 }
95 return (WatcherRequest<U>) this;
96 }
97
98
99
100
101 public WatcherRequest<T> mapperExecutor(Executor executor) {
102 this.executor = executor;
103 return this;
104 }
105
106
107
108
109 public WatcherRequest<T> delayOnSuccess(Duration delayOnSuccess) {
110 requireNonNull(delayOnSuccess, "delayOnSuccess");
111 checkArgument(!delayOnSuccess.isNegative(),
112 "delayOnSuccess: %s (expected: >= 0)", delayOnSuccess);
113 return delayOnSuccessMillis(delayOnSuccess.toMillis());
114 }
115
116
117
118
119 public WatcherRequest<T> delayOnSuccessMillis(long delayOnSuccessMillis) {
120 this.delayOnSuccessMillis = delayOnSuccessMillis;
121 checkArgument(delayOnSuccessMillis >= 0,
122 "delayOnSuccessMillis: %s (expected: >= 0)", delayOnSuccessMillis);
123 return this;
124 }
125
126
127
128
129
130
131 public WatcherRequest<T> backoffOnFailure(long initialDelayMillis, long maxDelayMillis,
132 double multiplier) {
133 checkArgument(initialDelayMillis >= 0, "initialDelayMillis: %s (expected: >= 0)", initialDelayMillis);
134 checkArgument(initialDelayMillis <= maxDelayMillis, "maxDelayMillis: %s (expected: >= %s)",
135 maxDelayMillis, initialDelayMillis);
136 checkArgument(multiplier > 1.0, "multiplier: %s (expected: > 1.0)", multiplier);
137 this.initialDelayMillis = initialDelayMillis;
138 this.maxDelayMillis = maxDelayMillis;
139 this.multiplier = multiplier;
140 return this;
141 }
142
143
144
145
146 public WatcherRequest<T> jitterRate(double jitterRate) {
147 checkArgument(0.0 <= jitterRate && jitterRate <= 1.0,
148 "jitterRate: %s (expected: >= 0.0 and <= 1.0)", jitterRate);
149 this.jitterRate = jitterRate;
150 return this;
151 }
152
153 @Override
154 public WatcherRequest<T> timeout(Duration timeout) {
155
156 return (WatcherRequest<T>) super.timeout(timeout);
157 }
158
159 @Override
160 public WatcherRequest<T> timeoutMillis(long timeoutMillis) {
161
162 return (WatcherRequest<T>) super.timeoutMillis(timeoutMillis);
163 }
164
165 @Override
166 public WatcherRequest<T> errorOnEntryNotFound(boolean errorOnEntryNotFound) {
167
168 return (WatcherRequest<T>) super.errorOnEntryNotFound(errorOnEntryNotFound);
169 }
170
171
172
173
174
175 public Watcher<T> start() {
176 final String proName = centralDogmaRepo.projectName();
177 final String repoName = centralDogmaRepo.repositoryName();
178 final AbstractWatcher<T> watcher;
179 if (query != null) {
180 watcher = new FileWatcher<>(
181 centralDogmaRepo.centralDogma(), blockingTaskExecutor, proName, repoName, query,
182 timeoutMillis(), errorOnEntryNotFound(), mapper, executor, delayOnSuccessMillis,
183 initialDelayMillis, maxDelayMillis, multiplier, jitterRate, meterRegistry);
184 } else {
185 assert pathPattern != null;
186 watcher = new FilesWatcher<>(
187 centralDogmaRepo.centralDogma(), blockingTaskExecutor, proName, repoName, pathPattern,
188 timeoutMillis(), errorOnEntryNotFound(), mapper, executor, delayOnSuccessMillis,
189 initialDelayMillis, maxDelayMillis, multiplier, jitterRate, meterRegistry);
190 }
191 watcher.start();
192 return watcher;
193 }
194 }