1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.command;
18
19 import static java.util.Objects.requireNonNull;
20
21 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.CompletionStage;
23 import java.util.concurrent.ForkJoinPool;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.function.Consumer;
26
27 import javax.annotation.Nullable;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import com.google.common.base.MoreObjects;
33
34 import com.linecorp.armeria.common.util.Exceptions;
35 import com.linecorp.armeria.common.util.StartStopSupport;
36 import com.linecorp.centraldogma.common.ReadOnlyException;
37
38
39
40
41 public abstract class AbstractCommandExecutor implements CommandExecutor {
42
43 private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutor.class);
44
45 @Nullable
46 private final Consumer<CommandExecutor> onTakeLeadership;
47 @Nullable
48 private final Consumer<CommandExecutor> onReleaseLeadership;
49
50 private final CommandExecutorStartStop startStop = new CommandExecutorStartStop();
51 private volatile boolean started;
52 private volatile boolean writable = true;
53 private final AtomicInteger numPendingStopRequests = new AtomicInteger();
54 private final CommandExecutorStatusManager statusManager;
55
56
57
58
59
60
61
62 protected AbstractCommandExecutor(@Nullable Consumer<CommandExecutor> onTakeLeadership,
63 @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
64 this.onTakeLeadership = onTakeLeadership;
65 this.onReleaseLeadership = onReleaseLeadership;
66 statusManager = new CommandExecutorStatusManager(this);
67 }
68
69 @Override
70 public final boolean isStarted() {
71 return started;
72 }
73
74 protected final boolean isStopping() {
75 return numPendingStopRequests.get() > 0;
76 }
77
78 @Override
79 public final CompletableFuture<Void> start() {
80 return startStop.start(false).thenRun(() -> {
81 started = true;
82 if (!writable) {
83 logger.warn("Started a command executor with read-only mode.");
84 }
85 });
86 }
87
88 protected abstract void doStart(@Nullable Runnable onTakeLeadership,
89 @Nullable Runnable onReleaseLeadership) throws Exception;
90
91 @Override
92 public final CompletableFuture<Void> stop() {
93 started = false;
94 numPendingStopRequests.incrementAndGet();
95 return startStop.stop().thenRun(numPendingStopRequests::decrementAndGet);
96 }
97
98 protected abstract void doStop(@Nullable Runnable onReleaseLeadership) throws Exception;
99
100 @Override
101 public final boolean isWritable() {
102 return isStarted() && writable;
103 }
104
105 @Override
106 public final void setWritable(boolean writable) {
107 this.writable = writable;
108 }
109
110 @Override
111 public final <T> CompletableFuture<T> execute(Command<T> command) {
112 requireNonNull(command, "command");
113 if (!isStarted()) {
114 throw new ReadOnlyException("running in read-only mode. command: " + command);
115 }
116 if (!writable && !(command instanceof AdministrativeCommand)) {
117
118
119
120 throw new ReadOnlyException("running in read-only mode. command: " + command);
121 }
122
123 try {
124 return doExecute(command);
125 } catch (Throwable t) {
126 final CompletableFuture<T> f = new CompletableFuture<>();
127 f.completeExceptionally(t);
128 return f;
129 }
130 }
131
132 protected abstract <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception;
133
134 @Override
135 public CommandExecutorStatusManager statusManager() {
136 return statusManager;
137 }
138
139 @Override
140 public String toString() {
141 return MoreObjects.toStringHelper(this)
142 .add("writable", isWritable())
143 .add("replicating", started)
144 .toString();
145 }
146
147 private final class CommandExecutorStartStop extends StartStopSupport<Void, Void, Void, Void> {
148
149 CommandExecutorStartStop() {
150 super(ForkJoinPool.commonPool());
151 }
152
153 @Override
154 protected CompletionStage<Void> doStart(@Nullable Void unused) throws Exception {
155 return execute("command-executor", () -> {
156 try {
157 AbstractCommandExecutor.this.doStart(toRunnable(onTakeLeadership),
158 toRunnable(onReleaseLeadership));
159 } catch (Exception e) {
160 Exceptions.throwUnsafely(e);
161 }
162 });
163 }
164
165 @Override
166 protected CompletionStage<Void> doStop(@Nullable Void unused) throws Exception {
167 return execute("command-executor-shutdown", () -> {
168 try {
169 AbstractCommandExecutor.this.doStop(toRunnable(onReleaseLeadership));
170 } catch (Exception e) {
171 Exceptions.throwUnsafely(e);
172 }
173 });
174 }
175
176 @Nullable
177 private Runnable toRunnable(@Nullable Consumer<CommandExecutor> callback) {
178 return callback != null ? () -> callback.accept(AbstractCommandExecutor.this) : null;
179 }
180
181 private CompletionStage<Void> execute(String threadNamePrefix, Runnable task) {
182 final CompletableFuture<Void> future = new CompletableFuture<>();
183 final String threadName = threadNamePrefix + "-0x" +
184 Long.toHexString(AbstractCommandExecutor.this.hashCode() & 0xFFFFFFFFL);
185 final Thread thread = new Thread(() -> {
186 try {
187 task.run();
188 future.complete(null);
189 } catch (Throwable cause) {
190 future.completeExceptionally(cause);
191 }
192 }, threadName);
193 thread.setContextClassLoader(CommandExecutorStartStop.class.getClassLoader());
194 thread.start();
195 return future;
196 }
197 }
198 }