1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.storage.repository;
18
19 import static com.linecorp.armeria.common.util.Functions.voidFunction;
20 import static com.linecorp.centraldogma.common.QueryType.IDENTITY;
21 import static com.linecorp.centraldogma.common.QueryType.IDENTITY_JSON;
22 import static com.linecorp.centraldogma.common.QueryType.IDENTITY_TEXT;
23 import static com.linecorp.centraldogma.common.QueryType.JSON_PATH;
24 import static com.linecorp.centraldogma.internal.Util.unsafeCast;
25 import static java.util.Objects.requireNonNull;
26
27 import java.util.List;
28 import java.util.Objects;
29 import java.util.concurrent.CancellationException;
30 import java.util.concurrent.CompletableFuture;
31 import java.util.stream.Collectors;
32
33 import javax.annotation.Nullable;
34
35 import com.fasterxml.jackson.core.JsonParseException;
36 import com.fasterxml.jackson.databind.JsonNode;
37 import com.google.common.collect.ImmutableList;
38 import com.google.common.collect.ImmutableList.Builder;
39 import com.google.common.collect.Iterables;
40 import com.google.common.collect.Streams;
41 import com.spotify.futures.CompletableFutures;
42
43 import com.linecorp.armeria.common.util.Exceptions;
44 import com.linecorp.centraldogma.common.Entry;
45 import com.linecorp.centraldogma.common.EntryNotFoundException;
46 import com.linecorp.centraldogma.common.EntryType;
47 import com.linecorp.centraldogma.common.MergeQuery;
48 import com.linecorp.centraldogma.common.MergeSource;
49 import com.linecorp.centraldogma.common.MergedEntry;
50 import com.linecorp.centraldogma.common.Query;
51 import com.linecorp.centraldogma.common.QueryExecutionException;
52 import com.linecorp.centraldogma.common.QuerySyntaxException;
53 import com.linecorp.centraldogma.common.QueryType;
54 import com.linecorp.centraldogma.common.Revision;
55 import com.linecorp.centraldogma.internal.Jackson;
56
57
58
59
60 final class RepositoryUtil {
61
62 private static final CancellationException CANCELLATION_EXCEPTION =
63 Exceptions.clearTrace(new CancellationException("parent complete"));
64
65 static CompletableFuture<MergedEntry<?>> mergeEntries(
66 List<CompletableFuture<Entry<?>>> entryFutures, Revision revision,
67 MergeQuery<?> query) {
68 requireNonNull(entryFutures, "entryFutures");
69 requireNonNull(revision, "revision");
70 requireNonNull(query, "query");
71
72 final CompletableFuture<MergedEntry<JsonNode>> future = new CompletableFuture<>();
73 CompletableFutures.allAsList(entryFutures).handle((entries, cause) -> {
74 if (cause != null) {
75 future.completeExceptionally(Exceptions.peel(cause));
76 return null;
77 }
78
79 final Builder<JsonNode> jsonNodesBuilder = ImmutableList.builder();
80 final Builder<String> pathsBuilder = ImmutableList.builder();
81 for (Entry<?> entry : entries) {
82 if (entry == null) {
83 continue;
84 }
85 try {
86 jsonNodesBuilder.add(entry.contentAsJson());
87 pathsBuilder.add(entry.path());
88 } catch (JsonParseException e) {
89 future.completeExceptionally(e);
90 return null;
91 }
92 }
93
94 JsonNode result;
95 try {
96 final List<JsonNode> jsonNodes = jsonNodesBuilder.build();
97 if (jsonNodes.isEmpty()) {
98 throw new EntryNotFoundException(revision, concatenatePaths(query.mergeSources()));
99 }
100
101 result = Jackson.mergeTree(jsonNodes);
102 final List<String> expressions = query.expressions();
103 if (!Iterables.isEmpty(expressions)) {
104 result = Jackson.extractTree(result, expressions);
105 }
106 } catch (Exception e) {
107 future.completeExceptionally(e);
108 return null;
109 }
110
111 future.complete(MergedEntry.of(revision, EntryType.JSON, result, pathsBuilder.build()));
112 return null;
113 });
114 return unsafeCast(future);
115 }
116
117 private static String concatenatePaths(Iterable<MergeSource> mergeSources) {
118 return Streams.stream(mergeSources).map(MergeSource::path).collect(Collectors.joining(","));
119 }
120
121
122
123
124
125
126
127
128
129
130 static <T> Entry<T> applyQuery(Entry<T> entry, Query<T> query) {
131 requireNonNull(query, "query");
132 entry.content();
133 final EntryType entryType = entry.type();
134
135 final QueryType queryType = query.type();
136 if (!queryType.supportedEntryTypes().contains(entryType)) {
137 throw new QueryExecutionException("Unsupported entry type: " + entryType +
138 " (query: " + query + ')');
139 }
140
141 if (queryType == IDENTITY || queryType == IDENTITY_TEXT || queryType == IDENTITY_JSON) {
142 return entry;
143 } else if (queryType == JSON_PATH) {
144 return Entry.of(entry.revision(), query.path(), entryType, query.apply(entry.content()));
145 } else {
146 throw new QueryExecutionException("Unsupported entry type: " + entryType +
147 " (query: " + query + ')');
148 }
149 }
150
151 static <T> CompletableFuture<Entry<T>> watch(Repository repo, Revision lastKnownRev, Query<T> query,
152 boolean errorOnEntryNotFound) {
153 requireNonNull(repo, "repo");
154 requireNonNull(lastKnownRev, "lastKnownRev");
155 requireNonNull(query, "query");
156
157 final Query<Object> castQuery = unsafeCast(query);
158 final CompletableFuture<Entry<Object>> parentFuture = new CompletableFuture<>();
159 repo.getOrNull(lastKnownRev, castQuery)
160 .thenAccept(oldResult -> watch(repo, castQuery, lastKnownRev, oldResult,
161 parentFuture, errorOnEntryNotFound))
162 .exceptionally(voidFunction(parentFuture::completeExceptionally));
163
164 return unsafeCast(parentFuture);
165 }
166
167 private static void watch(Repository repo, Query<Object> query,
168 Revision lastKnownRev, @Nullable Entry<Object> oldResult,
169 CompletableFuture<Entry<Object>> parentFuture, boolean errorOnEntryNotFound) {
170
171 final CompletableFuture<Revision> future = repo.watch(lastKnownRev, query.path(), errorOnEntryNotFound);
172 parentFuture.whenComplete((res, cause) -> future.completeExceptionally(CANCELLATION_EXCEPTION));
173
174 future.thenCompose(newRev -> repo.getOrNull(newRev, query).thenAccept(newResult -> {
175 if (errorOnEntryNotFound && newResult == null) {
176
177 parentFuture.completeExceptionally(new EntryNotFoundException(newRev, query.path()));
178 return;
179 }
180
181 if (newResult == null ||
182 oldResult != null && Objects.equals(oldResult.content(), newResult.content())) {
183
184 if (!parentFuture.isDone()) {
185
186 watch(repo, query, newRev, oldResult, parentFuture, errorOnEntryNotFound);
187 }
188 } else {
189 parentFuture.complete(newResult);
190 }
191 })).exceptionally(voidFunction(parentFuture::completeExceptionally));
192 }
193
194 private RepositoryUtil() {}
195 }