1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server.internal.thrift;
17  
18  import static com.google.common.base.MoreObjects.firstNonNull;
19  import static com.linecorp.centraldogma.common.Author.SYSTEM;
20  import static com.linecorp.centraldogma.common.Revision.HEAD;
21  import static com.linecorp.centraldogma.server.internal.api.ContentServiceV1.checkPush;
22  import static com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1.increaseCounterIfOldRevisionUsed;
23  import static com.linecorp.centraldogma.server.internal.thrift.Converter.convert;
24  import static com.linecorp.centraldogma.server.storage.project.Project.isReservedRepoName;
25  import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ALL_WITHOUT_CONTENT;
26  import static com.spotify.futures.CompletableFutures.allAsList;
27  import static java.util.Objects.requireNonNull;
28  import static java.util.stream.Collectors.toList;
29  
30  import java.util.ArrayList;
31  import java.util.List;
32  import java.util.Map;
33  import java.util.concurrent.Callable;
34  import java.util.concurrent.CancellationException;
35  import java.util.concurrent.CompletableFuture;
36  
37  import org.apache.thrift.async.AsyncMethodCallback;
38  import org.slf4j.Logger;
39  import org.slf4j.LoggerFactory;
40  
41  import com.linecorp.armeria.common.RequestContext;
42  import com.linecorp.armeria.common.util.Exceptions;
43  import com.linecorp.centraldogma.internal.thrift.Author;
44  import com.linecorp.centraldogma.internal.thrift.CentralDogmaConstants;
45  import com.linecorp.centraldogma.internal.thrift.CentralDogmaException;
46  import com.linecorp.centraldogma.internal.thrift.CentralDogmaService;
47  import com.linecorp.centraldogma.internal.thrift.Change;
48  import com.linecorp.centraldogma.internal.thrift.Comment;
49  import com.linecorp.centraldogma.internal.thrift.DiffFileResult;
50  import com.linecorp.centraldogma.internal.thrift.Entry;
51  import com.linecorp.centraldogma.internal.thrift.ErrorCode;
52  import com.linecorp.centraldogma.internal.thrift.GetFileResult;
53  import com.linecorp.centraldogma.internal.thrift.MergeQuery;
54  import com.linecorp.centraldogma.internal.thrift.MergedEntry;
55  import com.linecorp.centraldogma.internal.thrift.NamedQuery;
56  import com.linecorp.centraldogma.internal.thrift.Plugin;
57  import com.linecorp.centraldogma.internal.thrift.Project;
58  import com.linecorp.centraldogma.internal.thrift.Query;
59  import com.linecorp.centraldogma.internal.thrift.Revision;
60  import com.linecorp.centraldogma.internal.thrift.Schema;
61  import com.linecorp.centraldogma.internal.thrift.WatchFileResult;
62  import com.linecorp.centraldogma.internal.thrift.WatchRepositoryResult;
63  import com.linecorp.centraldogma.server.command.Command;
64  import com.linecorp.centraldogma.server.command.CommandExecutor;
65  import com.linecorp.centraldogma.server.internal.api.WatchService;
66  import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException;
67  import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
68  import com.linecorp.centraldogma.server.metadata.MetadataService;
69  import com.linecorp.centraldogma.server.storage.repository.Repository;
70  
71  public class CentralDogmaServiceImpl implements CentralDogmaService.AsyncIface {
72  
73      private static final Logger logger = LoggerFactory.getLogger(CentralDogmaServiceImpl.class);
74  
75      private static final IllegalArgumentException RESERVED_REPOSITORY_EXCEPTION =
76              Exceptions.clearTrace(new IllegalArgumentException(
77                      "The repository is reserved by system and thus cannot be created or removed."));
78  
79      private final ProjectApiManager projectApiManager;
80      private final CommandExecutor executor;
81      private final WatchService watchService;
82      private final MetadataService mds;
83  
84      public CentralDogmaServiceImpl(ProjectApiManager projectApiManager, CommandExecutor executor,
85                                     WatchService watchService, MetadataService mds) {
86          this.projectApiManager = requireNonNull(projectApiManager, "projectApiManager");
87          this.executor = requireNonNull(executor, "executor");
88          this.watchService = requireNonNull(watchService, "watchService");
89          this.mds = requireNonNull(mds, "mds");
90      }
91  
92      private static void handle(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
93          future.handle((res, cause) -> {
94              if (cause != null) {
95                  resultHandler.onError(convert(cause));
96              } else {
97                  resultHandler.onComplete(res);
98              }
99              return null;
100         });
101     }
102 
103     private static void handle(Callable<?> task, AsyncMethodCallback resultHandler) {
104         try {
105             resultHandler.onComplete(task.call());
106         } catch (Throwable cause) {
107             resultHandler.onError(convert(cause));
108         }
109     }
110 
111     private static void handleAsVoidResult(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
112         future.handle((res, cause) -> {
113             if (cause != null) {
114                 resultHandler.onError(convert(cause));
115             } else {
116                 resultHandler.onComplete(null);
117             }
118             return null;
119         });
120     }
121 
122     @Override
123     public void createProject(String name, AsyncMethodCallback resultHandler) {
124         // ProjectInitializingCommandExecutor initializes a metadata for the specified project.
125         handle(projectApiManager.createProject(name, SYSTEM), resultHandler);
126     }
127 
128     @Override
129     public void removeProject(String name, AsyncMethodCallback resultHandler) {
130         // Metadata must be updated first because it cannot be updated if the project is removed.
131         handle(projectApiManager.removeProject(name, SYSTEM), resultHandler);
132     }
133 
134     @Override
135     public void purgeProject(String name, AsyncMethodCallback resultHandler) {
136         handleAsVoidResult(projectApiManager.purgeProject(name, SYSTEM), resultHandler);
137     }
138 
139     @Override
140     public void unremoveProject(String name, AsyncMethodCallback resultHandler) {
141         // Restore the project first then update its metadata as 'active'.
142         handleAsVoidResult(projectApiManager.unremoveProject(name, SYSTEM), resultHandler);
143     }
144 
145     @Override
146     public void listProjects(AsyncMethodCallback resultHandler) {
147         handle(() -> {
148             final Map<String, com.linecorp.centraldogma.server.storage.project.Project> projects =
149                     projectApiManager.listProjects();
150             final List<Project> ret = new ArrayList<>(projects.size());
151             projects.forEach((key, value) -> ret.add(convert(key, value)));
152             return ret;
153         }, resultHandler);
154     }
155 
156     @Override
157     public void listRemovedProjects(AsyncMethodCallback resultHandler) {
158         handle(() -> projectApiManager.listRemovedProjects().keySet(), resultHandler);
159     }
160 
161     @Override
162     public void createRepository(String projectName, String repositoryName,
163                                  AsyncMethodCallback resultHandler) {
164         // HTTP v1 API will return '403 forbidden' in this case, but we deal it as '400 bad request' here.
165         if (isReservedRepoName(repositoryName)) {
166             resultHandler.onError(convert(RESERVED_REPOSITORY_EXCEPTION));
167             return;
168         }
169         handleAsVoidResult(executor.execute(Command.createRepository(SYSTEM, projectName, repositoryName))
170                                    .thenCompose(unused -> mds.addRepo(SYSTEM, projectName, repositoryName)),
171                            resultHandler);
172     }
173 
174     @Override
175     public void removeRepository(String projectName, String repositoryName,
176                                  AsyncMethodCallback resultHandler) {
177         // HTTP v1 API will return '403 forbidden' in this case, but we deal it as '400 bad request' here.
178         if (isReservedRepoName(repositoryName)) {
179             resultHandler.onError(convert(RESERVED_REPOSITORY_EXCEPTION));
180             return;
181         }
182         handleAsVoidResult(executor.execute(Command.removeRepository(SYSTEM, projectName, repositoryName))
183                                    .thenCompose(unused -> mds.removeRepo(SYSTEM, projectName, repositoryName)),
184                            resultHandler);
185     }
186 
187     @Override
188     public void purgeRepository(String projectName, String repositoryName, AsyncMethodCallback resultHandler) {
189         handleAsVoidResult(executor.execute(Command.purgeRepository(SYSTEM, projectName, repositoryName))
190                                    .thenCompose(unused -> mds.purgeRepo(SYSTEM, projectName, repositoryName)),
191                            resultHandler);
192     }
193 
194     @Override
195     public void unremoveRepository(String projectName, String repositoryName,
196                                    AsyncMethodCallback resultHandler) {
197         handleAsVoidResult(executor.execute(Command.unremoveRepository(SYSTEM, projectName, repositoryName))
198                                    .thenCompose(unused -> mds.restoreRepo(SYSTEM, projectName, repositoryName)),
199                            resultHandler);
200     }
201 
202     @Override
203     public void listRepositories(String projectName, AsyncMethodCallback resultHandler) {
204         handle(allAsList(projectApiManager.getProject(projectName).repos().list().entrySet().stream()
205                                           .map(e -> convert(e.getKey(), e.getValue()))
206                                           .collect(toList())),
207                resultHandler);
208     }
209 
210     @Override
211     public void listRemovedRepositories(String projectName, AsyncMethodCallback resultHandler) {
212         handle(() -> projectApiManager.getProject(projectName).repos().listRemoved().keySet(), resultHandler);
213     }
214 
215     @Override
216     public void normalizeRevision(String projectName, String repositoryName, Revision revision,
217                                   AsyncMethodCallback resultHandler) {
218 
219         final com.linecorp.centraldogma.common.Revision normalized =
220                 normalizeRevision(projectName, repositoryName, revision);
221 
222         resultHandler.onComplete(convert(normalized));
223     }
224 
225     private com.linecorp.centraldogma.common.Revision normalizeRevision(
226             String projectName, String repositoryName, Revision revision) {
227         final Repository repository = projectApiManager.getProject(projectName).repos().get(repositoryName);
228         final com.linecorp.centraldogma.common.Revision normalized =
229                 repository.normalizeNow(convert(revision));
230         final com.linecorp.centraldogma.common.Revision head = repository.normalizeNow(HEAD);
231         increaseCounterIfOldRevisionUsed(RequestContext.current(), repository, normalized, head);
232         return normalized;
233     }
234 
235     @Override
236     public void listFiles(String projectName, String repositoryName, Revision revision, String pathPattern,
237                           AsyncMethodCallback resultHandler) {
238         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
239         normalizeRevision(projectName, repositoryName, revision);
240         handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
241                                 .find(convert(revision), pathPattern, FIND_ALL_WITHOUT_CONTENT)
242                                 .thenApply(entries -> {
243                                     final List<Entry> ret = new ArrayList<>(entries.size());
244                                     entries.forEach((path, entry) -> ret.add(
245                                             new Entry(path, convert(entry.type()))));
246                                     return ret;
247                                 }),
248                resultHandler);
249     }
250 
251     @Override
252     public void getFiles(String projectName, String repositoryName, Revision revision, String pathPattern,
253                          AsyncMethodCallback resultHandler) {
254         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
255         normalizeRevision(projectName, repositoryName, revision);
256         handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
257                                 .find(convert(revision), pathPattern)
258                                 .thenApply(entries -> {
259                                     final List<Entry> ret = new ArrayList<>(entries.size());
260                                     ret.addAll(entries.entrySet().stream()
261                                                       .map(e -> convert(e.getValue()))
262                                                       .collect(toList()));
263                                     return ret;
264                                 }),
265                resultHandler);
266     }
267 
268     @Override
269     public void getHistory(String projectName, String repositoryName, Revision from, Revision to,
270                            String pathPattern, AsyncMethodCallback resultHandler) {
271         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
272         normalizeRevision(projectName, repositoryName, from);
273         normalizeRevision(projectName, repositoryName, to);
274         handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
275                                 .history(convert(from), convert(to), pathPattern)
276                                 .thenApply(commits -> commits.stream()
277                                                              .map(Converter::convert)
278                                                              .collect(toList())),
279                resultHandler);
280     }
281 
282     @Override
283     public void getDiffs(String projectName, String repositoryName, Revision from, Revision to,
284                          String pathPattern, AsyncMethodCallback resultHandler) {
285         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
286         normalizeRevision(projectName, repositoryName, from);
287         normalizeRevision(projectName, repositoryName, to);
288         handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
289                                 .diff(convert(from), convert(to), pathPattern)
290                                 .thenApply(diffs -> convert(diffs.values(), Converter::convert)),
291                resultHandler);
292     }
293 
294     @Override
295     public void getPreviewDiffs(String projectName, String repositoryName, Revision baseRevision,
296                                 List<Change> changes, AsyncMethodCallback resultHandler) {
297         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
298         normalizeRevision(projectName, repositoryName, baseRevision);
299         handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
300                                 .previewDiff(convert(baseRevision), convert(changes, Converter::convert))
301                                 .thenApply(diffs -> convert(diffs.values(), Converter::convert)),
302                resultHandler);
303     }
304 
305     @Override
306     public void push(String projectName, String repositoryName, Revision baseRevision, Author author,
307                      String summary, Comment detail, List<Change> changes, AsyncMethodCallback resultHandler) {
308         final List<com.linecorp.centraldogma.common.Change<?>> convertedChanges =
309                 convert(changes, Converter::convert);
310         try {
311             checkPush(repositoryName, convertedChanges);
312         } catch (Exception e) {
313             resultHandler.onError(e);
314             return;
315         }
316         // TODO(trustin): Change Repository.commit() to return a Commit.
317         handle(executor.execute(Command.push(convert(author), projectName, repositoryName,
318                                              convert(baseRevision), summary, detail.getContent(),
319                                              convert(detail.getMarkup()), convertedChanges))
320                        .thenCompose(commitResult -> {
321                            final com.linecorp.centraldogma.common.Revision newRev = commitResult.revision();
322                            return projectApiManager.getProject(projectName).repos().get(repositoryName)
323                                                    .history(newRev, newRev, "/**");
324                        })
325                        .thenApply(commits -> convert(commits.get(0))),
326                resultHandler);
327     }
328 
329     @Override
330     public void getFile(String projectName, String repositoryName, Revision revision, Query query,
331                         AsyncMethodCallback resultHandler) {
332         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
333         normalizeRevision(projectName, repositoryName, revision);
334         handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
335                                 .get(convert(revision), convert(query))
336                                 .thenApply(res -> new GetFileResult(convert(res.type()), res.contentAsText())),
337                resultHandler);
338     }
339 
340     @Override
341     public void diffFile(String projectName, String repositoryName, Revision from, Revision to, Query query,
342                          AsyncMethodCallback resultHandler) {
343         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
344         normalizeRevision(projectName, repositoryName, from);
345         normalizeRevision(projectName, repositoryName, to);
346         // FIXME(trustin): Remove the firstNonNull() on the change content once we make it optional.
347         handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
348                                 .diff(convert(from), convert(to), convert(query))
349                                 .thenApply(change -> new DiffFileResult(
350                                         convert(change.type()), firstNonNull(change.contentAsText(), ""))),
351                resultHandler);
352     }
353 
354     @Override
355     public void mergeFiles(String projectName, String repositoryName, Revision revision,
356                            MergeQuery mergeQuery, AsyncMethodCallback resultHandler) {
357         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
358         normalizeRevision(projectName, repositoryName, revision);
359         handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
360                                 .mergeFiles(convert(revision), convert(mergeQuery))
361                                 .thenApply(merged -> new MergedEntry(convert(merged.revision()),
362                                                                      convert(merged.type()),
363                                                                      merged.contentAsText(),
364                                                                      merged.paths())),
365                resultHandler);
366     }
367 
368     @Override
369     public void watchRepository(
370             String projectName, String repositoryName, Revision lastKnownRevision,
371             String pathPattern, long timeoutMillis, AsyncMethodCallback resultHandler) {
372         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
373         normalizeRevision(projectName, repositoryName, lastKnownRevision);
374         if (timeoutMillis <= 0) {
375             rejectInvalidWatchTimeout("watchRepository", resultHandler);
376             return;
377         }
378 
379         final Repository repo = projectApiManager.getProject(projectName).repos().get(repositoryName);
380         final CompletableFuture<com.linecorp.centraldogma.common.Revision> future =
381                 watchService.watchRepository(repo, convert(lastKnownRevision), pathPattern, timeoutMillis,
382                                              false);
383         handleWatchRepositoryResult(future, resultHandler);
384     }
385 
386     private static void handleWatchRepositoryResult(
387             CompletableFuture<com.linecorp.centraldogma.common.Revision> future,
388             AsyncMethodCallback resultHandler) {
389         future.handle((res, cause) -> {
390             if (cause == null) {
391                 final WatchRepositoryResult wrr = new WatchRepositoryResult();
392                 wrr.setRevision(convert(res));
393                 resultHandler.onComplete(wrr);
394             } else if (cause instanceof CancellationException) {
395                 resultHandler.onComplete(CentralDogmaConstants.EMPTY_WATCH_REPOSITORY_RESULT);
396             } else if (cause instanceof RequestAlreadyTimedOutException) {
397                 logger.warn("Ignoring the exception raised when a request has already timed out.");
398             } else {
399                 logAndInvokeOnError("watchRepository", resultHandler, cause);
400             }
401             return null;
402         });
403     }
404 
405     @Override
406     public void watchFile(
407             String projectName, String repositoryName, Revision lastKnownRevision,
408             Query query, long timeoutMillis, AsyncMethodCallback resultHandler) {
409         // Call normalizeRevision() first to check if the specified revision needs to be recorded.
410         normalizeRevision(projectName, repositoryName, lastKnownRevision);
411         if (timeoutMillis <= 0) {
412             rejectInvalidWatchTimeout("watchFile", resultHandler);
413             return;
414         }
415 
416         final Repository repo = projectApiManager.getProject(projectName).repos().get(repositoryName);
417         final CompletableFuture<com.linecorp.centraldogma.common.Entry<Object>> future =
418                 watchService.watchFile(repo, convert(lastKnownRevision), convert(query), timeoutMillis,
419                                        false);
420 
421         handleWatchFileResult(future, resultHandler);
422     }
423 
424     private static void rejectInvalidWatchTimeout(String operationName, AsyncMethodCallback resultHandler) {
425         final CentralDogmaException cde = new CentralDogmaException(ErrorCode.BAD_REQUEST);
426         CentralDogmaExceptions.log(operationName, cde);
427         resultHandler.onError(cde);
428     }
429 
430     private static void handleWatchFileResult(
431             CompletableFuture<com.linecorp.centraldogma.common.Entry<Object>> future,
432             AsyncMethodCallback resultHandler) {
433         future.handle((res, cause) -> {
434             if (cause == null) {
435                 final WatchFileResult wfr = new WatchFileResult();
436                 wfr.setRevision(convert(res.revision()));
437                 wfr.setType(convert(res.type()));
438                 wfr.setContent(res.contentAsText());
439                 resultHandler.onComplete(wfr);
440             } else if (cause instanceof CancellationException) {
441                 resultHandler.onComplete(CentralDogmaConstants.EMPTY_WATCH_FILE_RESULT);
442             } else if (cause instanceof RequestAlreadyTimedOutException) {
443                 logger.warn("Ignoring the exception raised when a request has already timed out.");
444             } else {
445                 logAndInvokeOnError("watchFile", resultHandler, cause);
446             }
447             return null;
448         });
449     }
450 
451     private static void logAndInvokeOnError(
452             String operationName, AsyncMethodCallback resultHandler, Throwable cause) {
453         final CentralDogmaException cde = convert(cause);
454         CentralDogmaExceptions.log(operationName, cde);
455         resultHandler.onError(cde);
456     }
457 
458     // Unimplemented methods
459 
460     @Override
461     public void getSchema(String projectName, AsyncMethodCallback resultHandler) {
462         unimplemented(resultHandler);
463     }
464 
465     @Override
466     public void saveSchema(String projectName, Schema schema, AsyncMethodCallback resultHandler) {
467         unimplemented(resultHandler);
468     }
469 
470     @Override
471     public void getNamedQuery(String projectName, String name, AsyncMethodCallback resultHandler) {
472         unimplemented(resultHandler);
473     }
474 
475     @Override
476     public void saveNamedQuery(String projectName, NamedQuery namedQuery, AsyncMethodCallback resultHandler) {
477         unimplemented(resultHandler);
478     }
479 
480     @Override
481     public void removeNamedQuery(String projectName, String name, AsyncMethodCallback resultHandler) {
482         unimplemented(resultHandler);
483     }
484 
485     @Override
486     public void listNamedQueries(String projectName, AsyncMethodCallback resultHandler) {
487         unimplemented(resultHandler);
488     }
489 
490     @Override
491     public void getPlugin(String projectName, String pluginName, AsyncMethodCallback resultHandler) {
492         unimplemented(resultHandler);
493     }
494 
495     @Override
496     public void savePlugin(String projectName, Plugin plugin, AsyncMethodCallback resultHandler) {
497         unimplemented(resultHandler);
498     }
499 
500     @Override
501     public void removePlugin(String projectName, String pluginName, AsyncMethodCallback resultHandler) {
502         unimplemented(resultHandler);
503     }
504 
505     @Override
506     public void listPlugins(String projectName, AsyncMethodCallback resultHandler) {
507         unimplemented(resultHandler);
508     }
509 
510     @Override
511     public void listPluginOperations(String projectName, AsyncMethodCallback resultHandler) {
512         unimplemented(resultHandler);
513     }
514 
515     @Override
516     public void performPluginOperation(String projectName, String pluginName, String operationName,
517                                        String params, AsyncMethodCallback resultHandler) {
518         unimplemented(resultHandler);
519     }
520 
521     @Override
522     public void queryByNamedQuery(String projectName, String namedQuery, Revision revision,
523                                   AsyncMethodCallback resultHandler) {
524         unimplemented(resultHandler);
525     }
526 
527     @Override
528     public void listSubscribers(String projectName, String repositoryName, String path,
529                                 AsyncMethodCallback resultHandler) {
530         unimplemented(resultHandler);
531     }
532 
533     private static void unimplemented(AsyncMethodCallback resultHandler) {
534         resultHandler.onError(new CentralDogmaException(ErrorCode.UNIMPLEMENTED));
535     }
536 }