1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server.internal.mirror;
17  
18  import static java.util.Objects.requireNonNull;
19  
20  import java.io.File;
21  import java.util.concurrent.CompletableFuture;
22  import java.util.concurrent.CompletionStage;
23  
24  import javax.annotation.Nullable;
25  
26  import com.google.common.base.MoreObjects;
27  
28  import com.linecorp.centraldogma.server.CentralDogmaConfig;
29  import com.linecorp.centraldogma.server.ZoneConfig;
30  import com.linecorp.centraldogma.server.mirror.MirroringServicePluginConfig;
31  import com.linecorp.centraldogma.server.plugin.Plugin;
32  import com.linecorp.centraldogma.server.plugin.PluginContext;
33  import com.linecorp.centraldogma.server.plugin.PluginTarget;
34  
35  public final class DefaultMirroringServicePlugin implements Plugin {
36  
37      @Nullable
38      public static MirroringServicePluginConfig mirrorConfig(CentralDogmaConfig config) {
39          return (MirroringServicePluginConfig) config.pluginConfigMap().get(MirroringServicePluginConfig.class);
40      }
41  
42      @Nullable
43      private volatile MirrorSchedulingService mirroringService;
44  
45      @Nullable
46      private PluginTarget pluginTarget;
47  
48      @Override
49      public PluginTarget target(CentralDogmaConfig config) {
50          requireNonNull(config, "config");
51          if (pluginTarget != null) {
52              return pluginTarget;
53          }
54  
55          final MirroringServicePluginConfig mirrorConfig = mirrorConfig(config);
56          if (mirrorConfig != null && mirrorConfig.zonePinned()) {
57              pluginTarget = PluginTarget.ZONE_LEADER_ONLY;
58          } else {
59              pluginTarget = PluginTarget.LEADER_ONLY;
60          }
61          return pluginTarget;
62      }
63  
64      @Override
65      public synchronized CompletionStage<Void> start(PluginContext context) {
66          requireNonNull(context, "context");
67  
68          MirrorSchedulingService mirroringService = this.mirroringService;
69          if (mirroringService == null) {
70              final CentralDogmaConfig cfg = context.config();
71              final MirroringServicePluginConfig mirroringServicePluginConfig = mirrorConfig(cfg);
72              final int numThreads;
73              final int maxNumFilesPerMirror;
74              final long maxNumBytesPerMirror;
75              final ZoneConfig zoneConfig;
76              final boolean runMigration;
77  
78              if (mirroringServicePluginConfig != null) {
79                  numThreads = mirroringServicePluginConfig.numMirroringThreads();
80                  maxNumFilesPerMirror = mirroringServicePluginConfig.maxNumFilesPerMirror();
81                  maxNumBytesPerMirror = mirroringServicePluginConfig.maxNumBytesPerMirror();
82                  if (mirroringServicePluginConfig.zonePinned()) {
83                      zoneConfig = cfg.zone();
84                      assert zoneConfig != null : "zonePinned is enabled but no zone configuration found";
85                  } else {
86                      zoneConfig = null;
87                  }
88                  runMigration = mirroringServicePluginConfig.runMigration();
89              } else {
90                  numThreads = MirroringServicePluginConfig.INSTANCE.numMirroringThreads();
91                  maxNumFilesPerMirror = MirroringServicePluginConfig.INSTANCE.maxNumFilesPerMirror();
92                  maxNumBytesPerMirror = MirroringServicePluginConfig.INSTANCE.maxNumBytesPerMirror();
93                  zoneConfig = null;
94                  runMigration = true;
95              }
96              mirroringService = new MirrorSchedulingService(new File(cfg.dataDir(), "_mirrors"),
97                                                             context.projectManager(),
98                                                             context.meterRegistry(),
99                                                             numThreads,
100                                                            maxNumFilesPerMirror,
101                                                            maxNumBytesPerMirror, zoneConfig, runMigration,
102                                                            context.mirrorAccessController());
103             this.mirroringService = mirroringService;
104         }
105         mirroringService.start(context.commandExecutor());
106         return CompletableFuture.completedFuture(null);
107     }
108 
109     @Override
110     public synchronized CompletionStage<Void> stop(PluginContext context) {
111         final MirrorSchedulingService mirroringService = this.mirroringService;
112         if (mirroringService != null && mirroringService.isStarted()) {
113             mirroringService.stop();
114         }
115         return CompletableFuture.completedFuture(null);
116     }
117 
118     @Override
119     public Class<?> configType() {
120         return MirroringServicePluginConfig.class;
121     }
122 
123     @Nullable
124     public MirrorSchedulingService mirroringService() {
125         return mirroringService;
126     }
127 
128     @Override
129     public String toString() {
130         return MoreObjects.toStringHelper(this)
131                           .omitNullValues()
132                           .add("configType", configType().getName())
133                           .add("target", pluginTarget)
134                           .toString();
135     }
136 }