1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.storage.repository;
18  
19  import static com.linecorp.centraldogma.internal.Util.unsafeCast;
20  import static com.linecorp.centraldogma.internal.Util.validateFilePath;
21  import static com.linecorp.centraldogma.internal.Util.validateJsonFilePath;
22  import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ONE_WITHOUT_CONTENT;
23  import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ONE_WITH_CONTENT;
24  import static com.linecorp.centraldogma.server.storage.repository.RepositoryUtil.applyQuery;
25  import static com.linecorp.centraldogma.server.storage.repository.RepositoryUtil.mergeEntries;
26  import static java.util.Objects.requireNonNull;
27  
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.concurrent.CompletableFuture;
33  
34  import com.fasterxml.jackson.databind.JsonNode;
35  import com.google.common.collect.ImmutableList;
36  import com.google.common.collect.ImmutableMap;
37  import com.spotify.futures.CompletableFutures;
38  
39  import com.linecorp.centraldogma.common.Author;
40  import com.linecorp.centraldogma.common.CentralDogmaException;
41  import com.linecorp.centraldogma.common.Change;
42  import com.linecorp.centraldogma.common.Commit;
43  import com.linecorp.centraldogma.common.Entry;
44  import com.linecorp.centraldogma.common.EntryNotFoundException;
45  import com.linecorp.centraldogma.common.EntryType;
46  import com.linecorp.centraldogma.common.Markup;
47  import com.linecorp.centraldogma.common.MergeQuery;
48  import com.linecorp.centraldogma.common.MergeSource;
49  import com.linecorp.centraldogma.common.MergedEntry;
50  import com.linecorp.centraldogma.common.Query;
51  import com.linecorp.centraldogma.common.QueryExecutionException;
52  import com.linecorp.centraldogma.common.Revision;
53  import com.linecorp.centraldogma.common.RevisionNotFoundException;
54  import com.linecorp.centraldogma.common.RevisionRange;
55  import com.linecorp.centraldogma.internal.HistoryConstants;
56  import com.linecorp.centraldogma.server.command.CommitResult;
57  import com.linecorp.centraldogma.server.command.ContentTransformer;
58  import com.linecorp.centraldogma.server.internal.replication.ReplicationLog;
59  import com.linecorp.centraldogma.server.storage.StorageException;
60  import com.linecorp.centraldogma.server.storage.project.Project;
61  
62  /**
63   * Revision-controlled filesystem-like repository.
64   */
65  public interface Repository {
66  
    /**
     * The number of {@link Commit}s requested by {@link #history(Revision, Revision, String)},
     * which does not let the caller specify a limit.
     */
    int DEFAULT_MAX_COMMITS = 100;
    /**
     * An alias of {@link HistoryConstants#MAX_MAX_COMMITS}. Presumably the largest value accepted as
     * {@code maxCommits}; the enforcement is not visible in this interface, so confirm against
     * the implementations.
     */
    int MAX_MAX_COMMITS = HistoryConstants.MAX_MAX_COMMITS;

    /**
     * A path pattern constant. Presumably it matches every path in a {@link Repository};
     * TODO confirm against the path pattern syntax.
     */
    String ALL_PATH = "/**";
71  
    /**
     * Returns the jGit {@link org.eclipse.jgit.lib.Repository}.
     *
     * @return the jGit repository object; presumably the Git storage that backs this
     *         {@link Repository} (not verifiable from this interface alone)
     */
    org.eclipse.jgit.lib.Repository jGitRepository();
76  
    /**
     * Returns the parent {@link Project} of this {@link Repository}.
     *
     * @return the {@link Project} this {@link Repository} belongs to
     */
    Project parent();
81  
    /**
     * Returns the name of this {@link Repository}.
     *
     * @return the repository name
     */
    String name();
86  
    /**
     * Returns the creation time of this {@link Repository}.
     *
     * @return the creation time in milliseconds, as the method name indicates. Presumably measured
     *         since the epoch like {@code commitTimeMillis} of the {@code commit(...)} methods;
     *         TODO confirm against the implementations.
     */
    long creationTimeMillis();
91  
    /**
     * Returns the author who created this {@link Repository}.
     *
     * @return the {@link Author} who created this {@link Repository}
     */
    Author author();
96  
97      /**
98       * Returns the {@link CompletableFuture} whose value is the absolute {@link Revision} of the
99       * specified {@link Revision}.
100      *
101      * @deprecated Use {@link #normalizeNow(Revision)}.
102      */
103     @Deprecated
104     default CompletableFuture<Revision> normalize(Revision revision) {
105         try {
106             return CompletableFuture.completedFuture(normalizeNow(revision));
107         } catch (Exception e) {
108             return CompletableFutures.exceptionallyCompletedFuture(e);
109         }
110     }
111 
    /**
     * Returns the absolute {@link Revision} of the specified {@link Revision}.
     *
     * @param revision the {@link Revision} to convert into its absolute form
     *
     * @return the absolute {@link Revision}
     *
     * @throws RevisionNotFoundException if the specified {@link Revision} is not found
     */
    Revision normalizeNow(Revision revision);
118 
    /**
     * Returns a {@link RevisionRange} which contains the absolute {@link Revision}s of the specified
     * {@code from} and {@code to}.
     *
     * @param from the first {@link Revision} of the range
     * @param to the second {@link Revision} of the range
     *
     * @return the {@link RevisionRange} made of the two absolute {@link Revision}s
     *
     * @throws RevisionNotFoundException if the specified {@code from} or {@code to} is not found
     */
    RevisionRange normalizeNow(Revision from, Revision to);
126 
127     /**
128      * Returns {@code true} if and only if an {@link Entry} exists at the specified {@code path}.
129      */
130     default CompletableFuture<Boolean> exists(Revision revision, String path) {
131         validateFilePath(path, "path");
132         return find(revision, path, FIND_ONE_WITHOUT_CONTENT).thenApply(result -> !result.isEmpty());
133     }
134 
135     /**
136      * Retrieves an {@link Entry} at the specified {@code path}.
137      *
138      * @throws EntryNotFoundException if there's no entry at the specified {@code path}
139      *
140      * @see #getOrNull(Revision, String)
141      */
142     default CompletableFuture<Entry<?>> get(Revision revision, String path) {
143         return getOrNull(revision, path).thenApply(entry -> {
144             if (entry == null) {
145                 throw new EntryNotFoundException(revision, path);
146             }
147 
148             return entry;
149         });
150     }
151 
152     /**
153      * Performs the specified {@link Query}.
154      *
155      * @throws EntryNotFoundException if there's no entry at the path specified in the {@link Query}
156      *
157      * @see #getOrNull(Revision, Query)
158      */
159     default <T> CompletableFuture<Entry<T>> get(Revision revision, Query<T> query) {
160         return getOrNull(revision, query).thenApply(res -> {
161             if (res == null) {
162                 throw new EntryNotFoundException(revision, query.path());
163             }
164 
165             return res;
166         });
167     }
168 
169     /**
170      * Retrieves an {@link Entry} at the specified {@code path}.
171      *
172      * @return the {@link Entry} at the specified {@code path} if exists.
173      *         The specified {@code other} if there's no such {@link Entry}.
174      *
175      * @see #get(Revision, String)
176      */
177     default CompletableFuture<Entry<?>> getOrNull(Revision revision, String path) {
178         validateFilePath(path, "path");
179 
180         return find(revision, path, FIND_ONE_WITH_CONTENT).thenApply(findResult -> findResult.get(path));
181     }
182 
183     /**
184      * Performs the specified {@link Query}.
185      *
186      * @return the {@link Entry} on a successful query.
187      *         The specified {@code other} on a failure due to missing entry.
188      *
189      * @see #get(Revision, Query)
190      */
191     default <T> CompletableFuture<Entry<T>> getOrNull(Revision revision, Query<T> query) {
192         requireNonNull(query, "query");
193         requireNonNull(revision, "revision");
194 
195         return getOrNull(revision, query.path()).thenApply(result -> {
196             if (result == null) {
197                 return null;
198             }
199 
200             @SuppressWarnings("unchecked")
201             final Entry<T> entry = (Entry<T>) result;
202 
203             try {
204                 return applyQuery(entry, query);
205             } catch (CentralDogmaException e) {
206                 throw e;
207             } catch (Exception e) {
208                 throw new QueryExecutionException(e);
209             }
210         });
211     }
212 
213     /**
214      * Finds the {@link Entry}s that match the specified {@code pathPattern}.
215      *
216      * @return a {@link Map} whose value is the matching {@link Entry} and key is its path
217      */
218     default CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern) {
219         return find(revision, pathPattern, ImmutableMap.of());
220     }
221 
    /**
     * Finds the {@link Entry}s that match the specified {@code pathPattern}.
     *
     * @param revision the {@link Revision} to search at
     * @param pathPattern the path pattern the {@link Entry}s have to match
     * @param options the {@link FindOption}s and their values. The default methods of this interface
     *                pass an empty map, {@code FIND_ONE_WITH_CONTENT} or
     *                {@code FIND_ONE_WITHOUT_CONTENT}.
     *
     * @return a {@link Map} whose value is the matching {@link Entry} and key is its path
     */
    CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern,
                                                  Map<FindOption<?>, ?> options);
