1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server.command;
17
18 import static java.util.Objects.requireNonNull;
19
20 import java.util.concurrent.CompletableFuture;
21 import java.util.concurrent.Executor;
22 import java.util.function.Consumer;
23
24 import javax.annotation.Nullable;
25
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 import com.linecorp.centraldogma.common.ReadOnlyException;
30 import com.linecorp.centraldogma.common.RepositoryStatus;
31 import com.linecorp.centraldogma.server.auth.Session;
32 import com.linecorp.centraldogma.server.auth.SessionManager;
33 import com.linecorp.centraldogma.server.management.ServerStatusManager;
34 import com.linecorp.centraldogma.server.metadata.ProjectMetadata;
35 import com.linecorp.centraldogma.server.metadata.RepositoryMetadata;
36 import com.linecorp.centraldogma.server.storage.project.InternalProjectInitializer;
37 import com.linecorp.centraldogma.server.storage.project.Project;
38 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
39 import com.linecorp.centraldogma.server.storage.repository.MetaRepository;
40 import com.linecorp.centraldogma.server.storage.repository.Repository;
41
42
43
44
45 public class StandaloneCommandExecutor extends AbstractCommandExecutor {
46
47 private static final Logger logger = LoggerFactory.getLogger(StandaloneCommandExecutor.class);
48
49 private final ProjectManager projectManager;
50 private final Executor repositoryWorker;
51 @Nullable
52 private final SessionManager sessionManager;
53 private final ServerStatusManager serverStatusManager;
54
55
56
57
58
59
60
61
62
63
64
65
66 public StandaloneCommandExecutor(ProjectManager projectManager,
67 Executor repositoryWorker,
68 ServerStatusManager serverStatusManager,
69 @Nullable SessionManager sessionManager,
70 @Nullable Consumer<CommandExecutor> onTakeLeadership,
71 @Nullable Consumer<CommandExecutor> onReleaseLeadership,
72 @Nullable Consumer<CommandExecutor> onTakeZoneLeadership,
73 @Nullable Consumer<CommandExecutor> onReleaseZoneLeadership) {
74 super(onTakeLeadership, onReleaseLeadership, onTakeZoneLeadership, onReleaseZoneLeadership);
75 this.projectManager = requireNonNull(projectManager, "projectManager");
76 this.repositoryWorker = requireNonNull(repositoryWorker, "repositoryWorker");
77 this.serverStatusManager = requireNonNull(serverStatusManager, "serverStatusManager");
78 this.sessionManager = sessionManager;
79 }
80
81 @Override
82 public int replicaId() {
83 return 0;
84 }
85
86 @Override
87 protected void doStart(@Nullable Runnable onTakeLeadership,
88 @Nullable Runnable onReleaseLeadership,
89 @Nullable Runnable onTakeZoneLeadership,
90 @Nullable Runnable onReleaseZoneLeadership) {
91 if (onTakeLeadership != null) {
92 onTakeLeadership.run();
93 }
94 if (onTakeZoneLeadership != null) {
95 onTakeZoneLeadership.run();
96 }
97 }
98
99 @Override
100 protected void doStop(@Nullable Runnable onReleaseLeadership, @Nullable Runnable onReleaseZoneLeadership) {
101 if (onReleaseLeadership != null) {
102 onReleaseLeadership.run();
103 }
104 if (onReleaseZoneLeadership != null) {
105 onReleaseZoneLeadership.run();
106 }
107 }
108
109 @Override
110 protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception {
111 throwExceptionIfRepositoryNotWritable(command);
112 return doExecute0(command);
113 }
114
115 private void throwExceptionIfRepositoryNotWritable(Command<?> command) throws Exception {
116 if (command instanceof NormalizableCommit) {
117 assert command instanceof RepositoryCommand;
118 final RepositoryCommand<?> repositoryCommand = (RepositoryCommand<?>) command;
119 if (InternalProjectInitializer.INTERNAL_PROJECT_DOGMA.equals(repositoryCommand.projectName())) {
120 return;
121 }
122 String repoName = repositoryCommand.repositoryName();
123 if (Project.REPO_META.equals(repoName)) {
124
125 repoName = Project.REPO_DOGMA;
126 }
127
128 final ProjectMetadata metadata = projectManager.get(repositoryCommand.projectName()).metadata();
129 assert metadata != null;
130 final RepositoryMetadata repositoryMetadata = metadata.repos().get(repoName);
131 if (repositoryMetadata == null) {
132
133 return;
134 }
135 if (repositoryMetadata.status() == RepositoryStatus.READ_ONLY) {
136 throw new ReadOnlyException(
137 "The repository is in read-only. command: " + repositoryCommand);
138 }
139 }
140 }
141
142 @SuppressWarnings("unchecked")
143 private <T> CompletableFuture<T> doExecute0(Command<T> command) throws Exception {
144 if (command instanceof CreateProjectCommand) {
145 return (CompletableFuture<T>) createProject((CreateProjectCommand) command);
146 }
147
148 if (command instanceof ResetMetaRepositoryCommand) {
149 return (CompletableFuture<T>) resetMetaRepository((ResetMetaRepositoryCommand) command);
150 }
151
152 if (command instanceof RemoveProjectCommand) {
153 return (CompletableFuture<T>) removeProject((RemoveProjectCommand) command);
154 }
155
156 if (command instanceof UnremoveProjectCommand) {
157 return (CompletableFuture<T>) unremoveProject((UnremoveProjectCommand) command);
158 }
159
160 if (command instanceof PurgeProjectCommand) {
161 return (CompletableFuture<T>) purgeProject((PurgeProjectCommand) command);
162 }
163
164 if (command instanceof CreateRepositoryCommand) {
165 return (CompletableFuture<T>) createRepository((CreateRepositoryCommand) command);
166 }
167
168 if (command instanceof RemoveRepositoryCommand) {
169 return (CompletableFuture<T>) removeRepository((RemoveRepositoryCommand) command);
170 }
171
172 if (command instanceof UnremoveRepositoryCommand) {
173 return (CompletableFuture<T>) unremoveRepository((UnremoveRepositoryCommand) command);
174 }
175
176 if (command instanceof PurgeRepositoryCommand) {
177 return (CompletableFuture<T>) purgeRepository((PurgeRepositoryCommand) command);
178 }
179
180 if (command instanceof NormalizingPushCommand) {
181 return (CompletableFuture<T>) push((NormalizingPushCommand) command, true);
182 }
183
184 if (command instanceof PushAsIsCommand) {
185 return (CompletableFuture<T>) push((PushAsIsCommand) command, false)
186 .thenApply(CommitResult::revision);
187 }
188
189 if (command instanceof TransformCommand) {
190 return (CompletableFuture<T>) push((TransformCommand) command, true);
191 }
192
193 if (command instanceof CreateSessionCommand) {
194 return (CompletableFuture<T>) createSession((CreateSessionCommand) command);
195 }
196
197 if (command instanceof RemoveSessionCommand) {
198 return (CompletableFuture<T>) removeSession((RemoveSessionCommand) command);
199 }
200
201 if (command instanceof UpdateServerStatusCommand) {
202 return (CompletableFuture<T>) updateServerStatus((UpdateServerStatusCommand) command);
203 }
204
205 if (command instanceof ForcePushCommand) {
206
207
208 return doExecute0(((ForcePushCommand<T>) command).delegate());
209 }
210
211 throw new UnsupportedOperationException(command.toString());
212 }
213
214
215
216 private CompletableFuture<Void> createProject(CreateProjectCommand c) {
217 return CompletableFuture.supplyAsync(() -> {
218 projectManager.create(c.projectName(), c.timestamp(), c.author());
219 return null;
220 }, repositoryWorker);
221 }
222
223 private CompletableFuture<Void> removeProject(RemoveProjectCommand c) {
224 return CompletableFuture.supplyAsync(() -> {
225 projectManager.remove(c.projectName());
226 return null;
227 }, repositoryWorker);
228 }
229
230 private CompletableFuture<Void> unremoveProject(UnremoveProjectCommand c) {
231 return CompletableFuture.supplyAsync(() -> {
232 projectManager.unremove(c.projectName());
233 return null;
234 }, repositoryWorker);
235 }
236
237 private CompletableFuture<Void> purgeProject(PurgeProjectCommand c) {
238 return CompletableFuture.supplyAsync(() -> {
239 projectManager.markForPurge(c.projectName());
240 return null;
241 }, repositoryWorker);
242 }
243
244 private CompletableFuture<Void> resetMetaRepository(ResetMetaRepositoryCommand command) {
245 return CompletableFuture.supplyAsync(() -> {
246 final Project project = projectManager.get(command.projectName());
247 if (project == null) {
248 throw new IllegalStateException("Project not found: " + command.projectName());
249 }
250 final MetaRepository metaRepository = project.resetMetaRepository();
251 if (!Project.REPO_DOGMA.equals(metaRepository.name())) {
252 logger.warn("Meta repository name is not changed in {}. meta repo: {}",
253 project.name(), metaRepository.name());
254 }
255 return null;
256 }, repositoryWorker);
257 }
258
259
260
261 private CompletableFuture<Void> createRepository(CreateRepositoryCommand c) {
262 return CompletableFuture.supplyAsync(() -> {
263 projectManager.get(c.projectName()).repos().create(c.repositoryName(), c.timestamp(), c.author());
264 return null;
265 }, repositoryWorker);
266 }
267
268 private CompletableFuture<Void> removeRepository(RemoveRepositoryCommand c) {
269 return CompletableFuture.supplyAsync(() -> {
270 projectManager.get(c.projectName()).repos().remove(c.repositoryName());
271 return null;
272 }, repositoryWorker);
273 }
274
275 private CompletableFuture<Void> unremoveRepository(UnremoveRepositoryCommand c) {
276 return CompletableFuture.supplyAsync(() -> {
277 projectManager.get(c.projectName()).repos().unremove(c.repositoryName());
278 return null;
279 }, repositoryWorker);
280 }
281
282 private CompletableFuture<Void> purgeRepository(PurgeRepositoryCommand c) {
283 return CompletableFuture.supplyAsync(() -> {
284 projectManager.get(c.projectName()).repos().markForPurge(c.repositoryName());
285 return null;
286 }, repositoryWorker);
287 }
288
289 private CompletableFuture<CommitResult> push(RepositoryCommand<?> c, boolean normalizing) {
290 if (c instanceof TransformCommand) {
291 final TransformCommand transformCommand = (TransformCommand) c;
292 return repo(c).commit(transformCommand.baseRevision(), transformCommand.timestamp(),
293 transformCommand.author(), transformCommand.summary(),
294 transformCommand.detail(), transformCommand.markup(),
295 transformCommand.transformer());
296 }
297 assert c instanceof AbstractPushCommand;
298 final AbstractPushCommand<?> pushCommand = (AbstractPushCommand<?>) c;
299 return repo(c).commit(pushCommand.baseRevision(), pushCommand.timestamp(), pushCommand.author(),
300 pushCommand.summary(), pushCommand.detail(), pushCommand.markup(),
301 pushCommand.changes(), normalizing);
302 }
303
304 private Repository repo(RepositoryCommand<?> c) {
305 return projectManager.get(c.projectName()).repos().get(c.repositoryName());
306 }
307
308 private CompletableFuture<Void> createSession(CreateSessionCommand c) {
309 if (sessionManager == null) {
310
311 return CompletableFuture.completedFuture(null);
312 }
313
314 final Session session = c.session();
315 return sessionManager.create(session).exceptionally(cause -> {
316 logger.warn("Failed to replicate a session creation: {}", session, cause);
317 return null;
318 });
319 }
320
321 private CompletableFuture<Void> removeSession(RemoveSessionCommand c) {
322 if (sessionManager == null) {
323 return CompletableFuture.completedFuture(null);
324 }
325
326 final String sessionId = c.sessionId();
327 return sessionManager.delete(sessionId).exceptionally(cause -> {
328 logger.warn("Failed to replicate a session removal: {}", sessionId, cause);
329 return null;
330 });
331 }
332
333 private CompletableFuture<Void> updateServerStatus(UpdateServerStatusCommand c) {
334 return CompletableFuture.supplyAsync(() -> {
335 serverStatusManager.updateStatus(c.serverStatus());
336 statusManager().updateStatus(c);
337 return null;
338 }, serverStatusManager.sequentialExecutor());
339 }
340 }