1   /*
2    * Copyright 2023 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.xds.internal;
18  
19  import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.INTERNAL_PROJECT_DOGMA;
20  import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.initializeInternalRepos;
21  
22  import java.util.Collection;
23  import java.util.Map;
24  import java.util.concurrent.CompletableFuture;
25  import java.util.concurrent.CompletionStage;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.ScheduledExecutorService;
28  import java.util.concurrent.TimeUnit;
29  import java.util.function.BiFunction;
30  
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  import com.google.common.collect.ImmutableList;
35  import com.google.common.collect.ImmutableList.Builder;
36  import com.google.protobuf.InvalidProtocolBufferException;
37  
38  import com.linecorp.armeria.common.util.UnmodifiableFuture;
39  import com.linecorp.armeria.server.ServerBuilder;
40  import com.linecorp.armeria.server.grpc.GrpcService;
41  import com.linecorp.centraldogma.common.Entry;
42  import com.linecorp.centraldogma.common.Revision;
43  import com.linecorp.centraldogma.server.command.CommandExecutor;
44  import com.linecorp.centraldogma.server.plugin.AllReplicasPlugin;
45  import com.linecorp.centraldogma.server.plugin.PluginContext;
46  import com.linecorp.centraldogma.server.plugin.PluginInitContext;
47  import com.linecorp.centraldogma.server.storage.repository.Repository;
48  import com.linecorp.centraldogma.server.storage.repository.RepositoryManager;
49  
50  import io.envoyproxy.controlplane.cache.Resources.ResourceType;
51  import io.envoyproxy.controlplane.cache.SnapshotResources;
52  import io.envoyproxy.controlplane.cache.v3.SimpleCache;
53  import io.envoyproxy.controlplane.cache.v3.Snapshot;
54  import io.envoyproxy.controlplane.server.DiscoveryServerCallbacks;
55  import io.envoyproxy.controlplane.server.V3DiscoveryServer;
56  import io.envoyproxy.controlplane.server.exception.RequestException;
57  import io.envoyproxy.envoy.config.cluster.v3.Cluster;
58  import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;
59  import io.envoyproxy.envoy.config.listener.v3.Listener;
60  import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
61  import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.Secret;
62  import io.envoyproxy.envoy.service.discovery.v3.DeltaDiscoveryRequest;
63  import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
64  import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
65  import io.netty.util.concurrent.DefaultThreadFactory;
66  
67  public final class ControlPlanePlugin extends AllReplicasPlugin {
68  
69      private static final Logger logger = LoggerFactory.getLogger(ControlPlanePlugin.class);
70  
71      public static final String CLUSTER_REPO = "clusters";
72      public static final String CLUSTER_FILE = Cluster.getDescriptor().getFullName() + ".json";
73  
74      public static final String ENDPOINT_REPO = "endpoints";
75      public static final String ENDPOINT_FILE = ClusterLoadAssignment.getDescriptor().getFullName() + ".json";
76  
77      public static final String LISTENER_REPO = "listeners";
78      public static final String LISTENER_FILE = Listener.getDescriptor().getFullName() + ".json";
79  
80      public static final String ROUTE_REPO = "routes";
81      public static final String ROUTE_FILE = RouteConfiguration.getDescriptor().getFullName() + ".json";
82  
83      public static final String DEFAULT_GROUP = "default_group";
84  
85      public static final long BACKOFF_SECONDS = 60; // Should we use backoff?
86  
87      private static final ScheduledExecutorService CONTROL_PLANE_EXECUTOR =
88              Executors.newSingleThreadScheduledExecutor(
89                      new DefaultThreadFactory("control-plane-executor", true));
90  
91      private volatile boolean stop;
92  
93      @Override
94      public void init(PluginInitContext pluginInitContext) {
95          final CommandExecutor commandExecutor = pluginInitContext.commandExecutor();
96          final long currentTimeMillis = System.currentTimeMillis();
97          initializeInternalRepos(commandExecutor, currentTimeMillis,
98                                  ImmutableList.of(CLUSTER_REPO, ENDPOINT_REPO, LISTENER_REPO, ROUTE_REPO));
99  
100         final ServerBuilder sb = pluginInitContext.serverBuilder();
101 
102         // TODO(minwoox): Implement better cache implementation that updates only changed resources.
103         final SimpleCache<String> cache = new SimpleCache<>(node -> DEFAULT_GROUP);
104         final RepositoryManager repositoryManager = pluginInitContext.projectManager()
105                                                                      .get(INTERNAL_PROJECT_DOGMA)
106                                                                      .repos();
107         watchRepository(repositoryManager.get(CLUSTER_REPO), CLUSTER_FILE, Revision.INIT,
108                         (entries, revision) -> updateClusters(entries, revision, cache));
109         watchRepository(repositoryManager.get(ENDPOINT_REPO), ENDPOINT_FILE, Revision.INIT,
110                         (entries, revision) -> updateEndpoints(entries, revision, cache));
111         watchRepository(repositoryManager.get(LISTENER_REPO), LISTENER_FILE, Revision.INIT,
112                         (entries, revision) -> updateListeners(entries, revision, cache));
113         watchRepository(repositoryManager.get(ROUTE_REPO), ROUTE_FILE, Revision.INIT,
114                         (entries, revision) -> routes(entries, revision, cache));
115         final V3DiscoveryServer server = new V3DiscoveryServer(new LoggingDiscoveryServerCallbacks(), cache);
116         // xDS, ADS
117         final GrpcService grpcService = GrpcService.builder()
118                                                    .addService(server.getClusterDiscoveryServiceImpl())
119                                                    .addService(server.getEndpointDiscoveryServiceImpl())
120                                                    .addService(server.getListenerDiscoveryServiceImpl())
121                                                    .addService(server.getRouteDiscoveryServiceImpl())
122                                                    .addService(server.getSecretDiscoveryServiceImpl())
123                                                    .addService(server.getAggregatedDiscoveryServiceImpl())
124                                                    .useBlockingTaskExecutor(true)
125                                                    .build();
126         sb.route().build(grpcService);
127     }
128 
129     private void watchRepository(Repository repository, String fileName, Revision revision,
130                                  ThrowingBiConsumer<Collection<Entry<?>>, Revision> updatingSnapshotFunction) {
131         final CompletableFuture<Revision> future = repository.watch(revision, "/**");
132         future.handleAsync((BiFunction<Revision, Throwable, Void>) (watchedRevision, cause) -> {
133             if (stop) {
134                 return null;
135             }
136             if (cause != null) {
137                 logger.warn("Unexpected exception is raised while watching {}. Try watching after {} seconds..",
138                             repository, BACKOFF_SECONDS, cause);
139                 CONTROL_PLANE_EXECUTOR.schedule(
140                         () -> watchRepository(repository, fileName, revision, updatingSnapshotFunction),
141                         BACKOFF_SECONDS, TimeUnit.SECONDS);
142                 return null;
143             }
144             final CompletableFuture<Map<String, Entry<?>>> entriesFuture =
145                     repository.find(watchedRevision, "/**/" + fileName);
146             entriesFuture.handleAsync(
147                     (BiFunction<Map<String, Entry<?>>, Throwable, Void>) (entries, findCause) -> {
148                         if (stop) {
149                             return null;
150                         }
151                         if (findCause != null) {
152                             logger.warn(
153                                     "Unexpected exception is raised while finding {}" +
154                                     " with revision {} from {}. Try watching after {} seconds..",
155                                     fileName, watchedRevision, repository, BACKOFF_SECONDS, findCause);
156                             CONTROL_PLANE_EXECUTOR.schedule(
157                                     () -> watchRepository(repository, fileName,
158                                                           watchedRevision, updatingSnapshotFunction),
159                                     BACKOFF_SECONDS, TimeUnit.SECONDS);
160                             return null;
161                         }
162                         try {
163                             updatingSnapshotFunction.accept(entries.values(), watchedRevision);
164                             CONTROL_PLANE_EXECUTOR.execute(() -> watchRepository(
165                                     repository, fileName, watchedRevision, updatingSnapshotFunction));
166                         } catch (Throwable t) {
167                             logger.warn("Unexpected exception is raised while building from {} using {}" +
168                                         ". Try watching after {} seconds..",
169                                         repository.name(), entries.values(), BACKOFF_SECONDS, t);
170                             CONTROL_PLANE_EXECUTOR.schedule(
171                                     () -> watchRepository(
172                                             repository, fileName, watchedRevision, updatingSnapshotFunction),
173                                     BACKOFF_SECONDS, TimeUnit.SECONDS);
174                         }
175                         return null;
176                     },
177                     CONTROL_PLANE_EXECUTOR);
178             return null;
179         }, CONTROL_PLANE_EXECUTOR);
180     }
181 
182     private static boolean updateClusters(Collection<Entry<?>> entries, Revision revision,
183                                           SimpleCache<String> cache) throws InvalidProtocolBufferException {
184         final Builder<Cluster> clustersBuilder = ImmutableList.builder();
185         for (Entry<?> entry : entries) {
186             final Cluster.Builder clusterBuilder = Cluster.newBuilder();
187             JsonFormatUtil.parser().merge(entry.contentAsText(), clusterBuilder);
188             clustersBuilder.add(clusterBuilder.build());
189         }
190 
191         setNewSnapshot(cache, ResourceType.CLUSTER,
192                        CentralDogmaSnapshotResources.create(clustersBuilder.build(), revision));
193         return true;
194     }
195 
196     private static boolean updateEndpoints(Collection<Entry<?>> entries, Revision revision,
197                                            SimpleCache<String> cache) throws InvalidProtocolBufferException {
198         final Builder<ClusterLoadAssignment> endpointsBuilder = ImmutableList.builder();
199         for (Entry<?> entry : entries) {
200             final ClusterLoadAssignment.Builder endpointBuilder = ClusterLoadAssignment.newBuilder();
201             JsonFormatUtil.parser().merge(entry.contentAsText(), endpointBuilder);
202             endpointsBuilder.add(endpointBuilder.build());
203         }
204 
205         setNewSnapshot(cache, ResourceType.ENDPOINT,
206                        CentralDogmaSnapshotResources.create(endpointsBuilder.build(), revision));
207         return true;
208     }
209 
210     private static boolean updateListeners(Collection<Entry<?>> entries, Revision revision,
211                                            SimpleCache<String> cache) throws InvalidProtocolBufferException {
212         final Builder<Listener> listenersBuilder = ImmutableList.builder();
213         for (Entry<?> entry : entries) {
214             final Listener.Builder listenerBuilder = Listener.newBuilder();
215             JsonFormatUtil.parser().merge(entry.contentAsText(), listenerBuilder);
216             listenersBuilder.add(listenerBuilder.build());
217         }
218 
219         setNewSnapshot(cache, ResourceType.LISTENER,
220                        CentralDogmaSnapshotResources.create(listenersBuilder.build(), revision));
221         return true;
222     }
223 
224     private static boolean routes(Collection<Entry<?>> entries, Revision revision, SimpleCache<String> cache)
225             throws InvalidProtocolBufferException {
226         final Builder<RouteConfiguration> routesBuilder = ImmutableList.builder();
227         for (Entry<?> entry : entries) {
228             final RouteConfiguration.Builder routeBuilder = RouteConfiguration.newBuilder();
229             JsonFormatUtil.parser().merge(entry.contentAsText(), routeBuilder);
230             routesBuilder.add(routeBuilder.build());
231         }
232 
233         setNewSnapshot(cache, ResourceType.ROUTE,
234                        CentralDogmaSnapshotResources.create(routesBuilder.build(), revision));
235         return true;
236     }
237 
238     @SuppressWarnings("unchecked")
239     private static void setNewSnapshot(SimpleCache<String> cache, ResourceType resourceType,
240                                        SnapshotResources<?> resources) {
241         SnapshotResources<Cluster> clusters;
242         SnapshotResources<ClusterLoadAssignment> endpoints;
243         SnapshotResources<Listener> listeners;
244         SnapshotResources<RouteConfiguration> routes;
245         SnapshotResources<Secret> secrets;
246         final Snapshot previousSnapshot = cache.getSnapshot(DEFAULT_GROUP);
247         if (previousSnapshot == null) {
248             final SnapshotResources<?> emptyResources =
249                     SnapshotResources.create(ImmutableList.of(), "empty_resources");
250             clusters = (SnapshotResources<Cluster>) emptyResources;
251             endpoints = (SnapshotResources<ClusterLoadAssignment>) emptyResources;
252             listeners = (SnapshotResources<Listener>) emptyResources;
253             routes = (SnapshotResources<RouteConfiguration>) emptyResources;
254             secrets = (SnapshotResources<Secret>) emptyResources;
255         } else {
256             clusters = previousSnapshot.clusters();
257             endpoints = previousSnapshot.endpoints();
258             listeners = previousSnapshot.listeners();
259             routes = previousSnapshot.routes();
260             secrets = previousSnapshot.secrets();
261         }
262         switch (resourceType) {
263             case CLUSTER:
264                 clusters = (SnapshotResources<Cluster>) resources;
265                 break;
266             case ENDPOINT:
267                 endpoints = (SnapshotResources<ClusterLoadAssignment>) resources;
268                 break;
269             case LISTENER:
270                 listeners = (SnapshotResources<Listener>) resources;
271                 break;
272             case ROUTE:
273                 routes = (SnapshotResources<RouteConfiguration>) resources;
274                 break;
275             case SECRET:
276                 secrets = (SnapshotResources<Secret>) resources;
277                 break;
278             default:
279                 // Should never reach here.
280                 throw new Error();
281         }
282 
283         cache.setSnapshot(DEFAULT_GROUP,
284                           new CentralDogmaSnapshot(clusters, endpoints, listeners, routes, secrets));
285     }
286 
287     @Override
288     public CompletionStage<Void> start(PluginContext context) {
289         return UnmodifiableFuture.completedFuture(null);
290     }
291 
292     @Override
293     public CompletionStage<Void> stop(PluginContext context) {
294         stop = true;
295         return UnmodifiableFuture.completedFuture(null);
296     }
297 
298     @FunctionalInterface
299     interface ThrowingBiConsumer<A, B> {
300         void accept(A a, B b) throws Exception;
301     }
302 
303     private static final class LoggingDiscoveryServerCallbacks implements DiscoveryServerCallbacks {
304         @Override
305         public void onV3StreamRequest(long streamId, DiscoveryRequest request) throws RequestException {
306             logger.debug("Received v3 stream request. streamId: {}, version: {}, resource_names: {}, " +
307                          "response_nonce: {}, type_url: {}",
308                          streamId, request.getVersionInfo(), request.getResourceNamesList(),
309                          request.getResponseNonce(), request.getTypeUrl());
310         }
311 
312         @Override
313         public void onV3StreamDeltaRequest(long streamId, DeltaDiscoveryRequest request)
314                 throws RequestException {}
315 
316         @Override
317         public void onV3StreamResponse(long streamId, DiscoveryRequest request,
318                                        DiscoveryResponse response) {
319             logger.debug("Sent v3 stream response. streamId: {}, onV3StreamResponse: {}", streamId, response);
320         }
321     }
322 }