1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.internal.storage.repository.cache;
18  
19  import static com.google.common.base.MoreObjects.toStringHelper;
20  import static com.linecorp.centraldogma.internal.Util.unsafeCast;
21  import static com.linecorp.centraldogma.server.internal.api.HttpApiUtil.throwUnsafelyIfNonNull;
22  import static com.linecorp.centraldogma.server.storage.repository.FindOptions.FIND_ONE_WITH_CONTENT;
23  import static java.util.Objects.requireNonNull;
24  
25  import java.util.List;
26  import java.util.Map;
27  import java.util.concurrent.CancellationException;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.concurrent.Executor;
30  
31  import com.linecorp.armeria.common.CommonPools;
32  import com.linecorp.armeria.common.RequestContext;
33  import com.linecorp.armeria.common.util.Exceptions;
34  import com.linecorp.centraldogma.common.Author;
35  import com.linecorp.centraldogma.common.Change;
36  import com.linecorp.centraldogma.common.Commit;
37  import com.linecorp.centraldogma.common.Entry;
38  import com.linecorp.centraldogma.common.EntryNotFoundException;
39  import com.linecorp.centraldogma.common.Markup;
40  import com.linecorp.centraldogma.common.MergeQuery;
41  import com.linecorp.centraldogma.common.MergedEntry;
42  import com.linecorp.centraldogma.common.Query;
43  import com.linecorp.centraldogma.common.QueryType;
44  import com.linecorp.centraldogma.common.Revision;
45  import com.linecorp.centraldogma.common.RevisionRange;
46  import com.linecorp.centraldogma.server.command.CommitResult;
47  import com.linecorp.centraldogma.server.internal.storage.repository.RepositoryCache;
48  import com.linecorp.centraldogma.server.storage.project.Project;
49  import com.linecorp.centraldogma.server.storage.repository.FindOption;
50  import com.linecorp.centraldogma.server.storage.repository.Repository;
51  
52  final class CachingRepository implements Repository {
53  
54      private static final CancellationException CANCELLATION_EXCEPTION =
55              Exceptions.clearTrace(new CancellationException("watch cancelled by caller"));
56  
57      private final Repository repo;
58      private final RepositoryCache cache;
59  
60      CachingRepository(Repository repo, RepositoryCache cache) {
61          this.repo = requireNonNull(repo, "repo");
62          this.cache = requireNonNull(cache, "cache");
63      }
64  
65      @Override
66      public long creationTimeMillis() {
67          return repo.creationTimeMillis();
68      }
69  
70      @Override
71      public Author author() {
72          return repo.author();
73      }
74  
75      @Override
76      public <T> CompletableFuture<Entry<T>> getOrNull(Revision revision, Query<T> query) {
77          requireNonNull(revision, "revision");
78          requireNonNull(query, "query");
79  
80          final Revision normalizedRevision = normalizeNow(revision);
81          if (query.type() == QueryType.IDENTITY || query.type() == QueryType.IDENTITY_TEXT ||
82              query.type() == QueryType.IDENTITY_JSON) {
83              // If the query is an IDENTITY type, call find() so that the caches are reused in one place when
84              // calls getOrNull(), find() and mergeFiles().
85              final String path = query.path();
86              final CompletableFuture<Entry<?>> future =
87                      find(revision, path, FIND_ONE_WITH_CONTENT).thenApply(findResult -> findResult.get(path));
88              return unsafeCast(future);
89          }
90  
91          final CompletableFuture<Object> future =
92                  cache.get(new CacheableQueryCall(repo, normalizedRevision, query))
93                       .handleAsync((result, cause) -> {
94                           throwUnsafelyIfNonNull(cause);
95                           return result != CacheableQueryCall.EMPTY ? result : null;
96                       }, executor());
97          return unsafeCast(future);
98      }
99  
100     @Override
101     public CompletableFuture<Map<String, Entry<?>>> find(Revision revision, String pathPattern,
102                                                          Map<FindOption<?>, ?> options) {
103         requireNonNull(revision, "revision");
104         requireNonNull(pathPattern, "pathPattern");
105         requireNonNull(options, "options");
106 
107         final Revision normalizedRevision = normalizeNow(revision);
108         return cache.get(new CacheableFindCall(repo, normalizedRevision, pathPattern, options))
109                     .handleAsync((unused, cause) -> {
110                         throwUnsafelyIfNonNull(cause);
111                         return unused;
112                     }, executor());
113     }
114 
115     @Override
116     public CompletableFuture<List<Commit>> history(Revision from, Revision to,
117                                                    String pathPattern, int maxCommits) {
118         requireNonNull(from, "from");
119         requireNonNull(to, "to");
120         requireNonNull(pathPattern, "pathPattern");
121         if (maxCommits <= 0) {
122             throw new IllegalArgumentException("maxCommits: " + maxCommits + " (expected: > 0)");
123         }
124 
125         final RevisionRange range = normalizeNow(from, to);
126 
127         // Make sure maxCommits do not exceed its theoretical limit to increase the chance of cache hit.
128         // e.g. when from = 2 and to = 4, the same result should be yielded when maxCommits >= 3.
129         final int actualMaxCommits = Math.min(
130                 maxCommits, Math.abs(range.from().major() - range.to().major()) + 1);
131         return cache.get(new CacheableHistoryCall(repo, range.from(), range.to(),
132                                                   pathPattern, actualMaxCommits))
133                     .handleAsync((unused, cause) -> {
134                         throwUnsafelyIfNonNull(cause);
135                         return unused;
136                     }, executor());
137     }
138 
139     @Override
140     public CompletableFuture<Change<?>> diff(Revision from, Revision to, Query<?> query) {
141         requireNonNull(from, "from");
142         requireNonNull(to, "to");
143         requireNonNull(query, "query");
144 
145         final RevisionRange range = normalizeNow(from, to).toAscending();
146         return cache.get(new CacheableSingleDiffCall(repo, range.from(), range.to(), query))
147                     .handleAsync((unused, cause) -> {
148                         throwUnsafelyIfNonNull(cause);
149                         return unused;
150                     }, executor());
151     }
152 
153     @Override
154     public CompletableFuture<Map<String, Change<?>>> diff(Revision from, Revision to, String pathPattern) {
155         requireNonNull(from, "from");
156         requireNonNull(to, "to");
157         requireNonNull(pathPattern, "pathPattern");
158 
159         final RevisionRange range = normalizeNow(from, to).toAscending();
160         return cache.get(new CacheableMultiDiffCall(repo, range.from(), range.to(), pathPattern))
161                     .handleAsync((unused, cause) -> {
162                         throwUnsafelyIfNonNull(cause);
163                         return unused;
164                     }, executor());
165     }
166 
167     @Override
168     public CompletableFuture<Revision> findLatestRevision(Revision lastKnownRevision, String pathPattern,
169                                                           boolean errorOnEntryNotFound) {
170         requireNonNull(lastKnownRevision, "lastKnownRevision");
171         requireNonNull(pathPattern, "pathPattern");
172 
173         final RevisionRange range = normalizeNow(lastKnownRevision, Revision.HEAD);
174         if (range.from().equals(range.to())) {
175             // Empty range.
176             return CompletableFuture.completedFuture(null);
177         }
178 
179         return cache.get(new CacheableFindLatestRevCall(repo, range.from(), range.to(),
180                                                         pathPattern, errorOnEntryNotFound))
181                     .handleAsync((result, cause) -> {
182                         throwUnsafelyIfNonNull(cause);
183                         if (result == CacheableFindLatestRevCall.ENTRY_NOT_FOUND) {
184                             throw new EntryNotFoundException(range.from(), pathPattern);
185                         }
186                         if (result == CacheableFindLatestRevCall.EMPTY) {
187                             return null;
188                         }
189                         return result;
190                     }, executor());
191     }
192 
193     @Override
194     public CompletableFuture<Revision> watch(Revision lastKnownRevision, String pathPattern,
195                                              boolean errorOnEntryNotFound) {
196         requireNonNull(lastKnownRevision, "lastKnownRevision");
197         requireNonNull(pathPattern, "pathPattern");
198 
199         final CompletableFuture<Revision> latestRevFuture =
200                 findLatestRevision(lastKnownRevision, pathPattern, errorOnEntryNotFound);
201         if (latestRevFuture.isCompletedExceptionally() || latestRevFuture.getNow(null) != null) {
202             return latestRevFuture;
203         }
204 
205         final CompletableFuture<Revision> future = new CompletableFuture<>();
206         latestRevFuture.whenComplete((latestRevision, cause) -> {
207             if (cause != null) {
208                 future.completeExceptionally(cause);
209                 return;
210             }
211 
212             if (latestRevision != null) {
213                 future.complete(latestRevision);
214                 return;
215             }
216 
217             // Propagate the state of 'watchFuture' to 'future'.
218             final CompletableFuture<Revision> watchFuture =
219                     repo.watch(lastKnownRevision, pathPattern, errorOnEntryNotFound);
220             watchFuture.whenComplete((watchResult, watchCause) -> {
221                 if (watchCause == null) {
222                     future.complete(watchResult);
223                 } else {
224                     future.completeExceptionally(watchCause);
225                 }
226             });
227 
228             // Cancel the 'watchFuture' if 'future' is complete. 'future' is complete on the following cases:
229             //
230             // 1) The state of 'watchFuture' has been propagated to 'future' by the callback we registered
231             //    above. In this case, 'watchFuture.completeExceptionally()' call below has no effect because
232             //    'watchFuture' is complete already.
233             //
234             // 2) A user completed 'future' by his or her own, most often for cancellation.
235             //    'watchFuture' will be completed by 'watchFuture.completeExceptionally()' below.
236             //    The callback we registered to 'watchFuture' above will have no effect because 'future' is
237             //    complete already.
238 
239             future.whenComplete(
240                     (unused1, unused2) -> watchFuture.completeExceptionally(CANCELLATION_EXCEPTION));
241         });
242 
243         return future;
244     }
245 
246     @Override
247     public <T> CompletableFuture<MergedEntry<T>> mergeFiles(Revision revision, MergeQuery<T> query) {
248         requireNonNull(revision, "revision");
249         requireNonNull(query, "query");
250 
251         final Revision normalizedRevision = normalizeNow(revision);
252         final CacheableMergeQueryCall key = new CacheableMergeQueryCall(repo, normalizedRevision, query);
253         final CompletableFuture<MergedEntry<?>> value = cache.getIfPresent(key);
254         if (value != null) {
255             return unsafeCast(value.handleAsync((unused, cause) -> {
256                 throwUnsafelyIfNonNull(cause);
257                 return unused;
258             }, executor()));
259         }
260 
261         return Repository.super.mergeFiles(normalizedRevision, query).thenApply(mergedEntry -> {
262             key.computedValue(mergedEntry);
263             cache.get(key);
264             return mergedEntry;
265         });
266     }
267 
268     private static Executor executor() {
269         return RequestContext.mapCurrent(RequestContext::eventLoop, CommonPools.workerGroup()::next);
270     }
271 
272     // Simple delegations
273 
274     @Override
275     public Project parent() {
276         return repo.parent();
277     }
278 
279     @Override
280     public String name() {
281         return repo.name();
282     }
283 
284     @Override
285     public Revision normalizeNow(Revision revision) {
286         return repo.normalizeNow(revision);
287     }
288 
289     @Override
290     public RevisionRange normalizeNow(Revision from, Revision to) {
291         return repo.normalizeNow(from, to);
292     }
293 
294     @Override
295     public CompletableFuture<Map<String, Change<?>>> previewDiff(Revision baseRevision,
296                                                                  Iterable<Change<?>> changes) {
297         requireNonNull(baseRevision, "baseRevision");
298         return repo.previewDiff(baseRevision, changes);
299     }
300 
301     @Override
302     public CompletableFuture<CommitResult> commit(Revision baseRevision, long commitTimeMillis,
303                                                   Author author, String summary, String detail, Markup markup,
304                                                   Iterable<Change<?>> changes, boolean normalizing) {
305 
306         return repo.commit(baseRevision, commitTimeMillis, author, summary, detail, markup, changes,
307                            normalizing);
308     }
309 
310     @Override
311     public String toString() {
312         return toStringHelper(this)
313                 .add("repo", repo)
314                 .toString();
315     }
316 }