1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.internal.replication;
18
19 import static com.google.common.base.MoreObjects.firstNonNull;
20 import static com.google.common.collect.ImmutableList.toImmutableList;
21 import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.INTERNAL_PROJECT_DOGMA;
22 import static java.util.Objects.requireNonNull;
23
24 import java.io.BufferedReader;
25 import java.io.File;
26 import java.io.FileInputStream;
27 import java.io.FileNotFoundException;
28 import java.io.FileOutputStream;
29 import java.io.InputStreamReader;
30 import java.nio.charset.StandardCharsets;
31 import java.util.AbstractMap.SimpleImmutableEntry;
32 import java.util.ArrayList;
33 import java.util.Arrays;
34 import java.util.Collections;
35 import java.util.Comparator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Map.Entry;
39 import java.util.Objects;
40 import java.util.Optional;
41 import java.util.Properties;
42 import java.util.concurrent.CompletableFuture;
43 import java.util.concurrent.ConcurrentHashMap;
44 import java.util.concurrent.ConcurrentMap;
45 import java.util.concurrent.ExecutionException;
46 import java.util.concurrent.ExecutorService;
47 import java.util.concurrent.Executors;
48 import java.util.concurrent.ForkJoinPool;
49 import java.util.concurrent.LinkedTransferQueue;
50 import java.util.concurrent.ScheduledExecutorService;
51 import java.util.concurrent.ThreadPoolExecutor;
52 import java.util.concurrent.TimeUnit;
53 import java.util.function.Consumer;
54
55 import javax.annotation.Nullable;
56
57 import org.apache.curator.RetryPolicy;
58 import org.apache.curator.framework.CuratorFramework;
59 import org.apache.curator.framework.CuratorFrameworkFactory;
60 import org.apache.curator.framework.api.transaction.CuratorOp;
61 import org.apache.curator.framework.imps.CuratorFrameworkState;
62 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
63 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
64 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
65 import org.apache.curator.framework.recipes.leader.LeaderSelector;
66 import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
67 import org.apache.curator.framework.recipes.locks.InterProcessMutex;
68 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2;
69 import org.apache.curator.framework.recipes.locks.Lease;
70 import org.apache.curator.framework.state.ConnectionState;
71 import org.apache.curator.retry.RetryForever;
72 import org.apache.zookeeper.CreateMode;
73 import org.apache.zookeeper.KeeperException;
74 import org.apache.zookeeper.server.auth.DigestLoginModule;
75 import org.apache.zookeeper.server.auth.SASLAuthenticationProvider;
76 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
77 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81 import com.fasterxml.jackson.annotation.JsonCreator;
82 import com.fasterxml.jackson.annotation.JsonProperty;
83 import com.github.benmanes.caffeine.cache.Cache;
84 import com.github.benmanes.caffeine.cache.Caffeine;
85 import com.google.common.annotations.VisibleForTesting;
86 import com.google.common.base.Joiner;
87 import com.google.common.collect.ImmutableList;
88 import com.google.common.collect.ImmutableMultimap;
89 import com.google.common.escape.Escaper;
90 import com.google.common.escape.Escapers;
91 import com.google.common.util.concurrent.MoreExecutors;
92
93 import com.linecorp.armeria.common.util.Exceptions;
94 import com.linecorp.armeria.common.util.SafeCloseable;
95 import com.linecorp.centraldogma.common.CentralDogmaException;
96 import com.linecorp.centraldogma.common.Revision;
97 import com.linecorp.centraldogma.common.TooManyRequestsException;
98 import com.linecorp.centraldogma.internal.Jackson;
99 import com.linecorp.centraldogma.server.QuotaConfig;
100 import com.linecorp.centraldogma.server.ZooKeeperReplicationConfig;
101 import com.linecorp.centraldogma.server.ZooKeeperServerConfig;
102 import com.linecorp.centraldogma.server.command.AbstractCommandExecutor;
103 import com.linecorp.centraldogma.server.command.Command;
104 import com.linecorp.centraldogma.server.command.CommandExecutor;
105 import com.linecorp.centraldogma.server.command.CommandType;
106 import com.linecorp.centraldogma.server.command.CommitResult;
107 import com.linecorp.centraldogma.server.command.ForcePushCommand;
108 import com.linecorp.centraldogma.server.command.NormalizingPushCommand;
109 import com.linecorp.centraldogma.server.command.RemoveRepositoryCommand;
110 import com.linecorp.centraldogma.server.command.UpdateServerStatusCommand;
111 import com.linecorp.centraldogma.server.metadata.MetadataService;
112 import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;
113 import com.linecorp.centraldogma.server.storage.project.Project;
114 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
115
116 import io.micrometer.core.instrument.Gauge;
117 import io.micrometer.core.instrument.MeterRegistry;
118 import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
119 import io.netty.util.concurrent.DefaultThreadFactory;
120
121 public final class ZooKeeperCommandExecutor
122 extends AbstractCommandExecutor implements PathChildrenCacheListener {
123
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperCommandExecutor.class);
// Escapes '"' and '\' so that the shared secret can be embedded in a double-quoted jaas.conf value.
private static final Escaper jaasValueEscaper =
        Escapers.builder().addEscape('\"', "\\\"").addEscape('\\', "\\\\").build();
// Joins server IDs with ':' for the 'group.<groupId>' ZooKeeper properties.
private static final Joiner colonJoiner = Joiner.on(':');

// Root znode under which every znode of this executor lives.
private static final String PATH_PREFIX = "/dogma";
// NOTE(review): presumably the maximum number of bytes stored in a single znode, kept just under
// ZooKeeper's default 1 MiB 'jute.maxbuffer'; it is used outside this view — confirm.
private static final int MAX_BYTES = 1024 * 1023;

// Child of PATH_PREFIX holding one znode per replication log; children are named by numeric revision.
private static final String LOG_PATH = "logs";

// Child of PATH_PREFIX holding the blocks referenced by each log's LogMeta.blocks().
private static final String LOG_BLOCK_PATH = "log_blocks";

// Child of PATH_PREFIX under which one InterProcessMutex per command execution path is created.
private static final String LOCK_PATH = "lock";

