1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.client.armeria;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static java.util.Objects.requireNonNull;
21
22 import java.net.InetSocketAddress;
23 import java.net.UnknownHostException;
24 import java.time.Duration;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Set;
28 import java.util.concurrent.ScheduledExecutorService;
29 import java.util.function.Consumer;
30
31 import com.google.common.collect.Iterables;
32
33 import com.linecorp.armeria.client.ClientBuilder;
34 import com.linecorp.armeria.client.ClientFactory;
35 import com.linecorp.armeria.client.Clients;
36 import com.linecorp.armeria.client.Endpoint;
37 import com.linecorp.armeria.client.endpoint.EndpointGroup;
38 import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;
39 import com.linecorp.armeria.client.endpoint.dns.DnsAddressEndpointGroup;
40 import com.linecorp.armeria.client.endpoint.dns.DnsAddressEndpointGroupBuilder;
41 import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroup;
42 import com.linecorp.armeria.client.endpoint.healthcheck.HealthCheckedEndpointGroupBuilder;
43 import com.linecorp.armeria.common.CommonPools;
44 import com.linecorp.armeria.common.SessionProtocol;
45 import com.linecorp.armeria.common.annotation.Nullable;
46 import com.linecorp.centraldogma.client.AbstractCentralDogmaBuilder;
47 import com.linecorp.centraldogma.client.CentralDogma;
48 import com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants;
49
50
51
52
53 public class AbstractArmeriaCentralDogmaBuilder<B extends AbstractArmeriaCentralDogmaBuilder<B>>
54 extends AbstractCentralDogmaBuilder<B> {
55
56 private ClientFactory clientFactory = ClientFactory.ofDefault();
57 private ArmeriaClientConfigurator clientConfigurator = cb -> {};
58 @Nullable
59 private Duration healthCheckInterval;
60 private DnsAddressEndpointGroupConfigurator dnsAddressEndpointGroupConfigurator = b -> {};
61 private ScheduledExecutorService blockingTaskExecutor = CommonPools.blockingTaskExecutor();
62
63
64
65
66
67 protected final ClientFactory clientFactory() {
68 return clientFactory;
69 }
70
71
72
73
74
75 public final B clientFactory(ClientFactory clientFactory) {
76 this.clientFactory = requireNonNull(clientFactory, "clientFactory");
77 return self();
78 }
79
80
81
82
83
84 public final B clientConfigurator(ArmeriaClientConfigurator clientConfigurator) {
85 this.clientConfigurator = requireNonNull(clientConfigurator, "clientConfigurator");
86 return self();
87 }
88
89
90
91
92
93 public final B dnsAddressEndpointGroupConfigurator(
94 DnsAddressEndpointGroupConfigurator dnsAddressEndpointGroupConfigurator) {
95 this.dnsAddressEndpointGroupConfigurator = requireNonNull(
96 dnsAddressEndpointGroupConfigurator, "dnsAddressEndpointGroupConfigurator");
97 return self();
98 }
99
100
101
102
103
104
105
106 public final B healthCheckInterval(Duration healthCheckInterval) {
107 requireNonNull(healthCheckInterval, "healthCheckInterval");
108 checkArgument(!healthCheckInterval.isNegative(),
109 "healthCheckInterval: %s (expected: >= 0)", healthCheckInterval);
110 this.healthCheckInterval = healthCheckInterval;
111 return self();
112 }
113
114
115
116
117
118
119
120 public final B healthCheckIntervalMillis(long healthCheckIntervalMillis) {
121 checkArgument(healthCheckIntervalMillis >= 0,
122 "healthCheckIntervalMillis: %s (expected: >= 0)", healthCheckIntervalMillis);
123 healthCheckInterval = Duration.ofMillis(healthCheckIntervalMillis);
124 return self();
125 }
126
127
128
129
130
131
132 protected final EndpointGroup endpointGroup() throws UnknownHostException {
133 final EndpointGroup group = endpointGroup0();
134
135 if (healthCheckInterval != null && healthCheckInterval.isZero()) {
136 return group;
137 }
138
139 final HealthCheckedEndpointGroupBuilder healthCheckedEndpointGroupBuilder =
140 HealthCheckedEndpointGroup.builder(group, HttpApiV1Constants.HEALTH_CHECK_PATH)
141 .clientFactory(clientFactory)
142 .allowEmptyEndpoints(false)
143 .protocol(isUseTls() ? SessionProtocol.HTTPS
144 : SessionProtocol.HTTP);
145 if (healthCheckInterval != null) {
146 healthCheckedEndpointGroupBuilder.retryInterval(healthCheckInterval);
147 }
148 return healthCheckedEndpointGroupBuilder.build();
149 }
150
151 private EndpointGroup endpointGroup0() throws UnknownHostException {
152 final Set<InetSocketAddress> hosts = hosts();
153 checkState(!hosts.isEmpty(), "no hosts were added.");
154
155 final InetSocketAddress firstHost = Iterables.getFirst(hosts, null);
156 if (hosts.size() == 1 && !firstHost.isUnresolved()) {
157 return toResolvedHostEndpoint(firstHost);
158 }
159
160 final List<Endpoint> staticEndpoints = new ArrayList<>();
161 final List<EndpointGroup> groups = new ArrayList<>();
162 for (final InetSocketAddress addr : hosts) {
163 if (addr.isUnresolved()) {
164 final DnsAddressEndpointGroupBuilder dnsAddressEndpointGroup = DnsAddressEndpointGroup
165 .builder(addr.getHostString())
166 .eventLoop(clientFactory.eventLoopGroup().next());
167 dnsAddressEndpointGroupConfigurator.configure(dnsAddressEndpointGroup);
168 groups.add(dnsAddressEndpointGroup.port(addr.getPort()).build());
169 } else {
170 staticEndpoints.add(toResolvedHostEndpoint(addr));
171 }
172 }
173
174 if (!staticEndpoints.isEmpty()) {
175 groups.add(EndpointGroup.of(staticEndpoints));
176 }
177
178 final EndpointGroup group;
179 if (groups.size() == 1) {
180 group = groups.get(0);
181 } else {
182 group = new CompositeEndpointGroup(groups, EndpointSelectionStrategy.roundRobin());
183 }
184
185 return group;
186 }
187
188 private static Endpoint toResolvedHostEndpoint(InetSocketAddress addr) {
189 return Endpoint.of(addr.getHostString(), addr.getPort())
190 .withIpAddr(addr.getAddress().getHostAddress());
191 }
192
193
194
195
196 protected final ScheduledExecutorService blockingTaskExecutor() {
197 return blockingTaskExecutor;
198 }
199
200
201
202
203
204
205
206 public final B blockingTaskExecutor(ScheduledExecutorService blockingTaskExecutor) {
207 requireNonNull(blockingTaskExecutor, "blockingTaskExecutor");
208 this.blockingTaskExecutor = blockingTaskExecutor;
209 return self();
210 }
211
212
213
214
215
216
217 protected final ClientBuilder newClientBuilder(String scheme, EndpointGroup endpointGroup,
218 Consumer<ClientBuilder> customizer, String path) {
219 final ClientBuilder builder = Clients.builder(scheme, endpointGroup, path);
220 customizer.accept(builder);
221 clientConfigurator.configure(builder);
222 builder.factory(clientFactory());
223 return builder;
224 }
225 }