1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.storage.repository;
18  
19  import static com.linecorp.centraldogma.internal.Util.unsafeCast;
20  import static com.linecorp.centraldogma.internal.Util.validateFilePath;
21  import static com.linecorp.centraldogma.internal.Util.validateJsonFilePath;
22  import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ONE_WITHOUT_CONTENT;
23  import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ONE_WITH_CONTENT;
24  import static com.linecorp.centraldogma.server.storage.repository.RepositoryUtil.applyQuery;
25  import static com.linecorp.centraldogma.server.storage.repository.RepositoryUtil.mergeEntries;
26  import static java.util.Objects.requireNonNull;
27  
28  import java.util.ArrayList;
29  import java.util.Arrays;
30  import java.util.List;
31  import java.util.Map;
32  import java.util.concurrent.CompletableFuture;
33  
34  import com.fasterxml.jackson.databind.JsonNode;
35  import com.google.common.collect.ImmutableList;
36  import com.google.common.collect.ImmutableMap;
37  import com.spotify.futures.CompletableFutures;
38  
39  import com.linecorp.centraldogma.common.Author;
40  import com.linecorp.centraldogma.common.CentralDogmaException;
41  import com.linecorp.centraldogma.common.Change;
42  import com.linecorp.centraldogma.common.Commit;
43  import com.linecorp.centraldogma.common.Entry;
44  import com.linecorp.centraldogma.common.EntryNotFoundException;
45  import com.linecorp.centraldogma.common.EntryType;
46  import com.linecorp.centraldogma.common.Markup;
47  import com.linecorp.centraldogma.common.MergeQuery;
48  import com.linecorp.centraldogma.common.MergeSource;
49  import com.linecorp.centraldogma.common.MergedEntry;
50  import com.linecorp.centraldogma.common.Query;
51  import com.linecorp.centraldogma.common.QueryExecutionException;
52  import com.linecorp.centraldogma.common.Revision;
53  import com.linecorp.centraldogma.common.RevisionNotFoundException;
54  import com.linecorp.centraldogma.common.RevisionRange;
55  import com.linecorp.centraldogma.internal.HistoryConstants;
56  import com.linecorp.centraldogma.server.command.CommitResult;
57  import com.linecorp.centraldogma.server.internal.replication.ReplicationLog;
58  import com.linecorp.centraldogma.server.storage.StorageException;
59  import com.linecorp.centraldogma.server.storage.project.Project;
60  
61  /**
62   * Revision-controlled filesystem-like repository.
63   */
64  public interface Repository {
65  
    /** Default {@code maxCommits} used by {@link #history(Revision, Revision, String)}. */
    int DEFAULT_MAX_COMMITS = 100;
    /**
     * Alias of {@link HistoryConstants#MAX_MAX_COMMITS}.
     * NOTE(review): presumably the largest {@code maxCommits} a caller may request; nothing in this
     * interface enforces it - confirm against the implementations.
     */
    int MAX_MAX_COMMITS = HistoryConstants.MAX_MAX_COMMITS;

    /**
     * The path pattern {@value}.
     * NOTE(review): presumably matches every path in a repository; the pattern syntax is defined elsewhere.
     */
    String ALL_PATH = "/**";
70  
    /**
     * Returns the parent {@link Project} of this {@link Repository}.
     *
     * @return the {@link Project} this {@link Repository} belongs to
     */
    Project parent();
75  
    /**
     * Returns the name of this {@link Repository}.
     *
     * @return the repository name
     */
    String name();
80  
    /**
     * Returns the creation time of this {@link Repository}, in milliseconds.
     * NOTE(review): presumably counted from the epoch, like {@code commitTimeMillis} of
     * {@link #commit(Revision, long, Author, String, String, Markup, Iterable, boolean)} - confirm.
     *
     * @return the creation time in milliseconds
     */
    long creationTimeMillis();
85  
    /**
     * Returns the author who created this {@link Repository}.
     *
     * @return the {@link Author} of this {@link Repository}
     */
    Author author();
90  
91      /**
92       * Returns the {@link CompletableFuture} whose value is the absolute {@link Revision} of the
93       * specified {@link Revision}.
94       *
95       * @deprecated Use {@link #normalizeNow(Revision)}.
96       */
97      @Deprecated
98      default CompletableFuture<Revision> normalize(Revision revision) {
99          try {
100             return CompletableFuture.completedFuture(normalizeNow(revision));
101         } catch (Exception e) {
102             return CompletableFutures.exceptionallyCompletedFuture(e);
103         }
104     }
105 
    /**
     * Returns the absolute {@link Revision} of the specified {@link Revision}. Unlike the deprecated
     * {@link #normalize(Revision)}, the result is returned directly rather than via a
     * {@link CompletableFuture}.
     *
     * @param revision the {@link Revision} to resolve
     * @return the absolute {@link Revision}
     *
     * @throws RevisionNotFoundException if the specified {@link Revision} is not found
     */
    Revision normalizeNow(Revision revision);
112 
    /**
     * Returns a {@link RevisionRange} which contains the absolute {@link Revision}s of the specified
     * {@code from} and {@code to}.
     *
     * @param from the first {@link Revision} of the range
     * @param to the second {@link Revision} of the range
     * @return the {@link RevisionRange} holding the absolute {@code from} and {@code to}
     *
     * @throws RevisionNotFoundException if the specified {@code from} or {@code to} is not found
     */
    RevisionRange normalizeNow(Revision from, Revision to);
120 
121     /**
122      * Returns {@code true} if and only if an {@link Entry} exists at the specified {@code path}.
123      */
124     default CompletableFuture<Boolean> exists(Revision revision, String path) {
125         validateFilePath(path, "path");
126         return find(revision, path, FIND_ONE_WITHOUT_CONTENT).thenApply(result -> !result.isEmpty());
127     }
128 
129     /**
130      * Retrieves an {@link Entry} at the specified {@code path}.
131      *
132      * @throws EntryNotFoundException if there's no entry at the specified {@code path}
133      *
134      * @see #getOrNull(Revision, String)
135      */
136     default CompletableFuture<Entry<?>> get(Revision revision, String path) {
137         return getOrNull(revision, path).thenApply(entry -> {
138             if (entry == null) {
139                 throw new EntryNotFoundException(revision, path);
140             }
141 
142             return entry;
143         });
144     }
145 
146     /**
147      * Performs the specified {@link Query}.
148      *
149      * @throws EntryNotFoundException if there's no entry at the path specified in the {@link Query}
150      *
151      * @see #getOrNull(Revision, Query)
152      */
153     default <T> CompletableFuture<Entry<T>> get(Revision revision, Query<T> query) {
154         return getOrNull(revision, query).thenApply(res -> {
155             if (res == null) {
156                 throw new EntryNotFoundException(revision, query.path());
157             }
158 
159             return res;
160         });
161     }
162 
163     /**
164      * Retrieves an {@link Entry} at the specified {@code path}.
165      *
166      * @return the {@link Entry} at the specified {@code path} if exists.
167      *         The specified {@code other} if there's no such {@link Entry}.
168      *
169      * @see #get(Revision, String)
170      */
171     default CompletableFuture<Entry<?>> getOrNull(Revision revision, String path) {
172         validateFilePath(path, "path");
173 
174         return find(revision, path, FIND_ONE_WITH_CONTENT).thenApply(findResult -> findResult.get(path));
175     }
176 
177     /**
178      * Performs the specified {@link Query}.
179      *
180      * @return the {@link Entry} on a successful query.
181      *         The specified {@code other} on a failure due to missing entry.
182      *
183      * @see #get(Revision, Query)
184      */
185     default <T> CompletableFuture<Entry<T>> getOrNull(Revision revision, Query<T> query) {
186         requireNonNull(query, "query");
187         requireNonNull(revision, "revision");
188 
189         return getOrNull(revision, query.path()).thenApply(result -> {
190             if (result == null) {
191                 return null;
192             }
193 
194             @SuppressWarnings("unchecked")
195             final Entry<T> entry = (Entry<T>) result;
196 
197             try {
198                 return applyQuery(entry, query);
199             } catch (CentralDogmaException e) {
200                 throw e;
201             } catch (Exception e) {
202                 throw new QueryExecutionException(e);
203             }
204         });
205     }
206 
207     /**
208      * Finds the {@link Entry}s that match the specified {@code pathPattern}.
209      *
210      * @return a {@link Map} whose value is the matching {@link Entry} and key is its path
211      */
212     default CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern) {
213         return find(revision, pathPattern, ImmutableMap.of());
214     }
215 
    /**
     * Finds the {@link Entry}s that match the specified {@code pathPattern}.
     *
     * @param revision the {@link Revision} to search
     * @param pathPattern the path pattern to match
     * @param options the {@link FindOption}s and their values; the default methods of this interface
     *                pass an empty map, {@code FIND_ONE_WITH_CONTENT} or {@code FIND_ONE_WITHOUT_CONTENT}
     * @return a {@link Map} whose value is the matching {@link Entry} and key is its path
     */
    CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern,
                                                  Map<FindOption<?>, ?> options);