// Child of PATH_PREFIX under which one InterProcessSemaphoreV2 per rate-limited repository is created.
private static final String QUOTA_PATH = "quota";

// Child of PATH_PREFIX used by the LeaderSelector that elects the replica removing old logs.
private static final String LEADER_PATH = "leader";

// Retry every 500 milliseconds without limit; installed while the executor is running.
private static final RetryPolicy RETRY_POLICY_ALWAYS = new RetryForever(500);
// Never retry; the initial policy and the one installed by doStop().
private static final RetryPolicy RETRY_POLICY_NEVER = (retryCount, elapsedTimeMs, sleeper) -> false;

// Sentinel (compared by identity) meaning "no write quota".
private static final QuotaConfig UNLIMITED_QUOTA = new QuotaConfig(Integer.MAX_VALUE, 1);
// Sentinel (compared by identity) stored in semaphoreMap for repositories without a write quota.
private static final Entry<InterProcessSemaphoreV2, SettableSharedCount> UNLIMITED_SEMAPHORE =
        new SimpleImmutableEntry<>(null, null);

// ZooKeeper mutexes, keyed by command execution path and created lazily.
private final ConcurrentMap<String, InterProcessMutex> mutexMap = new ConcurrentHashMap<>();

// Write-quota semaphores with their adjustable permit counts, keyed by rateLimiterKey().
@VisibleForTesting
final ConcurrentMap<String, Entry<InterProcessSemaphoreV2, SettableSharedCount>> semaphoreMap =
        new ConcurrentHashMap<>();

// Resolved write quotas, populated and invalidated with rateLimiterKey() keys; at most 2000 entries.
@VisibleForTesting
final Cache<String, QuotaConfig> writeQuotaCache = Caffeine.newBuilder().maximumSize(2000).build();

private final ZooKeeperReplicationConfig cfg;
// Stores the last replayed revision as decimal text.
private final File revisionFile;
// ZooKeeper configuration file written by startZooKeeper().
private final File zkConfFile;
// ZooKeeper 'dataDir'.
private final File zkDataDir;
// ZooKeeper 'dataLogDir'.
private final File zkLogDir;
// Applies commands to the local storage, including replayed ones.
private final CommandExecutor delegate;
private final MeterRegistry meterRegistry;

// Server-wide default write quota used when a repository has none; null means unlimited.
@Nullable
private final QuotaConfig writeQuota;

private MetadataService metadataService;

// The following are assigned in doStart().
private volatile EmbeddedZooKeeper quorumPeer;
private volatile CuratorFramework curator;
// Consulted by the Curator client's retry policy on every retry; switched by doStart()/doStop().
private volatile RetryPolicy retryPolicy = RETRY_POLICY_NEVER;
private volatile ExecutorService executor;
private volatile ExecutorService logWatcherExecutor;
private volatile PathChildrenCache logWatcher;
private volatile OldLogRemover oldLogRemover;
private volatile ExecutorService leaderSelectorExecutor;
private volatile LeaderSelector leaderSelector;
private volatile ScheduledExecutorService quotaExecutor;
// NOTE(review): used outside this view; presumably tracks whether the parent znodes exist — confirm.
private volatile boolean createdParentNodes;
187
/**
 * Leader-election listener that removes old replication logs while this replica is the leader.
 */
private class OldLogRemover implements LeaderSelectorListener {
    // Read by the "replica.has.leadership" gauge.
    volatile boolean hasLeadership;

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        // Intentionally empty: connection state changes are not handled by this listener.
    }

    /**
     * Runs while this replica holds the leadership.
     *
     * <p>Invokes the take-leadership callback, then calls {@link #deleteLogs()} and sleeps until
     * {@link #touch()} wakes it up. This repeats until the Curator client leaves the STARTED state or
     * the thread is interrupted. On exit it invokes the release-leadership callback. If the executor
     * is still running, it re-queues this replica for the next election.
     */
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final ListenerInfo listenerInfo = ZooKeeperCommandExecutor.this.listenerInfo;
        if (listenerInfo == null) {
            // The executor is not running (doStop() clears listenerInfo); returning gives up leadership.
            return;
        }

        logger.info("Taking leadership: {}", replicaId());
        try {
            hasLeadership = true;
            if (listenerInfo.onTakeLeadership != null) {
                listenerInfo.onTakeLeadership.run();
            }

            while (curator.getState() == CuratorFrameworkState.STARTED) {
                deleteLogs();
                // Sleep until touch() is called. A spurious wake-up only causes an extra deleteLogs().
                // NOTE(review): a touch() issued while deleteLogs() is running is not remembered, so
                // those logs are only considered after the next touch().
                synchronized (this) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
            // Interrupted, e.g. when the leader selector is closed; leadership is released below.
        } catch (Exception e) {
            logger.error("Leader stopped due to an unexpected exception:", e);
        } finally {
            hasLeadership = false;
            logger.info("Releasing leadership: {}", replicaId());
            if (listenerInfo.onReleaseLeadership != null) {
                listenerInfo.onReleaseLeadership.run();
            }

            if (ZooKeeperCommandExecutor.this.listenerInfo != null) {
                // Still running: take part in the next election.
                leaderSelector.requeue();
            }
        }
    }

    /** Wakes up the leader loop in {@link #takeLeadership(CuratorFramework)} so it deletes logs again. */
    public synchronized void touch() {
        notify();
    }

    /**
     * Deletes the oldest {@code children.size() - cfg.maxLogCount()} logs, in ascending revision order.
     *
     * <p>Processing stops at the first log whose timestamp is within the last
     * {@code cfg.minLogAgeMillis()} milliseconds. Each log znode and each of its blocks is deleted in its
     * own single-operation transaction, so a failed deletion is logged and skipped. The deletion is
     * therefore not atomic per log.
     */
    private void deleteLogs() throws Exception {
        final List<String> children = curator.getChildren().forPath(absolutePath(LOG_PATH));
        if (children.size() <= cfg.maxLogCount()) {
            return;
        }

        final long minAllowedTimestamp = System.currentTimeMillis() - cfg.minLogAgeMillis();
        final int targetCount = children.size() - cfg.maxLogCount();
        final List<String> deleted = new ArrayList<>(targetCount);
        children.sort(Comparator.comparingLong(Long::parseLong));
        try {
            for (int i = 0; i < targetCount; ++i) {
                final String logPath = absolutePath(LOG_PATH, children.get(i));
                final LogMeta meta = Jackson.readValue(curator.getData().forPath(logPath), LogMeta.class);

                if (meta.timestamp() >= minAllowedTimestamp) {
                    // This log and every later one are too young to delete.
                    break;
                }

                deleteLog(curator.transactionOp().delete().forPath(logPath), deleted, children.get(i));
                for (long blockId : meta.blocks()) {
                    final String blockPath = absolutePath(LOG_BLOCK_PATH) + '/' + pathFromRevision(blockId);
                    // NOTE(review): the log name is appended to 'deleted' once per deleted block as well,
                    // so the log message below may contain duplicates.
                    deleteLog(curator.transactionOp().delete().forPath(blockPath),
                              deleted, children.get(i));
                }
            }
        } finally {
            logger.info("delete logs: {}", deleted);
        }
    }

    /** Commits {@code curatorOp} and records {@code childName} in {@code deleted} on success; logs failures. */
    private void deleteLog(CuratorOp curatorOp, List<String> deleted, String childName) {
        try {
            curator.transaction().forOperations(curatorOp);
            deleted.add(childName);
        } catch (Throwable t) {
            logger.warn("Failed to delete ZooKeeper log: {}", childName, t);
        }
    }
}
282
283 private static final class ListenerInfo {
284 long lastReplayedRevision;
285 final Runnable onTakeLeadership;
286 final Runnable onReleaseLeadership;
287
288 ListenerInfo(long lastReplayedRevision,
289 @Nullable Runnable onTakeLeadership, @Nullable Runnable onReleaseLeadership) {
290
291 this.lastReplayedRevision = lastReplayedRevision;
292 this.onReleaseLeadership = onReleaseLeadership;
293 this.onTakeLeadership = onTakeLeadership;
294 }
295 }
296
297 private volatile ListenerInfo listenerInfo;
298
/**
 * Creates a new executor that replicates commands through an embedded ZooKeeper peer.
 *
 * @param cfg the replication configuration
 * @param dataDir the directory under which the last replayed revision and the ZooKeeper files are stored
 * @param delegate the executor that applies commands to the local storage
 * @param meterRegistry the registry to which the replication gauges and executor metrics are bound
 * @param projectManager used to create the {@link MetadataService}
 * @param writeQuota the default write quota for repositories without their own, or {@code null}
 * @param onTakeLeadership forwarded to {@link AbstractCommandExecutor}
 * @param onReleaseLeadership forwarded to {@link AbstractCommandExecutor}
 */
