1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.internal.storage.repository;
18
19 import static java.util.Objects.requireNonNull;
20
21 import java.util.concurrent.CompletableFuture;
22
23 import javax.annotation.Nullable;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import com.github.benmanes.caffeine.cache.AsyncCache;
29 import com.github.benmanes.caffeine.cache.Caffeine;
30 import com.github.benmanes.caffeine.cache.CaffeineSpec;
31 import com.github.benmanes.caffeine.cache.Weigher;
32 import com.google.common.base.MoreObjects;
33
34 import com.linecorp.armeria.common.util.SafeCloseable;
35 import com.linecorp.armeria.internal.common.RequestContextUtil;
36 import com.linecorp.centraldogma.server.storage.repository.CacheableCall;
37
38 import io.micrometer.core.instrument.MeterRegistry;
39 import io.micrometer.core.instrument.binder.cache.CaffeineCacheMetrics;
40
41 public final class RepositoryCache {
42
43 public static final Logger logger = LoggerFactory.getLogger(RepositoryCache.class);
44
45 @Nullable
46 public static String validateCacheSpec(@Nullable String cacheSpec) {
47 if (cacheSpec == null) {
48 return null;
49 }
50
51 try {
52 CaffeineSpec.parse(cacheSpec);
53 return cacheSpec;
54 } catch (IllegalArgumentException e) {
55 throw new IllegalArgumentException("cacheSpec: " + cacheSpec + " (" + e.getMessage() + ')');
56 }
57 }
58
59 @SuppressWarnings("rawtypes")
60 private final AsyncCache<CacheableCall, Object> cache;
61 private final String cacheSpec;
62
63 @SuppressWarnings({ "rawtypes", "unchecked" })
64 public RepositoryCache(String cacheSpec, MeterRegistry meterRegistry) {
65 this.cacheSpec = requireNonNull(validateCacheSpec(cacheSpec), "cacheSpec");
66 requireNonNull(meterRegistry, "meterRegistry");
67
68 final Caffeine<Object, Object> builder = Caffeine.from(cacheSpec);
69 if (cacheSpec.contains("maximumWeight=")) {
70 builder.weigher((Weigher<CacheableCall, Object>) CacheableCall::weigh);
71 }
72 cache = builder.recordStats()
73 .buildAsync();
74
75 CaffeineCacheMetrics.monitor(meterRegistry, cache, "repository");
76 }
77
78 public <T> CompletableFuture<T> get(CacheableCall<T> call) {
79 requireNonNull(call, "call");
80 final CompletableFuture<T> future = new CompletableFuture<>();
81
82 final CompletableFuture<T> prior =
83 (CompletableFuture<T>) cache.asMap().putIfAbsent(call, (CompletableFuture<Object>) future);
84 if (prior != null) {
85 return prior;
86 }
87
88 call.execute().handle((result, cause) -> {
89 try (SafeCloseable ignored = RequestContextUtil.pop()) {
90 if (cause != null) {
91 future.completeExceptionally(cause);
92 } else {
93 future.complete(result);
94 }
95 }
96 return null;
97 });
98 return future;
99 }
100
101 public void clear() {
102 cache.synchronous().invalidateAll();
103 }
104
105 @Override
106 public String toString() {
107 return MoreObjects.toStringHelper(this)
108 .add("spec", cacheSpec)
109 .add("stats", cache.synchronous().stats())
110 .toString();
111 }
112 }