1   /*
2    * Copyright 2018 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server.internal.replication;
17  
18  import static com.google.common.base.MoreObjects.firstNonNull;
19  
20  import java.io.IOException;
21  import java.net.InetSocketAddress;
22  import java.util.concurrent.TimeUnit;
23  import java.util.function.ToDoubleFunction;
24  
25  import javax.annotation.Nullable;
26  
27  import org.apache.zookeeper.server.DataTree;
28  import org.apache.zookeeper.server.DatadirCleanupManager;
29  import org.apache.zookeeper.server.PurgeTxnLog;
30  import org.apache.zookeeper.server.ServerCnxnFactory;
31  import org.apache.zookeeper.server.ServerStats;
32  import org.apache.zookeeper.server.ZKDatabase;
33  import org.apache.zookeeper.server.ZooKeeperServer;
34  import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
35  import org.apache.zookeeper.server.quorum.QuorumPeer;
36  import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import io.micrometer.core.instrument.FunctionCounter;
41  import io.micrometer.core.instrument.Gauge;
42  import io.micrometer.core.instrument.MeterRegistry;
43  import io.micrometer.core.instrument.TimeGauge;
44  
45  final class EmbeddedZooKeeper extends QuorumPeer {
46  
47      private static final Logger logger = LoggerFactory.getLogger(EmbeddedZooKeeper.class);
48  
49      static final String SASL_SERVER_LOGIN_CONTEXT = "QuorumServer";
50      static final String SASL_LEARNER_LOGIN_CONTEXT = "QuorumLearner";
51  
52      private static final ServerStats EMPTY_STATS = new InactiveServerStats();
53  
54      private final ServerCnxnFactory cnxnFactory;
55      private final DatadirCleanupManager purgeManager;
56  
57      EmbeddedZooKeeper(QuorumPeerConfig zkCfg, MeterRegistry meterRegistry) throws IOException {
58          cnxnFactory = createCnxnFactory(zkCfg);
59  
60          setTxnFactory(new FileTxnSnapLog(zkCfg.getDataLogDir(), zkCfg.getDataDir()));
61          enableLocalSessions(zkCfg.areLocalSessionsEnabled());
62          enableLocalSessionsUpgrading(zkCfg.isLocalSessionsUpgradingEnabled());
63          setElectionType(zkCfg.getElectionAlg());
64          setMyid(zkCfg.getServerId());
65          setTickTime(zkCfg.getTickTime());
66          setMinSessionTimeout(zkCfg.getMinSessionTimeout());
67          setMaxSessionTimeout(zkCfg.getMaxSessionTimeout());
68          setInitLimit(zkCfg.getInitLimit());
69          setSyncLimit(zkCfg.getSyncLimit());
70          setConfigFileName(zkCfg.getConfigFilename());
71          setZKDatabase(new ZKDatabase(getTxnFactory()));
72          setQuorumVerifier(zkCfg.getQuorumVerifier(), false);
73          if (zkCfg.getLastSeenQuorumVerifier() != null) {
74              setLastSeenQuorumVerifier(zkCfg.getLastSeenQuorumVerifier(), false);
75          }
76          initConfigInZKDatabase();
77          setCnxnFactory(cnxnFactory);
78          setLearnerType(zkCfg.getPeerType());
79          setSyncEnabled(zkCfg.getSyncEnabled());
80          setQuorumListenOnAllIPs(zkCfg.getQuorumListenOnAllIPs());
81          setMultiAddressEnabled(false);
82  
83          configureSasl();
84  
85          purgeManager = new DatadirCleanupManager(zkCfg.getDataDir(), zkCfg.getDataLogDir(),
86                                                   zkCfg.getSnapRetainCount(), zkCfg.getPurgeInterval());
87  
88          // Bind meters that indicates the ZooKeeper stats.
89          TimeGauge.builder("replica.zk.latency", this, TimeUnit.MILLISECONDS,
90                            self -> serverStats(self).getAvgLatency())
91                   .tag("type", "avg")
92                   .register(meterRegistry);
93          TimeGauge.builder("replica.zk.latency", this, TimeUnit.MILLISECONDS,
94                            self -> serverStats(self).getMaxLatency())
95                   .tag("type", "max")
96                   .register(meterRegistry);
97          TimeGauge.builder("replica.zk.latency", this, TimeUnit.MILLISECONDS,
98                            self -> serverStats(self).getMinLatency())
99                   .tag("type", "min")
100                  .register(meterRegistry);
101 
102         Gauge.builder("replica.zk.outstanding.requests", this,
103                       self -> serverStats(self).getOutstandingRequests())
104              .register(meterRegistry);
105 
106         Gauge.builder("replica.zk.last.processed.zxid", this,
107                       self -> serverStats(self).getLastProcessedZxid())
108              .register(meterRegistry);
109 
110         Gauge.builder("replica.zk.data.dir.size", this,
111                       self -> serverStats(self).getDataDirSize())
112              .baseUnit("bytes")
113              .register(meterRegistry);
114 
115         Gauge.builder("replica.zk.log.dir.size", this,
116                       self -> serverStats(self).getLogDirSize())
117              .baseUnit("bytes")
118              .register(meterRegistry);
119 
120         FunctionCounter.builder("replica.zk.packets.received", this,
121                                 self -> serverStats(self).getPacketsReceived())
122                        .register(meterRegistry);
123 
124         FunctionCounter.builder("replica.zk.packets.sent", this,
125                                 self -> serverStats(self).getPacketsSent())
126                        .register(meterRegistry);
127 
128         Gauge.builder("replica.zk.alive.client.connections", this,
129                       self -> serverStats(self).getNumAliveClientConnections())
130              .register(meterRegistry);
131 
132         Gauge.builder("replica.zk.state", this,
133                       self -> {
134                           final String state = serverStats(self).getServerState();
135                           if (state == null) {
136                               return 0;
137                           }
138                           switch (state) {
139                               case "leader":
140                                   return 1;
141                               case "follower":
142                                   return 2;
143                               case "observer":
144                                   return 3;
145                               case "read-only":
146                                   return 4;
147                               default:
148                                   return 5; // Unknown
149                           }
150                       })
151              .description("0 = inactive, 1 = leader, 2 = follower, 3 = observer, 4 = read-only, 5 = unknown")
152              .register(meterRegistry);
153 
154         // Bind the meters pulled in from DataTree.
155         Gauge.builder("replica.zk.approximate.data.size", this, new ApproximateDataSizeFunction())
156              .baseUnit("bytes")
157              .register(meterRegistry);
158 
159         Gauge.builder("replica.zk.nodes", this,
160                       self -> {
161                           final DataTree tree = dataTree(self);
162                           return tree != null ? tree.getNodeCount() : 0;
163                       })
164              .register(meterRegistry);
165 
166         Gauge.builder("replica.zk.ephemerals", this,
167                       self -> {
168                           final DataTree tree = dataTree(self);
169                           return tree != null ? tree.getEphemeralsCount() : 0;
170                       })
171              .register(meterRegistry);
172 
173         Gauge.builder("replica.zk.watches", this,
174                       self -> {
175                           final DataTree tree = dataTree(self);
176                           return tree != null ? tree.getWatchCount() : 0;
177                       })
178              .register(meterRegistry);
179     }
180 
181     private static ServerStats serverStats(@Nullable EmbeddedZooKeeper peer) {
182         if (peer == null) {
183             return EMPTY_STATS;
184         }
185 
186         final ZooKeeperServer activeServer = peer.getActiveServer();
187         if (activeServer == null) {
188             return EMPTY_STATS;
189         }
190 
191         final ServerStats stats = activeServer.serverStats();
192         return firstNonNull(stats, EMPTY_STATS);
193     }
194 
195     @Nullable
196     private static DataTree dataTree(@Nullable EmbeddedZooKeeper peer) {
197         if (peer == null) {
198             return null;
199         }
200 
201         final ZooKeeperServer activeServer = peer.getActiveServer();
202         if (activeServer == null) {
203             return null;
204         }
205 
206         final ZKDatabase database = activeServer.getZKDatabase();
207         if (database == null) {
208             return null;
209         }
210 
211         return database.getDataTree();
212     }
213 
214     private static ServerCnxnFactory createCnxnFactory(QuorumPeerConfig zkCfg) throws IOException {
215         final InetSocketAddress bindAddr = zkCfg.getClientPortAddress();
216         final ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
217         // Listen only on 127.0.0.1 because we do not want to expose ZooKeeper to others.
218         cnxnFactory.configure(new InetSocketAddress("127.0.0.1", bindAddr != null ? bindAddr.getPort() : 0),
219                               zkCfg.getMaxClientCnxns());
220         return cnxnFactory;
221     }
222 
223     private void configureSasl() {
224         quorumServerSaslAuthRequired = true;
225         quorumLearnerSaslAuthRequired = true;
226         quorumServerLoginContext = SASL_SERVER_LOGIN_CONTEXT;
227         quorumLearnerLoginContext = SASL_LEARNER_LOGIN_CONTEXT;
228     }
229 
230     @Override
231     public synchronized void start() {
232         purgeTxnLogs();
233         purgeManager.start();
234         super.start();
235     }
236 
237     @Override
238     public void shutdown() {
239         // Close the network stack first so that the shutdown process is done quickly.
240         cnxnFactory.shutdown();
241         purgeManager.shutdown();
242         super.shutdown();
243     }
244 
245     private void purgeTxnLogs() {
246         logger.info("Purging old ZooKeeper snapshots and logs ..");
247         try {
248             PurgeTxnLog.purge(purgeManager.getDataLogDir(),
249                               purgeManager.getSnapDir(),
250                               purgeManager.getSnapRetainCount());
251             logger.info("Purged old ZooKeeper snapshots and logs.");
252         } catch (IOException e) {
253             logger.error("Failed to purge old ZooKeeper snapshots and logs:", e);
254         }
255     }
256 
257     private static final class InactiveServerStats extends ServerStats {
258         InactiveServerStats() {
259             super(new Provider() {
260                 @Override
261                 public long getOutstandingRequests() {
262                     return 0;
263                 }
264 
265                 @Override
266                 public long getLastProcessedZxid() {
267                     return 0;
268                 }
269 
270                 @Nullable
271                 @Override
272                 public String getState() {
273                     return null;
274                 }
275 
276                 @Override
277                 public int getNumAliveConnections() {
278                     return 0;
279                 }
280 
281                 @Override
282                 public long getDataDirSize() {
283                     return 0;
284                 }
285 
286                 @Override
287                 public long getLogDirSize() {
288                     return 0;
289                 }
290             });
291         }
292     }
293 
294     /**
295      * Caches {@link DataTree#approximateDataSize()} for 3 seconds, because it's relatively an expensive
296      * operation.
297      */
298     private static class ApproximateDataSizeFunction implements ToDoubleFunction<EmbeddedZooKeeper> {
299 
300         private static final long MIN_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(3);
301 
302         private volatile long lastCalculationTimeNanos = System.nanoTime() - MIN_INTERVAL_NANOS;
303 
304         /**
305          * No need to be volatile because it's guarded by {@link #lastCalculationTimeNanos}.
306          */
307         private long value;
308 
309         @Override
310         public double applyAsDouble(EmbeddedZooKeeper self) {
311             final long currentTimeNanos = System.nanoTime();
312             if (currentTimeNanos - lastCalculationTimeNanos < MIN_INTERVAL_NANOS) {
313                 return value;
314             }
315 
316             final DataTree tree = dataTree(self);
317             final long value = tree != null ? tree.approximateDataSize() : 0;
318             this.value = value;
319             lastCalculationTimeNanos = currentTimeNanos;
320             return value;
321         }
322     }
323 }