public ZooKeeperCommandExecutor(ZooKeeperReplicationConfig cfg,
                                File dataDir, CommandExecutor delegate,
                                MeterRegistry meterRegistry,
                                ProjectManager projectManager,
                                @Nullable QuotaConfig writeQuota,
                                @Nullable Consumer<CommandExecutor> onTakeLeadership,
                                @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
    super(onTakeLeadership, onReleaseLeadership);

    this.cfg = requireNonNull(cfg, "cfg");
    requireNonNull(dataDir, "dataDir");

    final String baseDir = dataDir.getAbsolutePath() + File.separatorChar;
    final String zkBaseDir = baseDir + "_zookeeper" + File.separatorChar;
    revisionFile = new File(baseDir + "last_revision");
    zkConfFile = new File(zkBaseDir + "config.properties");
    zkDataDir = new File(zkBaseDir + "data");
    zkLogDir = new File(zkBaseDir + "log");

    this.delegate = requireNonNull(delegate, "delegate");
    this.meterRegistry = requireNonNull(meterRegistry, "meterRegistry");
    this.writeQuota = writeQuota;
    metadataService = new MetadataService(projectManager, this);

    registerReplicaGauges(meterRegistry);
}

/** Registers the gauges that expose the replication state of this executor. */
private void registerReplicaGauges(MeterRegistry registry) {
    Gauge.builder("replica.id", this, self -> self.replicaId()).register(registry);
    if (cfg.serverConfig().groupId() != null) {
        Gauge.builder("replica.groupId", this, self -> self.cfg.serverConfig().groupId())
             .register(registry);
    }
    Gauge.builder("replica.read.only", this, self -> self.isWritable() ? 0 : 1).register(registry);
    Gauge.builder("replica.replicating", this, self -> self.isStarted() ? 1 : 0).register(registry);
    Gauge.builder("replica.has.leadership", this, self -> {
        final OldLogRemover remover = self.oldLogRemover;
        return remover != null && remover.hasLeadership ? 1 : 0;
    }).register(registry);
    Gauge.builder("replica.last.replayed.revision", this, self -> {
        final ListenerInfo info = self.listenerInfo;
        return info != null ? info.lastReplayedRevision : 0;
    }).register(registry);
}
347
348 @Override
349 public int replicaId() {
350 return cfg.serverId();
351 }
352
353 @Override
354 protected void doStart(@Nullable Runnable onTakeLeadership,
355 @Nullable Runnable onReleaseLeadership) throws Exception {
356 try {
357
358 final long lastReplayedRevision;
359 try {
360 lastReplayedRevision = getLastReplayedRevision();
361 listenerInfo = new ListenerInfo(lastReplayedRevision, onTakeLeadership, onReleaseLeadership);
362 } catch (Exception e) {
363 throw new ReplicationException("failed to read " + revisionFile, e);
364 }
365
366
367 quorumPeer = startZooKeeper();
368 retryPolicy = RETRY_POLICY_ALWAYS;
369
370
371 curator = CuratorFrameworkFactory.newClient(
372 "127.0.0.1:" + quorumPeer.getClientPort(), cfg.timeoutMillis(), cfg.timeoutMillis(),
373 (retryCount, elapsedTimeMs, sleeper) -> {
374 return retryPolicy.allowRetry(retryCount, elapsedTimeMs, sleeper);
375 });
376
377 curator.start();
378
379
380 logWatcherExecutor = ExecutorServiceMetrics.monitor(
381 meterRegistry,
382 Executors.newSingleThreadExecutor(
383 new DefaultThreadFactory("zookeeper-log-watcher", true)),
384 "zkLogWatcher");
385
386 logWatcher = new PathChildrenCache(curator, absolutePath(LOG_PATH),
387 true, false, logWatcherExecutor);
388 logWatcher.getListenable().addListener(this, MoreExecutors.directExecutor());
389 logWatcher.start();
390
391
392 oldLogRemover = new OldLogRemover();
393 leaderSelectorExecutor = ExecutorServiceMetrics.monitor(
394 meterRegistry,
395 Executors.newSingleThreadExecutor(
396 new DefaultThreadFactory("zookeeper-leader-selector", true)),
397 "zkLeaderSelector");
398
399 leaderSelector = new LeaderSelector(curator, absolutePath(LEADER_PATH),
400 leaderSelectorExecutor, oldLogRemover);
401 leaderSelector.start();
402
403
404 delegate.start();
405
406
407 final ThreadPoolExecutor executor = new ThreadPoolExecutor(
408 cfg.numWorkers(), cfg.numWorkers(),
409 60, TimeUnit.SECONDS, new LinkedTransferQueue<>(),
410 new DefaultThreadFactory("zookeeper-command-executor", true));
411 executor.allowCoreThreadTimeOut(true);
412
413 this.executor = ExecutorServiceMetrics.monitor(meterRegistry, executor, "zkCommandExecutor");
414
415 quotaExecutor = ExecutorServiceMetrics.monitor(
416 meterRegistry,
417 Executors.newSingleThreadScheduledExecutor(
418 new DefaultThreadFactory("zookeeper-quota-executor", true)),
419 "quotaExecutor");
420 } catch (InterruptedException | ReplicationException e) {
421 throw e;
422 } catch (Exception e) {
423 throw new ReplicationException(e);
424 }
425 }
426
427 private EmbeddedZooKeeper startZooKeeper() throws Exception {
428 logger.info("Starting the ZooKeeper peer ({}) ..", cfg.serverId());
429 EmbeddedZooKeeper peer = null;
430 boolean success = false;
431 try {
432 final Properties zkProps = new Properties();
433
434
435 copyZkProperty(zkProps, "initLimit", "5");
436 copyZkProperty(zkProps, "syncLimit", "10");
437 copyZkProperty(zkProps, "tickTime", "3000");
438 copyZkProperty(zkProps, "syncEnabled", "true");
439 copyZkProperty(zkProps, "autopurge.snapRetainCount", "3");
440 copyZkProperty(zkProps, "autopurge.purgeInterval", "1");
441 copyZkProperty(zkProps, "quorumListenOnAllIPs", "false");
442
443
444 System.setProperty("zookeeper.fsync.warningthresholdms",
445 cfg.additionalProperties().getOrDefault("fsync.warningthresholdms", "1000"));
446
447
448 zkProps.setProperty("dataDir", zkDataDir.getPath());
449 zkProps.setProperty("dataLogDir", zkLogDir.getPath());
450 zkDataDir.mkdirs();
451 zkLogDir.mkdirs();
452
453
454 try (FileOutputStream out = new FileOutputStream(new File(zkDataDir, "myid"))) {
455 out.write((cfg.serverId() + "\n").getBytes(StandardCharsets.US_ASCII));
456 }
457
458
459
460 final File jaasConfFile = new File(zkDataDir, "jaas.conf");
461 try (FileOutputStream out = new FileOutputStream(jaasConfFile)) {
462 final StringBuilder buf = new StringBuilder();
463 final String newline = System.lineSeparator();
464 final String escapedSecret = jaasValueEscaper.escape(cfg.secret());
465 ImmutableList.of("Server", EmbeddedZooKeeper.SASL_SERVER_LOGIN_CONTEXT).forEach(name -> {
466 buf.append(name).append(" {").append(newline);
467 buf.append(DigestLoginModule.class.getName()).append(" required").append(newline);
468 buf.append("user_super=\"").append(escapedSecret).append("\";").append(newline);
469 buf.append("};").append(newline);
470 });
471 ImmutableList.of("Client", EmbeddedZooKeeper.SASL_LEARNER_LOGIN_CONTEXT).forEach(name -> {
472 buf.append(name).append(" {").append(newline);
473 buf.append(DigestLoginModule.class.getName()).append(" required").append(newline);
474 buf.append("username=\"super\"").append(newline);
475 buf.append("password=\"").append(escapedSecret).append("\";").append(newline);
476 buf.append("};").append(newline);
477 });
478 out.write(buf.toString().getBytes());
479 }
480 System.setProperty("java.security.auth.login.config", jaasConfFile.getAbsolutePath());
481 System.setProperty("zookeeper.authProvider.1", SASLAuthenticationProvider.class.getName());
482
483
484 zkProps.setProperty("clientPort", String.valueOf(cfg.serverConfig().clientPort()));
485
486 final Map<Integer, ZooKeeperServerConfig> servers = cfg.servers();
487
488 boolean hasGroupId = false;
489 for (Entry<Integer, ZooKeeperServerConfig> entry : servers.entrySet()) {
490 final ZooKeeperServerConfig serverConfig = entry.getValue();
491 zkProps.setProperty(
492 "server." + entry.getKey(),
493 serverConfig.host() + ':' + serverConfig.quorumPort() + ':' +
494 serverConfig.electionPort() + ":participant");
495
496 if (!hasGroupId && serverConfig.groupId() != null) {
497 hasGroupId = true;
498 }
499 }
500
501
502 if (hasGroupId) {
503 final ImmutableMultimap.Builder<Integer, Integer> groupBuilder = ImmutableMultimap.builder();
504 boolean isHierarchical = true;
505 for (Entry<Integer, ZooKeeperServerConfig> entry : servers.entrySet()) {
506 final Integer groupId = entry.getValue().groupId();
507 if (groupId == null) {
508 isHierarchical = false;
509 final List<ZooKeeperServerConfig> noGroupIds =
510 servers.values().stream()
511 .filter(serverConfig -> serverConfig.groupId() == null)
512 .collect(toImmutableList());
513 logger.warn("Hierarchical quorums are disabled. 'groupId' are missing in {}",
514 noGroupIds);
515 break;
516 } else {
517 groupBuilder.put(groupId, entry.getKey());
518 }
519 }
520 if (isHierarchical) {
521 groupBuilder.build().asMap().forEach((groupId, serverIds) -> {
522
523 zkProps.setProperty("group." + groupId, colonJoiner.join(serverIds));
524 });
525 servers.forEach((serverId, serverConfig) -> {
526
527 zkProps.setProperty("weight." + serverId, String.valueOf(serverConfig.weight()));
528 });
529 }
530 }
531
532
533 zkProps.setProperty("admin.enableServer", "false");
534
535 zkConfFile.getParentFile().mkdirs();
536 try (FileOutputStream out = new FileOutputStream(zkConfFile)) {
537 zkProps.store(out, null);
538 }
539
540 final QuorumPeerConfig zkCfg = new QuorumPeerConfig();
541 zkCfg.parse(zkConfFile.getPath());
542
543 peer = new EmbeddedZooKeeper(zkCfg, meterRegistry);
544 peer.start();
545
546
547 for (;;) {
548 final ServerState state = peer.getPeerState();
549 if (state == ServerState.FOLLOWING || state == ServerState.LEADING) {
550 break;
551 }
552
553 if (isStopping()) {
554 throw new InterruptedException("Stop requested before joining the cluster");
555 }
556
557 logger.info("Waiting for the ZooKeeper peer ({}) to join the cluster ..", peer.getId());
558 Thread.sleep(1000);
559 }
560
561 if (peer.getId() == peer.getCurrentVote().getId()) {
562 logger.info("The ZooKeeper peer ({}) has joined the cluster as a leader.", peer.getId());
563 } else {
564 logger.info("The ZooKeeper peer ({}) has joined the cluster, following {}.",
565 peer.getId(), peer.getCurrentVote().getId());
566 }
567
568 success = true;
569 return peer;
570 } finally {
571 if (!success && peer != null) {
572 try {
573 peer.shutdown();
574 } catch (Exception e) {
575 logger.warn("Failed to shutdown the failed ZooKeeper peer: {}", e.getMessage(), e);
576 }
577 }
578 }
579 }
580
581 private void copyZkProperty(Properties zkProps, String key, String defaultValue) {
582 zkProps.setProperty(key, cfg.additionalProperties().getOrDefault(key, defaultValue));
583 }
584
585 private void stopLater() {
586
587
588 ForkJoinPool.commonPool().execute(this::stop);
589 }
590
591 @Override
592 protected void doStop(@Nullable Runnable onReleaseLeadership) throws Exception {
593 listenerInfo = null;
594 logger.info("Stopping the worker threads");
595 boolean interrupted = shutdown(executor);
596 logger.info("Stopped the worker threads");
597
598 logger.info("Stopping the quota worker threads");
599 interrupted |= shutdown(quotaExecutor);
600 logger.info("Stopped the quota worker threads");
601 semaphoreMap.clear();
602
603 try {
604 logger.info("Stopping the delegate command executor");
605 delegate.stop();
606 logger.info("Stopped the delegate command executor");
607 } catch (Exception e) {
608 logger.warn("Failed to stop the delegate command executor {}: {}", delegate, e.getMessage(), e);
609 } finally {
610 retryPolicy = RETRY_POLICY_NEVER;
611 try {
612 if (leaderSelector != null) {
613 logger.info("Closing the leader selector");
614 leaderSelector.close();
615 interrupted |= shutdown(leaderSelectorExecutor);
616 logger.info("Closed the leader selector");
617 }
618 } catch (Exception e) {
619 logger.warn("Failed to close the leader selector: {}", e.getMessage(), e);
620 } finally {
621 try {
622 if (logWatcher != null) {
623 logger.info("Closing the log watcher");
624 logWatcher.close();
625 interrupted |= shutdown(logWatcherExecutor);
626 logger.info("Closed the log watcher");
627 }
628 } catch (Exception e) {
629 logger.warn("Failed to close the log watcher: {}", e.getMessage(), e);
630 } finally {
631 try {
632 if (curator != null) {
633 logger.info("Closing the Curator framework");
634 curator.close();
635 logger.info("Closed the Curator framework");
636 }
637 } catch (Exception e) {
638 logger.warn("Failed to close the Curator framework: {}", e.getMessage(), e);
639 } finally {
640 try {
641 if (quorumPeer != null) {
642 final long peerId = quorumPeer.getId();
643 logger.info("Shutting down the ZooKeeper peer ({})", peerId);
644 quorumPeer.shutdown();
645 logger.info("Shut down the ZooKeeper peer ({})", peerId);
646 }
647 } catch (Exception e) {
648 logger.warn("Failed to shut down the ZooKeeper peer: {}", e.getMessage(), e);
649 } finally {
650 if (interrupted) {
651 Thread.currentThread().interrupt();
652 }
653 }
654 }
655 }
656 }
657 }
658 }
659
660 private static boolean shutdown(@Nullable ExecutorService executor) {
661 if (executor == null) {
662 return false;
663 }
664
665 boolean interrupted = false;
666 while (!executor.isTerminated()) {
667 executor.shutdown();
668 try {
669 executor.awaitTermination(1, TimeUnit.SECONDS);
670 } catch (InterruptedException e) {
671
672 interrupted = true;
673 }
674 }
675 return interrupted;
676 }
677
678 private long getLastReplayedRevision() throws Exception {
679 final FileInputStream fis;
680 try {
681 fis = new FileInputStream(revisionFile);
682 } catch (FileNotFoundException ignored) {
683 return -1;
684 }
685
686 try (BufferedReader br = new BufferedReader(new InputStreamReader(fis))) {
687 final String l = br.readLine();
688 if (l == null) {
689 return -1;
690 }
691 return Long.parseLong(l.trim());
692 }
693 }
694
695 private void updateLastReplayedRevision(long lastReplayedRevision) throws Exception {
696 boolean success = false;
697 try (FileOutputStream fos = new FileOutputStream(revisionFile)) {
698 fos.write(String.valueOf(lastReplayedRevision).getBytes(StandardCharsets.UTF_8));
699 success = true;
700 } finally {
701 if (success) {
702 logger.info("Updated lastReplayedRevision to: {}", lastReplayedRevision);
703 } else {
704 logger.error("Failed to update lastReplayedRevision to: {}", lastReplayedRevision);
705 }
706 }
707 }
708
/**
 * Replays every log from the last replayed revision + 1 up to and including {@code targetRevision}
 * through the delegate executor, persisting the progress after each revision.
 *
 * <p>Does nothing if the executor is not running ({@code listenerInfo} is {@code null}) or if
 * {@code targetRevision} has already been replayed. The method is {@code synchronized}, so concurrent
 * calls do not interleave.
 *
 * @throws ReplicationException if a log fails to replay or its result differs from the result stored
 *         in the log; {@code stopLater()} has already been called at that point
 */
