1   /*
2    * Copyright 2021 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static java.util.Objects.requireNonNull;
20  
21  import java.time.Duration;
22  import java.util.concurrent.Executor;
23  import java.util.concurrent.ScheduledExecutorService;
24  import java.util.concurrent.TimeUnit;
25  import java.util.function.Function;
26  
27  import javax.annotation.Nullable;
28  
29  import com.linecorp.centraldogma.common.PathPattern;
30  import com.linecorp.centraldogma.common.Query;
31  
32  /**
33   * Prepares to create a {@link Watcher}.
34   */
35  public final class WatcherRequest<T> extends WatchOptions {
36  
37      private static final long DEFAULT_DELAY_ON_SUCCESS_MILLIS = TimeUnit.SECONDS.toMillis(1);
38      private static final long DEFAULT_MAX_DELAY_MILLIS = TimeUnit.MINUTES.toMillis(1);
39      private static final double DEFAULT_MULTIPLIER = 2.0;
40      private static final double DEFAULT_JITTER_RATE = 0.2;
41  
42      private final CentralDogmaRepository centralDogmaRepo;
43      @Nullable
44      private final Query<T> query;
45      @Nullable
46      private final PathPattern pathPattern;
47      private final ScheduledExecutorService blockingTaskExecutor;
48  
49      @Nullable
50      private Function<Object, ? extends T> mapper;
51      private Executor executor;
52  
53      private long delayOnSuccessMillis = DEFAULT_DELAY_ON_SUCCESS_MILLIS;
54      private long initialDelayMillis = DEFAULT_DELAY_ON_SUCCESS_MILLIS * 2;
55      private long maxDelayMillis = DEFAULT_MAX_DELAY_MILLIS;
56      private double multiplier = DEFAULT_MULTIPLIER;
57      private double jitterRate = DEFAULT_JITTER_RATE;
58  
59      WatcherRequest(CentralDogmaRepository centralDogmaRepo, Query<T> query,
60                     ScheduledExecutorService blockingTaskExecutor) {
61          this(centralDogmaRepo, query, null, blockingTaskExecutor);
62      }
63  
64      WatcherRequest(CentralDogmaRepository centralDogmaRepo, PathPattern pathPattern,
65                     ScheduledExecutorService blockingTaskExecutor) {
66          this(centralDogmaRepo, null, pathPattern, blockingTaskExecutor);
67      }
68  
69      private WatcherRequest(CentralDogmaRepository centralDogmaRepo,
70                             @Nullable Query<T> query, @Nullable PathPattern pathPattern,
71                             ScheduledExecutorService blockingTaskExecutor) {
72          this.centralDogmaRepo = centralDogmaRepo;
73          this.query = query;
74          this.pathPattern = pathPattern;
75          this.blockingTaskExecutor = blockingTaskExecutor;
76          executor = blockingTaskExecutor;
77      }
78  
79      /**
80       * Sets the {@link Function} to apply to the result of a watch request.
81       */
82      @SuppressWarnings("unchecked")
83      public <U> WatcherRequest<U> map(Function<? super T, ? extends U> mapper) {
84          if (this.mapper == null) {
85              this.mapper = (Function<Object, ? extends T>) mapper;
86          } else {
87              this.mapper = (Function<Object, ? extends T>) this.mapper.andThen(mapper);
88          }
89          return (WatcherRequest<U>) this;
90      }
91  
92      /**
93       * Sets the {@link Executor} to execute the {@link #map(Function)}.
94       */
95      public WatcherRequest<T> mapperExecutor(Executor executor) {
96          this.executor = executor;
97          return this;
98      }
99  
100     /**
101      * Sets the delay for sending the next watch request when the previous request succeeds.
102      */
103     public WatcherRequest<T> delayOnSuccess(Duration delayOnSuccess) {
104         requireNonNull(delayOnSuccess, "delayOnSuccess");
105         checkArgument(!delayOnSuccess.isNegative(),
106                       "delayOnSuccess: %s (expected: >= 0)", delayOnSuccess);
107         return delayOnSuccessMillis(delayOnSuccess.toMillis());
108     }
109 
110     /**
111      * Sets the delay in milliseconds for sending the next watch request when the previous request succeeds.
112      */
113     public WatcherRequest<T> delayOnSuccessMillis(long delayOnSuccessMillis) {
114         this.delayOnSuccessMillis = delayOnSuccessMillis;
115         checkArgument(delayOnSuccessMillis >= 0,
116                       "delayOnSuccessMillis: %s (expected: >= 0)", delayOnSuccessMillis);
117         return this;
118     }
119 
120     /**
121      * Sets the delays and multiplier which is used to calculate the delay
122      * for sending the next watch request when the previous request fails.
123      * Currently, it uses exponential backoff. File a feature request if you need another algorithm.
124      */
125     public WatcherRequest<T> backoffOnFailure(long initialDelayMillis, long maxDelayMillis,
126                                               double multiplier) {
127         checkArgument(initialDelayMillis >= 0, "initialDelayMillis: %s (expected: >= 0)", initialDelayMillis);
128         checkArgument(initialDelayMillis <= maxDelayMillis, "maxDelayMillis: %s (expected: >= %s)",
129                       maxDelayMillis, initialDelayMillis);
130         checkArgument(multiplier > 1.0, "multiplier: %s (expected: > 1.0)", multiplier);
131         this.initialDelayMillis = initialDelayMillis;
132         this.maxDelayMillis = maxDelayMillis;
133         this.multiplier = multiplier;
134         return this;
135     }
136 
137     /**
138      * Sets the jitter to apply the delay.
139      */
140     public WatcherRequest<T> jitterRate(double jitterRate) {
141         checkArgument(0.0 <= jitterRate && jitterRate <= 1.0,
142                       "jitterRate: %s (expected: >= 0.0 and <= 1.0)", jitterRate);
143         this.jitterRate = jitterRate;
144         return this;
145     }
146 
147     @Override
148     public WatcherRequest<T> timeout(Duration timeout) {
149         //noinspection unchecked
150         return (WatcherRequest<T>) super.timeout(timeout);
151     }
152 
153     @Override
154     public WatcherRequest<T> timeoutMillis(long timeoutMillis) {
155         //noinspection unchecked
156         return (WatcherRequest<T>) super.timeoutMillis(timeoutMillis);
157     }
158 
159     @Override
160     public WatcherRequest<T> errorOnEntryNotFound(boolean errorOnEntryNotFound) {
161         //noinspection unchecked
162         return (WatcherRequest<T>) super.errorOnEntryNotFound(errorOnEntryNotFound);
163     }
164 
165     /**
166      * Creates a new {@link Watcher} and starts to watch the target. The {@link Watcher} must be closed via
167      * {@link Watcher#close()} after use.
168      */
169     public Watcher<T> start() {
170         final String proName = centralDogmaRepo.projectName();
171         final String repoName = centralDogmaRepo.repositoryName();
172         final AbstractWatcher<T> watcher;
173         if (query != null) {
174             watcher = new FileWatcher<>(
175                     centralDogmaRepo.centralDogma(), blockingTaskExecutor, proName, repoName, query,
176                     timeoutMillis(), errorOnEntryNotFound(), mapper, executor, delayOnSuccessMillis,
177                     initialDelayMillis, maxDelayMillis, multiplier, jitterRate);
178         } else {
179             assert pathPattern != null;
180             watcher = new FilesWatcher<>(
181                     centralDogmaRepo.centralDogma(), blockingTaskExecutor, proName, repoName, pathPattern,
182                     timeoutMillis(), errorOnEntryNotFound(), mapper, executor, delayOnSuccessMillis,
183                     initialDelayMillis, maxDelayMillis, multiplier, jitterRate);
184         }
185         watcher.start();
186         return watcher;
187     }
188 }