1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.xds.internal;
18
19 import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.INTERNAL_PROJECT_DOGMA;
20 import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.initializeInternalRepos;
21
22 import java.util.Collection;
23 import java.util.Map;
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.CompletionStage;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.BiFunction;
30
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import com.google.common.collect.ImmutableList;
35 import com.google.common.collect.ImmutableList.Builder;
36 import com.google.protobuf.InvalidProtocolBufferException;
37
38 import com.linecorp.armeria.common.util.UnmodifiableFuture;
39 import com.linecorp.armeria.server.ServerBuilder;
40 import com.linecorp.armeria.server.grpc.GrpcService;
41 import com.linecorp.centraldogma.common.Entry;
42 import com.linecorp.centraldogma.common.Revision;
43 import com.linecorp.centraldogma.server.command.CommandExecutor;
44 import com.linecorp.centraldogma.server.plugin.AllReplicasPlugin;
45 import com.linecorp.centraldogma.server.plugin.PluginContext;
46 import com.linecorp.centraldogma.server.plugin.PluginInitContext;
47 import com.linecorp.centraldogma.server.storage.repository.Repository;
48 import com.linecorp.centraldogma.server.storage.repository.RepositoryManager;
49
50 import io.envoyproxy.controlplane.cache.Resources.ResourceType;
51 import io.envoyproxy.controlplane.cache.SnapshotResources;
52 import io.envoyproxy.controlplane.cache.v3.SimpleCache;
53 import io.envoyproxy.controlplane.cache.v3.Snapshot;
54 import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks;
55 import io.envoyproxy.controlplane.server.V3DiscoveryServer;
56 import io.envoyproxy.controlplane.server.exception.RequestException;
57 import io.envoyproxy.envoy.config.cluster.v3.Cluster;
58 import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
59 import io.envoyproxy.envoy.config.listener.v3.Listener;
60 import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
61 import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret;
62 import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest;
63 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
64 import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
65 import io.netty.util.concurrent.DefaultThreadFactory;
66
67 public final class ControlPlanePlugin extends AllReplicasPlugin {
68
69 private static final Logger logger = LoggerFactory.getLogger(ControlPlanePlugin.class);
70
71 public static final String CLUSTER_REPO = "clusters";
72 public static final String CLUSTER_FILE = Cluster.getDescriptor().getFullName() + ".json";
73
74 public static final String ENDPOINT_REPO = "endpoints";
75 public static final String ENDPOINT_FILE = ClusterLoadAssignment.getDescriptor().getFullName() + ".json";
76
77 public static final String LISTENER_REPO = "listeners";
78 public static final String LISTENER_FILE = Listener.getDescriptor().getFullName() + ".json";
79
80 public static final String ROUTE_REPO = "routes";
81 public static final String ROUTE_FILE = RouteConfiguration.getDescriptor().getFullName() + ".json";
82
83 public static final String DEFAULT_GROUP = "default_group";
84
85 public static final long BACKOFF_SECONDS = 60;
86
87 private static final ScheduledExecutorService CONTROL_PLANE_EXECUTOR =
88 Executors.newSingleThreadScheduledExecutor(
89 new DefaultThreadFactory("control-plane-executor", true));
90
91 private volatile boolean stop;
92
93 @Override
94 public void init(PluginInitContext pluginInitContext) {
95 final CommandExecutor commandExecutor = pluginInitContext.commandExecutor();
96 final long currentTimeMillis = System.currentTimeMillis();
97 initializeInternalRepos(commandExecutor, currentTimeMillis,
98 ImmutableList.of(CLUSTER_REPO, ENDPOINT_REPO, LISTENER_REPO, ROUTE_REPO));
99
100 final ServerBuilder sb = pluginInitContext.serverBuilder();
101
102
103 final SimpleCache<String> cache = new SimpleCache<>(node -> DEFAULT_GROUP);
104 final RepositoryManager repositoryManager = pluginInitContext.projectManager()
105 .get(INTERNAL_PROJECT_DOGMA)
106 .repos();
107 watchRepository(repositoryManager.get(CLUSTER_REPO), CLUSTER_FILE, Revision.INIT,
108 (entries, revision) -> updateClusters(entries, revision, cache));
109 watchRepository(repositoryManager.get(ENDPOINT_REPO), ENDPOINT_FILE, Revision.INIT,
110 (entries, revision) -> updateEndpoints(entries, revision, cache));
111 watchRepository(repositoryManager.get(LISTENER_REPO), LISTENER_FILE, Revision.INIT,
112 (entries, revision) -> updateListeners(entries, revision, cache));
113 watchRepository(repositoryManager.get(ROUTE_REPO), ROUTE_FILE, Revision.INIT,
114 (entries, revision) -> routes(entries, revision, cache));
115 final V3DiscoveryServer server = new V3DiscoveryServer(new LoggingDiscoveryServerCallbacks(), cache);
116
117 final GrpcService grpcService = GrpcService.builder()
118 .addService(server.getClusterDiscoveryServiceImpl())
119 .addService(server.getEndpointDiscoveryServiceImpl())
120 .addService(server.getListenerDiscoveryServiceImpl())
121 .addService(server.getRouteDiscoveryServiceImpl())
122 .addService(server.getSecretDiscoveryServiceImpl())
123 .addService(server.getAggregatedDiscoveryServiceImpl())
124 .useBlockingTaskExecutor(true)
125 .build();
126 sb.route().build(grpcService);
127 }
128
129 private void watchRepository(Repository repository, String fileName, Revision revision,
130 ThrowingBiConsumer<Collection<Entry<?>>, Revision> updatingSnapshotFunction) {
131 final CompletableFuture<Revision> future = repository.watch(revision, "/**");
132 future.handleAsync((BiFunction<Revision, Throwable, Void>) (watchedRevision, cause) -> {
133 if (stop) {
134 return null;
135 }
136 if (cause != null) {
137 logger.warn("Unexpected exception is raised while watching {}. Try watching after {} seconds..",
138 repository, BACKOFF_SECONDS, cause);
139 CONTROL_PLANE_EXECUTOR.schedule(
140 () -> watchRepository(repository, fileName, revision, updatingSnapshotFunction),
141 BACKOFF_SECONDS, TimeUnit.SECONDS);
142 return null;
143 }
144 final CompletableFuture<Map<String, Entry<?>>> entriesFuture =
145 repository.find(watchedRevision, "/**/" + fileName);
146 entriesFuture.handleAsync(
147 (BiFunction<Map<String, Entry<?>>, Throwable, Void>) (entries, findCause) -> {
148 if (stop) {
149 return null;
150 }
151 if (findCause != null) {
152 logger.warn(
153 "Unexpected exception is raised while finding {}" +
154 " with revision {} from {}. Try watching after {} seconds..",
155 fileName, watchedRevision, repository, BACKOFF_SECONDS, findCause);
156 CONTROL_PLANE_EXECUTOR.schedule(
157 () -> watchRepository(repository, fileName,
158 watchedRevision, updatingSnapshotFunction),
159 BACKOFF_SECONDS, TimeUnit.SECONDS);
160 return null;
161 }
162 try {
163 updatingSnapshotFunction.accept(entries.values(), watchedRevision);
164 CONTROL_PLANE_EXECUTOR.execute(() -> watchRepository(
165 repository, fileName, watchedRevision, updatingSnapshotFunction));
166 } catch (Throwable t) {
167 logger.warn("Unexpected exception is raised while building from {} using {}" +
168 ". Try watching after {} seconds..",
169 repository.name(), entries.values(), BACKOFF_SECONDS, t);
170 CONTROL_PLANE_EXECUTOR.schedule(
171 () -> watchRepository(
172 repository, fileName, watchedRevision, updatingSnapshotFunction),
173 BACKOFF_SECONDS, TimeUnit.SECONDS);
174 }
175 return null;
176 },
177 CONTROL_PLANE_EXECUTOR);
178 return null;
179 }, CONTROL_PLANE_EXECUTOR);
180 }
181
182 private static boolean updateClusters(Collection<Entry<?>> entries, Revision revision,
183 SimpleCache<String> cache) throws InvalidProtocolBufferException {
184 final Builder<Cluster> clustersBuilder = ImmutableList.builder();
185 for (Entry<?> entry : entries) {
186 final Cluster.Builder clusterBuilder = Cluster.newBuilder();
187 JsonFormatUtil.parser().merge(entry.contentAsText(), clusterBuilder);
188 clustersBuilder.add(clusterBuilder.build());
189 }
190
191 setNewSnapshot(cache, ResourceType.CLUSTER,
192 CentralDogmaSnapshotResources.create(clustersBuilder.build(), revision));
193 return true;
194 }
195
196 private static boolean updateEndpoints(Collection<Entry<?>> entries, Revision revision,
197 SimpleCache<String> cache) throws InvalidProtocolBufferException {
198 final Builder<ClusterLoadAssignment> endpointsBuilder = ImmutableList.builder();
199 for (Entry<?> entry : entries) {
200 final ClusterLoadAssignment.Builder endpointBuilder = ClusterLoadAssignment.newBuilder();
201 JsonFormatUtil.parser().merge(entry.contentAsText(), endpointBuilder);
202 endpointsBuilder.add(endpointBuilder.build());
203 }
204
205 setNewSnapshot(cache, ResourceType.ENDPOINT,
206 CentralDogmaSnapshotResources.create(endpointsBuilder.build(), revision));
207 return true;
208 }
209
210 private static boolean updateListeners(Collection<Entry<?>> entries, Revision revision,
211 SimpleCache<String> cache) throws InvalidProtocolBufferException {
212 final Builder<Listener> listenersBuilder = ImmutableList.builder();
213 for (Entry<?> entry : entries) {
214 final Listener.Builder listenerBuilder = Listener.newBuilder();
215 JsonFormatUtil.parser().merge(entry.contentAsText(), listenerBuilder);
216 listenersBuilder.add(listenerBuilder.build());
217 }
218
219 setNewSnapshot(cache, ResourceType.LISTENER,
220 CentralDogmaSnapshotResources.create(listenersBuilder.build(), revision));
221 return true;
222 }
223
224 private static boolean routes(Collection<Entry<?>> entries, Revision revision, SimpleCache<String> cache)
225 throws InvalidProtocolBufferException {
226 final Builder<RouteConfiguration> routesBuilder = ImmutableList.builder();
227 for (Entry<?> entry : entries) {
228 final RouteConfiguration.Builder routeBuilder = RouteConfiguration.newBuilder();
229 JsonFormatUtil.parser().merge(entry.contentAsText(), routeBuilder);
230 routesBuilder.add(routeBuilder.build());
231 }
232
233 setNewSnapshot(cache, ResourceType.ROUTE,
234 CentralDogmaSnapshotResources.create(routesBuilder.build(), revision));
235 return true;
236 }
237
238 @SuppressWarnings("unchecked")
239 private static void setNewSnapshot(SimpleCache<String> cache, ResourceType resourceType,
240 SnapshotResources<?> resources) {
241 SnapshotResources<Cluster> clusters;
242 SnapshotResources<ClusterLoadAssignment> endpoints;
243 SnapshotResources<Listener> listeners;
244 SnapshotResources<RouteConfiguration> routes;
245 SnapshotResources<Secret> secrets;
246 final Snapshot previousSnapshot = cache.getSnapshot(DEFAULT_GROUP);
247 if (previousSnapshot == null) {
248 final SnapshotResources<?> emptyResources =
249 SnapshotResources.create(ImmutableList.of(), "empty_resources");
250 clusters = (SnapshotResources<Cluster>) emptyResources;
251 endpoints = (SnapshotResources<ClusterLoadAssignment>) emptyResources;
252 listeners = (SnapshotResources<Listener>) emptyResources;
253 routes = (SnapshotResources<RouteConfiguration>) emptyResources;
254 secrets = (SnapshotResources<Secret>) emptyResources;
255 } else {
256 clusters = previousSnapshot.clusters();
257 endpoints = previousSnapshot.endpoints();
258 listeners = previousSnapshot.listeners();
259 routes = previousSnapshot.routes();
260 secrets = previousSnapshot.secrets();
261 }
262 switch (resourceType) {
263 case CLUSTER:
264 clusters = (SnapshotResources<Cluster>) resources;
265 break;
266 case ENDPOINT:
267 endpoints = (SnapshotResources<ClusterLoadAssignment>) resources;
268 break;
269 case LISTENER:
270 listeners = (SnapshotResources<Listener>) resources;
271 break;
272 case ROUTE:
273 routes = (SnapshotResources<RouteConfiguration>) resources;
274 break;
275 case SECRET:
276 secrets = (SnapshotResources<Secret>) resources;
277 break;
278 default:
279
280 throw new Error();
281 }
282
283 cache.setSnapshot(DEFAULT_GROUP,
284 new CentralDogmaSnapshot(clusters, endpoints, listeners, routes, secrets));
285 }
286
287 @Override
288 public CompletionStage<Void> start(PluginContext context) {
289 return UnmodifiableFuture.completedFuture(null);
290 }
291
292 @Override
293 public CompletionStage<Void> stop(PluginContext context) {
294 stop = true;
295 return UnmodifiableFuture.completedFuture(null);
296 }
297
298 @FunctionalInterface
299 interface ThrowingBiConsumer<A, B> {
300 void accept(A a, B b) throws Exception;
301 }
302
303 private static final class LoggingDiscoveryServerCallbacks implements DiscoveryServerCallbacks {
304 @Override
305 public void onV3StreamRequest(long streamId, DiscoveryRequest request) throws RequestException {
306 logger.debug("Received v3 stream request. streamId: {}, version: {}, resource_names: {}, " +
307 "response_nonce: {}, type_url: {}",
308 streamId, request.getVersionInfo(), request.getResourceNamesList(),
309 request.getResponseNonce(), request.getTypeUrl());
310 }
311
312 @Override
313 public void onV3StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request)
314 throws RequestException {}
315
316 @Override
317 public void onV3StreamResponse(long streamId, DiscoveryRequest request,
318 DiscoveryResponse response) {
319 logger.debug("Sent v3 stream response. streamId: {}, onV3StreamResponse: {}", streamId, response);
320 }
321 }
322 }