1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server;
18
19 import static com.google.common.base.MoreObjects.firstNonNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static com.google.common.base.Strings.isNullOrEmpty;
22 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.API_V0_PATH_PREFIX;
23 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.API_V1_PATH_PREFIX;
24 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.HEALTH_CHECK_PATH;
25 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.METRICS_PATH;
26 import static com.linecorp.centraldogma.server.auth.AuthProvider.BUILTIN_WEB_BASE_PATH;
27 import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGIN_API_ROUTES;
28 import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGIN_PATH;
29 import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGOUT_API_ROUTES;
30 import static com.linecorp.centraldogma.server.auth.AuthProvider.LOGOUT_PATH;
31 import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.initializeInternalProject;
32 import static java.util.Objects.requireNonNull;
33
34 import java.io.File;
35 import java.io.IOException;
36 import java.io.InputStream;
37 import java.net.InetSocketAddress;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.Map;
41 import java.util.Optional;
42 import java.util.concurrent.CompletableFuture;
43 import java.util.concurrent.CompletionStage;
44 import java.util.concurrent.Executor;
45 import java.util.concurrent.ExecutorService;
46 import java.util.concurrent.Executors;
47 import java.util.concurrent.ForkJoinPool;
48 import java.util.concurrent.LinkedBlockingQueue;
49 import java.util.concurrent.ScheduledExecutorService;
50 import java.util.concurrent.ThreadPoolExecutor;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.TimeoutException;
53 import java.util.concurrent.atomic.AtomicInteger;
54 import java.util.function.BiFunction;
55 import java.util.function.Consumer;
56 import java.util.function.Function;
57
58 import javax.annotation.Nullable;
59
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 import com.fasterxml.jackson.core.JsonProcessingException;
64 import com.fasterxml.jackson.databind.ObjectMapper;
65 import com.fasterxml.jackson.databind.module.SimpleModule;
66 import com.github.benmanes.caffeine.cache.Caffeine;
67 import com.github.benmanes.caffeine.cache.stats.CacheStats;
68 import com.google.common.collect.ImmutableList;
69 import com.google.common.collect.ImmutableMap;
70
71 import com.linecorp.armeria.common.Flags;
72 import com.linecorp.armeria.common.HttpData;
73 import com.linecorp.armeria.common.HttpHeaderNames;
74 import com.linecorp.armeria.common.HttpHeaders;
75 import com.linecorp.armeria.common.HttpMethod;
76 import com.linecorp.armeria.common.HttpRequest;
77 import com.linecorp.armeria.common.HttpResponse;
78 import com.linecorp.armeria.common.HttpStatus;
79 import com.linecorp.armeria.common.MediaType;
80 import com.linecorp.armeria.common.ServerCacheControl;
81 import com.linecorp.armeria.common.metric.MeterIdPrefixFunction;
82 import com.linecorp.armeria.common.metric.PrometheusMeterRegistries;
83 import com.linecorp.armeria.common.util.EventLoopGroups;
84 import com.linecorp.armeria.common.util.Exceptions;
85 import com.linecorp.armeria.common.util.StartStopSupport;
86 import com.linecorp.armeria.common.util.SystemInfo;
87 import com.linecorp.armeria.server.AbstractHttpService;
88 import com.linecorp.armeria.server.HttpService;
89 import com.linecorp.armeria.server.Route;
90 import com.linecorp.armeria.server.Server;
91 import com.linecorp.armeria.server.ServerBuilder;
92 import com.linecorp.armeria.server.ServerPort;
93 import com.linecorp.armeria.server.ServiceNaming;
94 import com.linecorp.armeria.server.ServiceRequestContext;
95 import com.linecorp.armeria.server.annotation.JacksonRequestConverterFunction;
96 import com.linecorp.armeria.server.auth.AuthService;
97 import com.linecorp.armeria.server.auth.Authorizer;
98 import com.linecorp.armeria.server.cors.CorsService;
99 import com.linecorp.armeria.server.docs.DocService;
100 import com.linecorp.armeria.server.encoding.EncodingService;
101 import com.linecorp.armeria.server.file.FileService;
102 import com.linecorp.armeria.server.file.HttpFile;
103 import com.linecorp.armeria.server.healthcheck.HealthCheckService;
104 import com.linecorp.armeria.server.healthcheck.SettableHealthChecker;
105 import com.linecorp.armeria.server.logging.AccessLogWriter;
106 import com.linecorp.armeria.server.metric.MetricCollectingService;
107 import com.linecorp.armeria.server.metric.PrometheusExpositionService;
108 import com.linecorp.armeria.server.thrift.THttpService;
109 import com.linecorp.armeria.server.thrift.ThriftCallService;
110 import com.linecorp.centraldogma.common.ShuttingDownException;
111 import com.linecorp.centraldogma.internal.CsrfToken;
112 import com.linecorp.centraldogma.internal.Jackson;
113 import com.linecorp.centraldogma.internal.thrift.CentralDogmaService;
114 import com.linecorp.centraldogma.server.auth.AuthConfig;
115 import com.linecorp.centraldogma.server.auth.AuthProvider;
116 import com.linecorp.centraldogma.server.auth.AuthProviderParameters;
117 import com.linecorp.centraldogma.server.auth.SessionManager;
118 import com.linecorp.centraldogma.server.command.Command;
119 import com.linecorp.centraldogma.server.command.CommandExecutor;
120 import com.linecorp.centraldogma.server.command.StandaloneCommandExecutor;
121 import com.linecorp.centraldogma.server.internal.admin.auth.CachedSessionManager;
122 import com.linecorp.centraldogma.server.internal.admin.auth.CsrfTokenAuthorizer;
123 import com.linecorp.centraldogma.server.internal.admin.auth.ExpiredSessionDeletingSessionManager;
124 import com.linecorp.centraldogma.server.internal.admin.auth.FileBasedSessionManager;
125 import com.linecorp.centraldogma.server.internal.admin.auth.OrElseDefaultHttpFileService;
126 import com.linecorp.centraldogma.server.internal.admin.auth.SessionTokenAuthorizer;
127 import com.linecorp.centraldogma.server.internal.admin.service.DefaultLogoutService;
128 import com.linecorp.centraldogma.server.internal.admin.service.RepositoryService;
129 import com.linecorp.centraldogma.server.internal.admin.service.UserService;
130 import com.linecorp.centraldogma.server.internal.admin.util.RestfulJsonResponseConverter;
131 import com.linecorp.centraldogma.server.internal.api.AdministrativeService;
132 import com.linecorp.centraldogma.server.internal.api.ContentServiceV1;
133 import com.linecorp.centraldogma.server.internal.api.MetadataApiService;
134 import com.linecorp.centraldogma.server.internal.api.ProjectServiceV1;
135 import com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1;
136 import com.linecorp.centraldogma.server.internal.api.TokenService;
137 import com.linecorp.centraldogma.server.internal.api.WatchService;
138 import com.linecorp.centraldogma.server.internal.api.auth.ApplicationTokenAuthorizer;
139 import com.linecorp.centraldogma.server.internal.api.converter.HttpApiRequestConverter;
140 import com.linecorp.centraldogma.server.internal.api.converter.HttpApiResponseConverter;
141 import com.linecorp.centraldogma.server.internal.mirror.DefaultMirroringServicePlugin;
142 import com.linecorp.centraldogma.server.internal.replication.ZooKeeperCommandExecutor;
143 import com.linecorp.centraldogma.server.internal.storage.project.DefaultProjectManager;
144 import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
145 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaExceptionTranslator;
146 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaServiceImpl;
147 import com.linecorp.centraldogma.server.internal.thrift.CentralDogmaTimeoutScheduler;
148 import com.linecorp.centraldogma.server.internal.thrift.TokenlessClientLogger;
149 import com.linecorp.centraldogma.server.management.ServerStatus;
150 import com.linecorp.centraldogma.server.management.ServerStatusManager;
151 import com.linecorp.centraldogma.server.metadata.MetadataService;
152 import com.linecorp.centraldogma.server.metadata.MetadataServiceInjector;
153 import com.linecorp.centraldogma.server.plugin.AllReplicasPlugin;
154 import com.linecorp.centraldogma.server.plugin.Plugin;
155 import com.linecorp.centraldogma.server.plugin.PluginInitContext;
156 import com.linecorp.centraldogma.server.plugin.PluginTarget;
157 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
158
159 import io.micrometer.core.instrument.MeterRegistry;
160 import io.micrometer.core.instrument.binder.jvm.ClassLoaderMetrics;
161 import io.micrometer.core.instrument.binder.jvm.DiskSpaceMetrics;
162 import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
163 import io.micrometer.core.instrument.binder.jvm.JvmGcMetrics;
164 import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
165 import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
166 import io.micrometer.core.instrument.binder.system.FileDescriptorMetrics;
167 import io.micrometer.core.instrument.binder.system.ProcessorMetrics;
168 import io.micrometer.core.instrument.binder.system.UptimeMetrics;
169 import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
170 import io.micrometer.prometheus.PrometheusMeterRegistry;
171 import io.netty.util.concurrent.DefaultThreadFactory;
172 import io.netty.util.concurrent.GlobalEventExecutor;
173
174
175
176
177
178
179 public class CentralDogma implements AutoCloseable {
180
// Logger shared by every method of this class.
private static final Logger logger = LoggerFactory.getLogger(CentralDogma.class);

static {
    // Runs once when this class is initialized: registers a serializer for Caffeine's CacheStats
    // with the project's Jackson helper.
    Jackson.registerModules(new SimpleModule().addSerializer(CacheStats.class, new CacheStatsSerializer()));
}
186
187
188
189
190
191
192 public static CentralDogma forConfig(File configFile) throws IOException {
193 requireNonNull(configFile, "configFile");
194 return new CentralDogma(Jackson.readValue(configFile, CentralDogmaConfig.class),
195 Flags.meterRegistry());
196 }
197
// Health flag handed to the health check service; starts unhealthy, is set healthy at the end of
// a successful doStart() and set unhealthy again at the beginning of stop().
private final SettableHealthChecker serverHealth = new SettableHealthChecker(false);
// Start/stop driver used by start(), stop() and close().
private final CentralDogmaStartStop startStop;

// Incremented by stop() before stopping and decremented after the stop finishes;
// polled by startCommandExecutor() to abandon a command executor start-up that is still in progress.
private final AtomicInteger numPendingStopRequests = new AtomicInteger();

// Plugin groups loaded in the constructor; treated as possibly null throughout this class.
@Nullable
private final PluginGroup pluginsForAllReplicas;
@Nullable
private final PluginGroup pluginsForLeaderOnly;

private final CentralDogmaConfig cfg;
// The fields below are assigned by doStart() only after every component has started successfully.
@Nullable
private volatile ProjectManager pm;
@Nullable
private volatile Server server;
@Nullable
private ExecutorService repositoryWorker;
@Nullable
private ScheduledExecutorService purgeWorker;
@Nullable
private CommandExecutor executor;
private final MeterRegistry meterRegistry;
// Set by configureMetrics() when it creates its own Prometheus registry and adds it to a
// composite registry.
@Nullable
MeterRegistry meterRegistryToBeClosed;
@Nullable
private SessionManager sessionManager;
// Assigned by startCommandExecutor(); later read by configureHttpApi().
@Nullable
private ServerStatusManager statusManager;
226
227 CentralDogma(CentralDogmaConfig cfg, MeterRegistry meterRegistry) {
228 this.cfg = requireNonNull(cfg, "cfg");
229 pluginsForAllReplicas = PluginGroup.loadPlugins(
230 CentralDogma.class.getClassLoader(), PluginTarget.ALL_REPLICAS, cfg);
231 pluginsForLeaderOnly = PluginGroup.loadPlugins(
232 CentralDogma.class.getClassLoader(), PluginTarget.LEADER_ONLY, cfg);
233 startStop = new CentralDogmaStartStop(pluginsForAllReplicas);
234 this.meterRegistry = meterRegistry;
235 }
236
237
238
239
240
241
242 public CentralDogmaConfig config() {
243 return cfg;
244 }
245
246
247
248
249
250
251 @Nullable
252 public ServerPort activePort() {
253 final Server server = this.server;
254 return server != null ? server.activePort() : null;
255 }
256
257
258
259
260
261
262
263 public Map<InetSocketAddress, ServerPort> activePorts() {
264 final Server server = this.server;
265 if (server != null) {
266 return server.activePorts();
267 } else {
268 return Collections.emptyMap();
269 }
270 }
271
272
273
274
275
276 @Nullable
277 public ProjectManager projectManager() {
278 return pm;
279 }
280
281
282
283
284
285
286
287 public Optional<MirroringService> mirroringService() {
288 if (pluginsForLeaderOnly == null) {
289 return Optional.empty();
290 }
291 return pluginsForLeaderOnly.findFirstPlugin(DefaultMirroringServicePlugin.class)
292 .map(DefaultMirroringServicePlugin::mirroringService);
293 }
294
295
296
297
298
299
300 public List<Plugin> plugins(PluginTarget target) {
301 switch (requireNonNull(target, "target")) {
302 case LEADER_ONLY:
303 return pluginsForLeaderOnly != null ? ImmutableList.copyOf(pluginsForLeaderOnly.plugins())
304 : ImmutableList.of();
305 case ALL_REPLICAS:
306 return pluginsForAllReplicas != null ? ImmutableList.copyOf(pluginsForAllReplicas.plugins())
307 : ImmutableList.of();
308 default:
309
310 throw new Error("Unknown plugin target: " + target);
311 }
312 }
313
314
315
316
317 public Optional<MeterRegistry> meterRegistry() {
318 return Optional.ofNullable(meterRegistry);
319 }
320
321
322
323
324 public CompletableFuture<Void> start() {
325 return startStop.start(true);
326 }
327
328
329
330
331 public CompletableFuture<Void> stop() {
332 serverHealth.setHealthy(false);
333
334 final Optional<GracefulShutdownTimeout> gracefulTimeoutOpt = cfg.gracefulShutdownTimeout();
335 if (gracefulTimeoutOpt.isPresent()) {
336 try {
337
338
339 Thread.sleep(1000);
340 } catch (InterruptedException e) {
341 logger.debug("Interrupted while waiting for quiet period", e);
342 Thread.currentThread().interrupt();
343 }
344 }
345
346 numPendingStopRequests.incrementAndGet();
347 return startStop.stop().thenRun(numPendingStopRequests::decrementAndGet);
348 }
349
350 @Override
351 public void close() {
352 startStop.close();
353 }
354
/**
 * Starts every component of the server in order: the repository worker pool, the purge worker,
 * the project manager, the session manager, the command executor and finally the RPC server.
 *
 * <p>All components are kept in local variables while starting. They are published to the
 * instance fields, and the health flag is set to healthy, only if every step succeeded.
 * Otherwise {@code doStop(...)} is invoked with whatever has been created so far.
 *
 * @throws Exception if any component fails to start
 */
private void doStart() throws Exception {
    boolean success = false;
    ExecutorService repositoryWorker = null;
    ScheduledExecutorService purgeWorker = null;
    ProjectManager pm = null;
    CommandExecutor executor = null;
    Server server = null;
    SessionManager sessionManager = null;
    try {
        logger.info("Starting the Central Dogma ..");

        // Fixed-size pool (core size == max size) with an unbounded queue.
        final ThreadPoolExecutor repositoryWorkerImpl = new ThreadPoolExecutor(
                cfg.numRepositoryWorkers(), cfg.numRepositoryWorkers(),
                // Keep-alive of 60 seconds; it applies to core threads as well because of
                // allowCoreThreadTimeOut(true) below.
                60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                new DefaultThreadFactory("repository-worker", true));
        repositoryWorkerImpl.allowCoreThreadTimeOut(true);
        // Wrap the pool so that its metrics are reported under the name "repositoryWorker".
        repositoryWorker = ExecutorServiceMetrics.monitor(meterRegistry, repositoryWorkerImpl,
                                                          "repositoryWorker");

        logger.info("Starting the project manager: {}", cfg.dataDir());

        purgeWorker = Executors.newSingleThreadScheduledExecutor(
                new DefaultThreadFactory("purge-worker", true));

        pm = new DefaultProjectManager(cfg.dataDir(), repositoryWorker, purgeWorker,
                                       meterRegistry, cfg.repositoryCacheSpec());

        logger.info("Started the project manager: {}", pm);

        logger.info("Current settings:\n{}", cfg);

        // null when authentication is not configured.
        sessionManager = initializeSessionManager();

        logger.info("Starting the command executor ..");
        executor = startCommandExecutor(pm, repositoryWorker, purgeWorker,
                                        meterRegistry, sessionManager);
        // The internal project is initialized only when the executor accepts writes.
        if (executor.isWritable()) {
            logger.info("Started the command executor.");

            initializeInternalProject(executor);
        }

        logger.info("Starting the RPC server.");
        server = startServer(pm, executor, purgeWorker, meterRegistry, sessionManager);
        logger.info("Started the RPC server at: {}", server.activePorts());
        logger.info("Started the Central Dogma successfully.");
        success = true;
    } finally {
        if (success) {
            // Publish the started components and report healthy.
            serverHealth.setHealthy(true);
            this.repositoryWorker = repositoryWorker;
            this.purgeWorker = purgeWorker;
            this.pm = pm;
            this.executor = executor;
            this.server = server;
            this.sessionManager = sessionManager;
        } else {
            // Roll back whatever has been created so far; arguments may still be null.
            doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager);
        }
    }
}
417
/**
 * Creates the {@link CommandExecutor} that matches the configured replication method and, if the
 * persisted server status says this server is replicating, starts it.
 *
 * <p>As a side effect, the {@code statusManager} field is assigned a new
 * {@link ServerStatusManager} for the data directory. A failure to start the executor is logged
 * and the (not started) executor is still returned.
 *
 * @param pm the project manager the executor operates on
 * @param repositoryWorker the executor used for repository work
 * @param purgeWorker the scheduler handed to the leader-only plugins
 * @param meterRegistry the registry handed to the executor and to the leader-only plugins
 * @param sessionManager the session manager, or {@code null} if authentication is not configured
 * @return the created command executor
 */
