1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server;
17
18 import static com.google.common.collect.ImmutableList.toImmutableList;
19 import static com.google.common.collect.ImmutableMap.toImmutableMap;
20 import static java.util.Objects.requireNonNull;
21
22 import java.time.Duration;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Map.Entry;
26 import java.util.Optional;
27 import java.util.ServiceLoader;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.CompletionStage;
30 import java.util.concurrent.Executor;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ScheduledExecutorService;
33 import java.util.stream.Collectors;
34
35 import javax.annotation.Nullable;
36
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import com.google.common.annotations.VisibleForTesting;
41 import com.google.common.collect.ImmutableList;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.collect.Iterables;
44 import com.spotify.futures.CompletableFutures;
45
46 import com.linecorp.armeria.common.util.StartStopSupport;
47 import com.linecorp.centraldogma.server.command.CommandExecutor;
48 import com.linecorp.centraldogma.server.mirror.MirrorAccessController;
49 import com.linecorp.centraldogma.server.plugin.Plugin;
50 import com.linecorp.centraldogma.server.plugin.PluginContext;
51 import com.linecorp.centraldogma.server.plugin.PluginTarget;
52 import com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer;
53 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
54
55 import io.micrometer.core.instrument.MeterRegistry;
56 import io.netty.util.concurrent.DefaultThreadFactory;
57
58
59
60
61 final class PluginGroup {
62
63 private static final Logger logger = LoggerFactory.getLogger(PluginGroup.class);
64
65
66
67
68
69
70
71
72 @VisibleForTesting
73 @Nullable
74 static PluginGroup loadPlugins(PluginTarget target, CentralDogmaConfig config) {
75 return loadPlugins(PluginGroup.class.getClassLoader(), config, ImmutableList.of()).get(target);
76 }
77
78
79
80
81
82
83
84
85 static Map<PluginTarget, PluginGroup> loadPlugins(ClassLoader classLoader, CentralDogmaConfig config,
86 List<Plugin> plugins) {
87 requireNonNull(classLoader, "classLoader");
88 requireNonNull(config, "config");
89
90 final ServiceLoader<Plugin> loader = ServiceLoader.load(Plugin.class, classLoader);
91 final ImmutableMap.Builder<Class<?>, Plugin> allPlugins = new ImmutableMap.Builder<>();
92 for (Plugin plugin : Iterables.concat(plugins, loader)) {
93 if (plugin.isEnabled(config)) {
94 allPlugins.put(plugin.configType(), plugin);
95 }
96 }
97
98
99 final Map<Class<?>, Plugin> pluginMap = allPlugins.build();
100 if (pluginMap.isEmpty()) {
101 return ImmutableMap.of();
102 }
103
104 final Map<PluginTarget, PluginGroup> pluginGroups =
105 pluginMap.values()
106 .stream()
107 .collect(Collectors.groupingBy(plugin -> plugin.target(config)))
108 .entrySet()
109 .stream()
110 .collect(toImmutableMap(Entry::getKey, e -> {
111 final PluginTarget target = e.getKey();
112 final List<Plugin> targetPlugins = e.getValue();
113 final String poolName =
114 "plugins-for-" + target.name().toLowerCase().replace("_", "-");
115 return new PluginGroup(targetPlugins,
116 Executors.newSingleThreadExecutor(
117 new DefaultThreadFactory(poolName, true)));
118 }));
119
120 pluginGroups.forEach((target, group) -> {
121 logger.debug("Loaded plugins for target {}: {}", target,
122 group.plugins().stream().map(plugin -> plugin.getClass().getName())
123 .collect(toImmutableList()));
124 });
125 return pluginGroups;
126 }
127
128 private final List<Plugin> plugins;
129 private final PluginGroupStartStop startStop;
130
131 private PluginGroup(Iterable<Plugin> plugins, Executor executor) {
132 this.plugins = ImmutableList.copyOf(requireNonNull(plugins, "plugins"));
133 startStop = new PluginGroupStartStop(requireNonNull(executor, "executor"));
134 }
135
136
137
138
139 List<Plugin> plugins() {
140 return plugins;
141 }
142
143
144
145
146 @Nullable
147 <T extends Plugin> T findFirstPlugin(Class<T> clazz) {
148 requireNonNull(clazz, "clazz");
149 return plugins.stream().filter(clazz::isInstance).map(clazz::cast).findFirst().orElse(null);
150 }
151
152
153
154
155 CompletableFuture<Void> start(CentralDogmaConfig config, ProjectManager projectManager,
156 CommandExecutor commandExecutor, MeterRegistry meterRegistry,
157 ScheduledExecutorService purgeWorker,
158 InternalProjectInitializer internalProjectInitializer,
159 MirrorAccessController mirrorAccessController) {
160 final PluginContext context = new PluginContext(config, projectManager, commandExecutor, meterRegistry,
161 purgeWorker, internalProjectInitializer,
162 mirrorAccessController);
163 return startStop.start(context, context, true);
164 }
165
166
167
168
169 CompletableFuture<Void> stop(CentralDogmaConfig config, ProjectManager projectManager,
170 CommandExecutor commandExecutor, MeterRegistry meterRegistry,
171 ScheduledExecutorService purgeWorker,
172 InternalProjectInitializer internalProjectInitializer,
173 MirrorAccessController mirrorAccessController) {
174 return startStop.stop(
175 new PluginContext(config, projectManager, commandExecutor, meterRegistry, purgeWorker,
176 internalProjectInitializer, mirrorAccessController));
177 }
178
179 private class PluginGroupStartStop extends StartStopSupport<PluginContext, PluginContext, Void, Void> {
180
181 PluginGroupStartStop(Executor executor) {
182 super(executor);
183 }
184
185 @Override
186 protected CompletionStage<Void> doStart(@Nullable PluginContext arg) throws Exception {
187 assert arg != null;
188
189 arg.internalProjectInitializer().whenInitialized().get();
190 final List<CompletionStage<Void>> futures = plugins.stream().map(
191 plugin -> {
192 logger.info("Starting plugin: {}", plugin);
193 final long start = System.nanoTime();
194 return plugin.start(arg)
195 .thenAccept(unused -> logger.info(
196 "Plugin started: {} in {} seconds", plugin,
197 Duration.ofNanos(System.nanoTime() - start).getSeconds()))
198 .exceptionally(cause -> {
199 logger.info("Failed to start plugin: {}", plugin, cause);
200 return null;
201 });
202 }).collect(toImmutableList());
203 return CompletableFutures.allAsList(futures).thenApply(unused -> null);
204 }
205
206 @Override
207 protected CompletionStage<Void> doStop(@Nullable PluginContext arg) throws Exception {
208 assert arg != null;
209 final List<CompletionStage<Void>> futures = plugins.stream().map(
210 plugin -> {
211 logger.info("Stopping plugin: {}", plugin);
212 final long start = System.nanoTime();
213 return plugin.stop(arg)
214 .thenAccept(unused -> logger.info(
215 "Stopped plugin: {} in {} seconds.", plugin,
216 Duration.ofNanos(System.nanoTime() - start).getSeconds()))
217 .exceptionally(cause -> {
218 logger.info("Failed to stop plugin: {}", plugin, cause);
219 return null;
220 });
221 }).collect(toImmutableList());
222 return CompletableFutures.allAsList(futures).thenApply(unused -> null);
223 }
224 }
225 }