1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client.updater;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.google.common.base.Preconditions.checkState;
20  import static com.google.common.base.Strings.isNullOrEmpty;
21  import static com.linecorp.centraldogma.internal.Json5.isJsonCompatible;
22  import static java.util.Objects.requireNonNull;
23  
24  import java.util.concurrent.TimeUnit;
25  import java.util.concurrent.TimeoutException;
26  import java.util.function.Consumer;
27  
28  import javax.inject.Inject;
29  import javax.inject.Named;
30  
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  import com.fasterxml.jackson.core.JsonProcessingException;
35  import com.fasterxml.jackson.databind.JsonNode;
36  import com.fasterxml.jackson.databind.ObjectMapper;
37  
38  import com.linecorp.centraldogma.client.CentralDogma;
39  import com.linecorp.centraldogma.client.Latest;
40  import com.linecorp.centraldogma.client.Watcher;
41  import com.linecorp.centraldogma.common.CentralDogmaException;
42  import com.linecorp.centraldogma.common.Query;
43  
44  import javassist.util.proxy.ProxyFactory;
45  
46  /**
47   * Creates a new bean instance that mirrors its properties from Central Dogma.
48   *
49   * <p>In the following example, {@code mirroredFoo.getA()} and {@code getB()} will return the latest known
50   * values retrieved from {@code /myProject/myRepo/foo.json} along with the latest revision in Central Dogma.
51   * If the latest values are not available yet, the revision will be set to null.
52   *
53   * <pre>{@code
54   * > CentralDogma dogma = ...;
55   *
56   * > // Optionally, waits for the initial endpoints in order to fetch the first value without additional delay.
57   * > dogma.whenEndpointReady().get(10, TimeUnit.SECONDS);
58   *
59   * > CentralDogmaBeanFactory factory = new CentralDogmaBeanFactory(dogma, new ObjectMapper());
60   * > Foo mirroredFoo = factory.get(new Foo(), Foo.class);
61   * >
62   * > @CentralDogmaBean(project = "myProject",
63   * >                   repository = "myRepo",
64   * >                   path = "/foo.json")
65   * > public class Foo {
66   * >     private int a;
67   * >     private String b;
68   * >
69   * >     public int getA() { return a; }
70   * >     public void setA(int a) { this.a = a; }
71   * >     public String getB() { return b; }
72   * >     public void setB(String b) { this.b = b; }
73   * >     public Revision getRevision() { return null; }
74   * > }
75   * }</pre>
76   *
77   * <p>In the following example, callback will be called when a property is updated with a new value:
78   *
79   * <pre>{@code
80   * CentralDogmaBeanFactory factory = new CentralDogmaBeanFactory(dogma, new ObjectMapper());
81   * Consumer<Foo> fooUpdatedListener = (Foo f) -> {
82   *     System.out.println("foo has updated to: " + f);
83   * };
84   *
85   * Foo mirroredFoo = factory.get(new Foo(), Foo.class, fooUpdatedListener);
86   * }</pre>
87   */
88  @Named
89  public class CentralDogmaBeanFactory {
90  
91      private static final Logger logger = LoggerFactory.getLogger(CentralDogmaBeanFactory.class);
92  
93      private static final Class<?>[] EMPTY_TYPES = new Class<?>[0];
94      private static final Object[] EMPTY_ARGS = new Object[0];
95  
96      private final CentralDogma dogma;
97      private final ObjectMapper objectMapper;
98  
99      /**
100      * Creates a new factory instance.
101      */
102     @Inject
103     public CentralDogmaBeanFactory(CentralDogma dogma, ObjectMapper objectMapper) {
104         this.dogma = requireNonNull(dogma, "dogma");
105         this.objectMapper = requireNonNull(objectMapper, "objectMapper");
106     }
107 
108     /**
109      * Returns a newly-created bean instance with the settings specified by {@link CentralDogmaBean} annotation.
110      *
111      * @param defaultValue a Java bean annotated with {@link CentralDogmaBean}. The default value is used before
112      *                     initialization.
113      * @param beanType the type of {@code bean}
114      *
115      * @return a new Java bean whose getters return the latest known values mirrored from Central Dogma
116      */
117     public <T> T get(T defaultValue, Class<T> beanType) {
118         return get(defaultValue, beanType, (T x) -> {
119         }, CentralDogmaBeanConfig.EMPTY);
120     }
121 
122     /**
123      * Returns a newly-created bean instance with the settings specified by {@link CentralDogmaBean} annotation.
124      *
125      * @param defaultValue a Java bean annotated with {@link CentralDogmaBean}. The default value is used before
126      *                     initialization.
127      * @param beanType the type of {@code bean}
128      * @param changeListener the {@link Consumer} of {@code beanType}, invoked when {@code bean} is updated.
129      *                       Will consume the new value of the bean.
130      *
131      * @return a new Java bean whose getters return the latest known values mirrored from Central Dogma
132      */
133     public <T> T get(T defaultValue, Class<T> beanType, Consumer<T> changeListener) {
134         return get(defaultValue, beanType, changeListener, CentralDogmaBeanConfig.EMPTY);
135     }
136 
137     /**
138      * Returns a newly-created bean instance with some or all of its settings overridden.
139      *
140      * @param defaultValue a Java bean annotated with {@link CentralDogmaBean}. The default value is used before
141      *                     initialization.
142      * @param beanType the type of {@code bean}
143      * @param changeListener the {@link Consumer} of {@code beanType}, invoked when {@code bean} is updated.
144      *                       Will consume the new value of the bean.
145      * @param overrides the {@link CentralDogmaBeanConfig} whose properties will override the settings
146      *                  specified by the {@link CentralDogmaBean} annotation
147      *
148      * @return a new Java bean whose getters return the latest known values mirrored from Central Dogma
149      */
150     public <T> T get(T defaultValue, Class<T> beanType, Consumer<T> changeListener,
151                      CentralDogmaBeanConfig overrides) {
152         try {
153             return get(defaultValue, beanType, changeListener, overrides, 0L, TimeUnit.SECONDS);
154         } catch (InterruptedException | TimeoutException e) {
155             // when initialValueTimeoutMillis set to zero, this exception never happens in practice.
156             throw new RuntimeException("Error: unexpected exception caught", e);
157         }
158     }
159 
160     /**
161      * Returns a newly-created bean instance with the settings specified by {@link CentralDogmaBean} annotation.
162      *
163      * @param defaultValue a Java bean annotated with {@link CentralDogmaBean}. The default value is used before
164      *                     initialization.
165      * @param beanType the type of {@code bean}
166      * @param changeListener the {@link Consumer} of {@code beanType}, invoked when {@code bean} is updated.
167      *                       Will consume the new value of the bean.
168      * @param initialValueTimeout when a value larger than zero given to this argument, this method
169      *                            tries to wait for the initial value to be fetched until the timeout
170      * @param initialValueTimeoutUnit the {@link TimeUnit} of {@code initialValueTimeout}
171      *
172      * @return a new Java bean whose getters return the latest known values mirrored from Central Dogma
173      *
174      * @throws TimeoutException when {@code initialValueTimeoutMillis} is positive and
175      *                          it failed to obtain the initial value within configured timeout
176      * @throws InterruptedException when {@code initialValueTimeoutMillis} is positive and
177      *                              it got interrupted while waiting for the initial value
178      */
179     public <T> T get(T defaultValue, Class<T> beanType, Consumer<T> changeListener, long initialValueTimeout,
180                      TimeUnit initialValueTimeoutUnit)
181             throws TimeoutException, InterruptedException {
182         return get(defaultValue, beanType, changeListener, CentralDogmaBeanConfig.EMPTY,
183                    initialValueTimeout, initialValueTimeoutUnit);
184     }
185 
186     /**
187      * Returns a newly-created bean instance with some or all of its settings overridden.
188      *
189      * @param defaultValue a Java bean annotated with {@link CentralDogmaBean}. The default value is used before
190      *                     initialization.
191      * @param beanType the type of {@code bean}
192      * @param overrides the {@link CentralDogmaBeanConfig} whose properties will override the settings
193      *                  specified by the {@link CentralDogmaBean} annotation
194      *
195      * @return a new Java bean whose getters return the latest known values mirrored from Central Dogma
196      */
197     public <T> T get(T defaultValue, Class<T> beanType, CentralDogmaBeanConfig overrides) {
198         try {
199             return get(defaultValue, beanType, overrides, 0L, TimeUnit.SECONDS);
200         } catch (InterruptedException | TimeoutException e) {
201             // when initialValueTimeoutMillis set to zero, this exception never happens in practice.
202             throw new RuntimeException("Error: unexpected exception caught", e);
203         }
204     }
205 
206     /**
207      * Returns a newly-created bean instance with the settings specified by {@link CentralDogmaBean} annotation.
208      *
209      * @param defaultValue a Java bean annotated with {@link CentralDogmaBean}. The default value is used before
210      *                     initialization.
211      * @param beanType the type of {@code bean}
212      * @param initialValueTimeout when a value larger than zero given to this argument, this method
213      *                            tries to wait for the initial value to be fetched until the timeout
214      * @param initialValueTimeoutUnit the {@link TimeUnit} of {@code initialValueTimeout}
215      *
216      * @return a new Java bean whose getters return the latest known values mirrored from Central Dogma
217      *
218      * @throws TimeoutException when {@code initialValueTimeoutMillis} is positive and
219      *                          it failed to obtain the initial value within configured timeout
220      * @throws InterruptedException when {@code initialValueTimeoutMillis} is positive and
221      *                              it got interrupted while waiting for the initial value
222      */
223     public <T> T get(T defaultValue,
224                      Class<T> beanType,
225                      long initialValueTimeout,
226                      TimeUnit initialValueTimeoutUnit) throws TimeoutException, InterruptedException {
227         return get(defaultValue, beanType, CentralDogmaBeanConfig.EMPTY,
228                    initialValueTimeout, initialValueTimeoutUnit);
229     }
230 
231     /**
232      * Returns a newly-created bean instance with some or all of its settings overridden.
233      *
234      * @param defaultValue a Java bean annotated with {@link CentralDogmaBean}. The default value is used before
235      *                     initialization.
236      * @param beanType the type of {@code bean}
237      * @param overrides the {@link CentralDogmaBeanConfig} whose properties will override the settings
238      *                  specified by the {@link CentralDogmaBean} annotation
239      * @param initialValueTimeout when a value larger than zero given to this argument, this method
240      *                            tries to wait for the initial value to be fetched until the timeout
241      * @param initialValueTimeoutUnit the {@link TimeUnit} of {@code initialValueTimeout}
242      *
243      * @return a new Java bean whose getters return the latest known values mirrored from Central Dogma
244      *
245      * @throws TimeoutException when {@code initialValueTimeoutMillis} is positive and
246      *                          it failed to obtain the initial value within configured timeout
247      * @throws InterruptedException when {@code initialValueTimeoutMillis} is positive and
248      *                              it got interrupted while waiting for the initial value
249      */
250     public <T> T get(T defaultValue, Class<T> beanType, CentralDogmaBeanConfig overrides,
251                      long initialValueTimeout, TimeUnit initialValueTimeoutUnit)
252             throws TimeoutException, InterruptedException {
253         return get(defaultValue, beanType, (T x) -> {
254         }, overrides, initialValueTimeout, initialValueTimeoutUnit);
255     }
256 
257     /**
258      * Returns a newly-created bean instance with some or all of its settings overridden.
259      *
260      * @param defaultValue a Java bean annotated with {@link CentralDogmaBean}. The default value is used before
261      *                     initialization.
262      * @param beanType the type of {@code bean}
263      * @param changeListener the {@link Consumer} of {@code beanType}, invoked when {@code bean} is updated.
264      *                       Will consume the new value of the bean.
265      * @param overrides the {@link CentralDogmaBeanConfig} whose properties will override the settings
266      *                  specified by the {@link CentralDogmaBean} annotation
267      * @param initialValueTimeout when a value larger than zero given to this argument, this method
268      *                            tries to wait for the initial value to be fetched until the timeout
269      * @param initialValueTimeoutUnit the {@link TimeUnit} of {@code initialValueTimeout}
270      *
271      * @return a new Java bean whose getters return the latest known values mirrored from Central Dogma
272      *
273      * @throws TimeoutException when {@code initialValueTimeoutMillis} is positive and
274      *                          it failed to obtain the initial value within configured timeout
275      * @throws InterruptedException when {@code initialValueTimeoutMillis} is positive and
276      *                              it got interrupted while waiting for the initial value
277      */
278     @SuppressWarnings("unchecked")
279     public <T> T get(T defaultValue, Class<T> beanType, Consumer<T> changeListener,
280                      CentralDogmaBeanConfig overrides, long initialValueTimeout,
281                      TimeUnit initialValueTimeoutUnit)
282             throws TimeoutException, InterruptedException {
283         requireNonNull(defaultValue, "defaultValue");
284         requireNonNull(beanType, "beanType");
285         requireNonNull(overrides, "overrides");
286         requireNonNull(initialValueTimeoutUnit, "initialValueTimeoutUnit");
287 
288         final CentralDogmaBean centralDogmaBean = beanType.getAnnotation(CentralDogmaBean.class);
289         if (centralDogmaBean == null) {
290             throw new IllegalArgumentException("missing CentralDogmaBean annotation");
291         }
292 
293         final CentralDogmaBeanConfig settings = merge(convertToSettings(centralDogmaBean), overrides);
294         checkState(!isNullOrEmpty(settings.project().get()), "settings.projectName should non-null");
295         checkState(!isNullOrEmpty(settings.repository().get()), "settings.repositoryName should non-null");
296         checkState(!isNullOrEmpty(settings.path().get()), "settings.fileName should non-null");
297 
298         final Watcher<T> watcher =
299                 dogma.forRepo(settings.project().get(), settings.repository().get())
300                      .watcher(buildQuery(settings))
301                      .map(jsonNode -> {
302                          try {
303                              final T value = objectMapper.treeToValue(jsonNode, beanType);
304                              changeListener.accept(value);
305                              return value;
306                          } catch (JsonProcessingException e) {
307                              throw new IllegalStateException(
308                                      "Failed to convert a JSON node into: " + beanType.getName(), e);
309                          }
310                      })
311                      .start();
312 
313         if (initialValueTimeout > 0) {
314             final long t0 = System.nanoTime();
315             final Latest<T> latest;
316             try {
317                 latest = watcher.awaitInitialValue(initialValueTimeout, initialValueTimeoutUnit);
318             } catch (TimeoutException ex) {
319                 final long initialTimeoutMillis =
320                         TimeUnit.MILLISECONDS.convert(initialValueTimeout, initialValueTimeoutUnit);
321                 if (!dogma.whenEndpointReady().isDone()) {
322                     final String message =
323                             "Failed to resolve the initial endpoints of the given Central Dogma client in " +
324                             initialTimeoutMillis + " ms. You may want to increase 'initialValueTimeout' or " +
325                             "wait for the initial endpoints using 'CentralDogma.whenEndpointReady().get()' " +
326                             "before initializing this " + CentralDogmaBeanFactory.class.getSimpleName() + '.';
327                     throw new CentralDogmaException(message, ex);
328                 } else {
329                     throw ex;
330                 }
331             }
332             final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0);
333             logger.debug("Initial value of {} obtained in {} ms: rev={}",
334                          settings, elapsedMillis, latest.revision());
335         }
336 
337         final ProxyFactory factory = new ProxyFactory();
338         factory.setSuperclass(beanType);
339         factory.setFilter(method -> {
340             // Allow all non-parameter methods (getter/closeWatcher..)
341             if (method.getParameterCount() == 0) {
342                 return true;
343             }
344             // Allow methods have bidirectional property and looks like setter
345             return centralDogmaBean.bidirectional() && method.getParameterCount() == 1;
346         });
347 
348         try {
349             return (T) factory.create(EMPTY_TYPES, EMPTY_ARGS,
350                                       new CentralDogmaBeanMethodHandler<>(watcher, defaultValue));
351         } catch (Exception e) {
352             throw new RuntimeException(e);
353         }
354     }
355 
356     private static CentralDogmaBeanConfig convertToSettings(CentralDogmaBean property) {
357         return new CentralDogmaBeanConfigBuilder()
358                 .project(property.project())
359                 .repository(property.repository())
360                 .path(property.path())
361                 .jsonPath(property.jsonPath())
362                 .build();
363     }
364 
365     private static CentralDogmaBeanConfig merge(CentralDogmaBeanConfig defaultSettings,
366                                                 CentralDogmaBeanConfig overridden) {
367         return new CentralDogmaBeanConfigBuilder(defaultSettings).merge(overridden).build();
368     }
369 
370     private static Query<JsonNode> buildQuery(CentralDogmaBeanConfig config) {
371         final String path = config.path().get();
372         checkArgument(isJsonCompatible(path),
373                       "path: %s (expected: ends with '.json' or '.json5')", path);
374         return Query.ofJsonPath(path, config.jsonPath().orElse("$"));
375     }
376 }