1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server;
17  
18  import static com.google.common.collect.ImmutableList.toImmutableList;
19  import static com.google.common.collect.ImmutableMap.toImmutableMap;
20  import static java.util.Objects.requireNonNull;
21  
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
33  
34  import javax.annotation.Nullable;
35  
36  import org.slf4j.Logger;
37  import org.slf4j.LoggerFactory;
38  
39  import com.google.common.annotations.VisibleForTesting;
40  import com.google.common.collect.ImmutableList;
41  import com.google.common.collect.ImmutableMap;
42  import com.google.common.collect.Iterables;
43  import com.spotify.futures.CompletableFutures;
44  
45  import com.linecorp.armeria.common.util.StartStopSupport;
46  import com.linecorp.centraldogma.server.command.CommandExecutor;
47  import com.linecorp.centraldogma.server.mirror.MirrorAccessController;
48  import com.linecorp.centraldogma.server.plugin.Plugin;
49  import com.linecorp.centraldogma.server.plugin.PluginContext;
50  import com.linecorp.centraldogma.server.plugin.PluginTarget;
51  import com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer;
52  import com.linecorp.centraldogma.server.storage.project.ProjectManager;
53  
54  import io.micrometer.core.instrument.MeterRegistry;
55  import io.netty.util.concurrent.DefaultThreadFactory;
56  
57  /**
58   * Provides asynchronous start-stop life cycle support for the {@link Plugin}s.
59   */
60  final class PluginGroup {
61  
62      private static final Logger logger = LoggerFactory.getLogger(PluginGroup.class);
63  
64      /**
65       * Returns a new {@link PluginGroup} which holds the {@link Plugin}s loaded from the classpath.
66       * {@code null} is returned if there is no {@link Plugin} whose target equals to the specified
67       * {@code target}.
68       *
69       * @param target the {@link PluginTarget} which would be loaded
70       */
71      @VisibleForTesting
72      @Nullable
73      static PluginGroup loadPlugins(PluginTarget target, CentralDogmaConfig config) {
74          return loadPlugins(PluginGroup.class.getClassLoader(), config, ImmutableList.of()).get(target);
75      }
76  
77      /**
78       * Returns a new {@link PluginGroup} which holds the {@link Plugin}s loaded from the classpath.
79       * An empty map is returned if there is no {@link Plugin} whose target equals to the specified
80       * {@code target}.
81       *
82       * @param classLoader which is used to load the {@link Plugin}s
83       */
84      static Map<PluginTarget, PluginGroup> loadPlugins(ClassLoader classLoader, CentralDogmaConfig config,
85                                                        List<Plugin> plugins) {
86          requireNonNull(classLoader, "classLoader");
87          requireNonNull(config, "config");
88  
89          final ServiceLoader<Plugin> loader = ServiceLoader.load(Plugin.class, classLoader);
90          final ImmutableMap.Builder<Class<?>, Plugin> allPlugins = new ImmutableMap.Builder<>();
91          for (Plugin plugin : Iterables.concat(plugins, loader)) {
92              if (plugin.isEnabled(config)) {
93                  allPlugins.put(plugin.configType(), plugin);
94              }
95          }
96  
97          // IllegalArgumentException is thrown if there are duplicate keys.
98          final Map<Class<?>, Plugin> pluginMap = allPlugins.build();
99          if (pluginMap.isEmpty()) {
100             return ImmutableMap.of();
101         }
102 
103         final Map<PluginTarget, PluginGroup> pluginGroups =
104                 pluginMap.values()
105                          .stream()
106                          .collect(Collectors.groupingBy(plugin -> plugin.target(config)))
107                          .entrySet()
108                          .stream()
109                          .collect(toImmutableMap(Entry::getKey, e -> {
110                              final PluginTarget target = e.getKey();
111                              final List<Plugin> targetPlugins = e.getValue();
112                              final String poolName =
113                                      "plugins-for-" + target.name().toLowerCase().replace("_", "-");
114                              return new PluginGroup(targetPlugins,
115                                                     Executors.newSingleThreadExecutor(
116                                                             new DefaultThreadFactory(poolName, true)));
117                          }));
118 
119         pluginGroups.forEach((target, group) -> {
120             logger.debug("Loaded plugins for target {}: {}", target,
121                          group.plugins().stream().map(plugin -> plugin.getClass().getName())
122                               .collect(toImmutableList()));
123         });
124         return pluginGroups;
125     }
126 
127     private final List<Plugin> plugins;
128     private final PluginGroupStartStop startStop;
129 
130     private PluginGroup(Iterable<Plugin> plugins, Executor executor) {
131         this.plugins = ImmutableList.copyOf(requireNonNull(plugins, "plugins"));
132         startStop = new PluginGroupStartStop(requireNonNull(executor, "executor"));
133     }
134 
135     /**
136      * Returns the {@link Plugin}s managed by this {@link PluginGroup}.
137      */
138     List<Plugin> plugins() {
139         return plugins;
140     }
141 
142     /**
143      * Returns the first {@link Plugin} of the specified {@code clazz} as wrapped by an {@link Optional}.
144      */
145     @Nullable
146     <T extends Plugin> T findFirstPlugin(Class<T> clazz) {
147         requireNonNull(clazz, "clazz");
148         return plugins.stream().filter(clazz::isInstance).map(clazz::cast).findFirst().orElse(null);
149     }
150 
151     /**
152      * Starts the {@link Plugin}s managed by this {@link PluginGroup}.
153      */
154     CompletableFuture<Void> start(CentralDogmaConfig config, ProjectManager projectManager,
155                                   CommandExecutor commandExecutor, MeterRegistry meterRegistry,
156                                   ScheduledExecutorService purgeWorker,
157                                   InternalProjectInitializer internalProjectInitializer,
158                                   MirrorAccessController mirrorAccessController) {
159         final PluginContext context = new PluginContext(config, projectManager, commandExecutor, meterRegistry,
160                                                         purgeWorker, internalProjectInitializer,
161                                                         mirrorAccessController);
162         return startStop.start(context, context, true);
163     }
164 
165     /**
166      * Stops the {@link Plugin}s managed by this {@link PluginGroup}.
167      */
168     CompletableFuture<Void> stop(CentralDogmaConfig config, ProjectManager projectManager,
169                                  CommandExecutor commandExecutor, MeterRegistry meterRegistry,
170                                  ScheduledExecutorService purgeWorker,
171                                  InternalProjectInitializer internalProjectInitializer,
172                                  MirrorAccessController mirrorAccessController) {
173         return startStop.stop(
174                 new PluginContext(config, projectManager, commandExecutor, meterRegistry, purgeWorker,
175                                   internalProjectInitializer, mirrorAccessController));
176     }
177 
178     private class PluginGroupStartStop extends StartStopSupport<PluginContext, PluginContext, Void, Void> {
179 
180         PluginGroupStartStop(Executor executor) {
181             super(executor);
182         }
183 
184         @Override
185         protected CompletionStage<Void> doStart(@Nullable PluginContext arg) throws Exception {
186             assert arg != null;
187             // Wait until the internal project is initialized.
188             arg.internalProjectInitializer().whenInitialized().get();
189             final List<CompletionStage<Void>> futures = plugins.stream().map(
190                     plugin -> plugin.start(arg)
191                                     .thenAccept(unused -> logger.info("Plugin started: {}", plugin))
192                                     .exceptionally(cause -> {
193                                         logger.info("Failed to start plugin: {}", plugin, cause);
194                                         return null;
195                                     })).collect(toImmutableList());
196             return CompletableFutures.allAsList(futures).thenApply(unused -> null);
197         }
198 
199         @Override
200         protected CompletionStage<Void> doStop(@Nullable PluginContext arg) throws Exception {
201             assert arg != null;
202             final List<CompletionStage<Void>> futures = plugins.stream().map(
203                     plugin -> plugin.stop(arg)
204                                     .thenAccept(unused -> logger.info("Plugin stopped: {}", plugin))
205                                     .exceptionally(cause -> {
206                                         logger.info("Failed to stop plugin: {}", plugin, cause);
207                                         return null;
208                                     })).collect(toImmutableList());
209             return CompletableFutures.allAsList(futures).thenApply(unused -> null);
210         }
211     }
212 }