1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server.internal.thrift;
17
18 import static com.google.common.base.MoreObjects.firstNonNull;
19 import static com.linecorp.centraldogma.common.Author.SYSTEM;
20 import static com.linecorp.centraldogma.common.Revision.HEAD;
21 import static com.linecorp.centraldogma.internal.Util.validateProjectName;
22 import static com.linecorp.centraldogma.internal.Util.validateRepositoryName;
23 import static com.linecorp.centraldogma.server.internal.api.ContentServiceV1.checkMetaRepoPush;
24 import static com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1.increaseCounterIfOldRevisionUsed;
25 import static com.linecorp.centraldogma.server.internal.thrift.Converter.convert;
26 import static com.linecorp.centraldogma.server.storage.project.Project.isInternalRepo;
27 import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ALL_WITHOUT_CONTENT;
28 import static com.spotify.futures.CompletableFutures.allAsList;
29 import static java.util.Objects.requireNonNull;
30 import static java.util.stream.Collectors.toList;
31
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.CancellationException;
37 import java.util.concurrent.CompletableFuture;
38
39 import org.apache.thrift.async.AsyncMethodCallback;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import com.linecorp.armeria.common.RequestContext;
44 import com.linecorp.armeria.common.util.Exceptions;
45 import com.linecorp.centraldogma.internal.thrift.Author;
46 import com.linecorp.centraldogma.internal.thrift.CentralDogmaConstants;
47 import com.linecorp.centraldogma.internal.thrift.CentralDogmaException;
48 import com.linecorp.centraldogma.internal.thrift.CentralDogmaService;
49 import com.linecorp.centraldogma.internal.thrift.Change;
50 import com.linecorp.centraldogma.internal.thrift.Comment;
51 import com.linecorp.centraldogma.internal.thrift.DiffFileResult;
52 import com.linecorp.centraldogma.internal.thrift.Entry;
53 import com.linecorp.centraldogma.internal.thrift.ErrorCode;
54 import com.linecorp.centraldogma.internal.thrift.GetFileResult;
55 import com.linecorp.centraldogma.internal.thrift.MergeQuery;
56 import com.linecorp.centraldogma.internal.thrift.MergedEntry;
57 import com.linecorp.centraldogma.internal.thrift.NamedQuery;
58 import com.linecorp.centraldogma.internal.thrift.Plugin;
59 import com.linecorp.centraldogma.internal.thrift.Project;
60 import com.linecorp.centraldogma.internal.thrift.Query;
61 import com.linecorp.centraldogma.internal.thrift.Revision;
62 import com.linecorp.centraldogma.internal.thrift.Schema;
63 import com.linecorp.centraldogma.internal.thrift.WatchFileResult;
64 import com.linecorp.centraldogma.internal.thrift.WatchRepositoryResult;
65 import com.linecorp.centraldogma.server.command.Command;
66 import com.linecorp.centraldogma.server.command.CommandExecutor;
67 import com.linecorp.centraldogma.server.internal.api.WatchService;
68 import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException;
69 import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
70 import com.linecorp.centraldogma.server.metadata.MetadataService;
71 import com.linecorp.centraldogma.server.storage.repository.Repository;
72
73 public class CentralDogmaServiceImpl implements CentralDogmaService.AsyncIface {
74
75 private static final Logger logger = LoggerFactory.getLogger(CentralDogmaServiceImpl.class);
76
77 private static final IllegalArgumentException RESERVED_REPOSITORY_EXCEPTION =
78 Exceptions.clearTrace(new IllegalArgumentException(
79 "The repository is reserved by system and thus cannot be created or removed."));
80
81 private final ProjectApiManager projectApiManager;
82 private final CommandExecutor executor;
83 private final WatchService watchService;
84 private final MetadataService mds;
85
86 public CentralDogmaServiceImpl(ProjectApiManager projectApiManager, CommandExecutor executor,
87 WatchService watchService, MetadataService mds) {
88 this.projectApiManager = requireNonNull(projectApiManager, "projectApiManager");
89 this.executor = requireNonNull(executor, "executor");
90 this.watchService = requireNonNull(watchService, "watchService");
91 this.mds = requireNonNull(mds, "mds");
92 }
93
94 private static void handle(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
95 future.handle((res, cause) -> {
96 if (cause != null) {
97 resultHandler.onError(convert(cause));
98 } else {
99 resultHandler.onComplete(res);
100 }
101 return null;
102 });
103 }
104
105 private static void handle(Callable<?> task, AsyncMethodCallback resultHandler) {
106 try {
107 resultHandler.onComplete(task.call());
108 } catch (Throwable cause) {
109 resultHandler.onError(convert(cause));
110 }
111 }
112
113 private static void handleAsVoidResult(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
114 future.handle((res, cause) -> {
115 if (cause != null) {
116 resultHandler.onError(convert(cause));
117 } else {
118 resultHandler.onComplete(null);
119 }
120 return null;
121 });
122 }
123
124 @Override
125 public void createProject(String name, AsyncMethodCallback resultHandler) {
126 validateProjectName(name, "name", false);
127
128 handle(projectApiManager.createProject(name, SYSTEM), resultHandler);
129 }
130
131 @Override
132 public void removeProject(String name, AsyncMethodCallback resultHandler) {
133
134 handle(projectApiManager.removeProject(name, SYSTEM), resultHandler);
135 }
136
137 @Override
138 public void purgeProject(String name, AsyncMethodCallback resultHandler) {
139 handleAsVoidResult(projectApiManager.purgeProject(name, SYSTEM), resultHandler);
140 }
141
142 @Override
143 public void unremoveProject(String name, AsyncMethodCallback resultHandler) {
144
145 handleAsVoidResult(projectApiManager.unremoveProject(name, SYSTEM), resultHandler);
146 }
147
148 @Override
149 public void listProjects(AsyncMethodCallback resultHandler) {
150 handle(() -> {
151 final Map<String, com.linecorp.centraldogma.server.storage.project.Project> projects =
152 projectApiManager.listProjects(null);
153 final List<Project> ret = new ArrayList<>(projects.size());
154 projects.forEach((key, value) -> ret.add(convert(key, value)));
155 return ret;
156 }, resultHandler);
157 }
158
159 @Override
160 public void listRemovedProjects(AsyncMethodCallback resultHandler) {
161 handle(() -> projectApiManager.listRemovedProjects().keySet(), resultHandler);
162 }
163
164 @Override
165 public void createRepository(String projectName, String repositoryName,
166 AsyncMethodCallback resultHandler) {
167 validateRepositoryName(repositoryName, "repositoryName");
168
169 if (isInternalRepo(repositoryName)) {
170 resultHandler.onError(convert(RESERVED_REPOSITORY_EXCEPTION));
171 return;
172 }
173 handleAsVoidResult(executor.execute(Command.createRepository(SYSTEM, projectName, repositoryName))
174 .thenCompose(unused -> mds.addRepo(SYSTEM, projectName, repositoryName)),
175 resultHandler);
176 }
177
178 @Override
179 public void removeRepository(String projectName, String repositoryName,
180 AsyncMethodCallback resultHandler) {
181
182 if (isInternalRepo(repositoryName)) {
183 resultHandler.onError(convert(RESERVED_REPOSITORY_EXCEPTION));
184 return;
185 }
186 handleAsVoidResult(executor.execute(Command.removeRepository(SYSTEM, projectName, repositoryName))
187 .thenCompose(unused -> mds.removeRepo(SYSTEM, projectName, repositoryName)),
188 resultHandler);
189 }
190
191 @Override
192 public void purgeRepository(String projectName, String repositoryName, AsyncMethodCallback resultHandler) {
193 handleAsVoidResult(executor.execute(Command.purgeRepository(SYSTEM, projectName, repositoryName))
194 .thenCompose(unused -> mds.purgeRepo(SYSTEM, projectName, repositoryName)),
195 resultHandler);
196 }
197
198 @Override
199 public void unremoveRepository(String projectName, String repositoryName,
200 AsyncMethodCallback resultHandler) {
201 handleAsVoidResult(executor.execute(Command.unremoveRepository(SYSTEM, projectName, repositoryName))
202 .thenCompose(unused -> mds.restoreRepo(SYSTEM, projectName, repositoryName)),
203 resultHandler);
204 }
205
206 @Override
207 public void listRepositories(String projectName, AsyncMethodCallback resultHandler) {
208 handle(allAsList(projectApiManager.getProject(projectName, null).repos().list().entrySet().stream()
209 .map(e -> convert(e.getKey(), e.getValue()))
210 .collect(toList())),
211 resultHandler);
212 }
213
214 @Override
215 public void listRemovedRepositories(String projectName, AsyncMethodCallback resultHandler) {
216 handle(() -> projectApiManager.getProject(projectName, null).repos().listRemoved().keySet(),
217 resultHandler);
218 }
219
220 @Override
221 public void normalizeRevision(String projectName, String repositoryName, Revision revision,
222 AsyncMethodCallback resultHandler) {
223
224 final com.linecorp.centraldogma.common.Revision normalized =
225 normalizeRevision(projectName, repositoryName, revision);
226
227 resultHandler.onComplete(convert(normalized));
228 }
229
230 private com.linecorp.centraldogma.common.Revision normalizeRevision(
231 String projectName, String repositoryName, Revision revision) {
232 final Repository repository = projectApiManager.getProject(projectName, null).repos().get(
233 repositoryName);
234 final com.linecorp.centraldogma.common.Revision normalized =
235 repository.normalizeNow(convert(revision));
236 final com.linecorp.centraldogma.common.Revision head = repository.normalizeNow(HEAD);
237 increaseCounterIfOldRevisionUsed(RequestContext.current(), repository, normalized, head);
238 return normalized;
239 }
240
241 @Override
242 public void listFiles(String projectName, String repositoryName, Revision revision, String pathPattern,
243 AsyncMethodCallback resultHandler) {
244
245 normalizeRevision(projectName, repositoryName, revision);
246 handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
247 .find(convert(revision), pathPattern, FIND_ALL_WITHOUT_CONTENT)
248 .thenApply(entries -> {
249 final List<Entry> ret = new ArrayList<>(entries.size());
250 entries.forEach((path, entry) -> ret.add(
251 new Entry(path, convert(entry.type()))));
252 return ret;
253 }),
254 resultHandler);
255 }
256
257 @Override
258 public void getFiles(String projectName, String repositoryName, Revision revision, String pathPattern,
259 AsyncMethodCallback resultHandler) {
260
261 normalizeRevision(projectName, repositoryName, revision);
262 handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
263 .find(convert(revision), pathPattern)
264 .thenApply(entries -> {
265 final List<Entry> ret = new ArrayList<>(entries.size());
266 ret.addAll(entries.entrySet().stream()
267 .map(e -> convert(e.getValue()))
268 .collect(toList()));
269 return ret;
270 }),
271 resultHandler);
272 }
273
274 @Override
275 public void getHistory(String projectName, String repositoryName, Revision from, Revision to,
276 String pathPattern, AsyncMethodCallback resultHandler) {
277
278 normalizeRevision(projectName, repositoryName, from);
279 normalizeRevision(projectName, repositoryName, to);
280 handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
281 .history(convert(from), convert(to), pathPattern)
282 .thenApply(commits -> commits.stream()
283 .map(Converter::convert)
284 .collect(toList())),
285 resultHandler);
286 }
287
288 @Override
289 public void getDiffs(String projectName, String repositoryName, Revision from, Revision to,
290 String pathPattern, AsyncMethodCallback resultHandler) {
291
292 normalizeRevision(projectName, repositoryName, from);
293 normalizeRevision(projectName, repositoryName, to);
294 handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
295 .diff(convert(from), convert(to), pathPattern)
296 .thenApply(diffs -> convert(diffs.values(), Converter::convert)),
297 resultHandler);
298 }
299
300 @Override
301 public void getPreviewDiffs(String projectName, String repositoryName, Revision baseRevision,
302 List<Change> changes, AsyncMethodCallback resultHandler) {
303
304 normalizeRevision(projectName, repositoryName, baseRevision);
305 handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
306 .previewDiff(convert(baseRevision), convert(changes, Converter::convert))
307 .thenApply(diffs -> convert(diffs.values(), Converter::convert)),
308 resultHandler);
309 }
310
311 @Override
312 public void push(String projectName, String repositoryName, Revision baseRevision, Author author,
313 String summary, Comment detail, List<Change> changes, AsyncMethodCallback resultHandler) {
314 final List<com.linecorp.centraldogma.common.Change<?>> convertedChanges =
315 convert(changes, Converter::convert);
316 try {
317 checkMetaRepoPush(repositoryName, convertedChanges);
318 } catch (Exception e) {
319 resultHandler.onError(e);
320 return;
321 }
322
323 handle(executor.execute(Command.push(convert(author), projectName, repositoryName,
324 convert(baseRevision), summary, detail.getContent(),
325 convert(detail.getMarkup()), convertedChanges))
326 .thenCompose(commitResult -> {
327 final com.linecorp.centraldogma.common.Revision newRev = commitResult.revision();
328 return projectApiManager.getProject(projectName, null).repos().get(repositoryName)
329 .history(newRev, newRev, "/**");
330 })
331 .thenApply(commits -> convert(commits.get(0))),
332 resultHandler);
333 }
334
335 @Override
336 public void getFile(String projectName, String repositoryName, Revision revision, Query query,
337 AsyncMethodCallback resultHandler) {
338
339 normalizeRevision(projectName, repositoryName, revision);
340 handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
341 .get(convert(revision), convert(query))
342 .thenApply(res -> new GetFileResult(convert(res.type()), res.contentAsText())),
343 resultHandler);
344 }
345
346 @Override
347 public void diffFile(String projectName, String repositoryName, Revision from, Revision to, Query query,
348 AsyncMethodCallback resultHandler) {
349
350 normalizeRevision(projectName, repositoryName, from);
351 normalizeRevision(projectName, repositoryName, to);
352
353 handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
354 .diff(convert(from), convert(to), convert(query))
355 .thenApply(change -> new DiffFileResult(
356 convert(change.type()), firstNonNull(change.contentAsText(), ""))),
357 resultHandler);
358 }
359
360 @Override
361 public void mergeFiles(String projectName, String repositoryName, Revision revision,
362 MergeQuery mergeQuery, AsyncMethodCallback resultHandler) {
363
364 normalizeRevision(projectName, repositoryName, revision);
365 handle(projectApiManager.getProject(projectName, null).repos().get(repositoryName)
366 .mergeFiles(convert(revision), convert(mergeQuery))
367 .thenApply(merged -> new MergedEntry(convert(merged.revision()),
368 convert(merged.type()),
369 merged.contentAsText(),
370 merged.paths())),
371 resultHandler);
372 }
373
374 @Override
375 public void watchRepository(
376 String projectName, String repositoryName, Revision lastKnownRevision,
377 String pathPattern, long timeoutMillis, AsyncMethodCallback resultHandler) {
378
379 normalizeRevision(projectName, repositoryName, lastKnownRevision);
380 if (timeoutMillis <= 0) {
381 rejectInvalidWatchTimeout("watchRepository", resultHandler);
382 return;
383 }
384
385 final Repository repo = projectApiManager.getProject(projectName, null).repos().get(repositoryName);
386 final CompletableFuture<com.linecorp.centraldogma.common.Revision> future =
387 watchService.watchRepository(repo, convert(lastKnownRevision), pathPattern, timeoutMillis,
388 false);
389 handleWatchRepositoryResult(future, resultHandler);
390 }
391
392 private static void handleWatchRepositoryResult(
393 CompletableFuture<com.linecorp.centraldogma.common.Revision> future,
394 AsyncMethodCallback resultHandler) {
395 future.handle((res, cause) -> {
396 if (cause == null) {
397 final WatchRepositoryResult wrr = new WatchRepositoryResult();
398 wrr.setRevision(convert(res));
399 resultHandler.onComplete(wrr);
400 } else if (cause instanceof CancellationException) {
401 resultHandler.onComplete(CentralDogmaConstants.EMPTY_WATCH_REPOSITORY_RESULT);
402 } else if (cause instanceof RequestAlreadyTimedOutException) {
403 logger.warn("Ignoring the exception raised when a request has already timed out.");
404 } else {
405 logAndInvokeOnError("watchRepository", resultHandler, cause);
406 }
407 return null;
408 });
409 }
410
411 @Override
412 public void watchFile(
413 String projectName, String repositoryName, Revision lastKnownRevision,
414 Query query, long timeoutMillis, AsyncMethodCallback resultHandler) {
415
416 normalizeRevision(projectName, repositoryName, lastKnownRevision);
417 if (timeoutMillis <= 0) {
418 rejectInvalidWatchTimeout("watchFile", resultHandler);
419 return;
420 }
421
422 final Repository repo = projectApiManager.getProject(projectName, null).repos().get(repositoryName);
423 final CompletableFuture<com.linecorp.centraldogma.common.Entry<Object>> future =
424 watchService.watchFile(repo, convert(lastKnownRevision), convert(query), timeoutMillis,
425 false);
426
427 handleWatchFileResult(future, resultHandler);
428 }
429
430 private static void rejectInvalidWatchTimeout(String operationName, AsyncMethodCallback resultHandler) {
431 final CentralDogmaException cde = new CentralDogmaException(ErrorCode.BAD_REQUEST);
432 CentralDogmaExceptions.log(operationName, cde);
433 resultHandler.onError(cde);
434 }
435
436 private static void handleWatchFileResult(
437 CompletableFuture<com.linecorp.centraldogma.common.Entry<Object>> future,
438 AsyncMethodCallback resultHandler) {
439 future.handle((res, cause) -> {
440 if (cause == null) {
441 final WatchFileResult wfr = new WatchFileResult();
442 wfr.setRevision(convert(res.revision()));
443 wfr.setType(convert(res.type()));
444 wfr.setContent(res.contentAsText());
445 resultHandler.onComplete(wfr);
446 } else if (cause instanceof CancellationException) {
447 resultHandler.onComplete(CentralDogmaConstants.EMPTY_WATCH_FILE_RESULT);
448 } else if (cause instanceof RequestAlreadyTimedOutException) {
449 logger.warn("Ignoring the exception raised when a request has already timed out.");
450 } else {
451 logAndInvokeOnError("watchFile", resultHandler, cause);
452 }
453 return null;
454 });
455 }
456
457 private static void logAndInvokeOnError(
458 String operationName, AsyncMethodCallback resultHandler, Throwable cause) {
459 final CentralDogmaException cde = convert(cause);
460 CentralDogmaExceptions.log(operationName, cde);
461 resultHandler.onError(cde);
462 }
463
464
465
466 @Override
467 public void getSchema(String projectName, AsyncMethodCallback resultHandler) {
468 unimplemented(resultHandler);
469 }
470
471 @Override
472 public void saveSchema(String projectName, Schema schema, AsyncMethodCallback resultHandler) {
473 unimplemented(resultHandler);
474 }
475
476 @Override
477 public void getNamedQuery(String projectName, String name, AsyncMethodCallback resultHandler) {
478 unimplemented(resultHandler);
479 }
480
481 @Override
482 public void saveNamedQuery(String projectName, NamedQuery namedQuery, AsyncMethodCallback resultHandler) {
483 unimplemented(resultHandler);
484 }
485
486 @Override
487 public void removeNamedQuery(String projectName, String name, AsyncMethodCallback resultHandler) {
488 unimplemented(resultHandler);
489 }
490
491 @Override
492 public void listNamedQueries(String projectName, AsyncMethodCallback resultHandler) {
493 unimplemented(resultHandler);
494 }
495
496 @Override
497 public void getPlugin(String projectName, String pluginName, AsyncMethodCallback resultHandler) {
498 unimplemented(resultHandler);
499 }
500
501 @Override
502 public void savePlugin(String projectName, Plugin plugin, AsyncMethodCallback resultHandler) {
503 unimplemented(resultHandler);
504 }
505
506 @Override
507 public void removePlugin(String projectName, String pluginName, AsyncMethodCallback resultHandler) {
508 unimplemented(resultHandler);
509 }
510
511 @Override
512 public void listPlugins(String projectName, AsyncMethodCallback resultHandler) {
513 unimplemented(resultHandler);
514 }
515
516 @Override
517 public void listPluginOperations(String projectName, AsyncMethodCallback resultHandler) {
518 unimplemented(resultHandler);
519 }
520
521 @Override
522 public void performPluginOperation(String projectName, String pluginName, String operationName,
523 String params, AsyncMethodCallback resultHandler) {
524 unimplemented(resultHandler);
525 }
526
527 @Override
528 public void queryByNamedQuery(String projectName, String namedQuery, Revision revision,
529 AsyncMethodCallback resultHandler) {
530 unimplemented(resultHandler);
531 }
532
533 @Override
534 public void listSubscribers(String projectName, String repositoryName, String path,
535 AsyncMethodCallback resultHandler) {
536 unimplemented(resultHandler);
537 }
538
539 private static void unimplemented(AsyncMethodCallback resultHandler) {
540 resultHandler.onError(new CentralDogmaException(ErrorCode.UNIMPLEMENTED));
541 }
542 }