1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server.internal.replication;
17
18 import static com.google.common.base.MoreObjects.firstNonNull;
19
20 import java.io.IOException;
21 import java.net.InetSocketAddress;
22 import java.util.concurrent.TimeUnit;
23 import java.util.function.ToDoubleFunction;
24
25 import javax.annotation.Nullable;
26
27 import org.apache.zookeeper.server.DataTree;
28 import org.apache.zookeeper.server.DatadirCleanupManager;
29 import org.apache.zookeeper.server.PurgeTxnLog;
30 import org.apache.zookeeper.server.ServerCnxnFactory;
31 import org.apache.zookeeper.server.ServerStats;
32 import org.apache.zookeeper.server.ZKDatabase;
33 import org.apache.zookeeper.server.ZooKeeperServer;
34 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
35 import org.apache.zookeeper.server.quorum.QuorumPeer;
36 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import io.micrometer.core.instrument.FunctionCounter;
41 import io.micrometer.core.instrument.Gauge;
42 import io.micrometer.core.instrument.MeterRegistry;
43 import io.micrometer.core.instrument.TimeGauge;
44
45 final class EmbeddedZooKeeper extends QuorumPeer {
46
47 private static final Logger logger = LoggerFactory.getLogger(EmbeddedZooKeeper.class);
48
49 static final String SASL_SERVER_LOGIN_CONTEXT = "QuorumServer";
50 static final String SASL_LEARNER_LOGIN_CONTEXT = "QuorumLearner";
51
52 private static final ServerStats EMPTY_STATS = new InactiveServerStats();
53
54 private final ServerCnxnFactory cnxnFactory;
55 private final DatadirCleanupManager purgeManager;
56
57 EmbeddedZooKeeper(QuorumPeerConfig zkCfg, MeterRegistry meterRegistry) throws IOException {
58 cnxnFactory = createCnxnFactory(zkCfg);
59
60 setTxnFactory(new FileTxnSnapLog(zkCfg.getDataLogDir(), zkCfg.getDataDir()));
61 enableLocalSessions(zkCfg.areLocalSessionsEnabled());
62 enableLocalSessionsUpgrading(zkCfg.isLocalSessionsUpgradingEnabled());
63 setElectionType(zkCfg.getElectionAlg());
64 setMyid(zkCfg.getServerId());
65 setTickTime(zkCfg.getTickTime());
66 setMinSessionTimeout(zkCfg.getMinSessionTimeout());
67 setMaxSessionTimeout(zkCfg.getMaxSessionTimeout());
68 setInitLimit(zkCfg.getInitLimit());
69 setSyncLimit(zkCfg.getSyncLimit());
70 setConfigFileName(zkCfg.getConfigFilename());
71 setZKDatabase(new ZKDatabase(getTxnFactory()));
72 setQuorumVerifier(zkCfg.getQuorumVerifier(), false);
73 if (zkCfg.getLastSeenQuorumVerifier() != null) {
74 setLastSeenQuorumVerifier(zkCfg.getLastSeenQuorumVerifier(), false);
75 }
76 initConfigInZKDatabase();
77 setCnxnFactory(cnxnFactory);
78 setLearnerType(zkCfg.getPeerType());
79 setSyncEnabled(zkCfg.getSyncEnabled());
80 setQuorumListenOnAllIPs(zkCfg.getQuorumListenOnAllIPs());
81 setMultiAddressEnabled(false);
82
83 configureSasl();
84
85 purgeManager = new DatadirCleanupManager(zkCfg.getDataDir(), zkCfg.getDataLogDir(),
86 zkCfg.getSnapRetainCount(), zkCfg.getPurgeInterval());
87
88
89 TimeGauge.builder("replica.zk.latency", this, TimeUnit.MILLISECONDS,
90 self -> serverStats(self).getAvgLatency())
91 .tag("type", "avg")
92 .register(meterRegistry);
93 TimeGauge.builder("replica.zk.latency", this, TimeUnit.MILLISECONDS,
94 self -> serverStats(self).getMaxLatency())
95 .tag("type", "max")
96 .register(meterRegistry);
97 TimeGauge.builder("replica.zk.latency", this, TimeUnit.MILLISECONDS,
98 self -> serverStats(self).getMinLatency())
99 .tag("type", "min")
100 .register(meterRegistry);
101
102 Gauge.builder("replica.zk.outstanding.requests", this,
103 self -> serverStats(self).getOutstandingRequests())
104 .register(meterRegistry);
105
106 Gauge.builder("replica.zk.last.processed.zxid", this,
107 self -> serverStats(self).getLastProcessedZxid())
108 .register(meterRegistry);
109
110 Gauge.builder("replica.zk.data.dir.size", this,
111 self -> serverStats(self).getDataDirSize())
112 .baseUnit("bytes")
113 .register(meterRegistry);
114
115 Gauge.builder("replica.zk.log.dir.size", this,
116 self -> serverStats(self).getLogDirSize())
117 .baseUnit("bytes")
118 .register(meterRegistry);
119
120 FunctionCounter.builder("replica.zk.packets.received", this,
121 self -> serverStats(self).getPacketsReceived())
122 .register(meterRegistry);
123
124 FunctionCounter.builder("replica.zk.packets.sent", this,
125 self -> serverStats(self).getPacketsSent())
126 .register(meterRegistry);
127
128 Gauge.builder("replica.zk.alive.client.connections", this,
129 self -> serverStats(self).getNumAliveClientConnections())
130 .register(meterRegistry);
131
132 Gauge.builder("replica.zk.state", this,
133 self -> {
134 final String state = serverStats(self).getServerState();
135 if (state == null) {
136 return 0;
137 }
138 switch (state) {
139 case "leader":
140 return 1;
141 case "follower":
142 return 2;
143 case "observer":
144 return 3;
145 case "read-only":
146 return 4;
147 default:
148 return 5;
149 }
150 })
151 .description("0 = inactive, 1 = leader, 2 = follower, 3 = observer, 4 = read-only, 5 = unknown")
152 .register(meterRegistry);
153
154
155 Gauge.builder("replica.zk.approximate.data.size", this, new ApproximateDataSizeFunction())
156 .baseUnit("bytes")
157 .register(meterRegistry);
158
159 Gauge.builder("replica.zk.nodes", this,
160 self -> {
161 final DataTree tree = dataTree(self);
162 return tree != null ? tree.getNodeCount() : 0;
163 })
164 .register(meterRegistry);
165
166 Gauge.builder("replica.zk.ephemerals", this,
167 self -> {
168 final DataTree tree = dataTree(self);
169 return tree != null ? tree.getEphemeralsCount() : 0;
170 })
171 .register(meterRegistry);
172
173 Gauge.builder("replica.zk.watches", this,
174 self -> {
175 final DataTree tree = dataTree(self);
176 return tree != null ? tree.getWatchCount() : 0;
177 })
178 .register(meterRegistry);
179 }
180
181 private static ServerStats serverStats(@Nullable EmbeddedZooKeeper peer) {
182 if (peer == null) {
183 return EMPTY_STATS;
184 }
185
186 final ZooKeeperServer activeServer = peer.getActiveServer();
187 if (activeServer == null) {
188 return EMPTY_STATS;
189 }
190
191 final ServerStats stats = activeServer.serverStats();
192 return firstNonNull(stats, EMPTY_STATS);
193 }
194
195 @Nullable
196 private static DataTree dataTree(@Nullable EmbeddedZooKeeper peer) {
197 if (peer == null) {
198 return null;
199 }
200
201 final ZooKeeperServer activeServer = peer.getActiveServer();
202 if (activeServer == null) {
203 return null;
204 }
205
206 final ZKDatabase database = activeServer.getZKDatabase();
207 if (database == null) {
208 return null;
209 }
210
211 return database.getDataTree();
212 }
213
214 private static ServerCnxnFactory createCnxnFactory(QuorumPeerConfig zkCfg) throws IOException {
215 final InetSocketAddress bindAddr = zkCfg.getClientPortAddress();
216 final ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
217
218 cnxnFactory.configure(new InetSocketAddress("127.0.0.1", bindAddr != null ? bindAddr.getPort() : 0),
219 zkCfg.getMaxClientCnxns());
220 return cnxnFactory;
221 }
222
223 private void configureSasl() {
224 quorumServerSaslAuthRequired = true;
225 quorumLearnerSaslAuthRequired = true;
226 quorumServerLoginContext = SASL_SERVER_LOGIN_CONTEXT;
227 quorumLearnerLoginContext = SASL_LEARNER_LOGIN_CONTEXT;
228 }
229
230 @Override
231 public synchronized void start() {
232 purgeTxnLogs();
233 purgeManager.start();
234 super.start();
235 }
236
237 @Override
238 public void shutdown() {
239
240 cnxnFactory.shutdown();
241 purgeManager.shutdown();
242 super.shutdown();
243 }
244
245 private void purgeTxnLogs() {
246 logger.info("Purging old ZooKeeper snapshots and logs ..");
247 try {
248 PurgeTxnLog.purge(purgeManager.getDataLogDir(),
249 purgeManager.getSnapDir(),
250 purgeManager.getSnapRetainCount());
251 logger.info("Purged old ZooKeeper snapshots and logs.");
252 } catch (IOException e) {
253 logger.error("Failed to purge old ZooKeeper snapshots and logs:", e);
254 }
255 }
256
257 private static final class InactiveServerStats extends ServerStats {
258 InactiveServerStats() {
259 super(new Provider() {
260 @Override
261 public long getOutstandingRequests() {
262 return 0;
263 }
264
265 @Override
266 public long getLastProcessedZxid() {
267 return 0;
268 }
269
270 @Nullable
271 @Override
272 public String getState() {
273 return null;
274 }
275
276 @Override
277 public int getNumAliveConnections() {
278 return 0;
279 }
280
281 @Override
282 public long getDataDirSize() {
283 return 0;
284 }
285
286 @Override
287 public long getLogDirSize() {
288 return 0;
289 }
290 });
291 }
292 }
293
294
295
296
297
298 private static class ApproximateDataSizeFunction implements ToDoubleFunction<EmbeddedZooKeeper> {
299
300 private static final long MIN_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(3);
301
302 private volatile long lastCalculationTimeNanos = System.nanoTime() - MIN_INTERVAL_NANOS;
303
304
305
306
307 private long value;
308
309 @Override
310 public double applyAsDouble(EmbeddedZooKeeper self) {
311 final long currentTimeNanos = System.nanoTime();
312 if (currentTimeNanos - lastCalculationTimeNanos < MIN_INTERVAL_NANOS) {
313 return value;
314 }
315
316 final DataTree tree = dataTree(self);
317 final long value = tree != null ? tree.approximateDataSize() : 0;
318 this.value = value;
319 lastCalculationTimeNanos = currentTimeNanos;
320 return value;
321 }
322 }
323 }