private CommandExecutor startCommandExecutor(
        ProjectManager pm, Executor repositoryWorker,
        ScheduledExecutorService purgeWorker, MeterRegistry meterRegistry,
        @Nullable SessionManager sessionManager) {

    // Starts the leader-only plugins; the outcome is only logged, never propagated.
    final Consumer<CommandExecutor> onTakeLeadership = exec -> {
        if (pluginsForLeaderOnly != null) {
            logger.info("Starting plugins on the leader replica ..");
            pluginsForLeaderOnly
                    .start(cfg, pm, exec, meterRegistry, purgeWorker).handle((unused, cause) -> {
                        if (cause == null) {
                            logger.info("Started plugins on the leader replica.");
                        } else {
                            logger.error("Failed to start plugins on the leader replica..", cause);
                        }
                        return null;
                    });
        }
    };

    // Stops the leader-only plugins; the outcome is only logged, never propagated.
    final Consumer<CommandExecutor> onReleaseLeadership = exec -> {
        if (pluginsForLeaderOnly != null) {
            logger.info("Stopping plugins on the leader replica ..");
            pluginsForLeaderOnly.stop(cfg, pm, exec, meterRegistry, purgeWorker).handle((unused, cause) -> {
                if (cause == null) {
                    logger.info("Stopped plugins on the leader replica.");
                } else {
                    logger.error("Failed to stop plugins on the leader replica.", cause);
                }
                return null;
            });
        }
    };

    statusManager = new ServerStatusManager(cfg.dataDir());
    final CommandExecutor executor;
    final ReplicationMethod replicationMethod = cfg.replicationConfig().method();
    switch (replicationMethod) {
        case ZOOKEEPER:
            executor = newZooKeeperCommandExecutor(pm, repositoryWorker, statusManager, meterRegistry,
                                                   sessionManager, onTakeLeadership, onReleaseLeadership);
            break;
        case NONE:
            logger.info("No replication mechanism specified; entering standalone");
            executor = new StandaloneCommandExecutor(pm, repositoryWorker, statusManager, sessionManager,
                                                     cfg.writeQuotaPerRepository(),
                                                     onTakeLeadership, onReleaseLeadership);
            break;
        default:
            throw new Error("unknown replication method: " + replicationMethod);
    }

    // Apply the status obtained from the status manager before deciding whether to start.
    final ServerStatus initialServerStatus = statusManager.serverStatus();
    executor.setWritable(initialServerStatus.writable());
    if (!initialServerStatus.replicating()) {
        // Not replicating: return the executor without starting it.
        return executor;
    }
    try {
        final CompletableFuture<Void> startFuture = executor.start();
        // Poll the start-up in 100 ms slices so that a concurrent stop() request is noticed.
        while (!startFuture.isDone()) {
            if (numPendingStopRequests.get() > 0) {
                // A stop has been requested while starting; stop the executor and leave the loop.
                executor.stop().get();
                break;
            }

            try {
                startFuture.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException unused) {
                // Not done yet; check for stop requests again.
            }
        }

        // Wait for the final outcome; rethrows the start-up failure, if any.
        startFuture.get();
    } catch (Exception e) {
        logger.warn("Failed to start the command executor. Entering read-only.", e);
    }

    return executor;
}
499
500 @Nullable
501 private SessionManager initializeSessionManager() throws Exception {
502 final AuthConfig authCfg = cfg.authConfig();
503 if (authCfg == null) {
504 return null;
505 }
506
507 boolean success = false;
508 SessionManager manager = null;
509 try {
510 manager = new FileBasedSessionManager(new File(cfg.dataDir(), "_sessions").toPath(),
511 authCfg.sessionValidationSchedule());
512 manager = new CachedSessionManager(manager, Caffeine.from(authCfg.sessionCacheSpec()).build());
513 manager = new ExpiredSessionDeletingSessionManager(manager);
514 success = true;
515 return manager;
516 } finally {
517 if (!success && manager != null) {
518 try {
519
520
521 manager.close();
522 } catch (Exception e) {
523 logger.warn("Failed to close a session manager.", e);
524 }
525 }
526 }
527 }
528
/**
 * Builds the server with every service of Central Dogma and starts it, blocking until the
 * start-up has completed.
 *
 * <p>Configured in order: ports and TLS, client address handling, CORS, tuning options, the Thrift
 * service, the {@code /title}, health check and {@code /docs/} services, the HTTP API, metrics,
 * access logging and finally the initialization of the plugins for all replicas.
 *
 * @param pm the project manager
 * @param executor the command executor
 * @param purgeWorker the scheduler handed to the plugin init context
 * @param meterRegistry the registry used for metrics
 * @param sessionManager the session manager, or {@code null} if authentication is not configured
 * @return the started server
 */
