1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server.command;
17  
18  import static java.util.Objects.requireNonNull;
19  
20  import java.util.concurrent.CompletableFuture;
21  import java.util.concurrent.Executor;
22  import java.util.function.Consumer;
23  
24  import javax.annotation.Nullable;
25  
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  import com.linecorp.centraldogma.common.ReadOnlyException;
30  import com.linecorp.centraldogma.common.RepositoryStatus;
31  import com.linecorp.centraldogma.server.auth.Session;
32  import com.linecorp.centraldogma.server.auth.SessionManager;
33  import com.linecorp.centraldogma.server.management.ServerStatusManager;
34  import com.linecorp.centraldogma.server.metadata.ProjectMetadata;
35  import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;
36  import com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer;
37  import com.linecorp.centraldogma.server.storage.project.Project;
38  import com.linecorp.centraldogma.server.storage.project.ProjectManager;
39  import com.linecorp.centraldogma.server.storage.repository.MetaRepository;
40  import com.linecorp.centraldogma.server.storage.repository.Repository;
41  
42  /**
43   * A {@link CommandExecutor} implementation which performs operations on the local storage.
44   */
45  public class StandaloneCommandExecutor extends AbstractCommandExecutor {
46  
47      private static final Logger logger = LoggerFactory.getLogger(StandaloneCommandExecutor.class);
48  
49      private final ProjectManager projectManager;
50      private final Executor repositoryWorker;
51      @Nullable
52      private final SessionManager sessionManager;
53      private final ServerStatusManager serverStatusManager;
54  
55      /**
56       * Creates a new instance.
57       *
58       * @param projectManager the project manager for accessing the storage
59       * @param repositoryWorker the executor which is used for performing storage operations
60       * @param sessionManager the session manager for creating/removing a session
61       * @param onTakeLeadership the callback to be invoked after the replica has taken the leadership
62       * @param onReleaseLeadership the callback to be invoked before the replica releases the leadership
63       * @param onTakeZoneLeadership the callback to be invoked after the replica has taken the zone leadership
64       * @param onReleaseZoneLeadership the callback to be invoked before the replica releases the zone leadership
65       */
66      public StandaloneCommandExecutor(ProjectManager projectManager,
67                                       Executor repositoryWorker,
68                                       ServerStatusManager serverStatusManager,
69                                       @Nullable SessionManager sessionManager,
70                                       @Nullable Consumer<CommandExecutor> onTakeLeadership,
71                                       @Nullable Consumer<CommandExecutor> onReleaseLeadership,
72                                       @Nullable Consumer<CommandExecutor> onTakeZoneLeadership,
73                                       @Nullable Consumer<CommandExecutor> onReleaseZoneLeadership) {
74          super(onTakeLeadership, onReleaseLeadership, onTakeZoneLeadership, onReleaseZoneLeadership);
75          this.projectManager = requireNonNull(projectManager, "projectManager");
76          this.repositoryWorker = requireNonNull(repositoryWorker, "repositoryWorker");
77          this.serverStatusManager = requireNonNull(serverStatusManager, "serverStatusManager");
78          this.sessionManager = sessionManager;
79      }
80  
81      @Override
82      public int replicaId() {
83          return 0;
84      }
85  
86      @Override
87      protected void doStart(@Nullable Runnable onTakeLeadership,
88                             @Nullable Runnable onReleaseLeadership,
89                             @Nullable Runnable onTakeZoneLeadership,
90                             @Nullable Runnable onReleaseZoneLeadership) {
91          if (onTakeLeadership != null) {
92              onTakeLeadership.run();
93          }
94          if (onTakeZoneLeadership != null) {
95              onTakeZoneLeadership.run();
96          }
97      }
98  
99      @Override
100     protected void doStop(@Nullable Runnable onReleaseLeadership, @Nullable Runnable onReleaseZoneLeadership) {
101         if (onReleaseLeadership != null) {
102             onReleaseLeadership.run();
103         }
104         if (onReleaseZoneLeadership != null) {
105             onReleaseZoneLeadership.run();
106         }
107     }
108 
109     @Override
110     protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception {
111         throwExceptionIfRepositoryNotWritable(command);
112         return doExecute0(command);
113     }
114 
115     private void throwExceptionIfRepositoryNotWritable(Command<?> command) throws Exception {
116         if (command instanceof NormalizableCommit) {
117             assert command instanceof RepositoryCommand;
118             final RepositoryCommand<?> repositoryCommand = (RepositoryCommand<?>) command;
119             if (InternalProjectInitializer.INTERNAL_PROJECT_DOGMA.equals(repositoryCommand.projectName())) {
120                 return;
121             }
122             String repoName = repositoryCommand.repositoryName();
123             if (Project.REPO_META.equals(repoName)) {
124                 // Use REPO_DOGMA for the meta repository because the meta repository will be removed.
125                 repoName = Project.REPO_DOGMA;
126             }
127 
128             final ProjectMetadata metadata = projectManager.get(repositoryCommand.projectName()).metadata();
129             assert metadata != null;
130             final RepositoryMetadata repositoryMetadata = metadata.repos().get(repoName);
131             if (repositoryMetadata == null) {
132                 // The repository metadata is not found, so it is writable.
133                 return;
134             }
135             if (repositoryMetadata.status() == RepositoryStatus.READ_ONLY) {
136                 throw new ReadOnlyException(
137                         "The repository is in read-only. command: " + repositoryCommand);
138             }
139         }
140     }
141 
142     @SuppressWarnings("unchecked")
143     private <T> CompletableFuture<T> doExecute0(Command<T> command) throws Exception {
144         if (command instanceof CreateProjectCommand) {
145             return (CompletableFuture<T>) createProject((CreateProjectCommand) command);
146         }
147 
148         if (command instanceof ResetMetaRepositoryCommand) {
149             return (CompletableFuture<T>) resetMetaRepository((ResetMetaRepositoryCommand) command);
150         }
151 
152         if (command instanceof RemoveProjectCommand) {
153             return (CompletableFuture<T>) removeProject((RemoveProjectCommand) command);
154         }
155 
156         if (command instanceof UnremoveProjectCommand) {
157             return (CompletableFuture<T>) unremoveProject((UnremoveProjectCommand) command);
158         }
159 
160         if (command instanceof PurgeProjectCommand) {
161             return (CompletableFuture<T>) purgeProject((PurgeProjectCommand) command);
162         }
163 
164         if (command instanceof CreateRepositoryCommand) {
165             return (CompletableFuture<T>) createRepository((CreateRepositoryCommand) command);
166         }
167 
168         if (command instanceof RemoveRepositoryCommand) {
169             return (CompletableFuture<T>) removeRepository((RemoveRepositoryCommand) command);
170         }
171 
172         if (command instanceof UnremoveRepositoryCommand) {
173             return (CompletableFuture<T>) unremoveRepository((UnremoveRepositoryCommand) command);
174         }
175 
176         if (command instanceof PurgeRepositoryCommand) {
177             return (CompletableFuture<T>) purgeRepository((PurgeRepositoryCommand) command);
178         }
179 
180         if (command instanceof NormalizingPushCommand) {
181             return (CompletableFuture<T>) push((NormalizingPushCommand) command, true);
182         }
183 
184         if (command instanceof PushAsIsCommand) {
185             return (CompletableFuture<T>) push((PushAsIsCommand) command, false)
186                     .thenApply(CommitResult::revision);
187         }
188 
189         if (command instanceof TransformCommand) {
190             return (CompletableFuture<T>) push((TransformCommand) command, true);
191         }
192 
193         if (command instanceof CreateSessionCommand) {
194             return (CompletableFuture<T>) createSession((CreateSessionCommand) command);
195         }
196 
197         if (command instanceof RemoveSessionCommand) {
198             return (CompletableFuture<T>) removeSession((RemoveSessionCommand) command);
199         }
200 
201         if (command instanceof UpdateServerStatusCommand) {
202             return (CompletableFuture<T>) updateServerStatus((UpdateServerStatusCommand) command);
203         }
204 
205         if (command instanceof ForcePushCommand) {
206             // TODO(minwoox): Should we prevent executing when the replication status is READ_ONLY?
207             //noinspection TailRecursion
208             return doExecute0(((ForcePushCommand<T>) command).delegate());
209         }
210 
211         throw new UnsupportedOperationException(command.toString());
212     }
213 
214     // Project operations
215 
216     private CompletableFuture<Void> createProject(CreateProjectCommand c) {
217         return CompletableFuture.supplyAsync(() -> {
218             projectManager.create(c.projectName(), c.timestamp(), c.author());
219             return null;
220         }, repositoryWorker);
221     }
222 
223     private CompletableFuture<Void> removeProject(RemoveProjectCommand c) {
224         return CompletableFuture.supplyAsync(() -> {
225             projectManager.remove(c.projectName());
226             return null;
227         }, repositoryWorker);
228     }
229 
230     private CompletableFuture<Void> unremoveProject(UnremoveProjectCommand c) {
231         return CompletableFuture.supplyAsync(() -> {
232             projectManager.unremove(c.projectName());
233             return null;
234         }, repositoryWorker);
235     }
236 
237     private CompletableFuture<Void> purgeProject(PurgeProjectCommand c) {
238         return CompletableFuture.supplyAsync(() -> {
239             projectManager.markForPurge(c.projectName());
240             return null;
241         }, repositoryWorker);
242     }
243 
244     private CompletableFuture<Void> resetMetaRepository(ResetMetaRepositoryCommand command) {
245         return CompletableFuture.supplyAsync(() -> {
246             final Project project = projectManager.get(command.projectName());
247             if (project == null) {
248                 throw new IllegalStateException("Project not found: " + command.projectName());
249             }
250             final MetaRepository metaRepository = project.resetMetaRepository();
251             if (!Project.REPO_DOGMA.equals(metaRepository.name())) {
252                 logger.warn("Meta repository name is not changed in {}. meta repo: {}",
253                             project.name(), metaRepository.name());
254             }
255             return null;
256         }, repositoryWorker);
257     }
258 
259     // Repository operations
260 
261     private CompletableFuture<Void> createRepository(CreateRepositoryCommand c) {
262         return CompletableFuture.supplyAsync(() -> {
263             projectManager.get(c.projectName()).repos().create(c.repositoryName(), c.timestamp(), c.author());
264             return null;
265         }, repositoryWorker);
266     }
267 
268     private CompletableFuture<Void> removeRepository(RemoveRepositoryCommand c) {
269         return CompletableFuture.supplyAsync(() -> {
270             projectManager.get(c.projectName()).repos().remove(c.repositoryName());
271             return null;
272         }, repositoryWorker);
273     }
274 
275     private CompletableFuture<Void> unremoveRepository(UnremoveRepositoryCommand c) {
276         return CompletableFuture.supplyAsync(() -> {
277             projectManager.get(c.projectName()).repos().unremove(c.repositoryName());
278             return null;
279         }, repositoryWorker);
280     }
281 
282     private CompletableFuture<Void> purgeRepository(PurgeRepositoryCommand c) {
283         return CompletableFuture.supplyAsync(() -> {
284             projectManager.get(c.projectName()).repos().markForPurge(c.repositoryName());
285             return null;
286         }, repositoryWorker);
287     }
288 
289     private CompletableFuture<CommitResult> push(RepositoryCommand<?> c, boolean normalizing) {
290         if (c instanceof TransformCommand) {
291             final TransformCommand transformCommand = (TransformCommand) c;
292             return repo(c).commit(transformCommand.baseRevision(), transformCommand.timestamp(),
293                                   transformCommand.author(), transformCommand.summary(),
294                                   transformCommand.detail(), transformCommand.markup(),
295                                   transformCommand.transformer());
296         }
297         assert c instanceof AbstractPushCommand;
298         final AbstractPushCommand<?> pushCommand = (AbstractPushCommand<?>) c;
299         return repo(c).commit(pushCommand.baseRevision(), pushCommand.timestamp(), pushCommand.author(),
300                               pushCommand.summary(), pushCommand.detail(), pushCommand.markup(),
301                               pushCommand.changes(), normalizing);
302     }
303 
304     private Repository repo(RepositoryCommand<?> c) {
305         return projectManager.get(c.projectName()).repos().get(c.repositoryName());
306     }
307 
308     private CompletableFuture<Void> createSession(CreateSessionCommand c) {
309         if (sessionManager == null) {
310             // Security has been disabled for this replica.
311             return CompletableFuture.completedFuture(null);
312         }
313 
314         final Session session = c.session();
315         return sessionManager.create(session).exceptionally(cause -> {
316             logger.warn("Failed to replicate a session creation: {}", session, cause);
317             return null;
318         });
319     }
320 
321     private CompletableFuture<Void> removeSession(RemoveSessionCommand c) {
322         if (sessionManager == null) {
323             return CompletableFuture.completedFuture(null);
324         }
325 
326         final String sessionId = c.sessionId();
327         return sessionManager.delete(sessionId).exceptionally(cause -> {
328             logger.warn("Failed to replicate a session removal: {}", sessionId, cause);
329             return null;
330         });
331     }
332 
333     private CompletableFuture<Void> updateServerStatus(UpdateServerStatusCommand c) {
334         return CompletableFuture.supplyAsync(() -> {
335             serverStatusManager.updateStatus(c.serverStatus());
336             statusManager().updateStatus(c);
337             return null;
338         }, serverStatusManager.sequentialExecutor());
339     }
340 }