1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.command;
18  
19  import static java.util.Objects.requireNonNull;
20  
21  import java.util.concurrent.CompletableFuture;
22  import java.util.concurrent.CompletionStage;
23  import java.util.concurrent.ForkJoinPool;
24  import java.util.concurrent.atomic.AtomicInteger;
25  import java.util.function.Consumer;
26  
27  import javax.annotation.Nullable;
28  
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  import com.google.common.base.MoreObjects;
33  
34  import com.linecorp.armeria.common.util.Exceptions;
35  import com.linecorp.armeria.common.util.StartStopSupport;
36  import com.linecorp.centraldogma.common.ReadOnlyException;
37  
38  /**
39   * Helps to implement a concrete {@link CommandExecutor}.
40   */
41  public abstract class AbstractCommandExecutor implements CommandExecutor {
42  
43      private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutor.class);
44  
45      @Nullable
46      private final Consumer<CommandExecutor> onTakeLeadership;
47      @Nullable
48      private final Consumer<CommandExecutor> onReleaseLeadership;
49  
50      private final CommandExecutorStartStop startStop = new CommandExecutorStartStop();
51      private volatile boolean started;
52      private volatile boolean writable = true;
53      private final AtomicInteger numPendingStopRequests = new AtomicInteger();
54      private final CommandExecutorStatusManager statusManager;
55  
56      /**
57       * Creates a new instance.
58       *
59       * @param onTakeLeadership the callback to be invoked after the replica has taken the leadership
60       * @param onReleaseLeadership the callback to be invoked before the replica releases the leadership
61       */
62      protected AbstractCommandExecutor(@Nullable Consumer<CommandExecutor> onTakeLeadership,
63                                        @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
64          this.onTakeLeadership = onTakeLeadership;
65          this.onReleaseLeadership = onReleaseLeadership;
66          statusManager = new CommandExecutorStatusManager(this);
67      }
68  
69      @Override
70      public final boolean isStarted() {
71          return started;
72      }
73  
74      protected final boolean isStopping() {
75          return numPendingStopRequests.get() > 0;
76      }
77  
78      @Override
79      public final CompletableFuture<Void> start() {
80          return startStop.start(false).thenRun(() -> {
81              started = true;
82              if (!writable) {
83                  logger.warn("Started a command executor with read-only mode.");
84              }
85          });
86      }
87  
88      protected abstract void doStart(@Nullable Runnable onTakeLeadership,
89                                      @Nullable Runnable onReleaseLeadership) throws Exception;
90  
91      @Override
92      public final CompletableFuture<Void> stop() {
93          started = false;
94          numPendingStopRequests.incrementAndGet();
95          return startStop.stop().thenRun(numPendingStopRequests::decrementAndGet);
96      }
97  
98      protected abstract void doStop(@Nullable Runnable onReleaseLeadership) throws Exception;
99  
100     @Override
101     public final boolean isWritable() {
102         return isStarted() && writable;
103     }
104 
105     @Override
106     public final void setWritable(boolean writable) {
107         this.writable = writable;
108     }
109 
110     @Override
111     public final <T> CompletableFuture<T> execute(Command<T> command) {
112         requireNonNull(command, "command");
113         if (!isStarted()) {
114             throw new ReadOnlyException("running in read-only mode. command: " + command);
115         }
116         if (!writable && !(command instanceof AdministrativeCommand)) {
117             // Reject all commands except for AdministrativeCommand when the replica is in read-only mode.
118             // AdministrativeCommand is allowed because it is used to change the read-only mode or migrate
119             // metadata under maintenance mode.
120             throw new ReadOnlyException("running in read-only mode. command: " + command);
121         }
122 
123         try {
124             return doExecute(command);
125         } catch (Throwable t) {
126             final CompletableFuture<T> f = new CompletableFuture<>();
127             f.completeExceptionally(t);
128             return f;
129         }
130     }
131 
132     protected abstract <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception;
133 
134     @Override
135     public CommandExecutorStatusManager statusManager() {
136         return statusManager;
137     }
138 
139     @Override
140     public String toString() {
141         return MoreObjects.toStringHelper(this)
142                           .add("writable", isWritable())
143                           .add("replicating", started)
144                           .toString();
145     }
146 
147     private final class CommandExecutorStartStop extends StartStopSupport<Void, Void, Void, Void> {
148 
149         CommandExecutorStartStop() {
150             super(ForkJoinPool.commonPool());
151         }
152 
153         @Override
154         protected CompletionStage<Void> doStart(@Nullable Void unused) throws Exception {
155             return execute("command-executor", () -> {
156                 try {
157                     AbstractCommandExecutor.this.doStart(toRunnable(onTakeLeadership),
158                                                          toRunnable(onReleaseLeadership));
159                 } catch (Exception e) {
160                     Exceptions.throwUnsafely(e);
161                 }
162             });
163         }
164 
165         @Override
166         protected CompletionStage<Void> doStop(@Nullable Void unused) throws Exception {
167             return execute("command-executor-shutdown", () -> {
168                 try {
169                     AbstractCommandExecutor.this.doStop(toRunnable(onReleaseLeadership));
170                 } catch (Exception e) {
171                     Exceptions.throwUnsafely(e);
172                 }
173             });
174         }
175 
176         @Nullable
177         private Runnable toRunnable(@Nullable Consumer<CommandExecutor> callback) {
178             return callback != null ? () -> callback.accept(AbstractCommandExecutor.this) : null;
179         }
180 
181         private CompletionStage<Void> execute(String threadNamePrefix, Runnable task) {
182             final CompletableFuture<Void> future = new CompletableFuture<>();
183             final String threadName = threadNamePrefix + "-0x" +
184                                       Long.toHexString(AbstractCommandExecutor.this.hashCode() & 0xFFFFFFFFL);
185             final Thread thread = new Thread(() -> {
186                 try {
187                     task.run();
188                     future.complete(null);
189                 } catch (Throwable cause) {
190                     future.completeExceptionally(cause);
191                 }
192             }, threadName);
193             thread.setContextClassLoader(CommandExecutorStartStop.class.getClassLoader());
194             thread.start();
195             return future;
196         }
197     }
198 }