229 
230     /**
231      * Query a file at two different revisions and return the diff of the two query results.
232      */
233     default CompletableFuture<Change<?>> diff(Revision from, Revision to, Query<?> query) {
234         return diff(from, to, query, DiffResultType.NORMAL);
235     }
236 
237     /**
238      * Query a file at two different revisions and return the diff of the two query results.
239      */
240     default CompletableFuture<Change<?>> diff(Revision from, Revision to, Query<?> query,
241                                               DiffResultType diffResultType) {
242         requireNonNull(from, "from");
243         requireNonNull(to, "to");
244         requireNonNull(query, "query");
245         requireNonNull(diffResultType, "diffResultType");
246 
247         final RevisionRange range;
248         try {
249             range = normalizeNow(from, to).toAscending();
250         } catch (Exception e) {
251             return CompletableFutures.exceptionallyCompletedFuture(e);
252         }
253 
254         final String path = query.path();
255         final CompletableFuture<Entry<?>> fromEntryFuture = getOrNull(range.from(), path);
256         final CompletableFuture<Entry<?>> toEntryFuture = getOrNull(range.to(), path);
257 
258         final CompletableFuture<Change<?>> future =
259                 CompletableFutures.combine(fromEntryFuture, toEntryFuture, (fromEntry, toEntry) -> {
260                     @SuppressWarnings("unchecked")
261                     final Query<Object> castQuery = (Query<Object>) query;
262 
263                     // Handle the case where the entry does not exist at 'from' or 'to'.
264                     if (fromEntry != null) {
265                         if (toEntry == null) {
266                             // The entry has been removed.
267                             return Change.ofRemoval(path);
268                         }
269                     } else if (toEntry != null) {
270                         // The entry has been created.
271                         final EntryType toEntryType = toEntry.type();
272                         if (!query.type().supportedEntryTypes().contains(toEntryType)) {
273                             throw new QueryExecutionException("unsupported entry type: " + toEntryType);
274                         }
275 
276                         final Object toContent = castQuery.apply(toEntry.content());
277 
278                         switch (toEntryType) {
279                             case JSON:
280                                 return Change.ofJsonUpsert(path, (JsonNode) toContent);
281                             case TEXT:
282                                 return Change.ofTextUpsert(path, (String) toContent);
283                             default:
284                                 throw new Error();
285                         }
286                     } else {
287                         // The entry did not exist both at 'from' and 'to'.
288                         throw new EntryNotFoundException(path + " (" + from + ", " + to + ')');
289                     }
290 
291                     // Handle the case where the entry exists both at 'from' and at 'to'.
292                     final EntryType entryType = fromEntry.type();
293                     if (!query.type().supportedEntryTypes().contains(entryType)) {
294                         throw new QueryExecutionException("unsupported entry type: " + entryType);
295                     }
296                     if (entryType != toEntry.type()) {
297                         throw new QueryExecutionException(
298                                 "mismatching entry type: " + entryType + " != " + toEntry.type());
299                     }
300 
301                     final Object fromContent = castQuery.apply(fromEntry.content());
302                     final Object toContent = castQuery.apply(toEntry.content());
303 
304                     switch (entryType) {
305                         case JSON:
306                             if (diffResultType == DiffResultType.PATCH_TO_UPSERT) {
307                                 return Change.ofJsonUpsert(path, (JsonNode) toContent);
308                             }
309                             return Change.ofJsonPatch(path, (JsonNode) fromContent, (JsonNode) toContent);
310                         case TEXT:
311                             if (diffResultType == DiffResultType.PATCH_TO_UPSERT) {
312                                 return Change.ofTextUpsert(path, (String) toContent);
313                             }
314                             return Change.ofTextPatch(path, (String) fromContent, (String) toContent);
315                         default:
316                             throw new Error();
317                     }
318                 }).toCompletableFuture();
319         return unsafeCast(future);
320     }
321 
322     /**
323      * Returns the diff for all files that are matched by the specified {@code pathPattern}
324      * between the specified two {@link Revision}s.
325      *
326      * @throws StorageException if {@code from} or {@code to} does not exist.
327      */
328     default CompletableFuture<Map<String, Change<?>>> diff(Revision from, Revision to, String pathPattern) {
329         return diff(from, to, pathPattern, DiffResultType.NORMAL);
330     }
331 
    /**
     * Returns the diff for all files that are matched by the specified {@code pathPattern}
     * between the specified two {@link Revision}s.
     *
     * @param from one end of the {@link Revision} range to compare
     * @param to the other end of the {@link Revision} range to compare
     * @param pathPattern the path pattern the compared files have to match
     * @param diffResultType the form of the returned {@link Change}s. In the {@link Query}-based
     *                       overload, {@link DiffResultType#PATCH_TO_UPSERT} yields upserts where
     *                       patches would be produced otherwise; presumably the same applies here -
     *                       confirm against the implementations.
     *
     * @return presumably a {@link Map} whose key is a path and value is the {@link Change} of that
     *         path - confirm against the implementations
     *
     * @throws StorageException if {@code from} or {@code to} does not exist.
     */
    CompletableFuture<Map<String, Change<?>>> diff(Revision from, Revision to, String pathPattern,
                                                   DiffResultType diffResultType);
