1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server.internal.mirror;
17
18 import static java.util.Objects.requireNonNull;
19
20 import java.io.File;
21 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.CompletionStage;
23
24 import javax.annotation.Nullable;
25
26 import com.google.common.base.MoreObjects;
27
28 import com.linecorp.centraldogma.server.CentralDogmaConfig;
29 import com.linecorp.centraldogma.server.ZoneConfig;
30 import com.linecorp.centraldogma.server.mirror.MirroringServicePluginConfig;
31 import com.linecorp.centraldogma.server.plugin.Plugin;
32 import com.linecorp.centraldogma.server.plugin.PluginContext;
33 import com.linecorp.centraldogma.server.plugin.PluginTarget;
34
35 public final class DefaultMirroringServicePlugin implements Plugin {
36
37 @Nullable
38 public static MirroringServicePluginConfig mirrorConfig(CentralDogmaConfig config) {
39 return (MirroringServicePluginConfig) config.pluginConfigMap().get(MirroringServicePluginConfig.class);
40 }
41
42 @Nullable
43 private volatile MirrorSchedulingService mirroringService;
44
45 @Nullable
46 private PluginTarget pluginTarget;
47
48 @Override
49 public PluginTarget target(CentralDogmaConfig config) {
50 requireNonNull(config, "config");
51 if (pluginTarget != null) {
52 return pluginTarget;
53 }
54
55 final MirroringServicePluginConfig mirrorConfig = mirrorConfig(config);
56 if (mirrorConfig != null && mirrorConfig.zonePinned()) {
57 pluginTarget = PluginTarget.ZONE_LEADER_ONLY;
58 } else {
59 pluginTarget = PluginTarget.LEADER_ONLY;
60 }
61 return pluginTarget;
62 }
63
64 @Override
65 public synchronized CompletionStage<Void> start(PluginContext context) {
66 requireNonNull(context, "context");
67
68 MirrorSchedulingService mirroringService = this.mirroringService;
69 if (mirroringService == null) {
70 final CentralDogmaConfig cfg = context.config();
71 final MirroringServicePluginConfig mirroringServicePluginConfig = mirrorConfig(cfg);
72 final int numThreads;
73 final int maxNumFilesPerMirror;
74 final long maxNumBytesPerMirror;
75 final ZoneConfig zoneConfig;
76 final boolean runMigration;
77
78 if (mirroringServicePluginConfig != null) {
79 numThreads = mirroringServicePluginConfig.numMirroringThreads();
80 maxNumFilesPerMirror = mirroringServicePluginConfig.maxNumFilesPerMirror();
81 maxNumBytesPerMirror = mirroringServicePluginConfig.maxNumBytesPerMirror();
82 if (mirroringServicePluginConfig.zonePinned()) {
83 zoneConfig = cfg.zone();
84 assert zoneConfig != null : "zonePinned is enabled but no zone configuration found";
85 } else {
86 zoneConfig = null;
87 }
88 runMigration = mirroringServicePluginConfig.runMigration();
89 } else {
90 numThreads = MirroringServicePluginConfig.INSTANCE.numMirroringThreads();
91 maxNumFilesPerMirror = MirroringServicePluginConfig.INSTANCE.maxNumFilesPerMirror();
92 maxNumBytesPerMirror = MirroringServicePluginConfig.INSTANCE.maxNumBytesPerMirror();
93 zoneConfig = null;
94 runMigration = true;
95 }
96 mirroringService = new MirrorSchedulingService(new File(cfg.dataDir(), "_mirrors"),
97 context.projectManager(),
98 context.meterRegistry(),
99 numThreads,
100 maxNumFilesPerMirror,
101 maxNumBytesPerMirror, zoneConfig, runMigration,
102 context.mirrorAccessController());
103 this.mirroringService = mirroringService;
104 }
105 mirroringService.start(context.commandExecutor());
106 return CompletableFuture.completedFuture(null);
107 }
108
109 @Override
110 public synchronized CompletionStage<Void> stop(PluginContext context) {
111 final MirrorSchedulingService mirroringService = this.mirroringService;
112 if (mirroringService != null && mirroringService.isStarted()) {
113 mirroringService.stop();
114 }
115 return CompletableFuture.completedFuture(null);
116 }
117
118 @Override
119 public Class<?> configType() {
120 return MirroringServicePluginConfig.class;
121 }
122
123 @Nullable
124 public MirrorSchedulingService mirroringService() {
125 return mirroringService;
126 }
127
128 @Override
129 public String toString() {
130 return MoreObjects.toStringHelper(this)
131 .omitNullValues()
132 .add("configType", configType().getName())
133 .add("target", pluginTarget)
134 .toString();
135 }
136 }