1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server.command;
17  
18  import static com.linecorp.centraldogma.server.internal.storage.project.ProjectInitializer.INTERNAL_PROJECT_DOGMA;
19  import static java.util.Objects.requireNonNull;
20  
21  import java.util.Map;
22  import java.util.concurrent.CompletableFuture;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.Executor;
25  import java.util.function.Consumer;
26  
27  import javax.annotation.Nullable;
28  
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  import com.cronutils.utils.VisibleForTesting;
33  import com.google.common.util.concurrent.RateLimiter;
34  import com.spotify.futures.CompletableFutures;
35  
36  import com.linecorp.centraldogma.common.TooManyRequestsException;
37  import com.linecorp.centraldogma.server.QuotaConfig;
38  import com.linecorp.centraldogma.server.auth.Session;
39  import com.linecorp.centraldogma.server.auth.SessionManager;
40  import com.linecorp.centraldogma.server.metadata.MetadataService;
41  import com.linecorp.centraldogma.server.storage.project.Project;
42  import com.linecorp.centraldogma.server.storage.project.ProjectManager;
43  import com.linecorp.centraldogma.server.storage.repository.Repository;
44  
45  /**
46   * A {@link CommandExecutor} implementation which performs operations on the local storage.
47   */
48  public class StandaloneCommandExecutor extends AbstractCommandExecutor {
49  
50      private static final Logger logger = LoggerFactory.getLogger(StandaloneCommandExecutor.class);
51  
52      private static final RateLimiter UNLIMITED = RateLimiter.create(Double.MAX_VALUE);
53  
54      private final ProjectManager projectManager;
55      private final Executor repositoryWorker;
56      @Nullable
57      private final SessionManager sessionManager;
58      // if permitsPerSecond is -1, a quota is checked by ZooKeeperCommandExecutor.
59      private final double permitsPerSecond;
60      private final MetadataService metadataService;
61  
62      @VisibleForTesting
63      final Map<String, RateLimiter> writeRateLimiters;
64  
65      /**
66       * Creates a new instance.
67       *
68       * @param projectManager the project manager for accessing the storage
69       * @param repositoryWorker the executor which is used for performing storage operations
70       * @param writeQuota the write quota for limiting {@link NormalizingPushCommand}
71       * @param sessionManager the session manager for creating/removing a session
72       * @param onTakeLeadership the callback to be invoked after the replica has taken the leadership
73       * @param onReleaseLeadership the callback to be invoked before the replica releases the leadership
74       */
75      public StandaloneCommandExecutor(ProjectManager projectManager,
76                                       Executor repositoryWorker,
77                                       @Nullable SessionManager sessionManager,
78                                       @Nullable QuotaConfig writeQuota,
79                                       @Nullable Consumer<CommandExecutor> onTakeLeadership,
80                                       @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
81          this(projectManager, repositoryWorker, sessionManager,
82               writeQuota != null ? writeQuota.permitsPerSecond() : 0,
83               onTakeLeadership, onReleaseLeadership);
84      }
85  
86      /**
87       * Creates a new instance.
88       *
89       * @param projectManager the project manager for accessing the storage
90       * @param repositoryWorker the executor which is used for performing storage operations
91       * @param sessionManager the session manager for creating/removing a session
92       * @param onTakeLeadership the callback to be invoked after the replica has taken the leadership
93       * @param onReleaseLeadership the callback to be invoked before the replica releases the leadership
94       */
95      public StandaloneCommandExecutor(ProjectManager projectManager,
96                                       Executor repositoryWorker,
97                                       @Nullable SessionManager sessionManager,
98                                       @Nullable Consumer<CommandExecutor> onTakeLeadership,
99                                       @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
100         this(projectManager, repositoryWorker, sessionManager, -1, onTakeLeadership, onReleaseLeadership);
101     }
102 
103     private StandaloneCommandExecutor(ProjectManager projectManager,
104                                       Executor repositoryWorker,
105                                       @Nullable SessionManager sessionManager,
106                                       double permitsPerSecond,
107                                       @Nullable Consumer<CommandExecutor> onTakeLeadership,
108                                       @Nullable Consumer<CommandExecutor> onReleaseLeadership) {
109         super(onTakeLeadership, onReleaseLeadership);
110         this.projectManager = requireNonNull(projectManager, "projectManager");
111         this.repositoryWorker = requireNonNull(repositoryWorker, "repositoryWorker");
112         this.sessionManager = sessionManager;
113         this.permitsPerSecond = permitsPerSecond;
114         writeRateLimiters = new ConcurrentHashMap<>();
115         metadataService = new MetadataService(projectManager, this);
116     }
117 
118     @Override
119     public int replicaId() {
120         return 0;
121     }
122 
123     @Override
124     protected void doStart(@Nullable Runnable onTakeLeadership,
125                            @Nullable Runnable onReleaseLeadership) {
126         if (onTakeLeadership != null) {
127             onTakeLeadership.run();
128         }
129     }
130 
131     @Override
132     protected void doStop(@Nullable Runnable onReleaseLeadership) {
133         if (onReleaseLeadership != null) {
134             onReleaseLeadership.run();
135         }
136     }
137 
138     @Override
139     @SuppressWarnings("unchecked")
140     protected <T> CompletableFuture<T> doExecute(Command<T> command) throws Exception {
141         if (command instanceof CreateProjectCommand) {
142             return (CompletableFuture<T>) createProject((CreateProjectCommand) command);
143         }
144 
145         if (command instanceof RemoveProjectCommand) {
146             return (CompletableFuture<T>) removeProject((RemoveProjectCommand) command);
147         }
148 
149         if (command instanceof UnremoveProjectCommand) {
150             return (CompletableFuture<T>) unremoveProject((UnremoveProjectCommand) command);
151         }
152 
153         if (command instanceof PurgeProjectCommand) {
154             return (CompletableFuture<T>) purgeProject((PurgeProjectCommand) command);
155         }
156 
157         if (command instanceof CreateRepositoryCommand) {
158             return (CompletableFuture<T>) createRepository((CreateRepositoryCommand) command);
159         }
160 
161         if (command instanceof RemoveRepositoryCommand) {
162             return (CompletableFuture<T>) removeRepository((RemoveRepositoryCommand) command);
163         }
164 
165         if (command instanceof UnremoveRepositoryCommand) {
166             return (CompletableFuture<T>) unremoveRepository((UnremoveRepositoryCommand) command);
167         }
168 
169         if (command instanceof PurgeRepositoryCommand) {
170             return (CompletableFuture<T>) purgeRepository((PurgeRepositoryCommand) command);
171         }
172 
173         if (command instanceof NormalizingPushCommand) {
174             return (CompletableFuture<T>) push((NormalizingPushCommand) command, true);
175         }
176 
177         if (command instanceof PushAsIsCommand) {
178             return (CompletableFuture<T>) push((PushAsIsCommand) command, false)
179                     .thenApply(CommitResult::revision);
180         }
181 
182         if (command instanceof CreateSessionCommand) {
183             return (CompletableFuture<T>) createSession((CreateSessionCommand) command);
184         }
185 
186         if (command instanceof RemoveSessionCommand) {
187             return (CompletableFuture<T>) removeSession((RemoveSessionCommand) command);
188         }
189 
190         if (command instanceof UpdateServerStatusCommand) {
191             return (CompletableFuture<T>) updateServerStatus((UpdateServerStatusCommand) command);
192         }
193 
194         if (command instanceof ForcePushCommand) {
195             //noinspection TailRecursion
196             return doExecute(((ForcePushCommand<T>) command).delegate());
197         }
198 
199         throw new UnsupportedOperationException(command.toString());
200     }
201 
202     // Project operations
203 
204     private CompletableFuture<Void> createProject(CreateProjectCommand c) {
205         return CompletableFuture.supplyAsync(() -> {
206             projectManager.create(c.projectName(), c.timestamp(), c.author());
207             return null;
208         }, repositoryWorker);
209     }
210 
211     private CompletableFuture<Void> removeProject(RemoveProjectCommand c) {
212         return CompletableFuture.supplyAsync(() -> {
213             projectManager.remove(c.projectName());
214             return null;
215         }, repositoryWorker);
216     }
217 
218     private CompletableFuture<Void> unremoveProject(UnremoveProjectCommand c) {
219         return CompletableFuture.supplyAsync(() -> {
220             projectManager.unremove(c.projectName());
221             return null;
222         }, repositoryWorker);
223     }
224 
225     private CompletableFuture<Void> purgeProject(PurgeProjectCommand c) {
226         return CompletableFuture.supplyAsync(() -> {
227             projectManager.markForPurge(c.projectName());
228             return null;
229         }, repositoryWorker);
230     }
231 
232     // Repository operations
233 
234     private CompletableFuture<Void> createRepository(CreateRepositoryCommand c) {
235         return CompletableFuture.supplyAsync(() -> {
236             projectManager.get(c.projectName()).repos().create(c.repositoryName(), c.timestamp(), c.author());
237             return null;
238         }, repositoryWorker);
239     }
240 
241     private CompletableFuture<Void> removeRepository(RemoveRepositoryCommand c) {
242         if (writeQuotaEnabled()) {
243             writeRateLimiters.remove(rateLimiterKey(c.projectName(), c.repositoryName()));
244         }
245         return CompletableFuture.supplyAsync(() -> {
246             projectManager.get(c.projectName()).repos().remove(c.repositoryName());
247             return null;
248         }, repositoryWorker);
249     }
250 
251     private CompletableFuture<Void> unremoveRepository(UnremoveRepositoryCommand c) {
252         return CompletableFuture.supplyAsync(() -> {
253             projectManager.get(c.projectName()).repos().unremove(c.repositoryName());
254             return null;
255         }, repositoryWorker);
256     }
257 
258     private CompletableFuture<Void> purgeRepository(PurgeRepositoryCommand c) {
259         return CompletableFuture.supplyAsync(() -> {
260             projectManager.get(c.projectName()).repos().markForPurge(c.repositoryName());
261             return null;
262         }, repositoryWorker);
263     }
264 
265     private CompletableFuture<CommitResult> push(AbstractPushCommand<?> c, boolean normalizing) {
266         if (c.projectName().equals(INTERNAL_PROJECT_DOGMA) || c.repositoryName().equals(Project.REPO_DOGMA) ||
267             !writeQuotaEnabled()) {
268             return push0(c, normalizing);
269         }
270 
271         final RateLimiter rateLimiter =
272                 writeRateLimiters.get(rateLimiterKey(c.projectName(), c.repositoryName()));
273         if (rateLimiter != null) {
274             return tryPush(c, normalizing, rateLimiter);
275         }
276 
277         return getRateLimiter(c.projectName(), c.repositoryName()).thenCompose(
278                 limiter -> tryPush(c, normalizing, limiter));
279     }
280 
281     private CompletableFuture<CommitResult> tryPush(
282             AbstractPushCommand<?> c, boolean normalizing, @Nullable RateLimiter rateLimiter) {
283         if (rateLimiter == null || rateLimiter == UNLIMITED || rateLimiter.tryAcquire()) {
284             return push0(c, normalizing);
285         } else {
286             return CompletableFutures.exceptionallyCompletedFuture(
287                     new TooManyRequestsException("commits", c.executionPath(), rateLimiter.getRate()));
288         }
289     }
290 
291     private CompletableFuture<CommitResult> push0(AbstractPushCommand<?> c, boolean normalizing) {
292         return repo(c).commit(c.baseRevision(), c.timestamp(), c.author(), c.summary(), c.detail(), c.markup(),
293                               c.changes(), normalizing);
294     }
295 
296     private CompletableFuture<RateLimiter> getRateLimiter(String projectName, String repoName) {
297         return metadataService.getRepo(projectName, repoName).thenApply(meta -> {
298             setWriteQuota(projectName, repoName, meta.writeQuota());
299             return writeRateLimiters.get(rateLimiterKey(projectName, repoName));
300         });
301     }
302 
303     @Override
304     public final void setWriteQuota(String projectName, String repoName, @Nullable QuotaConfig writeQuota) {
305         if (!writeQuotaEnabled()) {
306             // This method should be called only when a write quota is enabled.
307             return;
308         }
309         final double permitsForRepo = writeQuota != null ? writeQuota.permitsPerSecond() : 0;
310         final double permitsPerSecond = permitsForRepo != 0 ? permitsForRepo : this.permitsPerSecond;
311 
312         writeRateLimiters.compute(rateLimiterKey(projectName, repoName), (key, rateLimiter) -> {
313             if (permitsPerSecond == 0) {
314                 return UNLIMITED;
315             }
316 
317             if (rateLimiter == null) {
318                 return RateLimiter.create(permitsPerSecond);
319             } else {
320                 rateLimiter.setRate(permitsPerSecond);
321                 return rateLimiter;
322             }
323         });
324     }
325 
326     private static String rateLimiterKey(String projectName, String repoName) {
327         return projectName + '/' + repoName;
328     }
329 
330     private boolean writeQuotaEnabled() {
331         return Double.compare(permitsPerSecond, -1) > 0;
332     }
333 
334     private Repository repo(RepositoryCommand<?> c) {
335         return projectManager.get(c.projectName()).repos().get(c.repositoryName());
336     }
337 
338     private CompletableFuture<Void> createSession(CreateSessionCommand c) {
339         if (sessionManager == null) {
340             // Security has been disabled for this replica.
341             return CompletableFuture.completedFuture(null);
342         }
343 
344         final Session session = c.session();
345         return sessionManager.create(session).exceptionally(cause -> {
346             logger.warn("Failed to replicate a session creation: {}", session, cause);
347             return null;
348         });
349     }
350 
351     private CompletableFuture<Void> removeSession(RemoveSessionCommand c) {
352         if (sessionManager == null) {
353             return CompletableFuture.completedFuture(null);
354         }
355 
356         final String sessionId = c.sessionId();
357         return sessionManager.delete(sessionId).exceptionally(cause -> {
358             logger.warn("Failed to replicate a session removal: {}", sessionId, cause);
359             return null;
360         });
361     }
362 
363     private CompletableFuture<Void> updateServerStatus(UpdateServerStatusCommand c) {
364         setWritable(c.writable());
365         return CompletableFuture.completedFuture(null);
366     }
367 }