223 
224     /**
225      * Query a file at two different revisions and return the diff of the two query results.
226      */
227     default CompletableFuture<Change<?>> diff(Revision from, Revision to, Query<?> query) {
228         requireNonNull(from, "from");
229         requireNonNull(to, "to");
230         requireNonNull(query, "query");
231 
232         final RevisionRange range;
233         try {
234             range = normalizeNow(from, to).toAscending();
235         } catch (Exception e) {
236             return CompletableFutures.exceptionallyCompletedFuture(e);
237         }
238 
239         final String path = query.path();
240         final CompletableFuture<Entry<?>> fromEntryFuture = getOrNull(range.from(), path);
241         final CompletableFuture<Entry<?>> toEntryFuture = getOrNull(range.to(), path);
242 
243         final CompletableFuture<Change<?>> future =
244                 CompletableFutures.combine(fromEntryFuture, toEntryFuture, (fromEntry, toEntry) -> {
245                     @SuppressWarnings("unchecked")
246                     final Query<Object> castQuery = (Query<Object>) query;
247 
248                     // Handle the case where the entry does not exist at 'from' or 'to'.
249                     if (fromEntry != null) {
250                         if (toEntry == null) {
251                             // The entry has been removed.
252                             return Change.ofRemoval(path);
253                         }
254                     } else if (toEntry != null) {
255                         // The entry has been created.
256                         final EntryType toEntryType = toEntry.type();
257                         if (!query.type().supportedEntryTypes().contains(toEntryType)) {
258                             throw new QueryExecutionException("unsupported entry type: " + toEntryType);
259                         }
260 
261                         final Object toContent = castQuery.apply(toEntry.content());
262 
263                         switch (toEntryType) {
264                             case JSON:
265                                 return Change.ofJsonUpsert(path, (JsonNode) toContent);
266                             case TEXT:
267                                 return Change.ofTextUpsert(path, (String) toContent);
268                             default:
269                                 throw new Error();
270                         }
271                     } else {
272                         // The entry did not exist both at 'from' and 'to'.
273                         throw new EntryNotFoundException(path + " (" + from + ", " + to + ')');
274                     }
275 
276                     // Handle the case where the entry exists both at 'from' and at 'to'.
277                     final EntryType entryType = fromEntry.type();
278                     if (!query.type().supportedEntryTypes().contains(entryType)) {
279                         throw new QueryExecutionException("unsupported entry type: " + entryType);
280                     }
281                     if (entryType != toEntry.type()) {
282                         throw new QueryExecutionException(
283                                 "mismatching entry type: " + entryType + " != " + toEntry.type());
284                     }
285 
286                     final Object fromContent = castQuery.apply(fromEntry.content());
287                     final Object toContent = castQuery.apply(toEntry.content());
288 
289                     switch (entryType) {
290                         case JSON:
291                             return Change.ofJsonPatch(path, (JsonNode) fromContent, (JsonNode) toContent);
292                         case TEXT:
293                             return Change.ofTextPatch(path, (String) fromContent, (String) toContent);
294                         default:
295                             throw new Error();
296                     }
297                 }).toCompletableFuture();
298 
299         return unsafeCast(future);
300     }
301 
    /**
     * Returns the diff for all files that are matched by the specified {@code pathPattern}
     * between the specified two {@link Revision}s.
     *
     * @param from one end of the revision range
     * @param to the other end of the revision range
     * @param pathPattern the path pattern selecting the files to compare
     * @return a future holding the {@link Change}s keyed by {@link String}
     *         (NOTE(review): the key is presumably the path of the changed file - confirm)
     */
    CompletableFuture<Map<String, Change<?>>> diff(Revision from, Revision to, String pathPattern);
