1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.internal.storage.repository.cache;
18
19 import static com.google.common.base.MoreObjects.toStringHelper;
20 import static com.linecorp.centraldogma.internal.Util.unsafeCast;
21 import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.throwUnsafelyIfNonNull;
22 import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ONE_WITH_CONTENT;
23 import static java.util.Objects.requireNonNull;
24
25 import java.util.List;
26 import java.util.Map;
27 import java.util.concurrent.CancellationException;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.concurrent.Executor;
30
31 import com.linecorp.armeria.common.CommonPools;
32 import com.linecorp.armeria.common.RequestContext;
33 import com.linecorp.armeria.common.util.Exceptions;
34 import com.linecorp.centraldogma.common.Author;
35 import com.linecorp.centraldogma.common.Change;
36 import com.linecorp.centraldogma.common.Commit;
37 import com.linecorp.centraldogma.common.Entry;
38 import com.linecorp.centraldogma.common.EntryNotFoundException;
39 import com.linecorp.centraldogma.common.Markup;
40 import com.linecorp.centraldogma.common.MergeQuery;
41 import com.linecorp.centraldogma.common.MergedEntry;
42 import com.linecorp.centraldogma.common.Query;
43 import com.linecorp.centraldogma.common.QueryType;
44 import com.linecorp.centraldogma.common.Revision;
45 import com.linecorp.centraldogma.common.RevisionRange;
46 import com.linecorp.centraldogma.server.command.CommitResult;
47 import com.linecorp.centraldogma.server.internal.storage.repository.RepositoryCache;
48 import com.linecorp.centraldogma.server.storage.project.Project;
49 import com.linecorp.centraldogma.server.storage.repository.FindOption;
50 import com.linecorp.centraldogma.server.storage.repository.Repository;
51
52 final class CachingRepository implements Repository {
53
54 private static final CancellationException CANCELLATION_EXCEPTION =
55 Exceptions.clearTrace(new CancellationException("watch cancelled by caller"));
56
57 private final Repository repo;
58 private final RepositoryCache cache;
59
60 CachingRepository(Repository repo, RepositoryCache cache) {
61 this.repo = requireNonNull(repo, "repo");
62 this.cache = requireNonNull(cache, "cache");
63 }
64
65 @Override
66 public long creationTimeMillis() {
67 return repo.creationTimeMillis();
68 }
69
70 @Override
71 public Author author() {
72 return repo.author();
73 }
74
75 @Override
76 public <T> CompletableFuture<Entry<T>> getOrNull(Revision revision, Query<T> query) {
77 requireNonNull(revision, "revision");
78 requireNonNull(query, "query");
79
80 final Revision normalizedRevision = normalizeNow(revision);
81 if (query.type() == QueryType.IDENTITY || query.type() == QueryType.IDENTITY_TEXT ||
82 query.type() == QueryType.IDENTITY_JSON) {
83
84
85 final String path = query.path();
86 final CompletableFuture<Entry<?>> future =
87 find(revision, path, FIND_ONE_WITH_CONTENT).thenApply(findResult -> findResult.get(path));
88 return unsafeCast(future);
89 }
90
91 final CompletableFuture<Object> future =
92 cache.get(new CacheableQueryCall(repo, normalizedRevision, query))
93 .handleAsync((result, cause) -> {
94 throwUnsafelyIfNonNull(cause);
95 return result != CacheableQueryCall.EMPTY ? result : null;
96 }, executor());
97 return unsafeCast(future);
98 }
99
100 @Override
101 public CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern,
102 Map<FindOption<?>, ?> options) {
103 requireNonNull(revision, "revision");
104 requireNonNull(pathPattern, "pathPattern");
105 requireNonNull(options, "options");
106
107 final Revision normalizedRevision = normalizeNow(revision);
108 return cache.get(new CacheableFindCall(repo, normalizedRevision, pathPattern, options))
109 .handleAsync((unused, cause) -> {
110 throwUnsafelyIfNonNull(cause);
111 return unused;
112 }, executor());
113 }
114
115 @Override
116 public CompletableFuture<List<Commit>> history(Revision from, Revision to,
117 String pathPattern, int maxCommits) {
118 requireNonNull(from, "from");
119 requireNonNull(to, "to");
120 requireNonNull(pathPattern, "pathPattern");
121 if (maxCommits <= 0) {
122 throw new IllegalArgumentException("maxCommits: " + maxCommits + " (expected: > 0)");
123 }
124
125 final RevisionRange range = normalizeNow(from, to);
126
127
128
129 final int actualMaxCommits = Math.min(
130 maxCommits, Math.abs(range.from().major() - range.to().major()) + 1);
131 return cache.get(new CacheableHistoryCall(repo, range.from(), range.to(),
132 pathPattern, actualMaxCommits))
133 .handleAsync((unused, cause) -> {
134 throwUnsafelyIfNonNull(cause);
135 return unused;
136 }, executor());
137 }
138
139 @Override
140 public CompletableFuture<Change<?>> diff(Revision from, Revision to, Query<?> query) {
141 requireNonNull(from, "from");
142 requireNonNull(to, "to");
143 requireNonNull(query, "query");
144
145 final RevisionRange range = normalizeNow(from, to).toAscending();
146 return cache.get(new CacheableSingleDiffCall(repo, range.from(), range.to(), query))
147 .handleAsync((unused, cause) -> {
148 throwUnsafelyIfNonNull(cause);
149 return unused;
150 }, executor());
151 }
152
153 @Override
154 public CompletableFuture<Map<String, Change<?>>> diff(Revision from, Revision to, String pathPattern) {
155 requireNonNull(from, "from");
156 requireNonNull(to, "to");
157 requireNonNull(pathPattern, "pathPattern");
158
159 final RevisionRange range = normalizeNow(from, to).toAscending();
160 return cache.get(new CacheableMultiDiffCall(repo, range.from(), range.to(), pathPattern))
161 .handleAsync((unused, cause) -> {
162 throwUnsafelyIfNonNull(cause);
163 return unused;
164 }, executor());
165 }
166
167 @Override
168 public CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern,
169 boolean errorOnEntryNotFound) {
170 requireNonNull(lastKnownRevision, "lastKnownRevision");
171 requireNonNull(pathPattern, "pathPattern");
172
173 final RevisionRange range = normalizeNow(lastKnownRevision, Revision.HEAD);
174 if (range.from().equals(range.to())) {
175
176 return CompletableFuture.completedFuture(null);
177 }
178
179 return cache.get(new CacheableFindLatestRevCall(repo, range.from(), range.to(),
180 pathPattern, errorOnEntryNotFound))
181 .handleAsync((result, cause) -> {
182 throwUnsafelyIfNonNull(cause);
183 if (result == CacheableFindLatestRevCall.ENTRY_NOT_FOUND) {
184 throw new EntryNotFoundException(range.from(), pathPattern);
185 }
186 if (result == CacheableFindLatestRevCall.EMPTY) {
187 return null;
188 }
189 return result;
190 }, executor());
191 }
192
193 @Override
194 public CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern,
195 boolean errorOnEntryNotFound) {
196 requireNonNull(lastKnownRevision, "lastKnownRevision");
197 requireNonNull(pathPattern, "pathPattern");
198
199 final CompletableFuture<Revision> latestRevFuture =
200 findLatestRevision(lastKnownRevision, pathPattern, errorOnEntryNotFound);
201 if (latestRevFuture.isCompletedExceptionally() || latestRevFuture.getNow(null) != null) {
202 return latestRevFuture;
203 }
204
205 final CompletableFuture<Revision> future = new CompletableFuture<>();
206 latestRevFuture.whenComplete((latestRevision, cause) -> {
207 if (cause != null) {
208 future.completeExceptionally(cause);
209 return;
210 }
211
212 if (latestRevision != null) {
213 future.complete(latestRevision);
214 return;
215 }
216
217
218 final CompletableFuture<Revision> watchFuture =
219 repo.watch(lastKnownRevision, pathPattern, errorOnEntryNotFound);
220 watchFuture.whenComplete((watchResult, watchCause) -> {
221 if (watchCause == null) {
222 future.complete(watchResult);
223 } else {
224 future.completeExceptionally(watchCause);
225 }
226 });
227
228
229
230
231
232
233
234
235
236
237
238
239 future.whenComplete(
240 (unused1, unused2) -> watchFuture.completeExceptionally(CANCELLATION_EXCEPTION));
241 });
242
243 return future;
244 }
245
246 @Override
247 public <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, MergeQuery<T> query) {
248 requireNonNull(revision, "revision");
249 requireNonNull(query, "query");
250
251 final Revision normalizedRevision = normalizeNow(revision);
252 final CacheableMergeQueryCall key = new CacheableMergeQueryCall(repo, normalizedRevision, query);
253 final CompletableFuture<MergedEntry<?>> value = cache.getIfPresent(key);
254 if (value != null) {
255 return unsafeCast(value.handleAsync((unused, cause) -> {
256 throwUnsafelyIfNonNull(cause);
257 return unused;
258 }, executor()));
259 }
260
261 return Repository.super.mergeFiles(normalizedRevision, query).thenApply(mergedEntry -> {
262 key.computedValue(mergedEntry);
263 cache.get(key);
264 return mergedEntry;
265 });
266 }
267
268 private static Executor executor() {
269 return RequestContext.mapCurrent(RequestContext::eventLoop, CommonPools.workerGroup()::next);
270 }
271
272
273
274 @Override
275 public Project parent() {
276 return repo.parent();
277 }
278
279 @Override
280 public String name() {
281 return repo.name();
282 }
283
284 @Override
285 public Revision normalizeNow(Revision revision) {
286 return repo.normalizeNow(revision);
287 }
288
289 @Override
290 public RevisionRange normalizeNow(Revision from, Revision to) {
291 return repo.normalizeNow(from, to);
292 }
293
294 @Override
295 public CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision,
296 Iterable<Change<?>> changes) {
297 requireNonNull(baseRevision, "baseRevision");
298 return repo.previewDiff(baseRevision, changes);
299 }
300
301 @Override
302 public CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
303 Author author, String summary, String detail, Markup markup,
304 Iterable<Change<?>> changes, boolean normalizing) {
305
306 return repo.commit(baseRevision, commitTimeMillis, author, summary, detail, markup, changes,
307 normalizing);
308 }
309
310 @Override
311 public String toString() {
312 return toStringHelper(this)
313 .add("repo", repo)
314 .toString();
315 }
316 }