1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.google.common.base.Preconditions.checkNotNull;
20  import static com.google.common.base.Preconditions.checkState;
21  import static java.util.Objects.requireNonNull;
22  
23  import java.io.IOException;
24  import java.net.InetSocketAddress;
25  import java.net.URI;
26  import java.net.URL;
27  import java.time.Duration;
28  import java.util.ArrayList;
29  import java.util.Collections;
30  import java.util.Enumeration;
31  import java.util.HashMap;
32  import java.util.List;
33  import java.util.Map;
34  import java.util.Set;
35  import java.util.concurrent.TimeUnit;
36  
37  import javax.annotation.Nullable;
38  
39  import com.fasterxml.jackson.core.type.TypeReference;
40  import com.fasterxml.jackson.databind.ObjectMapper;
41  import com.google.common.collect.ImmutableList;
42  import com.google.common.collect.ImmutableSet;
43  import com.google.common.collect.Iterables;
44  import com.google.common.net.InetAddresses;
45  
46  import com.linecorp.centraldogma.common.RevisionNotFoundException;
47  import com.linecorp.centraldogma.internal.CsrfToken;
48  
49  /**
50   * Builds a {@link CentralDogma} client.
51   */
52  public abstract class AbstractCentralDogmaBuilder<B extends AbstractCentralDogmaBuilder<B>> {
53  
54      private static final String TEST_PROFILE_RESOURCE_PATH = "centraldogma-profiles-test.json";
55      private static final String PROFILE_RESOURCE_PATH = "centraldogma-profiles.json";
56      private static final List<String> DEFAULT_PROFILE_RESOURCE_PATHS =
57              ImmutableList.of(TEST_PROFILE_RESOURCE_PATH, PROFILE_RESOURCE_PATH);
58  
59      static final int DEFAULT_PORT = 36462;
60  
61      private static final int DEFAULT_MAX_NUM_RETRIES_ON_REPLICATION_LAG = 5;
62      private static final int DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS = 2;
63  
64      private ImmutableSet<InetSocketAddress> hosts = ImmutableSet.of();
65      private boolean useTls;
66      private List<String> profileResourcePaths = DEFAULT_PROFILE_RESOURCE_PATHS;
67      @Nullable
68      private String selectedProfile;
69      private String accessToken = CsrfToken.ANONYMOUS;
70      private int maxNumRetriesOnReplicationLag = DEFAULT_MAX_NUM_RETRIES_ON_REPLICATION_LAG;
71      private long retryIntervalOnReplicationLagMillis =
72              TimeUnit.SECONDS.toMillis(DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS);
73  
74      /**
75       * Returns {@code this}.
76       */
77      @SuppressWarnings("unchecked")
78      protected final B self() {
79          return (B) this;
80      }
81  
82      /**
83       * Adds the {@link URI} of the Central Dogma server.
84       *
85       * @param uri the URI of the Central Dogma server. e.g.
86       *            {@code tbinary+http://example.com:36462/cd/thrift/v1}
87       *
88       * @deprecated Use {@link #host(String)} or {@link #profile(String...)}.
89       */
90      @Deprecated
91      public final B uri(String uri) {
92          final URI parsed = URI.create(requireNonNull(uri, "uri"));
93          final String host = parsed.getHost();
94          final int port = parsed.getPort();
95          checkArgument(host != null, "uri: %s (must contain a host part)", uri);
96          if (port < 0) {
97              host(host);
98          } else {
99              host(host, port);
100         }
101         return self();
102     }
103 
104     /**
105      * Adds the host name or IP address of the Central Dogma Server and uses the default port number of
106      * {@value #DEFAULT_PORT}.
107      *
108      * @param host the host name or IP address of the Central Dogma server
109      */
110     public final B host(String host) {
111         return host(host, DEFAULT_PORT);
112     }
113 
114     /**
115      * Adds the host name (or IP address) and the port number of the Central Dogma server.
116      *
117      * @param host the host name or IP address of the Central Dogma server
118      * @param port the port number of the Central Dogma server
119      */
120     public final B host(String host, int port) {
121         requireNonNull(host, "host");
122         checkArgument(!host.startsWith("group:"), "host: %s (must not start with 'group:')", host);
123         checkArgument(port >= 1 && port < 65536, "port: %s (expected: 1 .. 65535)", port);
124 
125         final InetSocketAddress addr = newEndpoint(host, port);
126         checkState(selectedProfile == null, "profile() and host() cannot be used together.");
127         hosts = ImmutableSet.<InetSocketAddress>builder().addAll(hosts).add(addr).build();
128         return self();
129     }
130 
131     /**
132      * Sets the client to use TLS.
133      */
134     public final B useTls() {
135         return useTls(true);
136     }
137 
138     /**
139      * Sets whether the client uses TLS or not.
140      */
141     public final B useTls(boolean useTls) {
142         checkState(selectedProfile == null, "useTls() cannot be called once a profile is selected.");
143         this.useTls = useTls;
144         return self();
145     }
146 
147     /**
148      * Returns whether the client uses TLS or not.
149      *
150      * @see #useTls(boolean)
151      */
152     protected final boolean isUseTls() {
153         return useTls;
154     }
155 
156     /**
157      * Sets the paths to look for to read the {@code .json} file that contains the client profiles.
158      * The paths are tried in the order of iteration. The default value of this property is
159      * <code>[ {@value #TEST_PROFILE_RESOURCE_PATH}, {@value #PROFILE_RESOURCE_PATH} ]</code>, which means
160      * the builder will check if {@value #TEST_PROFILE_RESOURCE_PATH} exists first and will try
161      * {@value #PROFILE_RESOURCE_PATH} only if {@value #TEST_PROFILE_RESOURCE_PATH} is missing.
162      */
163     public final B profileResources(String... paths) {
164         return profileResources(ImmutableList.copyOf(requireNonNull(paths, "paths")));
165     }
166 
167     /**
168      * Sets the paths to look for to read the {@code .json} file that contains the client profiles.
169      * The paths are tried in the order of iteration. The default value of this property is
170      * <code>[ {@value #TEST_PROFILE_RESOURCE_PATH}, {@value #PROFILE_RESOURCE_PATH} ]</code>, which means
171      * the builder will check if {@value #TEST_PROFILE_RESOURCE_PATH} exists first and will try
172      * {@value #PROFILE_RESOURCE_PATH} only if {@value #TEST_PROFILE_RESOURCE_PATH} is missing.
173      */
174     public final B profileResources(Iterable<String> paths) {
175         final List<String> newPaths = ImmutableList.copyOf(requireNonNull(paths, "paths"));
176         checkArgument(!newPaths.isEmpty(), "paths is empty.");
177         checkState(selectedProfile == null, "profileResources cannot be set after profile() is called.");
178         profileResourcePaths = newPaths;
179         return self();
180     }
181 
182     /**
183      * Adds the host names (or IP addresses) and the port numbers of the Central Dogma servers loaded from the
184      * client profile resources. When more than one profile is matched, the last matching one will be used. See
185      * <a href="https://line.github.io/centraldogma/client-java.html#using-client-profiles">Using client
186      * profiles</a> for more information.
187      *
188      * @param profiles the list of profile names
189      *
190      * @throws IllegalArgumentException if failed to load any hosts from all the specified profiles
191      */
192     public final B profile(String... profiles) {
193         requireNonNull(profiles, "profiles");
194         return profile(ImmutableList.copyOf(profiles));
195     }
196 
197     /**
198      * Adds the host names (or IP addresses) and the port numbers of the Central Dogma servers loaded from the
199      * client profile resources. When more than one profile is matched, the last matching one will be used. See
200      * <a href="https://line.github.io/centraldogma/client-java.html#using-client-profiles">Using client
201      * profiles</a> for more information.
202      *
203      * @param profiles the list of profile names
204      *
205      * @throws IllegalArgumentException if failed to load any hosts from all the specified profiles
206      */
207     public final B profile(ClassLoader classLoader, String... profiles) {
208         requireNonNull(profiles, "profiles");
209         return profile(classLoader, ImmutableList.copyOf(profiles));
210     }
211 
212     /**
213      * Adds the host names (or IP address) and the port numbers of the Central Dogma servers loaded from the
214      * client profile resources. When more than one profile is matched, the last matching one will be used. See
215      * <a href="https://line.github.io/centraldogma/client-java.html#using-client-profiles">Using client
216      * profiles</a> for more information.
217      *
218      * @param profiles the list of profile names
219      *
220      * @throws IllegalArgumentException if failed to load any hosts from all the specified profiles
221      */
222     public final B profile(Iterable<String> profiles) {
223         final ClassLoader ccl = Thread.currentThread().getContextClassLoader();
224         if (ccl != null) {
225             profile(ccl, profiles);
226         } else {
227             profile(getClass().getClassLoader(), profiles);
228         }
229         return self();
230     }
231 
232     /**
233      * Adds the host names (or IP address) and the port numbers of the Central Dogma servers loaded from the
234      * client profile resources. When more than one profile is matched, the last matching one will be used. See
235      * <a href="https://line.github.io/centraldogma/client-java.html#using-client-profiles">Using client
236      * profiles</a> for more information.
237      *
238      * @param profiles the list of profile names
239      *
240      * @throws IllegalArgumentException if failed to load any hosts from all the specified profiles
241      */
242     public final B profile(ClassLoader classLoader, Iterable<String> profiles) {
243         requireNonNull(classLoader, "classLoader");
244         requireNonNull(profiles, "profiles");
245         checkState(selectedProfile == null, "profile cannot be loaded more than once.");
246         checkState(hosts.isEmpty(), "profile() and host() cannot be used together.");
247 
248         final Map<String, ClientProfile> availableProfiles = new HashMap<>();
249         try {
250             final List<URL> resourceUrls = findProfileResources(classLoader);
251             checkState(!resourceUrls.isEmpty(), "failed to find any of: ", profileResourcePaths);
252 
253             for (URL resourceUrl : resourceUrls) {
254                 final List<ClientProfile> availableProfileList =
255                         new ObjectMapper().readValue(resourceUrl, new TypeReference<List<ClientProfile>>() {});
256 
257                 // Collect all profiles checking the profiles ignoring the duplicate profile names.
258                 availableProfileList.forEach(profile -> {
259                     final String name = profile.name();
260                     final ClientProfile existingProfile = availableProfiles.get(name);
261                     if (existingProfile == null || existingProfile.priority() < profile.priority()) {
262                         // Not a duplicate or higher priority
263                         availableProfiles.put(name, profile);
264                     }
265                 });
266             }
267         } catch (IOException e) {
268             throw new IllegalStateException("failed to load: " + PROFILE_RESOURCE_PATH, e);
269         }
270 
271         final List<String> reversedProfiles = reverse(profiles);
272         checkArgument(!reversedProfiles.isEmpty(), "profiles is empty.");
273         for (String candidateName : reversedProfiles) {
274             checkNotNull(candidateName, "profiles contains null: %s", profiles);
275 
276             final ClientProfile candidate = availableProfiles.get(candidateName);
277             if (candidate == null) {
278                 continue;
279             }
280 
281             final ImmutableSet.Builder<InetSocketAddress> newHostsBuilder = ImmutableSet.builder();
282             candidate.hosts().stream()
283                      .filter(e -> (useTls ? "https" : "http").equals(e.protocol()))
284                      .forEach(e -> newHostsBuilder.add(newEndpoint(e.host(), e.port())));
285 
286             final ImmutableSet<InetSocketAddress> newHosts = newHostsBuilder.build();
287             if (!newHosts.isEmpty()) {
288                 selectedProfile = candidateName;
289                 hosts = newHosts;
290                 return self();
291             }
292         }
293 
294         throw new IllegalArgumentException("no profile matches: " + profiles);
295     }
296 
297     private List<URL> findProfileResources(ClassLoader classLoader) throws IOException {
298         final ImmutableList.Builder<URL> urls = ImmutableList.builder();
299         for (String p : profileResourcePaths) {
300             for (final Enumeration<URL> e = classLoader.getResources(p); e.hasMoreElements();) {
301                 urls.add(e.nextElement());
302             }
303         }
304         return urls.build();
305     }
306 
307     private static List<String> reverse(Iterable<String> profiles) {
308         final List<String> reversedProfiles = new ArrayList<>();
309         Iterables.addAll(reversedProfiles, profiles);
310         Collections.reverse(reversedProfiles);
311         return reversedProfiles;
312     }
313 
314     private static InetSocketAddress newEndpoint(String host, int port) {
315         final InetSocketAddress endpoint;
316         if (InetAddresses.isInetAddress(host)) {
317             endpoint = new InetSocketAddress(InetAddresses.forString(host), port);
318         } else {
319             endpoint = InetSocketAddress.createUnresolved(host, port);
320         }
321         return endpoint;
322     }
323 
324     /**
325      * Returns the name of the selected profile.
326      *
327      * @return the profile name, or {@code null} if no profile was specified or matched
328      */
329     @Nullable
330     protected final String selectedProfile() {
331         return selectedProfile;
332     }
333 
334     /**
335      * Returns the hosts added via {@link #host(String, int)} or {@link #profile(String...)}.
336      */
337     protected final Set<InetSocketAddress> hosts() {
338         return hosts;
339     }
340 
341     /**
342      * Sets the access token to use when authenticating a client.
343      */
344     public final B accessToken(String accessToken) {
345         requireNonNull(accessToken, "accessToken");
346         checkArgument(!accessToken.isEmpty(), "accessToken is empty.");
347         this.accessToken = accessToken;
348         return self();
349     }
350 
351     /**
352      * Returns the access token to use when authenticating a client.
353      */
354     protected String accessToken() {
355         return accessToken;
356     }
357 
358     /**
359      * Sets the maximum number of retries to perform when replication lag is detected. For example,
360      * without replication lag detection and retries, the {@code getFile()} in the following example
361      * might fail with a {@link RevisionNotFoundException} when replication is enabled on the server side:
362      * <pre>{@code
363      * CentralDogma dogma = ...;
364      * // getFile() may fail if:
365      * // 1) the replica A serves getFile() while the replica B serves the normalizeRevision() and
366      * // 2) the replica A did not catch up all the commits made in the replica B.
367      * Revision headRevision = dogma.normalizeRevision("proj", "repo", Revision.HEAD).join();
368      * Entry<String> entry = dogma.getFile("proj", "repo", headRevision, Query.ofText("/a.txt")).join();
369      * }</pre>
370      *
371      * <p>Setting a value greater than {@code 0} to this property will make the client detect such situations
372      * and retry automatically. By default, the client will retry up to
373      * {@value #DEFAULT_MAX_NUM_RETRIES_ON_REPLICATION_LAG} times.</p>
374      */
375     public final B maxNumRetriesOnReplicationLag(int maxRetriesOnReplicationLag) {
376         checkArgument(maxRetriesOnReplicationLag >= 0,
377                       "maxRetriesOnReplicationLag: %s (expected: >= 0)", maxRetriesOnReplicationLag);
378         this.maxNumRetriesOnReplicationLag = maxRetriesOnReplicationLag;
379         return self();
380     }
381 
382     /**
383      * Returns the maximum number of retries to perform when replication lag is detected.
384      */
385     protected int maxNumRetriesOnReplicationLag() {
386         return maxNumRetriesOnReplicationLag;
387     }
388 
389     /**
390      * Sets the interval between retries which occurred due to replication lag. By default, the interval
391      * between retries is {@value #DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS} seconds.
392      */
393     public final B retryIntervalOnReplicationLag(Duration retryIntervalOnReplicationLag) {
394         requireNonNull(retryIntervalOnReplicationLag, "retryIntervalOnReplicationLag");
395         checkArgument(!retryIntervalOnReplicationLag.isNegative(),
396                       "retryIntervalOnReplicationLag: %s (expected: >= 0)", retryIntervalOnReplicationLag);
397         return retryIntervalOnReplicationLagMillis(retryIntervalOnReplicationLag.toMillis());
398     }
399 
400     /**
401      * Sets the interval between retries which occurred due to replication lag in milliseconds. By default,
402      * the interval between retries is {@value #DEFAULT_RETRY_INTERVAL_ON_REPLICATION_LAG_SECONDS} seconds.
403      */
404     public final B retryIntervalOnReplicationLagMillis(long retryIntervalOnReplicationLagMillis) {
405         checkArgument(retryIntervalOnReplicationLagMillis >= 0,
406                       "retryIntervalOnReplicationLagMillis: %s (expected: >= 0)",
407                       retryIntervalOnReplicationLagMillis);
408         this.retryIntervalOnReplicationLagMillis = retryIntervalOnReplicationLagMillis;
409         return self();
410     }
411 
412     /**
413      * Returns the interval between retries which occurred due to replication lag in milliseconds.
414      */
415     protected long retryIntervalOnReplicationLagMillis() {
416         return retryIntervalOnReplicationLagMillis;
417     }
418 }