1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.internal.api;
18  
19  import static java.util.Objects.requireNonNull;
20  
21  import java.util.Collections;
22  import java.util.Set;
23  import java.util.concurrent.CancellationException;
24  import java.util.concurrent.CompletableFuture;
25  import java.util.concurrent.ConcurrentHashMap;
26  import java.util.concurrent.ScheduledFuture;
27  import java.util.concurrent.ThreadLocalRandom;
28  import java.util.concurrent.TimeUnit;
29  
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import com.linecorp.armeria.common.RequestContext;
34  import com.linecorp.armeria.common.util.Exceptions;
35  import com.linecorp.armeria.common.util.TimeoutMode;
36  import com.linecorp.armeria.server.ServiceRequestContext;
37  import com.linecorp.centraldogma.common.Entry;
38  import com.linecorp.centraldogma.common.Query;
39  import com.linecorp.centraldogma.common.Revision;
40  import com.linecorp.centraldogma.internal.api.v1.WatchTimeout;
41  import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException;
42  import com.linecorp.centraldogma.server.storage.repository.Repository;
43  
44  import io.micrometer.core.instrument.Counter;
45  import io.micrometer.core.instrument.Gauge;
46  import io.micrometer.core.instrument.MeterRegistry;
47  
48  /**
49   * A service class for watching repository or a file.
50   */
51  public final class WatchService {
52  
53      private static final Logger logger = LoggerFactory.getLogger(WatchService.class);
54  
55      private static final CancellationException CANCELLATION_EXCEPTION =
56              Exceptions.clearTrace(new CancellationException("watch timed out"));
57  
58      private static final double JITTER_RATE = 0.2;
59  
60      private final Set<CompletableFuture<?>> pendingFutures =
61              Collections.newSetFromMap(new ConcurrentHashMap<>());
62      private final Counter wakeupCounter;
63      private final Counter timeoutCounter;
64      private final Counter failureCounter;
65  
66      public WatchService(MeterRegistry meterRegistry) {
67          requireNonNull(meterRegistry, "meterRegistry");
68  
69          Gauge.builder("watches.active", this, self -> self.pendingFutures.size()).register(meterRegistry);
70  
71          wakeupCounter = Counter.builder("watches.processed")
72                                 .tag("result", "wakeup")
73                                 .register(meterRegistry);
74          timeoutCounter = Counter.builder("watches.processed")
75                                  .tag("result", "timeout")
76                                  .register(meterRegistry);
77          failureCounter = Counter.builder("watches.processed")
78                                  .tag("result", "failure")
79                                  .register(meterRegistry);
80      }
81  
82      /**
83       * Awaits and retrieves the latest revision of the commit that changed the file that matches the specified
84       * {@code pathPattern} since the specified {@code lastKnownRevision}. This will wait until the specified
85       * {@code timeoutMillis} passes. If there's no change during the time, the returned future will be
86       * exceptionally completed with the {@link CancellationException}.
87       */
88      public CompletableFuture<Revision> watchRepository(Repository repo, Revision lastKnownRevision,
89                                                         String pathPattern, long timeoutMillis,
90                                                         boolean errorOnEntryNotFound) {
91          final ServiceRequestContext ctx = RequestContext.current();
92          updateRequestTimeout(ctx, timeoutMillis);
93          final CompletableFuture<Revision> result = repo.watch(lastKnownRevision, pathPattern,
94                                                                errorOnEntryNotFound);
95          if (result.isDone()) {
96              return result;
97          }
98  
99          scheduleTimeout(ctx, result, timeoutMillis);
100         return result;
101     }
102 
103     private static void updateRequestTimeout(ServiceRequestContext ctx, long timeoutMillis) {
104         final long adjustmentMillis = WatchTimeout.availableTimeout(timeoutMillis, ctx.requestTimeoutMillis());
105         ctx.setRequestTimeoutMillis(TimeoutMode.EXTEND, adjustmentMillis);
106     }
107 
108     /**
109      * Awaits and retrieves the latest revision of the commit that changed the file that matches the specified
110      * {@link Query} since the specified {@code lastKnownRevision}. This will wait until the specified
111      * {@code timeoutMillis} passes. If there's no change during the time, the returned future will be
112      * exceptionally completed with the {@link CancellationException}.
113      */
114     public <T> CompletableFuture<Entry<T>> watchFile(Repository repo, Revision lastKnownRevision,
115                                                      Query<T> query, long timeoutMillis,
116                                                      boolean errorOnEntryNotFound) {
117         final ServiceRequestContext ctx = RequestContext.current();
118         updateRequestTimeout(ctx, timeoutMillis);
119         final CompletableFuture<Entry<T>> result = repo.watch(lastKnownRevision, query, errorOnEntryNotFound);
120         if (result.isDone()) {
121             return result;
122         }
123 
124         scheduleTimeout(ctx, result, timeoutMillis);
125         return result;
126     }
127 
128     private <T> void scheduleTimeout(ServiceRequestContext ctx, CompletableFuture<T> result,
129                                      long timeoutMillis) {
130         pendingFutures.add(result);
131 
132         final ScheduledFuture<?> timeoutFuture;
133         final long watchTimeoutMillis;
134         if (timeoutMillis > 0) {
135             watchTimeoutMillis = applyJitter(WatchTimeout.availableTimeout(timeoutMillis));
136             timeoutFuture = ctx.eventLoop().schedule(() -> result.completeExceptionally(CANCELLATION_EXCEPTION),
137                                                      watchTimeoutMillis, TimeUnit.MILLISECONDS);
138         } else {
139             watchTimeoutMillis = 0;
140             timeoutFuture = null;
141         }
142 
143         result.whenComplete((revision, cause) -> {
144             if (timeoutFuture != null) {
145                 if (timeoutFuture.cancel(true)) {
146                     wakeupCounter.increment();
147 
148                     // TODO(hyangtack) Need to investigate why this exception comes before
149                     //                 CancellationException.
150                     if (cause instanceof RequestAlreadyTimedOutException) {
151                         logger.warn("Request has timed out before watch timeout: watchTimeoutMillis={}, log={}",
152                                     watchTimeoutMillis, ctx.log());
153                     }
154                 } else {
155                     timeoutCounter.increment();
156                 }
157             } else {
158                 if (cause == null) {
159                     wakeupCounter.increment();
160                 } else {
161                     failureCounter.increment();
162                 }
163             }
164             pendingFutures.remove(result);
165         });
166     }
167 
168     private static long applyJitter(long timeoutMillis) {
169         // Specify the 'bound' value that's slightly greater than 1.0 because it's exclusive.
170         final double rate = ThreadLocalRandom.current().nextDouble(1 - JITTER_RATE, 1.001);
171         if (rate < 1) {
172             return (long) (timeoutMillis * rate);
173         } else {
174             return timeoutMillis;
175         }
176     }
177 }