1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.internal.storage.repository.cache;
18
19 import static com.google.common.base.MoreObjects.toStringHelper;
20 import static com.linecorp.centraldogma.internal.Util.unsafeCast;
21 import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.throwUnsafelyIfNonNull;
22 import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ALL_WITH_CONTENT;
23 import static java.util.Objects.requireNonNull;
24
25 import java.util.LinkedHashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.CancellationException;
29 import java.util.concurrent.CompletableFuture;
30 import java.util.concurrent.Executor;
31 import java.util.stream.Collectors;
32 import java.util.stream.Stream;
33
34 import com.google.common.collect.ImmutableMap;
35
36 import com.linecorp.armeria.common.CommonPools;
37 import com.linecorp.armeria.common.RequestContext;
38 import com.linecorp.armeria.common.util.Exceptions;
39 import com.linecorp.centraldogma.common.Author;
40 import com.linecorp.centraldogma.common.Change;
41 import com.linecorp.centraldogma.common.Commit;
42 import com.linecorp.centraldogma.common.Entry;
43 import com.linecorp.centraldogma.common.EntryNotFoundException;
44 import com.linecorp.centraldogma.common.Markup;
45 import com.linecorp.centraldogma.common.MergeQuery;
46 import com.linecorp.centraldogma.common.MergedEntry;
47 import com.linecorp.centraldogma.common.Query;
48 import com.linecorp.centraldogma.common.Revision;
49 import com.linecorp.centraldogma.common.RevisionRange;
50 import com.linecorp.centraldogma.server.command.CommitResult;
51 import com.linecorp.centraldogma.server.command.ContentTransformer;
52 import com.linecorp.centraldogma.server.internal.storage.repository.RepositoryCache;
53 import com.linecorp.centraldogma.server.internal.storage.repository.git.PathPatternFilter;
54 import com.linecorp.centraldogma.server.storage.project.Project;
55 import com.linecorp.centraldogma.server.storage.repository.CacheableCall;
56 import com.linecorp.centraldogma.server.storage.repository.DiffResultType;
57 import com.linecorp.centraldogma.server.storage.repository.FindOption;
58 import com.linecorp.centraldogma.server.storage.repository.Repository;
59 import com.linecorp.centraldogma.server.storage.repository.RepositoryListener;
60
61 final class CachingRepository implements Repository {
62
63 private static final CancellationException CANCELLATION_EXCEPTION =
64 Exceptions.clearTrace(new CancellationException("watch cancelled by caller"));
65
66 private final Repository repo;
67 private final RepositoryCache cache;
68
69 CachingRepository(Repository repo, RepositoryCache cache) {
70 this.repo = requireNonNull(repo, "repo");
71 this.cache = requireNonNull(cache, "cache");
72 }
73
74 @Override
75 public org.eclipse.jgit.lib.Repository jGitRepository() {
76 return repo.jGitRepository();
77 }
78
79 @Override
80 public long creationTimeMillis() {
81 return repo.creationTimeMillis();
82 }
83
84 @Override
85 public Author author() {
86 return repo.author();
87 }
88
89 @Override
90 public CompletableFuture<Entry<?>> getOrNull(Revision revision, String path) {
91 requireNonNull(revision, "revision");
92 requireNonNull(path, "path");
93
94 final Revision normalizedRevision = normalizeNow(revision);
95
96
97
98
99 return find(normalizedRevision, ALL_PATH, FIND_ALL_WITH_CONTENT).thenApply(all -> all.get(path));
100 }
101
102 @Override
103 public CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern,
104 Map<FindOption<?>, ?> options) {
105 requireNonNull(revision, "revision");
106 requireNonNull(pathPattern, "pathPattern");
107 requireNonNull(options, "options");
108
109 final Revision normalizedRevision = normalizeNow(revision);
110
111 Map<FindOption<?>, ?> cacheableOptions = options;
112 final Integer maxEntries = (Integer) options.get(FindOption.MAX_ENTRIES);
113 if (maxEntries != null) {
114 final ImmutableMap.Builder<FindOption<?>, Object> newOptions = ImmutableMap.builder();
115 options.forEach((key, value) -> {
116 if (key != FindOption.MAX_ENTRIES) {
117 newOptions.put(key, value);
118 }
119 });
120 cacheableOptions = newOptions.build();
121 }
122
123 return execute(new CacheableFindCall(repo, normalizedRevision, ALL_PATH, cacheableOptions))
124 .thenApply((all) -> {
125 if (all.isEmpty()) {
126 return all;
127 }
128
129 Stream<Map.Entry<String, Entry<?>>> stream = all.entrySet().stream();
130 if (!pathPattern.equals(ALL_PATH)) {
131 final PathPatternFilter filter = PathPatternFilter.of(pathPattern);
132 stream = stream.filter(entry -> filter.matches(entry.getKey()));
133 }
134 if (maxEntries != null) {
135 stream = stream.limit(maxEntries);
136 }
137
138
139 return stream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
140 (oldV, newV) -> oldV,
141 LinkedHashMap::new));
142 });
143 }
144
145 @Override
146 public CompletableFuture<List<Commit>> history(Revision from, Revision to,
147 String pathPattern, int maxCommits) {
148 requireNonNull(from, "from");
149 requireNonNull(to, "to");
150 requireNonNull(pathPattern, "pathPattern");
151 if (maxCommits <= 0) {
152 throw new IllegalArgumentException("maxCommits: " + maxCommits + " (expected: > 0)");
153 }
154
155 final RevisionRange range = normalizeNow(from, to);
156
157
158
159 final int actualMaxCommits = Math.min(
160 maxCommits, Math.abs(range.from().major() - range.to().major()) + 1);
161 return execute(new CacheableHistoryCall(repo, range.from(), range.to(), pathPattern, actualMaxCommits));
162 }
163
164 @Override
165 public CompletableFuture<Change<?>> diff(Revision from, Revision to, Query<?> query) {
166 requireNonNull(from, "from");
167 requireNonNull(to, "to");
168 requireNonNull(query, "query");
169
170 final RevisionRange range = normalizeNow(from, to).toAscending();
171 return execute(new CacheableSingleDiffCall(repo, range.from(), range.to(), query));
172 }
173
174 @Override
175 public CompletableFuture<Map<String, Change<?>>> diff(Revision from, Revision to, String pathPattern,
176 DiffResultType diffResultType) {
177 requireNonNull(from, "from");
178 requireNonNull(to, "to");
179 requireNonNull(pathPattern, "pathPattern");
180 requireNonNull(diffResultType, "diffResultType");
181
182 final RevisionRange range = normalizeNow(from, to).toAscending();
183 return execute(new CacheableMultiDiffCall(repo, range.from(), range.to(), pathPattern, diffResultType));
184 }
185
186 @Override
187 public CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern,
188 boolean errorOnEntryNotFound) {
189 requireNonNull(lastKnownRevision, "lastKnownRevision");
190 requireNonNull(pathPattern, "pathPattern");
191
192 final RevisionRange range = normalizeNow(lastKnownRevision, Revision.HEAD);
193 if (range.from().equals(range.to())) {
194
195 return CompletableFuture.completedFuture(null);
196 }
197
198 return cache.get(new CacheableFindLatestRevCall(repo, range.from(), range.to(),
199 pathPattern, errorOnEntryNotFound))
200 .handleAsync((result, cause) -> {
201 throwUnsafelyIfNonNull(cause);
202 if (result == CacheableFindLatestRevCall.ENTRY_NOT_FOUND) {
203 throw new EntryNotFoundException(range.from(), pathPattern);
204 }
205 if (result == CacheableFindLatestRevCall.EMPTY) {
206 return null;
207 }
208 return result;
209 }, executor());
210 }
211
212 @Override
213 public CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern,
214 boolean errorOnEntryNotFound) {
215 requireNonNull(lastKnownRevision, "lastKnownRevision");
216 requireNonNull(pathPattern, "pathPattern");
217
218 final CompletableFuture<Revision> latestRevFuture =
219 findLatestRevision(lastKnownRevision, pathPattern, errorOnEntryNotFound);
220 if (latestRevFuture.isCompletedExceptionally() || latestRevFuture.getNow(null) != null) {
221 return latestRevFuture;
222 }
223
224 final CompletableFuture<Revision> future = new CompletableFuture<>();
225 latestRevFuture.whenComplete((latestRevision, cause) -> {
226 if (cause != null) {
227 future.completeExceptionally(cause);
228 return;
229 }
230
231 if (latestRevision != null) {
232 future.complete(latestRevision);
233 return;
234 }
235
236
237 final CompletableFuture<Revision> watchFuture =
238 repo.watch(lastKnownRevision, pathPattern, errorOnEntryNotFound);
239 watchFuture.whenComplete((watchResult, watchCause) -> {
240 if (watchCause == null) {
241 future.complete(watchResult);
242 } else {
243 future.completeExceptionally(watchCause);
244 }
245 });
246
247
248
249
250
251
252
253
254
255
256
257
258 future.whenComplete(
259 (unused1, unused2) -> watchFuture.completeExceptionally(CANCELLATION_EXCEPTION));
260 });
261
262 return future;
263 }
264
265 @Override
266 public <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, MergeQuery<T> query) {
267 requireNonNull(revision, "revision");
268 requireNonNull(query, "query");
269
270 final Revision normalizedRevision = normalizeNow(revision);
271 return execute(new CacheableMergeQueryCall<>(repo, normalizedRevision, query));
272 }
273
274 @Override
275 public <T> CompletableFuture<T> execute(CacheableCall<T> cacheableCall) {
276 return unsafeCast(cache.get(cacheableCall).handleAsync((result, cause) -> {
277 throwUnsafelyIfNonNull(cause);
278 return result;
279 }, executor()));
280 }
281
282 @Override
283 public void addListener(RepositoryListener listener) {
284 repo.addListener(listener);
285 }
286
287 private static Executor executor() {
288 return RequestContext.mapCurrent(RequestContext::eventLoop, CommonPools.workerGroup()::next);
289 }
290
291
292
293 @Override
294 public Project parent() {
295 return repo.parent();
296 }
297
298 @Override
299 public String name() {
300 return repo.name();
301 }
302
303 @Override
304 public Revision normalizeNow(Revision revision) {
305 return repo.normalizeNow(revision);
306 }
307
308 @Override
309 public RevisionRange normalizeNow(Revision from, Revision to) {
310 return repo.normalizeNow(from, to);
311 }
312
313 @Override
314 public CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision,
315 Iterable<Change<?>> changes) {
316 requireNonNull(baseRevision, "baseRevision");
317 return repo.previewDiff(baseRevision, changes);
318 }
319
320 @Override
321 public CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
322 Author author, String summary, String detail, Markup markup,
323 Iterable<Change<?>> changes, boolean normalizing) {
324
325 return repo.commit(baseRevision, commitTimeMillis, author, summary, detail, markup, changes,
326 normalizing);
327 }
328
329 @Override
330 public CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis, Author author,
331 String summary, String detail, Markup markup,
332 ContentTransformer<?> transformer) {
333 return repo.commit(baseRevision, commitTimeMillis, author, summary, detail, markup, transformer);
334 }
335
336 @Override
337 public String toString() {
338 return toStringHelper(this)
339 .add("repo", repo)
340 .toString();
341 }
342 }