1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.internal.storage.repository.git;
18
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.IdentityHashMap;
22 import java.util.Iterator;
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Set;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.function.Supplier;
30
31 import javax.annotation.Nullable;
32
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import com.google.common.annotations.VisibleForTesting;
37
38 import com.linecorp.centraldogma.common.CentralDogmaException;
39 import com.linecorp.centraldogma.common.Revision;
40
41 final class CommitWatchers {
42
43 private static final Logger logger = LoggerFactory.getLogger(CommitWatchers.class);
44
45 @VisibleForTesting
46 final Map<PathPatternFilter, Set<Watch>> watchesMap = new WatcherMap(8192);
47
48 void add(Revision lastKnownRev, String pathPattern, CompletableFuture<Revision> future) {
49 add0(PathPatternFilter.of(pathPattern), new Watch(lastKnownRev, future));
50 }
51
52 private void add0(final PathPatternFilter pathPattern, Watch watch) {
53 synchronized (watchesMap) {
54 final Set<Watch> watches =
55 watchesMap.computeIfAbsent(pathPattern,
56 k -> Collections.newSetFromMap(new IdentityHashMap<>()));
57 watches.add(watch);
58 }
59
60 watch.future.whenComplete((revision, cause) -> {
61 if (watch.removed) {
62 return;
63 }
64
65
66
67 synchronized (watchesMap) {
68 final Set<Watch> watches = watchesMap.get(pathPattern);
69 watches.remove(watch);
70 if (watches.isEmpty()) {
71 watchesMap.remove(pathPattern);
72 }
73 }
74 });
75 }
76
77 void notify(Revision revision, String path) {
78 List<Watch> eligibleWatches = null;
79 synchronized (watchesMap) {
80 if (watchesMap.isEmpty()) {
81 return;
82 }
83
84 for (final Iterator<Entry<PathPatternFilter, Set<Watch>>> mapIt = watchesMap.entrySet().iterator();
85 mapIt.hasNext();) {
86
87 final Entry<PathPatternFilter, Set<Watch>> entry = mapIt.next();
88 if (!entry.getKey().matches(path)) {
89 continue;
90 }
91
92 final Set<Watch> watches = entry.getValue();
93 for (final Iterator<Watch> i = watches.iterator(); i.hasNext();) {
94 final Watch w = i.next();
95 final Revision lastKnownRevision = w.lastKnownRevision;
96 if (lastKnownRevision.compareTo(revision) < 0) {
97 eligibleWatches = move(eligibleWatches, i, w);
98 } else {
99 logIneligibleFuture(lastKnownRevision, revision);
100 }
101 }
102
103 if (watches.isEmpty()) {
104 mapIt.remove();
105 }
106 }
107 }
108
109 if (eligibleWatches == null) {
110 return;
111 }
112
113
114 final int numEligiblePromises = eligibleWatches.size();
115 for (int i = 0; i < numEligiblePromises; i++) {
116 eligibleWatches.get(i).future.complete(revision);
117 }
118 }
119
120 void close(Supplier<CentralDogmaException> causeSupplier) {
121 List<Watch> eligibleWatches = null;
122 synchronized (watchesMap) {
123 for (final Set<Watch> watches : watchesMap.values()) {
124 for (final Iterator<Watch> i = watches.iterator(); i.hasNext();) {
125 final Watch w = i.next();
126 eligibleWatches = move(eligibleWatches, i, w);
127 }
128 }
129 }
130
131 if (eligibleWatches == null) {
132 return;
133 }
134
135
136 final CentralDogmaException cause = causeSupplier.get();
137 final int numEligiblePromises = eligibleWatches.size();
138 for (int i = 0; i < numEligiblePromises; i++) {
139 eligibleWatches.get(i).future.completeExceptionally(cause);
140 }
141 }
142
143 private static List<Watch> move(@Nullable List<Watch> watches, Iterator<Watch> i, Watch w) {
144 i.remove();
145 w.removed = true;
146
147 if (watches == null) {
148 watches = new ArrayList<>();
149 }
150
151 watches.add(w);
152 return watches;
153 }
154
155 private static void logIneligibleFuture(Revision lastKnownRevision, Revision newRevision) {
156 logger.debug("Not notifying a future with same or newer lastKnownRevision: {} (newRevision: {})",
157 lastKnownRevision, newRevision);
158 }
159
160 private static final class WatcherMap
161 extends LinkedHashMap<PathPatternFilter, Set<Watch>> {
162
163 private static final long serialVersionUID = 6793455658134063005L;
164
165 private final int maxEntries;
166
167 WatcherMap(int maxEntries) {
168 super(maxEntries, 0.75f, true);
169 this.maxEntries = maxEntries;
170 }
171
172 @Override
173 protected boolean removeEldestEntry(Entry<PathPatternFilter, Set<Watch>> eldest) {
174
175 return size() > maxEntries && eldest.getValue().isEmpty();
176 }
177 }
178
179 private static final class Watch {
180
181 final Revision lastKnownRevision;
182 final CompletableFuture<Revision> future;
183 volatile boolean removed;
184
185 Watch(Revision lastKnownRevision, CompletableFuture<Revision> future) {
186 this.lastKnownRevision = lastKnownRevision;
187 this.future = future;
188 }
189 }
190 }