1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.client;
17
18 import static com.google.common.base.MoreObjects.toStringHelper;
19 import static java.util.Objects.requireNonNull;
20
21 import java.util.List;
22 import java.util.Map;
23 import java.util.Map.Entry;
24 import java.util.Objects;
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.CopyOnWriteArrayList;
27 import java.util.concurrent.Executor;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.function.BiConsumer;
30 import java.util.function.Function;
31
32 import javax.annotation.Nullable;
33
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import com.google.common.collect.Maps;
38
39 import com.linecorp.centraldogma.common.Revision;
40
41 final class MappingWatcher<T, U> implements Watcher<U> {
42
43 private static final Logger logger = LoggerFactory.getLogger(MappingWatcher.class);
44
45 static <T, U> MappingWatcher<T, U> of(Watcher<T> parent, Function<? super T, ? extends U> mapper,
46 Executor executor, boolean closeParentWhenClosing) {
47 requireNonNull(parent, "parent");
48 requireNonNull(mapper, "mapper");
49 requireNonNull(executor, "executor");
50
51 return new MappingWatcher<>(parent, mapper, executor, closeParentWhenClosing);
52 }
53
54 private final Watcher<T> parent;
55 private final Function<? super T, ? extends U> mapper;
56 private final Executor mapperExecutor;
57 private final boolean closeParentWhenClosing;
58 private final CompletableFuture<Latest<U>> initialValueFuture = new CompletableFuture<>();
59 private final List<Entry<BiConsumer<? super Revision, ? super U>, Executor>> updateListeners =
60 new CopyOnWriteArrayList<>();
61
62 @Nullable
63 private volatile Latest<U> mappedLatest;
64 private volatile boolean closed;
65
66 MappingWatcher(Watcher<T> parent, Function<? super T, ? extends U> mapper, Executor mapperExecutor,
67 boolean closeParentWhenClosing) {
68 this.parent = parent;
69 this.mapper = mapper;
70 this.mapperExecutor = mapperExecutor;
71 this.closeParentWhenClosing = closeParentWhenClosing;
72 parent.initialValueFuture().exceptionally(cause -> {
73 initialValueFuture.completeExceptionally(cause);
74 return null;
75 });
76 parent.watch((revision, value) -> {
77 if (closed) {
78 return;
79 }
80 final U mappedValue;
81 try {
82 mappedValue = mapper.apply(value);
83 } catch (Exception e) {
84 logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}", mapper, e);
85 if (!initialValueFuture.isDone()) {
86 initialValueFuture.completeExceptionally(e);
87 }
88 close();
89 return;
90 }
91 final Latest<U> oldLatest = mappedLatest;
92 if (oldLatest != null && Objects.equals(oldLatest.value(), mappedValue)) {
93 return;
94 }
95
96
97 final Latest<U> newLatest = new Latest<>(revision, mappedValue);
98 mappedLatest = newLatest;
99 notifyListeners(newLatest);
100 if (!initialValueFuture.isDone()) {
101 initialValueFuture.complete(newLatest);
102 }
103 }, mapperExecutor);
104 }
105
106 private void notifyListeners(Latest<U> latest) {
107 if (closed) {
108 return;
109 }
110
111 for (Map.Entry<BiConsumer<? super Revision, ? super U>, Executor> entry : updateListeners) {
112 final BiConsumer<? super Revision, ? super U> listener = entry.getKey();
113 final Executor executor = entry.getValue();
114 if (mapperExecutor == executor) {
115 notifyListener(latest, listener);
116 } else {
117 executor.execute(() -> notifyListener(latest, listener));
118 }
119 }
120 }
121
122 private void notifyListener(Latest<U> latest, BiConsumer<? super Revision, ? super U> listener) {
123 try {
124 listener.accept(latest.revision(), latest.value());
125 } catch (Exception e) {
126 logger.warn("Unexpected exception is raised from {}: rev={}",
127 listener, latest.revision(), e);
128 }
129 }
130
131 @Override
132 public ScheduledExecutorService watchScheduler() {
133 return parent.watchScheduler();
134 }
135
136 @Override
137 public CompletableFuture<Latest<U>> initialValueFuture() {
138 return initialValueFuture;
139 }
140
141 @Override
142 public Latest<U> latest() {
143 final Latest<U> mappedLatest = this.mappedLatest;
144 if (mappedLatest == null) {
145 throw new IllegalStateException("value not available yet");
146 }
147 return mappedLatest;
148 }
149
150 @Override
151 public void close() {
152 closed = true;
153 if (!initialValueFuture.isDone()) {
154 initialValueFuture.cancel(false);
155 }
156 if (closeParentWhenClosing) {
157 parent.close();
158 }
159 }
160
161 @Override
162 public void watch(BiConsumer<? super Revision, ? super U> listener) {
163 watch(listener, parent.watchScheduler());
164 }
165
166 @Override
167 public void watch(BiConsumer<? super Revision, ? super U> listener, Executor executor) {
168 requireNonNull(listener, "listener");
169 requireNonNull(executor, "executor");
170 updateListeners.add(Maps.immutableEntry(listener, executor));
171
172 final Latest<U> mappedLatest = this.mappedLatest;
173 if (mappedLatest != null) {
174
175
176
177
178
179
180
181 executor.execute(() -> listener.accept(mappedLatest.revision(), mappedLatest.value()));
182 }
183 }
184
185 @Override
186 public String toString() {
187 return toStringHelper(this)
188 .add("parent", parent)
189 .add("mapper", mapper)
190 .add("closed", closed)
191 .toString();
192 }
193 }