307 
308     /**
309      * Generates the preview diff against the specified {@code baseRevision} and {@code changes}.
310      */
311     default CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision, Change<?>... changes) {
312         requireNonNull(changes, "changes");
313         return previewDiff(baseRevision, Arrays.asList(changes));
314     }
315 
    /**
     * Generates the preview diff against the specified {@code baseRevision} and {@code changes}.
     *
     * @param baseRevision the {@link Revision} the {@code changes} are previewed against
     * @param changes the {@link Change}s to preview
     * @return a future holding the resulting {@link Change}s keyed by {@link String}
     *         (NOTE(review): the key is presumably the path of the changed file - confirm)
     */
    CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision, Iterable<Change<?>> changes);
320 
321     /**
322      * Adds the specified changes to this {@link Repository}.
323      *
324      * @return the {@link Revision} of the new {@link Commit}
325      */
326     default CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
327                                                    Author author, String summary, Iterable<Change<?>> changes) {
328         return commit(baseRevision, commitTimeMillis, author, summary, "", Markup.PLAINTEXT, changes,
329                       true);
330     }
331 
332     /**
333      * Adds the specified changes to this {@link Repository}.
334      *
335      * @return the {@link Revision} of the new {@link Commit}
336      */
337     default CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
338                                                    Author author, String summary, Change<?>... changes) {
339         return commit(baseRevision, commitTimeMillis, author, summary, "", Markup.PLAINTEXT, changes);
340     }
341 
342     /**
343      * Adds the specified changes to this {@link Repository}.
344      *
345      * @return the {@link Revision} of the new {@link Commit}
346      */
347     default CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
348                                                    Author author, String summary, String detail, Markup markup,
349                                                    Change<?>... changes) {
350         requireNonNull(changes, "changes");
351         return commit(baseRevision, commitTimeMillis, author, summary, detail, markup,
352                       ImmutableList.copyOf(changes), true);
353     }
354 
    /**
     * Adds the specified changes to this {@link Repository}. The other {@code commit(...)} overloads of
     * this interface all delegate here with {@code directExecution} set to {@code true}.
     *
     * @param baseRevision the base {@link Revision} of this {@link Commit}
     * @param commitTimeMillis the time and date of this {@link Commit}, represented as the number of
     *                         milliseconds since the epoch (midnight, January 1, 1970 UTC)
     * @param author the {@link Author} of this {@link Commit}
     * @param summary the human-readable summary of this {@link Commit}
     * @param detail the human-readable detailed description of this {@link Commit}
     * @param markup the {@link Markup} language of {@code summary} and {@code detail}
     * @param changes the changes to be applied
     * @param directExecution whether this {@link Commit} is received by this server and executed directly.
     *                        {@code false} if this commit is delivered by a {@link ReplicationLog}.
     *
     * @return a future holding the {@link CommitResult} of the new {@link Commit}
     */
    CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
                                           Author author, String summary, String detail, Markup markup,
                                           Iterable<Change<?>> changes, boolean directExecution);
