1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server;
17
18 import static com.google.common.collect.ImmutableList.toImmutableList;
19 import static com.google.common.collect.ImmutableMap.toImmutableMap;
20 import static java.util.Objects.requireNonNull;
21
22 import java.util.List;
23 import java.util.Map;
24 import java.util.Map.Entry;
25 import java.util.Optional;
26 import java.util.ServiceLoader;
27 import java.util.concurrent.CompletableFuture;
28 import java.util.concurrent.CompletionStage;
29 import java.util.concurrent.Executor;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.ScheduledExecutorService;
32 import java.util.stream.Collectors;
33
34 import javax.annotation.Nullable;
35
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.common.annotations.VisibleForTesting;
40 import com.google.common.collect.ImmutableList;
41 import com.google.common.collect.ImmutableMap;
42 import com.google.common.collect.Iterables;
43 import com.spotify.futures.CompletableFutures;
44
45 import com.linecorp.armeria.common.util.StartStopSupport;
46 import com.linecorp.centraldogma.server.command.CommandExecutor;
47 import com.linecorp.centraldogma.server.mirror.MirrorAccessController;
48 import com.linecorp.centraldogma.server.plugin.Plugin;
49 import com.linecorp.centraldogma.server.plugin.PluginContext;
50 import com.linecorp.centraldogma.server.plugin.PluginTarget;
51 import com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer;
52 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
53
54 import io.micrometer.core.instrument.MeterRegistry;
55 import io.netty.util.concurrent.DefaultThreadFactory;
56
57
58
59
60 final class PluginGroup {
61
62 private static final Logger logger = LoggerFactory.getLogger(PluginGroup.class);
63
64
65
66
67
68
69
70
71 @VisibleForTesting
72 @Nullable
73 static PluginGroup loadPlugins(PluginTarget target, CentralDogmaConfig config) {
74 return loadPlugins(PluginGroup.class.getClassLoader(), config, ImmutableList.of()).get(target);
75 }
76
77
78
79
80
81
82
83
84 static Map<PluginTarget, PluginGroup> loadPlugins(ClassLoader classLoader, CentralDogmaConfig config,
85 List<Plugin> plugins) {
86 requireNonNull(classLoader, "classLoader");
87 requireNonNull(config, "config");
88
89 final ServiceLoader<Plugin> loader = ServiceLoader.load(Plugin.class, classLoader);
90 final ImmutableMap.Builder<Class<?>, Plugin> allPlugins = new ImmutableMap.Builder<>();
91 for (Plugin plugin : Iterables.concat(plugins, loader)) {
92 if (plugin.isEnabled(config)) {
93 allPlugins.put(plugin.configType(), plugin);
94 }
95 }
96
97
98 final Map<Class<?>, Plugin> pluginMap = allPlugins.build();
99 if (pluginMap.isEmpty()) {
100 return ImmutableMap.of();
101 }
102
103 final Map<PluginTarget, PluginGroup> pluginGroups =
104 pluginMap.values()
105 .stream()
106 .collect(Collectors.groupingBy(plugin -> plugin.target(config)))
107 .entrySet()
108 .stream()
109 .collect(toImmutableMap(Entry::getKey, e -> {
110 final PluginTarget target = e.getKey();
111 final List<Plugin> targetPlugins = e.getValue();
112 final String poolName =
113 "plugins-for-" + target.name().toLowerCase().replace("_", "-");
114 return new PluginGroup(targetPlugins,
115 Executors.newSingleThreadExecutor(
116 new DefaultThreadFactory(poolName, true)));
117 }));
118
119 pluginGroups.forEach((target, group) -> {
120 logger.debug("Loaded plugins for target {}: {}", target,
121 group.plugins().stream().map(plugin -> plugin.getClass().getName())
122 .collect(toImmutableList()));
123 });
124 return pluginGroups;
125 }
126
127 private final List<Plugin> plugins;
128 private final PluginGroupStartStop startStop;
129
130 private PluginGroup(Iterable<Plugin> plugins, Executor executor) {
131 this.plugins = ImmutableList.copyOf(requireNonNull(plugins, "plugins"));
132 startStop = new PluginGroupStartStop(requireNonNull(executor, "executor"));
133 }
134
135
136
137
138 List<Plugin> plugins() {
139 return plugins;
140 }
141
142
143
144
145 @Nullable
146 <T extends Plugin> T findFirstPlugin(Class<T> clazz) {
147 requireNonNull(clazz, "clazz");
148 return plugins.stream().filter(clazz::isInstance).map(clazz::cast).findFirst().orElse(null);
149 }
150
151
152
153
154 CompletableFuture<Void> start(CentralDogmaConfig config, ProjectManager projectManager,
155 CommandExecutor commandExecutor, MeterRegistry meterRegistry,
156 ScheduledExecutorService purgeWorker,
157 InternalProjectInitializer internalProjectInitializer,
158 MirrorAccessController mirrorAccessController) {
159 final PluginContext context = new PluginContext(config, projectManager, commandExecutor, meterRegistry,
160 purgeWorker, internalProjectInitializer,
161 mirrorAccessController);
162 return startStop.start(context, context, true);
163 }
164
165
166
167
168 CompletableFuture<Void> stop(CentralDogmaConfig config, ProjectManager projectManager,
169 CommandExecutor commandExecutor, MeterRegistry meterRegistry,
170 ScheduledExecutorService purgeWorker,
171 InternalProjectInitializer internalProjectInitializer,
172 MirrorAccessController mirrorAccessController) {
173 return startStop.stop(
174 new PluginContext(config, projectManager, commandExecutor, meterRegistry, purgeWorker,
175 internalProjectInitializer, mirrorAccessController));
176 }
177
178 private class PluginGroupStartStop extends StartStopSupport<PluginContext, PluginContext, Void, Void> {
179
180 PluginGroupStartStop(Executor executor) {
181 super(executor);
182 }
183
184 @Override
185 protected CompletionStage<Void> doStart(@Nullable PluginContext arg) throws Exception {
186 assert arg != null;
187
188 arg.internalProjectInitializer().whenInitialized().get();
189 final List<CompletionStage<Void>> futures = plugins.stream().map(
190 plugin -> plugin.start(arg)
191 .thenAccept(unused -> logger.info("Plugin started: {}", plugin))
192 .exceptionally(cause -> {
193 logger.info("Failed to start plugin: {}", plugin, cause);
194 return null;
195 })).collect(toImmutableList());
196 return CompletableFutures.allAsList(futures).thenApply(unused -> null);
197 }
198
199 @Override
200 protected CompletionStage<Void> doStop(@Nullable PluginContext arg) throws Exception {
201 assert arg != null;
202 final List<CompletionStage<Void>> futures = plugins.stream().map(
203 plugin -> plugin.stop(arg)
204 .thenAccept(unused -> logger.info("Plugin stopped: {}", plugin))
205 .exceptionally(cause -> {
206 logger.info("Failed to stop plugin: {}", plugin, cause);
207 return null;
208 })).collect(toImmutableList());
209 return CompletableFutures.allAsList(futures).thenApply(unused -> null);
210 }
211 }
212 }