private synchronized void replayLogs(long targetRevision) {
    final ListenerInfo info = listenerInfo;
    if (info == null) {
        return;
    }

    if (targetRevision <= info.lastReplayedRevision) {
        return;
    }

    long nextRevision = info.lastReplayedRevision + 1;
    for (;;) {
        ReplicationLog<?> l = null;
        try {
            // NOTE(review): the meaning of the boolean argument is defined by loadLog(), outside this view.
            final Optional<ReplicationLog<?>> log = loadLog(nextRevision, true);
            if (log.isPresent()) {
                l = log.get();
                final Command<?> command = l.command();
                final Object expectedResult = l.result();
                final Object actualResult = delegate.execute(command).get();

                // The local result must match the one stored in the log.
                if (!Objects.equals(expectedResult, actualResult)) {
                    throw new ReplicationException(
                            "mismatching replay result at revision " + nextRevision +
                            ": " + actualResult + " (expected: " + expectedResult +
                            ", command: " + command + ')');
                }
                if (command instanceof RemoveRepositoryCommand) {
                    clearWriteQuota((RemoveRepositoryCommand) command);
                }
            } else {
                // NOTE(review): no log was loaded for this revision. It is skipped, but still recorded
                // as replayed below.
            }

            updateLastReplayedRevision(nextRevision);
            info.lastReplayedRevision = nextRevision;
            if (nextRevision == targetRevision) {
                break;
            } else {
                nextRevision++;
            }
        } catch (Throwable t) {
            if (l != null) {
                logger.error(
                        "Failed to replay a log at revision {}; entering read-only mode. replay log: {}",
                        nextRevision, l, t);
            } else {
                logger.error("Failed to replay a log at revision {}; entering read-only mode.",
                             nextRevision, t);
            }

            // Replication cannot continue consistently from here; stop asynchronously.
            stopLater();

            if (t instanceof ReplicationException) {
                throw (ReplicationException) t;
            }
            final StringBuilder sb = new StringBuilder();
            sb.append("failed to replay a log at revision " + nextRevision);
            if (l != null) {
                sb.append(". replay log: ").append(l);
            }
            throw new ReplicationException(sb.toString(), t);
        }
    }
}
774
775 @Override
776 public void childEvent(CuratorFramework unused, PathChildrenCacheEvent event) throws Exception {
777 if (event.getType() != PathChildrenCacheEvent.Type.CHILD_ADDED) {
778 return;
779 }
780
781 final long lastKnownRevision = revisionFromPath(event.getData().getPath());
782 try {
783 replayLogs(lastKnownRevision);
784 } catch (ReplicationException ignored) {
785
786 return;
787 }
788
789 oldLogRemover.touch();
790 }
791
792 private SafeCloseable safeLock(Command<?> command) {
793 final String executionPath = command.executionPath();
794 final InterProcessMutex mtx = mutexMap.computeIfAbsent(
795 executionPath, k -> new InterProcessMutex(curator, absolutePath(LOCK_PATH, k)));
796
797 WriteLock writeLock = null;
798 boolean lockTimeout = false;
799 try {
800
801 if (!mtx.acquire(10, TimeUnit.SECONDS)) {
802 lockTimeout = true;
803 throw new ReplicationException(
804 "Failed to acquire a lock for " + executionPath + " in 10 seconds");
805 }
806 if (command instanceof NormalizingPushCommand) {
807 writeLock = acquireWriteLock((NormalizingPushCommand) command);
808 } else if (command instanceof RemoveRepositoryCommand) {
809 clearWriteQuota((RemoveRepositoryCommand) command);
810 }
811 } catch (Exception e) {
812 if (lockTimeout) {
813 return Exceptions.throwUnsafely(e);
814 }
815
816 logger.error("Failed to acquire a lock for {}; entering read-only mode", executionPath, e);
817 stopLater();
818 throw new ReplicationException("failed to acquire a lock for " + executionPath, e);
819 }
820
821 if (writeLock != null) {
822 scheduleWriteLockRelease(writeLock, mtx, executionPath);
823 }
824
825 return () -> safeRelease(mtx);
826 }
827
828 private void clearWriteQuota(RemoveRepositoryCommand command) {
829 final String cacheKey = rateLimiterKey(command.projectName(), command.repositoryName());
830 semaphoreMap.remove(cacheKey);
831 writeQuotaCache.invalidate(cacheKey);
832 }
833
834 private static void safeRelease(InterProcessMutex mtx) {
835 try {
836 mtx.release();
837 } catch (Exception ignored) {
838
839 }
840 }
841
842 @Nullable
843 private WriteLock acquireWriteLock(NormalizingPushCommand command) throws Exception {
844 if (command.projectName().equals(INTERNAL_PROJECT_DOGMA) ||
845 command.repositoryName().equals(Project.REPO_DOGMA)) {
846
847 return null;
848 }
849
850 QuotaConfig writeQuota = writeQuotaCache.getIfPresent(command.executionPath());
851 if (writeQuota == UNLIMITED_QUOTA) {
852 return null;
853 }
854
855 if (writeQuota == null) {
856
857 final RepositoryMetadata meta;
858 try {
859 meta = metadataService.getRepo(command.projectName(), command.repositoryName()).get();
860 } catch (InterruptedException | ExecutionException e) {
861 throw new CentralDogmaException("Unexpected exception caught while retrieving " +
862 RepositoryMetadata.class.getSimpleName(), e);
863 }
864
865 writeQuota = meta.writeQuota();
866 }
867
868 if (writeQuota == null) {
869
870 writeQuota = this.writeQuota;
871 }
872
873 setWriteQuota(command.projectName(), command.repositoryName(), writeQuota);
874 final Entry<InterProcessSemaphoreV2, SettableSharedCount> entry =
875 semaphoreMap.get(rateLimiterKey(command.projectName(), command.repositoryName()));
876 if (entry == UNLIMITED_SEMAPHORE) {
877
878 return null;
879 }
880
881 assert writeQuota != null;
882 final InterProcessSemaphoreV2 semaphore = entry.getKey();
883 final Lease lease = semaphore.acquire(200, TimeUnit.MILLISECONDS);
884 return new WriteLock(semaphore, lease, writeQuota);
885 }
886
887 @Override
888 public void setWriteQuota(String projectName, String repoName, @Nullable QuotaConfig writeQuota) {
889 final QuotaConfig quota = firstNonNull(writeQuota, UNLIMITED_QUOTA);
890 writeQuotaCache.put(rateLimiterKey(projectName, repoName), quota);
891
892 semaphoreMap.compute(rateLimiterKey(projectName, repoName), (k, v) -> {
893 if (quota == UNLIMITED_QUOTA) {
894 return UNLIMITED_SEMAPHORE;
895 }
896
897 final int requestQuota = quota.requestQuota();
898 if (v == null || v == UNLIMITED_SEMAPHORE) {
899 final SettableSharedCount cnt = new SettableSharedCount(requestQuota);
900 return new SimpleImmutableEntry<>(
901 new InterProcessSemaphoreV2(curator, absolutePath(QUOTA_PATH, k), cnt), cnt);
902 }
903
904 final SettableSharedCount count = v.getValue();
905 if (count.getCount() != requestQuota) {
906 count.setCount(requestQuota);
907 }
908 return v;
909 });
910 }
911
912 private static String rateLimiterKey(String projectName, String repoName) {
913 return projectName + '/' + repoName;
914 }
915
916 private void scheduleWriteLockRelease(WriteLock writeLock, InterProcessMutex mtx, String executionPath) {
917 final Lease lease = writeLock.lease;
918 final QuotaConfig writeQuota = writeLock.writeQuota;
919 if (lease == null) {
920 safeRelease(mtx);
921 throw new TooManyRequestsException("commits", executionPath, writeQuota.permitsPerSecond());
922 } else {
923 quotaExecutor.schedule(() -> writeLock.semaphore.returnLease(lease),
924 writeQuota.timeWindowSeconds(), TimeUnit.SECONDS);
925 }
926 }
927
928 private static String path(String... pathElements) {
929 final StringBuilder sb = new StringBuilder();
930 for (String path : pathElements) {
931 if (path.startsWith("/")) {
932 path = path.substring(1);
933 }
934
935 if (path.endsWith("/")) {
936 path = path.substring(0, path.length() - 1);
937 }
938 if (!path.isEmpty()) {
939 sb.append('/');
940 sb.append(path);
941 }
942 }
943 return sb.toString();
944 }
945
946 private static String absolutePath(String... pathElements) {
947 if (pathElements.length == 0) {
948 return PATH_PREFIX;
949 }
950 return path(PATH_PREFIX, path(pathElements));
951 }
952
/**
 * JSON metadata describing one stored replication log.
 *
 * <p>It records the ID of the replica that wrote the log, a timestamp, the total size of the serialized
 * log in bytes, and the IDs (sequence numbers) of the block nodes that hold the serialized payload, in
 * order. The payload is the concatenation of those blocks.
 */
