1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.command;
18
19 import static java.util.Objects.requireNonNull;
20
21 import java.util.concurrent.CompletableFuture;
22 import java.util.concurrent.CompletionStage;
23 import java.util.concurrent.ForkJoinPool;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.function.BiFunction;
26 import java.util.function.Consumer;
27
28 import javax.annotation.Nullable;
29
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import com.google.common.base.MoreObjects;
34
35 import com.linecorp.armeria.common.util.Exceptions;
36 import com.linecorp.armeria.common.util.StartStopSupport;
37 import com.linecorp.centraldogma.common.ReadOnlyException;
38 import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;
39
40
41
42
43 public abstract class AbstractCommandExecutor implements CommandExecutor {
44
45 private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutor.class);
46
47 @Nullable
48 private final Consumer<CommandExecutor> onTakeLeadership;
49 @Nullable
50 private final Consumer<CommandExecutor> onReleaseLeadership;
51 @Nullable
52 private final Consumer<CommandExecutor> onTakeZoneLeadership;
53 @Nullable
54 private final Consumer<CommandExecutor> onReleaseZoneLeadership;
55
56 private final CommandExecutorStartStop startStop = new CommandExecutorStartStop();
57 private volatile boolean started;
58 private volatile boolean writable = true;
59 private final AtomicInteger numPendingStopRequests = new AtomicInteger();
60 private final CommandExecutorStatusManager statusManager;
61
62 @Nullable
63 private BiFunction<String, String, CompletableFuture<RepositoryMetadata>> repositoryMetadataSupplier;
64
65
66
67
68
69
70
71
72
73 protected AbstractCommandExecutor(@Nullable Consumer<CommandExecutor> onTakeLeadership,
74 @Nullable Consumer<CommandExecutor> onReleaseLeadership,
75 @Nullable Consumer<CommandExecutor> onTakeZoneLeadership,
76 @Nullable Consumer<CommandExecutor> onReleaseZoneLeadership) {
77 this.onTakeLeadership = onTakeLeadership;
78 this.onReleaseLeadership = onReleaseLeadership;
79 this.onTakeZoneLeadership = onTakeZoneLeadership;
80 this.onReleaseZoneLeadership = onReleaseZoneLeadership;
81 statusManager = new CommandExecutorStatusManager(this);
82 }
83
84 @Override
85 public final boolean isStarted() {
86 return started;
87 }
88
89 protected final boolean isStopping() {
90 return numPendingStopRequests.get() > 0;
91 }
92
93 @Override
94 public final CompletableFuture<Void> start() {
95 return startStop.start(false).thenRun(() -> {
96 started = true;
97 if (!writable) {
98 logger.warn("Started a command executor with read-only mode.");
99 }
100 });
101 }
102
103 protected abstract void doStart(@Nullable Runnable onTakeLeadership,
104 @Nullable Runnable onReleaseLeadership,
105 @Nullable Runnable onTakeZoneLeadership,
106 @Nullable Runnable onReleaseZoneLeadership) throws Exception;
107
108 @Override
109 public final CompletableFuture<Void> stop() {
110 started = false;
111 numPendingStopRequests.incrementAndGet();
112 return startStop.stop().thenRun(numPendingStopRequests::decrementAndGet);
113 }
114
115 protected abstract void doStop(@Nullable Runnable onReleaseLeadership,
116 @Nullable Runnable onReleaseZoneLeadership) throws Exception;
117
118 @Override
119 public final boolean isWritable() {
120 return isStarted() && writable;
121 }
122
123 @Override
124 public final void setWritable(boolean writable) {
125 this.writable = writable;
126 }
127
128 @Override
129 public void setRepositoryMetadataSupplier(
130 BiFunction<String, String, CompletableFuture<RepositoryMetadata>> supplier) {
131 repositoryMetadataSupplier = requireNonNull(supplier, "supplier");
132 }
133
134 @Override
135 public final <T> CompletableFuture<T> execute(Command<T> command) {
136 requireNonNull(command, "command");
137 if (!isStarted()) {
138 throw new ReadOnlyException("running in read-only mode. command: " + command);
139 }
140 if (!writable && !(command instanceof SystemAdministrativeCommand)) {
141
142
143
144 throw new ReadOnlyException("running in read-only mode. command: " + command);
145 }
146
147 try {
148 return doExecute(command);
149 } catch (Throwable t) {
150 final CompletableFuture<T> f = new CompletableFuture<>();
151 f.completeExceptionally(t);
152 return f;
153 }
154 }
155
156 protected abstract <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception;
157
158 @Override
159 public CommandExecutorStatusManager statusManager() {
160 return statusManager;
161 }
162
163 @Override
164 public String toString() {
165 return MoreObjects.toStringHelper(this)
166 .add("writable", isWritable())
167 .add("replicating", started)
168 .toString();
169 }
170
171 private final class CommandExecutorStartStop extends StartStopSupport<Void, Void, Void, Void> {
172
173 CommandExecutorStartStop() {
174 super(ForkJoinPool.commonPool());
175 }
176
177 @Override
178 protected CompletionStage<Void> doStart(@Nullable Void unused) throws Exception {
179 return execute("command-executor", () -> {
180 try {
181 AbstractCommandExecutor.this.doStart(toRunnable(onTakeLeadership),
182 toRunnable(onReleaseLeadership),
183 toRunnable(onTakeZoneLeadership),
184 toRunnable(onReleaseZoneLeadership));
185 } catch (Exception e) {
186 Exceptions.throwUnsafely(e);
187 }
188 });
189 }
190
191 @Override
192 protected CompletionStage<Void> doStop(@Nullable Void unused) throws Exception {
193 return execute("command-executor-shutdown", () -> {
194 try {
195 AbstractCommandExecutor.this.doStop(toRunnable(onReleaseLeadership),
196 toRunnable(onReleaseZoneLeadership));
197 } catch (Exception e) {
198 Exceptions.throwUnsafely(e);
199 }
200 });
201 }
202
203 @Nullable
204 private Runnable toRunnable(@Nullable Consumer<CommandExecutor> callback) {
205 return callback != null ? () -> callback.accept(AbstractCommandExecutor.this) : null;
206 }
207
208 private CompletionStage<Void> execute(String threadNamePrefix, Runnable task) {
209 final CompletableFuture<Void> future = new CompletableFuture<>();
210 final String threadName = threadNamePrefix + "-0x" +
211 Long.toHexString(AbstractCommandExecutor.this.hashCode() & 0xFFFFFFFFL);
212 final Thread thread = new Thread(() -> {
213 try {
214 task.run();
215 future.complete(null);
216 } catch (Throwable cause) {
217 future.completeExceptionally(cause);
218 }
219 }, threadName);
220 thread.setContextClassLoader(CommandExecutorStartStop.class.getClassLoader());
221 thread.start();
222 return future;
223 }
224 }
225 }