340 
341     /**
342      * Generates the preview diff against the specified {@code baseRevision} and {@code changes}.
343      */
344     default CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision, Change<?>... changes) {
345         requireNonNull(changes, "changes");
346         return previewDiff(baseRevision, Arrays.asList(changes));
347     }
348 
    /**
     * Generates the preview diff against the specified {@code baseRevision} and {@code changes}.
     *
     * @param baseRevision the {@link Revision} the {@code changes} are compared against
     * @param changes the {@link Change}s to preview
     *
     * @return presumably a {@link Map} whose key is a path and value is the previewed {@link Change}
     *         of that path - confirm against the implementations
     */
    CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision, Iterable<Change<?>> changes);
353 
354     /**
355      * Adds the specified changes to this {@link Repository}.
356      *
357      * @return the {@link Revision} of the new {@link Commit}
358      */
359     default CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
360                                                    Author author, String summary, Iterable<Change<?>> changes) {
361         return commit(baseRevision, commitTimeMillis, author, summary, "", Markup.PLAINTEXT, changes,
362                       true);
363     }
364 
365     /**
366      * Adds the specified changes to this {@link Repository}.
367      *
368      * @return the {@link Revision} of the new {@link Commit}
369      */
370     default CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
371                                                    Author author, String summary, Change<?>... changes) {
372         return commit(baseRevision, commitTimeMillis, author, summary, "", Markup.PLAINTEXT, changes);
373     }
374 
375     /**
376      * Adds the specified changes to this {@link Repository}.
377      *
378      * @return the {@link Revision} of the new {@link Commit}
379      */
380     default CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
381                                                    Author author, String summary, String detail, Markup markup,
382                                                    Change<?>... changes) {
383         requireNonNull(changes, "changes");
384         return commit(baseRevision, commitTimeMillis, author, summary, detail, markup,
385                       ImmutableList.copyOf(changes), true);
386     }
387 
    /**
     * Adds the specified changes to this {@link Repository}.
     *
     * @param baseRevision the base {@link Revision} of this {@link Commit}
     * @param commitTimeMillis the time and date of this {@link Commit}, represented as the number of
     *                         milliseconds since the epoch (midnight, January 1, 1970 UTC)
     * @param author the {@link Author} of this {@link Commit}
     * @param summary the human-readable summary of this {@link Commit}
     * @param detail the human-readable detailed description of this {@link Commit}
     * @param markup the {@link Markup} language of {@code summary} and {@code detail}
     * @param changes the changes to be applied
     * @param directExecution whether this {@link Commit} is received by this server and executed directly.
     *                        {@code false} if this commit is delivered by a {@link ReplicationLog}.
     *
     * @return the {@link CommitResult} of the new {@link Commit}
     */
    CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
                                           Author author, String summary, String detail, Markup markup,
                                           Iterable<Change<?>> changes, boolean directExecution);
