1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server.command;
17
18 import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.INTERNAL_PROJECT_DOGMA;
19 import static java.util.Objects.requireNonNull;
20
21 import java.util.Map;
22 import java.util.concurrent.CompletableFuture;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.Executor;
25 import java.util.function.Consumer;
26
27 import javax.annotation.Nullable;
28
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import com.cronutils.utils.VisibleForTesting;
33 import com.google.common.util.concurrent.RateLimiter;
34 import com.spotify.futures.CompletableFutures;
35
36 import com.linecorp.centraldogma.common.TooManyRequestsException;
37 import com.linecorp.centraldogma.server.QuotaConfig;
38 import com.linecorp.centraldogma.server.auth.Session;
39 import com.linecorp.centraldogma.server.auth.SessionManager;
40 import com.linecorp.centraldogma.server.metadata.MetadataService;
41 import com.linecorp.centraldogma.server.storage.project.Project;
42 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
43 import com.linecorp.centraldogma.server.storage.repository.Repository;
44
45
46
47
48 public class StandaloneCommandExecutor extends AbstractCommandExecutor {
49
50 private static final Logger logger = LoggerFactory.getLogger(StandaloneCommandExecutor.class);
51
52 private static final RateLimiter UNLIMITED = RateLimiter.create(Double.MAX_VALUE);
53
54 private final ProjectManager projectManager;
55 private final Executor repositoryWorker;
56 @Nullable
57 private final SessionManager sessionManager;
58
59 private final double permitsPerSecond;
60 private final MetadataService metadataService;
61
62 @VisibleForTesting
63 final Map<String, RateLimiter> writeRateLimiters;
64
65
66
67
68
69
70
71
72
73
74
75 public StandaloneCommandExecutor(ProjectManager projectManager,
76 Executor repositoryWorker,
77 @Nullable SessionManager sessionManager,
78 @Nullable QuotaConfig writeQuota,
79 @Nullable Consumer<CommandExecutor> onTakeLeadership,
80 @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
81 this(projectManager, repositoryWorker, sessionManager,
82 writeQuota != null ? writeQuota.permitsPerSecond() : 0,
83 onTakeLeadership, onReleaseLeadership);
84 }
85
86
87
88
89
90
91
92
93
94
95 public StandaloneCommandExecutor(ProjectManager projectManager,
96 Executor repositoryWorker,
97 @Nullable SessionManager sessionManager,
98 @Nullable Consumer<CommandExecutor> onTakeLeadership,
99 @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
100 this(projectManager, repositoryWorker, sessionManager, -1, onTakeLeadership, onReleaseLeadership);
101 }
102
103 private StandaloneCommandExecutor(ProjectManager projectManager,
104 Executor repositoryWorker,
105 @Nullable SessionManager sessionManager,
106 double permitsPerSecond,
107 @Nullable Consumer<CommandExecutor> onTakeLeadership,
108 @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
109 super(onTakeLeadership, onReleaseLeadership);
110 this.projectManager = requireNonNull(projectManager, "projectManager");
111 this.repositoryWorker = requireNonNull(repositoryWorker, "repositoryWorker");
112 this.sessionManager = sessionManager;
113 this.permitsPerSecond = permitsPerSecond;
114 writeRateLimiters = new ConcurrentHashMap<>();
115 metadataService = new MetadataService(projectManager, this);
116 }
117
118 @Override
119 public int replicaId() {
120 return 0;
121 }
122
123 @Override
124 protected void doStart(@Nullable Runnable onTakeLeadership,
125 @Nullable Runnable onReleaseLeadership) {
126 if (onTakeLeadership != null) {
127 onTakeLeadership.run();
128 }
129 }
130
131 @Override
132 protected void doStop(@Nullable Runnable onReleaseLeadership) {
133 if (onReleaseLeadership != null) {
134 onReleaseLeadership.run();
135 }
136 }
137
138 @Override
139 @SuppressWarnings("unchecked")
140 protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception {
141 if (command instanceof CreateProjectCommand) {
142 return (CompletableFuture<T>) createProject((CreateProjectCommand) command);
143 }
144
145 if (command instanceof RemoveProjectCommand) {
146 return (CompletableFuture<T>) removeProject((RemoveProjectCommand) command);
147 }
148
149 if (command instanceof UnremoveProjectCommand) {
150 return (CompletableFuture<T>) unremoveProject((UnremoveProjectCommand) command);
151 }
152
153 if (command instanceof PurgeProjectCommand) {
154 return (CompletableFuture<T>) purgeProject((PurgeProjectCommand) command);
155 }
156
157 if (command instanceof CreateRepositoryCommand) {
158 return (CompletableFuture<T>) createRepository((CreateRepositoryCommand) command);
159 }
160
161 if (command instanceof RemoveRepositoryCommand) {
162 return (CompletableFuture<T>) removeRepository((RemoveRepositoryCommand) command);
163 }
164
165 if (command instanceof UnremoveRepositoryCommand) {
166 return (CompletableFuture<T>) unremoveRepository((UnremoveRepositoryCommand) command);
167 }
168
169 if (command instanceof PurgeRepositoryCommand) {
170 return (CompletableFuture<T>) purgeRepository((PurgeRepositoryCommand) command);
171 }
172
173 if (command instanceof NormalizingPushCommand) {
174 return (CompletableFuture<T>) push((NormalizingPushCommand) command, true);
175 }
176
177 if (command instanceof PushAsIsCommand) {
178 return (CompletableFuture<T>) push((PushAsIsCommand) command, false)
179 .thenApply(CommitResult::revision);
180 }
181
182 if (command instanceof CreateSessionCommand) {
183 return (CompletableFuture<T>) createSession((CreateSessionCommand) command);
184 }
185
186 if (command instanceof RemoveSessionCommand) {
187 return (CompletableFuture<T>) removeSession((RemoveSessionCommand) command);
188 }
189
190 if (command instanceof UpdateServerStatusCommand) {
191 return (CompletableFuture<T>) updateServerStatus((UpdateServerStatusCommand) command);
192 }
193
194 if (command instanceof ForcePushCommand) {
195
196 return doExecute(((ForcePushCommand<T>) command).delegate());
197 }
198
199 throw new UnsupportedOperationException(command.toString());
200 }
201
202
203
204 private CompletableFuture<Void> createProject(CreateProjectCommand c) {
205 return CompletableFuture.supplyAsync(() -> {
206 projectManager.create(c.projectName(), c.timestamp(), c.author());
207 return null;
208 }, repositoryWorker);
209 }
210
211 private CompletableFuture<Void> removeProject(RemoveProjectCommand c) {
212 return CompletableFuture.supplyAsync(() -> {
213 projectManager.remove(c.projectName());
214 return null;
215 }, repositoryWorker);
216 }
217
218 private CompletableFuture<Void> unremoveProject(UnremoveProjectCommand c) {
219 return CompletableFuture.supplyAsync(() -> {
220 projectManager.unremove(c.projectName());
221 return null;
222 }, repositoryWorker);
223 }
224
225 private CompletableFuture<Void> purgeProject(PurgeProjectCommand c) {
226 return CompletableFuture.supplyAsync(() -> {
227 projectManager.markForPurge(c.projectName());
228 return null;
229 }, repositoryWorker);
230 }
231
232
233
234 private CompletableFuture<Void> createRepository(CreateRepositoryCommand c) {
235 return CompletableFuture.supplyAsync(() -> {
236 projectManager.get(c.projectName()).repos().create(c.repositoryName(), c.timestamp(), c.author());
237 return null;
238 }, repositoryWorker);
239 }
240
241 private CompletableFuture<Void> removeRepository(RemoveRepositoryCommand c) {
242 if (writeQuotaEnabled()) {
243 writeRateLimiters.remove(rateLimiterKey(c.projectName(), c.repositoryName()));
244 }
245 return CompletableFuture.supplyAsync(() -> {
246 projectManager.get(c.projectName()).repos().remove(c.repositoryName());
247 return null;
248 }, repositoryWorker);
249 }
250
251 private CompletableFuture<Void> unremoveRepository(UnremoveRepositoryCommand c) {
252 return CompletableFuture.supplyAsync(() -> {
253 projectManager.get(c.projectName()).repos().unremove(c.repositoryName());
254 return null;
255 }, repositoryWorker);
256 }
257
258 private CompletableFuture<Void> purgeRepository(PurgeRepositoryCommand c) {
259 return CompletableFuture.supplyAsync(() -> {
260 projectManager.get(c.projectName()).repos().markForPurge(c.repositoryName());
261 return null;
262 }, repositoryWorker);
263 }
264
265 private CompletableFuture<CommitResult> push(AbstractPushCommand<?> c, boolean normalizing) {
266 if (c.projectName().equals(INTERNAL_PROJECT_DOGMA) || c.repositoryName().equals(Project.REPO_DOGMA) ||
267 !writeQuotaEnabled()) {
268 return push0(c, normalizing);
269 }
270
271 final RateLimiter rateLimiter =
272 writeRateLimiters.get(rateLimiterKey(c.projectName(), c.repositoryName()));
273 if (rateLimiter != null) {
274 return tryPush(c, normalizing, rateLimiter);
275 }
276
277 return getRateLimiter(c.projectName(), c.repositoryName()).thenCompose(
278 limiter -> tryPush(c, normalizing, limiter));
279 }
280
281 private CompletableFuture<CommitResult> tryPush(
282 AbstractPushCommand<?> c, boolean normalizing, @Nullable RateLimiter rateLimiter) {
283 if (rateLimiter == null || rateLimiter == UNLIMITED || rateLimiter.tryAcquire()) {
284 return push0(c, normalizing);
285 } else {
286 return CompletableFutures.exceptionallyCompletedFuture(
287 new TooManyRequestsException("commits", c.executionPath(), rateLimiter.getRate()));
288 }
289 }
290
291 private CompletableFuture<CommitResult> push0(AbstractPushCommand<?> c, boolean normalizing) {
292 return repo(c).commit(c.baseRevision(), c.timestamp(), c.author(), c.summary(), c.detail(), c.markup(),
293 c.changes(), normalizing);
294 }
295
296 private CompletableFuture<RateLimiter> getRateLimiter(String projectName, String repoName) {
297 return metadataService.getRepo(projectName, repoName).thenApply(meta -> {
298 setWriteQuota(projectName, repoName, meta.writeQuota());
299 return writeRateLimiters.get(rateLimiterKey(projectName, repoName));
300 });
301 }
302
303 @Override
304 public final void setWriteQuota(String projectName, String repoName, @Nullable QuotaConfig writeQuota) {
305 if (!writeQuotaEnabled()) {
306
307 return;
308 }
309 final double permitsForRepo = writeQuota != null ? writeQuota.permitsPerSecond() : 0;
310 final double permitsPerSecond = permitsForRepo != 0 ? permitsForRepo : this.permitsPerSecond;
311
312 writeRateLimiters.compute(rateLimiterKey(projectName, repoName), (key, rateLimiter) -> {
313 if (permitsPerSecond == 0) {
314 return UNLIMITED;
315 }
316
317 if (rateLimiter == null) {
318 return RateLimiter.create(permitsPerSecond);
319 } else {
320 rateLimiter.setRate(permitsPerSecond);
321 return rateLimiter;
322 }
323 });
324 }
325
326 private static String rateLimiterKey(String projectName, String repoName) {
327 return projectName + '/' + repoName;
328 }
329
330 private boolean writeQuotaEnabled() {
331 return Double.compare(permitsPerSecond, -1) > 0;
332 }
333
334 private Repository repo(RepositoryCommand<?> c) {
335 return projectManager.get(c.projectName()).repos().get(c.repositoryName());
336 }
337
338 private CompletableFuture<Void> createSession(CreateSessionCommand c) {
339 if (sessionManager == null) {
340
341 return CompletableFuture.completedFuture(null);
342 }
343
344 final Session session = c.session();
345 return sessionManager.create(session).exceptionally(cause -> {
346 logger.warn("Failed to replicate a session creation: {}", session, cause);
347 return null;
348 });
349 }
350
351 private CompletableFuture<Void> removeSession(RemoveSessionCommand c) {
352 if (sessionManager == null) {
353 return CompletableFuture.completedFuture(null);
354 }
355
356 final String sessionId = c.sessionId();
357 return sessionManager.delete(sessionId).exceptionally(cause -> {
358 logger.warn("Failed to replicate a session removal: {}", sessionId, cause);
359 return null;
360 });
361 }
362
363 private CompletableFuture<Void> updateServerStatus(UpdateServerStatusCommand c) {
364 setWritable(c.writable());
365 return CompletableFuture.completedFuture(null);
366 }
367 }