1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server;
18  
19  import static com.google.common.base.MoreObjects.firstNonNull;
20  import static com.google.common.base.Preconditions.checkState;
21  import static com.google.common.base.Strings.isNullOrEmpty;
22  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.API_V0_PATH_PREFIX;
23  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.API_V1_PATH_PREFIX;
24  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.HEALTH_CHECK_PATH;
25  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.METRICS_PATH;
26  import static com.linecorp.centraldogma.server.auth.AuthProvider.BUILTIN_WEB_BASE_PATH;
27  import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGIN_API_ROUTES;
28  import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGIN_PATH;
29  import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGOUT_API_ROUTES;
30  import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGOUT_PATH;
31  import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.initializeInternalProject;
32  import static java.util.Objects.requireNonNull;
33  
34  import java.io.File;
35  import java.io.IOException;
36  import java.io.InputStream;
37  import java.net.InetSocketAddress;
38  import java.util.Collections;
39  import java.util.List;
40  import java.util.Map;
41  import java.util.Optional;
42  import java.util.concurrent.CompletableFuture;
43  import java.util.concurrent.CompletionStage;
44  import java.util.concurrent.Executor;
45  import java.util.concurrent.ExecutorService;
46  import java.util.concurrent.Executors;
47  import java.util.concurrent.ForkJoinPool;
48  import java.util.concurrent.LinkedBlockingQueue;
49  import java.util.concurrent.ScheduledExecutorService;
50  import java.util.concurrent.ThreadPoolExecutor;
51  import java.util.concurrent.TimeUnit;
52  import java.util.concurrent.TimeoutException;
53  import java.util.concurrent.atomic.AtomicInteger;
54  import java.util.function.BiFunction;
55  import java.util.function.Consumer;
56  import java.util.function.Function;
57  
58  import javax.annotation.Nullable;
59  
60  import org.slf4j.Logger;
61  import org.slf4j.LoggerFactory;
62  
63  import com.fasterxml.jackson.core.JsonProcessingException;
64  import com.fasterxml.jackson.databind.ObjectMapper;
65  import com.fasterxml.jackson.databind.module.SimpleModule;
66  import com.github.benmanes.caffeine.cache.Caffeine;
67  import com.github.benmanes.caffeine.cache.stats.CacheStats;
68  import com.google.common.collect.ImmutableList;
69  import com.google.common.collect.ImmutableMap;
70  
71  import com.linecorp.armeria.common.Flags;
72  import com.linecorp.armeria.common.HttpData;
73  import com.linecorp.armeria.common.HttpHeaderNames;
74  import com.linecorp.armeria.common.HttpHeaders;
75  import com.linecorp.armeria.common.HttpMethod;
76  import com.linecorp.armeria.common.HttpRequest;
77  import com.linecorp.armeria.common.HttpResponse;
78  import com.linecorp.armeria.common.HttpStatus;
79  import com.linecorp.armeria.common.MediaType;
80  import com.linecorp.armeria.common.ServerCacheControl;
81  import com.linecorp.armeria.common.metric.MeterIdPrefixFunction;
82  import com.linecorp.armeria.common.metric.PrometheusMeterRegistries;
83  import com.linecorp.armeria.common.util.EventLoopGroups;
84  import com.linecorp.armeria.common.util.Exceptions;
85  import com.linecorp.armeria.common.util.StartStopSupport;
86  import com.linecorp.armeria.common.util.SystemInfo;
87  import com.linecorp.armeria.server.AbstractHttpService;
88  import com.linecorp.armeria.server.HttpService;
89  import com.linecorp.armeria.server.Route;
90  import com.linecorp.armeria.server.Server;
91  import com.linecorp.armeria.server.ServerBuilder;
92  import com.linecorp.armeria.server.ServerPort;
93  import com.linecorp.armeria.server.ServiceNaming;
94  import com.linecorp.armeria.server.ServiceRequestContext;
95  import com.linecorp.armeria.server.annotation.JacksonRequestConverterFunction;
96  import com.linecorp.armeria.server.auth.AuthService;
97  import com.linecorp.armeria.server.auth.Authorizer;
98  import com.linecorp.armeria.server.cors.CorsService;
99  import com.linecorp.armeria.server.docs.DocService;
100 import com.linecorp.armeria.server.encoding.EncodingService;
101 import com.linecorp.armeria.server.file.FileService;
102 import com.linecorp.armeria.server.file.HttpFile;
103 import com.linecorp.armeria.server.healthcheck.HealthCheckService;
104 import com.linecorp.armeria.server.healthcheck.SettableHealthChecker;
105 import com.linecorp.armeria.server.logging.AccessLogWriter;
106 import com.linecorp.armeria.server.metric.MetricCollectingService;
107 import com.linecorp.armeria.server.metric.PrometheusExpositionService;
108 import com.linecorp.armeria.server.thrift.THttpService;
109 import com.linecorp.armeria.server.thrift.ThriftCallService;
110 import com.linecorp.centraldogma.common.ShuttingDownException;
111 import com.linecorp.centraldogma.internal.CsrfToken;
112 import com.linecorp.centraldogma.internal.Jackson;
113 import com.linecorp.centraldogma.internal.thrift.CentralDogmaService;
114 import com.linecorp.centraldogma.server.auth.AuthConfig;
115 import com.linecorp.centraldogma.server.auth.AuthProvider;
116 import com.linecorp.centraldogma.server.auth.AuthProviderParameters;
117 import com.linecorp.centraldogma.server.auth.SessionManager;
118 import com.linecorp.centraldogma.server.command.Command;
119 import com.linecorp.centraldogma.server.command.CommandExecutor;
120 import com.linecorp.centraldogma.server.command.StandaloneCommandExecutor;
121 import com.linecorp.centraldogma.server.internal.admin.auth.CachedSessionManager;
122 import com.linecorp.centraldogma.server.internal.admin.auth.CsrfTokenAuthorizer;
123 import com.linecorp.centraldogma.server.internal.admin.auth.ExpiredSessionDeletingSessionManager;
124 import com.linecorp.centraldogma.server.internal.admin.auth.FileBasedSessionManager;
125 import com.linecorp.centraldogma.server.internal.admin.auth.OrElseDefaultHttpFileService;
126 import com.linecorp.centraldogma.server.internal.admin.auth.SessionTokenAuthorizer;
127 import com.linecorp.centraldogma.server.internal.admin.service.DefaultLogoutService;
128 import com.linecorp.centraldogma.server.internal.admin.service.RepositoryService;
129 import com.linecorp.centraldogma.server.internal.admin.service.UserService;
130 import com.linecorp.centraldogma.server.internal.admin.util.RestfulJsonResponseConverter;
131 import com.linecorp.centraldogma.server.internal.api.AdministrativeService;
132 import com.linecorp.centraldogma.server.internal.api.ContentServiceV1;
133 import com.linecorp.centraldogma.server.internal.api.MetadataApiService;
134 import com.linecorp.centraldogma.server.internal.api.ProjectServiceV1;
135 import com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1;
136 import com.linecorp.centraldogma.server.internal.api.TokenService;
137 import com.linecorp.centraldogma.server.internal.api.WatchService;
138 import com.linecorp.centraldogma.server.internal.api.auth.ApplicationTokenAuthorizer;
139 import com.linecorp.centraldogma.server.internal.api.converter.HttpApiRequestConverter;
140 import com.linecorp.centraldogma.server.internal.api.converter.HttpApiResponseConverter;
141 import com.linecorp.centraldogma.server.internal.mirror.DefaultMirroringServicePlugin;
142 import com.linecorp.centraldogma.server.internal.replication.ZooKeeperCommandExecutor;
143 import com.linecorp.centraldogma.server.internal.storage.project.DefaultProjectManager;
144 import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
145 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaExceptionTranslator;
146 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaServiceImpl;
147 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaTimeoutScheduler;
148 import com.linecorp.centraldogma.server.internal.thrift.TokenlessClientLogger;
149 import com.linecorp.centraldogma.server.management.ServerStatus;
150 import com.linecorp.centraldogma.server.management.ServerStatusManager;
151 import com.linecorp.centraldogma.server.metadata.MetadataService;
152 import com.linecorp.centraldogma.server.metadata.MetadataServiceInjector;
153 import com.linecorp.centraldogma.server.plugin.AllReplicasPlugin;
154 import com.linecorp.centraldogma.server.plugin.Plugin;
155 import com.linecorp.centraldogma.server.plugin.PluginInitContext;
156 import com.linecorp.centraldogma.server.plugin.PluginTarget;
157 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
158 
159 import io.micrometer.core.instrument.MeterRegistry;
160 import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
161 import io.micrometer.core.instrument.binder.jvm.DiskSpaceMetrics;
162 import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
163 import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
164 import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
165 import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
166 import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics;
167 import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
168 import io.micrometer.core.instrument.binder.system.UptimeMetrics;
169 import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
170 import io.micrometer.prometheus.PrometheusMeterRegistry;
171 import io.netty.util.concurrent.DefaultThreadFactory;
172 import io.netty.util.concurrent.GlobalEventExecutor;
173 
174 /**
175  * Central Dogma server.
176  *
177  * @see CentralDogmaBuilder
178  */
179 public class CentralDogma implements AutoCloseable {
180 
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(CentralDogma.class);

    static {
        // Register a serializer for Caffeine's CacheStats with the shared Jackson helper.
        // This runs once, when the class is initialized.
        Jackson.registerModules(new SimpleModule().addSerializer(CacheStats.class, new CacheStatsSerializer()));
    }
186 
187     /**
188      * Creates a new instance from the given configuration file.
189      *
190      * @throws IOException if failed to load the configuration from the specified file
191      */
192     public static CentralDogma forConfig(File configFile) throws IOException {
193         requireNonNull(configFile, "configFile");
194         return new CentralDogma(Jackson.readValue(configFile, CentralDogmaConfig.class),
195                                 Flags.meterRegistry());
196     }
197 
    // Health flag served by the health-check endpoint. Constructed with 'false'; doStart() sets it
    // to healthy after a successful start-up and stop() sets it back to unhealthy.
    private final SettableHealthChecker serverHealth = new SettableHealthChecker(false);
    // Start/stop state machine that start(), stop() and close() delegate to.
    private final CentralDogmaStartStop startStop;

    // Incremented by stop() before it delegates to startStop. startCommandExecutor() polls this
    // counter to abort a command executor start-up that is still in progress.
    private final AtomicInteger numPendingStopRequests = new AtomicInteger();

    // Plugins loaded for PluginTarget.ALL_REPLICAS; may be null.
    @Nullable
    private final PluginGroup pluginsForAllReplicas;
    // Plugins loaded for PluginTarget.LEADER_ONLY; may be null. Started and stopped from the
    // leadership callbacks built in startCommandExecutor().
    @Nullable
    private final PluginGroup pluginsForLeaderOnly;

    // Server configuration; never null (checked in the constructor).
    private final CentralDogmaConfig cfg;
    // The fields below are published by doStart() only after the whole start-up succeeded.
    @Nullable
    private volatile ProjectManager pm;
    @Nullable
    private volatile Server server;
    @Nullable
    private ExecutorService repositoryWorker;
    @Nullable
    private ScheduledExecutorService purgeWorker;
    @Nullable
    private CommandExecutor executor;
    // Registry handed to the constructor and passed on to workers, services and plugins.
    private final MeterRegistry meterRegistry;
    // NOTE(review): not referenced in the visible part of this file; presumably a registry this
    // instance is responsible for closing -- confirm against the stop path.
    @Nullable
    MeterRegistry meterRegistryToBeClosed;
    @Nullable
    private SessionManager sessionManager;
    // Assigned in startCommandExecutor() from cfg.dataDir().
    @Nullable
    private ServerStatusManager statusManager;
226 
227     CentralDogma(CentralDogmaConfig cfg, MeterRegistry meterRegistry) {
228         this.cfg = requireNonNull(cfg, "cfg");
229         pluginsForAllReplicas = PluginGroup.loadPlugins(
230                 CentralDogma.class.getClassLoader(), PluginTarget.ALL_REPLICAS, cfg);
231         pluginsForLeaderOnly = PluginGroup.loadPlugins(
232                 CentralDogma.class.getClassLoader(), PluginTarget.LEADER_ONLY, cfg);
233         startStop = new CentralDogmaStartStop(pluginsForAllReplicas);
234         this.meterRegistry = meterRegistry;
235     }
236 
237     /**
238      * Returns the configuration of the server.
239      *
240      * @return the {@link CentralDogmaConfig} instance which is used for configuring this {@link CentralDogma}.
241      */
242     public CentralDogmaConfig config() {
243         return cfg;
244     }
245 
246     /**
247      * Returns the primary port of the server.
248      *
249      * @return the primary {@link ServerPort} if the server is started. {@link Optional#empty()} otherwise.
250      */
251     @Nullable
252     public ServerPort activePort() {
253         final Server server = this.server;
254         return server != null ? server.activePort() : null;
255     }
256 
257     /**
258      * Returns the ports of the server.
259      *
260      * @return the {@link Map} which contains the pairs of local {@link InetSocketAddress} and
261      *         {@link ServerPort} is the server is started. {@link Optional#empty()} otherwise.
262      */
263     public Map<InetSocketAddress, ServerPort> activePorts() {
264         final Server server = this.server;
265         if (server != null) {
266             return server.activePorts();
267         } else {
268             return Collections.emptyMap();
269         }
270     }
271 
272     /**
273      * Returns the {@link ProjectManager} of the server if the server is started.
274      * {@code null} is returned, otherwise.
275      */
276     @Nullable
277     public ProjectManager projectManager() {
278         return pm;
279     }
280 
281     /**
282      * Returns the {@link MirroringService} of the server.
283      *
284      * @return the {@link MirroringService} if the server is started and mirroring is enabled.
285      *         {@link Optional#empty()} otherwise.
286      */
287     public Optional<MirroringService> mirroringService() {
288         if (pluginsForLeaderOnly == null) {
289             return Optional.empty();
290         }
291         return pluginsForLeaderOnly.findFirstPlugin(DefaultMirroringServicePlugin.class)
292                                    .map(DefaultMirroringServicePlugin::mirroringService);
293     }
294 
295     /**
296      * Returns the {@link Plugin}s which have been loaded.
297      *
298      * @param target the {@link PluginTarget} of the {@link Plugin}s to be returned
299      */
300     public List<Plugin> plugins(PluginTarget target) {
301         switch (requireNonNull(target, "target")) {
302             case LEADER_ONLY:
303                 return pluginsForLeaderOnly != null ? ImmutableList.copyOf(pluginsForLeaderOnly.plugins())
304                                                     : ImmutableList.of();
305             case ALL_REPLICAS:
306                 return pluginsForAllReplicas != null ? ImmutableList.copyOf(pluginsForAllReplicas.plugins())
307                                                      : ImmutableList.of();
308             default:
309                 // Should not reach here.
310                 throw new Error("Unknown plugin target: " + target);
311         }
312     }
313 
314     /**
315      * Returns the {@link MeterRegistry} that contains the stats related with the server.
316      */
317     public Optional<MeterRegistry> meterRegistry() {
318         return Optional.ofNullable(meterRegistry);
319     }
320 
321     /**
322      * Starts the server.
323      */
324     public CompletableFuture<Void> start() {
325         return startStop.start(true);
326     }
327 
328     /**
329      * Stops the server. This method does nothing if the server is stopped already.
330      */
331     public CompletableFuture<Void> stop() {
332         serverHealth.setHealthy(false);
333 
334         final Optional<GracefulShutdownTimeout> gracefulTimeoutOpt = cfg.gracefulShutdownTimeout();
335         if (gracefulTimeoutOpt.isPresent()) {
336             try {
337                 // Sleep 1 second so that clients have some time to redirect traffic according
338                 // to the health status
339                 Thread.sleep(1000);
340             } catch (InterruptedException e) {
341                 logger.debug("Interrupted while waiting for quiet period", e);
342                 Thread.currentThread().interrupt();
343             }
344         }
345 
346         numPendingStopRequests.incrementAndGet();
347         return startStop.stop().thenRun(numPendingStopRequests::decrementAndGet);
348     }
349 
350     @Override
351     public void close() {
352         startStop.close();
353     }
354 
    /**
     * Brings up every component of the server in order: the repository worker pool, the purge
     * worker, the project manager, the session manager, the command executor and finally the
     * RPC server.
     *
     * <p>All components are kept in local variables while starting. They are published to the
     * instance fields, and the health checker is set to healthy, only when every step succeeded.
     * If any step throws, whatever has been created so far is handed to {@code doStop(...)} and
     * the exception propagates to the caller.
     *
     * @throws Exception if any component failed to start
     */
    private void doStart() throws Exception {
        boolean success = false;
        ExecutorService repositoryWorker = null;
        ScheduledExecutorService purgeWorker = null;
        ProjectManager pm = null;
        CommandExecutor executor = null;
        Server server = null;
        SessionManager sessionManager = null;
        try {
            logger.info("Starting the Central Dogma ..");

            // Fixed-size pool (core == max == numRepositoryWorkers) backed by an unbounded queue.
            // Core threads are allowed to time out after 60 seconds of idleness.
            final ThreadPoolExecutor repositoryWorkerImpl = new ThreadPoolExecutor(
                    cfg.numRepositoryWorkers(), cfg.numRepositoryWorkers(),
                    // TODO(minwoox): Use LinkedTransferQueue when we upgrade to JDK 21.
                    60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                    new DefaultThreadFactory("repository-worker", true));
            repositoryWorkerImpl.allowCoreThreadTimeOut(true);
            // Wrap the pool so that its metrics are reported to the meter registry.
            repositoryWorker = ExecutorServiceMetrics.monitor(meterRegistry, repositoryWorkerImpl,
                                                              "repositoryWorker");

            logger.info("Starting the project manager: {}", cfg.dataDir());

            purgeWorker = Executors.newSingleThreadScheduledExecutor(
                    new DefaultThreadFactory("purge-worker", true));

            pm = new DefaultProjectManager(cfg.dataDir(), repositoryWorker, purgeWorker,
                                           meterRegistry, cfg.repositoryCacheSpec());

            logger.info("Started the project manager: {}", pm);

            logger.info("Current settings:\n{}", cfg);

            // null when authentication is not configured.
            sessionManager = initializeSessionManager();

            logger.info("Starting the command executor ..");
            executor = startCommandExecutor(pm, repositoryWorker, purgeWorker,
                                            meterRegistry, sessionManager);
            // The internal project is initialized only when the executor accepts writes.
            if (executor.isWritable()) {
                logger.info("Started the command executor.");

                initializeInternalProject(executor);
            }

            logger.info("Starting the RPC server.");
            server = startServer(pm, executor, purgeWorker, meterRegistry, sessionManager);
            logger.info("Started the RPC server at: {}", server.activePorts());
            logger.info("Started the Central Dogma successfully.");
            success = true;
        } finally {
            if (success) {
                // Publish the fully started components and report healthy.
                serverHealth.setHealthy(true);
                this.repositoryWorker = repositoryWorker;
                this.purgeWorker = purgeWorker;
                this.pm = pm;
                this.executor = executor;
                this.server = server;
                this.sessionManager = sessionManager;
            } else {
                // Roll back whatever was created; arguments that were never assigned are still null.
                doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager);
            }
        }
    }
