1 /* 2 * Copyright 2017 LINE Corporation 3 * 4 * LINE Corporation licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * https://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package com.linecorp.centraldogma.client; 17 18 import static java.util.Objects.requireNonNull; 19 20 import java.util.concurrent.CancellationException; 21 import java.util.concurrent.CompletableFuture; 22 import java.util.concurrent.ExecutionException; 23 import java.util.concurrent.Executor; 24 import java.util.concurrent.ScheduledExecutorService; 25 import java.util.concurrent.TimeUnit; 26 import java.util.concurrent.TimeoutException; 27 import java.util.function.BiConsumer; 28 import java.util.function.Consumer; 29 import java.util.function.Function; 30 31 import javax.annotation.Nullable; 32 33 import com.fasterxml.jackson.databind.JsonNode; 34 import com.fasterxml.jackson.databind.node.MissingNode; 35 36 import com.linecorp.centraldogma.common.EntryNotFoundException; 37 import com.linecorp.centraldogma.common.PathPattern; 38 import com.linecorp.centraldogma.common.Query; 39 import com.linecorp.centraldogma.common.Revision; 40 41 /** 42 * Watches the changes of a repository or a file. The {@link Watcher} must be closed via 43 * {@link Watcher#close()} after use. 44 * 45 * @param <T> the watch result type 46 */ 47 public interface Watcher<T> extends AutoCloseable { 48 49 /** 50 * Creates a forked {@link Watcher} based on an existing {@link JsonNode}-watching {@link Watcher}. 51 * 52 * @param jsonPointer a <a href="https://tools.ietf.org/html/rfc6901">JSON pointer</a> that is encoded 53 * 54 * @return A new child {@link Watcher}, whose transformation is a 55 * <a href="https://tools.ietf.org/html/rfc6901">JSON pointer</a> query. 56 */ 57 static Watcher<JsonNode> atJsonPointer(Watcher<JsonNode> watcher, String jsonPointer) { 58 requireNonNull(watcher, "watcher"); 59 requireNonNull(jsonPointer, "jsonPointer"); 60 return watcher.newChild(new Function<JsonNode, JsonNode>() { 61 @Override 62 public JsonNode apply(JsonNode node) { 63 if (node == null) { 64 return MissingNode.getInstance(); 65 } else { 66 return node.at(jsonPointer); 67 } 68 } 69 70 @Override 71 public String toString() { 72 return "JSON pointer " + jsonPointer; 73 } 74 }); 75 } 76 77 /** 78 * Returns the {@link ScheduledExecutorService} that is used to schedule watch. 79 */ 80 ScheduledExecutorService watchScheduler(); 81 82 /** 83 * Returns the {@link CompletableFuture} which is completed when the initial value retrieval is done 84 * successfully. 85 */ 86 CompletableFuture<Latest<T>> initialValueFuture(); 87 88 /** 89 * Waits for the initial value to be available. 90 * 91 * @return the {@link Latest} object that contains the initial value and the {@link Revision} where 92 * the initial value came from. 93 * 94 * @throws CancellationException if this watcher has been closed by {@link #close()} 95 * @throws EntryNotFoundException if {@code errorOnEntryNotFound} is {@code true} and entry isn't found on 96 * watching the initial value 97 */ 98 default Latest<T> awaitInitialValue() throws InterruptedException { 99 try { 100 return initialValueFuture().get(); 101 } catch (ExecutionException e) { 102 throw new Error(e); 103 } 104 } 105 106 /** 107 * Waits for the initial value to be available. Specify the default value with 108 * {@link #awaitInitialValue(long, TimeUnit, Object)} or make sure to handle 109 * a {@link TimeoutException} properly if the initial value must be available 110 * even when the server is unavailable. 111 * 112 * @param timeout the maximum amount of time to wait for the initial value. Note that timeout is basically 113 * a trade-off. If you specify a smaller timeout, this method will have a higher chance of 114 * throwing a {@link TimeoutException} when the server does not respond in time. 115 * If you specify a larger timeout, you will have a better chance of successful retrieval. 116 * It is generally recommended to use a value not less than 117 * {@value WatchConstants#RECOMMENDED_AWAIT_TIMEOUT_SECONDS} seconds so that 118 * the client can retry at least a few times before timing out. 119 * Consider using {@link #awaitInitialValue(long, TimeUnit, Object)} with a sensible default 120 * value if you cannot tolerate a timeout or need to use a small timeout. 121 * @param unit the {@link TimeUnit} of {@code timeout}. 122 * 123 * @return the {@link Latest} object that contains the initial value and the {@link Revision} where 124 * the initial value came from. 125 * 126 * @throws CancellationException if this watcher has been closed by {@link #close()} 127 * @throws EntryNotFoundException if {@code errorOnEntryNotFound} is {@code true} and entry isn't found on 128 * watching the initial value 129 * @throws TimeoutException if failed to retrieve the initial value within the specified timeout 130 */ 131 default Latest<T> awaitInitialValue(long timeout, TimeUnit unit) throws InterruptedException, 132 TimeoutException { 133 requireNonNull(unit, "unit"); 134 try { 135 return initialValueFuture().get(timeout, unit); 136 } catch (TimeoutException e) { 137 final long initialTimeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, unit); 138 throw new TimeoutException("Failed to get the initial value in " + initialTimeoutMillis + " ms. " + 139 "It's probably because the timeout value is too small or " + 140 "the target entry doesn't exist. Please consider using the " + 141 "'errorOnEntryNotFound' option to get an " + 142 EntryNotFoundException.class.getSimpleName() + " for a missing entry."); 143 } catch (ExecutionException e) { 144 throw new Error(e); 145 } 146 } 147 148 /** 149 * Waits for the initial value to be available and returns the specified default value if failed 150 * to retrieve the initial value from the server. 151 * 152 * @param timeout the maximum amount of time to wait for the initial value. Note that timeout is basically 153 * a trade-off. If you specify a smaller timeout, this method will have a higher chance of 154 * falling back to the {@code defaultValue} when the server does not respond in time. 155 * If you specify a larger timeout, you will have a better chance of retrieving 156 * an up-to-date initial value. It is generally recommended to use a value not less than 157 * {@value WatchConstants#RECOMMENDED_AWAIT_TIMEOUT_SECONDS} seconds 158 * so that the client can retry at least a few times before timing out. 159 * @param unit the {@link TimeUnit} of {@code timeout}. 160 * @param defaultValue the default value to use when timed out. 161 * 162 * @return the initial value, or the default value if timed out. 163 * 164 * @throws CancellationException if this watcher has been closed by {@link #close()} 165 * @throws EntryNotFoundException if {@code errorOnEntryNotFound} is {@code true} and entry isn't found on 166 * watching the initial value 167 */ 168 @Nullable 169 default T awaitInitialValue(long timeout, TimeUnit unit, @Nullable T defaultValue) 170 throws InterruptedException { 171 try { 172 return awaitInitialValue(timeout, unit).value(); 173 } catch (TimeoutException e) { 174 return defaultValue; 175 } 176 } 177 178 /** 179 * Returns the latest {@link Revision} and value of {@code watchFile()} or {@code watchRepository()} result. 180 * 181 * @throws IllegalStateException if the value is not available yet. 182 * Use {@link #awaitInitialValue(long, TimeUnit)} first or 183 * add a listener using {@link #watch(BiConsumer)} instead. 184 */ 185 Latest<T> latest(); 186 187 /** 188 * Returns the latest value of {@code watchFile()} or {@code watchRepository()} result. 189 * 190 * @throws IllegalStateException if the value is not available yet. 191 * Use {@link #awaitInitialValue(long, TimeUnit)} first or 192 * add a listener using {@link #watch(BiConsumer)} instead. 193 */ 194 @Nullable 195 default T latestValue() { 196 return latest().value(); 197 } 198 199 /** 200 * Returns the latest value of {@code watchFile()} or {@code watchRepository()} result. 201 * 202 * @param defaultValue the default value which is returned when the value is not available yet 203 */ 204 @Nullable 205 default T latestValue(@Nullable T defaultValue) { 206 final CompletableFuture<Latest<T>> initialValueFuture = initialValueFuture(); 207 if (initialValueFuture.isDone() && !initialValueFuture.isCompletedExceptionally()) { 208 return latest().value(); 209 } else { 210 return defaultValue; 211 } 212 } 213 214 /** 215 * Stops watching the file specified in the {@link Query} or the {@link PathPattern} in the repository. 216 */ 217 @Override 218 void close(); 219 220 /** 221 * Registers a {@link BiConsumer} that will be invoked when the value of the watched entry becomes 222 * available or changes. 223 * 224 * <p>Note that the specified {@link BiConsumer} is not called when {@code errorOnEntryNotFound} is 225 * {@code true} and the target doesn't exist in the Central Dogma server when this {@link Watcher} sends 226 * the initial watch call. You should use {@link #initialValueFuture()} or {@link #awaitInitialValue()} to 227 * check the target exists or not. 228 */ 229 void watch(BiConsumer<? super Revision, ? super T> listener); 230 231 /** 232 * Registers a {@link BiConsumer} that will be invoked when the value of the watched entry becomes 233 * available or changes. 234 * 235 * <p>Note that the specified {@link BiConsumer} is not called when {@code errorOnEntryNotFound} is 236 * {@code true} and the target doesn't exist in the Central Dogma server when this {@link Watcher} sends 237 * the initial watch call. You should use {@link #initialValueFuture()} or {@link #awaitInitialValue()} to 238 * check the target exists or not. 239 * 240 * @param executor the {@link Executor} that executes the {@link BiConsumer} 241 */ 242 void watch(BiConsumer<? super Revision, ? super T> listener, Executor executor); 243 244 /** 245 * Registers a {@link Consumer} that will be invoked when the value of the watched entry becomes available 246 * or changes. 247 */ 248 default void watch(Consumer<? super T> listener) { 249 requireNonNull(listener, "listener"); 250 watch((revision, value) -> listener.accept(value)); 251 } 252 253 /** 254 * Registers a {@link Consumer} that will be invoked when the value of the watched entry becomes available 255 * or changes. 256 */ 257 default void watch(Consumer<? super T> listener, Executor executor) { 258 requireNonNull(listener, "listener"); 259 requireNonNull(executor, "executor"); 260 watch((revision, value) -> listener.accept(value), executor); 261 } 262 263 /** 264 * Returns a {@link Watcher} that applies the {@link Function} for the {@link Latest#value()}. 265 */ 266 default <U> Watcher<U> newChild(Function<? super T, ? extends U> mapper) { 267 return newChild(mapper, watchScheduler()); 268 } 269 270 /** 271 * Returns a {@link Watcher} that applies the {@link Function} for the {@link Latest#value()}. 272 */ 273 default <U> Watcher<U> newChild(Function<? super T, ? extends U> mapper, Executor executor) { 274 requireNonNull(mapper, "mapper"); 275 requireNonNull(executor, "executor"); 276 return MappingWatcher.of(this, mapper, executor, false); 277 } 278 }