1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.storage.repository;
18
19 import static com.linecorp.centraldogma.internal.Util.unsafeCast;
20 import static com.linecorp.centraldogma.internal.Util.validateFilePath;
21 import static com.linecorp.centraldogma.internal.Util.validateJsonFilePath;
22 import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ONE_WITHOUT_CONTENT;
23 import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ONE_WITH_CONTENT;
24 import static com.linecorp.centraldogma.server.storage.repository.RepositoryUtil.applyQuery;
25 import static com.linecorp.centraldogma.server.storage.repository.RepositoryUtil.mergeEntries;
26 import static java.util.Objects.requireNonNull;
27
28 import java.util.ArrayList;
29 import java.util.Arrays;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.concurrent.CompletableFuture;
33
34 import com.fasterxml.jackson.databind.JsonNode;
35 import com.google.common.collect.ImmutableList;
36 import com.google.common.collect.ImmutableMap;
37 import com.spotify.futures.CompletableFutures;
38
39 import com.linecorp.centraldogma.common.Author;
40 import com.linecorp.centraldogma.common.CentralDogmaException;
41 import com.linecorp.centraldogma.common.Change;
42 import com.linecorp.centraldogma.common.Commit;
43 import com.linecorp.centraldogma.common.Entry;
44 import com.linecorp.centraldogma.common.EntryNotFoundException;
45 import com.linecorp.centraldogma.common.EntryType;
46 import com.linecorp.centraldogma.common.Markup;
47 import com.linecorp.centraldogma.common.MergeQuery;
48 import com.linecorp.centraldogma.common.MergeSource;
49 import com.linecorp.centraldogma.common.MergedEntry;
50 import com.linecorp.centraldogma.common.Query;
51 import com.linecorp.centraldogma.common.QueryExecutionException;
52 import com.linecorp.centraldogma.common.Revision;
53 import com.linecorp.centraldogma.common.RevisionNotFoundException;
54 import com.linecorp.centraldogma.common.RevisionRange;
55 import com.linecorp.centraldogma.internal.HistoryConstants;
56 import com.linecorp.centraldogma.server.command.CommitResult;
57 import com.linecorp.centraldogma.server.internal.replication.ReplicationLog;
58 import com.linecorp.centraldogma.server.storage.StorageException;
59 import com.linecorp.centraldogma.server.storage.project.Project;
60
61
62
63
64 public interface Repository {
65
66 int DEFAULT_MAX_COMMITS = 100;
67 int MAX_MAX_COMMITS = HistoryConstants.MAX_MAX_COMMITS;
68
69 String ALL_PATH = "/**";
70
71
72
73
74 Project parent();
75
76
77
78
79 String name();
80
81
82
83
84 long creationTimeMillis();
85
86
87
88
89 Author author();
90
91
92
93
94
95
96
97 @Deprecated
98 default CompletableFuture<Revision> normalize(Revision revision) {
99 try {
100 return CompletableFuture.completedFuture(normalizeNow(revision));
101 } catch (Exception e) {
102 return CompletableFutures.exceptionallyCompletedFuture(e);
103 }
104 }
105
106
107
108
109
110
111 Revision normalizeNow(Revision revision);
112
113
114
115
116
117
118
119 RevisionRange normalizeNow(Revision from, Revision to);
120
121
122
123
124 default CompletableFuture<Boolean> exists(Revision revision, String path) {
125 validateFilePath(path, "path");
126 return find(revision, path, FIND_ONE_WITHOUT_CONTENT).thenApply(result -> !result.isEmpty());
127 }
128
129
130
131
132
133
134
135
136 default CompletableFuture<Entry<?>> get(Revision revision, String path) {
137 return getOrNull(revision, path).thenApply(entry -> {
138 if (entry == null) {
139 throw new EntryNotFoundException(revision, path);
140 }
141
142 return entry;
143 });
144 }
145
146
147
148
149
150
151
152
153 default <T> CompletableFuture<Entry<T>> get(Revision revision, Query<T> query) {
154 return getOrNull(revision, query).thenApply(res -> {
155 if (res == null) {
156 throw new EntryNotFoundException(revision, query.path());
157 }
158
159 return res;
160 });
161 }
162
163
164
165
166
167
168
169
170
171 default CompletableFuture<Entry<?>> getOrNull(Revision revision, String path) {
172 validateFilePath(path, "path");
173
174 return find(revision, path, FIND_ONE_WITH_CONTENT).thenApply(findResult -> findResult.get(path));
175 }
176
177
178
179
180
181
182
183
184
185 default <T> CompletableFuture<Entry<T>> getOrNull(Revision revision, Query<T> query) {
186 requireNonNull(query, "query");
187 requireNonNull(revision, "revision");
188
189 return getOrNull(revision, query.path()).thenApply(result -> {
190 if (result == null) {
191 return null;
192 }
193
194 @SuppressWarnings("unchecked")
195 final Entry<T> entry = (Entry<T>) result;
196
197 try {
198 return applyQuery(entry, query);
199 } catch (CentralDogmaException e) {
200 throw e;
201 } catch (Exception e) {
202 throw new QueryExecutionException(e);
203 }
204 });
205 }
206
207
208
209
210
211
212 default CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern) {
213 return find(revision, pathPattern, ImmutableMap.of());
214 }
215
216
217
218
219
220
221 CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern,
222 Map<FindOption<?>, ?> options);
223
224
225
226
227 default CompletableFuture<Change<?>> diff(Revision from, Revision to, Query<?> query) {
228 requireNonNull(from, "from");
229 requireNonNull(to, "to");
230 requireNonNull(query, "query");
231
232 final RevisionRange range;
233 try {
234 range = normalizeNow(from, to).toAscending();
235 } catch (Exception e) {
236 return CompletableFutures.exceptionallyCompletedFuture(e);
237 }
238
239 final String path = query.path();
240 final CompletableFuture<Entry<?>> fromEntryFuture = getOrNull(range.from(), path);
241 final CompletableFuture<Entry<?>> toEntryFuture = getOrNull(range.to(), path);
242
243 final CompletableFuture<Change<?>> future =
244 CompletableFutures.combine(fromEntryFuture, toEntryFuture, (fromEntry, toEntry) -> {
245 @SuppressWarnings("unchecked")
246 final Query<Object> castQuery = (Query<Object>) query;
247
248
249 if (fromEntry != null) {
250 if (toEntry == null) {
251
252 return Change.ofRemoval(path);
253 }
254 } else if (toEntry != null) {
255
256 final EntryType toEntryType = toEntry.type();
257 if (!query.type().supportedEntryTypes().contains(toEntryType)) {
258 throw new QueryExecutionException("unsupported entry type: " + toEntryType);
259 }
260
261 final Object toContent = castQuery.apply(toEntry.content());
262
263 switch (toEntryType) {
264 case JSON:
265 return Change.ofJsonUpsert(path, (JsonNode) toContent);
266 case TEXT:
267 return Change.ofTextUpsert(path, (String) toContent);
268 default:
269 throw new Error();
270 }
271 } else {
272
273 throw new EntryNotFoundException(path + " (" + from + ", " + to + ')');
274 }
275
276
277 final EntryType entryType = fromEntry.type();
278 if (!query.type().supportedEntryTypes().contains(entryType)) {
279 throw new QueryExecutionException("unsupported entry type: " + entryType);
280 }
281 if (entryType != toEntry.type()) {
282 throw new QueryExecutionException(
283 "mismatching entry type: " + entryType + " != " + toEntry.type());
284 }
285
286 final Object fromContent = castQuery.apply(fromEntry.content());
287 final Object toContent = castQuery.apply(toEntry.content());
288
289 switch (entryType) {
290 case JSON:
291 return Change.ofJsonPatch(path, (JsonNode) fromContent, (JsonNode) toContent);
292 case TEXT:
293 return Change.ofTextPatch(path, (String) fromContent, (String) toContent);
294 default:
295 throw new Error();
296 }
297 }).toCompletableFuture();
298
299 return unsafeCast(future);
300 }
301
302
303
304
305
306 CompletableFuture<Map<String, Change<?>>> diff(Revision from, Revision to, String pathPattern);
307
308
309
310
311 default CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision, Change<?>... changes) {
312 requireNonNull(changes, "changes");
313 return previewDiff(baseRevision, Arrays.asList(changes));
314 }
315
316
317
318
319 CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision, Iterable<Change<?>> changes);
320
321
322
323
324
325
326 default CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
327 Author author, String summary, Iterable<Change<?>> changes) {
328 return commit(baseRevision, commitTimeMillis, author, summary, "", Markup.PLAINTEXT, changes,
329 true);
330 }
331
332
333
334
335
336
337 default CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
338 Author author, String summary, Change<?>... changes) {
339 return commit(baseRevision, commitTimeMillis, author, summary, "", Markup.PLAINTEXT, changes);
340 }
341
342
343
344
345
346
347 default CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
348 Author author, String summary, String detail, Markup markup,
349 Change<?>... changes) {
350 requireNonNull(changes, "changes");
351 return commit(baseRevision, commitTimeMillis, author, summary, detail, markup,
352 ImmutableList.copyOf(changes), true);
353 }
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371 CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
372 Author author, String summary, String detail, Markup markup,
373 Iterable<Change<?>> changes, boolean directExecution);
374
375
376
377
378
379
380
381
382
383
384
385
386 default CompletableFuture<List<Commit>> history(Revision from, Revision to, String pathPattern) {
387 return history(from, to, pathPattern, DEFAULT_MAX_COMMITS);
388 }
389
390
391
392
393
394
395
396
397
398
399
400
401
402 CompletableFuture<List<Commit>> history(Revision from, Revision to, String pathPattern, int maxCommits);
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423 default CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern) {
424 return findLatestRevision(lastKnownRevision, pathPattern, false);
425 }
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446 CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern,
447 boolean errorOnEntryNotFound);
448
449
450
451
452
453 default CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern) {
454 return watch(lastKnownRevision, pathPattern, false);
455 }
456
457
458
459
460
461 CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern,
462 boolean errorOnEntryNotFound);
463
464
465
466
467
468 default <T> CompletableFuture<Entry<T>> watch(Revision lastKnownRevision, Query<T> query) {
469 return watch(lastKnownRevision, query, false);
470 }
471
472
473
474
475
476 default <T> CompletableFuture<Entry<T>> watch(Revision lastKnownRevision, Query<T> query,
477 boolean errorOnEntryNotFound) {
478 return RepositoryUtil.watch(this, lastKnownRevision, query, errorOnEntryNotFound);
479 }
480
481
482
483
484 default <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, MergeQuery<T> query) {
485 requireNonNull(revision, "revision");
486 requireNonNull(query, "query");
487
488 final List<MergeSource> mergeSources = query.mergeSources();
489
490 mergeSources.forEach(path -> validateJsonFilePath(path.path(), "path"));
491
492 final Revision normalizedRevision;
493 try {
494 normalizedRevision = normalizeNow(revision);
495 } catch (Exception e) {
496 return CompletableFutures.exceptionallyCompletedFuture(e);
497 }
498 final List<CompletableFuture<Entry<?>>> entryFutures = new ArrayList<>(mergeSources.size());
499 mergeSources.forEach(path -> {
500 if (!path.isOptional()) {
501 entryFutures.add(get(normalizedRevision, path.path()));
502 } else {
503 entryFutures.add(getOrNull(normalizedRevision, path.path()));
504 }
505 });
506
507 final CompletableFuture<MergedEntry<?>> mergedEntryFuture = mergeEntries(entryFutures, revision,
508 query);
509 final CompletableFuture<MergedEntry<T>> future = new CompletableFuture<>();
510 mergedEntryFuture.handle((mergedEntry, cause) -> {
511 if (cause != null) {
512 if (!(cause instanceof CentralDogmaException)) {
513 cause = new QueryExecutionException(cause);
514 }
515 future.completeExceptionally(cause);
516 return null;
517 }
518 future.complete(unsafeCast(mergedEntry));
519 return null;
520 });
521
522 return future;
523 }
524 }