1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.client;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static java.util.Objects.requireNonNull;
20
21 import java.time.Duration;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.ScheduledExecutorService;
24 import java.util.concurrent.TimeUnit;
25 import java.util.function.Function;
26
27 import javax.annotation.Nullable;
28
29 import com.linecorp.centraldogma.common.PathPattern;
30 import com.linecorp.centraldogma.common.Query;
31
32
33
34
35 public final class WatcherRequest<T> extends WatchOptions {
36
37 private static final long DEFAULT_DELAY_ON_SUCCESS_MILLIS = TimeUnit.SECONDS.toMillis(1);
38 private static final long DEFAULT_MAX_DELAY_MILLIS = TimeUnit.MINUTES.toMillis(1);
39 private static final double DEFAULT_MULTIPLIER = 2.0;
40 private static final double DEFAULT_JITTER_RATE = 0.2;
41
42 private final CentralDogmaRepository centralDogmaRepo;
43 @Nullable
44 private final Query<T> query;
45 @Nullable
46 private final PathPattern pathPattern;
47 private final ScheduledExecutorService blockingTaskExecutor;
48
49 @Nullable
50 private Function<Object, ? extends T> mapper;
51 private Executor executor;
52
53 private long delayOnSuccessMillis = DEFAULT_DELAY_ON_SUCCESS_MILLIS;
54 private long initialDelayMillis = DEFAULT_DELAY_ON_SUCCESS_MILLIS * 2;
55 private long maxDelayMillis = DEFAULT_MAX_DELAY_MILLIS;
56 private double multiplier = DEFAULT_MULTIPLIER;
57 private double jitterRate = DEFAULT_JITTER_RATE;
58
59 WatcherRequest(CentralDogmaRepository centralDogmaRepo, Query<T> query,
60 ScheduledExecutorService blockingTaskExecutor) {
61 this(centralDogmaRepo, query, null, blockingTaskExecutor);
62 }
63
64 WatcherRequest(CentralDogmaRepository centralDogmaRepo, PathPattern pathPattern,
65 ScheduledExecutorService blockingTaskExecutor) {
66 this(centralDogmaRepo, null, pathPattern, blockingTaskExecutor);
67 }
68
69 private WatcherRequest(CentralDogmaRepository centralDogmaRepo,
70 @Nullable Query<T> query, @Nullable PathPattern pathPattern,
71 ScheduledExecutorService blockingTaskExecutor) {
72 this.centralDogmaRepo = centralDogmaRepo;
73 this.query = query;
74 this.pathPattern = pathPattern;
75 this.blockingTaskExecutor = blockingTaskExecutor;
76 executor = blockingTaskExecutor;
77 }
78
79
80
81
82 @SuppressWarnings("unchecked")
83 public <U> WatcherRequest<U> map(Function<? super T, ? extends U> mapper) {
84 if (this.mapper == null) {
85 this.mapper = (Function<Object, ? extends T>) mapper;
86 } else {
87 this.mapper = (Function<Object, ? extends T>) this.mapper.andThen(mapper);
88 }
89 return (WatcherRequest<U>) this;
90 }
91
92
93
94
95 public WatcherRequest<T> mapperExecutor(Executor executor) {
96 this.executor = executor;
97 return this;
98 }
99
100
101
102
103 public WatcherRequest<T> delayOnSuccess(Duration delayOnSuccess) {
104 requireNonNull(delayOnSuccess, "delayOnSuccess");
105 checkArgument(!delayOnSuccess.isNegative(),
106 "delayOnSuccess: %s (expected: >= 0)", delayOnSuccess);
107 return delayOnSuccessMillis(delayOnSuccess.toMillis());
108 }
109
110
111
112
113 public WatcherRequest<T> delayOnSuccessMillis(long delayOnSuccessMillis) {
114 this.delayOnSuccessMillis = delayOnSuccessMillis;
115 checkArgument(delayOnSuccessMillis >= 0,
116 "delayOnSuccessMillis: %s (expected: >= 0)", delayOnSuccessMillis);
117 return this;
118 }
119
120
121
122
123
124
125 public WatcherRequest<T> backoffOnFailure(long initialDelayMillis, long maxDelayMillis,
126 double multiplier) {
127 checkArgument(initialDelayMillis >= 0, "initialDelayMillis: %s (expected: >= 0)", initialDelayMillis);
128 checkArgument(initialDelayMillis <= maxDelayMillis, "maxDelayMillis: %s (expected: >= %s)",
129 maxDelayMillis, initialDelayMillis);
130 checkArgument(multiplier > 1.0, "multiplier: %s (expected: > 1.0)", multiplier);
131 this.initialDelayMillis = initialDelayMillis;
132 this.maxDelayMillis = maxDelayMillis;
133 this.multiplier = multiplier;
134 return this;
135 }
136
137
138
139
140 public WatcherRequest<T> jitterRate(double jitterRate) {
141 checkArgument(0.0 <= jitterRate && jitterRate <= 1.0,
142 "jitterRate: %s (expected: >= 0.0 and <= 1.0)", jitterRate);
143 this.jitterRate = jitterRate;
144 return this;
145 }
146
147 @Override
148 public WatcherRequest<T> timeout(Duration timeout) {
149
150 return (WatcherRequest<T>) super.timeout(timeout);
151 }
152
153 @Override
154 public WatcherRequest<T> timeoutMillis(long timeoutMillis) {
155
156 return (WatcherRequest<T>) super.timeoutMillis(timeoutMillis);
157 }
158
159 @Override
160 public WatcherRequest<T> errorOnEntryNotFound(boolean errorOnEntryNotFound) {
161
162 return (WatcherRequest<T>) super.errorOnEntryNotFound(errorOnEntryNotFound);
163 }
164
165
166
167
168
169 public Watcher<T> start() {
170 final String proName = centralDogmaRepo.projectName();
171 final String repoName = centralDogmaRepo.repositoryName();
172 final AbstractWatcher<T> watcher;
173 if (query != null) {
174 watcher = new FileWatcher<>(
175 centralDogmaRepo.centralDogma(), blockingTaskExecutor, proName, repoName, query,
176 timeoutMillis(), errorOnEntryNotFound(), mapper, executor, delayOnSuccessMillis,
177 initialDelayMillis, maxDelayMillis, multiplier, jitterRate);
178 } else {
179 assert pathPattern != null;
180 watcher = new FilesWatcher<>(
181 centralDogmaRepo.centralDogma(), blockingTaskExecutor, proName, repoName, pathPattern,
182 timeoutMillis(), errorOnEntryNotFound(), mapper, executor, delayOnSuccessMillis,
183 initialDelayMillis, maxDelayMillis, multiplier, jitterRate);
184 }
185 watcher.start();
186 return watcher;
187 }
188 }