407 
    /**
     * Adds the content that is transformed by the specified {@link ContentTransformer} to
     * this {@link Repository}.
     *
     * @param baseRevision the base {@link Revision} of this {@link Commit}
     * @param commitTimeMillis the time and date of this {@link Commit}; presumably the number of
     *                         milliseconds since the epoch like in the other {@code commit(...)}
     *                         methods - confirm against the implementations
     * @param author the {@link Author} of this {@link Commit}
     * @param summary the human-readable summary of this {@link Commit}
     * @param detail the human-readable detailed description of this {@link Commit}
     * @param markup the {@link Markup} language of {@code summary} and {@code detail}
     * @param transformer the {@link ContentTransformer} that produces the content to add
     *
     * @return the {@link CommitResult} of the new {@link Commit}
     */
    CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
                                           Author author, String summary, String detail, Markup markup,
                                           ContentTransformer<?> transformer);
415 
416     /**
417      * Get a list of {@link Commit} for given pathPattern.
418      *
419      * @param from the starting revision (inclusive)
420      * @param to the end revision (inclusive)
421      *
422      * @param pathPattern the path pattern
423      * @return {@link Commit}
424      *
425      * @throws StorageException when any internal error occurs.
426      */
427     default CompletableFuture<List<Commit>> history(Revision from, Revision to, String pathPattern) {
428         return history(from, to, pathPattern, DEFAULT_MAX_COMMITS);
429     }
430 
    /**
     * Gets the list of {@link Commit}s for the given {@code pathPattern}.
     *
     * @param from the starting revision (inclusive)
     * @param to the end revision (inclusive)
     * @param pathPattern the path pattern
     * @param maxCommits the maximum number of {@link Commit}s to return
     *
     * @return the {@link Commit}s found
     *
     * @throws StorageException when any internal error occurs.
     */
    CompletableFuture<List<Commit>> history(Revision from, Revision to, String pathPattern, int maxCommits);
