1   /*
2    * Copyright 2021 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static java.util.Objects.requireNonNull;
20  
21  import java.time.Duration;
22  import java.util.concurrent.Executor;
23  import java.util.concurrent.ScheduledExecutorService;
24  import java.util.concurrent.TimeUnit;
25  import java.util.function.Function;
26  
27  import javax.annotation.Nullable;
28  
29  import com.linecorp.centraldogma.common.PathPattern;
30  import com.linecorp.centraldogma.common.Query;
31  
32  import io.micrometer.core.instrument.MeterRegistry;
33  
34  /**
35   * Prepares to create a {@link Watcher}.
36   */
37  public final class WatcherRequest<T> extends WatchOptions {
38  
39      private static final long DEFAULT_DELAY_ON_SUCCESS_MILLIS = TimeUnit.SECONDS.toMillis(1);
40      private static final long DEFAULT_MAX_DELAY_MILLIS = TimeUnit.MINUTES.toMillis(1);
41      private static final double DEFAULT_MULTIPLIER = 2.0;
42      private static final double DEFAULT_JITTER_RATE = 0.2;
43  
44      private final CentralDogmaRepository centralDogmaRepo;
45      @Nullable
46      private final Query<T> query;
47      @Nullable
48      private final PathPattern pathPattern;
49      private final ScheduledExecutorService blockingTaskExecutor;
50  
51      @Nullable
52      private Function<Object, ? extends T> mapper;
53      private Executor executor;
54  
55      @Nullable
56      private final MeterRegistry meterRegistry;
57  
58      private long delayOnSuccessMillis = DEFAULT_DELAY_ON_SUCCESS_MILLIS;
59      private long initialDelayMillis = DEFAULT_DELAY_ON_SUCCESS_MILLIS * 2;
60      private long maxDelayMillis = DEFAULT_MAX_DELAY_MILLIS;
61      private double multiplier = DEFAULT_MULTIPLIER;
62      private double jitterRate = DEFAULT_JITTER_RATE;
63  
64      WatcherRequest(CentralDogmaRepository centralDogmaRepo, Query<T> query,
65                     ScheduledExecutorService blockingTaskExecutor, @Nullable MeterRegistry meterRegistry) {
66          this(centralDogmaRepo, query, null, blockingTaskExecutor, meterRegistry);
67      }
68  
69      WatcherRequest(CentralDogmaRepository centralDogmaRepo, PathPattern pathPattern,
70                     ScheduledExecutorService blockingTaskExecutor, @Nullable MeterRegistry meterRegistry) {
71          this(centralDogmaRepo, null, pathPattern, blockingTaskExecutor, meterRegistry);
72      }
73  
74      private WatcherRequest(CentralDogmaRepository centralDogmaRepo, @Nullable Query<T> query,
75                             @Nullable PathPattern pathPattern, ScheduledExecutorService blockingTaskExecutor,
76                             @Nullable MeterRegistry meterRegistry) {
77          this.centralDogmaRepo = centralDogmaRepo;
78          this.query = query;
79          this.pathPattern = pathPattern;
80          this.blockingTaskExecutor = blockingTaskExecutor;
81          executor = blockingTaskExecutor;
82          this.meterRegistry = meterRegistry;
83      }
84  
85      /**
86       * Sets the {@link Function} to apply to the result of a watch request.
87       */
88      @SuppressWarnings("unchecked")
89      public <U> WatcherRequest<U> map(Function<? super T, ? extends U> mapper) {
90          if (this.mapper == null) {
91              this.mapper = (Function<Object, ? extends T>) mapper;
92          } else {
93              this.mapper = (Function<Object, ? extends T>) this.mapper.andThen(mapper);
94          }
95          return (WatcherRequest<U>) this;
96      }
97  
98      /**
99       * Sets the {@link Executor} to execute the {@link #map(Function)}.
100      */
101     public WatcherRequest<T> mapperExecutor(Executor executor) {
102         this.executor = executor;
103         return this;
104     }
105 
106     /**
107      * Sets the delay for sending the next watch request when the previous request succeeds.
108      */
109     public WatcherRequest<T> delayOnSuccess(Duration delayOnSuccess) {
110         requireNonNull(delayOnSuccess, "delayOnSuccess");
111         checkArgument(!delayOnSuccess.isNegative(),
112                       "delayOnSuccess: %s (expected: >= 0)", delayOnSuccess);
113         return delayOnSuccessMillis(delayOnSuccess.toMillis());
114     }
115 
116     /**
117      * Sets the delay in milliseconds for sending the next watch request when the previous request succeeds.
118      */
119     public WatcherRequest<T> delayOnSuccessMillis(long delayOnSuccessMillis) {
120         this.delayOnSuccessMillis = delayOnSuccessMillis;
121         checkArgument(delayOnSuccessMillis >= 0,
122                       "delayOnSuccessMillis: %s (expected: >= 0)", delayOnSuccessMillis);
123         return this;
124     }
125 
126     /**
127      * Sets the delays and multiplier which is used to calculate the delay
128      * for sending the next watch request when the previous request fails.
129      * Currently, it uses exponential backoff. File a feature request if you need another algorithm.
130      */
131     public WatcherRequest<T> backoffOnFailure(long initialDelayMillis, long maxDelayMillis,
132                                               double multiplier) {
133         checkArgument(initialDelayMillis >= 0, "initialDelayMillis: %s (expected: >= 0)", initialDelayMillis);
134         checkArgument(initialDelayMillis <= maxDelayMillis, "maxDelayMillis: %s (expected: >= %s)",
135                       maxDelayMillis, initialDelayMillis);
136         checkArgument(multiplier > 1.0, "multiplier: %s (expected: > 1.0)", multiplier);
137         this.initialDelayMillis = initialDelayMillis;
138         this.maxDelayMillis = maxDelayMillis;
139         this.multiplier = multiplier;
140         return this;
141     }
142 
143     /**
144      * Sets the jitter to apply the delay.
145      */
146     public WatcherRequest<T> jitterRate(double jitterRate) {
147         checkArgument(0.0 <= jitterRate && jitterRate <= 1.0,
148                       "jitterRate: %s (expected: >= 0.0 and <= 1.0)", jitterRate);
149         this.jitterRate = jitterRate;
150         return this;
151     }
152 
153     @Override
154     public WatcherRequest<T> timeout(Duration timeout) {
155         //noinspection unchecked
156         return (WatcherRequest<T>) super.timeout(timeout);
157     }
158 
159     @Override
160     public WatcherRequest<T> timeoutMillis(long timeoutMillis) {
161         //noinspection unchecked
162         return (WatcherRequest<T>) super.timeoutMillis(timeoutMillis);
163     }
164 
165     @Override
166     public WatcherRequest<T> errorOnEntryNotFound(boolean errorOnEntryNotFound) {
167         //noinspection unchecked
168         return (WatcherRequest<T>) super.errorOnEntryNotFound(errorOnEntryNotFound);
169     }
170 
171     /**
172      * Creates a new {@link Watcher} and starts to watch the target. The {@link Watcher} must be closed via
173      * {@link Watcher#close()} after use.
174      */
175     public Watcher<T> start() {
176         final String proName = centralDogmaRepo.projectName();
177         final String repoName = centralDogmaRepo.repositoryName();
178         final AbstractWatcher<T> watcher;
179         if (query != null) {
180             watcher = new FileWatcher<>(
181                     centralDogmaRepo.centralDogma(), blockingTaskExecutor, proName, repoName, query,
182                     timeoutMillis(), errorOnEntryNotFound(), mapper, executor, delayOnSuccessMillis,
183                     initialDelayMillis, maxDelayMillis, multiplier, jitterRate, meterRegistry);
184         } else {
185             assert pathPattern != null;
186             watcher = new FilesWatcher<>(
187                     centralDogmaRepo.centralDogma(), blockingTaskExecutor, proName, repoName, pathPattern,
188                     timeoutMillis(), errorOnEntryNotFound(), mapper, executor, delayOnSuccessMillis,
189                     initialDelayMillis, maxDelayMillis, multiplier, jitterRate, meterRegistry);
190         }
191         watcher.start();
192         return watcher;
193     }
194 }