374 
375     /**
376      * Get a list of {@link Commit} for given pathPattern.
377      *
378      * @param from the starting revision (inclusive)
379      * @param to the end revision (inclusive)
380      *
381      * @param pathPattern the path pattern
382      * @return {@link Commit}
383      *
384      * @throws StorageException when any internal error occurs.
385      */
386     default CompletableFuture<List<Commit>> history(Revision from, Revision to, String pathPattern) {
387         return history(from, to, pathPattern, DEFAULT_MAX_COMMITS);
388     }
389 
    /**
     * Gets the list of {@link Commit}s for the given {@code pathPattern}.
     *
     * @param from the starting revision (inclusive)
     * @param to the end revision (inclusive)
     * @param pathPattern the path pattern
     * @param maxCommits the maximum number of {@link Commit}s to return
     * @return a future holding the list of {@link Commit}s
     *
     * @throws StorageException when any internal error occurs.
     */
    CompletableFuture<List<Commit>> history(Revision from, Revision to, String pathPattern, int maxCommits);
403 
404     /**
405      * Returns the latest {@link Revision} if there are any {@link Change}s since {@code lastKnownRevision}
406      * that affected the path matched by the specified {@code pathPattern}. The behavior of this method could
407      * be represented as the following code:
408      * <pre>{@code
409      * RevisionRange range = repository.normalizeNow(lastKnownRevision, Revision.HEAD);
410      * return repository.diff(range.from(), range.to(), pathPattern).thenApply(diff -> {
411      *     if (diff.isEmpty()) {
412      *         return null;
413      *     } else {
414      *         return range.to();
415      *     }
416      * });
417      * }</pre>
418      * .. although it would be implemented more efficiently.
419      *
420      * @return the latest {@link Revision} if there's a match, or {@code null} if there's no match or
421      *         {@code lastKnownRevision} is the latest {@link Revision}
422      */
423     default CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern) {
424         return findLatestRevision(lastKnownRevision, pathPattern, false);
425     }
426 
    /**
     * Returns the latest {@link Revision} if there are any {@link Change}s since {@code lastKnownRevision}
     * that affected the path matched by the specified {@code pathPattern}. The behavior of this method could
     * be represented as the following code:
     * <pre>{@code
     * RevisionRange range = repository.normalizeNow(lastKnownRevision, Revision.HEAD);
     * return repository.diff(range.from(), range.to(), pathPattern).thenApply(diff -> {
     *     if (diff.isEmpty()) {
     *         return null;
     *     } else {
     *         return range.to();
     *     }
     * });
     * }</pre>
     * .. although it would be implemented more efficiently.
     *
     * @param lastKnownRevision the {@link Revision} the caller already knows about
     * @param pathPattern the path pattern to look for changes in
     * @param errorOnEntryNotFound NOTE(review): presumably makes the call fail when no entry matches
     *                             {@code pathPattern}; the two-argument overload passes {@code false} -
     *                             confirm against the implementations
     * @return the latest {@link Revision} if there's a match, or {@code null} if there's no match or
     *         {@code lastKnownRevision} is the latest {@link Revision}
     */
    CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern,
                                                   boolean errorOnEntryNotFound);
