1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server.internal.thrift;
17  
18  import static com.google.common.base.MoreObjects.firstNonNull;
19  import static com.linecorp.centraldogma.common.Author.SYSTEM;
20  import static com.linecorp.centraldogma.common.Revision.HEAD;
21  import static com.linecorp.centraldogma.internal.Util.validateProjectName;
22  import static com.linecorp.centraldogma.internal.Util.validateRepositoryName;
23  import static com.linecorp.centraldogma.server.internal.api.ContentServiceV1.checkMetaRepoPush;
24  import static com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1.increaseCounterIfOldRevisionUsed;
25  import static com.linecorp.centraldogma.server.internal.thrift.Converter.convert;
26  import static com.linecorp.centraldogma.server.storage.project.Project.isInternalRepo;
27  import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ALL_WITHOUT_CONTENT;
28  import static com.spotify.futures.CompletableFutures.allAsList;
29  import static java.util.Objects.requireNonNull;
30  import static java.util.stream.Collectors.toList;
31  
32  import java.util.ArrayList;
33  import java.util.List;
34  import java.util.Map;
35  import java.util.concurrent.Callable;
36  import java.util.concurrent.CancellationException;
37  import java.util.concurrent.CompletableFuture;
38  
39  import org.apache.thrift.async.AsyncMethodCallback;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import com.linecorp.armeria.common.RequestContext;
44  import com.linecorp.armeria.common.util.Exceptions;
45  import com.linecorp.centraldogma.internal.thrift.Author;
46  import com.linecorp.centraldogma.internal.thrift.CentralDogmaConstants;
47  import com.linecorp.centraldogma.internal.thrift.CentralDogmaException;
48  import com.linecorp.centraldogma.internal.thrift.CentralDogmaService;
49  import com.linecorp.centraldogma.internal.thrift.Change;
50  import com.linecorp.centraldogma.internal.thrift.Comment;
51  import com.linecorp.centraldogma.internal.thrift.DiffFileResult;
52  import com.linecorp.centraldogma.internal.thrift.Entry;
53  import com.linecorp.centraldogma.internal.thrift.ErrorCode;
54  import com.linecorp.centraldogma.internal.thrift.GetFileResult;
55  import com.linecorp.centraldogma.internal.thrift.MergeQuery;
56  import com.linecorp.centraldogma.internal.thrift.MergedEntry;
57  import com.linecorp.centraldogma.internal.thrift.NamedQuery;
58  import com.linecorp.centraldogma.internal.thrift.Plugin;
59  import com.linecorp.centraldogma.internal.thrift.Project;
60  import com.linecorp.centraldogma.internal.thrift.Query;
61  import com.linecorp.centraldogma.internal.thrift.Revision;
62  import com.linecorp.centraldogma.internal.thrift.Schema;
63  import com.linecorp.centraldogma.internal.thrift.WatchFileResult;
64  import com.linecorp.centraldogma.internal.thrift.WatchRepositoryResult;
65  import com.linecorp.centraldogma.server.command.Command;
66  import com.linecorp.centraldogma.server.command.CommandExecutor;
67  import com.linecorp.centraldogma.server.internal.api.WatchService;
68  import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException;
69  import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
70  import com.linecorp.centraldogma.server.metadata.MetadataService;
71  import com.linecorp.centraldogma.server.storage.repository.Repository;
72  
73  public class CentralDogmaServiceImpl implements CentralDogmaService.AsyncIface {
74  
75      private static final Logger logger = LoggerFactory.getLogger(CentralDogmaServiceImpl.class);
76  
77      private static final IllegalArgumentException RESERVED_REPOSITORY_EXCEPTION =
78              Exceptions.clearTrace(new IllegalArgumentException(
79                      "The repository is reserved by system and thus cannot be created or removed."));
80  
81      private final ProjectApiManager projectApiManager;
82      private final CommandExecutor executor;
83      private final WatchService watchService;
84      private final MetadataService mds;
85  
86      public CentralDogmaServiceImpl(ProjectApiManager projectApiManager, CommandExecutor executor,
87                                     WatchService watchService, MetadataService mds) {
88          this.projectApiManager = requireNonNull(projectApiManager, "projectApiManager");
89          this.executor = requireNonNull(executor, "executor");
90          this.watchService = requireNonNull(watchService, "watchService");
91          this.mds = requireNonNull(mds, "mds");
92      }
93  
94      private static void handle(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
95          future.handle((res, cause) -> {
96              if (cause != null) {
97                  resultHandler.onError(convert(cause));
98              } else {
99                  resultHandler.onComplete(res);
100             }
101             return null;
102         });
103     }
104 
105     private static void handle(Callable<?> task, AsyncMethodCallback resultHandler) {
106         try {
107             resultHandler.onComplete(task.call());
108         } catch (Throwable cause) {
109             resultHandler.onError(convert(cause));
110         }
111     }
112 
113     private static void handleAsVoidResult(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
114         future.handle((res, cause) -> {
115             if (cause != null) {
116                 resultHandler.onError(convert(cause));
117             } else {
118                 resultHandler.onComplete(null);
119             }
120             return null;
121         });
122     }
123 
124     @Override
125     public void createProject(String name, AsyncMethodCallback resultHandler) {
126         validateProjectName(name, "name", false);
127         // ProjectInitializingCommandExecutor initializes a metadata for the specified project.
128         handle(projectApiManager.createProject(name, SYSTEM), resultHandler);
129     }
130 
131     @Override
132     public void removeProject(String name, AsyncMethodCallback resultHandler) {
133         // Metadata must be updated first because it cannot be updated if the project is removed.
134         handle(projectApiManager.removeProject(name, SYSTEM), resultHandler);
135     }
136 
137     @Override
138     public void purgeProject(String name, AsyncMethodCallback resultHandler) {
139         handleAsVoidResult(projectApiManager.purgeProject(name, SYSTEM), resultHandler);
140     }
141 
142     @Override
143     public void unremoveProject(String name, AsyncMethodCallback resultHandler) {
144         // Restore the project first then update its metadata as 'active'.
145         handleAsVoidResult(projectApiManager.unremoveProject(name, SYSTEM), resultHandler);
146     }
147 
148     @Override
149     public void listProjects(AsyncMethodCallback resultHandler) {
150         handle(() -> {
151             final Map<String, com.linecorp.centraldogma.server.storage.project.Project> projects =
152                     projectApiManager.listProjects(null);
153             final List<Project> ret = new ArrayList<>(projects.size());
154             projects.forEach((key, value) -> ret.add(convert(key, value)));
155             return ret;
156         }, resultHandler);
157     }
158 
159     @Override
160     public void listRemovedProjects(AsyncMethodCallback resultHandler) {
161         handle(() -> projectApiManager.listRemovedProjects().keySet(), resultHandler);
162     }
163 
164     @Override
165     public void createRepository(String projectName, String repositoryName,
166                                  AsyncMethodCallback resultHandler) {
167         validateRepositoryName(repositoryName, "repositoryName");
168         // HTTP v1 API will return '403 forbidden' in this case, but we deal it as '400 bad request' here.
169         if (isInternalRepo(repositoryName)) {
170             resultHandler.onError(convert(RESERVED_REPOSITORY_EXCEPTION));
171             return;
172         }
173         handleAsVoidResult(executor.execute(Command.createRepository(SYSTEM, projectName, repositoryName))
174                                    .thenCompose(unused -> mds.addRepo(SYSTEM, projectName, repositoryName)),
175                            resultHandler);
176     }
177 
178     @Override
179     public void removeRepository(String projectName, String repositoryName,
180                                  AsyncMethodCallback resultHandler) {
181         // HTTP v1 API will return '403 forbidden' in this case, but we deal it as '400 bad request' here.
182         if (isInternalRepo(repositoryName)) {
183             resultHandler.onError(convert(RESERVED_REPOSITORY_EXCEPTION));
184             return;
185         }
186         handleAsVoidResult(executor.execute(Command.removeRepository(SYSTEM, projectName, repositoryName))
187                                    .thenCompose(unused -> mds.removeRepo(SYSTEM, projectName, repositoryName)),
188                            resultHandler);
189     }
190 
191     @Override
192     public void purgeRepository(String projectName, String repositoryName, AsyncMethodCallback resultHandler) {
193         handleAsVoidResult(executor.execute(Command.purgeRepository(SYSTEM, projectName, repositoryName))
194                                    .thenCompose(unused -> mds.purgeRepo(SYSTEM, projectName, repositoryName)),
195                            resultHandler);
196     }
197 
198     @Override
199     public void unremoveRepository(String projectName, String repositoryName,
200                                    AsyncMethodCallback resultHandler) {
201         handleAsVoidResult(executor.execute(Command.unremoveRepository(SYSTEM, projectName, repositoryName))
202                                    .thenCompose(unused -> mds.restoreRepo(SYSTEM, projectName, repositoryName)),
203                            resultHandler);
204     }
205 
206     @Override
207     public void listRepositories(String projectName, AsyncMethodCallback resultHandler) {
208         handle(allAsList(projectApiManager.getProject(projectName, null).repos().list().entrySet().stream()
209                                           .map(e -> convert(e.getKey(), e.getValue()))
210                                           .collect(toList())),
211                resultHandler);
212     }
213 
214     @Override
215     public void listRemovedRepositories(String projectName, AsyncMethodCallback resultHandler) {
216         handle(() -> projectApiManager.getProject(projectName, null).repos().listRemoved().keySet(),
217                resultHandler);
218     }
219 
220     @Override
221     public void normalizeRevision(String projectName, String repositoryName, Revision revision,
222                                   AsyncMethodCallback resultHandler) {
223 
224         final com.linecorp.centraldogma.common.Revision normalized =
225                 normalizeRevision(projectName, repositoryName, revision);
226 
227         resultHandler.onComplete(convert(normalized));
228     }
229 
230     private com.linecorp.centraldogma.common.Revision normalizeRevision(
231             String projectName, String repositoryName, Revision revision) {
232         final Repository repository = projectApiManager.getProject(projectName, null).repos().get(
233                 repositoryName);
234         final com.linecorp.centraldogma.common.Revision normalized =
235                 repository.normalizeNow(convert(revision));
236         final com.linecorp.centraldogma.common.Revision head = repository.normalizeNow(HEAD);
237         increaseCounterIfOldRevisionUsed(RequestContext.current(), repository, normalized, head);
238         return normalized;
239     }
240 
241     @Override
242     public void listFiles(String projectName, String repositoryName, Revision revision, String pathPattern,
243                           AsyncMethodCallback resultHandler) {
244         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
245         normalizeRevision(projectName, repositoryName, revision);
246         handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
247                                 .find(convert(revision), pathPattern, FIND_ALL_WITHOUT_CONTENT)
248                                 .thenApply(entries -> {
249                                     final List<Entry> ret = new ArrayList<>(entries.size());
250                                     entries.forEach((path, entry) -> ret.add(
251                                             new Entry(path, convert(entry.type()))));
252                                     return ret;
253                                 }),
254                resultHandler);
255     }
256 
257     @Override
258     public void getFiles(String projectName, String repositoryName, Revision revision, String pathPattern,
259                          AsyncMethodCallback resultHandler) {
260         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
261         normalizeRevision(projectName, repositoryName, revision);
262         handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
263                                 .find(convert(revision), pathPattern)
264                                 .thenApply(entries -> {
265                                     final List<Entry> ret = new ArrayList<>(entries.size());
266                                     ret.addAll(entries.entrySet().stream()
267                                                       .map(e -> convert(e.getValue()))
268                                                       .collect(toList()));
269                                     return ret;
270                                 }),
271                resultHandler);
272     }
273 
274     @Override
275     public void getHistory(String projectName, String repositoryName, Revision from, Revision to,
276                            String pathPattern, AsyncMethodCallback resultHandler) {
277         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
278         normalizeRevision(projectName, repositoryName, from);
279         normalizeRevision(projectName, repositoryName, to);
280         handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
281                                 .history(convert(from), convert(to), pathPattern)
282                                 .thenApply(commits -> commits.stream()
283                                                              .map(Converter::convert)
284                                                              .collect(toList())),
285                resultHandler);
286     }
287 
288     @Override
289     public void getDiffs(String projectName, String repositoryName, Revision from, Revision to,
290                          String pathPattern, AsyncMethodCallback resultHandler) {
291         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
292         normalizeRevision(projectName, repositoryName, from);
293         normalizeRevision(projectName, repositoryName, to);
294         handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
295                                 .diff(convert(from), convert(to), pathPattern)
296                                 .thenApply(diffs -> convert(diffs.values(), Converter::convert)),
297                resultHandler);
298     }
299 
300     @Override
301     public void getPreviewDiffs(String projectName, String repositoryName, Revision baseRevision,
302                                 List<Change> changes, AsyncMethodCallback resultHandler) {
303         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
304         normalizeRevision(projectName, repositoryName, baseRevision);
305         handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
306                                 .previewDiff(convert(baseRevision), convert(changes, Converter::convert))
307                                 .thenApply(diffs -> convert(diffs.values(), Converter::convert)),
308                resultHandler);
309     }
310 
311     @Override
312     public void push(String projectName, String repositoryName, Revision baseRevision, Author author,
313                      String summary, Comment detail, List<Change> changes, AsyncMethodCallback resultHandler) {
314         final List<com.linecorp.centraldogma.common.Change<?>> convertedChanges =
315                 convert(changes, Converter::convert);
316         try {
317             checkMetaRepoPush(repositoryName, convertedChanges);
318         } catch (Exception e) {
319             resultHandler.onError(e);
320             return;
321         }
322         // TODO(trustin): Change Repository.commit() to return a Commit.
323         handle(executor.execute(Command.push(convert(author), projectName, repositoryName,
324                                              convert(baseRevision), summary, detail.getContent(),
325                                              convert(detail.getMarkup()), convertedChanges))
326                        .thenCompose(commitResult -> {
327                            final com.linecorp.centraldogma.common.Revision newRev = commitResult.revision();
328                            return projectApiManager.getProject(projectName, null).repos().get(repositoryName)
329                                                    .history(newRev, newRev, "/**");
330                        })
331                        .thenApply(commits -> convert(commits.get(0))),
332                resultHandler);
333     }
334 
335     @Override
336     public void getFile(String projectName, String repositoryName, Revision revision, Query query,
337                         AsyncMethodCallback resultHandler) {
338         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
339         normalizeRevision(projectName, repositoryName, revision);
340         handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
341                                 .get(convert(revision), convert(query))
342                                 .thenApply(res -> new GetFileResult(convert(res.type()), res.contentAsText())),
343                resultHandler);
344     }
345 
346     @Override
347     public void diffFile(String projectName, String repositoryName, Revision from, Revision to, Query query,
348                          AsyncMethodCallback resultHandler) {
349         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
350         normalizeRevision(projectName, repositoryName, from);
351         normalizeRevision(projectName, repositoryName, to);
352         // FIXME(trustin): Remove the firstNonNull() on the change content once we make it optional.
353         handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
354                                 .diff(convert(from), convert(to), convert(query))
355                                 .thenApply(change -> new DiffFileResult(
356                                         convert(change.type()), firstNonNull(change.contentAsText(), ""))),
357                resultHandler);
358     }
359 
360     @Override
361     public void mergeFiles(String projectName, String repositoryName, Revision revision,
362                            MergeQuery mergeQuery, AsyncMethodCallback resultHandler) {
363         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
364         normalizeRevision(projectName, repositoryName, revision);
365         handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
366                                 .mergeFiles(convert(revision), convert(mergeQuery))
367                                 .thenApply(merged -> new MergedEntry(convert(merged.revision()),
368                                                                      convert(merged.type()),
369                                                                      merged.contentAsText(),
370                                                                      merged.paths())),
371                resultHandler);
372     }
373 
374     @Override
375     public void watchRepository(
376             String projectName, String repositoryName, Revision lastKnownRevision,
377             String pathPattern, long timeoutMillis, AsyncMethodCallback resultHandler) {
378         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
379         normalizeRevision(projectName, repositoryName, lastKnownRevision);
380         if (timeoutMillis <= 0) {
381             rejectInvalidWatchTimeout("watchRepository", resultHandler);
382             return;
383         }
384 
385         final Repository repo = projectApiManager.getProject(projectName, null).repos().get(repositoryName);
386         final CompletableFuture<com.linecorp.centraldogma.common.Revision> future =
387                 watchService.watchRepository(repo, convert(lastKnownRevision), pathPattern, timeoutMillis,
388                                              false);
389         handleWatchRepositoryResult(future, resultHandler);
390     }
391 
392     private static void handleWatchRepositoryResult(
393             CompletableFuture<com.linecorp.centraldogma.common.Revision> future,
394             AsyncMethodCallback resultHandler) {
395         future.handle((res, cause) -> {
396             if (cause == null) {
397                 final WatchRepositoryResult wrr = new WatchRepositoryResult();
398                 wrr.setRevision(convert(res));
399                 resultHandler.onComplete(wrr);
400             } else if (cause instanceof CancellationException) {
401                 resultHandler.onComplete(CentralDogmaConstants.EMPTY_WATCH_REPOSITORY_RESULT);
402             } else if (cause instanceof RequestAlreadyTimedOutException) {
403                 logger.warn("Ignoring the exception raised when a request has already timed out.");
404             } else {
405                 logAndInvokeOnError("watchRepository", resultHandler, cause);
406             }
407             return null;
408         });
409     }
410 
411     @Override
412     public void watchFile(
413             String projectName, String repositoryName, Revision lastKnownRevision,
414             Query query, long timeoutMillis, AsyncMethodCallback resultHandler) {
415         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
416         normalizeRevision(projectName, repositoryName, lastKnownRevision);
417         if (timeoutMillis <= 0) {
418             rejectInvalidWatchTimeout("watchFile", resultHandler);
419             return;
420         }
421 
422         final Repository repo = projectApiManager.getProject(projectName, null).repos().get(repositoryName);
423         final CompletableFuture<com.linecorp.centraldogma.common.Entry<Object>> future =
424                 watchService.watchFile(repo, convert(lastKnownRevision), convert(query), timeoutMillis,
425                                        false);
426 
427         handleWatchFileResult(future, resultHandler);
428     }
429 
430     private static void rejectInvalidWatchTimeout(String operationName, AsyncMethodCallback resultHandler) {
431         final CentralDogmaException cde = new CentralDogmaException(ErrorCode.BAD_REQUEST);
432         CentralDogmaExceptions.log(operationName, cde);
433         resultHandler.onError(cde);
434     }
435 
436     private static void handleWatchFileResult(
437             CompletableFuture<com.linecorp.centraldogma.common.Entry<Object>> future,
438             AsyncMethodCallback resultHandler) {
439         future.handle((res, cause) -> {
440             if (cause == null) {
441                 final WatchFileResult wfr = new WatchFileResult();
442                 wfr.setRevision(convert(res.revision()));
443                 wfr.setType(convert(res.type()));
444                 wfr.setContent(res.contentAsText());
445                 resultHandler.onComplete(wfr);
446             } else if (cause instanceof CancellationException) {
447                 resultHandler.onComplete(CentralDogmaConstants.EMPTY_WATCH_FILE_RESULT);
448             } else if (cause instanceof RequestAlreadyTimedOutException) {
449                 logger.warn("Ignoring the exception raised when a request has already timed out.");
450             } else {
451                 logAndInvokeOnError("watchFile", resultHandler, cause);
452             }
453             return null;
454         });
455     }
456 
457     private static void logAndInvokeOnError(
458             String operationName, AsyncMethodCallback resultHandler, Throwable cause) {
459         final CentralDogmaException cde = convert(cause);
460         CentralDogmaExceptions.log(operationName, cde);
461         resultHandler.onError(cde);
462     }
463 
464     // Unimplemented methods
465 
466     @Override
467     public void getSchema(String projectName, AsyncMethodCallback resultHandler) {
468         unimplemented(resultHandler);
469     }
470 
471     @Override
472     public void saveSchema(String projectName, Schema schema, AsyncMethodCallback resultHandler) {
473         unimplemented(resultHandler);
474     }
475 
476     @Override
477     public void getNamedQuery(String projectName, String name, AsyncMethodCallback resultHandler) {
478         unimplemented(resultHandler);
479     }
480 
481     @Override
482     public void saveNamedQuery(String projectName, NamedQuery namedQuery, AsyncMethodCallback resultHandler) {
483         unimplemented(resultHandler);
484     }
485 
486     @Override
487     public void removeNamedQuery(String projectName, String name, AsyncMethodCallback resultHandler) {
488         unimplemented(resultHandler);
489     }
490 
491     @Override
492     public void listNamedQueries(String projectName, AsyncMethodCallback resultHandler) {
493         unimplemented(resultHandler);
494     }
495 
496     @Override
497     public void getPlugin(String projectName, String pluginName, AsyncMethodCallback resultHandler) {
498         unimplemented(resultHandler);
499     }
500 
501     @Override
502     public void savePlugin(String projectName, Plugin plugin, AsyncMethodCallback resultHandler) {
503         unimplemented(resultHandler);
504     }
505 
506     @Override
507     public void removePlugin(String projectName, String pluginName, AsyncMethodCallback resultHandler) {
508         unimplemented(resultHandler);
509     }
510 
511     @Override
512     public void listPlugins(String projectName, AsyncMethodCallback resultHandler) {
513         unimplemented(resultHandler);
514     }
515 
516     @Override
517     public void listPluginOperations(String projectName, AsyncMethodCallback resultHandler) {
518         unimplemented(resultHandler);
519     }
520 
521     @Override
522     public void performPluginOperation(String projectName, String pluginName, String operationName,
523                                        String params, AsyncMethodCallback resultHandler) {
524         unimplemented(resultHandler);
525     }
526 
527     @Override
528     public void queryByNamedQuery(String projectName, String namedQuery, Revision revision,
529                                   AsyncMethodCallback resultHandler) {
530         unimplemented(resultHandler);
531     }
532 
533     @Override
534     public void listSubscribers(String projectName, String repositoryName, String path,
535                                 AsyncMethodCallback resultHandler) {
536         unimplemented(resultHandler);
537     }
538 
539     private static void unimplemented(AsyncMethodCallback resultHandler) {
540         resultHandler.onError(new CentralDogmaException(ErrorCode.UNIMPLEMENTED));
541     }
542 }