1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server;
17  
18  import static com.google.common.collect.ImmutableList.toImmutableList;
19  import static com.google.common.collect.ImmutableMap.toImmutableMap;
20  import static java.util.Objects.requireNonNull;
21  
22  import java.time.Duration;
23  import java.util.List;
24  import java.util.Map;
25  import java.util.Map.Entry;
26  import java.util.Optional;
27  import java.util.ServiceLoader;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.concurrent.CompletionStage;
30  import java.util.concurrent.Executor;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.stream.Collectors;
34  
35  import javax.annotation.Nullable;
36  
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import com.google.common.annotations.VisibleForTesting;
41  import com.google.common.collect.ImmutableList;
42  import com.google.common.collect.ImmutableMap;
43  import com.google.common.collect.Iterables;
44  import com.spotify.futures.CompletableFutures;
45  
46  import com.linecorp.armeria.common.util.StartStopSupport;
47  import com.linecorp.centraldogma.server.command.CommandExecutor;
48  import com.linecorp.centraldogma.server.mirror.MirrorAccessController;
49  import com.linecorp.centraldogma.server.plugin.Plugin;
50  import com.linecorp.centraldogma.server.plugin.PluginContext;
51  import com.linecorp.centraldogma.server.plugin.PluginTarget;
52  import com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer;
53  import com.linecorp.centraldogma.server.storage.project.ProjectManager;
54  
55  import io.micrometer.core.instrument.MeterRegistry;
56  import io.netty.util.concurrent.DefaultThreadFactory;
57  
58  /**
59   * Provides asynchronous start-stop life cycle support for the {@link Plugin}s.
60   */
61  final class PluginGroup {
62  
63      private static final Logger logger = LoggerFactory.getLogger(PluginGroup.class);
64  
65      /**
66       * Returns a new {@link PluginGroup} which holds the {@link Plugin}s loaded from the classpath.
67       * {@code null} is returned if there is no {@link Plugin} whose target equals to the specified
68       * {@code target}.
69       *
70       * @param target the {@link PluginTarget} which would be loaded
71       */
72      @VisibleForTesting
73      @Nullable
74      static PluginGroup loadPlugins(PluginTarget target, CentralDogmaConfig config) {
75          return loadPlugins(PluginGroup.class.getClassLoader(), config, ImmutableList.of()).get(target);
76      }
77  
78      /**
79       * Returns a new {@link PluginGroup} which holds the {@link Plugin}s loaded from the classpath.
80       * An empty map is returned if there is no {@link Plugin} whose target equals to the specified
81       * {@code target}.
82       *
83       * @param classLoader which is used to load the {@link Plugin}s
84       */
85      static Map<PluginTarget, PluginGroup> loadPlugins(ClassLoader classLoader, CentralDogmaConfig config,
86                                                        List<Plugin> plugins) {
87          requireNonNull(classLoader, "classLoader");
88          requireNonNull(config, "config");
89  
90          final ServiceLoader<Plugin> loader = ServiceLoader.load(Plugin.class, classLoader);
91          final ImmutableMap.Builder<Class<?>, Plugin> allPlugins = new ImmutableMap.Builder<>();
92          for (Plugin plugin : Iterables.concat(plugins, loader)) {
93              if (plugin.isEnabled(config)) {
94                  allPlugins.put(plugin.configType(), plugin);
95              }
96          }
97  
98          // IllegalArgumentException is thrown if there are duplicate keys.
99          final Map<Class<?>, Plugin> pluginMap = allPlugins.build();
100         if (pluginMap.isEmpty()) {
101             return ImmutableMap.of();
102         }
103 
104         final Map<PluginTarget, PluginGroup> pluginGroups =
105                 pluginMap.values()
106                          .stream()
107                          .collect(Collectors.groupingBy(plugin -> plugin.target(config)))
108                          .entrySet()
109                          .stream()
110                          .collect(toImmutableMap(Entry::getKey, e -> {
111                              final PluginTarget target = e.getKey();
112                              final List<Plugin> targetPlugins = e.getValue();
113                              final String poolName =
114                                      "plugins-for-" + target.name().toLowerCase().replace("_", "-");
115                              return new PluginGroup(targetPlugins,
116                                                     Executors.newSingleThreadExecutor(
117                                                             new DefaultThreadFactory(poolName, true)));
118                          }));
119 
120         pluginGroups.forEach((target, group) -> {
121             logger.debug("Loaded plugins for target {}: {}", target,
122                          group.plugins().stream().map(plugin -> plugin.getClass().getName())
123                               .collect(toImmutableList()));
124         });
125         return pluginGroups;
126     }
127 
128     private final List<Plugin> plugins;
129     private final PluginGroupStartStop startStop;
130 
131     private PluginGroup(Iterable<Plugin> plugins, Executor executor) {
132         this.plugins = ImmutableList.copyOf(requireNonNull(plugins, "plugins"));
133         startStop = new PluginGroupStartStop(requireNonNull(executor, "executor"));
134     }
135 
136     /**
137      * Returns the {@link Plugin}s managed by this {@link PluginGroup}.
138      */
139     List<Plugin> plugins() {
140         return plugins;
141     }
142 
143     /**
144      * Returns the first {@link Plugin} of the specified {@code clazz} as wrapped by an {@link Optional}.
145      */
146     @Nullable
147     <T extends Plugin> T findFirstPlugin(Class<T> clazz) {
148         requireNonNull(clazz, "clazz");
149         return plugins.stream().filter(clazz::isInstance).map(clazz::cast).findFirst().orElse(null);
150     }
151 
152     /**
153      * Starts the {@link Plugin}s managed by this {@link PluginGroup}.
154      */
155     CompletableFuture<Void> start(CentralDogmaConfig config, ProjectManager projectManager,
156                                   CommandExecutor commandExecutor, MeterRegistry meterRegistry,
157                                   ScheduledExecutorService purgeWorker,
158                                   InternalProjectInitializer internalProjectInitializer,
159                                   MirrorAccessController mirrorAccessController) {
160         final PluginContext context = new PluginContext(config, projectManager, commandExecutor, meterRegistry,
161                                                         purgeWorker, internalProjectInitializer,
162                                                         mirrorAccessController);
163         return startStop.start(context, context, true);
164     }
165 
166     /**
167      * Stops the {@link Plugin}s managed by this {@link PluginGroup}.
168      */
169     CompletableFuture<Void> stop(CentralDogmaConfig config, ProjectManager projectManager,
170                                  CommandExecutor commandExecutor, MeterRegistry meterRegistry,
171                                  ScheduledExecutorService purgeWorker,
172                                  InternalProjectInitializer internalProjectInitializer,
173                                  MirrorAccessController mirrorAccessController) {
174         return startStop.stop(
175                 new PluginContext(config, projectManager, commandExecutor, meterRegistry, purgeWorker,
176                                   internalProjectInitializer, mirrorAccessController));
177     }
178 
179     private class PluginGroupStartStop extends StartStopSupport<PluginContext, PluginContext, Void, Void> {
180 
181         PluginGroupStartStop(Executor executor) {
182             super(executor);
183         }
184 
185         @Override
186         protected CompletionStage<Void> doStart(@Nullable PluginContext arg) throws Exception {
187             assert arg != null;
188             // Wait until the internal project is initialized.
189             arg.internalProjectInitializer().whenInitialized().get();
190             final List<CompletionStage<Void>> futures = plugins.stream().map(
191                     plugin -> {
192                         logger.info("Starting plugin: {}", plugin);
193                         final long start = System.nanoTime();
194                         return plugin.start(arg)
195                                      .thenAccept(unused -> logger.info(
196                                              "Plugin started: {} in {} seconds", plugin,
197                                              Duration.ofNanos(System.nanoTime() - start).getSeconds()))
198                                      .exceptionally(cause -> {
199                                          logger.info("Failed to start plugin: {}", plugin, cause);
200                                          return null;
201                                      });
202                     }).collect(toImmutableList());
203             return CompletableFutures.allAsList(futures).thenApply(unused -> null);
204         }
205 
206         @Override
207         protected CompletionStage<Void> doStop(@Nullable PluginContext arg) throws Exception {
208             assert arg != null;
209             final List<CompletionStage<Void>> futures = plugins.stream().map(
210                     plugin -> {
211                         logger.info("Stopping plugin: {}", plugin);
212                         final long start = System.nanoTime();
213                         return plugin.stop(arg)
214                                      .thenAccept(unused -> logger.info(
215                                              "Stopped plugin: {} in {} seconds.", plugin,
216                                              Duration.ofNanos(System.nanoTime() - start).getSeconds()))
217                                      .exceptionally(cause -> {
218                                          logger.info("Failed to stop plugin: {}", plugin, cause);
219                                          return null;
220                                      });
221                     }).collect(toImmutableList());
222             return CompletableFutures.allAsList(futures).thenApply(unused -> null);
223         }
224     }
225 }