1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server;
18  
19  import static com.google.common.base.MoreObjects.firstNonNull;
20  import static com.google.common.base.Preconditions.checkState;
21  import static com.google.common.base.Strings.isNullOrEmpty;
22  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.API_V0_PATH_PREFIX;
23  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.API_V1_PATH_PREFIX;
24  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.HEALTH_CHECK_PATH;
25  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.METRICS_PATH;
26  import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGIN_API_ROUTES;
27  import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGIN_PATH;
28  import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGOUT_API_ROUTES;
29  import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGOUT_PATH;
30  import static com.linecorp.centraldogma.server.internal.api.sysadmin.MirrorAccessControlService.MIRROR_ACCESS_CONTROL_PATH;
31  import static com.linecorp.centraldogma.server.internal.storage.repository.MirrorConverter.MIRROR_PROVIDERS;
32  import static com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer.INTERNAL_PROJECT_DOGMA;
33  import static java.util.Objects.requireNonNull;
34  
35  import java.io.File;
36  import java.io.IOException;
37  import java.io.InputStream;
38  import java.net.InetSocketAddress;
39  import java.util.Collections;
40  import java.util.List;
41  import java.util.Map;
42  import java.util.Objects;
43  import java.util.Optional;
44  import java.util.concurrent.CompletableFuture;
45  import java.util.concurrent.CompletionStage;
46  import java.util.concurrent.Executor;
47  import java.util.concurrent.ExecutorService;
48  import java.util.concurrent.Executors;
49  import java.util.concurrent.ForkJoinPool;
50  import java.util.concurrent.LinkedBlockingQueue;
51  import java.util.concurrent.ScheduledExecutorService;
52  import java.util.concurrent.ThreadPoolExecutor;
53  import java.util.concurrent.TimeUnit;
54  import java.util.concurrent.TimeoutException;
55  import java.util.concurrent.atomic.AtomicInteger;
56  import java.util.function.BiFunction;
57  import java.util.function.Consumer;
58  import java.util.function.Function;
59  
60  import javax.annotation.Nullable;
61  
62  import org.slf4j.Logger;
63  import org.slf4j.LoggerFactory;
64  
65  import com.fasterxml.jackson.core.JsonProcessingException;
66  import com.fasterxml.jackson.databind.ObjectMapper;
67  import com.fasterxml.jackson.databind.module.SimpleModule;
68  import com.github.benmanes.caffeine.cache.Caffeine;
69  import com.github.benmanes.caffeine.cache.stats.CacheStats;
70  import com.google.common.collect.ImmutableList;
71  import com.google.common.collect.ImmutableMap;
72  
73  import com.linecorp.armeria.common.DependencyInjector;
74  import com.linecorp.armeria.common.Flags;
75  import com.linecorp.armeria.common.HttpData;
76  import com.linecorp.armeria.common.HttpHeaderNames;
77  import com.linecorp.armeria.common.HttpHeaders;
78  import com.linecorp.armeria.common.HttpMethod;
79  import com.linecorp.armeria.common.HttpRequest;
80  import com.linecorp.armeria.common.HttpResponse;
81  import com.linecorp.armeria.common.HttpStatus;
82  import com.linecorp.armeria.common.MediaType;
83  import com.linecorp.armeria.common.ServerCacheControl;
84  import com.linecorp.armeria.common.SessionProtocol;
85  import com.linecorp.armeria.common.metric.MeterIdPrefixFunction;
86  import com.linecorp.armeria.common.prometheus.PrometheusMeterRegistries;
87  import com.linecorp.armeria.common.util.EventLoopGroups;
88  import com.linecorp.armeria.common.util.Exceptions;
89  import com.linecorp.armeria.common.util.StartStopSupport;
90  import com.linecorp.armeria.common.util.SystemInfo;
91  import com.linecorp.armeria.internal.common.ReflectiveDependencyInjector;
92  import com.linecorp.armeria.server.AbstractHttpService;
93  import com.linecorp.armeria.server.ContextPathServicesBuilder;
94  import com.linecorp.armeria.server.DecoratingServiceBindingBuilder;
95  import com.linecorp.armeria.server.GracefulShutdown;
96  import com.linecorp.armeria.server.HttpService;
97  import com.linecorp.armeria.server.Route;
98  import com.linecorp.armeria.server.Server;
99  import com.linecorp.armeria.server.ServerBuilder;
100 import com.linecorp.armeria.server.ServerPort;
101 import com.linecorp.armeria.server.ServiceNaming;
102 import com.linecorp.armeria.server.ServiceRequestContext;
103 import com.linecorp.armeria.server.annotation.JacksonRequestConverterFunction;
104 import com.linecorp.armeria.server.auth.AuthService;
105 import com.linecorp.armeria.server.auth.Authorizer;
106 import com.linecorp.armeria.server.cors.CorsService;
107 import com.linecorp.armeria.server.docs.DocService;
108 import com.linecorp.armeria.server.encoding.DecodingService;
109 import com.linecorp.armeria.server.encoding.EncodingService;
110 import com.linecorp.armeria.server.file.FileService;
111 import com.linecorp.armeria.server.file.HttpFile;
112 import com.linecorp.armeria.server.healthcheck.HealthCheckService;
113 import com.linecorp.armeria.server.healthcheck.SettableHealthChecker;
114 import com.linecorp.armeria.server.logging.AccessLogWriter;
115 import com.linecorp.armeria.server.management.ManagementService;
116 import com.linecorp.armeria.server.metric.MetricCollectingService;
117 import com.linecorp.armeria.server.prometheus.PrometheusExpositionService;
118 import com.linecorp.armeria.server.thrift.THttpService;
119 import com.linecorp.armeria.server.thrift.ThriftCallService;
120 import com.linecorp.centraldogma.common.ShuttingDownException;
121 import com.linecorp.centraldogma.internal.CsrfToken;
122 import com.linecorp.centraldogma.internal.Jackson;
123 import com.linecorp.centraldogma.internal.thrift.CentralDogmaService;
124 import com.linecorp.centraldogma.server.auth.AuthConfig;
125 import com.linecorp.centraldogma.server.auth.AuthProvider;
126 import com.linecorp.centraldogma.server.auth.AuthProviderParameters;
127 import com.linecorp.centraldogma.server.auth.SessionManager;
128 import com.linecorp.centraldogma.server.command.Command;
129 import com.linecorp.centraldogma.server.command.CommandExecutor;
130 import com.linecorp.centraldogma.server.command.StandaloneCommandExecutor;
131 import com.linecorp.centraldogma.server.internal.admin.auth.CachedSessionManager;
132 import com.linecorp.centraldogma.server.internal.admin.auth.CsrfTokenAuthorizer;
133 import com.linecorp.centraldogma.server.internal.admin.auth.ExpiredSessionDeletingSessionManager;
134 import com.linecorp.centraldogma.server.internal.admin.auth.FileBasedSessionManager;
135 import com.linecorp.centraldogma.server.internal.admin.auth.SessionTokenAuthorizer;
136 import com.linecorp.centraldogma.server.internal.admin.service.DefaultLogoutService;
137 import com.linecorp.centraldogma.server.internal.admin.service.RepositoryService;
138 import com.linecorp.centraldogma.server.internal.admin.service.UserService;
139 import com.linecorp.centraldogma.server.internal.api.ContentServiceV1;
140 import com.linecorp.centraldogma.server.internal.api.CredentialServiceV1;
141 import com.linecorp.centraldogma.server.internal.api.GitHttpService;
142 import com.linecorp.centraldogma.server.internal.api.HttpApiExceptionHandler;
143 import com.linecorp.centraldogma.server.internal.api.MetadataApiService;
144 import com.linecorp.centraldogma.server.internal.api.MirroringServiceV1;
145 import com.linecorp.centraldogma.server.internal.api.ProjectServiceV1;
146 import com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1;
147 import com.linecorp.centraldogma.server.internal.api.WatchService;
148 import com.linecorp.centraldogma.server.internal.api.auth.ApplicationTokenAuthorizer;
149 import com.linecorp.centraldogma.server.internal.api.auth.RequiresProjectRoleDecorator.RequiresProjectRoleDecoratorFactory;
150 import com.linecorp.centraldogma.server.internal.api.auth.RequiresRepositoryRoleDecorator.RequiresRepositoryRoleDecoratorFactory;
151 import com.linecorp.centraldogma.server.internal.api.converter.HttpApiRequestConverter;
152 import com.linecorp.centraldogma.server.internal.api.sysadmin.MirrorAccessControlService;
153 import com.linecorp.centraldogma.server.internal.api.sysadmin.ServerStatusService;
154 import com.linecorp.centraldogma.server.internal.api.sysadmin.TokenService;
155 import com.linecorp.centraldogma.server.internal.mirror.DefaultMirrorAccessController;
156 import com.linecorp.centraldogma.server.internal.mirror.DefaultMirroringServicePlugin;
157 import com.linecorp.centraldogma.server.internal.mirror.MirrorAccessControl;
158 import com.linecorp.centraldogma.server.internal.mirror.MirrorRunner;
159 import com.linecorp.centraldogma.server.internal.replication.ZooKeeperCommandExecutor;
160 import com.linecorp.centraldogma.server.internal.storage.project.DefaultProjectManager;
161 import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
162 import com.linecorp.centraldogma.server.internal.storage.repository.CrudRepository;
163 import com.linecorp.centraldogma.server.internal.storage.repository.git.GitCrudRepository;
164 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaExceptionTranslator;
165 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaServiceImpl;
166 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaTimeoutScheduler;
167 import com.linecorp.centraldogma.server.internal.thrift.TokenlessClientLogger;
168 import com.linecorp.centraldogma.server.management.ServerStatus;
169 import com.linecorp.centraldogma.server.management.ServerStatusManager;
170 import com.linecorp.centraldogma.server.metadata.MetadataService;
171 import com.linecorp.centraldogma.server.mirror.MirrorProvider;
172 import com.linecorp.centraldogma.server.plugin.AllReplicasPlugin;
173 import com.linecorp.centraldogma.server.plugin.Plugin;
174 import com.linecorp.centraldogma.server.plugin.PluginInitContext;
175 import com.linecorp.centraldogma.server.plugin.PluginTarget;
176 import com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer;
177 import com.linecorp.centraldogma.server.storage.project.Project;
178 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
179 
180 import io.micrometer.core.instrument.MeterRegistry;
181 import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
182 import io.micrometer.core.instrument.binder.jvm.DiskSpaceMetrics;
183 import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
184 import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
185 import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
186 import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
187 import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics;
188 import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
189 import io.micrometer.core.instrument.binder.system.UptimeMetrics;
190 import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
191 import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
192 import io.netty.util.concurrent.DefaultThreadFactory;
193 import io.netty.util.concurrent.GlobalEventExecutor;
194 
195 /**
196  * Central Dogma server.
197  *
198  * @see CentralDogmaBuilder
199  */
200 public class CentralDogma implements AutoCloseable {
201 
202     private static final Logger logger = LoggerFactory.getLogger(CentralDogma.class);
203 
204     private static final boolean GIT_MIRROR_ENABLED;
205 
206     static {
207         Jackson.registerModules(new SimpleModule().addSerializer(CacheStats.class, new CacheStatsSerializer()));
208 
209         boolean gitMirrorEnabled = false;
210         for (MirrorProvider mirrorProvider : MIRROR_PROVIDERS) {
211             if ("com.linecorp.centraldogma.server.internal.mirror.GitMirrorProvider"
212                     .equals(mirrorProvider.getClass().getName())) {
213                 gitMirrorEnabled = true;
214                 break;
215             }
216         }
217         logger.info("Git mirroring: {}",
218                     gitMirrorEnabled ? "enabled"
219                                      : "disabled ('centraldogma-server-mirror-git' module is not available)");
220         GIT_MIRROR_ENABLED = gitMirrorEnabled;
221     }
222 
223     /**
224      * Creates a new instance from the given configuration file.
225      *
226      * @throws IOException if failed to load the configuration from the specified file
227      */
228     public static CentralDogma forConfig(File configFile) throws IOException {
229         requireNonNull(configFile, "configFile");
230         return new CentralDogma(CentralDogmaConfig.load(configFile), Flags.meterRegistry(), ImmutableList.of());
231     }
232 
233     private final SettableHealthChecker serverHealth = new SettableHealthChecker(false);
234     private final CentralDogmaStartStop startStop;
235 
236     private final AtomicInteger numPendingStopRequests = new AtomicInteger();
237 
238     private final Map<PluginTarget, PluginGroup> pluginGroups;
239     @Nullable
240     private final PluginGroup pluginsForAllReplicas;
241     @Nullable
242     private final PluginGroup pluginsForLeaderOnly;
243     @Nullable
244     private final PluginGroup pluginsForZoneLeaderOnly;
245 
246     private final CentralDogmaConfig cfg;
247     @Nullable
248     private volatile ProjectManager pm;
249     @Nullable
250     private volatile Server server;
251     @Nullable
252     private ExecutorService repositoryWorker;
253     @Nullable
254     private ScheduledExecutorService purgeWorker;
255     @Nullable
256     private CommandExecutor executor;
257     private final MeterRegistry meterRegistry;
258     @Nullable
259     MeterRegistry meterRegistryToBeClosed;
260     @Nullable
261     private SessionManager sessionManager;
262     @Nullable
263     private ServerStatusManager statusManager;
264     @Nullable
265     private InternalProjectInitializer projectInitializer;
266     @Nullable
267     private volatile MirrorRunner mirrorRunner;
268     @Nullable
269     private volatile DefaultMirrorAccessController mirrorAccessController;
270 
271     CentralDogma(CentralDogmaConfig cfg, MeterRegistry meterRegistry, List<Plugin> plugins) {
272         this.cfg = requireNonNull(cfg, "cfg");
273         pluginGroups = PluginGroup.loadPlugins(CentralDogma.class.getClassLoader(), cfg, plugins);
274         pluginsForAllReplicas = pluginGroups.get(PluginTarget.ALL_REPLICAS);
275         pluginsForLeaderOnly = pluginGroups.get(PluginTarget.LEADER_ONLY);
276         pluginsForZoneLeaderOnly = pluginGroups.get(PluginTarget.ZONE_LEADER_ONLY);
277         if (pluginsForZoneLeaderOnly != null) {
278             checkState(cfg.zone() != null,
279                        "zone must be specified when zone leader plugins are enabled.");
280         }
281         startStop = new CentralDogmaStartStop(pluginsForAllReplicas);
282         this.meterRegistry = meterRegistry;
283     }
284 
285     /**
286      * Returns the configuration of the server.
287      *
288      * @return the {@link CentralDogmaConfig} instance which is used for configuring this {@link CentralDogma}.
289      */
290     public CentralDogmaConfig config() {
291         return cfg;
292     }
293 
294     /**
295      * Returns the primary port of the server.
296      *
297      * @return the primary {@link ServerPort} if the server is started. {@link Optional#empty()} otherwise.
298      */
299     @Nullable
300     public ServerPort activePort() {
301         final Server server = this.server;
302         return server != null ? server.activePort() : null;
303     }
304 
305     /**
306      * Returns the ports of the server.
307      *
308      * @return the {@link Map} which contains the pairs of local {@link InetSocketAddress} and
309      *         {@link ServerPort} is the server is started. {@link Optional#empty()} otherwise.
310      */
311     public Map<InetSocketAddress, ServerPort> activePorts() {
312         final Server server = this.server;
313         if (server != null) {
314             return server.activePorts();
315         } else {
316             return Collections.emptyMap();
317         }
318     }
319 
320     /**
321      * Returns the {@link ProjectManager} of the server if the server is started.
322      * {@code null} is returned, otherwise.
323      */
324     @Nullable
325     public ProjectManager projectManager() {
326         return pm;
327     }
328 
329     /**
330      * Returns the {@link MirroringService} of the server.
331      *
332      * @return the {@link MirroringService} if the server is started and mirroring is enabled.
333      *         {@code null} otherwise.
334      */
335     @Nullable
336     public MirroringService mirroringService() {
337         return pluginGroups.values()
338                            .stream()
339                            .map(group -> {
340                                return group.findFirstPlugin(DefaultMirroringServicePlugin.class);
341                            })
342                            .filter(Objects::nonNull)
343                            .findFirst()
344                            .map(DefaultMirroringServicePlugin::mirroringService)
345                            .orElse(null);
346     }
347 
348     /**
349      * Returns the {@link Plugin}s which have been loaded.
350      *
351      * @param target the {@link PluginTarget} of the {@link Plugin}s to be returned
352      */
353     public List<Plugin> plugins(PluginTarget target) {
354         requireNonNull(target, "target");
355         return pluginGroups.get(target).plugins();
356     }
357 
358     /**
359      * Returns the {@link MeterRegistry} that contains the stats related with the server.
360      */
361     public Optional<MeterRegistry> meterRegistry() {
362         return Optional.ofNullable(meterRegistry);
363     }
364 
365     /**
366      * Starts the server.
367      */
368     public CompletableFuture<Void> start() {
369         return startStop.start(true);
370     }
371 
372     /**
373      * Stops the server. This method does nothing if the server is stopped already.
374      */
375     public CompletableFuture<Void> stop() {
376         serverHealth.setHealthy(false);
377 
378         final Optional<GracefulShutdownTimeout> gracefulTimeoutOpt = cfg.gracefulShutdownTimeout();
379         if (gracefulTimeoutOpt.isPresent()) {
380             try {
381                 // Sleep 1 second so that clients have some time to redirect traffic according
382                 // to the health status
383                 Thread.sleep(1000);
384             } catch (InterruptedException e) {
385                 logger.debug("Interrupted while waiting for quiet period", e);
386                 Thread.currentThread().interrupt();
387             }
388         }
389 
390         numPendingStopRequests.incrementAndGet();
391         return startStop.stop().thenRun(numPendingStopRequests::decrementAndGet);
392     }
393 
394     @Override
395     public void close() {
396         startStop.close();
397     }
398 
399     private boolean doStart() throws Exception {
400         boolean success = false;
401         ExecutorService repositoryWorker = null;
402         ScheduledExecutorService purgeWorker = null;
403         ProjectManager pm = null;
404         CommandExecutor executor = null;
405         Server server = null;
406         SessionManager sessionManager = null;
407         try {
408             logger.info("Starting the Central Dogma ..");
409 
410             final ThreadPoolExecutor repositoryWorkerImpl = new ThreadPoolExecutor(
411                     cfg.numRepositoryWorkers(), cfg.numRepositoryWorkers(),
412                     // TODO(minwoox): Use LinkedTransferQueue when we upgrade to JDK 21.
413                     60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
414                     new DefaultThreadFactory("repository-worker", true));
415             repositoryWorkerImpl.allowCoreThreadTimeOut(true);
416             repositoryWorker = ExecutorServiceMetrics.monitor(meterRegistry, repositoryWorkerImpl,
417                                                               "repositoryWorker");
418 
419             logger.info("Starting the project manager: {}", cfg.dataDir());
420 
421             purgeWorker = Executors.newSingleThreadScheduledExecutor(
422                     new DefaultThreadFactory("purge-worker", true));
423 
424             pm = new DefaultProjectManager(cfg.dataDir(), repositoryWorker, purgeWorker,
425                                            meterRegistry, cfg.repositoryCacheSpec());
426 
427             logger.info("Started the project manager: {}", pm);
428 
429             logger.info("Current settings:\n{}", cfg);
430 
431             sessionManager = initializeSessionManager();
432 
433             logger.info("Starting the command executor ..");
434             executor = startCommandExecutor(pm, repositoryWorker, purgeWorker,
435                                             meterRegistry, sessionManager);
436             // The projectInitializer is set in startCommandExecutor.
437             assert projectInitializer != null;
438             if (executor.isWritable()) {
439                 logger.info("Started the command executor.");
440             }
441 
442             logger.info("Starting the RPC server.");
443             server = startServer(pm, executor, purgeWorker, meterRegistry, sessionManager,
444                                  projectInitializer);
445             logger.info("Started the RPC server at: {}", server.activePorts());
446             logger.info("Started the Central Dogma successfully.");
447             success = true;
448         } finally {
449             if (success) {
450                 this.repositoryWorker = repositoryWorker;
451                 this.purgeWorker = purgeWorker;
452                 this.pm = pm;
453                 this.executor = executor;
454                 this.server = server;
455                 this.sessionManager = sessionManager;
456             } else {
457                 doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager, mirrorRunner);
458             }
459         }
460         return success;
461     }
462 
463     private CommandExecutor startCommandExecutor(
464             ProjectManager pm, Executor repositoryWorker,
465             ScheduledExecutorService purgeWorker, MeterRegistry meterRegistry,
466             @Nullable SessionManager sessionManager) {
467 
468         final Consumer<CommandExecutor> onTakeLeadership = exec -> {
469             if (pluginsForLeaderOnly != null) {
470                 logger.info("Starting plugins on the leader replica ..");
471                 pluginsForLeaderOnly
472                         .start(cfg, pm, exec, meterRegistry, purgeWorker, projectInitializer,
473                                mirrorAccessController)
474                         .handle((unused, cause) -> {
475                             if (cause == null) {
476                                 logger.info("Started plugins on the leader replica.");
477                             } else {
478                                 logger.error("Failed to start plugins on the leader replica..", cause);
479                             }
480                             return null;
481                         });
482             }
483         };
484 
485         final Consumer<CommandExecutor> onReleaseLeadership = exec -> {
486             if (pluginsForLeaderOnly != null) {
487                 logger.info("Stopping plugins on the leader replica ..");
488                 final CompletableFuture<?> future =
489                         pluginsForLeaderOnly
490                                 .stop(cfg, pm, exec, meterRegistry, purgeWorker, projectInitializer,
491                                       mirrorAccessController)
492                                 .handle((unused, cause) -> {
493                                     if (cause == null) {
494                                         logger.info("Stopped plugins on the leader replica.");
495                                     } else {
496                                         logger.error("Failed to stop plugins on the leader replica.", cause);
497                                     }
498                                     return null;
499                                 });
500                 try {
501                     future.get(10, TimeUnit.SECONDS);
502                 } catch (Exception e) {
503                     logger.warn("Failed to stop plugins on the leader replica in 10 seconds.", e);
504                 }
505             }
506         };
507 
508         Consumer<CommandExecutor> onTakeZoneLeadership = null;
509         Consumer<CommandExecutor> onReleaseZoneLeadership = null;
510         // TODO(ikhoon): Deduplicate
511         if (pluginsForZoneLeaderOnly != null) {
512             assert cfg.zone() != null;
513             final String zone = cfg.zone().currentZone();
514             onTakeZoneLeadership = exec -> {
515                 logger.info("Starting plugins on the {} zone leader replica ..", zone);
516                 pluginsForZoneLeaderOnly
517                         .start(cfg, pm, exec, meterRegistry, purgeWorker, projectInitializer,
518                                mirrorAccessController)
519                         .handle((unused, cause) -> {
520                             if (cause == null) {
521                                 logger.info("Started plugins on the {} zone leader replica.", zone);
522                             } else {
523                                 logger.error("Failed to start plugins on the {} zone leader replica..",
524                                              zone, cause);
525                             }
526                             return null;
527                         });
528             };
529             onReleaseZoneLeadership = exec -> {
530                 logger.info("Stopping plugins on the {} zone leader replica ..", zone);
531                 final CompletableFuture<?> future =
532                         pluginsForZoneLeaderOnly
533                                 .stop(cfg, pm, exec, meterRegistry, purgeWorker,
534                                       projectInitializer, mirrorAccessController)
535                                 .handle((unused, cause) -> {
536                                     if (cause == null) {
537                                         logger.info("Stopped plugins on the {} zone leader replica.", zone);
538                                     } else {
539                                         logger.error("Failed to stop plugins on the {} zone leader replica.",
540                                                      zone, cause);
541                                     }
542                                     return null;
543                                 });
544                 try {
545                     future.get(10, TimeUnit.SECONDS);
546                 } catch (Exception e) {
547                     logger.warn("Failed to stop plugins on the {} zone leader replica in 10 seconds", zone, e);
548                 }
549             };
550         }
551 
552         statusManager = new ServerStatusManager(cfg.dataDir());
553         logger.info("Startup mode: {}", statusManager.serverStatus());
554         final CommandExecutor executor;
555         final ReplicationMethod replicationMethod = cfg.replicationConfig().method();
556         switch (replicationMethod) {
557             case ZOOKEEPER:
558                 executor = newZooKeeperCommandExecutor(pm, repositoryWorker, statusManager, meterRegistry,
559                                                        sessionManager, onTakeLeadership, onReleaseLeadership,
560                                                        onTakeZoneLeadership, onReleaseZoneLeadership);
561                 break;
562             case NONE:
563                 logger.info("No replication mechanism specified; entering standalone");
564                 executor = new StandaloneCommandExecutor(pm, repositoryWorker, statusManager, sessionManager,
565                                                          onTakeLeadership, onReleaseLeadership,
566                                                          onTakeZoneLeadership, onReleaseZoneLeadership);
567                 break;
568             default:
569                 throw new Error("unknown replication method: " + replicationMethod);
570         }
571         projectInitializer = new InternalProjectInitializer(executor, pm);
572         mirrorAccessController = new DefaultMirrorAccessController();
573 
574         final ServerStatus initialServerStatus = statusManager.serverStatus();
575         executor.setWritable(initialServerStatus.writable());
576         if (!initialServerStatus.replicating()) {
577             projectInitializer.whenInitialized().complete(null);
578             return executor;
579         }
580         try {
581             final CompletableFuture<Void> startFuture = executor.start();
582             while (!startFuture.isDone()) {
583                 if (numPendingStopRequests.get() > 0) {
584                     // Stop request has been issued.
585                     executor.stop().get();
586                     break;
587                 }
588 
589                 try {
590                     startFuture.get(100, TimeUnit.MILLISECONDS);
591                 } catch (TimeoutException unused) {
592                     // Taking long time ..
593                 }
594             }
595 
596             // Trigger the exception if any.
597             startFuture.get();
598             projectInitializer.initialize();
599             final CrudRepository<MirrorAccessControl> accessControlRepository =
600                     new GitCrudRepository<>(MirrorAccessControl.class, executor, pm,
601                                             INTERNAL_PROJECT_DOGMA, Project.REPO_DOGMA,
602                                             MIRROR_ACCESS_CONTROL_PATH);
603             mirrorAccessController.setRepository(accessControlRepository);
604         } catch (Exception e) {
605             projectInitializer.whenInitialized().complete(null);
606             logger.warn("Failed to start the command executor. Entering read-only.", e);
607         }
608 
609         return executor;
610     }
611 
612     @Nullable
613     private SessionManager initializeSessionManager() throws Exception {
614         final AuthConfig authCfg = cfg.authConfig();
615         if (authCfg == null) {
616             return null;
617         }
618 
619         boolean success = false;
620         SessionManager manager = null;
621         try {
622             manager = new FileBasedSessionManager(new File(cfg.dataDir(), "_sessions").toPath(),
623                                                   authCfg.sessionValidationSchedule());
624             manager = new CachedSessionManager(manager, Caffeine.from(authCfg.sessionCacheSpec()).build());
625             manager = new ExpiredSessionDeletingSessionManager(manager);
626             success = true;
627             return manager;
628         } finally {
629             if (!success && manager != null) {
630                 try {
631                     // It will eventually close FileBasedSessionManager because the other managers just forward
632                     // the close method call to their delegate.
633                     manager.close();
634                 } catch (Exception e) {
635                     logger.warn("Failed to close a session manager.", e);
636                 }
637             }
638         }
639     }
640 
641     private Server startServer(ProjectManager pm, CommandExecutor executor,
642                                ScheduledExecutorService purgeWorker, MeterRegistry meterRegistry,
643                                @Nullable SessionManager sessionManager,
644                                InternalProjectInitializer projectInitializer) {
645         final ServerBuilder sb = Server.builder();
646         cfg.ports().forEach(sb::port);
647 
648         final boolean needsTls =
649                 cfg.ports().stream().anyMatch(ServerPort::hasTls) ||
650                 (cfg.managementConfig() != null && cfg.managementConfig().protocol().isTls());
651 
652         if (needsTls) {
653             try {
654                 final TlsConfig tlsConfig = cfg.tls();
655                 if (tlsConfig != null) {
656                     try (InputStream keyCertChainInputStream = tlsConfig.keyCertChainInputStream();
657                          InputStream keyInputStream = tlsConfig.keyInputStream()) {
658                         sb.tls(keyCertChainInputStream, keyInputStream, tlsConfig.keyPassword());
659                     }
660                 } else {
661                     logger.warn(
662                             "Missing TLS configuration. Generating a self-signed certificate for TLS support.");
663                     sb.tlsSelfSigned();
664                 }
665             } catch (Exception e) {
666                 Exceptions.throwUnsafely(e);
667             }
668         }
669 
670         sb.clientAddressSources(cfg.clientAddressSourceList());
671         sb.clientAddressTrustedProxyFilter(cfg.trustedProxyAddressPredicate());
672 
673         cfg.numWorkers().ifPresent(
674                 numWorkers -> sb.workerGroup(EventLoopGroups.newEventLoopGroup(numWorkers), true));
675         cfg.maxNumConnections().ifPresent(sb::maxNumConnections);
676         cfg.idleTimeoutMillis().ifPresent(sb::idleTimeoutMillis);
677         cfg.requestTimeoutMillis().ifPresent(sb::requestTimeoutMillis);
678         cfg.maxFrameLength().ifPresent(sb::maxRequestLength);
679         cfg.gracefulShutdownTimeout().ifPresent(t -> {
680             final GracefulShutdown gracefulShutdown =
681                     GracefulShutdown.builder()
682                                     .quietPeriodMillis(t.quietPeriodMillis())
683                                     .timeoutMillis(t.timeoutMillis())
684                                     .toExceptionFunction((ctx, req) -> {
685                                         return new ShuttingDownException();
686                                     })
687                                     .build();
688             sb.gracefulShutdown(gracefulShutdown);
689         });
690 
691         final MetadataService mds = new MetadataService(pm, executor, projectInitializer);
692         executor.setRepositoryMetadataSupplier(mds::getRepo);
693         final WatchService watchService = new WatchService(meterRegistry);
694         final AuthProvider authProvider = createAuthProvider(executor, sessionManager, mds);
695         final ProjectApiManager projectApiManager = new ProjectApiManager(pm, executor, mds);
696 
697         configureThriftService(sb, projectApiManager, executor, watchService, mds);
698 
699         sb.service("/title", webAppTitleFile(cfg.webAppTitle(), SystemInfo.hostname()).asService());
700 
701         sb.service(HEALTH_CHECK_PATH, HealthCheckService.builder()
702                                                         .checkers(serverHealth)
703                                                         .build());
704         configManagement(sb, config().managementConfig());
705 
706         sb.serviceUnder("/docs/",
707                         DocService.builder()
708                                   .exampleHeaders(CentralDogmaService.class,
709                                                   HttpHeaders.of(HttpHeaderNames.AUTHORIZATION,
710                                                                  "Bearer " + CsrfToken.ANONYMOUS))
711                                   .build());
712         final Function<? super HttpService, AuthService> authService =
713                 authService(mds, authProvider, sessionManager);
714         configureHttpApi(sb, projectApiManager, executor, watchService, mds, authProvider, authService,
715                          meterRegistry);
716 
717         configureMetrics(sb, meterRegistry);
718         // Add the CORS service as the last decorator(executed first) so that the CORS service is applied
719         // before AuthService.
720         configCors(sb, config().corsConfig());
721 
722         // Configure access log format.
723         final String accessLogFormat = cfg.accessLogFormat();
724         if (isNullOrEmpty(accessLogFormat)) {
725             sb.accessLogWriter(AccessLogWriter.disabled(), true);
726         } else if ("common".equals(accessLogFormat)) {
727             sb.accessLogWriter(AccessLogWriter.common(), true);
728         } else if ("combined".equals(accessLogFormat)) {
729             sb.accessLogWriter(AccessLogWriter.combined(), true);
730         } else {
731             sb.accessLogFormat(accessLogFormat);
732         }
733 
734         if (pluginsForAllReplicas != null) {
735             final PluginInitContext pluginInitContext =
736                     new PluginInitContext(config(), pm, executor, meterRegistry, purgeWorker, sb,
737                                           authService, projectInitializer, mirrorAccessController);
738             pluginsForAllReplicas.plugins()
739                                  .forEach(p -> {
740                                      if (!(p instanceof AllReplicasPlugin)) {
741                                          return;
742                                      }
743                                      final AllReplicasPlugin plugin = (AllReplicasPlugin) p;
744                                      plugin.init(pluginInitContext);
745                                  });
746         }
747         // Configure the uncaught exception handler just before starting the server so that override the
748         // default exception handler set by third-party libraries such as NIOServerCnxnFactory.
749         Thread.setDefaultUncaughtExceptionHandler((t, e) -> logger.warn("Uncaught exception: {}", t, e));
750 
751         final Server s = sb.build();
752         s.start().join();
753         return s;
754     }
755 
756     static HttpFile webAppTitleFile(@Nullable String webAppTitle, String hostname) {
757         requireNonNull(hostname, "hostname");
758         final Map<String, String> titleAndHostname = ImmutableMap.of(
759                 "title", firstNonNull(webAppTitle, "Central Dogma at {{hostname}}"),
760                 "hostname", hostname);
761 
762         try {
763             final HttpData data = HttpData.ofUtf8(Jackson.writeValueAsString(titleAndHostname));
764             return HttpFile.builder(data)
765                            .contentType(MediaType.JSON_UTF_8)
766                            .cacheControl(ServerCacheControl.REVALIDATED)
767                            .build();
768         } catch (JsonProcessingException e) {
769             throw new Error("Failed to encode the title and hostname:", e);
770         }
771     }
772 
773     @Nullable
774     private AuthProvider createAuthProvider(
775             CommandExecutor commandExecutor, @Nullable SessionManager sessionManager, MetadataService mds) {
776         final AuthConfig authCfg = cfg.authConfig();
777         if (authCfg == null) {
778             return null;
779         }
780 
781         checkState(sessionManager != null, "SessionManager is null");
782         final AuthProviderParameters parameters = new AuthProviderParameters(
783                 // Find application first, then find the session token.
784                 new ApplicationTokenAuthorizer(mds::findTokenBySecret).orElse(
785                         new SessionTokenAuthorizer(sessionManager, authCfg.systemAdministrators())),
786                 cfg,
787                 sessionManager::generateSessionId,
788                 // Propagate login and logout events to the other replicas.
789                 session -> commandExecutor.execute(Command.createSession(session)),
790                 sessionId -> commandExecutor.execute(Command.removeSession(sessionId)));
791         return authCfg.factory().create(parameters);
792     }
793 
794     private CommandExecutor newZooKeeperCommandExecutor(
795             ProjectManager pm, Executor repositoryWorker,
796             ServerStatusManager serverStatusManager,
797             MeterRegistry meterRegistry,
798             @Nullable SessionManager sessionManager,
799             @Nullable Consumer<CommandExecutor> onTakeLeadership,
800             @Nullable Consumer<CommandExecutor> onReleaseLeadership,
801             @Nullable Consumer<CommandExecutor> onTakeZoneLeadership,
802             @Nullable Consumer<CommandExecutor> onReleaseZoneLeadership) {
803         final ZooKeeperReplicationConfig zkCfg = (ZooKeeperReplicationConfig) cfg.replicationConfig();
804 
805         // Delete the old UUID replica ID which is not used anymore.
806         final File dataDir = cfg.dataDir();
807         new File(dataDir, "replica_id").delete();
808 
809         String zone = null;
810         if (config().zone() != null) {
811             zone = config().zone().currentZone();
812         }
813         // TODO(trustin): Provide a way to restart/reload the replicator
814         //                so that we can recover from ZooKeeper maintenance automatically.
815         return new ZooKeeperCommandExecutor(
816                 zkCfg, dataDir,
817                 new StandaloneCommandExecutor(pm, repositoryWorker, serverStatusManager, sessionManager,
818                         /* onTakeLeadership */ null, /* onReleaseLeadership */ null,
819                         /* onTakeZoneLeadership */ null, /* onReleaseZoneLeadership */ null),
820                 meterRegistry, zone,
821                 onTakeLeadership, onReleaseLeadership,
822                 onTakeZoneLeadership, onReleaseZoneLeadership);
823     }
824 
825     private void configureThriftService(ServerBuilder sb, ProjectApiManager projectApiManager,
826                                         CommandExecutor executor,
827                                         WatchService watchService, MetadataService mds) {
828         final CentralDogmaServiceImpl service =
829                 new CentralDogmaServiceImpl(projectApiManager, executor, watchService, mds);
830 
831         HttpService thriftService =
832                 ThriftCallService.of(service)
833                                  .decorate(CentralDogmaTimeoutScheduler::new)
834                                  .decorate(CentralDogmaExceptionTranslator::new)
835                                  .decorate(THttpService.newDecorator());
836 
837         if (cfg.isCsrfTokenRequiredForThrift()) {
838             thriftService = thriftService.decorate(AuthService.newDecorator(new CsrfTokenAuthorizer()));
839         } else {
840             thriftService = thriftService.decorate(TokenlessClientLogger::new);
841         }
842 
843         // Enable content compression for API responses.
844         thriftService = thriftService.decorate(contentEncodingDecorator());
845 
846         sb.service("/cd/thrift/v1", thriftService);
847     }
848 
849     private Function<? super HttpService, AuthService> authService(
850             MetadataService mds, @Nullable AuthProvider authProvider, @Nullable SessionManager sessionManager) {
851         if (authProvider == null) {
852             return AuthService.newDecorator(new CsrfTokenAuthorizer());
853         }
854         final AuthConfig authCfg = cfg.authConfig();
855         assert authCfg != null : "authCfg";
856         assert sessionManager != null : "sessionManager";
857         final Authorizer<HttpRequest> tokenAuthorizer =
858                 new ApplicationTokenAuthorizer(mds::findTokenBySecret)
859                         .orElse(new SessionTokenAuthorizer(sessionManager,
860                                                            authCfg.systemAdministrators()));
861         return AuthService.builder()
862                           .add(tokenAuthorizer)
863                           .onFailure(new CentralDogmaAuthFailureHandler())
864                           .newDecorator();
865     }
866 
867     private void configureHttpApi(ServerBuilder sb,
868                                   ProjectApiManager projectApiManager, CommandExecutor executor,
869                                   WatchService watchService, MetadataService mds,
870                                   @Nullable AuthProvider authProvider,
871                                   Function<? super HttpService, AuthService> authService,
872                                   MeterRegistry meterRegistry) {
873         final DependencyInjector dependencyInjector = DependencyInjector.ofSingletons(
874                 // Use the default ObjectMapper without any configuration.
875                 // See JacksonRequestConverterFunctionTest
876                 new JacksonRequestConverterFunction(new ObjectMapper()),
877                 new HttpApiRequestConverter(projectApiManager),
878                 new RequiresRepositoryRoleDecoratorFactory(mds),
879                 new RequiresProjectRoleDecoratorFactory(mds)
880         );
881         sb.dependencyInjector(dependencyInjector, false)
882           // TODO(ikhoon): Consider exposing ReflectiveDependencyInjector as a public API via
883           //               DependencyInjector.ofReflective()
884           .dependencyInjector(new ReflectiveDependencyInjector(), false);
885 
886         // Enable content compression for API responses.
887         final Function<? super HttpService, ? extends HttpService> decorator =
888                 authService.andThen(contentEncodingDecorator());
889         for (String path : ImmutableList.of(API_V0_PATH_PREFIX, API_V1_PATH_PREFIX)) {
890             final DecoratingServiceBindingBuilder decoratorBuilder =
891                     sb.routeDecorator().pathPrefix(path);
892             for (Route loginRoute : LOGIN_API_ROUTES) {
893                 decoratorBuilder.exclude(loginRoute);
894             }
895             for (Route logoutRoute : LOGOUT_API_ROUTES) {
896                 decoratorBuilder.exclude(logoutRoute);
897             }
898             decoratorBuilder.build(decorator);
899         }
900 
901         assert statusManager != null;
902         final ContextPathServicesBuilder apiV1ServiceBuilder = sb.contextPath(API_V1_PATH_PREFIX);
903         apiV1ServiceBuilder
904                 .annotatedService(new ServerStatusService(executor, statusManager))
905                 .annotatedService(new ProjectServiceV1(projectApiManager, executor))
906                 .annotatedService(new RepositoryServiceV1(executor, mds))
907                 .annotatedService(new CredentialServiceV1(projectApiManager, executor));
908 
909         if (GIT_MIRROR_ENABLED) {
910             mirrorRunner = new MirrorRunner(projectApiManager, executor, cfg, meterRegistry,
911                                             mirrorAccessController);
912 
913             apiV1ServiceBuilder
914                     .annotatedService(new MirroringServiceV1(projectApiManager, executor, mirrorRunner, cfg,
915                                                              mirrorAccessController))
916                     .annotatedService(new MirrorAccessControlService(executor, mirrorAccessController));
917         }
918 
919         apiV1ServiceBuilder.annotatedService()
920                            .defaultServiceNaming(new ServiceNaming() {
921                                private final String serviceName = ContentServiceV1.class.getName();
922                                private final String watchServiceName =
923                                        serviceName.replace("ContentServiceV1", "WatchContentServiceV1");
924 
925                                @Override
926                                public String serviceName(ServiceRequestContext ctx) {
927                                    if (ctx.request().headers().contains(HttpHeaderNames.IF_NONE_MATCH)) {
928                                        return watchServiceName;
929                                    }
930                                    return serviceName;
931                                }
932                            })
933                            .build(new ContentServiceV1(executor, watchService, meterRegistry));
934 
935         if (authProvider != null) {
936             sb.service("/security_enabled", new AbstractHttpService() {
937                 @Override
938                 protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
939                     return HttpResponse.of(HttpStatus.OK);
940                 }
941             });
942 
943             final AuthConfig authCfg = cfg.authConfig();
944             assert authCfg != null : "authCfg";
945             apiV1ServiceBuilder
946                     .annotatedService(new MetadataApiService(executor, mds, authCfg.loginNameNormalizer()))
947                     .annotatedService(new TokenService(executor, mds));
948 
949             // authentication services:
950             Optional.ofNullable(authProvider.loginApiService())
951                     .ifPresent(login -> LOGIN_API_ROUTES.forEach(mapping -> sb.service(mapping, login)));
952 
953             // Provide logout API by default.
954             final HttpService logout =
955                     Optional.ofNullable(authProvider.logoutApiService())
956                             .orElseGet(() -> new DefaultLogoutService(executor));
957             for (Route route : LOGOUT_API_ROUTES) {
958                 sb.service(route, decorator.apply(logout));
959             }
960 
961             authProvider.moreServices().forEach(sb::service);
962         }
963 
964         sb.annotatedService()
965           .decorator(decorator)
966           .decorator(DecodingService.newDecorator())
967           .build(new GitHttpService(projectApiManager));
968 
969         if (cfg.isWebAppEnabled()) {
970             sb.contextPath(API_V0_PATH_PREFIX)
971               .annotatedService(new UserService(executor))
972               .annotatedService(new RepositoryService(projectApiManager, executor));
973 
974             if (authProvider != null) {
975                 // Will redirect to /web/auth/login by default.
976                 sb.service(LOGIN_PATH, authProvider.webLoginService());
977                 // Will redirect to /web/auth/logout by default.
978                 sb.service(LOGOUT_PATH, authProvider.webLogoutService());
979             }
980 
981             // If the index.html is just returned, Next.js will handle the all remaining process such as
982             // fetching resources and routes to the target pages.
983             sb.serviceUnder("/app", HttpFile.of(CentralDogma.class.getClassLoader(),
984                                                 "com/linecorp/centraldogma/webapp/index.html")
985                                             .asService());
986             // Serve all web resources except for '/app'.
987             sb.route()
988               .pathPrefix("/")
989               .exclude("prefix:/app")
990               .exclude("prefix:/api")
991               .build(FileService.builder(CentralDogma.class.getClassLoader(),
992                                          "com/linecorp/centraldogma/webapp")
993                                 .cacheControl(ServerCacheControl.REVALIDATED)
994                                 .autoDecompress(true)
995                                 .serveCompressedFiles(true)
996                                 .fallbackFileExtensions("html")
997                                 .build());
998         }
999 
1000         sb.errorHandler(new HttpApiExceptionHandler());
1001     }
1002 
1003     private static void configCors(ServerBuilder sb, @Nullable CorsConfig corsConfig) {
1004         if (corsConfig == null) {
1005             return;
1006         }
1007 
1008         sb.decorator(CorsService.builder(corsConfig.allowedOrigins())
1009                                 .allowRequestMethods(HttpMethod.knownMethods())
1010                                 .allowAllRequestHeaders(true)
1011                                 .allowCredentials()
1012                                 .maxAge(corsConfig.maxAgeSeconds())
1013                                 .newDecorator());
1014     }
1015 
1016     private static void configManagement(ServerBuilder sb, @Nullable ManagementConfig managementConfig) {
1017         if (managementConfig == null) {
1018             return;
1019         }
1020 
1021         // curl -L https://<address>:<port>/internal/management/jvm/threaddump
1022         // curl -L https://<address>:<port>/internal/management/jvm/heapdump -o heapdump.hprof
1023         final int port = managementConfig.port();
1024         if (port == 0) {
1025             logger.info("'management.port' is 0, using the same ports as 'ports'.");
1026             sb.route()
1027               .pathPrefix(managementConfig.path())
1028               .defaultServiceName("management")
1029               .build(ManagementService.of());
1030         } else {
1031             final SessionProtocol managementProtocol = managementConfig.protocol();
1032             final String address = managementConfig.address();
1033             if (address == null) {
1034                 sb.port(new ServerPort(port, managementProtocol));
1035             } else {
1036                 sb.port(new ServerPort(new InetSocketAddress(address, port), managementProtocol));
1037             }
1038             sb.virtualHost(port)
1039               .route()
1040               .pathPrefix(managementConfig.path())
1041               .defaultServiceName("management")
1042               .build(ManagementService.of());
1043         }
1044     }
1045 
1046     private static Function<? super HttpService, EncodingService> contentEncodingDecorator() {
1047         return delegate -> EncodingService
1048                 .builder()
1049                 .encodableContentTypes(contentType -> {
1050                     if ("application".equals(contentType.type())) {
1051                         final String subtype = contentType.subtype();
1052                         switch (subtype) {
1053                             case "json":
1054                             case "xml":
1055                             case "x-thrift":
1056                             case "x-git-upload-pack-advertisement":
1057                             case "x-git-upload-pack-result":
1058                                 return true;
1059                             default:
1060                                 return subtype.endsWith("+json") ||
1061                                        subtype.endsWith("+xml") ||
1062                                        subtype.startsWith("vnd.apache.thrift.");
1063                         }
1064                     }
1065                     return false;
1066                 })
1067                 .build(delegate);
1068     }
1069 
1070     private void configureMetrics(ServerBuilder sb, MeterRegistry registry) {
1071         sb.meterRegistry(registry);
1072 
1073         // expose the prometheus endpoint if the registry is either a PrometheusMeterRegistry or
1074         // CompositeMeterRegistry
1075         if (registry instanceof PrometheusMeterRegistry) {
1076             final PrometheusMeterRegistry prometheusMeterRegistry = (PrometheusMeterRegistry) registry;
1077             sb.service(METRICS_PATH,
1078                        PrometheusExpositionService.of(prometheusMeterRegistry.getPrometheusRegistry()));
1079         } else if (registry instanceof CompositeMeterRegistry) {
1080             final PrometheusMeterRegistry prometheusMeterRegistry = PrometheusMeterRegistries.newRegistry();
1081             ((CompositeMeterRegistry) registry).add(prometheusMeterRegistry);
1082             sb.service(METRICS_PATH,
1083                        PrometheusExpositionService.of(prometheusMeterRegistry.getPrometheusRegistry()));
1084             meterRegistryToBeClosed = prometheusMeterRegistry;
1085         } else {
1086             logger.info("Not exposing a prometheus endpoint for the type: {}", registry.getClass());
1087         }
1088 
1089         sb.decorator(MetricCollectingService.newDecorator(MeterIdPrefixFunction.ofDefault("api")));
1090 
1091         // Bind system metrics.
1092         new FileDescriptorMetrics().bindTo(registry);
1093         new ProcessorMetrics().bindTo(registry);
1094         new ClassLoaderMetrics().bindTo(registry);
1095         new UptimeMetrics().bindTo(registry);
1096         new DiskSpaceMetrics(cfg.dataDir()).bindTo(registry);
1097         new JvmGcMetrics().bindTo(registry);
1098         new JvmMemoryMetrics().bindTo(registry);
1099         new JvmThreadMetrics().bindTo(registry);
1100 
1101         // Bind global thread pool metrics.
1102         ExecutorServiceMetrics.monitor(registry, ForkJoinPool.commonPool(), "commonPool");
1103     }
1104 
1105     private void doStop() {
1106         if (server == null) {
1107             return;
1108         }
1109 
1110         final Server server = this.server;
1111         final CommandExecutor executor = this.executor;
1112         final ProjectManager pm = this.pm;
1113         final ExecutorService repositoryWorker = this.repositoryWorker;
1114         final ExecutorService purgeWorker = this.purgeWorker;
1115         final SessionManager sessionManager = this.sessionManager;
1116         final MirrorRunner mirrorRunner = this.mirrorRunner;
1117 
1118         this.server = null;
1119         this.executor = null;
1120         this.pm = null;
1121         this.repositoryWorker = null;
1122         this.sessionManager = null;
1123         this.mirrorRunner = null;
1124         if (meterRegistryToBeClosed != null) {
1125             assert meterRegistry instanceof CompositeMeterRegistry;
1126             ((CompositeMeterRegistry) meterRegistry).remove(meterRegistryToBeClosed);
1127             meterRegistryToBeClosed.close();
1128             meterRegistryToBeClosed = null;
1129         }
1130 
1131         logger.info("Stopping the Central Dogma ..");
1132         if (!doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager, mirrorRunner)) {
1133             logger.warn("Stopped the Central Dogma with failure.");
1134         } else {
1135             logger.info("Stopped the Central Dogma successfully.");
1136         }
1137     }
1138 
1139     private static boolean doStop(
1140             @Nullable Server server, @Nullable CommandExecutor executor,
1141             @Nullable ProjectManager pm,
1142             @Nullable ExecutorService repositoryWorker, @Nullable ExecutorService purgeWorker,
1143             @Nullable SessionManager sessionManager, @Nullable MirrorRunner mirrorRunner) {
1144 
1145         boolean success = true;
1146 
1147         // Stop the server first to reject new requests before stopping other resources.
1148         try {
1149             if (server != null) {
1150                 logger.info("Stopping the RPC server ..");
1151                 server.stop().join();
1152                 logger.info("Stopped the RPC server.");
1153             }
1154         } catch (Throwable t) {
1155             success = false;
1156             logger.warn("Failed to stop the RPC server:", t);
1157         }
1158 
1159         try {
1160             if (sessionManager != null) {
1161                 logger.info("Stopping the session manager ..");
1162                 sessionManager.close();
1163                 logger.info("Stopped the session manager.");
1164             }
1165         } catch (Throwable t) {
1166             success = false;
1167             logger.warn("Failed to stop the session manager:", t);
1168         }
1169 
1170         try {
1171             if (pm != null) {
1172                 logger.info("Stopping the project manager ..");
1173                 pm.close(ShuttingDownException::new);
1174                 logger.info("Stopped the project manager.");
1175             }
1176         } catch (Throwable t) {
1177             success = false;
1178             logger.warn("Failed to stop the project manager:", t);
1179         }
1180 
1181         try {
1182             if (executor != null) {
1183                 logger.info("Stopping the command executor ..");
1184                 executor.stop();
1185                 logger.info("Stopped the command executor.");
1186             }
1187         } catch (Throwable t) {
1188             success = false;
1189             logger.warn("Failed to stop the command executor:", t);
1190         }
1191 
1192         final BiFunction<ExecutorService, String, Boolean> stopWorker = (worker, name) -> {
1193             try {
1194                 if (worker != null && !worker.isTerminated()) {
1195                     logger.info("Stopping the {} worker ..", name);
1196                     boolean interruptLater = false;
1197                     while (!worker.isTerminated()) {
1198                         worker.shutdownNow();
1199                         try {
1200                             worker.awaitTermination(1, TimeUnit.SECONDS);
1201                         } catch (InterruptedException e) {
1202                             // Interrupt later.
1203                             interruptLater = true;
1204                         }
1205                     }
1206                     logger.info("Stopped the {} worker.", name);
1207 
1208                     if (interruptLater) {
1209                         Thread.currentThread().interrupt();
1210                     }
1211                 }
1212                 return true;
1213             } catch (Throwable t) {
1214                 logger.warn("Failed to stop the " + name + " worker:", t);
1215                 return false;
1216             }
1217         };
1218         if (!stopWorker.apply(repositoryWorker, "repository")) {
1219             success = false;
1220         }
1221         if (!stopWorker.apply(purgeWorker, "purge")) {
1222             success = false;
1223         }
1224 
1225         try {
1226             if (mirrorRunner != null) {
1227                 logger.info("Stopping the mirror runner..");
1228                 mirrorRunner.close();
1229                 logger.info("Stopped the mirror runner.");
1230             }
1231         } catch (Throwable t) {
1232             success = false;
1233             logger.warn("Failed to stop the mirror runner:", t);
1234         }
1235 
1236         return success;
1237     }
1238 
1239     private final class CentralDogmaStartStop extends StartStopSupport<Void, Void, Void, Void> {
1240 
1241         @Nullable
1242         private final PluginGroup pluginsForAllReplicas;
1243 
1244         CentralDogmaStartStop(@Nullable PluginGroup pluginsForAllReplicas) {
1245             super(GlobalEventExecutor.INSTANCE);
1246             this.pluginsForAllReplicas = pluginsForAllReplicas;
1247         }
1248 
1249         @Override
1250         protected CompletionStage<Void> doStart(@Nullable Void unused) throws Exception {
1251             return execute("startup", () -> {
1252                 try {
1253                     final boolean success = CentralDogma.this.doStart();
1254                     if (success) {
1255                         if (pluginsForAllReplicas != null) {
1256                             final ProjectManager pm = CentralDogma.this.pm;
1257                             final CommandExecutor executor = CentralDogma.this.executor;
1258                             final MeterRegistry meterRegistry = CentralDogma.this.meterRegistry;
1259                             if (pm != null && executor != null && meterRegistry != null) {
1260                                 pluginsForAllReplicas.start(cfg, pm, executor, meterRegistry, purgeWorker,
1261                                                             projectInitializer, mirrorAccessController).join();
1262                             }
1263                         }
1264                         serverHealth.setHealthy(true);
1265                     }
1266                 } catch (Exception e) {
1267                     Exceptions.throwUnsafely(e);
1268                 }
1269             });
1270         }
1271 
1272         @Override
1273         protected CompletionStage<Void> doStop(@Nullable Void unused) throws Exception {
1274             return execute("shutdown", () -> {
1275                 if (pluginsForAllReplicas != null) {
1276                     final ProjectManager pm = CentralDogma.this.pm;
1277                     final CommandExecutor executor = CentralDogma.this.executor;
1278                     final MeterRegistry meterRegistry = CentralDogma.this.meterRegistry;
1279                     if (pm != null && executor != null && meterRegistry != null) {
1280                         pluginsForAllReplicas.stop(cfg, pm, executor, meterRegistry, purgeWorker,
1281                                                    projectInitializer, mirrorAccessController).join();
1282                     }
1283                 }
1284                 CentralDogma.this.doStop();
1285             });
1286         }
1287 
1288         private CompletionStage<Void> execute(String mode, Runnable task) {
1289             final CompletableFuture<Void> future = new CompletableFuture<>();
1290             final Thread thread = new Thread(() -> {
1291                 try {
1292                     task.run();
1293                     future.complete(null);
1294                 } catch (Throwable cause) {
1295                     future.completeExceptionally(cause);
1296                 }
1297             }, "dogma-" + mode + "-0x" + Long.toHexString(CentralDogma.this.hashCode() & 0xFFFFFFFFL));
1298             thread.start();
1299             return future;
1300         }
1301     }
1302 }