417 
    /**
     * Creates the {@link CommandExecutor} that matches the configured replication method and,
     * if the server status says the server is replicating, starts it.
     *
     * <p>The executor is given two leadership callbacks which start and stop the leader-only
     * plugins. A failure to start the executor is logged and does not abort the server start-up;
     * the executor is returned in any case.
     *
     * @param pm the {@link ProjectManager} handed to the executor and the leader-only plugins
     * @param repositoryWorker the {@link Executor} handed to the executor
     * @param purgeWorker the scheduler handed to the leader-only plugins
     * @param meterRegistry the {@link MeterRegistry} handed to the executor and the plugins
     * @param sessionManager the {@link SessionManager}, or {@code null} if none was created
     * @return the created {@link CommandExecutor}, which may not have been started
     */
    private CommandExecutor startCommandExecutor(
            ProjectManager pm, Executor repositoryWorker,
            ScheduledExecutorService purgeWorker, MeterRegistry meterRegistry,
            @Nullable SessionManager sessionManager) {

        // Invoked with the executor when leadership is taken: starts the leader-only plugins
        // and logs the outcome without propagating a failure.
        final Consumer<CommandExecutor> onTakeLeadership = exec -> {
            if (pluginsForLeaderOnly != null) {
                logger.info("Starting plugins on the leader replica ..");
                pluginsForLeaderOnly
                        .start(cfg, pm, exec, meterRegistry, purgeWorker).handle((unused, cause) -> {
                            if (cause == null) {
                                logger.info("Started plugins on the leader replica.");
                            } else {
                                logger.error("Failed to start plugins on the leader replica..", cause);
                            }
                            return null;
                        });
            }
        };

        // Counterpart of the callback above: stops the leader-only plugins and logs the outcome.
        final Consumer<CommandExecutor> onReleaseLeadership = exec -> {
            if (pluginsForLeaderOnly != null) {
                logger.info("Stopping plugins on the leader replica ..");
                pluginsForLeaderOnly.stop(cfg, pm, exec, meterRegistry, purgeWorker).handle((unused, cause) -> {
                    if (cause == null) {
                        logger.info("Stopped plugins on the leader replica.");
                    } else {
                        logger.error("Failed to stop plugins on the leader replica.", cause);
                    }
                    return null;
                });
            }
        };

        statusManager = new ServerStatusManager(cfg.dataDir());
        final CommandExecutor executor;
        final ReplicationMethod replicationMethod = cfg.replicationConfig().method();
        switch (replicationMethod) {
            case ZOOKEEPER:
                executor = newZooKeeperCommandExecutor(pm, repositoryWorker, statusManager, meterRegistry,
                                                       sessionManager, onTakeLeadership, onReleaseLeadership);
                break;
            case NONE:
                logger.info("No replication mechanism specified; entering standalone");
                executor = new StandaloneCommandExecutor(pm, repositoryWorker, statusManager, sessionManager,
                                                         cfg.writeQuotaPerRepository(),
                                                         onTakeLeadership, onReleaseLeadership);
                break;
            default:
                throw new Error("unknown replication method: " + replicationMethod);
        }

        // Apply the writable flag reported by the status manager before deciding whether to start.
        final ServerStatus initialServerStatus = statusManager.serverStatus();
        executor.setWritable(initialServerStatus.writable());
        if (!initialServerStatus.replicating()) {
            // Not replicating: hand back the executor without starting it.
            return executor;
        }
        try {
            final CompletableFuture<Void> startFuture = executor.start();
            // Wait for the start-up in 100 ms slices so that a stop() issued meanwhile is noticed.
            while (!startFuture.isDone()) {
                if (numPendingStopRequests.get() > 0) {
                    // Stop request has been issued.
                    executor.stop().get();
                    break;
                }

                try {
                    startFuture.get(100, TimeUnit.MILLISECONDS);
                } catch (TimeoutException unused) {
                    // Taking long time ..
                }
            }

            // Trigger the exception if any.
            startFuture.get();
        } catch (Exception e) {
            // NOTE(review): this also catches InterruptedException without restoring the thread's
            // interrupt status -- confirm whether that is intended.
            logger.warn("Failed to start the command executor. Entering read-only.", e);
        }

        return executor;
    }
