1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.internal.storage.repository.cache;
18  
19  import static com.google.common.base.MoreObjects.toStringHelper;
20  import static com.linecorp.centraldogma.internal.Util.unsafeCast;
21  import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.throwUnsafelyIfNonNull;
22  import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ALL_WITH_CONTENT;
23  import static java.util.Objects.requireNonNull;
24  
25  import java.util.LinkedHashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.CancellationException;
29  import java.util.concurrent.CompletableFuture;
30  import java.util.concurrent.Executor;
31  import java.util.stream.Collectors;
32  import java.util.stream.Stream;
33  
34  import com.google.common.collect.ImmutableMap;
35  
36  import com.linecorp.armeria.common.CommonPools;
37  import com.linecorp.armeria.common.RequestContext;
38  import com.linecorp.armeria.common.util.Exceptions;
39  import com.linecorp.centraldogma.common.Author;
40  import com.linecorp.centraldogma.common.Change;
41  import com.linecorp.centraldogma.common.Commit;
42  import com.linecorp.centraldogma.common.Entry;
43  import com.linecorp.centraldogma.common.EntryNotFoundException;
44  import com.linecorp.centraldogma.common.Markup;
45  import com.linecorp.centraldogma.common.MergeQuery;
46  import com.linecorp.centraldogma.common.MergedEntry;
47  import com.linecorp.centraldogma.common.Query;
48  import com.linecorp.centraldogma.common.Revision;
49  import com.linecorp.centraldogma.common.RevisionRange;
50  import com.linecorp.centraldogma.server.command.CommitResult;
51  import com.linecorp.centraldogma.server.command.ContentTransformer;
52  import com.linecorp.centraldogma.server.internal.storage.repository.RepositoryCache;
53  import com.linecorp.centraldogma.server.internal.storage.repository.git.PathPatternFilter;
54  import com.linecorp.centraldogma.server.storage.project.Project;
55  import com.linecorp.centraldogma.server.storage.repository.CacheableCall;
56  import com.linecorp.centraldogma.server.storage.repository.DiffResultType;
57  import com.linecorp.centraldogma.server.storage.repository.FindOption;
58  import com.linecorp.centraldogma.server.storage.repository.Repository;
59  import com.linecorp.centraldogma.server.storage.repository.RepositoryListener;
60  
61  final class CachingRepository implements Repository {
62  
63      private static final CancellationException CANCELLATION_EXCEPTION =
64              Exceptions.clearTrace(new CancellationException("watch cancelled by caller"));
65  
66      private final Repository repo;
67      private final RepositoryCache cache;
68  
69      CachingRepository(Repository repo, RepositoryCache cache) {
70          this.repo = requireNonNull(repo, "repo");
71          this.cache = requireNonNull(cache, "cache");
72      }
73  
74      @Override
75      public org.eclipse.jgit.lib.Repository jGitRepository() {
76          return repo.jGitRepository();
77      }
78  
79      @Override
80      public long creationTimeMillis() {
81          return repo.creationTimeMillis();
82      }
83  
84      @Override
85      public Author author() {
86          return repo.author();
87      }
88  
89      @Override
90      public CompletableFuture<Entry<?>> getOrNull(Revision revision, String path) {
91          requireNonNull(revision, "revision");
92          requireNonNull(path, "path");
93  
94          final Revision normalizedRevision = normalizeNow(revision);
95          // The size of the repositories in Central Dogma are relatively small. Therefore, caching the entire
96          // repo rather than caching each file separately would result in a higher cache hit rate. Additionally,
97          // when checking repository access patterns, it was found that a client tends to send multiple queries
98          // to fully scan repository files.
99          return find(normalizedRevision, ALL_PATH, FIND_ALL_WITH_CONTENT).thenApply(all -> all.get(path));
100     }
101 
102     @Override
103     public CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern,
104                                                          Map<FindOption<?>, ?> options) {
105         requireNonNull(revision, "revision");
106         requireNonNull(pathPattern, "pathPattern");
107         requireNonNull(options, "options");
108 
109         final Revision normalizedRevision = normalizeNow(revision);
110 
111         Map<FindOption<?>, ?> cacheableOptions = options;
112         final Integer maxEntries = (Integer) options.get(FindOption.MAX_ENTRIES);
113         if (maxEntries != null) {
114             final ImmutableMap.Builder<FindOption<?>, Object> newOptions = ImmutableMap.builder();
115             options.forEach((key, value) -> {
116                 if (key != FindOption.MAX_ENTRIES) {
117                     newOptions.put(key, value);
118                 }
119             });
120             cacheableOptions = newOptions.build();
121         }
122 
123         return execute(new CacheableFindCall(repo, normalizedRevision, ALL_PATH, cacheableOptions))
124                     .thenApply((all) -> {
125                         if (all.isEmpty()) {
126                             return all;
127                         }
128 
129                         Stream<Map.Entry<String, Entry<?>>> stream = all.entrySet().stream();
130                         if (!pathPattern.equals(ALL_PATH)) {
131                             final PathPatternFilter filter = PathPatternFilter.of(pathPattern);
132                             stream = stream.filter(entry -> filter.matches(entry.getKey()));
133                         }
134                         if (maxEntries != null) {
135                             stream = stream.limit(maxEntries);
136                         }
137 
138                         // Use LinkedHashMap to 1) keep the order and 2) allow callers to mutate it.
139                         return stream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
140                                                                (oldV, newV) -> oldV,
141                                                                LinkedHashMap::new));
142                     });
143     }
144 
145     @Override
146     public CompletableFuture<List<Commit>> history(Revision from, Revision to,
147                                                    String pathPattern, int maxCommits) {
148         requireNonNull(from, "from");
149         requireNonNull(to, "to");
150         requireNonNull(pathPattern, "pathPattern");
151         if (maxCommits <= 0) {
152             throw new IllegalArgumentException("maxCommits: " + maxCommits + " (expected: > 0)");
153         }
154 
155         final RevisionRange range = normalizeNow(from, to);
156 
157         // Make sure maxCommits do not exceed its theoretical limit to increase the chance of cache hit.
158         // e.g. when from = 2 and to = 4, the same result should be yielded when maxCommits >= 3.
159         final int actualMaxCommits = Math.min(
160                 maxCommits, Math.abs(range.from().major() - range.to().major()) + 1);
161         return execute(new CacheableHistoryCall(repo, range.from(), range.to(), pathPattern, actualMaxCommits));
162     }
163 
164     @Override
165     public CompletableFuture<Change<?>> diff(Revision from, Revision to, Query<?> query) {
166         requireNonNull(from, "from");
167         requireNonNull(to, "to");
168         requireNonNull(query, "query");
169 
170         final RevisionRange range = normalizeNow(from, to).toAscending();
171         return execute(new CacheableSingleDiffCall(repo, range.from(), range.to(), query));
172     }
173 
174     @Override
175     public CompletableFuture<Map<String, Change<?>>> diff(Revision from, Revision to, String pathPattern,
176                                                           DiffResultType diffResultType) {
177         requireNonNull(from, "from");
178         requireNonNull(to, "to");
179         requireNonNull(pathPattern, "pathPattern");
180         requireNonNull(diffResultType, "diffResultType");
181 
182         final RevisionRange range = normalizeNow(from, to).toAscending();
183         return execute(new CacheableMultiDiffCall(repo, range.from(), range.to(), pathPattern, diffResultType));
184     }
185 
186     @Override
187     public CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern,
188                                                           boolean errorOnEntryNotFound) {
189         requireNonNull(lastKnownRevision, "lastKnownRevision");
190         requireNonNull(pathPattern, "pathPattern");
191 
192         final RevisionRange range = normalizeNow(lastKnownRevision, Revision.HEAD);
193         if (range.from().equals(range.to())) {
194             // Empty range.
195             return CompletableFuture.completedFuture(null);
196         }
197 
198         return cache.get(new CacheableFindLatestRevCall(repo, range.from(), range.to(),
199                                                         pathPattern, errorOnEntryNotFound))
200                     .handleAsync((result, cause) -> {
201                         throwUnsafelyIfNonNull(cause);
202                         if (result == CacheableFindLatestRevCall.ENTRY_NOT_FOUND) {
203                             throw new EntryNotFoundException(range.from(), pathPattern);
204                         }
205                         if (result == CacheableFindLatestRevCall.EMPTY) {
206                             return null;
207                         }
208                         return result;
209                     }, executor());
210     }
211 
212     @Override
213     public CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern,
214                                              boolean errorOnEntryNotFound) {
215         requireNonNull(lastKnownRevision, "lastKnownRevision");
216         requireNonNull(pathPattern, "pathPattern");
217 
218         final CompletableFuture<Revision> latestRevFuture =
219                 findLatestRevision(lastKnownRevision, pathPattern, errorOnEntryNotFound);
220         if (latestRevFuture.isCompletedExceptionally() || latestRevFuture.getNow(null) != null) {
221             return latestRevFuture;
222         }
223 
224         final CompletableFuture<Revision> future = new CompletableFuture<>();
225         latestRevFuture.whenComplete((latestRevision, cause) -> {
226             if (cause != null) {
227                 future.completeExceptionally(cause);
228                 return;
229             }
230 
231             if (latestRevision != null) {
232                 future.complete(latestRevision);
233                 return;
234             }
235 
236             // Propagate the state of 'watchFuture' to 'future'.
237             final CompletableFuture<Revision> watchFuture =
238                     repo.watch(lastKnownRevision, pathPattern, errorOnEntryNotFound);
239             watchFuture.whenComplete((watchResult, watchCause) -> {
240                 if (watchCause == null) {
241                     future.complete(watchResult);
242                 } else {
243                     future.completeExceptionally(watchCause);
244                 }
245             });
246 
247             // Cancel the 'watchFuture' if 'future' is complete. 'future' is complete on the following cases:
248             //
249             // 1) The state of 'watchFuture' has been propagated to 'future' by the callback we registered
250             //    above. In this case, 'watchFuture.completeExceptionally()' call below has no effect because
251             //    'watchFuture' is complete already.
252             //
253             // 2) A user completed 'future' by his or her own, most often for cancellation.
254             //    'watchFuture' will be completed by 'watchFuture.completeExceptionally()' below.
255             //    The callback we registered to 'watchFuture' above will have no effect because 'future' is
256             //    complete already.
257 
258             future.whenComplete(
259                     (unused1, unused2) -> watchFuture.completeExceptionally(CANCELLATION_EXCEPTION));
260         });
261 
262         return future;
263     }
264 
265     @Override
266     public <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, MergeQuery<T> query) {
267         requireNonNull(revision, "revision");
268         requireNonNull(query, "query");
269 
270         final Revision normalizedRevision = normalizeNow(revision);
271         return execute(new CacheableMergeQueryCall<>(repo, normalizedRevision, query));
272     }
273 
274     @Override
275     public <T> CompletableFuture<T> execute(CacheableCall<T> cacheableCall) {
276         return unsafeCast(cache.get(cacheableCall).handleAsync((result, cause) -> {
277             throwUnsafelyIfNonNull(cause);
278             return result;
279         }, executor()));
280     }
281 
282     @Override
283     public void addListener(RepositoryListener listener) {
284         repo.addListener(listener);
285     }
286 
287     private static Executor executor() {
288         return RequestContext.mapCurrent(RequestContext::eventLoop, CommonPools.workerGroup()::next);
289     }
290 
291     // Simple delegations
292 
293     @Override
294     public Project parent() {
295         return repo.parent();
296     }
297 
298     @Override
299     public String name() {
300         return repo.name();
301     }
302 
303     @Override
304     public Revision normalizeNow(Revision revision) {
305         return repo.normalizeNow(revision);
306     }
307 
308     @Override
309     public RevisionRange normalizeNow(Revision from, Revision to) {
310         return repo.normalizeNow(from, to);
311     }
312 
313     @Override
314     public CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision,
315                                                                  Iterable<Change<?>> changes) {
316         requireNonNull(baseRevision, "baseRevision");
317         return repo.previewDiff(baseRevision, changes);
318     }
319 
320     @Override
321     public CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
322                                                   Author author, String summary, String detail, Markup markup,
323                                                   Iterable<Change<?>> changes, boolean normalizing) {
324 
325         return repo.commit(baseRevision, commitTimeMillis, author, summary, detail, markup, changes,
326                            normalizing);
327     }
328 
329     @Override
330     public CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis, Author author,
331                                                   String summary, String detail, Markup markup,
332                                                   ContentTransformer<?> transformer) {
333         return repo.commit(baseRevision, commitTimeMillis, author, summary, detail, markup, transformer);
334     }
335 
336     @Override
337     public String toString() {
338         return toStringHelper(this)
339                 .add("repo", repo)
340                 .toString();
341     }
342 }