private static class LogMeta {

    private final int replicaId;
    private final long timestamp;
    // Total length of the serialized log; the concatenated blocks are expected to add up to this.
    private final int size;
    // Block IDs in payload order.
    // NOTE(review): "blocks" is not a creator parameter, so on deserialization Jackson presumably
    // populates it through this (private, final) field, inferred from the annotated blocks() getter.
    // Confirm before renaming the field or changing the getter.
    private final List<Long> blocks = new ArrayList<>();

    @JsonCreator
    LogMeta(@JsonProperty(value = "replicaId", required = true) int replicaId,
            @JsonProperty(value = "timestamp", defaultValue = "0") Long timestamp,
            @JsonProperty("size") int size) {
        this.replicaId = replicaId;
        // A missing or null timestamp falls back to 0 here. NOTE(review): Jackson databind does not
        // itself apply @JsonProperty#defaultValue, so this check is what enforces the default.
        if (timestamp == null) {
            timestamp = 0L;
        }
        this.timestamp = timestamp;
        this.size = size;
    }

    @JsonProperty
    int replicaId() {
        return replicaId;
    }

    @JsonProperty
    long timestamp() {
        return timestamp;
    }

    @JsonProperty
    int size() {
        return size;
    }

    // Read-only view; use appendBlock() to add entries.
    @JsonProperty
    List<Long> blocks() {
        return Collections.unmodifiableList(blocks);
    }

