1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Function;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Maps;

import com.linecorp.centraldogma.common.Revision;
40  
41  final class MappingWatcher<T, U> implements Watcher<U> {
42  
43      private static final Logger logger = LoggerFactory.getLogger(MappingWatcher.class);
44  
45      static <T, U> MappingWatcher<T, U> of(Watcher<T> parent, Function<? super T, ? extends U> mapper,
46                                            Executor executor, boolean closeParentWhenClosing) {
47          requireNonNull(parent, "parent");
48          requireNonNull(mapper, "mapper");
49          requireNonNull(executor, "executor");
50          // TODO(minwoo): extract mapper function and combine it with the new mapper.
51          return new MappingWatcher<>(parent, mapper, executor, closeParentWhenClosing);
52      }
53  
54      private final Watcher<T> parent;
55      private final Function<? super T, ? extends U> mapper;
56      private final Executor mapperExecutor;
57      private final boolean closeParentWhenClosing;
58      private final CompletableFuture<Latest<U>> initialValueFuture = new CompletableFuture<>();
59      private final List<Entry<BiConsumer<? super Revision, ? super U>, Executor>> updateListeners =
60              new CopyOnWriteArrayList<>();
61  
62      @Nullable
63      private volatile Latest<U> mappedLatest;
64      private volatile boolean closed;
65  
66      MappingWatcher(Watcher<T> parent, Function<? super T, ? extends U> mapper, Executor mapperExecutor,
67                     boolean closeParentWhenClosing) {
68          this.parent = parent;
69          this.mapper = mapper;
70          this.mapperExecutor = mapperExecutor;
71          this.closeParentWhenClosing = closeParentWhenClosing;
72          parent.initialValueFuture().exceptionally(cause -> {
73              initialValueFuture.completeExceptionally(cause);
74              return null;
75          });
76          parent.watch((revision, value) -> {
77              if (closed) {
78                  return;
79              }
80              final U mappedValue;
81              try {
82                  mappedValue = mapper.apply(value);
83              } catch (Exception e) {
84                  logger.warn("Unexpected exception is raised from mapper.apply(). mapper: {}", mapper, e);
85                  if (!initialValueFuture.isDone()) {
86                      initialValueFuture.completeExceptionally(e);
87                  }
88                  close();
89                  return;
90              }
91              final Latest<U> oldLatest = mappedLatest;
92              if (oldLatest != null && Objects.equals(oldLatest.value(), mappedValue)) {
93                  return;
94              }
95  
96              // mappedValue can be nullable which is fine.
97              final Latest<U> newLatest = new Latest<>(revision, mappedValue);
98              mappedLatest = newLatest;
99              notifyListeners(newLatest);
100             if (!initialValueFuture.isDone()) {
101                 initialValueFuture.complete(newLatest);
102             }
103         }, mapperExecutor);
104     }
105 
106     private void notifyListeners(Latest<U> latest) {
107         if (closed) {
108             return;
109         }
110 
111         for (Map.Entry<BiConsumer<? super Revision, ? super U>, Executor> entry : updateListeners) {
112             final BiConsumer<? super Revision, ? super U> listener = entry.getKey();
113             final Executor executor = entry.getValue();
114             if (mapperExecutor == executor) {
115                 notifyListener(latest, listener);
116             } else {
117                 executor.execute(() -> notifyListener(latest, listener));
118             }
119         }
120     }
121 
122     private void notifyListener(Latest<U> latest, BiConsumer<? super Revision, ? super U> listener) {
123         try {
124             listener.accept(latest.revision(), latest.value());
125         } catch (Exception e) {
126             logger.warn("Unexpected exception is raised from {}: rev={}",
127                         listener, latest.revision(), e);
128         }
129     }
130 
131     @Override
132     public ScheduledExecutorService watchScheduler() {
133         return parent.watchScheduler();
134     }
135 
136     @Override
137     public CompletableFuture<Latest<U>> initialValueFuture() {
138         return initialValueFuture;
139     }
140 
141     @Override
142     public Latest<U> latest() {
143         final Latest<U> mappedLatest = this.mappedLatest;
144         if (mappedLatest == null) {
145             throw new IllegalStateException("value not available yet");
146         }
147         return mappedLatest;
148     }
149 
150     @Override
151     public void close() {
152         closed = true;
153         if (!initialValueFuture.isDone()) {
154             initialValueFuture.cancel(false);
155         }
156         if (closeParentWhenClosing) {
157             parent.close();
158         }
159     }
160 
161     @Override
162     public void watch(BiConsumer<? super Revision, ? super U> listener) {
163         watch(listener, parent.watchScheduler());
164     }
165 
166     @Override
167     public void watch(BiConsumer<? super Revision, ? super U> listener, Executor executor) {
168         requireNonNull(listener, "listener");
169         requireNonNull(executor, "executor");
170         updateListeners.add(Maps.immutableEntry(listener, executor));
171 
172         final Latest<U> mappedLatest = this.mappedLatest;
173         if (mappedLatest != null) {
174             // There's a chance that listener.accept(...) is called twice for the same value
175             // if this watch method is called:
176             // - after "mappedLatest = newLatest;" is invoked.
177             // - and before notifyListener() is called.
178             // However, it's such a rare case and we usually call `watch` method after creating a Watcher,
179             // which means mappedLatest is probably not set yet, so we don't use a lock to guarantee
180             // the atomicity.
181             executor.execute(() -> listener.accept(mappedLatest.revision(), mappedLatest.value()));
182         }
183     }
184 
185     @Override
186     public String toString() {
187         return toStringHelper(this)
188                 .add("parent", parent)
189                 .add("mapper", mapper)
190                 .add("closed", closed)
191                 .toString();
192     }
193 }