private Server startServer(ProjectManager pm, CommandExecutor executor,
                           ScheduledExecutorService purgeWorker, MeterRegistry meterRegistry,
                           @Nullable SessionManager sessionManager) {
    final ServerBuilder sb = Server.builder();
    sb.verboseResponses(true);
    cfg.ports().forEach(sb::port);

    // TLS is set up only when at least one configured port uses it.
    if (cfg.ports().stream().anyMatch(ServerPort::hasTls)) {
        try {
            final TlsConfig tlsConfig = cfg.tls();
            if (tlsConfig != null) {
                // Both streams are closed by try-with-resources once they have been handed over.
                try (InputStream keyCertChainInputStream = tlsConfig.keyCertChainInputStream();
                     InputStream keyInputStream = tlsConfig.keyInputStream()) {
                    sb.tls(keyCertChainInputStream, keyInputStream, tlsConfig.keyPassword());
                }
            } else {
                logger.warn(
                        "Missing TLS configuration. Generating a self-signed certificate for TLS support.");
                sb.tlsSelfSigned();
            }
        } catch (Exception e) {
            // Rethrow as-is, including checked exceptions, without wrapping.
            Exceptions.throwUnsafely(e);
        }
    }

    sb.clientAddressSources(cfg.clientAddressSourceList());
    sb.clientAddressTrustedProxyFilter(cfg.trustedProxyAddressPredicate());

    configCors(sb, config().corsConfig());

    // Optional tuning knobs; each one is applied only when present in the configuration.
    cfg.numWorkers().ifPresent(
            numWorkers -> sb.workerGroup(EventLoopGroups.newEventLoopGroup(numWorkers), true));
    cfg.maxNumConnections().ifPresent(sb::maxNumConnections);
    cfg.idleTimeoutMillis().ifPresent(sb::idleTimeoutMillis);
    cfg.requestTimeoutMillis().ifPresent(sb::requestTimeoutMillis);
    cfg.maxFrameLength().ifPresent(sb::maxRequestLength);
    cfg.gracefulShutdownTimeout().ifPresent(
            t -> sb.gracefulShutdownTimeoutMillis(t.quietPeriodMillis(), t.timeoutMillis()));

    final MetadataService mds = new MetadataService(pm, executor);
    final WatchService watchService = new WatchService(meterRegistry);
    // null when authentication is not configured.
    final AuthProvider authProvider = createAuthProvider(executor, sessionManager, mds);
    final ProjectApiManager projectApiManager = new ProjectApiManager(pm, executor, mds);

    configureThriftService(sb, projectApiManager, executor, watchService, mds);

    sb.service("/title", webAppTitleFile(cfg.webAppTitle(), SystemInfo.hostname()).asService());

    // The health check reflects the serverHealth flag of this instance.
    sb.service(HEALTH_CHECK_PATH, HealthCheckService.builder()
                                                    .checkers(serverHealth)
                                                    .build());

    sb.serviceUnder("/docs/",
                    DocService.builder()
                              .exampleHeaders(CentralDogmaService.class,
                                              HttpHeaders.of(HttpHeaderNames.AUTHORIZATION,
                                                             "Bearer " + CsrfToken.ANONYMOUS))
                              .build());

    configureHttpApi(sb, projectApiManager, executor, watchService, mds, authProvider, sessionManager);

    configureMetrics(sb, meterRegistry);

    // Access logging: disabled when no format is given, the "common" or "combined" preset when
    // named, otherwise the configured value is used as a custom format.
    final String accessLogFormat = cfg.accessLogFormat();
    if (isNullOrEmpty(accessLogFormat)) {
        sb.accessLogWriter(AccessLogWriter.disabled(), true);
    } else if ("common".equals(accessLogFormat)) {
        sb.accessLogWriter(AccessLogWriter.common(), true);
    } else if ("combined".equals(accessLogFormat)) {
        sb.accessLogWriter(AccessLogWriter.combined(), true);
    } else {
        sb.accessLogFormat(accessLogFormat);
    }

    // Let plugins that implement AllReplicasPlugin initialize themselves with the server builder;
    // other plugins in the group are skipped.
    if (pluginsForAllReplicas != null) {
        final PluginInitContext pluginInitContext =
                new PluginInitContext(config(), pm, executor, meterRegistry, purgeWorker, sb);
        pluginsForAllReplicas.plugins()
                             .forEach(p -> {
                                 if (!(p instanceof AllReplicasPlugin)) {
                                     return;
                                 }
                                 final AllReplicasPlugin plugin = (AllReplicasPlugin) p;
                                 plugin.init(pluginInitContext);
                             });
    }

    // NOTE: this replaces the JVM-wide default handler; uncaught exceptions are logged with the
    // thread as the message argument and the exception as the cause.
    Thread.setDefaultUncaughtExceptionHandler((t, e) -> logger.warn("Uncaught exception: {}", t, e));

    final Server s = sb.build();
    // Block until the server has started; a start-up failure is thrown from join().
    s.start().join();
    return s;
}
624
625 static HttpFile webAppTitleFile(@Nullable String webAppTitle, String hostname) {
626 requireNonNull(hostname, "hostname");
627 final Map<String, String> titleAndHostname = ImmutableMap.of(
628 "title", firstNonNull(webAppTitle, "Central Dogma at {{hostname}}"),
629 "hostname", hostname);
630
631 try {
632 final HttpData data = HttpData.ofUtf8(Jackson.writeValueAsString(titleAndHostname));
633 return HttpFile.builder(data)
634 .contentType(MediaType.JSON_UTF_8)
635 .cacheControl(ServerCacheControl.REVALIDATED)
636 .build();
637 } catch (JsonProcessingException e) {
638 throw new Error("Failed to encode the title and hostname:", e);
639 }
640 }
641
642 @Nullable
643 private AuthProvider createAuthProvider(
644 CommandExecutor commandExecutor, @Nullable SessionManager sessionManager, MetadataService mds) {
645 final AuthConfig authCfg = cfg.authConfig();
646 if (authCfg == null) {
647 return null;
648 }
649
650 checkState(sessionManager != null, "SessionManager is null");
651 final AuthProviderParameters parameters = new AuthProviderParameters(
652
653 new ApplicationTokenAuthorizer(mds::findTokenBySecret).orElse(
654 new SessionTokenAuthorizer(sessionManager, authCfg.administrators())),
655 cfg,
656 sessionManager::generateSessionId,
657
658 session -> commandExecutor.execute(Command.createSession(session)),
659 sessionId -> commandExecutor.execute(Command.removeSession(sessionId)));
660 return authCfg.factory().create(parameters);
661 }
662
663 private CommandExecutor newZooKeeperCommandExecutor(
664 ProjectManager pm, Executor repositoryWorker,
665 ServerStatusManager serverStatusManager,
666 MeterRegistry meterRegistry,
667 @Nullable SessionManager sessionManager,
668 @Nullable Consumer<CommandExecutor> onTakeLeadership,
669 @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
670 final ZooKeeperReplicationConfig zkCfg = (ZooKeeperReplicationConfig) cfg.replicationConfig();
671
672
673 final File dataDir = cfg.dataDir();
674 new File(dataDir, "replica_id").delete();
675
676
677
678 return new ZooKeeperCommandExecutor(
679 zkCfg, dataDir,
680 new StandaloneCommandExecutor(pm, repositoryWorker, serverStatusManager, sessionManager,
681 null, null),
682 meterRegistry, pm, config().writeQuotaPerRepository(), onTakeLeadership, onReleaseLeadership);
683 }
684
685 private void configureThriftService(ServerBuilder sb, ProjectApiManager projectApiManager,
686 CommandExecutor executor,
687 WatchService watchService, MetadataService mds) {
688 final CentralDogmaServiceImpl service =
689 new CentralDogmaServiceImpl(projectApiManager, executor, watchService, mds);
690
691 HttpService thriftService =
692 ThriftCallService.of(service)
693 .decorate(CentralDogmaTimeoutScheduler::new)
694 .decorate(CentralDogmaExceptionTranslator::new)
695 .decorate(THttpService.newDecorator());
696
697 if (cfg.isCsrfTokenRequiredForThrift()) {
698 thriftService = thriftService.decorate(AuthService.newDecorator(new CsrfTokenAuthorizer()));
699 } else {
700 thriftService = thriftService.decorate(TokenlessClientLogger::new);
701 }
702
703
704 thriftService = thriftService.decorate(contentEncodingDecorator());
705
706 sb.service("/cd/thrift/v1", thriftService);
707 }
708
/**
 * Registers the HTTP API services, the authentication related services and, when enabled, the
 * web application with the server builder.
 *
 * <p>Every API service shares one decorator chain: metadata service injection, then
 * authorization, then content encoding. With an auth provider, authorization accepts application
 * tokens and falls back to session tokens; without one, a CSRF token authorizer is used.
 *
 * @param sb the server builder to register the services with
 * @param projectApiManager the project API manager
 * @param executor the command executor
 * @param watchService the watch service used by the content service
 * @param mds the metadata service
 * @param authProvider the auth provider, or {@code null} if authentication is not configured
 * @param sessionManager the session manager, or {@code null} if authentication is not configured
 */