    // Appends the ID of the next block node of the payload.
    public void appendBlock(long blockId) {
        blocks.add(blockId);
    }
}
996
997 private long storeLog(ReplicationLog<?> log) {
998 try {
999 final byte[] bytes = Jackson.writeValueAsBytes(log);
1000 assert bytes.length > 0;
1001
1002 final LogMeta logMeta = new LogMeta(log.replicaId(), System.currentTimeMillis(), bytes.length);
1003
1004 final int count = (bytes.length + MAX_BYTES - 1) / MAX_BYTES;
1005 for (int i = 0; i < count; ++i) {
1006 final int start = i * MAX_BYTES;
1007 final int end = Math.min((i + 1) * MAX_BYTES, bytes.length);
1008 final byte[] b = Arrays.copyOfRange(bytes, start, end);
1009 final String blockPath = curator.create()
1010 .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
1011 .forPath(absolutePath(LOG_BLOCK_PATH) + '/', b);
1012 final long blockId = revisionFromPath(blockPath);
1013 logMeta.appendBlock(blockId);
1014 }
1015
1016 final String logPath =
1017 curator.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL)
1018 .forPath(absolutePath(LOG_PATH) + '/', Jackson.writeValueAsBytes(logMeta));
1019
1020 return revisionFromPath(logPath);
1021 } catch (Exception e) {
1022 logger.error("Failed to store a log; entering read-only mode: {}", log, e);
1023 stopLater();
1024 throw new ReplicationException("failed to store a log: " + log, e);
1025 }
1026 }
1027
1028 @VisibleForTesting
1029 Optional<ReplicationLog<?>> loadLog(long revision, boolean skipIfSameReplica) {
1030 try {
1031 createParentNodes();
1032
1033 final String logPath = absolutePath(LOG_PATH) + '/' + pathFromRevision(revision);
1034
1035 final LogMeta logMeta = Jackson.readValue(curator.getData().forPath(logPath), LogMeta.class);
1036
1037 if (skipIfSameReplica && replicaId() == logMeta.replicaId()) {
1038 return Optional.empty();
1039 }
1040
1041 final byte[] bytes = new byte[logMeta.size()];
1042 int offset = 0;
1043 for (long blockId : logMeta.blocks()) {
1044 final String blockPath = absolutePath(LOG_BLOCK_PATH) + '/' + pathFromRevision(blockId);
1045 final byte[] b = curator.getData().forPath(blockPath);
1046 System.arraycopy(b, 0, bytes, offset, b.length);
1047 offset += b.length;
1048 }
1049 assert logMeta.size() == offset;
1050
1051 final ReplicationLog<?> log = Jackson.readValue(bytes, ReplicationLog.class);
1052 return Optional.of(log);
1053 } catch (Exception e) {
1054 logger.error("Failed to load a log at revision {}; entering read-only mode", revision, e);
1055 stopLater();
1056 throw new ReplicationException("failed to load a log at revision " + revision, e);
1057 }
1058 }
1059
1060 private static long revisionFromPath(String path) {
1061 final String[] s = path.split("/");
1062 return Long.parseLong(s[s.length - 1]);
1063 }
1064
1065 private static String pathFromRevision(long revision) {
1066 return String.format("%010d", revision);
1067 }
1068
1069
1070 @Override
1071 protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception {
1072 final CompletableFuture<T> future = new CompletableFuture<>();
1073 executor.execute(() -> {
1074 try {
1075 future.complete(blockingExecute(command));
1076 } catch (Throwable t) {
1077 future.completeExceptionally(t);
1078 }
1079 });
1080 return future;
1081 }
1082
1083 private <T> T blockingExecute(Command<T> command) throws Exception {
1084 createParentNodes();
1085
1086 try (SafeCloseable ignored = safeLock(command)) {
1087
1088
1089
1090
1091
1092
1093
1094 final List<String> recentRevisions = curator.getChildren().forPath(absolutePath(LOG_PATH));
1095 if (!recentRevisions.isEmpty()) {
1096 final long lastRevision = recentRevisions.stream().mapToLong(Long::parseLong).max().getAsLong();
1097 replayLogs(lastRevision);
1098 }
1099
1100 final T result = delegate.execute(command).get();
1101 final ReplicationLog<?> log;
1102 if (command.type() == CommandType.NORMALIZING_PUSH) {
1103 final NormalizingPushCommand normalizingPushCommand = (NormalizingPushCommand) command;
1104 assert result instanceof CommitResult : result;
1105 final CommitResult commitResult = (CommitResult) result;
1106 final Command<Revision> pushAsIsCommand = normalizingPushCommand.asIs(commitResult);
1107 log = new ReplicationLog<>(replicaId(), pushAsIsCommand, commitResult.revision());
1108 } else if (command.type() == CommandType.FORCE_PUSH &&
1109 ((ForcePushCommand<?>) command).delegate().type() == CommandType.NORMALIZING_PUSH) {
1110 final NormalizingPushCommand delegated =
1111 (NormalizingPushCommand) ((ForcePushCommand<?>) command).delegate();
1112 final CommitResult commitResult = (CommitResult) result;
1113 final Command<Revision> command0 = Command.forcePush(delegated.asIs(commitResult));
1114 log = new ReplicationLog<>(replicaId(), command0, commitResult.revision());
1115 } else {
1116 if (command.type() == CommandType.UPDATE_SERVER_STATUS) {
1117 final UpdateServerStatusCommand command0 = (UpdateServerStatusCommand) command;
1118 final boolean writable = command0.writable();
1119 final boolean wasWritable = isWritable();
1120 setWritable(writable);
1121 if (writable != wasWritable) {
1122 if (writable) {
1123 logger.warn("Left read-only mode.");
1124 } else {
1125 logger.warn("Entered read-only mode.");
1126 }
1127 }
1128 }
1129
1130 log = new ReplicationLog<>(replicaId(), command, result);
1131 }
1132
1133
1134 final long revision = storeLog(log);
1135
1136 logger.debug("logging OK. revision = {}, log = {}", revision, log);
1137 return result;
1138 }
1139 }
1140
1141 private void createParentNodes() throws Exception {
1142 if (createdParentNodes) {
1143 return;
1144 }
1145
1146
1147 createZkPathIfMissing(absolutePath());
1148 createZkPathIfMissing(absolutePath(LOG_PATH));
1149 createZkPathIfMissing(absolutePath(LOG_BLOCK_PATH));
1150 createZkPathIfMissing(absolutePath(LOCK_PATH));
1151
1152 createdParentNodes = true;
1153 }
1154
1155 private void createZkPathIfMissing(String zkPath) throws Exception {
1156 try {
1157 curator.create().forPath(zkPath);
1158 } catch (KeeperException.NodeExistsException ignored) {
1159
1160 }
1161 }
1162
1163 @VisibleForTesting
1164 void setMetadataService(MetadataService metadataService) {
1165 this.metadataService = metadataService;
1166 }
1167
1168 private static final class WriteLock {
1169 private final InterProcessSemaphoreV2 semaphore;
1170 private final Lease lease;
1171 private final QuotaConfig writeQuota;
1172
1173 private WriteLock(InterProcessSemaphoreV2 semaphore, Lease lease, QuotaConfig writeQuota) {
1174 this.semaphore = semaphore;
1175 this.lease = lease;
1176 this.writeQuota = writeQuota;
1177 }
1178 }
1179 }