444 
445     /**
446      * Returns the latest {@link Revision} if there are any {@link Change}s since {@code lastKnownRevision}
447      * that affected the path matched by the specified {@code pathPattern}. The behavior of this method could
448      * be represented as the following code:
449      * <pre>{@code
450      * RevisionRange range = repository.normalizeNow(lastKnownRevision, Revision.HEAD);
451      * return repository.diff(range.from(), range.to(), pathPattern).thenApply(diff -> {
452      *     if (diff.isEmpty()) {
453      *         return null;
454      *     } else {
455      *         return range.to();
456      *     }
457      * });
458      * }</pre>
459      * .. although it would be implemented more efficiently.
460      *
461      * @return the latest {@link Revision} if there's a match, or {@code null} if there's no match or
462      *         {@code lastKnownRevision} is the latest {@link Revision}
463      */
464     default CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern) {
465         return findLatestRevision(lastKnownRevision, pathPattern, false);
466     }
467 
    /**
     * Returns the latest {@link Revision} if there are any {@link Change}s since {@code lastKnownRevision}
     * that affected the path matched by the specified {@code pathPattern}. The behavior of this method could
     * be represented as the following code:
     * <pre>{@code
     * RevisionRange range = repository.normalizeNow(lastKnownRevision, Revision.HEAD);
     * return repository.diff(range.from(), range.to(), pathPattern).thenApply(diff -> {
     *     if (diff.isEmpty()) {
     *         return null;
     *     } else {
     *         return range.to();
     *     }
     * });
     * }</pre>
     * .. although it would be implemented more efficiently.
     *
     * @param lastKnownRevision the {@link Revision} after which {@link Change}s are looked for
     * @param pathPattern the path pattern the changed paths have to match
     * @param errorOnEntryNotFound presumably whether the returned future fails when no entry matches
     *                             {@code pathPattern}, as the name suggests - confirm against
     *                             the implementations. The two-argument overload passes {@code false}.
     *
     * @return the latest {@link Revision} if there's a match, or {@code null} if there's no match or
     *         {@code lastKnownRevision} is the latest {@link Revision}
     */
    CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern,
                                                   boolean errorOnEntryNotFound);
