1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.google.common.base.Preconditions.checkNotNull;
20  import static com.google.common.base.Preconditions.checkState;
21  import static java.util.Objects.requireNonNull;
22  
23  import java.io.IOException;
24  import java.net.InetSocketAddress;
25  import java.net.URI;
26  import java.net.URL;
27  import java.time.Duration;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.Enumeration;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.TimeUnit;
36  
37  import javax.annotation.Nullable;
38  
39  import com.fasterxml.jackson.core.type.TypeReference;
40  import com.fasterxml.jackson.databind.ObjectMapper;
41  import com.google.common.collect.ImmutableList;
42  import com.google.common.collect.ImmutableSet;
43  import com.google.common.collect.Iterables;
44  import com.google.common.net.InetAddresses;
45  
46  import com.linecorp.centraldogma.common.RevisionNotFoundException;
47  import com.linecorp.centraldogma.internal.CsrfToken;
48  
49  import io.micrometer.core.instrument.MeterRegistry;
50  
51  /**
52   * Builds a {@link CentralDogma} client.
53   */
54  public abstract class AbstractCentralDogmaBuilder<B extends AbstractCentralDogmaBuilder<B>> {
55  
56      private static final String TEST_PROFILE_RESOURCE_PATH = "centraldogma-profiles-test.json";
57      private static final String PROFILE_RESOURCE_PATH = "centraldogma-profiles.json";
58      private static final List<String> DEFAULT_PROFILE_RESOURCE_PATHS =
59              ImmutableList.of(TEST_PROFILE_RESOURCE_PATH, PROFILE_RESOURCE_PATH);
60  
61      static final int DEFAULT_PORT = 36462;
62  
63      private static final int DEFAULT_MAX_NUM_RETRIES_ON_REPLICATION_LAG = 5;
64      private static final int DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS = 2;
65  
66      private ImmutableSet<InetSocketAddress> hosts = ImmutableSet.of();
67      private boolean useTls;
68      private List<String> profileResourcePaths = DEFAULT_PROFILE_RESOURCE_PATHS;
69      @Nullable
70      private String selectedProfile;
71      private String accessToken = CsrfToken.ANONYMOUS;
72      private int maxNumRetriesOnReplicationLag = DEFAULT_MAX_NUM_RETRIES_ON_REPLICATION_LAG;
73      private long retryIntervalOnReplicationLagMillis =
74              TimeUnit.SECONDS.toMillis(DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS);
75      @Nullable
76      private MeterRegistry meterRegistry;
77  
78      /**
79       * Returns {@code this}.
80       */
81      @SuppressWarnings("unchecked")
82      protected final B self() {
83          return (B) this;
84      }
85  
86      /**
87       * Adds the {@link URI} of the Central Dogma server.
88       *
89       * @param uri the URI of the Central Dogma server. e.g.
90       *            {@code tbinary+http://example.com:36462/cd/thrift/v1}
91       *
92       * @deprecated Use {@link #host(String)} or {@link #profile(String...)}.
93       */
94      @Deprecated
95      public final B uri(String uri) {
96          final URI parsed = URI.create(requireNonNull(uri, "uri"));
97          final String host = parsed.getHost();
98          final int port = parsed.getPort();
99          checkArgument(host != null, "uri: %s (must contain a host part)", uri);
100         if (port < 0) {
101             host(host);
102         } else {
103             host(host, port);
104         }
105         return self();
106     }
107 
108     /**
109      * Adds the host name or IP address of the Central Dogma Server and uses the default port number of
110      * {@value #DEFAULT_PORT}.
111      *
112      * @param host the host name or IP address of the Central Dogma server
113      */
114     public final B host(String host) {
115         return host(host, DEFAULT_PORT);
116     }
117 
118     /**
119      * Adds the host name (or IP address) and the port number of the Central Dogma server.
120      *
121      * @param host the host name or IP address of the Central Dogma server
122      * @param port the port number of the Central Dogma server
123      */
124     public final B host(String host, int port) {
125         requireNonNull(host, "host");
126         checkArgument(!host.startsWith("group:"), "host: %s (must not start with 'group:')", host);
127         checkArgument(port >= 1 && port < 65536, "port: %s (expected: 1 .. 65535)", port);
128 
129         final InetSocketAddress addr = newEndpoint(host, port);
130         checkState(selectedProfile == null, "profile() and host() cannot be used together.");
131         hosts = ImmutableSet.<InetSocketAddress>builder().addAll(hosts).add(addr).build();
132         return self();
133     }
134 
135     /**
136      * Sets the client to use TLS.
137      */
138     public final B useTls() {
139         return useTls(true);
140     }
141 
142     /**
143      * Sets whether the client uses TLS or not.
144      */
145     public final B useTls(boolean useTls) {
146         checkState(selectedProfile == null, "useTls() cannot be called once a profile is selected.");
147         this.useTls = useTls;
148         return self();
149     }
150 
151     /**
152      * Returns whether the client uses TLS or not.
153      *
154      * @see #useTls(boolean)
155      */
156     protected final boolean isUseTls() {
157         return useTls;
158     }
159 
160     /**
161      * Sets the paths to look for to read the {@code .json} file that contains the client profiles.
162      * The paths are tried in the order of iteration. The default value of this property is
163      * <code>[ {@value #TEST_PROFILE_RESOURCE_PATH}, {@value #PROFILE_RESOURCE_PATH} ]</code>, which means
164      * the builder will check if {@value #TEST_PROFILE_RESOURCE_PATH} exists first and will try
165      * {@value #PROFILE_RESOURCE_PATH} only if {@value #TEST_PROFILE_RESOURCE_PATH} is missing.
166      */
167     public final B profileResources(String... paths) {
168         return profileResources(ImmutableList.copyOf(requireNonNull(paths, "paths")));
169     }
170 
171     /**
172      * Sets the paths to look for to read the {@code .json} file that contains the client profiles.
173      * The paths are tried in the order of iteration. The default value of this property is
174      * <code>[ {@value #TEST_PROFILE_RESOURCE_PATH}, {@value #PROFILE_RESOURCE_PATH} ]</code>, which means
175      * the builder will check if {@value #TEST_PROFILE_RESOURCE_PATH} exists first and will try
176      * {@value #PROFILE_RESOURCE_PATH} only if {@value #TEST_PROFILE_RESOURCE_PATH} is missing.
177      */
178     public final B profileResources(Iterable<String> paths) {
179         final List<String> newPaths = ImmutableList.copyOf(requireNonNull(paths, "paths"));
180         checkArgument(!newPaths.isEmpty(), "paths is empty.");
181         checkState(selectedProfile == null, "profileResources cannot be set after profile() is called.");
182         profileResourcePaths = newPaths;
183         return self();
184     }
185 
186     /**
187      * Adds the host names (or IP addresses) and the port numbers of the Central Dogma servers loaded from the
188      * client profile resources. When more than one profile is matched, the last matching one will be used. See
189      * <a href="https://line.github.io/centraldogma/client-java.html#using-client-profiles">Using client
190      * profiles</a> for more information.
191      *
192      * @param profiles the list of profile names
193      *
194      * @throws IllegalArgumentException if failed to load any hosts from all the specified profiles
195      */
196     public final B profile(String... profiles) {
197         requireNonNull(profiles, "profiles");
198         return profile(ImmutableList.copyOf(profiles));
199     }
200 
201     /**
202      * Adds the host names (or IP addresses) and the port numbers of the Central Dogma servers loaded from the
203      * client profile resources. When more than one profile is matched, the last matching one will be used. See
204      * <a href="https://line.github.io/centraldogma/client-java.html#using-client-profiles">Using client
205      * profiles</a> for more information.
206      *
207      * @param profiles the list of profile names
208      *
209      * @throws IllegalArgumentException if failed to load any hosts from all the specified profiles
210      */
211     public final B profile(ClassLoader classLoader, String... profiles) {
212         requireNonNull(profiles, "profiles");
213         return profile(classLoader, ImmutableList.copyOf(profiles));
214     }
215 
216     /**
217      * Adds the host names (or IP address) and the port numbers of the Central Dogma servers loaded from the
218      * client profile resources. When more than one profile is matched, the last matching one will be used. See
219      * <a href="https://line.github.io/centraldogma/client-java.html#using-client-profiles">Using client
220      * profiles</a> for more information.
221      *
222      * @param profiles the list of profile names
223      *
224      * @throws IllegalArgumentException if failed to load any hosts from all the specified profiles
225      */
226     public final B profile(Iterable<String> profiles) {
227         final ClassLoader ccl = Thread.currentThread().getContextClassLoader();
228         if (ccl != null) {
229             profile(ccl, profiles);
230         } else {
231             profile(getClass().getClassLoader(), profiles);
232         }
233         return self();
234     }
235 
236     /**
237      * Adds the host names (or IP address) and the port numbers of the Central Dogma servers loaded from the
238      * client profile resources. When more than one profile is matched, the last matching one will be used. See
239      * <a href="https://line.github.io/centraldogma/client-java.html#using-client-profiles">Using client
240      * profiles</a> for more information.
241      *
242      * @param profiles the list of profile names
243      *
244      * @throws IllegalArgumentException if failed to load any hosts from all the specified profiles
245      */
246     public final B profile(ClassLoader classLoader, Iterable<String> profiles) {
247         requireNonNull(classLoader, "classLoader");
248         requireNonNull(profiles, "profiles");
249         checkState(selectedProfile == null, "profile cannot be loaded more than once.");
250         checkState(hosts.isEmpty(), "profile() and host() cannot be used together.");
251 
252         final Map<String, ClientProfile> availableProfiles = new HashMap<>();
253         try {
254             final List<URL> resourceUrls = findProfileResources(classLoader);
255             checkState(!resourceUrls.isEmpty(), "failed to find any of: ", profileResourcePaths);
256 
257             for (URL resourceUrl : resourceUrls) {
258                 final List<ClientProfile> availableProfileList =
259                         new ObjectMapper().readValue(resourceUrl, new TypeReference<List<ClientProfile>>() {});
260 
261                 // Collect all profiles checking the profiles ignoring the duplicate profile names.
262                 availableProfileList.forEach(profile -> {
263                     final String name = profile.name();
264                     final ClientProfile existingProfile = availableProfiles.get(name);
265                     if (existingProfile == null || existingProfile.priority() < profile.priority()) {
266                         // Not a duplicate or higher priority
267                         availableProfiles.put(name, profile);
268                     }
269                 });
270             }
271         } catch (IOException e) {
272             throw new IllegalStateException("failed to load: " + PROFILE_RESOURCE_PATH, e);
273         }
274 
275         final List<String> reversedProfiles = reverse(profiles);
276         checkArgument(!reversedProfiles.isEmpty(), "profiles is empty.");
277         for (String candidateName : reversedProfiles) {
278             checkNotNull(candidateName, "profiles contains null: %s", profiles);
279 
280             final ClientProfile candidate = availableProfiles.get(candidateName);
281             if (candidate == null) {
282                 continue;
283             }
284 
285             final ImmutableSet.Builder<InetSocketAddress> newHostsBuilder = ImmutableSet.builder();
286             candidate.hosts().stream()
287                      .filter(e -> (useTls ? "https" : "http").equals(e.protocol()))
288                      .forEach(e -> newHostsBuilder.add(newEndpoint(e.host(), e.port())));
289 
290             final ImmutableSet<InetSocketAddress> newHosts = newHostsBuilder.build();
291             if (!newHosts.isEmpty()) {
292                 selectedProfile = candidateName;
293                 hosts = newHosts;
294                 return self();
295             }
296         }
297 
298         throw new IllegalArgumentException("no profile matches: " + profiles);
299     }
300 
301     private List<URL> findProfileResources(ClassLoader classLoader) throws IOException {
302         final ImmutableList.Builder<URL> urls = ImmutableList.builder();
303         for (String p : profileResourcePaths) {
304             for (final Enumeration<URL> e = classLoader.getResources(p); e.hasMoreElements();) {
305                 urls.add(e.nextElement());
306             }
307         }
308         return urls.build();
309     }
310 
311     private static List<String> reverse(Iterable<String> profiles) {
312         final List<String> reversedProfiles = new ArrayList<>();
313         Iterables.addAll(reversedProfiles, profiles);
314         Collections.reverse(reversedProfiles);
315         return reversedProfiles;
316     }
317 
318     private static InetSocketAddress newEndpoint(String host, int port) {
319         final InetSocketAddress endpoint;
320         if (InetAddresses.isInetAddress(host)) {
321             endpoint = new InetSocketAddress(InetAddresses.forString(host), port);
322         } else {
323             endpoint = InetSocketAddress.createUnresolved(host, port);
324         }
325         return endpoint;
326     }
327 
328     /**
329      * Returns the name of the selected profile.
330      *
331      * @return the profile name, or {@code null} if no profile was specified or matched
332      */
333     @Nullable
334     protected final String selectedProfile() {
335         return selectedProfile;
336     }
337 
338     /**
339      * Returns the hosts added via {@link #host(String, int)} or {@link #profile(String...)}.
340      */
341     protected final Set<InetSocketAddress> hosts() {
342         return hosts;
343     }
344 
345     /**
346      * Sets the access token to use when authenticating a client.
347      */
348     public final B accessToken(String accessToken) {
349         requireNonNull(accessToken, "accessToken");
350         checkArgument(!accessToken.isEmpty(), "accessToken is empty.");
351         this.accessToken = accessToken;
352         return self();
353     }
354 
355     /**
356      * Returns the access token to use when authenticating a client.
357      */
358     protected String accessToken() {
359         return accessToken;
360     }
361 
362     /**
363      * Sets the maximum number of retries to perform when replication lag is detected. For example,
364      * without replication lag detection and retries, the {@code getFile()} in the following example
365      * might fail with a {@link RevisionNotFoundException} when replication is enabled on the server side:
366      * <pre>{@code
367      * CentralDogma dogma = ...;
368      * // getFile() may fail if:
369      * // 1) the replica A serves getFile() while the replica B serves the normalizeRevision() and
370      * // 2) the replica A did not catch up all the commits made in the replica B.
371      * Revision headRevision = dogma.normalizeRevision("proj", "repo", Revision.HEAD).join();
372      * Entry<String> entry = dogma.getFile("proj", "repo", headRevision, Query.ofText("/a.txt")).join();
373      * }</pre>
374      *
375      * <p>Setting a value greater than {@code 0} to this property will make the client detect such situations
376      * and retry automatically. By default, the client will retry up to
377      * {@value #DEFAULT_MAX_NUM_RETRIES_ON_REPLICATION_LAG} times.</p>
378      */
379     public final B maxNumRetriesOnReplicationLag(int maxRetriesOnReplicationLag) {
380         checkArgument(maxRetriesOnReplicationLag >= 0,
381                       "maxRetriesOnReplicationLag: %s (expected: >= 0)", maxRetriesOnReplicationLag);
382         this.maxNumRetriesOnReplicationLag = maxRetriesOnReplicationLag;
383         return self();
384     }
385 
386     /**
387      * Returns the maximum number of retries to perform when replication lag is detected.
388      */
389     protected int maxNumRetriesOnReplicationLag() {
390         return maxNumRetriesOnReplicationLag;
391     }
392 
393     /**
394      * Sets the interval between retries which occurred due to replication lag. By default, the interval
395      * between retries is {@value #DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS} seconds.
396      */
397     public final B retryIntervalOnReplicationLag(Duration retryIntervalOnReplicationLag) {
398         requireNonNull(retryIntervalOnReplicationLag, "retryIntervalOnReplicationLag");
399         checkArgument(!retryIntervalOnReplicationLag.isNegative(),
400                       "retryIntervalOnReplicationLag: %s (expected: >= 0)", retryIntervalOnReplicationLag);
401         return retryIntervalOnReplicationLagMillis(retryIntervalOnReplicationLag.toMillis());
402     }
403 
404     /**
405      * Sets the interval between retries which occurred due to replication lag in milliseconds. By default,
406      * the interval between retries is {@value #DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS} seconds.
407      */
408     public final B retryIntervalOnReplicationLagMillis(long retryIntervalOnReplicationLagMillis) {
409         checkArgument(retryIntervalOnReplicationLagMillis >= 0,
410                       "retryIntervalOnReplicationLagMillis: %s (expected: >= 0)",
411                       retryIntervalOnReplicationLagMillis);
412         this.retryIntervalOnReplicationLagMillis = retryIntervalOnReplicationLagMillis;
413         return self();
414     }
415 
416     /**
417      * Returns the interval between retries which occurred due to replication lag in milliseconds.
418      */
419     protected long retryIntervalOnReplicationLagMillis() {
420         return retryIntervalOnReplicationLagMillis;
421     }
422 
423     /**
424      * If you want to record metrics using Micrometer, please specify a {@link MeterRegistry}.
425      */
426     public B meterRegistry(MeterRegistry meterRegistry) {
427         requireNonNull(meterRegistry, "meterRegistry");
428         this.meterRegistry = meterRegistry;
429         return self();
430     }
431 
432     /**
433      * Returns the {@link MeterRegistry}.
434      */
435     @Nullable
436     protected final MeterRegistry meterRegistry() {
437         return meterRegistry;
438     }
439 }