1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.internal.storage.repository.git;
18
19 import java.util.ArrayList;
20 import java.util.Collections;
21 import java.util.IdentityHashMap;
22 import java.util.Iterator;
23 import java.util.LinkedHashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.Map.Entry;
27 import java.util.Set;
28 import java.util.concurrent.CompletableFuture;
29 import java.util.function.Supplier;
30
31 import javax.annotation.Nullable;
32
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import com.google.common.annotations.VisibleForTesting;
37
38 import com.linecorp.centraldogma.common.CentralDogmaException;
39 import com.linecorp.centraldogma.common.Revision;
40 import com.linecorp.centraldogma.server.internal.storage.repository.git.Watch.WatchListener;
41
42 final class CommitWatchers {
43
44 private static final Logger logger = LoggerFactory.getLogger(CommitWatchers.class);
45
46 @VisibleForTesting
47 final Map<PathPatternFilter, Set<Watch>> watchesMap = new WatcherMap(8192);
48
49 void add(Revision lastKnownRev, String pathPattern,
50 @Nullable CompletableFuture<Revision> future, @Nullable WatchListener listener) {
51 add0(PathPatternFilter.of(pathPattern), new Watch(lastKnownRev, future, listener));
52 }
53
54 private void add0(final PathPatternFilter pathPattern, Watch watch) {
55 synchronized (watchesMap) {
56 final Set<Watch> watches =
57 watchesMap.computeIfAbsent(pathPattern,
58 k -> Collections.newSetFromMap(new IdentityHashMap<>()));
59 watches.add(watch);
60 }
61
62 final CompletableFuture<Revision> future = watch.future();
63 if (future == null) {
64 return;
65 }
66 future.whenComplete((revision, cause) -> {
67 if (watch.wasRemoved()) {
68 return;
69 }
70
71
72
73 synchronized (watchesMap) {
74 final Set<Watch> watches = watchesMap.get(pathPattern);
75 watches.remove(watch);
76 if (watches.isEmpty()) {
77 watchesMap.remove(pathPattern);
78 }
79 }
80 });
81 }
82
83 void notify(Revision revision, String path) {
84 List<Watch> eligibleWatches = null;
85 synchronized (watchesMap) {
86 if (watchesMap.isEmpty()) {
87 return;
88 }
89
90 for (final Iterator<Entry<PathPatternFilter, Set<Watch>>> mapIt = watchesMap.entrySet().iterator();
91 mapIt.hasNext();) {
92
93 final Entry<PathPatternFilter, Set<Watch>> entry = mapIt.next();
94 if (!entry.getKey().matches(path)) {
95 continue;
96 }
97
98 final Set<Watch> watches = entry.getValue();
99 for (final Iterator<Watch> i = watches.iterator(); i.hasNext();) {
100 final Watch w = i.next();
101 final Revision lastKnownRevision = w.lastKnownRevision();
102 if (lastKnownRevision.compareTo(revision) < 0) {
103 eligibleWatches = move(eligibleWatches, i, w);
104 } else {
105 logIneligibleFuture(lastKnownRevision, revision);
106 }
107 }
108
109 if (watches.isEmpty()) {
110 mapIt.remove();
111 }
112 }
113 }
114
115 if (eligibleWatches == null) {
116 return;
117 }
118
119
120 final int numEligiblePromises = eligibleWatches.size();
121 for (int i = 0; i < numEligiblePromises; i++) {
122 eligibleWatches.get(i).notify(revision);
123 }
124 }
125
126 void close(Supplier<CentralDogmaException> causeSupplier) {
127 List<Watch> eligibleWatches = null;
128 synchronized (watchesMap) {
129 for (final Set<Watch> watches : watchesMap.values()) {
130 for (final Iterator<Watch> i = watches.iterator(); i.hasNext();) {
131 final Watch w = i.next();
132 if (!w.canRemove()) {
133
134 i.remove();
135 } else {
136 eligibleWatches = move(eligibleWatches, i, w);
137 }
138 }
139 }
140 }
141
142 if (eligibleWatches == null) {
143 return;
144 }
145
146
147 final CentralDogmaException cause = causeSupplier.get();
148 final int numEligiblePromises = eligibleWatches.size();
149 for (int i = 0; i < numEligiblePromises; i++) {
150 eligibleWatches.get(i).notifyFailure(cause);
151 }
152 }
153
154 private static List<Watch> move(@Nullable List<Watch> watches, Iterator<Watch> i, Watch w) {
155 if (w.canRemove()) {
156 i.remove();
157 w.remove();
158 }
159
160 if (watches == null) {
161 watches = new ArrayList<>();
162 }
163
164 watches.add(w);
165 return watches;
166 }
167
168 private static void logIneligibleFuture(Revision lastKnownRevision, Revision newRevision) {
169 logger.debug("Not notifying a future with same or newer lastKnownRevision: {} (newRevision: {})",
170 lastKnownRevision, newRevision);
171 }
172
173 private static final class WatcherMap
174 extends LinkedHashMap<PathPatternFilter, Set<Watch>> {
175
176 private static final long serialVersionUID = 6793455658134063005L;
177
178 private final int maxEntries;
179
180 WatcherMap(int maxEntries) {
181 super(maxEntries, 0.75f, true);
182 this.maxEntries = maxEntries;
183 }
184
185 @Override
186 protected boolean removeEldestEntry(Entry<PathPatternFilter, Set<Watch>> eldest) {
187
188 return size() > maxEntries && eldest.getValue().isEmpty();
189 }
190 }
191 }