489 
490     /**
491      * Awaits and retrieves the latest revision of the commit that changed the file that matches the specified
492      * {@code pathPattern} since the specified last known revision.
493      */
494     default CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern) {
495         return watch(lastKnownRevision, pathPattern, false);
496     }
497 
    /**
     * Awaits and retrieves the latest revision of the commit that changed the file that matches the specified
     * {@code pathPattern} since the specified last known revision.
     *
     * @param lastKnownRevision the {@link Revision} after which changes are awaited
     * @param pathPattern the path pattern the changed file has to match
     * @param errorOnEntryNotFound presumably whether the returned future fails when no entry matches
     *                             {@code pathPattern}, as the name suggests - confirm against
     *                             the implementations. The two-argument overload passes {@code false}.
     */
    CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern,
                                      boolean errorOnEntryNotFound);
504 
505     /**
506      * Awaits and retrieves the change in the query result of the specified file asynchronously since the
507      * specified last known revision.
508      */
509     default <T> CompletableFuture<Entry<T>> watch(Revision lastKnownRevision, Query<T> query) {
510         return watch(lastKnownRevision, query, false);
511     }
512 
513     /**
514      * Awaits and retrieves the change in the query result of the specified file asynchronously since the
515      * specified last known revision.
516      */
517     default <T> CompletableFuture<Entry<T>> watch(Revision lastKnownRevision, Query<T> query,
518                                                   boolean errorOnEntryNotFound) {
519         return RepositoryUtil.watch(this, lastKnownRevision, query, errorOnEntryNotFound);
520     }
521 
522     /**
523      * Merges the JSON files sequentially as specified in the {@link MergeQuery}.
524      */
525     default <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, MergeQuery<T> query) {
526         requireNonNull(revision, "revision");
527         requireNonNull(query, "query");
528 
529         final List<MergeSource> mergeSources = query.mergeSources();
530         // Only JSON files can currently be merged.
531         mergeSources.forEach(path -> validateJsonFilePath(path.path(), "path"));
532 
533         final Revision normalizedRevision;
534         try {
535             normalizedRevision = normalizeNow(revision);
536         } catch (Exception e) {
537             return CompletableFutures.exceptionallyCompletedFuture(e);
538         }
539         final List<CompletableFuture<Entry<?>>> entryFutures = new ArrayList<>(mergeSources.size());
540         mergeSources.forEach(path -> {
541             if (!path.isOptional()) {
542                 entryFutures.add(get(normalizedRevision, path.path()));
543             } else {
544                 entryFutures.add(getOrNull(normalizedRevision, path.path()));
545             }
546         });
547 
548         final CompletableFuture<MergedEntry<?>> mergedEntryFuture = mergeEntries(entryFutures, revision,
549                                                                                  query);
550         final CompletableFuture<MergedEntry<T>> future = new CompletableFuture<>();
551         mergedEntryFuture.handle((mergedEntry, cause) -> {
552             if (cause != null) {
553                 if (!(cause instanceof CentralDogmaException)) {
554                     cause = new QueryExecutionException(cause);
555                 }
556                 future.completeExceptionally(cause);
557                 return null;
558             }
559             future.complete(unsafeCast(mergedEntry));
560             return null;
561         });
562 
563         return future;
564     }
565 
    /**
     * Executes the specified {@link CacheableCall} in this {@link Repository}.
     *
     * @param cacheableCall the call to execute
     *
     * @return a future of the value produced by the {@link CacheableCall}. Whether and how the value
     *         is cached is up to the implementation and not visible from this interface.
     */
    <T> CompletableFuture<T> execute(CacheableCall<T> cacheableCall);
570 
    /**
     * Adds the {@link RepositoryListener} that gets notified whenever changes matching with
     * {@link RepositoryListener#pathPattern()} are pushed to this {@link Repository}.
     *
     * @param listener the {@link RepositoryListener} to add
     */
    void addListener(RepositoryListener listener);
576 }