499 
500     @Nullable
501     private SessionManager initializeSessionManager() throws Exception {
502         final AuthConfig authCfg = cfg.authConfig();
503         if (authCfg == null) {
504             return null;
505         }
506 
507         boolean success = false;
508         SessionManager manager = null;
509         try {
510             manager = new FileBasedSessionManager(new File(cfg.dataDir(), "_sessions").toPath(),
511                                                   authCfg.sessionValidationSchedule());
512             manager = new CachedSessionManager(manager, Caffeine.from(authCfg.sessionCacheSpec()).build());
513             manager = new ExpiredSessionDeletingSessionManager(manager);
514             success = true;
515             return manager;
516         } finally {
517             if (!success && manager != null) {
518                 try {
519                     // It will eventually close FileBasedSessionManager because the other managers just forward
520                     // the close method call to their delegate.
521                     manager.close();
522                 } catch (Exception e) {
523                     logger.warn("Failed to close a session manager.", e);
524                 }
525             }
526         }
527     }
528 
    /**
     * Builds, configures and starts the Armeria {@link Server} which serves the Thrift service,
     * the HTTP API, the health check, the documentation service and the metrics.
     *
     * <p>This method blocks until the server has been started.
     *
     * @param pm the {@link ProjectManager} the services operate on
     * @param executor the {@link CommandExecutor} the services use
     * @param purgeWorker the scheduler exposed to the plugins via {@link PluginInitContext}
     * @param meterRegistry the {@link MeterRegistry} used by the services and the plugins
     * @param sessionManager the {@link SessionManager}, or {@code null} if none was created
     * @return the started {@link Server}
     */
    private Server startServer(ProjectManager pm, CommandExecutor executor,
                               ScheduledExecutorService purgeWorker, MeterRegistry meterRegistry,
                               @Nullable SessionManager sessionManager) {
        final ServerBuilder sb = Server.builder();
        sb.verboseResponses(true);
        cfg.ports().forEach(sb::port);

        // TLS is set up only when at least one configured port uses it.
        if (cfg.ports().stream().anyMatch(ServerPort::hasTls)) {
            try {
                final TlsConfig tlsConfig = cfg.tls();
                if (tlsConfig != null) {
                    // Both streams are closed once the builder has consumed them.
                    try (InputStream keyCertChainInputStream = tlsConfig.keyCertChainInputStream();
                         InputStream keyInputStream = tlsConfig.keyInputStream()) {
                        sb.tls(keyCertChainInputStream, keyInputStream, tlsConfig.keyPassword());
                    }
                } else {
                    logger.warn(
                            "Missing TLS configuration. Generating a self-signed certificate for TLS support.");
                    sb.tlsSelfSigned();
                }
            } catch (Exception e) {
                // Rethrown as-is through Armeria's helper; this method declares no checked exception.
                Exceptions.throwUnsafely(e);
            }
        }

        sb.clientAddressSources(cfg.clientAddressSourceList());
        sb.clientAddressTrustedProxyFilter(cfg.trustedProxyAddressPredicate());

        configCors(sb, config().corsConfig());

        // Optional tunables: each one is applied only when present in the configuration.
        cfg.numWorkers().ifPresent(
                numWorkers -> sb.workerGroup(EventLoopGroups.newEventLoopGroup(numWorkers), true));
        cfg.maxNumConnections().ifPresent(sb::maxNumConnections);
        cfg.idleTimeoutMillis().ifPresent(sb::idleTimeoutMillis);
        cfg.requestTimeoutMillis().ifPresent(sb::requestTimeoutMillis);
        // Note that the 'maxFrameLength' setting is applied as the maximum request length.
        cfg.maxFrameLength().ifPresent(sb::maxRequestLength);
        cfg.gracefulShutdownTimeout().ifPresent(
                t -> sb.gracefulShutdownTimeoutMillis(t.quietPeriodMillis(), t.timeoutMillis()));

        // Collaborators shared by the Thrift service and the HTTP API.
        final MetadataService mds = new MetadataService(pm, executor);
        final WatchService watchService = new WatchService(meterRegistry);
        // null when authentication is not configured.
        final AuthProvider authProvider = createAuthProvider(executor, sessionManager, mds);
        final ProjectApiManager projectApiManager = new ProjectApiManager(pm, executor, mds);

        configureThriftService(sb, projectApiManager, executor, watchService, mds);

        // Serves the web application title and the host name as a JSON document.
        sb.service("/title", webAppTitleFile(cfg.webAppTitle(), SystemInfo.hostname()).asService());

        sb.service(HEALTH_CHECK_PATH, HealthCheckService.builder()
                                                        .checkers(serverHealth)
                                                        .build());

        sb.serviceUnder("/docs/",
                        DocService.builder()
                                  .exampleHeaders(CentralDogmaService.class,
                                                  HttpHeaders.of(HttpHeaderNames.AUTHORIZATION,
                                                                 "Bearer " + CsrfToken.ANONYMOUS))
                                  .build());

        configureHttpApi(sb, projectApiManager, executor, watchService, mds, authProvider, sessionManager);

        configureMetrics(sb, meterRegistry);

        // Configure access log format.
        // Null or empty disables the access log, "common" and "combined" select the predefined
        // writers, and any other value is passed to the builder as a custom format.
        final String accessLogFormat = cfg.accessLogFormat();
        if (isNullOrEmpty(accessLogFormat)) {
            sb.accessLogWriter(AccessLogWriter.disabled(), true);
        } else if ("common".equals(accessLogFormat)) {
            sb.accessLogWriter(AccessLogWriter.common(), true);
        } else if ("combined".equals(accessLogFormat)) {
            sb.accessLogWriter(AccessLogWriter.combined(), true);
        } else {
            sb.accessLogFormat(accessLogFormat);
        }

        // Let the plugins that implement AllReplicasPlugin customize the server builder.
        if (pluginsForAllReplicas != null) {
            final PluginInitContext pluginInitContext =
                    new PluginInitContext(config(), pm, executor, meterRegistry, purgeWorker, sb);
            pluginsForAllReplicas.plugins()
                                 .forEach(p -> {
                                     if (!(p instanceof AllReplicasPlugin)) {
                                         return;
                                     }
                                     final AllReplicasPlugin plugin = (AllReplicasPlugin) p;
                                     plugin.init(pluginInitContext);
                                 });
        }
        // Configure the uncaught exception handler just before starting the server so that it
        // overrides the default exception handler set by third-party libraries such as
        // NIOServerCnxnFactory.
        Thread.setDefaultUncaughtExceptionHandler((t, e) -> logger.warn("Uncaught exception: {}", t, e));

        final Server s = sb.build();
        // Block until the server has finished starting.
        s.start().join();
        return s;
    }
