1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.command;
18  
19  import static java.util.Objects.requireNonNull;
20  
21  import java.util.concurrent.CompletableFuture;
22  import java.util.concurrent.CompletionStage;
23  import java.util.concurrent.ForkJoinPool;
24  import java.util.concurrent.atomic.AtomicInteger;
25  import java.util.function.BiFunction;
26  import java.util.function.Consumer;
27  
28  import javax.annotation.Nullable;
29  
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import com.google.common.base.MoreObjects;
34  
35  import com.linecorp.armeria.common.util.Exceptions;
36  import com.linecorp.armeria.common.util.StartStopSupport;
37  import com.linecorp.centraldogma.common.ReadOnlyException;
38  import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;
39  
40  /**
41   * Helps to implement a concrete {@link CommandExecutor}.
42   */
43  public abstract class AbstractCommandExecutor implements CommandExecutor {
44  
45      private static final Logger logger = LoggerFactory.getLogger(AbstractCommandExecutor.class);
46  
47      @Nullable
48      private final Consumer<CommandExecutor> onTakeLeadership;
49      @Nullable
50      private final Consumer<CommandExecutor> onReleaseLeadership;
51      @Nullable
52      private final Consumer<CommandExecutor> onTakeZoneLeadership;
53      @Nullable
54      private final Consumer<CommandExecutor> onReleaseZoneLeadership;
55  
56      private final CommandExecutorStartStop startStop = new CommandExecutorStartStop();
57      private volatile boolean started;
58      private volatile boolean writable = true;
59      private final AtomicInteger numPendingStopRequests = new AtomicInteger();
60      private final CommandExecutorStatusManager statusManager;
61  
62      @Nullable
63      private BiFunction<String, String, CompletableFuture<RepositoryMetadata>> repositoryMetadataSupplier;
64  
65      /**
66       * Creates a new instance.
67       *
68       * @param onTakeLeadership the callback to be invoked after the replica has taken the leadership
69       * @param onReleaseLeadership the callback to be invoked before the replica releases the leadership
70       * @param onTakeZoneLeadership the callback to be invoked after the replica has taken the zone leadership
71       * @param onReleaseZoneLeadership the callback to be invoked before the replica releases the zone leadership
72       */
73      protected AbstractCommandExecutor(@Nullable Consumer<CommandExecutor> onTakeLeadership,
74                                        @Nullable Consumer<CommandExecutor> onReleaseLeadership,
75                                        @Nullable Consumer<CommandExecutor> onTakeZoneLeadership,
76                                        @Nullable Consumer<CommandExecutor> onReleaseZoneLeadership) {
77          this.onTakeLeadership = onTakeLeadership;
78          this.onReleaseLeadership = onReleaseLeadership;
79          this.onTakeZoneLeadership = onTakeZoneLeadership;
80          this.onReleaseZoneLeadership = onReleaseZoneLeadership;
81          statusManager = new CommandExecutorStatusManager(this);
82      }
83  
84      @Override
85      public final boolean isStarted() {
86          return started;
87      }
88  
89      protected final boolean isStopping() {
90          return numPendingStopRequests.get() > 0;
91      }
92  
93      @Override
94      public final CompletableFuture<Void> start() {
95          return startStop.start(false).thenRun(() -> {
96              started = true;
97              if (!writable) {
98                  logger.warn("Started a command executor with read-only mode.");
99              }
100         });
101     }
102 
103     protected abstract void doStart(@Nullable Runnable onTakeLeadership,
104                                     @Nullable Runnable onReleaseLeadership,
105                                     @Nullable Runnable onTakeZoneLeadership,
106                                     @Nullable Runnable onReleaseZoneLeadership) throws Exception;
107 
108     @Override
109     public final CompletableFuture<Void> stop() {
110         started = false;
111         numPendingStopRequests.incrementAndGet();
112         return startStop.stop().thenRun(numPendingStopRequests::decrementAndGet);
113     }
114 
115     protected abstract void doStop(@Nullable Runnable onReleaseLeadership,
116                                    @Nullable Runnable onReleaseZoneLeadership) throws Exception;
117 
118     @Override
119     public final boolean isWritable() {
120         return isStarted() && writable;
121     }
122 
123     @Override
124     public final void setWritable(boolean writable) {
125         this.writable = writable;
126     }
127 
128     @Override
129     public void setRepositoryMetadataSupplier(
130             BiFunction<String, String, CompletableFuture<RepositoryMetadata>> supplier) {
131         repositoryMetadataSupplier = requireNonNull(supplier, "supplier");
132     }
133 
134     @Override
135     public final <T> CompletableFuture<T> execute(Command<T> command) {
136         requireNonNull(command, "command");
137         if (!isStarted()) {
138             throw new ReadOnlyException("running in read-only mode. command: " + command);
139         }
140         if (!writable && !(command instanceof SystemAdministrativeCommand)) {
141             // Reject all commands except for AdministrativeCommand when the replica is in read-only mode.
142             // AdministrativeCommand is allowed because it is used to change the read-only mode or migrate
143             // metadata under maintenance mode.
144             throw new ReadOnlyException("running in read-only mode. command: " + command);
145         }
146 
147         try {
148             return doExecute(command);
149         } catch (Throwable t) {
150             final CompletableFuture<T> f = new CompletableFuture<>();
151             f.completeExceptionally(t);
152             return f;
153         }
154     }
155 
156     protected abstract <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception;
157 
158     @Override
159     public CommandExecutorStatusManager statusManager() {
160         return statusManager;
161     }
162 
163     @Override
164     public String toString() {
165         return MoreObjects.toStringHelper(this)
166                           .add("writable", isWritable())
167                           .add("replicating", started)
168                           .toString();
169     }
170 
171     private final class CommandExecutorStartStop extends StartStopSupport<Void, Void, Void, Void> {
172 
173         CommandExecutorStartStop() {
174             super(ForkJoinPool.commonPool());
175         }
176 
177         @Override
178         protected CompletionStage<Void> doStart(@Nullable Void unused) throws Exception {
179             return execute("command-executor", () -> {
180                 try {
181                     AbstractCommandExecutor.this.doStart(toRunnable(onTakeLeadership),
182                                                          toRunnable(onReleaseLeadership),
183                                                          toRunnable(onTakeZoneLeadership),
184                                                          toRunnable(onReleaseZoneLeadership));
185                 } catch (Exception e) {
186                     Exceptions.throwUnsafely(e);
187                 }
188             });
189         }
190 
191         @Override
192         protected CompletionStage<Void> doStop(@Nullable Void unused) throws Exception {
193             return execute("command-executor-shutdown", () -> {
194                 try {
195                     AbstractCommandExecutor.this.doStop(toRunnable(onReleaseLeadership),
196                                                         toRunnable(onReleaseZoneLeadership));
197                 } catch (Exception e) {
198                     Exceptions.throwUnsafely(e);
199                 }
200             });
201         }
202 
203         @Nullable
204         private Runnable toRunnable(@Nullable Consumer<CommandExecutor> callback) {
205             return callback != null ? () -> callback.accept(AbstractCommandExecutor.this) : null;
206         }
207 
208         private CompletionStage<Void> execute(String threadNamePrefix, Runnable task) {
209             final CompletableFuture<Void> future = new CompletableFuture<>();
210             final String threadName = threadNamePrefix + "-0x" +
211                                       Long.toHexString(AbstractCommandExecutor.this.hashCode() & 0xFFFFFFFFL);
212             final Thread thread = new Thread(() -> {
213                 try {
214                     task.run();
215                     future.complete(null);
216                 } catch (Throwable cause) {
217                     future.completeExceptionally(cause);
218                 }
219             }, threadName);
220             thread.setContextClassLoader(CommandExecutorStartStop.class.getClassLoader());
221             thread.start();
222             return future;
223         }
224     }
225 }