1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server.internal.thrift;
17
18 import static com.google.common.base.MoreObjects.firstNonNull;
19 import static com.linecorp.centraldogma.common.Author.SYSTEM;
20 import static com.linecorp.centraldogma.common.Revision.HEAD;
21 import static com.linecorp.centraldogma.server.internal.api.ContentServiceV1.checkPush;
22 import static com.linecorp.centraldogma.server.internal.api.RepositoryServiceV1.increaseCounterIfOldRevisionUsed;
23 import static com.linecorp.centraldogma.server.internal.thrift.Converter.convert;
24 import static com.linecorp.centraldogma.server.storage.project.Project.isReservedRepoName;
25 import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ALL_WITHOUT_CONTENT;
26 import static com.spotify.futures.CompletableFutures.allAsList;
27 import static java.util.Objects.requireNonNull;
28 import static java.util.stream.Collectors.toList;
29
30 import java.util.ArrayList;
31 import java.util.List;
32 import java.util.Map;
33 import java.util.concurrent.Callable;
34 import java.util.concurrent.CancellationException;
35 import java.util.concurrent.CompletableFuture;
36
37 import org.apache.thrift.async.AsyncMethodCallback;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import com.linecorp.armeria.common.RequestContext;
42 import com.linecorp.armeria.common.util.Exceptions;
43 import com.linecorp.centraldogma.internal.thrift.Author;
44 import com.linecorp.centraldogma.internal.thrift.CentralDogmaConstants;
45 import com.linecorp.centraldogma.internal.thrift.CentralDogmaException;
46 import com.linecorp.centraldogma.internal.thrift.CentralDogmaService;
47 import com.linecorp.centraldogma.internal.thrift.Change;
48 import com.linecorp.centraldogma.internal.thrift.Comment;
49 import com.linecorp.centraldogma.internal.thrift.DiffFileResult;
50 import com.linecorp.centraldogma.internal.thrift.Entry;
51 import com.linecorp.centraldogma.internal.thrift.ErrorCode;
52 import com.linecorp.centraldogma.internal.thrift.GetFileResult;
53 import com.linecorp.centraldogma.internal.thrift.MergeQuery;
54 import com.linecorp.centraldogma.internal.thrift.MergedEntry;
55 import com.linecorp.centraldogma.internal.thrift.NamedQuery;
56 import com.linecorp.centraldogma.internal.thrift.Plugin;
57 import com.linecorp.centraldogma.internal.thrift.Project;
58 import com.linecorp.centraldogma.internal.thrift.Query;
59 import com.linecorp.centraldogma.internal.thrift.Revision;
60 import com.linecorp.centraldogma.internal.thrift.Schema;
61 import com.linecorp.centraldogma.internal.thrift.WatchFileResult;
62 import com.linecorp.centraldogma.internal.thrift.WatchRepositoryResult;
63 import com.linecorp.centraldogma.server.command.Command;
64 import com.linecorp.centraldogma.server.command.CommandExecutor;
65 import com.linecorp.centraldogma.server.internal.api.WatchService;
66 import com.linecorp.centraldogma.server.internal.storage.RequestAlreadyTimedOutException;
67 import com.linecorp.centraldogma.server.internal.storage.project.ProjectApiManager;
68 import com.linecorp.centraldogma.server.metadata.MetadataService;
69 import com.linecorp.centraldogma.server.storage.repository.Repository;
70
71 public class CentralDogmaServiceImpl implements CentralDogmaService.AsyncIface {
72
73 private static final Logger logger = LoggerFactory.getLogger(CentralDogmaServiceImpl.class);
74
75 private static final IllegalArgumentException RESERVED_REPOSITORY_EXCEPTION =
76 Exceptions.clearTrace(new IllegalArgumentException(
77 "The repository is reserved by system and thus cannot be created or removed."));
78
79 private final ProjectApiManager projectApiManager;
80 private final CommandExecutor executor;
81 private final WatchService watchService;
82 private final MetadataService mds;
83
84 public CentralDogmaServiceImpl(ProjectApiManager projectApiManager, CommandExecutor executor,
85 WatchService watchService, MetadataService mds) {
86 this.projectApiManager = requireNonNull(projectApiManager, "projectApiManager");
87 this.executor = requireNonNull(executor, "executor");
88 this.watchService = requireNonNull(watchService, "watchService");
89 this.mds = requireNonNull(mds, "mds");
90 }
91
92 private static void handle(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
93 future.handle((res, cause) -> {
94 if (cause != null) {
95 resultHandler.onError(convert(cause));
96 } else {
97 resultHandler.onComplete(res);
98 }
99 return null;
100 });
101 }
102
103 private static void handle(Callable<?> task, AsyncMethodCallback resultHandler) {
104 try {
105 resultHandler.onComplete(task.call());
106 } catch (Throwable cause) {
107 resultHandler.onError(convert(cause));
108 }
109 }
110
111 private static void handleAsVoidResult(CompletableFuture<?> future, AsyncMethodCallback resultHandler) {
112 future.handle((res, cause) -> {
113 if (cause != null) {
114 resultHandler.onError(convert(cause));
115 } else {
116 resultHandler.onComplete(null);
117 }
118 return null;
119 });
120 }
121
122 @Override
123 public void createProject(String name, AsyncMethodCallback resultHandler) {
124
125 handle(projectApiManager.createProject(name, SYSTEM), resultHandler);
126 }
127
128 @Override
129 public void removeProject(String name, AsyncMethodCallback resultHandler) {
130
131 handle(projectApiManager.removeProject(name, SYSTEM), resultHandler);
132 }
133
134 @Override
135 public void purgeProject(String name, AsyncMethodCallback resultHandler) {
136 handleAsVoidResult(projectApiManager.purgeProject(name, SYSTEM), resultHandler);
137 }
138
139 @Override
140 public void unremoveProject(String name, AsyncMethodCallback resultHandler) {
141
142 handleAsVoidResult(projectApiManager.unremoveProject(name, SYSTEM), resultHandler);
143 }
144
145 @Override
146 public void listProjects(AsyncMethodCallback resultHandler) {
147 handle(() -> {
148 final Map<String, com.linecorp.centraldogma.server.storage.project.Project> projects =
149 projectApiManager.listProjects();
150 final List<Project> ret = new ArrayList<>(projects.size());
151 projects.forEach((key, value) -> ret.add(convert(key, value)));
152 return ret;
153 }, resultHandler);
154 }
155
156 @Override
157 public void listRemovedProjects(AsyncMethodCallback resultHandler) {
158 handle(() -> projectApiManager.listRemovedProjects().keySet(), resultHandler);
159 }
160
161 @Override
162 public void createRepository(String projectName, String repositoryName,
163 AsyncMethodCallback resultHandler) {
164
165 if (isReservedRepoName(repositoryName)) {
166 resultHandler.onError(convert(RESERVED_REPOSITORY_EXCEPTION));
167 return;
168 }
169 handleAsVoidResult(executor.execute(Command.createRepository(SYSTEM, projectName, repositoryName))
170 .thenCompose(unused -> mds.addRepo(SYSTEM, projectName, repositoryName)),
171 resultHandler);
172 }
173
174 @Override
175 public void removeRepository(String projectName, String repositoryName,
176 AsyncMethodCallback resultHandler) {
177
178 if (isReservedRepoName(repositoryName)) {
179 resultHandler.onError(convert(RESERVED_REPOSITORY_EXCEPTION));
180 return;
181 }
182 handleAsVoidResult(executor.execute(Command.removeRepository(SYSTEM, projectName, repositoryName))
183 .thenCompose(unused -> mds.removeRepo(SYSTEM, projectName, repositoryName)),
184 resultHandler);
185 }
186
187 @Override
188 public void purgeRepository(String projectName, String repositoryName, AsyncMethodCallback resultHandler) {
189 handleAsVoidResult(executor.execute(Command.purgeRepository(SYSTEM, projectName, repositoryName))
190 .thenCompose(unused -> mds.purgeRepo(SYSTEM, projectName, repositoryName)),
191 resultHandler);
192 }
193
194 @Override
195 public void unremoveRepository(String projectName, String repositoryName,
196 AsyncMethodCallback resultHandler) {
197 handleAsVoidResult(executor.execute(Command.unremoveRepository(SYSTEM, projectName, repositoryName))
198 .thenCompose(unused -> mds.restoreRepo(SYSTEM, projectName, repositoryName)),
199 resultHandler);
200 }
201
202 @Override
203 public void listRepositories(String projectName, AsyncMethodCallback resultHandler) {
204 handle(allAsList(projectApiManager.getProject(projectName).repos().list().entrySet().stream()
205 .map(e -> convert(e.getKey(), e.getValue()))
206 .collect(toList())),
207 resultHandler);
208 }
209
210 @Override
211 public void listRemovedRepositories(String projectName, AsyncMethodCallback resultHandler) {
212 handle(() -> projectApiManager.getProject(projectName).repos().listRemoved().keySet(), resultHandler);
213 }
214
215 @Override
216 public void normalizeRevision(String projectName, String repositoryName, Revision revision,
217 AsyncMethodCallback resultHandler) {
218
219 final com.linecorp.centraldogma.common.Revision normalized =
220 normalizeRevision(projectName, repositoryName, revision);
221
222 resultHandler.onComplete(convert(normalized));
223 }
224
225 private com.linecorp.centraldogma.common.Revision normalizeRevision(
226 String projectName, String repositoryName, Revision revision) {
227 final Repository repository = projectApiManager.getProject(projectName).repos().get(repositoryName);
228 final com.linecorp.centraldogma.common.Revision normalized =
229 repository.normalizeNow(convert(revision));
230 final com.linecorp.centraldogma.common.Revision head = repository.normalizeNow(HEAD);
231 increaseCounterIfOldRevisionUsed(RequestContext.current(), repository, normalized, head);
232 return normalized;
233 }
234
235 @Override
236 public void listFiles(String projectName, String repositoryName, Revision revision, String pathPattern,
237 AsyncMethodCallback resultHandler) {
238
239 normalizeRevision(projectName, repositoryName, revision);
240 handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
241 .find(convert(revision), pathPattern, FIND_ALL_WITHOUT_CONTENT)
242 .thenApply(entries -> {
243 final List<Entry> ret = new ArrayList<>(entries.size());
244 entries.forEach((path, entry) -> ret.add(
245 new Entry(path, convert(entry.type()))));
246 return ret;
247 }),
248 resultHandler);
249 }
250
251 @Override
252 public void getFiles(String projectName, String repositoryName, Revision revision, String pathPattern,
253 AsyncMethodCallback resultHandler) {
254
255 normalizeRevision(projectName, repositoryName, revision);
256 handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
257 .find(convert(revision), pathPattern)
258 .thenApply(entries -> {
259 final List<Entry> ret = new ArrayList<>(entries.size());
260 ret.addAll(entries.entrySet().stream()
261 .map(e -> convert(e.getValue()))
262 .collect(toList()));
263 return ret;
264 }),
265 resultHandler);
266 }
267
268 @Override
269 public void getHistory(String projectName, String repositoryName, Revision from, Revision to,
270 String pathPattern, AsyncMethodCallback resultHandler) {
271
272 normalizeRevision(projectName, repositoryName, from);
273 normalizeRevision(projectName, repositoryName, to);
274 handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
275 .history(convert(from), convert(to), pathPattern)
276 .thenApply(commits -> commits.stream()
277 .map(Converter::convert)
278 .collect(toList())),
279 resultHandler);
280 }
281
282 @Override
283 public void getDiffs(String projectName, String repositoryName, Revision from, Revision to,
284 String pathPattern, AsyncMethodCallback resultHandler) {
285
286 normalizeRevision(projectName, repositoryName, from);
287 normalizeRevision(projectName, repositoryName, to);
288 handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
289 .diff(convert(from), convert(to), pathPattern)
290 .thenApply(diffs -> convert(diffs.values(), Converter::convert)),
291 resultHandler);
292 }
293
294 @Override
295 public void getPreviewDiffs(String projectName, String repositoryName, Revision baseRevision,
296 List<Change> changes, AsyncMethodCallback resultHandler) {
297
298 normalizeRevision(projectName, repositoryName, baseRevision);
299 handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
300 .previewDiff(convert(baseRevision), convert(changes, Converter::convert))
301 .thenApply(diffs -> convert(diffs.values(), Converter::convert)),
302 resultHandler);
303 }
304
305 @Override
306 public void push(String projectName, String repositoryName, Revision baseRevision, Author author,
307 String summary, Comment detail, List<Change> changes, AsyncMethodCallback resultHandler) {
308 final List<com.linecorp.centraldogma.common.Change<?>> convertedChanges =
309 convert(changes, Converter::convert);
310 try {
311 checkPush(repositoryName, convertedChanges);
312 } catch (Exception e) {
313 resultHandler.onError(e);
314 return;
315 }
316
317 handle(executor.execute(Command.push(convert(author), projectName, repositoryName,
318 convert(baseRevision), summary, detail.getContent(),
319 convert(detail.getMarkup()), convertedChanges))
320 .thenCompose(commitResult -> {
321 final com.linecorp.centraldogma.common.Revision newRev = commitResult.revision();
322 return projectApiManager.getProject(projectName).repos().get(repositoryName)
323 .history(newRev, newRev, "/**");
324 })
325 .thenApply(commits -> convert(commits.get(0))),
326 resultHandler);
327 }
328
329 @Override
330 public void getFile(String projectName, String repositoryName, Revision revision, Query query,
331 AsyncMethodCallback resultHandler) {
332
333 normalizeRevision(projectName, repositoryName, revision);
334 handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
335 .get(convert(revision), convert(query))
336 .thenApply(res -> new GetFileResult(convert(res.type()), res.contentAsText())),
337 resultHandler);
338 }
339
340 @Override
341 public void diffFile(String projectName, String repositoryName, Revision from, Revision to, Query query,
342 AsyncMethodCallback resultHandler) {
343
344 normalizeRevision(projectName, repositoryName, from);
345 normalizeRevision(projectName, repositoryName, to);
346
347 handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
348 .diff(convert(from), convert(to), convert(query))
349 .thenApply(change -> new DiffFileResult(
350 convert(change.type()), firstNonNull(change.contentAsText(), ""))),
351 resultHandler);
352 }
353
354 @Override
355 public void mergeFiles(String projectName, String repositoryName, Revision revision,
356 MergeQuery mergeQuery, AsyncMethodCallback resultHandler) {
357
358 normalizeRevision(projectName, repositoryName, revision);
359 handle(projectApiManager.getProject(projectName).repos().get(repositoryName)
360 .mergeFiles(convert(revision), convert(mergeQuery))
361 .thenApply(merged -> new MergedEntry(convert(merged.revision()),
362 convert(merged.type()),
363 merged.contentAsText(),
364 merged.paths())),
365 resultHandler);
366 }
367
368 @Override
369 public void watchRepository(
370 String projectName, String repositoryName, Revision lastKnownRevision,
371 String pathPattern, long timeoutMillis, AsyncMethodCallback resultHandler) {
372
373 normalizeRevision(projectName, repositoryName, lastKnownRevision);
374 if (timeoutMillis <= 0) {
375 rejectInvalidWatchTimeout("watchRepository", resultHandler);
376 return;
377 }
378
379 final Repository repo = projectApiManager.getProject(projectName).repos().get(repositoryName);
380 final CompletableFuture<com.linecorp.centraldogma.common.Revision> future =
381 watchService.watchRepository(repo, convert(lastKnownRevision), pathPattern, timeoutMillis,
382 false);
383 handleWatchRepositoryResult(future, resultHandler);
384 }
385
386 private static void handleWatchRepositoryResult(
387 CompletableFuture<com.linecorp.centraldogma.common.Revision> future,
388 AsyncMethodCallback resultHandler) {
389 future.handle((res, cause) -> {
390 if (cause == null) {
391 final WatchRepositoryResult wrr = new WatchRepositoryResult();
392 wrr.setRevision(convert(res));
393 resultHandler.onComplete(wrr);
394 } else if (cause instanceof CancellationException) {
395 resultHandler.onComplete(CentralDogmaConstants.EMPTY_WATCH_REPOSITORY_RESULT);
396 } else if (cause instanceof RequestAlreadyTimedOutException) {
397 logger.warn("Ignoring the exception raised when a request has already timed out.");
398 } else {
399 logAndInvokeOnError("watchRepository", resultHandler, cause);
400 }
401 return null;
402 });
403 }
404
405 @Override
406 public void watchFile(
407 String projectName, String repositoryName, Revision lastKnownRevision,
408 Query query, long timeoutMillis, AsyncMethodCallback resultHandler) {
409
410 normalizeRevision(projectName, repositoryName, lastKnownRevision);
411 if (timeoutMillis <= 0) {
412 rejectInvalidWatchTimeout("watchFile", resultHandler);
413 return;
414 }
415
416 final Repository repo = projectApiManager.getProject(projectName).repos().get(repositoryName);
417 final CompletableFuture<com.linecorp.centraldogma.common.Entry<Object>> future =
418 watchService.watchFile(repo, convert(lastKnownRevision), convert(query), timeoutMillis,
419 false);
420
421 handleWatchFileResult(future, resultHandler);
422 }
423
424 private static void rejectInvalidWatchTimeout(String operationName, AsyncMethodCallback resultHandler) {
425 final CentralDogmaException cde = new CentralDogmaException(ErrorCode.BAD_REQUEST);
426 CentralDogmaExceptions.log(operationName, cde);
427 resultHandler.onError(cde);
428 }
429
430 private static void handleWatchFileResult(
431 CompletableFuture<com.linecorp.centraldogma.common.Entry<Object>> future,
432 AsyncMethodCallback resultHandler) {
433 future.handle((res, cause) -> {
434 if (cause == null) {
435 final WatchFileResult wfr = new WatchFileResult();
436 wfr.setRevision(convert(res.revision()));
437 wfr.setType(convert(res.type()));
438 wfr.setContent(res.contentAsText());
439 resultHandler.onComplete(wfr);
440 } else if (cause instanceof CancellationException) {
441 resultHandler.onComplete(CentralDogmaConstants.EMPTY_WATCH_FILE_RESULT);
442 } else if (cause instanceof RequestAlreadyTimedOutException) {
443 logger.warn("Ignoring the exception raised when a request has already timed out.");
444 } else {
445 logAndInvokeOnError("watchFile", resultHandler, cause);
446 }
447 return null;
448 });
449 }
450
451 private static void logAndInvokeOnError(
452 String operationName, AsyncMethodCallback resultHandler, Throwable cause) {
453 final CentralDogmaException cde = convert(cause);
454 CentralDogmaExceptions.log(operationName, cde);
455 resultHandler.onError(cde);
456 }
457
458
459
460 @Override
461 public void getSchema(String projectName, AsyncMethodCallback resultHandler) {
462 unimplemented(resultHandler);
463 }
464
465 @Override
466 public void saveSchema(String projectName, Schema schema, AsyncMethodCallback resultHandler) {
467 unimplemented(resultHandler);
468 }
469
470 @Override
471 public void getNamedQuery(String projectName, String name, AsyncMethodCallback resultHandler) {
472 unimplemented(resultHandler);
473 }
474
475 @Override
476 public void saveNamedQuery(String projectName, NamedQuery namedQuery, AsyncMethodCallback resultHandler) {
477 unimplemented(resultHandler);
478 }
479
480 @Override
481 public void removeNamedQuery(String projectName, String name, AsyncMethodCallback resultHandler) {
482 unimplemented(resultHandler);
483 }
484
485 @Override
486 public void listNamedQueries(String projectName, AsyncMethodCallback resultHandler) {
487 unimplemented(resultHandler);
488 }
489
490 @Override
491 public void getPlugin(String projectName, String pluginName, AsyncMethodCallback resultHandler) {
492 unimplemented(resultHandler);
493 }
494
495 @Override
496 public void savePlugin(String projectName, Plugin plugin, AsyncMethodCallback resultHandler) {
497 unimplemented(resultHandler);
498 }
499
500 @Override
501 public void removePlugin(String projectName, String pluginName, AsyncMethodCallback resultHandler) {
502 unimplemented(resultHandler);
503 }
504
505 @Override
506 public void listPlugins(String projectName, AsyncMethodCallback resultHandler) {
507 unimplemented(resultHandler);
508 }
509
510 @Override
511 public void listPluginOperations(String projectName, AsyncMethodCallback resultHandler) {
512 unimplemented(resultHandler);
513 }
514
515 @Override
516 public void performPluginOperation(String projectName, String pluginName, String operationName,
517 String params, AsyncMethodCallback resultHandler) {
518 unimplemented(resultHandler);
519 }
520
521 @Override
522 public void queryByNamedQuery(String projectName, String namedQuery, Revision revision,
523 AsyncMethodCallback resultHandler) {
524 unimplemented(resultHandler);
525 }
526
527 @Override
528 public void listSubscribers(String projectName, String repositoryName, String path,
529 AsyncMethodCallback resultHandler) {
530 unimplemented(resultHandler);
531 }
532
533 private static void unimplemented(AsyncMethodCallback resultHandler) {
534 resultHandler.onError(new CentralDogmaException(ErrorCode.UNIMPLEMENTED));
535 }
536 }