1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server;
18
19 import static com.google.common.base.MoreObjects.firstNonNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static com.google.common.base.Strings.isNullOrEmpty;
22 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.API_V0_PATH_PREFIX;
23 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.API_V1_PATH_PREFIX;
24 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.HEALTH_CHECK_PATH;
25 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.METRICS_PATH;
26 import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGIN_API_ROUTES;
27 import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGIN_PATH;
28 import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGOUT_API_ROUTES;
29 import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGOUT_PATH;
30 import static com.linecorp.centraldogma.server.internal.api.sysadmin.MirrorAccessControlService.MIRROR_ACCESS_CONTROL_PATH;
31 import static com.linecorp.centraldogma.server.internal.storage.repository.MirrorConverter.MIRROR_PROVIDERS;
32 import static com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer.INTERNAL_PROJECT_DOGMA;
33 import static java.util.Objects.requireNonNull;
34
35 import java.io.File;
36 import java.io.IOException;
37 import java.io.InputStream;
38 import java.net.InetSocketAddress;
39 import java.util.Collections;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.Objects;
43 import java.util.Optional;
44 import java.util.concurrent.CompletableFuture;
45 import java.util.concurrent.CompletionStage;
46 import java.util.concurrent.Executor;
47 import java.util.concurrent.ExecutorService;
48 import java.util.concurrent.Executors;
49 import java.util.concurrent.ForkJoinPool;
50 import java.util.concurrent.LinkedBlockingQueue;
51 import java.util.concurrent.ScheduledExecutorService;
52 import java.util.concurrent.ThreadPoolExecutor;
53 import java.util.concurrent.TimeUnit;
54 import java.util.concurrent.TimeoutException;
55 import java.util.concurrent.atomic.AtomicInteger;
56 import java.util.function.BiFunction;
57 import java.util.function.Consumer;
58 import java.util.function.Function;
59
60 import javax.annotation.Nullable;
61
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
64
65 import com.fasterxml.jackson.core.JsonProcessingException;
66 import com.fasterxml.jackson.databind.ObjectMapper;
67 import com.fasterxml.jackson.databind.module.SimpleModule;
68 import com.github.benmanes.caffeine.cache.Caffeine;
69 import com.github.benmanes.caffeine.cache.stats.CacheStats;
70 import com.google.common.collect.ImmutableList;
71 import com.google.common.collect.ImmutableMap;
72
73 import com.linecorp.armeria.common.DependencyInjector;
74 import com.linecorp.armeria.common.Flags;
75 import com.linecorp.armeria.common.HttpData;
76 import com.linecorp.armeria.common.HttpHeaderNames;
77 import com.linecorp.armeria.common.HttpHeaders;
78 import com.linecorp.armeria.common.HttpMethod;
79 import com.linecorp.armeria.common.HttpRequest;
80 import com.linecorp.armeria.common.HttpResponse;
81 import com.linecorp.armeria.common.HttpStatus;
82 import com.linecorp.armeria.common.MediaType;
83 import com.linecorp.armeria.common.ServerCacheControl;
84 import com.linecorp.armeria.common.SessionProtocol;
85 import com.linecorp.armeria.common.metric.MeterIdPrefixFunction;
86 import com.linecorp.armeria.common.prometheus.PrometheusMeterRegistries;
87 import com.linecorp.armeria.common.util.EventLoopGroups;
88 import com.linecorp.armeria.common.util.Exceptions;
89 import com.linecorp.armeria.common.util.StartStopSupport;
90 import com.linecorp.armeria.common.util.SystemInfo;
91 import com.linecorp.armeria.internal.common.ReflectiveDependencyInjector;
92 import com.linecorp.armeria.server.AbstractHttpService;
93 import com.linecorp.armeria.server.ContextPathServicesBuilder;
94 import com.linecorp.armeria.server.DecoratingServiceBindingBuilder;
95 import com.linecorp.armeria.server.GracefulShutdown;
96 import com.linecorp.armeria.server.HttpService;
97 import com.linecorp.armeria.server.Route;
98 import com.linecorp.armeria.server.Server;
99 import com.linecorp.armeria.server.ServerBuilder;
100 import com.linecorp.armeria.server.ServerPort;
101 import com.linecorp.armeria.server.ServiceNaming;
102 import com.linecorp.armeria.server.ServiceRequestContext;
103 import com.linecorp.armeria.server.annotation.JacksonRequestConverterFunction;
104 import com.linecorp.armeria.server.auth.AuthService;
105 import com.linecorp.armeria.server.auth.Authorizer;
106 import com.linecorp.armeria.server.cors.CorsService;
107 import com.linecorp.armeria.server.docs.DocService;
108 import com.linecorp.armeria.server.encoding.DecodingService;
109 import com.linecorp.armeria.server.encoding.EncodingService;
110 import com.linecorp.armeria.server.file.FileService;
111 import com.linecorp.armeria.server.file.HttpFile;
112 import com.linecorp.armeria.server.healthcheck.HealthCheckService;
113 import com.linecorp.armeria.server.healthcheck.SettableHealthChecker;
114 import com.linecorp.armeria.server.logging.AccessLogWriter;
115 import com.linecorp.armeria.server.management.ManagementService;
116 import com.linecorp.armeria.server.metric.MetricCollectingService;
117 import com.linecorp.armeria.server.prometheus.PrometheusExpositionService;
118 import com.linecorp.armeria.server.thrift.THttpService;
119 import com.linecorp.armeria.server.thrift.ThriftCallService;
120 import com.linecorp.centraldogma.common.ShuttingDownException;
121 import com.linecorp.centraldogma.internal.CsrfToken;
122 import com.linecorp.centraldogma.internal.Jackson;
123 import com.linecorp.centraldogma.internal.thrift.CentralDogmaService;
124 import com.linecorp.centraldogma.server.auth.AuthConfig;
125 import com.linecorp.centraldogma.server.auth.AuthProvider;
126 import com.linecorp.centraldogma.server.auth.AuthProviderParameters;
127 import com.linecorp.centraldogma.server.auth.SessionManager;
128 import com.linecorp.centraldogma.server.command.Command;
129 import com.linecorp.centraldogma.server.command.CommandExecutor;
130 import com.linecorp.centraldogma.server.command.StandaloneCommandExecutor;
131 import com.linecorp.centraldogma.server.internal.admin.auth.CachedSessionManager;
132 import com.linecorp.centraldogma.server.internal.admin.auth.CsrfTokenAuthorizer;
133 import com.linecorp.centraldogma.server.internal.admin.auth.ExpiredSessionDeletingSessionManager;
134 import com.linecorp.centraldogma.server.internal.admin.auth.FileBasedSessionManager;
135 import com.linecorp.centraldogma.server.internal.admin.auth.SessionTokenAuthorizer;
136 import com.linecorp.centraldogma.server.internal.admin.service.DefaultLogoutService;
137 import com.linecorp.centraldogma.server.internal.admin.service.RepositoryService;
138 import com.linecorp.centraldogma.server.internal.admin.service.UserService;
139 import com.linecorp.centraldogma.server.internal.api.ContentServiceV1;
140 import com.linecorp.centraldogma.server.internal.api.CredentialServiceV1;
141 import com.linecorp.centraldogma.server.internal.api.GitHttpService;
142 import com.linecorp.centraldogma.server.internal.api.HttpApiExceptionHandler;
143 import com.linecorp.centraldogma.server.internal.api.MetadataApiService;
144 import com.linecorp.centraldogma.server.internal.api.MirroringServiceV1;
145 import com.linecorp.centraldogma.server.internal.api.ProjectServiceV1;
146 import com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1;
147 import com.linecorp.centraldogma.server.internal.api.WatchService;
148 import com.linecorp.centraldogma.server.internal.api.auth.ApplicationTokenAuthorizer;
149 import com.linecorp.centraldogma.server.internal.api.auth.RequiresProjectRoleDecorator.RequiresProjectRoleDecoratorFactory;
150 import com.linecorp.centraldogma.server.internal.api.auth.RequiresRepositoryRoleDecorator.RequiresRepositoryRoleDecoratorFactory;
151 import com.linecorp.centraldogma.server.internal.api.converter.HttpApiRequestConverter;
152 import com.linecorp.centraldogma.server.internal.api.sysadmin.MirrorAccessControlService;
153 import com.linecorp.centraldogma.server.internal.api.sysadmin.ServerStatusService;
154 import com.linecorp.centraldogma.server.internal.api.sysadmin.TokenService;
155 import com.linecorp.centraldogma.server.internal.mirror.DefaultMirrorAccessController;
156 import com.linecorp.centraldogma.server.internal.mirror.DefaultMirroringServicePlugin;
157 import com.linecorp.centraldogma.server.internal.mirror.MirrorAccessControl;
158 import com.linecorp.centraldogma.server.internal.mirror.MirrorRunner;
159 import com.linecorp.centraldogma.server.internal.replication.ZooKeeperCommandExecutor;
160 import com.linecorp.centraldogma.server.internal.storage.project.DefaultProjectManager;
161 import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
162 import com.linecorp.centraldogma.server.internal.storage.repository.CrudRepository;
163 import com.linecorp.centraldogma.server.internal.storage.repository.git.GitCrudRepository;
164 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaExceptionTranslator;
165 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaServiceImpl;
166 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaTimeoutScheduler;
167 import com.linecorp.centraldogma.server.internal.thrift.TokenlessClientLogger;
168 import com.linecorp.centraldogma.server.management.ServerStatus;
169 import com.linecorp.centraldogma.server.management.ServerStatusManager;
170 import com.linecorp.centraldogma.server.metadata.MetadataService;
171 import com.linecorp.centraldogma.server.mirror.MirrorProvider;
172 import com.linecorp.centraldogma.server.plugin.AllReplicasPlugin;
173 import com.linecorp.centraldogma.server.plugin.Plugin;
174 import com.linecorp.centraldogma.server.plugin.PluginInitContext;
175 import com.linecorp.centraldogma.server.plugin.PluginTarget;
176 import com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer;
177 import com.linecorp.centraldogma.server.storage.project.Project;
178 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
179
180 import io.micrometer.core.instrument.MeterRegistry;
181 import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
182 import io.micrometer.core.instrument.binder.jvm.DiskSpaceMetrics;
183 import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
184 import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
185 import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
186 import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
187 import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics;
188 import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
189 import io.micrometer.core.instrument.binder.system.UptimeMetrics;
190 import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
191 import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
192 import io.netty.util.concurrent.DefaultThreadFactory;
193 import io.netty.util.concurrent.GlobalEventExecutor;
194
195
196
197
198
199
200 public class CentralDogma implements AutoCloseable {
201
202 private static final Logger logger = LoggerFactory.getLogger(CentralDogma.class);
203
204 private static final boolean GIT_MIRROR_ENABLED;
205
206 static {
207 Jackson.registerModules(new SimpleModule().addSerializer(CacheStats.class, new CacheStatsSerializer()));
208
209 boolean gitMirrorEnabled = false;
210 for (MirrorProvider mirrorProvider : MIRROR_PROVIDERS) {
211 if ("com.linecorp.centraldogma.server.internal.mirror.GitMirrorProvider"
212 .equals(mirrorProvider.getClass().getName())) {
213 gitMirrorEnabled = true;
214 break;
215 }
216 }
217 logger.info("Git mirroring: {}",
218 gitMirrorEnabled ? "enabled"
219 : "disabled ('centraldogma-server-mirror-git' module is not available)");
220 GIT_MIRROR_ENABLED = gitMirrorEnabled;
221 }
222
223
224
225
226
227
228 public static CentralDogma forConfig(File configFile) throws IOException {
229 requireNonNull(configFile, "configFile");
230 return new CentralDogma(CentralDogmaConfig.load(configFile), Flags.meterRegistry(), ImmutableList.of());
231 }
232
233 private final SettableHealthChecker serverHealth = new SettableHealthChecker(false);
234 private final CentralDogmaStartStop startStop;
235
236 private final AtomicInteger numPendingStopRequests = new AtomicInteger();
237
238 private final Map<PluginTarget, PluginGroup> pluginGroups;
239 @Nullable
240 private final PluginGroup pluginsForAllReplicas;
241 @Nullable
242 private final PluginGroup pluginsForLeaderOnly;
243 @Nullable
244 private final PluginGroup pluginsForZoneLeaderOnly;
245
246 private final CentralDogmaConfig cfg;
247 @Nullable
248 private volatile ProjectManager pm;
249 @Nullable
250 private volatile Server server;
251 @Nullable
252 private ExecutorService repositoryWorker;
253 @Nullable
254 private ScheduledExecutorService purgeWorker;
255 @Nullable
256 private CommandExecutor executor;
257 private final MeterRegistry meterRegistry;
258 @Nullable
259 MeterRegistry meterRegistryToBeClosed;
260 @Nullable
261 private SessionManager sessionManager;
262 @Nullable
263 private ServerStatusManager statusManager;
264 @Nullable
265 private InternalProjectInitializer projectInitializer;
266 @Nullable
267 private volatile MirrorRunner mirrorRunner;
268 @Nullable
269 private volatile DefaultMirrorAccessController mirrorAccessController;
270
271 CentralDogma(CentralDogmaConfig cfg, MeterRegistry meterRegistry, List<Plugin> plugins) {
272 this.cfg = requireNonNull(cfg, "cfg");
273 pluginGroups = PluginGroup.loadPlugins(CentralDogma.class.getClassLoader(), cfg, plugins);
274 pluginsForAllReplicas = pluginGroups.get(PluginTarget.ALL_REPLICAS);
275 pluginsForLeaderOnly = pluginGroups.get(PluginTarget.LEADER_ONLY);
276 pluginsForZoneLeaderOnly = pluginGroups.get(PluginTarget.ZONE_LEADER_ONLY);
277 if (pluginsForZoneLeaderOnly != null) {
278 checkState(cfg.zone() != null,
279 "zone must be specified when zone leader plugins are enabled.");
280 }
281 startStop = new CentralDogmaStartStop(pluginsForAllReplicas);
282 this.meterRegistry = meterRegistry;
283 }
284
285
286
287
288
289
290 public CentralDogmaConfig config() {
291 return cfg;
292 }
293
294
295
296
297
298
299 @Nullable
300 public ServerPort activePort() {
301 final Server server = this.server;
302 return server != null ? server.activePort() : null;
303 }
304
305
306
307
308
309
310
311 public Map<InetSocketAddress, ServerPort> activePorts() {
312 final Server server = this.server;
313 if (server != null) {
314 return server.activePorts();
315 } else {
316 return Collections.emptyMap();
317 }
318 }
319
320
321
322
323
324 @Nullable
325 public ProjectManager projectManager() {
326 return pm;
327 }
328
329
330
331
332
333
334
335 @Nullable
336 public MirroringService mirroringService() {
337 return pluginGroups.values()
338 .stream()
339 .map(group -> {
340 return group.findFirstPlugin(DefaultMirroringServicePlugin.class);
341 })
342 .filter(Objects::nonNull)
343 .findFirst()
344 .map(DefaultMirroringServicePlugin::mirroringService)
345 .orElse(null);
346 }
347
348
349
350
351
352
353 public List<Plugin> plugins(PluginTarget target) {
354 requireNonNull(target, "target");
355 return pluginGroups.get(target).plugins();
356 }
357
358
359
360
361 public Optional<MeterRegistry> meterRegistry() {
362 return Optional.ofNullable(meterRegistry);
363 }
364
365
366
367
368 public CompletableFuture<Void> start() {
369 return startStop.start(true);
370 }
371
372
373
374
375 public CompletableFuture<Void> stop() {
376 serverHealth.setHealthy(false);
377
378 final Optional<GracefulShutdownTimeout> gracefulTimeoutOpt = cfg.gracefulShutdownTimeout();
379 if (gracefulTimeoutOpt.isPresent()) {
380 try {
381
382
383 Thread.sleep(1000);
384 } catch (InterruptedException e) {
385 logger.debug("Interrupted while waiting for quiet period", e);
386 Thread.currentThread().interrupt();
387 }
388 }
389
390 numPendingStopRequests.incrementAndGet();
391 return startStop.stop().thenRun(numPendingStopRequests::decrementAndGet);
392 }
393
394 @Override
395 public void close() {
396 startStop.close();
397 }
398
399 private boolean doStart() throws Exception {
400 boolean success = false;
401 ExecutorService repositoryWorker = null;
402 ScheduledExecutorService purgeWorker = null;
403 ProjectManager pm = null;
404 CommandExecutor executor = null;
405 Server server = null;
406 SessionManager sessionManager = null;
407 try {
408 logger.info("Starting the Central Dogma ..");
409
410 final ThreadPoolExecutor repositoryWorkerImpl = new ThreadPoolExecutor(
411 cfg.numRepositoryWorkers(), cfg.numRepositoryWorkers(),
412
413 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
414 new DefaultThreadFactory("repository-worker", true));
415 repositoryWorkerImpl.allowCoreThreadTimeOut(true);
416 repositoryWorker = ExecutorServiceMetrics.monitor(meterRegistry, repositoryWorkerImpl,
417 "repositoryWorker");
418
419 logger.info("Starting the project manager: {}", cfg.dataDir());
420
421 purgeWorker = Executors.newSingleThreadScheduledExecutor(
422 new DefaultThreadFactory("purge-worker", true));
423
424 pm = new DefaultProjectManager(cfg.dataDir(), repositoryWorker, purgeWorker,
425 meterRegistry, cfg.repositoryCacheSpec());
426
427 logger.info("Started the project manager: {}", pm);
428
429 logger.info("Current settings:\n{}", cfg);
430
431 sessionManager = initializeSessionManager();
432
433 logger.info("Starting the command executor ..");
434 executor = startCommandExecutor(pm, repositoryWorker, purgeWorker,
435 meterRegistry, sessionManager);
436
437 assert projectInitializer != null;
438 if (executor.isWritable()) {
439 logger.info("Started the command executor.");
440 }
441
442 logger.info("Starting the RPC server.");
443 server = startServer(pm, executor, purgeWorker, meterRegistry, sessionManager,
444 projectInitializer);
445 logger.info("Started the RPC server at: {}", server.activePorts());
446 logger.info("Started the Central Dogma successfully.");
447 success = true;
448 } finally {
449 if (success) {
450 this.repositoryWorker = repositoryWorker;
451 this.purgeWorker = purgeWorker;
452 this.pm = pm;
453 this.executor = executor;
454 this.server = server;
455 this.sessionManager = sessionManager;
456 } else {
457 doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager, mirrorRunner);
458 }
459 }
460 return success;
461 }
462
463 private CommandExecutor startCommandExecutor(
464 ProjectManager pm, Executor repositoryWorker,
465 ScheduledExecutorService purgeWorker, MeterRegistry meterRegistry,
466 @Nullable SessionManager sessionManager) {
467
468 final Consumer<CommandExecutor> onTakeLeadership = exec -> {
469 if (pluginsForLeaderOnly != null) {
470 logger.info("Starting plugins on the leader replica ..");
471 pluginsForLeaderOnly
472 .start(cfg, pm, exec, meterRegistry, purgeWorker, projectInitializer,
473 mirrorAccessController)
474 .handle((unused, cause) -> {
475 if (cause == null) {
476 logger.info("Started plugins on the leader replica.");
477 } else {
478 logger.error("Failed to start plugins on the leader replica..", cause);
479 }
480 return null;
481 });
482 }
483 };
484
485 final Consumer<CommandExecutor> onReleaseLeadership = exec -> {
486 if (pluginsForLeaderOnly != null) {
487 logger.info("Stopping plugins on the leader replica ..");
488 final CompletableFuture<?> future =
489 pluginsForLeaderOnly
490 .stop(cfg, pm, exec, meterRegistry, purgeWorker, projectInitializer,
491 mirrorAccessController)
492 .handle((unused, cause) -> {
493 if (cause == null) {
494 logger.info("Stopped plugins on the leader replica.");
495 } else {
496 logger.error("Failed to stop plugins on the leader replica.", cause);
497 }
498 return null;
499 });
500 try {
501 future.get(10, TimeUnit.SECONDS);
502 } catch (Exception e) {
503 logger.warn("Failed to stop plugins on the leader replica in 10 seconds.", e);
504 }
505 }
506 };
507
508 Consumer<CommandExecutor> onTakeZoneLeadership = null;
509 Consumer<CommandExecutor> onReleaseZoneLeadership = null;
510
511 if (pluginsForZoneLeaderOnly != null) {
512 assert cfg.zone() != null;
513 final String zone = cfg.zone().currentZone();
514 onTakeZoneLeadership = exec -> {
515 logger.info("Starting plugins on the {} zone leader replica ..", zone);
516 pluginsForZoneLeaderOnly
517 .start(cfg, pm, exec, meterRegistry, purgeWorker, projectInitializer,
518 mirrorAccessController)
519 .handle((unused, cause) -> {
520 if (cause == null) {
521 logger.info("Started plugins on the {} zone leader replica.", zone);
522 } else {
523 logger.error("Failed to start plugins on the {} zone leader replica..",
524 zone, cause);
525 }
526 return null;
527 });
528 };
529 onReleaseZoneLeadership = exec -> {
530 logger.info("Stopping plugins on the {} zone leader replica ..", zone);
531 final CompletableFuture<?> future =
532 pluginsForZoneLeaderOnly
533 .stop(cfg, pm, exec, meterRegistry, purgeWorker,
534 projectInitializer, mirrorAccessController)
535 .handle((unused, cause) -> {
536 if (cause == null) {
537 logger.info("Stopped plugins on the {} zone leader replica.", zone);
538 } else {
539 logger.error("Failed to stop plugins on the {} zone leader replica.",
540 zone, cause);
541 }
542 return null;
543 });
544 try {
545 future.get(10, TimeUnit.SECONDS);
546 } catch (Exception e) {
547 logger.warn("Failed to stop plugins on the {} zone leader replica in 10 seconds", zone, e);
548 }
549 };
550 }
551
552 statusManager = new ServerStatusManager(cfg.dataDir());
553 logger.info("Startup mode: {}", statusManager.serverStatus());
554 final CommandExecutor executor;
555 final ReplicationMethod replicationMethod = cfg.replicationConfig().method();
556 switch (replicationMethod) {
557 case ZOOKEEPER:
558 executor = newZooKeeperCommandExecutor(pm, repositoryWorker, statusManager, meterRegistry,
559 sessionManager, onTakeLeadership, onReleaseLeadership,
560 onTakeZoneLeadership, onReleaseZoneLeadership);
561 break;
562 case NONE:
563 logger.info("No replication mechanism specified; entering standalone");
564 executor = new StandaloneCommandExecutor(pm, repositoryWorker, statusManager, sessionManager,
565 onTakeLeadership, onReleaseLeadership,
566 onTakeZoneLeadership, onReleaseZoneLeadership);
567 break;
568 default:
569 throw new Error("unknown replication method: " + replicationMethod);
570 }
571 projectInitializer = new InternalProjectInitializer(executor, pm);
572 mirrorAccessController = new DefaultMirrorAccessController();
573
574 final ServerStatus initialServerStatus = statusManager.serverStatus();
575 executor.setWritable(initialServerStatus.writable());
576 if (!initialServerStatus.replicating()) {
577 projectInitializer.whenInitialized().complete(null);
578 return executor;
579 }
580 try {
581 final CompletableFuture<Void> startFuture = executor.start();
582 while (!startFuture.isDone()) {
583 if (numPendingStopRequests.get() > 0) {
584
585 executor.stop().get();
586 break;
587 }
588
589 try {
590 startFuture.get(100, TimeUnit.MILLISECONDS);
591 } catch (TimeoutException unused) {
592
593 }
594 }
595
596
597 startFuture.get();
598 projectInitializer.initialize();
599 final CrudRepository<MirrorAccessControl> accessControlRepository =
600 new GitCrudRepository<>(MirrorAccessControl.class, executor, pm,
601 INTERNAL_PROJECT_DOGMA, Project.REPO_DOGMA,
602 MIRROR_ACCESS_CONTROL_PATH);
603 mirrorAccessController.setRepository(accessControlRepository);
604 } catch (Exception e) {
605 projectInitializer.whenInitialized().complete(null);
606 logger.warn("Failed to start the command executor. Entering read-only.", e);
607 }
608
609 return executor;
610 }
611
612 @Nullable
613 private SessionManager initializeSessionManager() throws Exception {
614 final AuthConfig authCfg = cfg.authConfig();
615 if (authCfg == null) {
616 return null;
617 }
618
619 boolean success = false;
620 SessionManager manager = null;
621 try {
622 manager = new FileBasedSessionManager(new File(cfg.dataDir(), "_sessions").toPath(),
623 authCfg.sessionValidationSchedule());
624 manager = new CachedSessionManager(manager, Caffeine.from(authCfg.sessionCacheSpec()).build());
625 manager = new ExpiredSessionDeletingSessionManager(manager);
626 success = true;
627 return manager;
628 } finally {
629 if (!success && manager != null) {
630 try {
631
632
633 manager.close();
634 } catch (Exception e) {
635 logger.warn("Failed to close a session manager.", e);
636 }
637 }
638 }
639 }
640
641 private Server startServer(ProjectManager pm, CommandExecutor executor,
642 ScheduledExecutorService purgeWorker, MeterRegistry meterRegistry,
643 @Nullable SessionManager sessionManager,
644 InternalProjectInitializer projectInitializer) {
645 final ServerBuilder sb = Server.builder();
646 cfg.ports().forEach(sb::port);
647
648 final boolean needsTls =
649 cfg.ports().stream().anyMatch(ServerPort::hasTls) ||
650 (cfg.managementConfig() != null && cfg.managementConfig().protocol().isTls());
651
652 if (needsTls) {
653 try {
654 final TlsConfig tlsConfig = cfg.tls();
655 if (tlsConfig != null) {
656 try (InputStream keyCertChainInputStream = tlsConfig.keyCertChainInputStream();
657 InputStream keyInputStream = tlsConfig.keyInputStream()) {
658 sb.tls(keyCertChainInputStream, keyInputStream, tlsConfig.keyPassword());
659 }
660 } else {
661 logger.warn(
662 "Missing TLS configuration. Generating a self-signed certificate for TLS support.");
663 sb.tlsSelfSigned();
664 }
665 } catch (Exception e) {
666 Exceptions.throwUnsafely(e);
667 }
668 }
669
670 sb.clientAddressSources(cfg.clientAddressSourceList());
671 sb.clientAddressTrustedProxyFilter(cfg.trustedProxyAddressPredicate());
672
673 cfg.numWorkers().ifPresent(
674 numWorkers -> sb.workerGroup(EventLoopGroups.newEventLoopGroup(numWorkers), true));
675 cfg.maxNumConnections().ifPresent(sb::maxNumConnections);
676 cfg.idleTimeoutMillis().ifPresent(sb::idleTimeoutMillis);
677 cfg.requestTimeoutMillis().ifPresent(sb::requestTimeoutMillis);
678 cfg.maxFrameLength().ifPresent(sb::maxRequestLength);
679 cfg.gracefulShutdownTimeout().ifPresent(t -> {
680 final GracefulShutdown gracefulShutdown =
681 GracefulShutdown.builder()
682 .quietPeriodMillis(t.quietPeriodMillis())
683 .timeoutMillis(t.timeoutMillis())
684 .toExceptionFunction((ctx, req) -> {
685 return new ShuttingDownException();
686 })
687 .build();
688 sb.gracefulShutdown(gracefulShutdown);
689 });
690
691 final MetadataService mds = new MetadataService(pm, executor, projectInitializer);
692 executor.setRepositoryMetadataSupplier(mds::getRepo);
693 final WatchService watchService = new WatchService(meterRegistry);
694 final AuthProvider authProvider = createAuthProvider(executor, sessionManager, mds);
695 final ProjectApiManager projectApiManager = new ProjectApiManager(pm, executor, mds);
696
697 configureThriftService(sb, projectApiManager, executor, watchService, mds);
698
699 sb.service("/title", webAppTitleFile(cfg.webAppTitle(), SystemInfo.hostname()).asService());
700
701 sb.service(HEALTH_CHECK_PATH, HealthCheckService.builder()
702 .checkers(serverHealth)
703 .build());
704 configManagement(sb, config().managementConfig());
705
706 sb.serviceUnder("/docs/",
707 DocService.builder()
708 .exampleHeaders(CentralDogmaService.class,
709 HttpHeaders.of(HttpHeaderNames.AUTHORIZATION,
710 "Bearer " + CsrfToken.ANONYMOUS))
711 .build());
712 final Function<? super HttpService, AuthService> authService =
713 authService(mds, authProvider, sessionManager);
714 configureHttpApi(sb, projectApiManager, executor, watchService, mds, authProvider, authService,
715 meterRegistry);
716
717 configureMetrics(sb, meterRegistry);
718
719
720 configCors(sb, config().corsConfig());
721
722
723 final String accessLogFormat = cfg.accessLogFormat();
724 if (isNullOrEmpty(accessLogFormat)) {
725 sb.accessLogWriter(AccessLogWriter.disabled(), true);
726 } else if ("common".equals(accessLogFormat)) {
727 sb.accessLogWriter(AccessLogWriter.common(), true);
728 } else if ("combined".equals(accessLogFormat)) {
729 sb.accessLogWriter(AccessLogWriter.combined(), true);
730 } else {
731 sb.accessLogFormat(accessLogFormat);
732 }
733
734 if (pluginsForAllReplicas != null) {
735 final PluginInitContext pluginInitContext =
736 new PluginInitContext(config(), pm, executor, meterRegistry, purgeWorker, sb,
737 authService, projectInitializer, mirrorAccessController);
738 pluginsForAllReplicas.plugins()
739 .forEach(p -> {
740 if (!(p instanceof AllReplicasPlugin)) {
741 return;
742 }
743 final AllReplicasPlugin plugin = (AllReplicasPlugin) p;
744 plugin.init(pluginInitContext);
745 });
746 }
747
748
749 Thread.setDefaultUncaughtExceptionHandler((t, e) -> logger.warn("Uncaught exception: {}", t, e));
750
751 final Server s = sb.build();
752 s.start().join();
753 return s;
754 }
755
756 static HttpFile webAppTitleFile(@Nullable String webAppTitle, String hostname) {
757 requireNonNull(hostname, "hostname");
758 final Map<String, String> titleAndHostname = ImmutableMap.of(
759 "title", firstNonNull(webAppTitle, "Central Dogma at {{hostname}}"),
760 "hostname", hostname);
761
762 try {
763 final HttpData data = HttpData.ofUtf8(Jackson.writeValueAsString(titleAndHostname));
764 return HttpFile.builder(data)
765 .contentType(MediaType.JSON_UTF_8)
766 .cacheControl(ServerCacheControl.REVALIDATED)
767 .build();
768 } catch (JsonProcessingException e) {
769 throw new Error("Failed to encode the title and hostname:", e);
770 }
771 }
772
773 @Nullable
774 private AuthProvider createAuthProvider(
775 CommandExecutor commandExecutor, @Nullable SessionManager sessionManager, MetadataService mds) {
776 final AuthConfig authCfg = cfg.authConfig();
777 if (authCfg == null) {
778 return null;
779 }
780
781 checkState(sessionManager != null, "SessionManager is null");
782 final AuthProviderParameters parameters = new AuthProviderParameters(
783
784 new ApplicationTokenAuthorizer(mds::findTokenBySecret).orElse(
785 new SessionTokenAuthorizer(sessionManager, authCfg.systemAdministrators())),
786 cfg,
787 sessionManager::generateSessionId,
788
789 session -> commandExecutor.execute(Command.createSession(session)),
790 sessionId -> commandExecutor.execute(Command.removeSession(sessionId)));
791 return authCfg.factory().create(parameters);
792 }
793
794 private CommandExecutor newZooKeeperCommandExecutor(
795 ProjectManager pm, Executor repositoryWorker,
796 ServerStatusManager serverStatusManager,
797 MeterRegistry meterRegistry,
798 @Nullable SessionManager sessionManager,
799 @Nullable Consumer<CommandExecutor> onTakeLeadership,
800 @Nullable Consumer<CommandExecutor> onReleaseLeadership,
801 @Nullable Consumer<CommandExecutor> onTakeZoneLeadership,
802 @Nullable Consumer<CommandExecutor> onReleaseZoneLeadership) {
803 final ZooKeeperReplicationConfig zkCfg = (ZooKeeperReplicationConfig) cfg.replicationConfig();
804
805
806 final File dataDir = cfg.dataDir();
807 new File(dataDir, "replica_id").delete();
808
809 String zone = null;
810 if (config().zone() != null) {
811 zone = config().zone().currentZone();
812 }
813
814
815 return new ZooKeeperCommandExecutor(
816 zkCfg, dataDir,
817 new StandaloneCommandExecutor(pm, repositoryWorker, serverStatusManager, sessionManager,
818 null, null,
819 null, null),
820 meterRegistry, zone,
821 onTakeLeadership, onReleaseLeadership,
822 onTakeZoneLeadership, onReleaseZoneLeadership);
823 }
824
825 private void configureThriftService(ServerBuilder sb, ProjectApiManager projectApiManager,
826 CommandExecutor executor,
827 WatchService watchService, MetadataService mds) {
828 final CentralDogmaServiceImpl service =
829 new CentralDogmaServiceImpl(projectApiManager, executor, watchService, mds);
830
831 HttpService thriftService =
832 ThriftCallService.of(service)
833 .decorate(CentralDogmaTimeoutScheduler::new)
834 .decorate(CentralDogmaExceptionTranslator::new)
835 .decorate(THttpService.newDecorator());
836
837 if (cfg.isCsrfTokenRequiredForThrift()) {
838 thriftService = thriftService.decorate(AuthService.newDecorator(new CsrfTokenAuthorizer()));
839 } else {
840 thriftService = thriftService.decorate(TokenlessClientLogger::new);
841 }
842
843
844 thriftService = thriftService.decorate(contentEncodingDecorator());
845
846 sb.service("/cd/thrift/v1", thriftService);
847 }
848
849 private Function<? super HttpService, AuthService> authService(
850 MetadataService mds, @Nullable AuthProvider authProvider, @Nullable SessionManager sessionManager) {
851 if (authProvider == null) {
852 return AuthService.newDecorator(new CsrfTokenAuthorizer());
853 }
854 final AuthConfig authCfg = cfg.authConfig();
855 assert authCfg != null : "authCfg";
856 assert sessionManager != null : "sessionManager";
857 final Authorizer<HttpRequest> tokenAuthorizer =
858 new ApplicationTokenAuthorizer(mds::findTokenBySecret)
859 .orElse(new SessionTokenAuthorizer(sessionManager,
860 authCfg.systemAdministrators()));
861 return AuthService.builder()
862 .add(tokenAuthorizer)
863 .onFailure(new CentralDogmaAuthFailureHandler())
864 .newDecorator();
865 }
866
867 private void configureHttpApi(ServerBuilder sb,
868 ProjectApiManager projectApiManager, CommandExecutor executor,
869 WatchService watchService, MetadataService mds,
870 @Nullable AuthProvider authProvider,
871 Function<? super HttpService, AuthService> authService,
872 MeterRegistry meterRegistry) {
873 final DependencyInjector dependencyInjector = DependencyInjector.ofSingletons(
874
875
876 new JacksonRequestConverterFunction(new ObjectMapper()),
877 new HttpApiRequestConverter(projectApiManager),
878 new RequiresRepositoryRoleDecoratorFactory(mds),
879 new RequiresProjectRoleDecoratorFactory(mds)
880 );
881 sb.dependencyInjector(dependencyInjector, false)
882
883
884 .dependencyInjector(new ReflectiveDependencyInjector(), false);
885
886
887 final Function<? super HttpService, ? extends HttpService> decorator =
888 authService.andThen(contentEncodingDecorator());
889 for (String path : ImmutableList.of(API_V0_PATH_PREFIX, API_V1_PATH_PREFIX)) {
890 final DecoratingServiceBindingBuilder decoratorBuilder =
891 sb.routeDecorator().pathPrefix(path);
892 for (Route loginRoute : LOGIN_API_ROUTES) {
893 decoratorBuilder.exclude(loginRoute);
894 }
895 for (Route logoutRoute : LOGOUT_API_ROUTES) {
896 decoratorBuilder.exclude(logoutRoute);
897 }
898 decoratorBuilder.build(decorator);
899 }
900
901 assert statusManager != null;
902 final ContextPathServicesBuilder apiV1ServiceBuilder = sb.contextPath(API_V1_PATH_PREFIX);
903 apiV1ServiceBuilder
904 .annotatedService(new ServerStatusService(executor, statusManager))
905 .annotatedService(new ProjectServiceV1(projectApiManager, executor))
906 .annotatedService(new RepositoryServiceV1(executor, mds))
907 .annotatedService(new CredentialServiceV1(projectApiManager, executor));
908
909 if (GIT_MIRROR_ENABLED) {
910 mirrorRunner = new MirrorRunner(projectApiManager, executor, cfg, meterRegistry,
911 mirrorAccessController);
912
913 apiV1ServiceBuilder
914 .annotatedService(new MirroringServiceV1(projectApiManager, executor, mirrorRunner, cfg,
915 mirrorAccessController))
916 .annotatedService(new MirrorAccessControlService(executor, mirrorAccessController));
917 }
918
919 apiV1ServiceBuilder.annotatedService()
920 .defaultServiceNaming(new ServiceNaming() {
921 private final String serviceName = ContentServiceV1.class.getName();
922 private final String watchServiceName =
923 serviceName.replace("ContentServiceV1", "WatchContentServiceV1");
924
925 @Override
926 public String serviceName(ServiceRequestContext ctx) {
927 if (ctx.request().headers().contains(HttpHeaderNames.IF_NONE_MATCH)) {
928 return watchServiceName;
929 }
930 return serviceName;
931 }
932 })
933 .build(new ContentServiceV1(executor, watchService, meterRegistry));
934
935 if (authProvider != null) {
936 sb.service("/security_enabled", new AbstractHttpService() {
937 @Override
938 protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
939 return HttpResponse.of(HttpStatus.OK);
940 }
941 });
942
943 final AuthConfig authCfg = cfg.authConfig();
944 assert authCfg != null : "authCfg";
945 apiV1ServiceBuilder
946 .annotatedService(new MetadataApiService(executor, mds, authCfg.loginNameNormalizer()))
947 .annotatedService(new TokenService(executor, mds));
948
949
950 Optional.ofNullable(authProvider.loginApiService())
951 .ifPresent(login -> LOGIN_API_ROUTES.forEach(mapping -> sb.service(mapping, login)));
952
953
954 final HttpService logout =
955 Optional.ofNullable(authProvider.logoutApiService())
956 .orElseGet(() -> new DefaultLogoutService(executor));
957 for (Route route : LOGOUT_API_ROUTES) {
958 sb.service(route, decorator.apply(logout));
959 }
960
961 authProvider.moreServices().forEach(sb::service);
962 }
963
964 sb.annotatedService()
965 .decorator(decorator)
966 .decorator(DecodingService.newDecorator())
967 .build(new GitHttpService(projectApiManager));
968
969 if (cfg.isWebAppEnabled()) {
970 sb.contextPath(API_V0_PATH_PREFIX)
971 .annotatedService(new UserService(executor))
972 .annotatedService(new RepositoryService(projectApiManager, executor));
973
974 if (authProvider != null) {
975
976 sb.service(LOGIN_PATH, authProvider.webLoginService());
977
978 sb.service(LOGOUT_PATH, authProvider.webLogoutService());
979 }
980
981
982
983 sb.serviceUnder("/app", HttpFile.of(CentralDogma.class.getClassLoader(),
984 "com/linecorp/centraldogma/webapp/index.html")
985 .asService());
986
987 sb.route()
988 .pathPrefix("/")
989 .exclude("prefix:/app")
990 .exclude("prefix:/api")
991 .build(FileService.builder(CentralDogma.class.getClassLoader(),
992 "com/linecorp/centraldogma/webapp")
993 .cacheControl(ServerCacheControl.REVALIDATED)
994 .autoDecompress(true)
995 .serveCompressedFiles(true)
996 .fallbackFileExtensions("html")
997 .build());
998 }
999
1000 sb.errorHandler(new HttpApiExceptionHandler());
1001 }
1002
1003 private static void configCors(ServerBuilder sb, @Nullable CorsConfig corsConfig) {
1004 if (corsConfig == null) {
1005 return;
1006 }
1007
1008 sb.decorator(CorsService.builder(corsConfig.allowedOrigins())
1009 .allowRequestMethods(HttpMethod.knownMethods())
1010 .allowAllRequestHeaders(true)
1011 .allowCredentials()
1012 .maxAge(corsConfig.maxAgeSeconds())
1013 .newDecorator());
1014 }
1015
1016 private static void configManagement(ServerBuilder sb, @Nullable ManagementConfig managementConfig) {
1017 if (managementConfig == null) {
1018 return;
1019 }
1020
1021
1022
1023 final int port = managementConfig.port();
1024 if (port == 0) {
1025 logger.info("'management.port' is 0, using the same ports as 'ports'.");
1026 sb.route()
1027 .pathPrefix(managementConfig.path())
1028 .defaultServiceName("management")
1029 .build(ManagementService.of());
1030 } else {
1031 final SessionProtocol managementProtocol = managementConfig.protocol();
1032 final String address = managementConfig.address();
1033 if (address == null) {
1034 sb.port(new ServerPort(port, managementProtocol));
1035 } else {
1036 sb.port(new ServerPort(new InetSocketAddress(address, port), managementProtocol));
1037 }
1038 sb.virtualHost(port)
1039 .route()
1040 .pathPrefix(managementConfig.path())
1041 .defaultServiceName("management")
1042 .build(ManagementService.of());
1043 }
1044 }
1045
1046 private static Function<? super HttpService, EncodingService> contentEncodingDecorator() {
1047 return delegate -> EncodingService
1048 .builder()
1049 .encodableContentTypes(contentType -> {
1050 if ("application".equals(contentType.type())) {
1051 final String subtype = contentType.subtype();
1052 switch (subtype) {
1053 case "json":
1054 case "xml":
1055 case "x-thrift":
1056 case "x-git-upload-pack-advertisement":
1057 case "x-git-upload-pack-result":
1058 return true;
1059 default:
1060 return subtype.endsWith("+json") ||
1061 subtype.endsWith("+xml") ||
1062 subtype.startsWith("vnd.apache.thrift.");
1063 }
1064 }
1065 return false;
1066 })
1067 .build(delegate);
1068 }
1069
1070 private void configureMetrics(ServerBuilder sb, MeterRegistry registry) {
1071 sb.meterRegistry(registry);
1072
1073
1074
1075 if (registry instanceof PrometheusMeterRegistry) {
1076 final PrometheusMeterRegistry prometheusMeterRegistry = (PrometheusMeterRegistry) registry;
1077 sb.service(METRICS_PATH,
1078 PrometheusExpositionService.of(prometheusMeterRegistry.getPrometheusRegistry()));
1079 } else if (registry instanceof CompositeMeterRegistry) {
1080 final PrometheusMeterRegistry prometheusMeterRegistry = PrometheusMeterRegistries.newRegistry();
1081 ((CompositeMeterRegistry) registry).add(prometheusMeterRegistry);
1082 sb.service(METRICS_PATH,
1083 PrometheusExpositionService.of(prometheusMeterRegistry.getPrometheusRegistry()));
1084 meterRegistryToBeClosed = prometheusMeterRegistry;
1085 } else {
1086 logger.info("Not exposing a prometheus endpoint for the type: {}", registry.getClass());
1087 }
1088
1089 sb.decorator(MetricCollectingService.newDecorator(MeterIdPrefixFunction.ofDefault("api")));
1090
1091
1092 new FileDescriptorMetrics().bindTo(registry);
1093 new ProcessorMetrics().bindTo(registry);
1094 new ClassLoaderMetrics().bindTo(registry);
1095 new UptimeMetrics().bindTo(registry);
1096 new DiskSpaceMetrics(cfg.dataDir()).bindTo(registry);
1097 new JvmGcMetrics().bindTo(registry);
1098 new JvmMemoryMetrics().bindTo(registry);
1099 new JvmThreadMetrics().bindTo(registry);
1100
1101
1102 ExecutorServiceMetrics.monitor(registry, ForkJoinPool.commonPool(), "commonPool");
1103 }
1104
1105 private void doStop() {
1106 if (server == null) {
1107 return;
1108 }
1109
1110 final Server server = this.server;
1111 final CommandExecutor executor = this.executor;
1112 final ProjectManager pm = this.pm;
1113 final ExecutorService repositoryWorker = this.repositoryWorker;
1114 final ExecutorService purgeWorker = this.purgeWorker;
1115 final SessionManager sessionManager = this.sessionManager;
1116 final MirrorRunner mirrorRunner = this.mirrorRunner;
1117
1118 this.server = null;
1119 this.executor = null;
1120 this.pm = null;
1121 this.repositoryWorker = null;
1122 this.sessionManager = null;
1123 this.mirrorRunner = null;
1124 if (meterRegistryToBeClosed != null) {
1125 assert meterRegistry instanceof CompositeMeterRegistry;
1126 ((CompositeMeterRegistry) meterRegistry).remove(meterRegistryToBeClosed);
1127 meterRegistryToBeClosed.close();
1128 meterRegistryToBeClosed = null;
1129 }
1130
1131 logger.info("Stopping the Central Dogma ..");
1132 if (!doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager, mirrorRunner)) {
1133 logger.warn("Stopped the Central Dogma with failure.");
1134 } else {
1135 logger.info("Stopped the Central Dogma successfully.");
1136 }
1137 }
1138
1139 private static boolean doStop(
1140 @Nullable Server server, @Nullable CommandExecutor executor,
1141 @Nullable ProjectManager pm,
1142 @Nullable ExecutorService repositoryWorker, @Nullable ExecutorService purgeWorker,
1143 @Nullable SessionManager sessionManager, @Nullable MirrorRunner mirrorRunner) {
1144
1145 boolean success = true;
1146
1147
1148 try {
1149 if (server != null) {
1150 logger.info("Stopping the RPC server ..");
1151 server.stop().join();
1152 logger.info("Stopped the RPC server.");
1153 }
1154 } catch (Throwable t) {
1155 success = false;
1156 logger.warn("Failed to stop the RPC server:", t);
1157 }
1158
1159 try {
1160 if (sessionManager != null) {
1161 logger.info("Stopping the session manager ..");
1162 sessionManager.close();
1163 logger.info("Stopped the session manager.");
1164 }
1165 } catch (Throwable t) {
1166 success = false;
1167 logger.warn("Failed to stop the session manager:", t);
1168 }
1169
1170 try {
1171 if (pm != null) {
1172 logger.info("Stopping the project manager ..");
1173 pm.close(ShuttingDownException::new);
1174 logger.info("Stopped the project manager.");
1175 }
1176 } catch (Throwable t) {
1177 success = false;
1178 logger.warn("Failed to stop the project manager:", t);
1179 }
1180
1181 try {
1182 if (executor != null) {
1183 logger.info("Stopping the command executor ..");
1184 executor.stop();
1185 logger.info("Stopped the command executor.");
1186 }
1187 } catch (Throwable t) {
1188 success = false;
1189 logger.warn("Failed to stop the command executor:", t);
1190 }
1191
1192 final BiFunction<ExecutorService, String, Boolean> stopWorker = (worker, name) -> {
1193 try {
1194 if (worker != null && !worker.isTerminated()) {
1195 logger.info("Stopping the {} worker ..", name);
1196 boolean interruptLater = false;
1197 while (!worker.isTerminated()) {
1198 worker.shutdownNow();
1199 try {
1200 worker.awaitTermination(1, TimeUnit.SECONDS);
1201 } catch (InterruptedException e) {
1202
1203 interruptLater = true;
1204 }
1205 }
1206 logger.info("Stopped the {} worker.", name);
1207
1208 if (interruptLater) {
1209 Thread.currentThread().interrupt();
1210 }
1211 }
1212 return true;
1213 } catch (Throwable t) {
1214 logger.warn("Failed to stop the " + name + " worker:", t);
1215 return false;
1216 }
1217 };
1218 if (!stopWorker.apply(repositoryWorker, "repository")) {
1219 success = false;
1220 }
1221 if (!stopWorker.apply(purgeWorker, "purge")) {
1222 success = false;
1223 }
1224
1225 try {
1226 if (mirrorRunner != null) {
1227 logger.info("Stopping the mirror runner..");
1228 mirrorRunner.close();
1229 logger.info("Stopped the mirror runner.");
1230 }
1231 } catch (Throwable t) {
1232 success = false;
1233 logger.warn("Failed to stop the mirror runner:", t);
1234 }
1235
1236 return success;
1237 }
1238
1239 private final class CentralDogmaStartStop extends StartStopSupport<Void, Void, Void, Void> {
1240
1241 @Nullable
1242 private final PluginGroup pluginsForAllReplicas;
1243
1244 CentralDogmaStartStop(@Nullable PluginGroup pluginsForAllReplicas) {
1245 super(GlobalEventExecutor.INSTANCE);
1246 this.pluginsForAllReplicas = pluginsForAllReplicas;
1247 }
1248
1249 @Override
1250 protected CompletionStage<Void> doStart(@Nullable Void unused) throws Exception {
1251 return execute("startup", () -> {
1252 try {
1253 final boolean success = CentralDogma.this.doStart();
1254 if (success) {
1255 if (pluginsForAllReplicas != null) {
1256 final ProjectManager pm = CentralDogma.this.pm;
1257 final CommandExecutor executor = CentralDogma.this.executor;
1258 final MeterRegistry meterRegistry = CentralDogma.this.meterRegistry;
1259 if (pm != null && executor != null && meterRegistry != null) {
1260 pluginsForAllReplicas.start(cfg, pm, executor, meterRegistry, purgeWorker,
1261 projectInitializer, mirrorAccessController).join();
1262 }
1263 }
1264 serverHealth.setHealthy(true);
1265 }
1266 } catch (Exception e) {
1267 Exceptions.throwUnsafely(e);
1268 }
1269 });
1270 }
1271
1272 @Override
1273 protected CompletionStage<Void> doStop(@Nullable Void unused) throws Exception {
1274 return execute("shutdown", () -> {
1275 if (pluginsForAllReplicas != null) {
1276 final ProjectManager pm = CentralDogma.this.pm;
1277 final CommandExecutor executor = CentralDogma.this.executor;
1278 final MeterRegistry meterRegistry = CentralDogma.this.meterRegistry;
1279 if (pm != null && executor != null && meterRegistry != null) {
1280 pluginsForAllReplicas.stop(cfg, pm, executor, meterRegistry, purgeWorker,
1281 projectInitializer, mirrorAccessController).join();
1282 }
1283 }
1284 CentralDogma.this.doStop();
1285 });
1286 }
1287
1288 private CompletionStage<Void> execute(String mode, Runnable task) {
1289 final CompletableFuture<Void> future = new CompletableFuture<>();
1290 final Thread thread = new Thread(() -> {
1291 try {
1292 task.run();
1293 future.complete(null);
1294 } catch (Throwable cause) {
1295 future.completeExceptionally(cause);
1296 }
1297 }, "dogma-" + mode + "-0x" + Long.toHexString(CentralDogma.this.hashCode() & 0xFFFFFFFFL));
1298 thread.start();
1299 return future;
1300 }
1301 }
1302 }