1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.internal.replication;
18  
19  import static com.google.common.base.MoreObjects.firstNonNull;
20  import static com.google.common.collect.ImmutableList.toImmutableList;
21  import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.INTERNAL_PROJECT_DOGMA;
22  import static java.util.Objects.requireNonNull;
23  
24  import java.io.BufferedReader;
25  import java.io.File;
26  import java.io.FileInputStream;
27  import java.io.FileNotFoundException;
28  import java.io.FileOutputStream;
29  import java.io.InputStreamReader;
30  import java.nio.charset.StandardCharsets;
31  import java.util.AbstractMap.SimpleImmutableEntry;
32  import java.util.ArrayList;
33  import java.util.Arrays;
34  import java.util.Collections;
35  import java.util.Comparator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Map.Entry;
39  import java.util.Objects;
40  import java.util.Optional;
41  import java.util.Properties;
42  import java.util.concurrent.CompletableFuture;
43  import java.util.concurrent.ConcurrentHashMap;
44  import java.util.concurrent.ConcurrentMap;
45  import java.util.concurrent.ExecutionException;
46  import java.util.concurrent.ExecutorService;
47  import java.util.concurrent.Executors;
48  import java.util.concurrent.ForkJoinPool;
49  import java.util.concurrent.LinkedTransferQueue;
50  import java.util.concurrent.ScheduledExecutorService;
51  import java.util.concurrent.ThreadPoolExecutor;
52  import java.util.concurrent.TimeUnit;
53  import java.util.function.Consumer;
54  
55  import javax.annotation.Nullable;
56  
57  import org.apache.curator.RetryPolicy;
58  import org.apache.curator.framework.CuratorFramework;
59  import org.apache.curator.framework.CuratorFrameworkFactory;
60  import org.apache.curator.framework.api.transaction.CuratorOp;
61  import org.apache.curator.framework.imps.CuratorFrameworkState;
62  import org.apache.curator.framework.recipes.cache.PathChildrenCache;
63  import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
64  import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
65  import org.apache.curator.framework.recipes.leader.LeaderSelector;
66  import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
67  import org.apache.curator.framework.recipes.locks.InterProcessMutex;
68  import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
69  import org.apache.curator.framework.recipes.locks.Lease;
70  import org.apache.curator.framework.state.ConnectionState;
71  import org.apache.curator.retry.RetryForever;
72  import org.apache.zookeeper.CreateMode;
73  import org.apache.zookeeper.KeeperException;
74  import org.apache.zookeeper.server.auth.DigestLoginModule;
75  import org.apache.zookeeper.server.auth.SASLAuthenticationProvider;
76  import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
77  import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
78  import org.slf4j.Logger;
79  import org.slf4j.LoggerFactory;
80  
81  import com.fasterxml.jackson.annotation.JsonCreator;
82  import com.fasterxml.jackson.annotation.JsonProperty;
83  import com.github.benmanes.caffeine.cache.Cache;
84  import com.github.benmanes.caffeine.cache.Caffeine;
85  import com.google.common.annotations.VisibleForTesting;
86  import com.google.common.base.Joiner;
87  import com.google.common.collect.ImmutableList;
88  import com.google.common.collect.ImmutableMultimap;
89  import com.google.common.escape.Escaper;
90  import com.google.common.escape.Escapers;
91  import com.google.common.util.concurrent.MoreExecutors;
92  
93  import com.linecorp.armeria.common.util.Exceptions;
94  import com.linecorp.armeria.common.util.SafeCloseable;
95  import com.linecorp.centraldogma.common.CentralDogmaException;
96  import com.linecorp.centraldogma.common.Revision;
97  import com.linecorp.centraldogma.common.TooManyRequestsException;
98  import com.linecorp.centraldogma.internal.Jackson;
99  import com.linecorp.centraldogma.server.QuotaConfig;
100 import com.linecorp.centraldogma.server.ZooKeeperReplicationConfig;
101 import com.linecorp.centraldogma.server.ZooKeeperServerConfig;
102 import com.linecorp.centraldogma.server.command.AbstractCommandExecutor;
103 import com.linecorp.centraldogma.server.command.Command;
104 import com.linecorp.centraldogma.server.command.CommandExecutor;
105 import com.linecorp.centraldogma.server.command.CommandType;
106 import com.linecorp.centraldogma.server.command.CommitResult;
107 import com.linecorp.centraldogma.server.command.ForcePushCommand;
108 import com.linecorp.centraldogma.server.command.NormalizingPushCommand;
109 import com.linecorp.centraldogma.server.command.RemoveRepositoryCommand;
110 import com.linecorp.centraldogma.server.command.UpdateServerStatusCommand;
111 import com.linecorp.centraldogma.server.metadata.MetadataService;
112 import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;
113 import com.linecorp.centraldogma.server.storage.project.Project;
114 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
115 
116 import io.micrometer.core.instrument.Gauge;
117 import io.micrometer.core.instrument.MeterRegistry;
118 import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
119 import io.netty.util.concurrent.DefaultThreadFactory;
120 
121 public final class ZooKeeperCommandExecutor
122         extends AbstractCommandExecutor implements PathChildrenCacheListener {
123 
124     private static final Logger logger = LoggerFactory.getLogger(ZooKeeperCommandExecutor.class);
125     private static final Escaper jaasValueEscaper =
126             Escapers.builder().addEscape('\"', "\\\"").addEscape('\\', "\\\\").build();
127     private static final Joiner colonJoiner = Joiner.on(':');
128 
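    // Layout of the znodes created under PATH_PREFIX ("/dogma"):
    //   logs/<revision>         - LogMeta of each replication log (zero-padded sequential znodes)
    //   log_blocks/<block id>   - serialized ReplicationLog payload, split into chunks of at most MAX_BYTES
    //   lock/<execution path>   - distributed mutexes that serialize conflicting commands
    //   quota/<project/repo>    - distributed semaphores that enforce per-repository write quotas
    //   leader                  - leader selection for the old log remover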
129     private static final String PATH_PREFIX = "/dogma";
    private static final int MAX_BYTES = 1024 * 1023; // The documented znode size limit is 1 MiB; stay slightly below it for safety.
131 
    // Log revisions start at 0 and increase by 1. Do not make any change without creating a log node;
    // otherwise the consistency of the log revision will be broken. Also, use ZooKeeper 3.4.x or later,
    // because deleting a log node in an older version breaks consistency as well.
136 
137     private static final String LOG_PATH = "logs";
138 
139     private static final String LOG_BLOCK_PATH = "log_blocks";
140 
141     private static final String LOCK_PATH = "lock";
142 
143     private static final String QUOTA_PATH = "quota";
144 
145     private static final String LEADER_PATH = "leader";
146 
147     private static final RetryPolicy RETRY_POLICY_ALWAYS = new RetryForever(500);
148     private static final RetryPolicy RETRY_POLICY_NEVER = (retryCount, elapsedTimeMs, sleeper) -> false;
149 
150     private static final QuotaConfig UNLIMITED_QUOTA = new QuotaConfig(Integer.MAX_VALUE, 1);
151     private static final Entry<InterProcessSemaphoreV2, SettableSharedCount> UNLIMITED_SEMAPHORE =
152             new SimpleImmutableEntry<>(null, null);
153 
154     private final ConcurrentMap<String, InterProcessMutex> mutexMap = new ConcurrentHashMap<>();
155 
156     @VisibleForTesting
157     final ConcurrentMap<String, Entry<InterProcessSemaphoreV2, SettableSharedCount>> semaphoreMap =
158             new ConcurrentHashMap<>();
159 
160     @VisibleForTesting
161     final Cache<String, QuotaConfig> writeQuotaCache = Caffeine.newBuilder().maximumSize(2000).build();
162 
163     private final ZooKeeperReplicationConfig cfg;
164     private final File revisionFile;
165     private final File zkConfFile;
166     private final File zkDataDir;
167     private final File zkLogDir;
168     private final CommandExecutor delegate;
169     private final MeterRegistry meterRegistry;
170 
171     @Nullable
172     private final QuotaConfig writeQuota;
173 
174     private MetadataService metadataService;
175 
176     private volatile EmbeddedZooKeeper quorumPeer;
177     private volatile CuratorFramework curator;
178     private volatile RetryPolicy retryPolicy = RETRY_POLICY_NEVER;
179     private volatile ExecutorService executor;
180     private volatile ExecutorService logWatcherExecutor;
181     private volatile PathChildrenCache logWatcher;
182     private volatile OldLogRemover oldLogRemover;
183     private volatile ExecutorService leaderSelectorExecutor;
184     private volatile LeaderSelector leaderSelector;
185     private volatile ScheduledExecutorService quotaExecutor;
186     private volatile boolean createdParentNodes;
187 
188     private class OldLogRemover implements LeaderSelectorListener {
189         volatile boolean hasLeadership;
190 
191         @Override
192         public void stateChanged(CuratorFramework client, ConnectionState newState) {
            // Ignore.
194         }
195 
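        // Invoked when this replica becomes the leader of the old log removal task. The leader prunes old
        // replication logs whenever touch() is called after a log replay, and otherwise waits.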
196         @Override
197         public void takeLeadership(CuratorFramework client) throws Exception {
198             final ListenerInfo listenerInfo = ZooKeeperCommandExecutor.this.listenerInfo;
199             if (listenerInfo == null) {
200                 // Stopped.
201                 return;
202             }
203 
204             logger.info("Taking leadership: {}", replicaId());
205             try {
206                 hasLeadership = true;
207                 if (listenerInfo.onTakeLeadership != null) {
208                     listenerInfo.onTakeLeadership.run();
209                 }
210 
211                 while (curator.getState() == CuratorFrameworkState.STARTED) {
212                     deleteLogs();
213                     synchronized (this) {
214                         wait();
215                     }
216                 }
217             } catch (InterruptedException e) {
218                 // Leader selector has been closed.
219             } catch (Exception e) {
220                 logger.error("Leader stopped due to an unexpected exception:", e);
221             } finally {
222                 hasLeadership = false;
223                 logger.info("Releasing leadership: {}", replicaId());
224                 if (listenerInfo.onReleaseLeadership != null) {
225                     listenerInfo.onReleaseLeadership.run();
226                 }
227 
228                 if (ZooKeeperCommandExecutor.this.listenerInfo != null) {
229                     // Requeue only when the executor is not stopped.
230                     leaderSelector.requeue();
231                 }
232             }
233         }
234 
235         public synchronized void touch() {
236             notify();
237         }
238 
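        // Prunes the oldest replication logs, keeping at least cfg.maxLogCount() entries and never deleting
        // entries newer than cfg.minLogAgeMillis().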
239         private void deleteLogs() throws Exception {
240             final List<String> children = curator.getChildren().forPath(absolutePath(LOG_PATH));
241             if (children.size() <= cfg.maxLogCount()) {
242                 return;
243             }
244 
245             final long minAllowedTimestamp = System.currentTimeMillis() - cfg.minLogAgeMillis();
246             final int targetCount = children.size() - cfg.maxLogCount();
247             final List<String> deleted = new ArrayList<>(targetCount);
248             children.sort(Comparator.comparingLong(Long::parseLong));
249             try {
250                 for (int i = 0; i < targetCount; ++i) {
251                     final String logPath = absolutePath(LOG_PATH, children.get(i));
252                     final LogMeta meta = Jackson.readValue(curator.getData().forPath(logPath), LogMeta.class);
253 
254                     if (meta.timestamp() >= minAllowedTimestamp) {
255                         // Do not delete the logs that are not old enough.
                        // We can break out of the loop here because 'children' has been sorted by
                        // insertion order (sequence value).
258                         break;
259                     }
260 
261                     deleteLog(curator.transactionOp().delete().forPath(logPath), deleted, children.get(i));
262                     for (long blockId : meta.blocks()) {
263                         final String blockPath = absolutePath(LOG_BLOCK_PATH) + '/' + pathFromRevision(blockId);
264                         deleteLog(curator.transactionOp().delete().forPath(blockPath),
265                                   deleted, children.get(i));
266                     }
267                 }
268             } finally {
                logger.info("Deleted logs: {}", deleted);
270             }
271         }
272 
273         private void deleteLog(CuratorOp curatorOp, List<String> deleted, String childName) {
274             try {
275                 curator.transaction().forOperations(curatorOp);
276                 deleted.add(childName);
277             } catch (Throwable t) {
278                 logger.warn("Failed to delete ZooKeeper log: {}", childName, t);
279             }
280         }
281     }
282 
283     private static final class ListenerInfo {
284         long lastReplayedRevision;
285         final Runnable onTakeLeadership;
286         final Runnable onReleaseLeadership;
287 
288         ListenerInfo(long lastReplayedRevision,
289                      @Nullable Runnable onTakeLeadership, @Nullable Runnable onReleaseLeadership) {
290 
291             this.lastReplayedRevision = lastReplayedRevision;
292             this.onReleaseLeadership = onReleaseLeadership;
293             this.onTakeLeadership = onTakeLeadership;
294         }
295     }
296 
297     private volatile ListenerInfo listenerInfo;
298 
299     public ZooKeeperCommandExecutor(ZooKeeperReplicationConfig cfg,
300                                     File dataDir, CommandExecutor delegate,
301                                     MeterRegistry meterRegistry,
302                                     ProjectManager projectManager,
303                                     @Nullable QuotaConfig writeQuota,
304                                     @Nullable Consumer<CommandExecutor> onTakeLeadership,
305                                     @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
306         super(onTakeLeadership, onReleaseLeadership);
307 
308         this.cfg = requireNonNull(cfg, "cfg");
309         requireNonNull(dataDir, "dataDir");
310         revisionFile = new File(dataDir.getAbsolutePath() + File.separatorChar + "last_revision");
311         zkConfFile = new File(dataDir.getAbsolutePath() + File.separatorChar +
312                               "_zookeeper" + File.separatorChar + "config.properties");
313         zkDataDir = new File(dataDir.getAbsolutePath() + File.separatorChar +
314                              "_zookeeper" + File.separatorChar + "data");
315         zkLogDir = new File(dataDir.getAbsolutePath() + File.separatorChar +
316                             "_zookeeper" + File.separatorChar + "log");
317 
318         this.delegate = requireNonNull(delegate, "delegate");
319         this.meterRegistry = requireNonNull(meterRegistry, "meterRegistry");
320         this.writeQuota = writeQuota;
321         metadataService = new MetadataService(projectManager, this);
322 
        // Register the metrics which are accessible even before the executor is started.
324         Gauge.builder("replica.id", this, self -> replicaId()).register(meterRegistry);
325         if (cfg.serverConfig().groupId() != null) {
326             Gauge.builder("replica.groupId", this, self -> self.cfg.serverConfig().groupId())
327                  .register(meterRegistry);
328         }
329         Gauge.builder("replica.read.only", this, self -> self.isWritable() ? 0 : 1).register(meterRegistry);
330         Gauge.builder("replica.replicating", this, self -> self.isStarted() ? 1 : 0).register(meterRegistry);
331         Gauge.builder("replica.has.leadership", this,
332                       self -> {
333                           final OldLogRemover remover = self.oldLogRemover;
334                           return remover != null && remover.hasLeadership ? 1 : 0;
335                       })
336              .register(meterRegistry);
337         Gauge.builder("replica.last.replayed.revision", this,
338                       self -> {
339                           final ListenerInfo info = self.listenerInfo;
340                           if (info == null) {
341                               return 0;
342                           }
343                           return info.lastReplayedRevision;
344                       })
345              .register(meterRegistry);
346     }
347 
348     @Override
349     public int replicaId() {
350         return cfg.serverId();
351     }
352 
353     @Override
354     protected void doStart(@Nullable Runnable onTakeLeadership,
355                            @Nullable Runnable onReleaseLeadership) throws Exception {
356         try {
357             // Get the last replayed revision.
358             final long lastReplayedRevision;
359             try {
360                 lastReplayedRevision = getLastReplayedRevision();
361                 listenerInfo = new ListenerInfo(lastReplayedRevision, onTakeLeadership, onReleaseLeadership);
362             } catch (Exception e) {
363                 throw new ReplicationException("failed to read " + revisionFile, e);
364             }
365 
366             // Start the embedded ZooKeeper.
367             quorumPeer = startZooKeeper();
368             retryPolicy = RETRY_POLICY_ALWAYS;
369 
370             // Start the Curator framework.
371             curator = CuratorFrameworkFactory.newClient(
372                     "127.0.0.1:" + quorumPeer.getClientPort(), cfg.timeoutMillis(), cfg.timeoutMillis(),
373                     (retryCount, elapsedTimeMs, sleeper) -> {
374                         return retryPolicy.allowRetry(retryCount, elapsedTimeMs, sleeper);
375                     });
376 
377             curator.start();
378 
379             // Start the log replay.
380             logWatcherExecutor = ExecutorServiceMetrics.monitor(
381                     meterRegistry,
382                     Executors.newSingleThreadExecutor(
383                             new DefaultThreadFactory("zookeeper-log-watcher", true)),
384                     "zkLogWatcher");
385 
386             logWatcher = new PathChildrenCache(curator, absolutePath(LOG_PATH),
387                                                true, false, logWatcherExecutor);
388             logWatcher.getListenable().addListener(this, MoreExecutors.directExecutor());
389             logWatcher.start();
390 
391             // Start the leader selection.
392             oldLogRemover = new OldLogRemover();
393             leaderSelectorExecutor = ExecutorServiceMetrics.monitor(
394                     meterRegistry,
395                     Executors.newSingleThreadExecutor(
396                             new DefaultThreadFactory("zookeeper-leader-selector", true)),
397                     "zkLeaderSelector");
398 
399             leaderSelector = new LeaderSelector(curator, absolutePath(LEADER_PATH),
400                                                 leaderSelectorExecutor, oldLogRemover);
401             leaderSelector.start();
402 
403             // Start the delegate.
404             delegate.start();
405 
406             // Get the command executor threads ready.
407             final ThreadPoolExecutor executor = new ThreadPoolExecutor(
408                     cfg.numWorkers(), cfg.numWorkers(),
409                     60, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
410                     new DefaultThreadFactory("zookeeper-command-executor", true));
411             executor.allowCoreThreadTimeOut(true);
412 
413             this.executor = ExecutorServiceMetrics.monitor(meterRegistry, executor, "zkCommandExecutor");
414 
415             quotaExecutor = ExecutorServiceMetrics.monitor(
416                     meterRegistry,
417                     Executors.newSingleThreadScheduledExecutor(
418                             new DefaultThreadFactory("zookeeper-quota-executor", true)),
419                     "quotaExecutor");
420         } catch (InterruptedException | ReplicationException e) {
421             throw e;
422         } catch (Exception e) {
423             throw new ReplicationException(e);
424         }
425     }
426 
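    // Writes the ZooKeeper configuration derived from the replication config, starts the embedded peer and
    // waits until it joins the quorum as a leader or a follower.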
427     private EmbeddedZooKeeper startZooKeeper() throws Exception {
428         logger.info("Starting the ZooKeeper peer ({}) ..", cfg.serverId());
429         EmbeddedZooKeeper peer = null;
430         boolean success = false;
431         try {
432             final Properties zkProps = new Properties();
433 
434             // Set the properties.
435             copyZkProperty(zkProps, "initLimit", "5");
436             copyZkProperty(zkProps, "syncLimit", "10");
437             copyZkProperty(zkProps, "tickTime", "3000");
438             copyZkProperty(zkProps, "syncEnabled", "true");
439             copyZkProperty(zkProps, "autopurge.snapRetainCount", "3");
440             copyZkProperty(zkProps, "autopurge.purgeInterval", "1");
441             copyZkProperty(zkProps, "quorumListenOnAllIPs", "false");
442 
            // Set the properties that must be specified as system properties.
444             System.setProperty("zookeeper.fsync.warningthresholdms",
445                                cfg.additionalProperties().getOrDefault("fsync.warningthresholdms", "1000"));
446 
447             // Set the data directories.
448             zkProps.setProperty("dataDir", zkDataDir.getPath());
449             zkProps.setProperty("dataLogDir", zkLogDir.getPath());
450             zkDataDir.mkdirs();
451             zkLogDir.mkdirs();
452 
            // Generate the myid file in the data directory.
454             try (FileOutputStream out = new FileOutputStream(new File(zkDataDir, "myid"))) {
455                 out.write((cfg.serverId() + "\n").getBytes(StandardCharsets.US_ASCII));
456             }
457 
458             // Generate the jaas.conf and configure system properties to enable SASL authentication
459             // for server, client, quorum server and quorum learner.
460             final File jaasConfFile = new File(zkDataDir, "jaas.conf");
461             try (FileOutputStream out = new FileOutputStream(jaasConfFile)) {
462                 final StringBuilder buf = new StringBuilder();
463                 final String newline = System.lineSeparator();
464                 final String escapedSecret = jaasValueEscaper.escape(cfg.secret());
465                 ImmutableList.of("Server", EmbeddedZooKeeper.SASL_SERVER_LOGIN_CONTEXT).forEach(name -> {
466                     buf.append(name).append(" {").append(newline);
467                     buf.append(DigestLoginModule.class.getName()).append(" required").append(newline);
468                     buf.append("user_super=\"").append(escapedSecret).append("\";").append(newline);
469                     buf.append("};").append(newline);
470                 });
471                 ImmutableList.of("Client", EmbeddedZooKeeper.SASL_LEARNER_LOGIN_CONTEXT).forEach(name -> {
472                     buf.append(name).append(" {").append(newline);
473                     buf.append(DigestLoginModule.class.getName()).append(" required").append(newline);
474                     buf.append("username=\"super\"").append(newline);
475                     buf.append("password=\"").append(escapedSecret).append("\";").append(newline);
476                     buf.append("};").append(newline);
477                 });
                out.write(buf.toString().getBytes(StandardCharsets.UTF_8));
479             }
480             System.setProperty("java.security.auth.login.config", jaasConfFile.getAbsolutePath());
481             System.setProperty("zookeeper.authProvider.1", SASLAuthenticationProvider.class.getName());
482 
            // Set the client port, which is used only by the local Curator client.
484             zkProps.setProperty("clientPort", String.valueOf(cfg.serverConfig().clientPort()));
485 
486             final Map<Integer, ZooKeeperServerConfig> servers = cfg.servers();
487             // Add replicas.
488             boolean hasGroupId = false;
489             for (Entry<Integer, ZooKeeperServerConfig> entry : servers.entrySet()) {
490                 final ZooKeeperServerConfig serverConfig = entry.getValue();
491                 zkProps.setProperty(
492                         "server." + entry.getKey(),
493                         serverConfig.host() + ':' + serverConfig.quorumPort() + ':' +
494                         serverConfig.electionPort() + ":participant");
495 
496                 if (!hasGroupId && serverConfig.groupId() != null) {
497                     hasGroupId = true;
498                 }
499             }
500 
            // Add groups if any server defines a group ID.
502             if (hasGroupId) {
503                 final ImmutableMultimap.Builder<Integer, Integer> groupBuilder = ImmutableMultimap.builder();
504                 boolean isHierarchical = true;
505                 for (Entry<Integer, ZooKeeperServerConfig> entry : servers.entrySet()) {
506                     final Integer groupId = entry.getValue().groupId();
507                     if (groupId == null) {
508                         isHierarchical = false;
509                         final List<ZooKeeperServerConfig> noGroupIds =
510                                 servers.values().stream()
511                                        .filter(serverConfig -> serverConfig.groupId() == null)
512                                        .collect(toImmutableList());
                        logger.warn("Hierarchical quorums are disabled; 'groupId' is missing in {}",
                                    noGroupIds);
515                         break;
516                     } else {
517                         groupBuilder.put(groupId, entry.getKey());
518                     }
519                 }
520                 if (isHierarchical) {
521                     groupBuilder.build().asMap().forEach((groupId, serverIds) -> {
522                         // e.g. group.1=1:2:3
523                         zkProps.setProperty("group." + groupId, colonJoiner.join(serverIds));
524                     });
525                     servers.forEach((serverId, serverConfig) -> {
526                         // e.g. weight.1=1
527                         zkProps.setProperty("weight." + serverId, String.valueOf(serverConfig.weight()));
528                     });
529                 }
530             }
531 
532             // Disable Jetty-based admin server.
533             zkProps.setProperty("admin.enableServer", "false");
534 
535             zkConfFile.getParentFile().mkdirs();
536             try (FileOutputStream out = new FileOutputStream(zkConfFile)) {
537                 zkProps.store(out, null);
538             }
539 
540             final QuorumPeerConfig zkCfg = new QuorumPeerConfig();
541             zkCfg.parse(zkConfFile.getPath());
542 
543             peer = new EmbeddedZooKeeper(zkCfg, meterRegistry);
544             peer.start();
545 
            // Wait until the ZooKeeper peer joins the cluster.
547             for (;;) {
548                 final ServerState state = peer.getPeerState();
549                 if (state == ServerState.FOLLOWING || state == ServerState.LEADING) {
550                     break;
551                 }
552 
553                 if (isStopping()) {
554                     throw new InterruptedException("Stop requested before joining the cluster");
555                 }
556 
557                 logger.info("Waiting for the ZooKeeper peer ({}) to join the cluster ..", peer.getId());
558                 Thread.sleep(1000);
559             }
560 
561             if (peer.getId() == peer.getCurrentVote().getId()) {
562                 logger.info("The ZooKeeper peer ({}) has joined the cluster as a leader.", peer.getId());
563             } else {
564                 logger.info("The ZooKeeper peer ({}) has joined the cluster, following {}.",
565                             peer.getId(), peer.getCurrentVote().getId());
566             }
567 
568             success = true;
569             return peer;
570         } finally {
571             if (!success && peer != null) {
572                 try {
573                     peer.shutdown();
574                 } catch (Exception e) {
                    logger.warn("Failed to shut down the failed ZooKeeper peer: {}", e.getMessage(), e);
576                 }
577             }
578         }
579     }
580 
581     private void copyZkProperty(Properties zkProps, String key, String defaultValue) {
582         zkProps.setProperty(key, cfg.additionalProperties().getOrDefault(key, defaultValue));
583     }
584 
585     private void stopLater() {
        // Stop from another thread so that we do not get stuck
        // when this method runs on an executor thread.
588         ForkJoinPool.commonPool().execute(this::stop);
589     }
590 
591     @Override
592     protected void doStop(@Nullable Runnable onReleaseLeadership) throws Exception {
593         listenerInfo = null;
594         logger.info("Stopping the worker threads");
595         boolean interrupted = shutdown(executor);
596         logger.info("Stopped the worker threads");
597 
598         logger.info("Stopping the quota worker threads");
599         interrupted |= shutdown(quotaExecutor);
600         logger.info("Stopped the quota worker threads");
601         semaphoreMap.clear();
602 
603         try {
604             logger.info("Stopping the delegate command executor");
605             delegate.stop();
606             logger.info("Stopped the delegate command executor");
607         } catch (Exception e) {
608             logger.warn("Failed to stop the delegate command executor {}: {}", delegate, e.getMessage(), e);
609         } finally {
610             retryPolicy = RETRY_POLICY_NEVER;
611             try {
612                 if (leaderSelector != null) {
613                     logger.info("Closing the leader selector");
614                     leaderSelector.close();
615                     interrupted |= shutdown(leaderSelectorExecutor);
616                     logger.info("Closed the leader selector");
617                 }
618             } catch (Exception e) {
619                 logger.warn("Failed to close the leader selector: {}", e.getMessage(), e);
620             } finally {
621                 try {
622                     if (logWatcher != null) {
623                         logger.info("Closing the log watcher");
624                         logWatcher.close();
625                         interrupted |= shutdown(logWatcherExecutor);
626                         logger.info("Closed the log watcher");
627                     }
628                 } catch (Exception e) {
629                     logger.warn("Failed to close the log watcher: {}", e.getMessage(), e);
630                 } finally {
631                     try {
632                         if (curator != null) {
633                             logger.info("Closing the Curator framework");
634                             curator.close();
635                             logger.info("Closed the Curator framework");
636                         }
637                     } catch (Exception e) {
638                         logger.warn("Failed to close the Curator framework: {}", e.getMessage(), e);
639                     } finally {
640                         try {
641                             if (quorumPeer != null) {
642                                 final long peerId = quorumPeer.getId();
643                                 logger.info("Shutting down the ZooKeeper peer ({})", peerId);
644                                 quorumPeer.shutdown();
645                                 logger.info("Shut down the ZooKeeper peer ({})", peerId);
646                             }
647                         } catch (Exception e) {
648                             logger.warn("Failed to shut down the ZooKeeper peer: {}", e.getMessage(), e);
649                         } finally {
650                             if (interrupted) {
651                                 Thread.currentThread().interrupt();
652                             }
653                         }
654                     }
655                 }
656             }
657         }
658     }
659 
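    // Shuts the given executor down and waits until it terminates. Returns whether the calling thread was
    // interrupted while waiting.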
660     private static boolean shutdown(@Nullable ExecutorService executor) {
661         if (executor == null) {
662             return false;
663         }
664 
665         boolean interrupted = false;
666         while (!executor.isTerminated()) {
667             executor.shutdown();
668             try {
669                 executor.awaitTermination(1, TimeUnit.SECONDS);
670             } catch (InterruptedException e) {
671                 // Interrupt later.
672                 interrupted = true;
673             }
674         }
675         return interrupted;
676     }
677 
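    // Returns -1 when the revision file does not exist or is empty, i.e. this replica has never replayed
    // a log yet.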
678     private long getLastReplayedRevision() throws Exception {
679         final FileInputStream fis;
680         try {
681             fis = new FileInputStream(revisionFile);
682         } catch (FileNotFoundException ignored) {
683             return -1;
684         }
685 
686         try (BufferedReader br = new BufferedReader(new InputStreamReader(fis))) {
687             final String l = br.readLine();
688             if (l == null) {
689                 return -1;
690             }
691             return Long.parseLong(l.trim());
692         }
693     }
694 
695     private void updateLastReplayedRevision(long lastReplayedRevision) throws Exception {
696         boolean success = false;
697         try (FileOutputStream fos = new FileOutputStream(revisionFile)) {
698             fos.write(String.valueOf(lastReplayedRevision).getBytes(StandardCharsets.UTF_8));
699             success = true;
700         } finally {
701             if (success) {
702                 logger.info("Updated lastReplayedRevision to: {}", lastReplayedRevision);
703             } else {
704                 logger.error("Failed to update lastReplayedRevision to: {}", lastReplayedRevision);
705             }
706         }
707     }
708 
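    // Replays the logs from (lastReplayedRevision + 1) up to targetRevision against the delegate executor.
    // Any failure makes this replica enter read-only mode.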
709     private synchronized void replayLogs(long targetRevision) {
710         final ListenerInfo info = listenerInfo;
711         if (info == null) {
712             return;
713         }
714 
715         if (targetRevision <= info.lastReplayedRevision) {
716             return;
717         }
718 
719         long nextRevision = info.lastReplayedRevision + 1;
720         for (;;) {
721             ReplicationLog<?> l = null;
722             try {
723                 final Optional<ReplicationLog<?>> log = loadLog(nextRevision, true);
724                 if (log.isPresent()) {
725                     l = log.get();
726                     final Command<?> command = l.command();
727                     final Object expectedResult = l.result();
728                     final Object actualResult = delegate.execute(command).get();
729 
730                     if (!Objects.equals(expectedResult, actualResult)) {
731                         throw new ReplicationException(
732                                 "mismatching replay result at revision " + nextRevision +
733                                 ": " + actualResult + " (expected: " + expectedResult +
734                                 ", command: " + command + ')');
735                     }
736                     if (command instanceof RemoveRepositoryCommand) {
737                         clearWriteQuota((RemoveRepositoryCommand) command);
738                     }
739                 } else {
                    // The log was written by this replica; skip it.
741                 }
742 
743                 updateLastReplayedRevision(nextRevision);
744                 info.lastReplayedRevision = nextRevision;
745                 if (nextRevision == targetRevision) {
746                     break;
747                 } else {
748                     nextRevision++;
749                 }
750             } catch (Throwable t) {
751                 if (l != null) {
752                     logger.error(
753                             "Failed to replay a log at revision {}; entering read-only mode. replay log: {}",
754                             nextRevision, l, t);
755                 } else {
756                     logger.error("Failed to replay a log at revision {}; entering read-only mode.",
757                                  nextRevision, t);
758                 }
759 
760                 stopLater();
761 
762                 if (t instanceof ReplicationException) {
763                     throw (ReplicationException) t;
764                 }
765                 final StringBuilder sb = new StringBuilder();
                sb.append("failed to replay a log at revision ").append(nextRevision);
767                 if (l != null) {
768                     sb.append(". replay log: ").append(l);
769                 }
770                 throw new ReplicationException(sb.toString(), t);
771             }
772         }
773     }
774 
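    // Invoked by the PathChildrenCache when a new log node appears. Replays the logs up to the new revision
    // and then wakes up the old log remover so that the leader can prune stale logs.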
775     @Override
776     public void childEvent(CuratorFramework unused, PathChildrenCacheEvent event) throws Exception {
777         if (event.getType() != PathChildrenCacheEvent.Type.CHILD_ADDED) {
778             return;
779         }
780 
781         final long lastKnownRevision = revisionFromPath(event.getData().getPath());
782         try {
783             replayLogs(lastKnownRevision);
784         } catch (ReplicationException ignored) {
785             // replayLogs() logs and handles the exception already, so we just bail out here.
786             return;
787         }
788 
789         oldLogRemover.touch();
790     }
791 
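    // Acquires a cluster-wide lock on the command's execution path so that conflicting commands are
    // serialized across replicas. For a NormalizingPushCommand, a write quota lease is acquired as well.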
792     private SafeCloseable safeLock(Command<?> command) {
793         final String executionPath = command.executionPath();
794         final InterProcessMutex mtx = mutexMap.computeIfAbsent(
795                 executionPath, k -> new InterProcessMutex(curator, absolutePath(LOCK_PATH, k)));
796 
797         WriteLock writeLock = null;
798         boolean lockTimeout = false;
799         try {
800             // Align with the default request timeout
801             if (!mtx.acquire(10, TimeUnit.SECONDS)) {
802                 lockTimeout = true;
803                 throw new ReplicationException(
804                         "Failed to acquire a lock for " + executionPath + " in 10 seconds");
805             }
806             if (command instanceof NormalizingPushCommand) {
807                 writeLock = acquireWriteLock((NormalizingPushCommand) command);
808             } else if (command instanceof RemoveRepositoryCommand) {
809                 clearWriteQuota((RemoveRepositoryCommand) command);
810             }
811         } catch (Exception e) {
812             if (lockTimeout) {
813                 return Exceptions.throwUnsafely(e);
814             }
815 
816             logger.error("Failed to acquire a lock for {}; entering read-only mode", executionPath, e);
817             stopLater();
818             throw new ReplicationException("failed to acquire a lock for " + executionPath, e);
819         }
820 
821         if (writeLock != null) {
822             scheduleWriteLockRelease(writeLock, mtx, executionPath);
823         }
824 
825         return () -> safeRelease(mtx);
826     }
827 
828     private void clearWriteQuota(RemoveRepositoryCommand command) {
829         final String cacheKey = rateLimiterKey(command.projectName(), command.repositoryName());
830         semaphoreMap.remove(cacheKey);
831         writeQuotaCache.invalidate(cacheKey);
832     }
833 
834     private static void safeRelease(InterProcessMutex mtx) {
835         try {
836             mtx.release();
837         } catch (Exception ignored) {
838             // Ignore.
839         }
840     }
841 
842     @Nullable
843     private WriteLock acquireWriteLock(NormalizingPushCommand command) throws Exception {
844         if (command.projectName().equals(INTERNAL_PROJECT_DOGMA) ||
845             command.repositoryName().equals(Project.REPO_DOGMA)) {
            // Do not check the quota for the internal project or repository.
847             return null;
848         }
849 
850         QuotaConfig writeQuota = writeQuotaCache.getIfPresent(command.executionPath());
851         if (writeQuota == UNLIMITED_QUOTA) {
852             return null;
853         }
854 
855         if (writeQuota == null) {
            // Cache miss; load the write quota from the repository metadata.
857             final RepositoryMetadata meta;
858             try {
859                 meta = metadataService.getRepo(command.projectName(), command.repositoryName()).get();
860             } catch (InterruptedException | ExecutionException e) {
861                 throw new CentralDogmaException("Unexpected exception caught while retrieving " +
862                                                 RepositoryMetadata.class.getSimpleName(), e);
863             }
864 
865             writeQuota = meta.writeQuota();
866         }
867 
868         if (writeQuota == null) {
            // Fall back to the global write quota.
870             writeQuota = this.writeQuota;
871         }
872 
873         setWriteQuota(command.projectName(), command.repositoryName(), writeQuota);
874         final Entry<InterProcessSemaphoreV2, SettableSharedCount> entry =
875                 semaphoreMap.get(rateLimiterKey(command.projectName(), command.repositoryName()));
876         if (entry == UNLIMITED_SEMAPHORE) {
877             // No quota is set
878             return null;
879         }
880 
881         assert writeQuota != null;
882         final InterProcessSemaphoreV2 semaphore = entry.getKey();
883         final Lease lease = semaphore.acquire(200, TimeUnit.MILLISECONDS);
884         return new WriteLock(semaphore, lease, writeQuota);
885     }
886 
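    // Registers or updates the per-repository write quota. The quota is enforced with a distributed
    // semaphore whose number of leases equals the configured request quota.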
887     @Override
888     public void setWriteQuota(String projectName, String repoName, @Nullable QuotaConfig writeQuota) {
889         final QuotaConfig quota = firstNonNull(writeQuota, UNLIMITED_QUOTA);
890         writeQuotaCache.put(rateLimiterKey(projectName, repoName), quota);
891 
892         semaphoreMap.compute(rateLimiterKey(projectName, repoName), (k, v) -> {
893             if (quota == UNLIMITED_QUOTA) {
894                 return UNLIMITED_SEMAPHORE;
895             }
896 
897             final int requestQuota = quota.requestQuota();
898             if (v == null || v == UNLIMITED_SEMAPHORE) {
899                 final SettableSharedCount cnt = new SettableSharedCount(requestQuota);
900                 return new SimpleImmutableEntry<>(
901                         new InterProcessSemaphoreV2(curator, absolutePath(QUOTA_PATH, k), cnt), cnt);
902             }
903 
904             final SettableSharedCount count = v.getValue();
905             if (count.getCount() != requestQuota) {
906                 count.setCount(requestQuota);
907             }
908             return v;
909         });
910     }
911 
912     private static String rateLimiterKey(String projectName, String repoName) {
913         return projectName + '/' + repoName;
914     }
915 
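    // Returns the acquired lease after the quota time window elapses so that the number of outstanding
    // pushes per repository stays within the configured request quota. A null lease means the quota was
    // exceeded, in which case the mutex is released and TooManyRequestsException is thrown.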
916     private void scheduleWriteLockRelease(WriteLock writeLock, InterProcessMutex mtx, String executionPath) {
917         final Lease lease = writeLock.lease;
918         final QuotaConfig writeQuota = writeLock.writeQuota;
919         if (lease == null) {
920             safeRelease(mtx);
921             throw new TooManyRequestsException("commits", executionPath, writeQuota.permitsPerSecond());
922         } else {
923             quotaExecutor.schedule(() -> writeLock.semaphore.returnLease(lease),
924                                    writeQuota.timeWindowSeconds(), TimeUnit.SECONDS);
925         }
926     }
927 
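    // Joins the given path elements into a single path, e.g. path("/a/", "b/") returns "/a/b".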
928     private static String path(String... pathElements) {
929         final StringBuilder sb = new StringBuilder();
930         for (String path : pathElements) {
931             if (path.startsWith("/")) { //remove starting "/"
932                 path = path.substring(1);
933             }
934 
935             if (path.endsWith("/")) { //remove trailing "/"
936                 path = path.substring(0, path.length() - 1);
937             }
938             if (!path.isEmpty()) {
939                 sb.append('/');
940                 sb.append(path);
941             }
942         }
943         return sb.toString();
944     }
945 
946     private static String absolutePath(String... pathElements) {
947         if (pathElements.length == 0) {
948             return PATH_PREFIX;
949         }
950         return path(PATH_PREFIX, path(pathElements));
951     }
952 
953     private static class LogMeta {
954 
955         private final int replicaId;
956         private final long timestamp;
957         private final int size;
958         private final List<Long> blocks = new ArrayList<>();
959 
960         @JsonCreator
961         LogMeta(@JsonProperty(value = "replicaId", required = true) int replicaId,
962                 @JsonProperty(value = "timestamp", defaultValue = "0") Long timestamp,
963                 @JsonProperty("size") int size) {
964             this.replicaId = replicaId;
965             if (timestamp == null) {
966                 timestamp = 0L;
967             }
968             this.timestamp = timestamp;
969             this.size = size;
970         }
971 
972         @JsonProperty
973         int replicaId() {
974             return replicaId;
975         }
976 
977         @JsonProperty
978         long timestamp() {
979             return timestamp;
980         }
981 
982         @JsonProperty
983         int size() {
984             return size;
985         }
986 
987         @JsonProperty
988         List<Long> blocks() {
989             return Collections.unmodifiableList(blocks);
990         }
991 
992         public void appendBlock(long blockId) {
993             blocks.add(blockId);
994         }
995     }
996 
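    // Serializes the given log, stores it as one or more sequential znodes under the log block path and
    // creates a sequential metadata znode under the log path whose sequence number becomes the new log
    // revision.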
997     private long storeLog(ReplicationLog<?> log) {
998         try {
999             final byte[] bytes = Jackson.writeValueAsBytes(log);
1000             assert bytes.length > 0;
1001 
1002             final LogMeta logMeta = new LogMeta(log.replicaId(), System.currentTimeMillis(), bytes.length);
1003 
1004             final int count = (bytes.length + MAX_BYTES - 1) / MAX_BYTES;
1005             for (int i = 0; i < count; ++i) {
1006                 final int start = i * MAX_BYTES;
1007                 final int end = Math.min((i + 1) * MAX_BYTES, bytes.length);
1008                 final byte[] b = Arrays.copyOfRange(bytes, start, end);
1009                 final String blockPath = curator.create()
1010                                                 .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
1011                                                 .forPath(absolutePath(LOG_BLOCK_PATH) + '/', b);
1012                 final long blockId = revisionFromPath(blockPath);
1013                 logMeta.appendBlock(blockId);
1014             }
1015 
1016             final String logPath =
1017                     curator.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
1018                            .forPath(absolutePath(LOG_PATH) + '/', Jackson.writeValueAsBytes(logMeta));
1019 
1020             return revisionFromPath(logPath);
1021         } catch (Exception e) {
1022             logger.error("Failed to store a log; entering read-only mode: {}", log, e);
1023             stopLater();
1024             throw new ReplicationException("failed to store a log: " + log, e);
1025         }
1026     }
1027 
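    // Loads a replication log by reading its metadata and reassembling its blocks. Returns an empty
    // Optional when skipIfSameReplica is true and the log was written by this replica.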
1028     @VisibleForTesting
1029     Optional<ReplicationLog<?>> loadLog(long revision, boolean skipIfSameReplica) {
1030         try {
1031             createParentNodes();
1032 
1033             final String logPath = absolutePath(LOG_PATH) + '/' + pathFromRevision(revision);
1034 
1035             final LogMeta logMeta = Jackson.readValue(curator.getData().forPath(logPath), LogMeta.class);
1036 
1037             if (skipIfSameReplica && replicaId() == logMeta.replicaId()) {
1038                 return Optional.empty();
1039             }
1040 
1041             final byte[] bytes = new byte[logMeta.size()];
1042             int offset = 0;
1043             for (long blockId : logMeta.blocks()) {
1044                 final String blockPath = absolutePath(LOG_BLOCK_PATH) + '/' + pathFromRevision(blockId);
1045                 final byte[] b = curator.getData().forPath(blockPath);
1046                 System.arraycopy(b, 0, bytes, offset, b.length);
1047                 offset += b.length;
1048             }
1049             assert logMeta.size() == offset;
1050 
1051             final ReplicationLog<?> log = Jackson.readValue(bytes, ReplicationLog.class);
1052             return Optional.of(log);
1053         } catch (Exception e) {
1054             logger.error("Failed to load a log at revision {}; entering read-only mode", revision, e);
1055             stopLater();
1056             throw new ReplicationException("failed to load a log at revision " + revision, e);
1057         }
1058     }
1059 
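    // e.g. revisionFromPath("/dogma/logs/0000000042") returns 42; pathFromRevision(42) returns "0000000042".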
1060     private static long revisionFromPath(String path) {
1061         final String[] s = path.split("/");
1062         return Long.parseLong(s[s.length - 1]);
1063     }
1064 
1065     private static String pathFromRevision(long revision) {
1066         return String.format("%010d", revision);
1067     }
1068 
    // Ensure that all existing logs are replayed; no conflicting log (one with the same execution path)
    // can be appended before this command's execution completes.
1070     @Override
1071     protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception {
1072         final CompletableFuture<T> future = new CompletableFuture<>();
1073         executor.execute(() -> {
1074             try {
1075                 future.complete(blockingExecute(command));
1076             } catch (Throwable t) {
1077                 future.completeExceptionally(t);
1078             }
1079         });
1080         return future;
1081     }
1082 
1083     private <T> T blockingExecute(Command<T> command) throws Exception {
1084         createParentNodes();
1085 
1086         try (SafeCloseable ignored = safeLock(command)) {
1087 
1088             // NB: We are sure no other replicas will append the conflicting logs (the commands with the
1089             //     same execution path) while we hold the lock for the command's execution path.
1090             //
1091             //     Other replicas may still append the logs with different execution paths, because, by design,
1092             //     two commands never conflict with each other if they have different execution paths.
1093 
1094             final List<String> recentRevisions = curator.getChildren().forPath(absolutePath(LOG_PATH));
1095             if (!recentRevisions.isEmpty()) {
1096                 final long lastRevision = recentRevisions.stream().mapToLong(Long::parseLong).max().getAsLong();
1097                 replayLogs(lastRevision);
1098             }
1099 
1100             final T result = delegate.execute(command).get();
1101             final ReplicationLog<?> log;
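            // For a (possibly force-pushed) normalizing push, record the normalized 'as-is' command in the
            // replication log so that other replicas replay exactly the same changes.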
1102             if (command.type() == CommandType.NORMALIZING_PUSH) {
1103                 final NormalizingPushCommand normalizingPushCommand = (NormalizingPushCommand) command;
1104                 assert result instanceof CommitResult : result;
1105                 final CommitResult commitResult = (CommitResult) result;
1106                 final Command<Revision> pushAsIsCommand = normalizingPushCommand.asIs(commitResult);
1107                 log = new ReplicationLog<>(replicaId(), pushAsIsCommand, commitResult.revision());
1108             } else if (command.type() == CommandType.FORCE_PUSH &&
1109                        ((ForcePushCommand<?>) command).delegate().type() == CommandType.NORMALIZING_PUSH) {
1110                 final NormalizingPushCommand delegated =
1111                         (NormalizingPushCommand) ((ForcePushCommand<?>) command).delegate();
1112                 final CommitResult commitResult = (CommitResult) result;
1113                 final Command<Revision> command0 = Command.forcePush(delegated.asIs(commitResult));
1114                 log = new ReplicationLog<>(replicaId(), command0, commitResult.revision());
1115             } else {
1116                 if (command.type() == CommandType.UPDATE_SERVER_STATUS) {
1117                     final UpdateServerStatusCommand command0 = (UpdateServerStatusCommand) command;
1118                     final boolean writable = command0.writable();
1119                     final boolean wasWritable = isWritable();
1120                     setWritable(writable);
1121                     if (writable != wasWritable) {
1122                         if (writable) {
1123                             logger.warn("Left read-only mode.");
1124                         } else {
1125                             logger.warn("Entered read-only mode.");
1126                         }
1127                     }
1128                 }
1129 
1130                 log = new ReplicationLog<>(replicaId(), command, result);
1131             }
1132 
1133             // Store the command execution log to ZooKeeper.
1134             final long revision = storeLog(log);
1135 
1136             logger.debug("logging OK. revision = {}, log = {}", revision, log);
1137             return result;
1138         }
1139     }
1140 
1141     private void createParentNodes() throws Exception {
1142         if (createdParentNodes) {
1143             return;
1144         }
1145 
        // Create the parent znodes if they do not exist.
1147         createZkPathIfMissing(absolutePath());
1148         createZkPathIfMissing(absolutePath(LOG_PATH));
1149         createZkPathIfMissing(absolutePath(LOG_BLOCK_PATH));
1150         createZkPathIfMissing(absolutePath(LOCK_PATH));
1151 
1152         createdParentNodes = true;
1153     }
1154 
1155     private void createZkPathIfMissing(String zkPath) throws Exception {
1156         try {
1157             curator.create().forPath(zkPath);
1158         } catch (KeeperException.NodeExistsException ignored) {
1159             // Ignore.
1160         }
1161     }
1162 
1163     @VisibleForTesting
1164     void setMetadataService(MetadataService metadataService) {
1165         this.metadataService = metadataService;
1166     }
1167 
1168     private static final class WriteLock {
1169         private final InterProcessSemaphoreV2 semaphore;
1170         private final Lease lease;
1171         private final QuotaConfig writeQuota;
1172 
1173         private WriteLock(InterProcessSemaphoreV2 semaphore, Lease lease, QuotaConfig writeQuota) {
1174             this.semaphore = semaphore;
1175             this.lease = lease;
1176             this.writeQuota = writeQuota;
1177         }
1178     }
1179 }