1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.internal.api;
18
19 import static java.util.Objects.requireNonNull;
20
21 import java.util.Collections;
22 import java.util.Set;
23 import java.util.concurrent.CancellationException;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ConcurrentHashMap;
26 import java.util.concurrent.ScheduledFuture;
27 import java.util.concurrent.ThreadLocalRandom;
28 import java.util.concurrent.TimeUnit;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import com.linecorp.armeria.common.RequestContext;
34 import com.linecorp.armeria.common.util.Exceptions;
35 import com.linecorp.armeria.common.util.TimeoutMode;
36 import com.linecorp.armeria.server.ServiceRequestContext;
37 import com.linecorp.centraldogma.common.Entry;
38 import com.linecorp.centraldogma.common.Query;
39 import com.linecorp.centraldogma.common.Revision;
40 import com.linecorp.centraldogma.internal.api.v1.WatchTimeout;
41 import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException;
42 import com.linecorp.centraldogma.server.storage.repository.Repository;
43
44 import io.micrometer.core.instrument.Counter;
45 import io.micrometer.core.instrument.Gauge;
46 import io.micrometer.core.instrument.MeterRegistry;
47
48
49
50
51 public final class WatchService {
52
53 private static final Logger logger = LoggerFactory.getLogger(WatchService.class);
54
55 private static final CancellationException CANCELLATION_EXCEPTION =
56 Exceptions.clearTrace(new CancellationException("watch timed out"));
57
58 private static final double JITTER_RATE = 0.2;
59
60 private final Set<CompletableFuture<?>> pendingFutures =
61 Collections.newSetFromMap(new ConcurrentHashMap<>());
62 private final Counter wakeupCounter;
63 private final Counter timeoutCounter;
64 private final Counter failureCounter;
65
66 public WatchService(MeterRegistry meterRegistry) {
67 requireNonNull(meterRegistry, "meterRegistry");
68
69 Gauge.builder("watches.active", this, self -> self.pendingFutures.size()).register(meterRegistry);
70
71 wakeupCounter = Counter.builder("watches.processed")
72 .tag("result", "wakeup")
73 .register(meterRegistry);
74 timeoutCounter = Counter.builder("watches.processed")
75 .tag("result", "timeout")
76 .register(meterRegistry);
77 failureCounter = Counter.builder("watches.processed")
78 .tag("result", "failure")
79 .register(meterRegistry);
80 }
81
82
83
84
85
86
87
88 public CompletableFuture<Revision> watchRepository(Repository repo, Revision lastKnownRevision,
89 String pathPattern, long timeoutMillis,
90 boolean errorOnEntryNotFound) {
91 final ServiceRequestContext ctx = RequestContext.current();
92 updateRequestTimeout(ctx, timeoutMillis);
93 final CompletableFuture<Revision> result = repo.watch(lastKnownRevision, pathPattern,
94 errorOnEntryNotFound);
95 if (result.isDone()) {
96 return result;
97 }
98
99 scheduleTimeout(ctx, result, timeoutMillis);
100 return result;
101 }
102
103 private static void updateRequestTimeout(ServiceRequestContext ctx, long timeoutMillis) {
104 final long adjustmentMillis = WatchTimeout.availableTimeout(timeoutMillis, ctx.requestTimeoutMillis());
105 ctx.setRequestTimeoutMillis(TimeoutMode.EXTEND, adjustmentMillis);
106 }
107
108
109
110
111
112
113
114 public <T> CompletableFuture<Entry<T>> watchFile(Repository repo, Revision lastKnownRevision,
115 Query<T> query, long timeoutMillis,
116 boolean errorOnEntryNotFound) {
117 final ServiceRequestContext ctx = RequestContext.current();
118 updateRequestTimeout(ctx, timeoutMillis);
119 final CompletableFuture<Entry<T>> result = repo.watch(lastKnownRevision, query, errorOnEntryNotFound);
120 if (result.isDone()) {
121 return result;
122 }
123
124 scheduleTimeout(ctx, result, timeoutMillis);
125 return result;
126 }
127
128 private <T> void scheduleTimeout(ServiceRequestContext ctx, CompletableFuture<T> result,
129 long timeoutMillis) {
130 pendingFutures.add(result);
131
132 final ScheduledFuture<?> timeoutFuture;
133 final long watchTimeoutMillis;
134 if (timeoutMillis > 0) {
135 watchTimeoutMillis = applyJitter(WatchTimeout.availableTimeout(timeoutMillis));
136 timeoutFuture = ctx.eventLoop().schedule(() -> result.completeExceptionally(CANCELLATION_EXCEPTION),
137 watchTimeoutMillis, TimeUnit.MILLISECONDS);
138 } else {
139 watchTimeoutMillis = 0;
140 timeoutFuture = null;
141 }
142
143 result.whenComplete((revision, cause) -> {
144 if (timeoutFuture != null) {
145 if (timeoutFuture.cancel(true)) {
146 wakeupCounter.increment();
147
148
149
150 if (cause instanceof RequestAlreadyTimedOutException) {
151 logger.warn("Request has timed out before watch timeout: watchTimeoutMillis={}, log={}",
152 watchTimeoutMillis, ctx.log());
153 }
154 } else {
155 timeoutCounter.increment();
156 }
157 } else {
158 if (cause == null) {
159 wakeupCounter.increment();
160 } else {
161 failureCounter.increment();
162 }
163 }
164 pendingFutures.remove(result);
165 });
166 }
167
168 private static long applyJitter(long timeoutMillis) {
169
170 final double rate = ThreadLocalRandom.current().nextDouble(1 - JITTER_RATE, 1.001);
171 if (rate < 1) {
172 return (long) (timeoutMillis * rate);
173 } else {
174 return timeoutMillis;
175 }
176 }
177 }