624 
625     static HttpFile webAppTitleFile(@Nullable String webAppTitle, String hostname) {
626         requireNonNull(hostname, "hostname");
627         final Map<String, String> titleAndHostname = ImmutableMap.of(
628                 "title", firstNonNull(webAppTitle, "Central Dogma at {{hostname}}"),
629                 "hostname", hostname);
630 
631         try {
632             final HttpData data = HttpData.ofUtf8(Jackson.writeValueAsString(titleAndHostname));
633             return HttpFile.builder(data)
634                            .contentType(MediaType.JSON_UTF_8)
635                            .cacheControl(ServerCacheControl.REVALIDATED)
636                            .build();
637         } catch (JsonProcessingException e) {
638             throw new Error("Failed to encode the title and hostname:", e);
639         }
640     }
641 
642     @Nullable
643     private AuthProvider createAuthProvider(
644             CommandExecutor commandExecutor, @Nullable SessionManager sessionManager, MetadataService mds) {
645         final AuthConfig authCfg = cfg.authConfig();
646         if (authCfg == null) {
647             return null;
648         }
649 
650         checkState(sessionManager != null, "SessionManager is null");
651         final AuthProviderParameters parameters = new AuthProviderParameters(
652                 // Find application first, then find the session token.
653                 new ApplicationTokenAuthorizer(mds::findTokenBySecret).orElse(
654                         new SessionTokenAuthorizer(sessionManager, authCfg.administrators())),
655                 cfg,
656                 sessionManager::generateSessionId,
657                 // Propagate login and logout events to the other replicas.
658                 session -> commandExecutor.execute(Command.createSession(session)),
659                 sessionId -> commandExecutor.execute(Command.removeSession(sessionId)));
660         return authCfg.factory().create(parameters);
661     }
662 
663     private CommandExecutor newZooKeeperCommandExecutor(
664             ProjectManager pm, Executor repositoryWorker,
665             ServerStatusManager serverStatusManager,
666             MeterRegistry meterRegistry,
667             @Nullable SessionManager sessionManager,
668             @Nullable Consumer<CommandExecutor> onTakeLeadership,
669             @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
670         final ZooKeeperReplicationConfig zkCfg = (ZooKeeperReplicationConfig) cfg.replicationConfig();
671 
672         // Delete the old UUID replica ID which is not used anymore.
673         final File dataDir = cfg.dataDir();
674         new File(dataDir, "replica_id").delete();
675 
676         // TODO(trustin): Provide a way to restart/reload the replicator
677         //                so that we can recover from ZooKeeper maintenance automatically.
678         return new ZooKeeperCommandExecutor(
679                 zkCfg, dataDir,
680                 new StandaloneCommandExecutor(pm, repositoryWorker, serverStatusManager, sessionManager,
681                         /* onTakeLeadership */ null, /* onReleaseLeadership */ null),
682                 meterRegistry, pm, config().writeQuotaPerRepository(), onTakeLeadership, onReleaseLeadership);
683     }
684 
685     private void configureThriftService(ServerBuilder sb, ProjectApiManager projectApiManager,
686                                         CommandExecutor executor,
687                                         WatchService watchService, MetadataService mds) {
688         final CentralDogmaServiceImpl service =
689                 new CentralDogmaServiceImpl(projectApiManager, executor, watchService, mds);
690 
691         HttpService thriftService =
692                 ThriftCallService.of(service)
693                                  .decorate(CentralDogmaTimeoutScheduler::new)
694                                  .decorate(CentralDogmaExceptionTranslator::new)
695                                  .decorate(THttpService.newDecorator());
696 
697         if (cfg.isCsrfTokenRequiredForThrift()) {
698             thriftService = thriftService.decorate(AuthService.newDecorator(new CsrfTokenAuthorizer()));
699         } else {
700             thriftService = thriftService.decorate(TokenlessClientLogger::new);
701         }
702 
703         // Enable content compression for API responses.
704         thriftService = thriftService.decorate(contentEncodingDecorator());
705 
706         sb.service("/cd/thrift/v1", thriftService);
707     }
708 
709     private void configureHttpApi(ServerBuilder sb,
710                                   ProjectApiManager projectApiManager, CommandExecutor executor,
711                                   WatchService watchService, MetadataService mds,
712                                   @Nullable AuthProvider authProvider,
713                                   @Nullable SessionManager sessionManager) {
714         Function<? super HttpService, ? extends HttpService> decorator;
715 
716         if (authProvider != null) {
717             sb.service("/security_enabled", new AbstractHttpService() {
718                 @Override
719                 protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
720                     return HttpResponse.of(HttpStatus.OK);
721                 }
722             });
723 
724             final AuthConfig authCfg = cfg.authConfig();
725             assert authCfg != null : "authCfg";
726             assert sessionManager != null : "sessionManager";
727             final Authorizer<HttpRequest> tokenAuthorizer =
728                     new ApplicationTokenAuthorizer(mds::findTokenBySecret)
729                             .orElse(new SessionTokenAuthorizer(sessionManager,
730                                                                authCfg.administrators()));
731             decorator = MetadataServiceInjector
732                     .newDecorator(mds)
733                     .andThen(AuthService.builder()
734                                         .add(tokenAuthorizer)
735                                         .onFailure(new CentralDogmaAuthFailureHandler())
736                                         .newDecorator());
737         } else {
738             decorator = MetadataServiceInjector
739                     .newDecorator(mds)
740                     .andThen(AuthService.newDecorator(new CsrfTokenAuthorizer()));
741         }
742 
743         final HttpApiRequestConverter v1RequestConverter = new HttpApiRequestConverter(projectApiManager);
744         final JacksonRequestConverterFunction jacksonRequestConverterFunction =
745                 // Use the default ObjectMapper without any configuration.
746                 // See JacksonRequestConverterFunctionTest
747                 new JacksonRequestConverterFunction(new ObjectMapper());
748         // Do not need jacksonResponseConverterFunction because HttpApiResponseConverter handles the JSON data.
749         final HttpApiResponseConverter v1ResponseConverter = new HttpApiResponseConverter();
750 
751         // Enable content compression for API responses.
752         decorator = decorator.andThen(contentEncodingDecorator());
753 
754         assert statusManager != null;
755         sb.annotatedService(API_V1_PATH_PREFIX,
756                             new AdministrativeService(executor, statusManager), decorator,
757                             v1RequestConverter, jacksonRequestConverterFunction, v1ResponseConverter);
758         sb.annotatedService(API_V1_PATH_PREFIX,
759                             new ProjectServiceV1(projectApiManager), decorator,
760                             v1RequestConverter, jacksonRequestConverterFunction, v1ResponseConverter);
761         sb.annotatedService(API_V1_PATH_PREFIX,
762                             new RepositoryServiceV1(executor, mds), decorator,
763                             v1RequestConverter, jacksonRequestConverterFunction, v1ResponseConverter);
764         sb.annotatedService()
765           .pathPrefix(API_V1_PATH_PREFIX)
766           .defaultServiceNaming(new ServiceNaming() {
767               private final String serviceName = ContentServiceV1.class.getName();
768               private final String watchServiceName =
769                       serviceName.replace("ContentServiceV1", "WatchContentServiceV1");
770 
771               @Override
772               public String serviceName(ServiceRequestContext ctx) {
773                   if (ctx.request().headers().contains(HttpHeaderNames.IF_NONE_MATCH)) {
774                       return watchServiceName;
775                   }
776                   return serviceName;
777               }
778           })
779           .decorator(decorator)
780           .requestConverters(v1RequestConverter, jacksonRequestConverterFunction)
781           .responseConverters(v1ResponseConverter)
782           .build(new ContentServiceV1(executor, watchService));
783 
784         if (authProvider != null) {
785             final AuthConfig authCfg = cfg.authConfig();
786             assert authCfg != null : "authCfg";
787             sb.annotatedService(API_V1_PATH_PREFIX,
788                                 new MetadataApiService(mds, authCfg.loginNameNormalizer()),
789                                 decorator, v1RequestConverter, jacksonRequestConverterFunction,
790                                 v1ResponseConverter);
791             sb.annotatedService(API_V1_PATH_PREFIX, new TokenService(executor, mds),
792                                 decorator, v1RequestConverter, jacksonRequestConverterFunction,
793                                 v1ResponseConverter);
794 
795             // authentication services:
796             Optional.ofNullable(authProvider.loginApiService())
797                     .ifPresent(login -> LOGIN_API_ROUTES.forEach(mapping -> sb.service(mapping, login)));
798 
799             // Provide logout API by default.
800             final HttpService logout =
801                     Optional.ofNullable(authProvider.logoutApiService())
802                             .orElseGet(() -> new DefaultLogoutService(executor));
803             for (Route route : LOGOUT_API_ROUTES) {
804                 sb.service(route, decorator.apply(logout));
805             }
806 
807             authProvider.moreServices().forEach(sb::service);
808         }
809 
810         if (cfg.isWebAppEnabled()) {
811             final RestfulJsonResponseConverter httpApiV0Converter = new RestfulJsonResponseConverter();
812 
813             // TODO(hyangtack): Simplify this if https://github.com/line/armeria/issues/582 is resolved.
814             sb.annotatedService(API_V0_PATH_PREFIX, new UserService(executor),
815                                 decorator, jacksonRequestConverterFunction, httpApiV0Converter)
816               .annotatedService(API_V0_PATH_PREFIX, new RepositoryService(projectApiManager, executor),
817                                 decorator, jacksonRequestConverterFunction, httpApiV0Converter);
818 
819             if (authProvider != null) {
820                 // Will redirect to /web/auth/login by default.
821                 sb.service(LOGIN_PATH, authProvider.webLoginService());
822                 // Will redirect to /web/auth/logout by default.
823                 sb.service(LOGOUT_PATH, authProvider.webLogoutService());
824 
825                 sb.serviceUnder(BUILTIN_WEB_BASE_PATH, new OrElseDefaultHttpFileService(
826                         FileService.builder(CentralDogma.class.getClassLoader(), "auth-webapp")
827                                    .autoDecompress(true)
828                                    .serveCompressedFiles(true)
829                                    .cacheControl(ServerCacheControl.REVALIDATED)
830                                    .build(),
831                         "/index.html"));
832             }
833             sb.serviceUnder("/",
834                             FileService.builder(CentralDogma.class.getClassLoader(), "webapp")
835                                        .cacheControl(ServerCacheControl.REVALIDATED)
836                                        .build());
837         }
838     }
839 
840     private static void configCors(ServerBuilder sb, @Nullable CorsConfig corsConfig) {
841         if (corsConfig == null) {
842             return;
843         }
844 
845         sb.decorator(CorsService.builder(corsConfig.allowedOrigins())
846                                 .allowRequestMethods(HttpMethod.knownMethods())
847                                 .allowAllRequestHeaders(true)
848                                 .allowCredentials()
849                                 .maxAge(corsConfig.maxAgeSeconds())
850                                 .newDecorator());
851     }
852 
853     private static Function<? super HttpService, EncodingService> contentEncodingDecorator() {
854         return delegate -> EncodingService
855                 .builder()
856                 .encodableContentTypes(contentType -> {
857                     if ("application".equals(contentType.type())) {
858                         final String subtype = contentType.subtype();
859                         switch (subtype) {
860                             case "json":
861                             case "xml":
862                             case "x-thrift":
863                                 return true;
864                             default:
865                                 return subtype.endsWith("+json") ||
866                                        subtype.endsWith("+xml") ||
867                                        subtype.startsWith("vnd.apache.thrift.");
868                         }
869                     }
870                     return false;
871                 })
872                 .build(delegate);
873     }
874 
875     private void configureMetrics(ServerBuilder sb, MeterRegistry registry) {
876         sb.meterRegistry(registry);
877 
878         // expose the prometheus endpoint if the registry is either a PrometheusMeterRegistry or
879         // CompositeMeterRegistry
880         if (registry instanceof PrometheusMeterRegistry) {
881             final PrometheusMeterRegistry prometheusMeterRegistry = (PrometheusMeterRegistry) registry;
882             sb.service(METRICS_PATH,
883                        PrometheusExpositionService.of(prometheusMeterRegistry.getPrometheusRegistry()));
884         } else if (registry instanceof CompositeMeterRegistry) {
885             final PrometheusMeterRegistry prometheusMeterRegistry = PrometheusMeterRegistries.newRegistry();
886             ((CompositeMeterRegistry) registry).add(prometheusMeterRegistry);
887             sb.service(METRICS_PATH,
888                        PrometheusExpositionService.of(prometheusMeterRegistry.getPrometheusRegistry()));
889             meterRegistryToBeClosed = prometheusMeterRegistry;
890         } else {
891             logger.info("Not exposing a prometheus endpoint for the type: {}", registry.getClass());
892         }
893 
894         sb.decorator(MetricCollectingService.newDecorator(repoAwareMeterIdPrefixFunction()));
895 
896         // Bind system metrics.
897         new FileDescriptorMetrics().bindTo(registry);
898         new ProcessorMetrics().bindTo(registry);
899         new ClassLoaderMetrics().bindTo(registry);
900         new UptimeMetrics().bindTo(registry);
901         new DiskSpaceMetrics(cfg.dataDir()).bindTo(registry);
902         new JvmGcMetrics().bindTo(registry);
903         new JvmMemoryMetrics().bindTo(registry);
904         new JvmThreadMetrics().bindTo(registry);
905 
906         // Bind global thread pool metrics.
907         ExecutorServiceMetrics.monitor(registry, ForkJoinPool.commonPool(), "commonPool");
908     }
909 
910     private static MeterIdPrefixFunction repoAwareMeterIdPrefixFunction() {
911         final MeterIdPrefixFunction meterIdPrefix = MeterIdPrefixFunction.ofDefault("api");
912         return meterIdPrefix.andThen((registry0, log, meterIdPrefix0) -> {
913             final ServiceRequestContext ctx = (ServiceRequestContext) log.context();
914             final String project = firstNonNull(ctx.pathParam("projectName"), "none");
915             final String repo = firstNonNull(ctx.pathParam("repoName"), "none");
916             return meterIdPrefix0.withTags("project", project, "repository", repo);
917         });
918     }
919 
920     private void doStop() {
921         if (server == null) {
922             return;
923         }
924 
925         final Server server = this.server;
926         final CommandExecutor executor = this.executor;
927         final ProjectManager pm = this.pm;
928         final ExecutorService repositoryWorker = this.repositoryWorker;
929         final ExecutorService purgeWorker = this.purgeWorker;
930         final SessionManager sessionManager = this.sessionManager;
931 
932         this.server = null;
933         this.executor = null;
934         this.pm = null;
935         this.repositoryWorker = null;
936         this.sessionManager = null;
937         if (meterRegistryToBeClosed != null) {
938             assert meterRegistry instanceof CompositeMeterRegistry;
939             ((CompositeMeterRegistry) meterRegistry).remove(meterRegistryToBeClosed);
940             meterRegistryToBeClosed.close();
941             meterRegistryToBeClosed = null;
942         }
943 
944         logger.info("Stopping the Central Dogma ..");
945         if (!doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager)) {
946             logger.warn("Stopped the Central Dogma with failure.");
947         } else {
948             logger.info("Stopped the Central Dogma successfully.");
949         }
950     }
951 
952     private static boolean doStop(
953             @Nullable Server server, @Nullable CommandExecutor executor,
954             @Nullable ProjectManager pm,
955             @Nullable ExecutorService repositoryWorker, @Nullable ExecutorService purgeWorker,
956             @Nullable SessionManager sessionManager) {
957 
958         boolean success = true;
959         try {
960             if (sessionManager != null) {
961                 logger.info("Stopping the session manager ..");
962                 sessionManager.close();
963                 logger.info("Stopped the session manager.");
964             }
965         } catch (Throwable t) {
966             success = false;
967             logger.warn("Failed to stop the session manager:", t);
968         }
969 
970         try {
971             if (pm != null) {
972                 logger.info("Stopping the project manager ..");
973                 pm.close(ShuttingDownException::new);
974                 logger.info("Stopped the project manager.");
975             }
976         } catch (Throwable t) {
977             success = false;
978             logger.warn("Failed to stop the project manager:", t);
979         }
980 
981         try {
982             if (executor != null) {
983                 logger.info("Stopping the command executor ..");
984                 executor.stop();
985                 logger.info("Stopped the command executor.");
986             }
987         } catch (Throwable t) {
988             success = false;
989             logger.warn("Failed to stop the command executor:", t);
990         }
991 
992         final BiFunction<ExecutorService, String, Boolean> stopWorker = (worker, name) -> {
993             try {
994                 if (worker != null && !worker.isTerminated()) {
995                     logger.info("Stopping the {} worker ..", name);
996                     boolean interruptLater = false;
997                     while (!worker.isTerminated()) {
998                         worker.shutdownNow();
999                         try {
1000                             worker.awaitTermination(1, TimeUnit.SECONDS);
1001                         } catch (InterruptedException e) {
1002                             // Interrupt later.
1003                             interruptLater = true;
1004                         }
1005                     }
1006                     logger.info("Stopped the {} worker.", name);
1007 
1008                     if (interruptLater) {
1009                         Thread.currentThread().interrupt();
1010                     }
1011                 }
1012                 return true;
1013             } catch (Throwable t) {
1014                 logger.warn("Failed to stop the " + name + " worker:", t);
1015                 return false;
1016             }
1017         };
1018         if (!stopWorker.apply(repositoryWorker, "repository")) {
1019             success = false;
1020         }
1021         if (!stopWorker.apply(purgeWorker, "purge")) {
1022             success = false;
1023         }
1024 
1025         try {
1026             if (server != null) {
1027                 logger.info("Stopping the RPC server ..");
1028                 server.stop().join();
1029                 logger.info("Stopped the RPC server.");
1030             }
1031         } catch (Throwable t) {
1032             success = false;
1033             logger.warn("Failed to stop the RPC server:", t);
1034         }
1035 
1036         return success;
1037     }
1038 
1039     private final class CentralDogmaStartStop extends StartStopSupport<Void, Void, Void, Void> {
1040 
1041         @Nullable
1042         private final PluginGroup pluginsForAllReplicas;
1043 
1044         CentralDogmaStartStop(@Nullable PluginGroup pluginsForAllReplicas) {
1045             super(GlobalEventExecutor.INSTANCE);
1046             this.pluginsForAllReplicas = pluginsForAllReplicas;
1047         }
1048 
1049         @Override
1050         protected CompletionStage<Void> doStart(@Nullable Void unused) throws Exception {
1051             return execute("startup", () -> {
1052                 try {
1053                     CentralDogma.this.doStart();
1054                     if (pluginsForAllReplicas != null) {
1055                         final ProjectManager pm = CentralDogma.this.pm;
1056                         final CommandExecutor executor = CentralDogma.this.executor;
1057                         final MeterRegistry meterRegistry = CentralDogma.this.meterRegistry;
1058                         if (pm != null && executor != null && meterRegistry != null) {
1059                             pluginsForAllReplicas.start(cfg, pm, executor, meterRegistry, purgeWorker).join();
1060                         }
1061                     }
1062                 } catch (Exception e) {
1063                     Exceptions.throwUnsafely(e);
1064                 }
1065             });
1066         }
1067 
1068         @Override
1069         protected CompletionStage<Void> doStop(@Nullable Void unused) throws Exception {
1070             return execute("shutdown", () -> {
1071                 if (pluginsForAllReplicas != null) {
1072                     final ProjectManager pm = CentralDogma.this.pm;
1073                     final CommandExecutor executor = CentralDogma.this.executor;
1074                     final MeterRegistry meterRegistry = CentralDogma.this.meterRegistry;
1075                     if (pm != null && executor != null && meterRegistry != null) {
1076                         pluginsForAllReplicas.stop(cfg, pm, executor, meterRegistry, purgeWorker).join();
1077                     }
1078                 }
1079                 CentralDogma.this.doStop();
1080             });
1081         }
1082 
1083         private CompletionStage<Void> execute(String mode, Runnable task) {
1084             final CompletableFuture<Void> future = new CompletableFuture<>();
1085             final Thread thread = new Thread(() -> {
1086                 try {
1087                     task.run();
1088                     future.complete(null);
1089                 } catch (Throwable cause) {
1090                     future.completeExceptionally(cause);
1091                 }
1092             }, "dogma-" + mode + "-0x" + Long.toHexString(CentralDogma.this.hashCode() & 0xFFFFFFFFL));
1093             thread.start();
1094             return future;
1095         }
1096     }
1097 }