/*
 * Copyright 2017 LINE Corporation
 *
 * LINE Corporation licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package com.linecorp.centraldogma.client;
17  
18  import static java.util.Objects.requireNonNull;
19  
20  import java.util.concurrent.CancellationException;
21  import java.util.concurrent.CompletableFuture;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.Executor;
24  import java.util.concurrent.ScheduledExecutorService;
25  import java.util.concurrent.TimeUnit;
26  import java.util.concurrent.TimeoutException;
27  import java.util.function.BiConsumer;
28  import java.util.function.Consumer;
29  import java.util.function.Function;
30  
31  import javax.annotation.Nullable;
32  
33  import com.fasterxml.jackson.databind.JsonNode;
34  import com.fasterxml.jackson.databind.node.MissingNode;
35  
36  import com.linecorp.centraldogma.common.EntryNotFoundException;
37  import com.linecorp.centraldogma.common.PathPattern;
38  import com.linecorp.centraldogma.common.Query;
39  import com.linecorp.centraldogma.common.Revision;
40  
41  /**
42   * Watches the changes of a repository or a file. The {@link Watcher} must be closed via
43   * {@link Watcher#close()} after use.
44   *
45   * @param <T> the watch result type
46   */
47  public interface Watcher<T> extends AutoCloseable {
48  
49      /**
50       * Creates a forked {@link Watcher} based on an existing {@link JsonNode}-watching {@link Watcher}.
51       *
52       * @param jsonPointer a <a href="https://tools.ietf.org/html/rfc6901">JSON pointer</a> that is encoded
53       *
54       * @return A new child {@link Watcher}, whose transformation is a
55       *         <a href="https://tools.ietf.org/html/rfc6901">JSON pointer</a> query.
56       */
57      static Watcher<JsonNode> atJsonPointer(Watcher<JsonNode> watcher, String jsonPointer) {
58          requireNonNull(watcher, "watcher");
59          requireNonNull(jsonPointer, "jsonPointer");
60          return watcher.newChild(new Function<JsonNode, JsonNode>() {
61              @Override
62              public JsonNode apply(JsonNode node) {
63                  if (node == null) {
64                      return MissingNode.getInstance();
65                  } else {
66                      return node.at(jsonPointer);
67                  }
68              }
69  
70              @Override
71              public String toString() {
72                  return "JSON pointer " + jsonPointer;
73              }
74          });
75      }
76  
77      /**
78       * Returns the {@link ScheduledExecutorService} that is used to schedule watch.
79       */
80      ScheduledExecutorService watchScheduler();
81  
82      /**
83       * Returns the {@link CompletableFuture} which is completed when the initial value retrieval is done
84       * successfully.
85       */
86      CompletableFuture<Latest<T>> initialValueFuture();
87  
88      /**
89       * Waits for the initial value to be available.
90       *
91       * @return the {@link Latest} object that contains the initial value and the {@link Revision} where
92       *         the initial value came from.
93       *
94       * @throws CancellationException if this watcher has been closed by {@link #close()}
95       * @throws EntryNotFoundException if {@code errorOnEntryNotFound} is {@code true} and entry isn't found on
96       *         watching the initial value
97       */
98      default Latest<T> awaitInitialValue() throws InterruptedException {
99          try {
100             return initialValueFuture().get();
101         } catch (ExecutionException e) {
102             throw new Error(e);
103         }
104     }
105 
106     /**
107      * Waits for the initial value to be available. Specify the default value with
108      * {@link #awaitInitialValue(long, TimeUnit, Object)} or make sure to handle
109      * a {@link TimeoutException} properly if the initial value must be available
110      * even when the server is unavailable.
111      *
112      * @param timeout the maximum amount of time to wait for the initial value. Note that timeout is basically
113      *                a trade-off. If you specify a smaller timeout, this method will have a higher chance of
114      *                throwing a {@link TimeoutException} when the server does not respond in time.
115      *                If you specify a larger timeout, you will have a better chance of successful retrieval.
116      *                It is generally recommended to use a value not less than
117      *                {@value WatchConstants#RECOMMENDED_AWAIT_TIMEOUT_SECONDS} seconds so that
118      *                the client can retry at least a few times before timing out.
119      *                Consider using {@link #awaitInitialValue(long, TimeUnit, Object)} with a sensible default
120      *                value if you cannot tolerate a timeout or need to use a small timeout.
121      * @param unit the {@link TimeUnit} of {@code timeout}.
122      *
123      * @return the {@link Latest} object that contains the initial value and the {@link Revision} where
124      *         the initial value came from.
125      *
126      * @throws CancellationException if this watcher has been closed by {@link #close()}
127      * @throws EntryNotFoundException if {@code errorOnEntryNotFound} is {@code true} and entry isn't found on
128      *                                watching the initial value
129      * @throws TimeoutException if failed to retrieve the initial value within the specified timeout
130      */
131     default Latest<T> awaitInitialValue(long timeout, TimeUnit unit) throws InterruptedException,
132                                                                             TimeoutException {
133         requireNonNull(unit, "unit");
134         try {
135             return initialValueFuture().get(timeout, unit);
136         } catch (TimeoutException e) {
137             final long initialTimeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
138             throw new TimeoutException("Failed to get the initial value in " + initialTimeoutMillis + " ms. " +
139                                        "It's probably because the timeout value is too small or " +
140                                        "the target entry doesn't exist. Please consider using the " +
141                                        "'errorOnEntryNotFound' option to get an " +
142                                        EntryNotFoundException.class.getSimpleName() + " for a missing entry.");
143         } catch (ExecutionException e) {
144             throw new Error(e);
145         }
146     }
147 
148     /**
149      * Waits for the initial value to be available and returns the specified default value if failed
150      * to retrieve the initial value from the server.
151      *
152      * @param timeout the maximum amount of time to wait for the initial value. Note that timeout is basically
153      *                a trade-off. If you specify a smaller timeout, this method will have a higher chance of
154      *                falling back to the {@code defaultValue} when the server does not respond in time.
155      *                If you specify a larger timeout, you will have a better chance of retrieving
156      *                an up-to-date initial value. It is generally recommended to use a value not less than
157      *                {@value WatchConstants#RECOMMENDED_AWAIT_TIMEOUT_SECONDS} seconds
158      *                so that the client can retry at least a few times before timing out.
159      * @param unit the {@link TimeUnit} of {@code timeout}.
160      * @param defaultValue the default value to use when timed out.
161      *
162      * @return the initial value, or the default value if timed out.
163      *
164      * @throws CancellationException if this watcher has been closed by {@link #close()}
165      * @throws EntryNotFoundException if {@code errorOnEntryNotFound} is {@code true} and entry isn't found on
166      *                                watching the initial value
167      */
168     @Nullable
169     default T awaitInitialValue(long timeout, TimeUnit unit, @Nullable T defaultValue)
170             throws InterruptedException {
171         try {
172             return awaitInitialValue(timeout, unit).value();
173         } catch (TimeoutException e) {
174             return defaultValue;
175         }
176     }
177 
178     /**
179      * Returns the latest {@link Revision} and value of {@code watchFile()} or {@code watchRepository()} result.
180      *
181      * @throws IllegalStateException if the value is not available yet.
182      *                               Use {@link #awaitInitialValue(long, TimeUnit)} first or
183      *                               add a listener using {@link #watch(BiConsumer)} instead.
184      */
185     Latest<T> latest();
186 
187     /**
188      * Returns the latest value of {@code watchFile()} or {@code watchRepository()} result.
189      *
190      * @throws IllegalStateException if the value is not available yet.
191      *                               Use {@link #awaitInitialValue(long, TimeUnit)} first or
192      *                               add a listener using {@link #watch(BiConsumer)} instead.
193      */
194     @Nullable
195     default T latestValue() {
196         return latest().value();
197     }
198 
199     /**
200      * Returns the latest value of {@code watchFile()} or {@code watchRepository()} result.
201      *
202      * @param defaultValue the default value which is returned when the value is not available yet
203      */
204     @Nullable
205     default T latestValue(@Nullable T defaultValue) {
206         final CompletableFuture<Latest<T>> initialValueFuture = initialValueFuture();
207         if (initialValueFuture.isDone() && !initialValueFuture.isCompletedExceptionally()) {
208             return latest().value();
209         } else {
210             return defaultValue;
211         }
212     }
213 
214     /**
215      * Stops watching the file specified in the {@link Query} or the {@link PathPattern} in the repository.
216      */
217     @Override
218     void close();
219 
220     /**
221      * Registers a {@link BiConsumer} that will be invoked when the value of the watched entry becomes
222      * available or changes.
223      *
224      * <p>Note that the specified {@link BiConsumer} is not called when {@code errorOnEntryNotFound} is
225      * {@code true} and the target doesn't exist in the Central Dogma server when this {@link Watcher} sends
226      * the initial watch call. You should use {@link #initialValueFuture()} or {@link #awaitInitialValue()} to
227      * check the target exists or not.
228      */
229     void watch(BiConsumer<? super Revision, ? super T> listener);
230 
231     /**
232      * Registers a {@link BiConsumer} that will be invoked when the value of the watched entry becomes
233      * available or changes.
234      *
235      * <p>Note that the specified {@link BiConsumer} is not called when {@code errorOnEntryNotFound} is
236      * {@code true} and the target doesn't exist in the Central Dogma server when this {@link Watcher} sends
237      * the initial watch call. You should use {@link #initialValueFuture()} or {@link #awaitInitialValue()} to
238      * check the target exists or not.
239      *
240      * @param executor the {@link Executor} that executes the {@link BiConsumer}
241      */
242     void watch(BiConsumer<? super Revision, ? super T> listener, Executor executor);
243 
244     /**
245      * Registers a {@link Consumer} that will be invoked when the value of the watched entry becomes available
246      * or changes.
247      */
248     default void watch(Consumer<? super T> listener) {
249         requireNonNull(listener, "listener");
250         watch((revision, value) -> listener.accept(value));
251     }
252 
253     /**
254      * Registers a {@link Consumer} that will be invoked when the value of the watched entry becomes available
255      * or changes.
256      */
257     default void watch(Consumer<? super T> listener, Executor executor) {
258         requireNonNull(listener, "listener");
259         requireNonNull(executor, "executor");
260         watch((revision, value) -> listener.accept(value), executor);
261     }
262 
263     /**
264      * Returns a {@link Watcher} that applies the {@link Function} for the {@link Latest#value()}.
265      */
266     default <U> Watcher<U> newChild(Function<? super T, ? extends U> mapper) {
267         return newChild(mapper, watchScheduler());
268     }
269 
270     /**
271      * Returns a {@link Watcher} that applies the {@link Function} for the {@link Latest#value()}.
272      */
273     default <U> Watcher<U> newChild(Function<? super T, ? extends U> mapper, Executor executor) {
274         requireNonNull(mapper, "mapper");
275         requireNonNull(executor, "executor");
276         return MappingWatcher.of(this, mapper, executor, false);
277     }
278 }