448 
449     /**
450      * Awaits and retrieves the latest revision of the commit that changed the file that matches the specified
451      * {@code pathPattern} since the specified last known revision.
452      */
453     default CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern) {
454         return watch(lastKnownRevision, pathPattern, false);
455     }
456 
    /**
     * Awaits and retrieves the latest revision of the commit that changed the file that matches the specified
     * {@code pathPattern} since the specified last known revision.
     *
     * @param lastKnownRevision the {@link Revision} the caller already knows about
     * @param pathPattern the path pattern to watch
     * @param errorOnEntryNotFound NOTE(review): presumably makes the watch fail when no entry matches
     *                             {@code pathPattern}; the two-argument overload passes {@code false} -
     *                             confirm against the implementations
     * @return a future holding the latest {@link Revision}
     */
    CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern,
                                      boolean errorOnEntryNotFound);
463 
464     /**
465      * Awaits and retrieves the change in the query result of the specified file asynchronously since the
466      * specified last known revision.
467      */
468     default <T> CompletableFuture<Entry<T>> watch(Revision lastKnownRevision, Query<T> query) {
469         return watch(lastKnownRevision, query, false);
470     }
471 
472     /**
473      * Awaits and retrieves the change in the query result of the specified file asynchronously since the
474      * specified last known revision.
475      */
476     default <T> CompletableFuture<Entry<T>> watch(Revision lastKnownRevision, Query<T> query,
477                                                   boolean errorOnEntryNotFound) {
478         return RepositoryUtil.watch(this, lastKnownRevision, query, errorOnEntryNotFound);
479     }
480 
481     /**
482      * Merges the JSON files sequentially as specified in the {@link MergeQuery}.
483      */
484     default <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, MergeQuery<T> query) {
485         requireNonNull(revision, "revision");
486         requireNonNull(query, "query");
487 
488         final List<MergeSource> mergeSources = query.mergeSources();
489         // Only JSON files can currently be merged.
490         mergeSources.forEach(path -> validateJsonFilePath(path.path(), "path"));
491 
492         final Revision normalizedRevision;
493         try {
494             normalizedRevision = normalizeNow(revision);
495         } catch (Exception e) {
496             return CompletableFutures.exceptionallyCompletedFuture(e);
497         }
498         final List<CompletableFuture<Entry<?>>> entryFutures = new ArrayList<>(mergeSources.size());
499         mergeSources.forEach(path -> {
500             if (!path.isOptional()) {
501                 entryFutures.add(get(normalizedRevision, path.path()));
502             } else {
503                 entryFutures.add(getOrNull(normalizedRevision, path.path()));
504             }
505         });
506 
507         final CompletableFuture<MergedEntry<?>> mergedEntryFuture = mergeEntries(entryFutures, revision,
508                                                                                  query);
509         final CompletableFuture<MergedEntry<T>> future = new CompletableFuture<>();
510         mergedEntryFuture.handle((mergedEntry, cause) -> {
511             if (cause != null) {
512                 if (!(cause instanceof CentralDogmaException)) {
513                     cause = new QueryExecutionException(cause);
514                 }
515                 future.completeExceptionally(cause);
516                 return null;
517             }
518             future.complete(unsafeCast(mergedEntry));
519             return null;
520         });
521 
522         return future;
523     }
524 }