1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.storage.repository;
18  
19  import static com.linecorp.armeria.common.util.Functions.voidFunction;
20  import static com.linecorp.centraldogma.common.QueryType.IDENTITY;
21  import static com.linecorp.centraldogma.common.QueryType.IDENTITY_JSON;
22  import static com.linecorp.centraldogma.common.QueryType.IDENTITY_TEXT;
23  import static com.linecorp.centraldogma.common.QueryType.JSON_PATH;
24  import static com.linecorp.centraldogma.internal.Util.unsafeCast;
25  import static java.util.Objects.requireNonNull;
26  
27  import java.util.List;
28  import java.util.Objects;
29  import java.util.concurrent.CancellationException;
30  import java.util.concurrent.CompletableFuture;
31  import java.util.stream.Collectors;
32  
33  import javax.annotation.Nullable;
34  
35  import com.fasterxml.jackson.core.JsonParseException;
36  import com.fasterxml.jackson.databind.JsonNode;
37  import com.google.common.collect.ImmutableList;
38  import com.google.common.collect.ImmutableList.Builder;
39  import com.google.common.collect.Iterables;
40  import com.google.common.collect.Streams;
41  import com.spotify.futures.CompletableFutures;
42  
43  import com.linecorp.armeria.common.util.Exceptions;
44  import com.linecorp.centraldogma.common.Entry;
45  import com.linecorp.centraldogma.common.EntryNotFoundException;
46  import com.linecorp.centraldogma.common.EntryType;
47  import com.linecorp.centraldogma.common.MergeQuery;
48  import com.linecorp.centraldogma.common.MergeSource;
49  import com.linecorp.centraldogma.common.MergedEntry;
50  import com.linecorp.centraldogma.common.Query;
51  import com.linecorp.centraldogma.common.QueryExecutionException;
52  import com.linecorp.centraldogma.common.QuerySyntaxException;
53  import com.linecorp.centraldogma.common.QueryType;
54  import com.linecorp.centraldogma.common.Revision;
55  import com.linecorp.centraldogma.internal.Jackson;
56  
57  /**
58   * Utility methods that are useful when implementing a {@link Repository} implementation.
59   */
60  final class RepositoryUtil {
61  
62      private static final CancellationException CANCELLATION_EXCEPTION =
63              Exceptions.clearTrace(new CancellationException("parent complete"));
64  
65      static CompletableFuture<MergedEntry<?>> mergeEntries(
66              List<CompletableFuture<Entry<?>>> entryFutures, Revision revision,
67              MergeQuery<?> query) {
68          requireNonNull(entryFutures, "entryFutures");
69          requireNonNull(revision, "revision");
70          requireNonNull(query, "query");
71  
72          final CompletableFuture<MergedEntry<JsonNode>> future = new CompletableFuture<>();
73          CompletableFutures.allAsList(entryFutures).handle((entries, cause) -> {
74              if (cause != null) {
75                  future.completeExceptionally(Exceptions.peel(cause));
76                  return null;
77              }
78  
79              final Builder<JsonNode> jsonNodesBuilder = ImmutableList.builder();
80              final Builder<String> pathsBuilder = ImmutableList.builder();
81              for (Entry<?> entry : entries) {
82                  if (entry == null) {
83                      continue;
84                  }
85                  try {
86                      jsonNodesBuilder.add(entry.contentAsJson());
87                      pathsBuilder.add(entry.path());
88                  } catch (JsonParseException e) {
89                      future.completeExceptionally(e);
90                      return null;
91                  }
92              }
93  
94              JsonNode result;
95              try {
96                  final List<JsonNode> jsonNodes = jsonNodesBuilder.build();
97                  if (jsonNodes.isEmpty()) {
98                      throw new EntryNotFoundException(revision, concatenatePaths(query.mergeSources()));
99                  }
100 
101                 result = Jackson.mergeTree(jsonNodes);
102                 final List<String> expressions = query.expressions();
103                 if (!Iterables.isEmpty(expressions)) {
104                     result = Jackson.extractTree(result, expressions);
105                 }
106             } catch (Exception e) {
107                 future.completeExceptionally(e);
108                 return null;
109             }
110 
111             future.complete(MergedEntry.of(revision, EntryType.JSON, result, pathsBuilder.build()));
112             return null;
113         });
114         return unsafeCast(future);
115     }
116 
117     private static String concatenatePaths(Iterable<MergeSource> mergeSources) {
118         return Streams.stream(mergeSources).map(MergeSource::path).collect(Collectors.joining(","));
119     }
120 
121     /**
122      * Applies the specified {@link Query} to the {@link Entry#content()} of the specified {@link Entry} and
123      * returns the query result.
124      *
125      * @throws IllegalStateException if the specified {@link Entry} is a directory
126      * @throws QuerySyntaxException if the syntax of specified {@link Query} is invalid
127      * @throws QueryExecutionException if an {@link Exception} is raised while applying the specified
128      *                                 {@link Query} to the {@link Entry#content()}
129      */
130     static <T> Entry<T> applyQuery(Entry<T> entry, Query<T> query) {
131         requireNonNull(query, "query");
132         entry.content(); // Ensure that content is not null.
133         final EntryType entryType = entry.type();
134 
135         final QueryType queryType = query.type();
136         if (!queryType.supportedEntryTypes().contains(entryType)) {
137             throw new QueryExecutionException("Unsupported entry type: " + entryType +
138                                               " (query: " + query + ')');
139         }
140 
141         if (queryType == IDENTITY || queryType == IDENTITY_TEXT || queryType == IDENTITY_JSON) {
142             return entry;
143         } else if (queryType == JSON_PATH) {
144             return Entry.of(entry.revision(), query.path(), entryType, query.apply(entry.content()));
145         } else {
146             throw new QueryExecutionException("Unsupported entry type: " + entryType +
147                                               " (query: " + query + ')');
148         }
149     }
150 
151     static <T> CompletableFuture<Entry<T>> watch(Repository repo, Revision lastKnownRev, Query<T> query,
152                                                  boolean errorOnEntryNotFound) {
153         requireNonNull(repo, "repo");
154         requireNonNull(lastKnownRev, "lastKnownRev");
155         requireNonNull(query, "query");
156 
157         final Query<Object> castQuery = unsafeCast(query);
158         final CompletableFuture<Entry<Object>> parentFuture = new CompletableFuture<>();
159         repo.getOrNull(lastKnownRev, castQuery)
160             .thenAccept(oldResult -> watch(repo, castQuery, lastKnownRev, oldResult,
161                                            parentFuture, errorOnEntryNotFound))
162             .exceptionally(voidFunction(parentFuture::completeExceptionally));
163 
164         return unsafeCast(parentFuture);
165     }
166 
167     private static void watch(Repository repo, Query<Object> query,
168                               Revision lastKnownRev, @Nullable Entry<Object> oldResult,
169                               CompletableFuture<Entry<Object>> parentFuture, boolean errorOnEntryNotFound) {
170 
171         final CompletableFuture<Revision> future = repo.watch(lastKnownRev, query.path(), errorOnEntryNotFound);
172         parentFuture.whenComplete((res, cause) -> future.completeExceptionally(CANCELLATION_EXCEPTION));
173 
174         future.thenCompose(newRev -> repo.getOrNull(newRev, query).thenAccept(newResult -> {
175             if (errorOnEntryNotFound && newResult == null) {
176                 // The entry is removed.
177                 parentFuture.completeExceptionally(new EntryNotFoundException(newRev, query.path()));
178                 return;
179             }
180 
181             if (newResult == null ||
182                 oldResult != null && Objects.equals(oldResult.content(), newResult.content())) {
183                 // Entry does not exist or did not change; watch again for more changes.
184                 if (!parentFuture.isDone()) {
185                     // ... only when the parent future has not been cancelled.
186                     watch(repo, query, newRev, oldResult, parentFuture, errorOnEntryNotFound);
187                 }
188             } else {
189                 parentFuture.complete(newResult);
190             }
191         })).exceptionally(voidFunction(parentFuture::completeExceptionally));
192     }
193 
194     private RepositoryUtil() {}
195 }