private void configureHttpApi(ServerBuilder sb,
                              ProjectApiManager projectApiManager, CommandExecutor executor,
                              WatchService watchService, MetadataService mds,
                              @Nullable AuthProvider authProvider,
                              @Nullable SessionManager sessionManager) {
    Function<? super HttpService, ? extends HttpService> decorator;

    if (authProvider != null) {
        // This endpoint is registered only when authentication is configured; it answers GET
        // with 200 OK.
        sb.service("/security_enabled", new AbstractHttpService() {
            @Override
            protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
                return HttpResponse.of(HttpStatus.OK);
            }
        });

        final AuthConfig authCfg = cfg.authConfig();
        assert authCfg != null : "authCfg";
        assert sessionManager != null : "sessionManager";
        // Application tokens are tried first, session tokens second.
        final Authorizer<HttpRequest> tokenAuthorizer =
                new ApplicationTokenAuthorizer(mds::findTokenBySecret)
                        .orElse(new SessionTokenAuthorizer(sessionManager,
                                                           authCfg.administrators()));
        decorator = MetadataServiceInjector
                .newDecorator(mds)
                .andThen(AuthService.builder()
                                    .add(tokenAuthorizer)
                                    .onFailure(new CentralDogmaAuthFailureHandler())
                                    .newDecorator());
    } else {
        decorator = MetadataServiceInjector
                .newDecorator(mds)
                .andThen(AuthService.newDecorator(new CsrfTokenAuthorizer()));
    }

    final HttpApiRequestConverter v1RequestConverter = new HttpApiRequestConverter(projectApiManager);
    final JacksonRequestConverterFunction jacksonRequestConverterFunction =
            // Uses a fresh ObjectMapper rather than the project's Jackson helper.
            // NOTE(review): presumably to keep the default mapper settings; confirm.
            new JacksonRequestConverterFunction(new ObjectMapper());

    final HttpApiResponseConverter v1ResponseConverter = new HttpApiResponseConverter();

    // Content encoding is appended last to the shared decorator chain.
    decorator = decorator.andThen(contentEncodingDecorator());

    // statusManager is assigned by startCommandExecutor(), which doStart() calls before the
    // server is configured.
    assert statusManager != null;
    sb.annotatedService(API_V1_PATH_PREFIX,
                        new AdministrativeService(executor, statusManager), decorator,
                        v1RequestConverter, jacksonRequestConverterFunction, v1ResponseConverter);
    sb.annotatedService(API_V1_PATH_PREFIX,
                        new ProjectServiceV1(projectApiManager), decorator,
                        v1RequestConverter, jacksonRequestConverterFunction, v1ResponseConverter);
    sb.annotatedService(API_V1_PATH_PREFIX,
                        new RepositoryServiceV1(executor, mds), decorator,
                        v1RequestConverter, jacksonRequestConverterFunction, v1ResponseConverter);
    sb.annotatedService()
      .pathPrefix(API_V1_PATH_PREFIX)
      // Requests carrying an If-None-Match header are reported under a separate service name
      // ("WatchContentServiceV1") from the other content requests.
      .defaultServiceNaming(new ServiceNaming() {
          private final String serviceName = ContentServiceV1.class.getName();
          private final String watchServiceName =
                  serviceName.replace("ContentServiceV1", "WatchContentServiceV1");

          @Override
          public String serviceName(ServiceRequestContext ctx) {
              if (ctx.request().headers().contains(HttpHeaderNames.IF_NONE_MATCH)) {
                  return watchServiceName;
              }
              return serviceName;
          }
      })
      .decorator(decorator)
      .requestConverters(v1RequestConverter, jacksonRequestConverterFunction)
      .responseConverters(v1ResponseConverter)
      .build(new ContentServiceV1(executor, watchService));

    if (authProvider != null) {
        final AuthConfig authCfg = cfg.authConfig();
        assert authCfg != null : "authCfg";
        // The metadata and token APIs are available only when authentication is configured.
        sb.annotatedService(API_V1_PATH_PREFIX,
                            new MetadataApiService(mds, authCfg.loginNameNormalizer()),
                            decorator, v1RequestConverter, jacksonRequestConverterFunction,
                            v1ResponseConverter);
        sb.annotatedService(API_V1_PATH_PREFIX, new TokenService(executor, mds),
                            decorator, v1RequestConverter, jacksonRequestConverterFunction,
                            v1ResponseConverter);

        // The login API service is optional and is registered without the shared decorator.
        Optional.ofNullable(authProvider.loginApiService())
                .ifPresent(login -> LOGIN_API_ROUTES.forEach(mapping -> sb.service(mapping, login)));

        // The logout API falls back to DefaultLogoutService when the provider supplies none;
        // unlike login, it is wrapped with the shared decorator.
        final HttpService logout =
                Optional.ofNullable(authProvider.logoutApiService())
                        .orElseGet(() -> new DefaultLogoutService(executor));
        for (Route route : LOGOUT_API_ROUTES) {
            sb.service(route, decorator.apply(logout));
        }

        authProvider.moreServices().forEach(sb::service);
    }

    if (cfg.isWebAppEnabled()) {
        final RestfulJsonResponseConverter httpApiV0Converter = new RestfulJsonResponseConverter();

        // The v0 API services are registered only when the web application is enabled.
        sb.annotatedService(API_V0_PATH_PREFIX, new UserService(executor),
                            decorator, jacksonRequestConverterFunction, httpApiV0Converter)
          .annotatedService(API_V0_PATH_PREFIX, new RepositoryService(projectApiManager, executor),
                            decorator, jacksonRequestConverterFunction, httpApiV0Converter);

        if (authProvider != null) {
            // Web login and logout services supplied by the auth provider.
            sb.service(LOGIN_PATH, authProvider.webLoginService());

            sb.service(LOGOUT_PATH, authProvider.webLogoutService());

            // Built-in authentication web pages served from the "auth-webapp" class path
            // directory, with "/index.html" passed as the default.
            sb.serviceUnder(BUILTIN_WEB_BASE_PATH, new OrElseDefaultHttpFileService(
                    FileService.builder(CentralDogma.class.getClassLoader(), "auth-webapp")
                               .autoDecompress(true)
                               .serveCompressedFiles(true)
                               .cacheControl(ServerCacheControl.REVALIDATED)
                               .build(),
                    "/index.html"));
        }
        // The web application itself is served from the "webapp" class path directory.
        sb.serviceUnder("/",
                        FileService.builder(CentralDogma.class.getClassLoader(), "webapp")
                                   .cacheControl(ServerCacheControl.REVALIDATED)
                                   .build());
    }
}
839
840 private static void configCors(ServerBuilder sb, @Nullable CorsConfig corsConfig) {
841 if (corsConfig == null) {
842 return;
843 }
844
845 sb.decorator(CorsService.builder(corsConfig.allowedOrigins())
846 .allowRequestMethods(HttpMethod.knownMethods())
847 .allowAllRequestHeaders(true)
848 .allowCredentials()
849 .maxAge(corsConfig.maxAgeSeconds())
850 .newDecorator());
851 }
852
853 private static Function<? super HttpService, EncodingService> contentEncodingDecorator() {
854 return delegate -> EncodingService
855 .builder()
856 .encodableContentTypes(contentType -> {
857 if ("application".equals(contentType.type())) {
858 final String subtype = contentType.subtype();
859 switch (subtype) {
860 case "json":
861 case "xml":
862 case "x-thrift":
863 return true;
864 default:
865 return subtype.endsWith("+json") ||
866 subtype.endsWith("+xml") ||
867 subtype.startsWith("vnd.apache.thrift.");
868 }
869 }
870 return false;
871 })
872 .build(delegate);
873 }
874
875 private void configureMetrics(ServerBuilder sb, MeterRegistry registry) {
876 sb.meterRegistry(registry);
877
878
879
880 if (registry instanceof PrometheusMeterRegistry) {
881 final PrometheusMeterRegistry prometheusMeterRegistry = (PrometheusMeterRegistry) registry;
882 sb.service(METRICS_PATH,
883 PrometheusExpositionService.of(prometheusMeterRegistry.getPrometheusRegistry()));
884 } else if (registry instanceof CompositeMeterRegistry) {
885 final PrometheusMeterRegistry prometheusMeterRegistry = PrometheusMeterRegistries.newRegistry();
886 ((CompositeMeterRegistry) registry).add(prometheusMeterRegistry);
887 sb.service(METRICS_PATH,
888 PrometheusExpositionService.of(prometheusMeterRegistry.getPrometheusRegistry()));
889 meterRegistryToBeClosed = prometheusMeterRegistry;
890 } else {
891 logger.info("Not exposing a prometheus endpoint for the type: {}", registry.getClass());
892 }
893
894 sb.decorator(MetricCollectingService.newDecorator(repoAwareMeterIdPrefixFunction()));
895
896
897 new FileDescriptorMetrics().bindTo(registry);
898 new ProcessorMetrics().bindTo(registry);
899 new ClassLoaderMetrics().bindTo(registry);
900 new UptimeMetrics().bindTo(registry);
901 new DiskSpaceMetrics(cfg.dataDir()).bindTo(registry);
902 new JvmGcMetrics().bindTo(registry);
903 new JvmMemoryMetrics().bindTo(registry);
904 new JvmThreadMetrics().bindTo(registry);
905
906
907 ExecutorServiceMetrics.monitor(registry, ForkJoinPool.commonPool(), "commonPool");
908 }
909
910 private static MeterIdPrefixFunction repoAwareMeterIdPrefixFunction() {
911 final MeterIdPrefixFunction meterIdPrefix = MeterIdPrefixFunction.ofDefault("api");
912 return meterIdPrefix.andThen((registry0, log, meterIdPrefix0) -> {
913 final ServiceRequestContext ctx = (ServiceRequestContext) log.context();
914 final String project = firstNonNull(ctx.pathParam("projectName"), "none");
915 final String repo = firstNonNull(ctx.pathParam("repoName"), "none");
916 return meterIdPrefix0.withTags("project", project, "repository", repo);
917 });
918 }
919
920 private void doStop() {
921 if (server == null) {
922 return;
923 }
924
925 final Server server = this.server;
926 final CommandExecutor executor = this.executor;
927 final ProjectManager pm = this.pm;
928 final ExecutorService repositoryWorker = this.repositoryWorker;
929 final ExecutorService purgeWorker = this.purgeWorker;
930 final SessionManager sessionManager = this.sessionManager;
931
932 this.server = null;
933 this.executor = null;
934 this.pm = null;
935 this.repositoryWorker = null;
936 this.sessionManager = null;
937 if (meterRegistryToBeClosed != null) {
938 assert meterRegistry instanceof CompositeMeterRegistry;
939 ((CompositeMeterRegistry) meterRegistry).remove(meterRegistryToBeClosed);
940 meterRegistryToBeClosed.close();
941 meterRegistryToBeClosed = null;
942 }
943
944 logger.info("Stopping the Central Dogma ..");
945 if (!doStop(server, executor, pm, repositoryWorker, purgeWorker, sessionManager)) {
946 logger.warn("Stopped the Central Dogma with failure.");
947 } else {
948 logger.info("Stopped the Central Dogma successfully.");
949 }
950 }
951
952 private static boolean doStop(
953 @Nullable Server server, @Nullable CommandExecutor executor,
954 @Nullable ProjectManager pm,
955 @Nullable ExecutorService repositoryWorker, @Nullable ExecutorService purgeWorker,
956 @Nullable SessionManager sessionManager) {
957
958 boolean success = true;
959 try {
960 if (sessionManager != null) {
961 logger.info("Stopping the session manager ..");
962 sessionManager.close();
963 logger.info("Stopped the session manager.");
964 }
965 } catch (Throwable t) {
966 success = false;
967 logger.warn("Failed to stop the session manager:", t);
968 }
969
970 try {
971 if (pm != null) {
972 logger.info("Stopping the project manager ..");
973 pm.close(ShuttingDownException::new);
974 logger.info("Stopped the project manager.");
975 }
976 } catch (Throwable t) {
977 success = false;
978 logger.warn("Failed to stop the project manager:", t);
979 }
980
981 try {
982 if (executor != null) {
983 logger.info("Stopping the command executor ..");
984 executor.stop();
985 logger.info("Stopped the command executor.");
986 }
987 } catch (Throwable t) {
988 success = false;
989 logger.warn("Failed to stop the command executor:", t);
990 }
991
992 final BiFunction<ExecutorService, String, Boolean> stopWorker = (worker, name) -> {
993 try {
994 if (worker != null && !worker.isTerminated()) {
995 logger.info("Stopping the {} worker ..", name);
996 boolean interruptLater = false;
997 while (!worker.isTerminated()) {
998 worker.shutdownNow();
999 try {
1000 worker.awaitTermination(1, TimeUnit.SECONDS);
1001 } catch (InterruptedException e) {
1002
1003 interruptLater = true;
1004 }
1005 }
1006 logger.info("Stopped the {} worker.", name);
1007
1008 if (interruptLater) {
1009 Thread.currentThread().interrupt();
1010 }
1011 }
1012 return true;
1013 } catch (Throwable t) {
1014 logger.warn("Failed to stop the " + name + " worker:", t);
1015 return false;
1016 }
1017 };
1018 if (!stopWorker.apply(repositoryWorker, "repository")) {
1019 success = false;
1020 }
1021 if (!stopWorker.apply(purgeWorker, "purge")) {
1022 success = false;
1023 }
1024
1025 try {
1026 if (server != null) {
1027 logger.info("Stopping the RPC server ..");
1028 server.stop().join();
1029 logger.info("Stopped the RPC server.");
1030 }
1031 } catch (Throwable t) {
1032 success = false;
1033 logger.warn("Failed to stop the RPC server:", t);
1034 }
1035
1036 return success;
1037 }
1038
1039 private final class CentralDogmaStartStop extends StartStopSupport<Void, Void, Void, Void> {
1040
1041 @Nullable
1042 private final PluginGroup pluginsForAllReplicas;
1043
1044 CentralDogmaStartStop(@Nullable PluginGroup pluginsForAllReplicas) {
1045 super(GlobalEventExecutor.INSTANCE);
1046 this.pluginsForAllReplicas = pluginsForAllReplicas;
1047 }
1048
1049 @Override
1050 protected CompletionStage<Void> doStart(@Nullable Void unused) throws Exception {
1051 return execute("startup", () -> {
1052 try {
1053 CentralDogma.this.doStart();
1054 if (pluginsForAllReplicas != null) {
1055 final ProjectManager pm = CentralDogma.this.pm;
1056 final CommandExecutor executor = CentralDogma.this.executor;
1057 final MeterRegistry meterRegistry = CentralDogma.this.meterRegistry;
1058 if (pm != null && executor != null && meterRegistry != null) {
1059 pluginsForAllReplicas.start(cfg, pm, executor, meterRegistry, purgeWorker).join();
1060 }
1061 }
1062 } catch (Exception e) {
1063 Exceptions.throwUnsafely(e);
1064 }
1065 });
1066 }
1067
1068 @Override
1069 protected CompletionStage<Void> doStop(@Nullable Void unused) throws Exception {
1070 return execute("shutdown", () -> {
1071 if (pluginsForAllReplicas != null) {
1072 final ProjectManager pm = CentralDogma.this.pm;
1073 final CommandExecutor executor = CentralDogma.this.executor;
1074 final MeterRegistry meterRegistry = CentralDogma.this.meterRegistry;
1075 if (pm != null && executor != null && meterRegistry != null) {
1076 pluginsForAllReplicas.stop(cfg, pm, executor, meterRegistry, purgeWorker).join();
1077 }
1078 }
1079 CentralDogma.this.doStop();
1080 });
1081 }
1082
1083 private CompletionStage<Void> execute(String mode, Runnable task) {
1084 final CompletableFuture<Void> future = new CompletableFuture<>();
1085 final Thread thread = new Thread(() -> {
1086 try {
1087 task.run();
1088 future.complete(null);
1089 } catch (Throwable cause) {
1090 future.completeExceptionally(cause);
1091 }
1092 }, "dogma-" + mode + "-0x" + Long.toHexString(CentralDogma.this.hashCode() & 0xFFFFFFFFL));
1093 thread.start();
1094 return future;
1095 }
1096 }
1097 }