1   /*
2    * Copyright 2018 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client.armeria;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.google.common.base.Preconditions.checkState;
20  import static java.util.Objects.requireNonNull;
21  
22  import java.net.InetSocketAddress;
23  import java.net.UnknownHostException;
24  import java.time.Duration;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.Set;
28  import java.util.concurrent.ScheduledExecutorService;
29  import java.util.function.Consumer;
30  
31  import com.google.common.collect.Iterables;
32  
33  import com.linecorp.armeria.client.ClientBuilder;
34  import com.linecorp.armeria.client.ClientFactory;
35  import com.linecorp.armeria.client.Clients;
36  import com.linecorp.armeria.client.Endpoint;
37  import com.linecorp.armeria.client.endpoint.EndpointGroup;
38  import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;
39  import com.linecorp.armeria.client.endpoint.dns.DnsAddressEndpointGroup;
40  import com.linecorp.armeria.client.endpoint.dns.DnsAddressEndpointGroupBuilder;
41  import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
42  import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroupBuilder;
43  import com.linecorp.armeria.common.CommonPools;
44  import com.linecorp.armeria.common.SessionProtocol;
45  import com.linecorp.armeria.common.annotation.Nullable;
46  import com.linecorp.centraldogma.client.AbstractCentralDogmaBuilder;
47  import com.linecorp.centraldogma.client.CentralDogma;
48  import com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants;
49  
50  /**
51   * Builds a {@link CentralDogma} client.
52   */
53  public class AbstractArmeriaCentralDogmaBuilder<B extends AbstractArmeriaCentralDogmaBuilder<B>>
54          extends AbstractCentralDogmaBuilder<B> {
55  
56      private ClientFactory clientFactory = ClientFactory.ofDefault();
57      private ArmeriaClientConfigurator clientConfigurator = cb -> {};
58      @Nullable
59      private Duration healthCheckInterval;
60      private DnsAddressEndpointGroupConfigurator dnsAddressEndpointGroupConfigurator = b -> {};
61      private ScheduledExecutorService blockingTaskExecutor = CommonPools.blockingTaskExecutor();
62  
63      /**
64       * Returns the {@link ClientFactory} that will create an underlying
65       * <a href="https://line.github.io/armeria/">Armeria</a> client which performs the actual socket I/O.
66       */
67      protected final ClientFactory clientFactory() {
68          return clientFactory;
69      }
70  
71      /**
72       * Sets the {@link ClientFactory} that will create an underlying
73       * <a href="https://line.github.io/armeria/">Armeria</a> client which performs the actual socket I/O.
74       */
75      public final B clientFactory(ClientFactory clientFactory) {
76          this.clientFactory = requireNonNull(clientFactory, "clientFactory");
77          return self();
78      }
79  
80      /**
81       * Sets the {@link ArmeriaClientConfigurator} that will configure an underlying
82       * <a href="https://line.github.io/armeria/">Armeria</a> client which performs the actual socket I/O.
83       */
84      public final B clientConfigurator(ArmeriaClientConfigurator clientConfigurator) {
85          this.clientConfigurator = requireNonNull(clientConfigurator, "clientConfigurator");
86          return self();
87      }
88  
89      /**
90       * Sets the {@link DnsAddressEndpointGroupConfigurator} that will configure the DNS lookup
91       * done by the <a href="https://line.github.io/armeria/">Armeria</a> client.
92       */
93      public final B dnsAddressEndpointGroupConfigurator(
94              DnsAddressEndpointGroupConfigurator dnsAddressEndpointGroupConfigurator) {
95          this.dnsAddressEndpointGroupConfigurator = requireNonNull(
96                  dnsAddressEndpointGroupConfigurator, "dnsAddressEndpointGroupConfigurator");
97          return self();
98      }
99  
100     /**
101      * Sets the interval between health check requests.
102      *
103      * @param healthCheckInterval the interval between health check requests. {@link Duration#ZERO} disables
104      *                            health check requests.
105      */
106     public final B healthCheckInterval(Duration healthCheckInterval) {
107         requireNonNull(healthCheckInterval, "healthCheckInterval");
108         checkArgument(!healthCheckInterval.isNegative(),
109                       "healthCheckInterval: %s (expected: >= 0)", healthCheckInterval);
110         this.healthCheckInterval = healthCheckInterval;
111         return self();
112     }
113 
114     /**
115      * Sets the interval between health check requests in milliseconds.
116      *
117      * @param healthCheckIntervalMillis the interval between health check requests in milliseconds.
118      *                                  {@code 0} disables health check requests.
119      */
120     public final B healthCheckIntervalMillis(long healthCheckIntervalMillis) {
121         checkArgument(healthCheckIntervalMillis >= 0,
122                       "healthCheckIntervalMillis: %s (expected: >= 0)", healthCheckIntervalMillis);
123         healthCheckInterval = Duration.ofMillis(healthCheckIntervalMillis);
124         return self();
125     }
126 
127     /**
128      * Returns the {@link EndpointGroup} this client will connect to, derived from {@link #hosts()}.
129      *
130      * @throws UnknownHostException if failed to resolve the host names from the DNS servers
131      */
132     protected final EndpointGroup endpointGroup() throws UnknownHostException {
133         final EndpointGroup group = endpointGroup0();
134 
135         if (healthCheckInterval != null && healthCheckInterval.isZero()) {
136             return group;
137         }
138 
139         final HealthCheckedEndpointGroupBuilder healthCheckedEndpointGroupBuilder =
140                 HealthCheckedEndpointGroup.builder(group, HttpApiV1Constants.HEALTH_CHECK_PATH)
141                                           .clientFactory(clientFactory)
142                                           .allowEmptyEndpoints(false)
143                                           .protocol(isUseTls() ? SessionProtocol.HTTPS
144                                                                : SessionProtocol.HTTP);
145         if (healthCheckInterval != null) {
146             healthCheckedEndpointGroupBuilder.retryInterval(healthCheckInterval);
147         }
148         return healthCheckedEndpointGroupBuilder.build();
149     }
150 
151     private EndpointGroup endpointGroup0() throws UnknownHostException {
152         final Set<InetSocketAddress> hosts = hosts();
153         checkState(!hosts.isEmpty(), "no hosts were added.");
154 
155         final InetSocketAddress firstHost = Iterables.getFirst(hosts, null);
156         if (hosts.size() == 1 && !firstHost.isUnresolved()) {
157             return toResolvedHostEndpoint(firstHost);
158         }
159 
160         final List<Endpoint> staticEndpoints = new ArrayList<>();
161         final List<EndpointGroup> groups = new ArrayList<>();
162         for (final InetSocketAddress addr : hosts) {
163             if (addr.isUnresolved()) {
164                 final DnsAddressEndpointGroupBuilder dnsAddressEndpointGroup = DnsAddressEndpointGroup
165                         .builder(addr.getHostString())
166                         .eventLoop(clientFactory.eventLoopGroup().next());
167                 dnsAddressEndpointGroupConfigurator.configure(dnsAddressEndpointGroup);
168                 groups.add(dnsAddressEndpointGroup.port(addr.getPort()).build());
169             } else {
170                 staticEndpoints.add(toResolvedHostEndpoint(addr));
171             }
172         }
173 
174         if (!staticEndpoints.isEmpty()) {
175             groups.add(EndpointGroup.of(staticEndpoints));
176         }
177 
178         final EndpointGroup group;
179         if (groups.size() == 1) {
180             group = groups.get(0);
181         } else {
182             group = new CompositeEndpointGroup(groups, EndpointSelectionStrategy.roundRobin());
183         }
184 
185         return group;
186     }
187 
188     private static Endpoint toResolvedHostEndpoint(InetSocketAddress addr) {
189         return Endpoint.of(addr.getHostString(), addr.getPort())
190                        .withIpAddr(addr.getAddress().getHostAddress());
191     }
192 
193     /**
194      * Returns the {@link ScheduledExecutorService} dedicated to the execution of blocking tasks or invocations.
195      */
196     protected final ScheduledExecutorService blockingTaskExecutor() {
197         return blockingTaskExecutor;
198     }
199 
200     /**
201      * Sets the {@link ScheduledExecutorService} dedicated to the execution of blocking tasks or invocations.
202      * If not set, {@linkplain CommonPools#blockingTaskExecutor() the common pool} is used.
203      * The {@link ScheduledExecutorService} which will be used for scheduling the tasks related with
204      * automatic retries and invoking the callbacks for watched changes.
205      */
206     public final B blockingTaskExecutor(ScheduledExecutorService blockingTaskExecutor) {
207         requireNonNull(blockingTaskExecutor, "blockingTaskExecutor");
208         this.blockingTaskExecutor = blockingTaskExecutor;
209         return self();
210     }
211 
212     /**
213      * Returns a newly created {@link ClientBuilder} configured with the specified {@code customizer}
214      * and then with the {@link ArmeriaClientConfigurator} specified with
215      * {@link #clientConfigurator(ArmeriaClientConfigurator)}.
216      */
217     protected final ClientBuilder newClientBuilder(String scheme, EndpointGroup endpointGroup,
218                                                    Consumer<ClientBuilder> customizer, String path) {
219         final ClientBuilder builder = Clients.builder(scheme, endpointGroup, path);
220         customizer.accept(builder);
221         clientConfigurator.configure(builder);
222         builder.factory(clientFactory());
223         return builder;
224     }
225 }