1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.internal.storage.repository.git;
18  
19  import java.util.ArrayList;
20  import java.util.Collections;
21  import java.util.IdentityHashMap;
22  import java.util.Iterator;
23  import java.util.LinkedHashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Set;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.function.Supplier;
30  
31  import javax.annotation.Nullable;
32  
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import com.google.common.annotations.VisibleForTesting;
37  
38  import com.linecorp.centraldogma.common.CentralDogmaException;
39  import com.linecorp.centraldogma.common.Revision;
40  
41  final class CommitWatchers {
42  
43      private static final Logger logger = LoggerFactory.getLogger(CommitWatchers.class);
44  
45      @VisibleForTesting
46      final Map<PathPatternFilter, Set<Watch>> watchesMap = new WatcherMap(8192);
47  
48      void add(Revision lastKnownRev, String pathPattern, CompletableFuture<Revision> future) {
49          add0(PathPatternFilter.of(pathPattern), new Watch(lastKnownRev, future));
50      }
51  
52      private void add0(final PathPatternFilter pathPattern, Watch watch) {
53          synchronized (watchesMap) {
54              final Set<Watch> watches =
55                      watchesMap.computeIfAbsent(pathPattern,
56                                                 k -> Collections.newSetFromMap(new IdentityHashMap<>()));
57              watches.add(watch);
58          }
59  
60          watch.future.whenComplete((revision, cause) -> {
61              if (watch.removed) {
62                  return;
63              }
64  
65              // Remove manually only when the watch was not removed from the set successfully.
66              // This usually happens when a user cancels the promise.
67              synchronized (watchesMap) {
68                  final Set<Watch> watches = watchesMap.get(pathPattern);
69                  watches.remove(watch);
70                  if (watches.isEmpty()) {
71                      watchesMap.remove(pathPattern);
72                  }
73              }
74          });
75      }
76  
77      void notify(Revision revision, String path) {
78          List<Watch> eligibleWatches = null;
79          synchronized (watchesMap) {
80              if (watchesMap.isEmpty()) {
81                  return;
82              }
83  
84              for (final Iterator<Entry<PathPatternFilter, Set<Watch>>> mapIt = watchesMap.entrySet().iterator();
85                   mapIt.hasNext();) {
86  
87                  final Entry<PathPatternFilter, Set<Watch>> entry = mapIt.next();
88                  if (!entry.getKey().matches(path)) {
89                      continue;
90                  }
91  
92                  final Set<Watch> watches = entry.getValue();
93                  for (final Iterator<Watch> i = watches.iterator(); i.hasNext();) {
94                      final Watch w = i.next();
95                      final Revision lastKnownRevision = w.lastKnownRevision;
96                      if (lastKnownRevision.compareTo(revision) < 0) {
97                          eligibleWatches = move(eligibleWatches, i, w);
98                      } else {
99                          logIneligibleFuture(lastKnownRevision, revision);
100                     }
101                 }
102 
103                 if (watches.isEmpty()) {
104                     mapIt.remove();
105                 }
106             }
107         }
108 
109         if (eligibleWatches == null) {
110             return;
111         }
112 
113         // Notify the matching promises found above.
114         final int numEligiblePromises = eligibleWatches.size();
115         for (int i = 0; i < numEligiblePromises; i++) {
116             eligibleWatches.get(i).future.complete(revision);
117         }
118     }
119 
120     void close(Supplier<CentralDogmaException> causeSupplier) {
121         List<Watch> eligibleWatches = null;
122         synchronized (watchesMap) {
123             for (final Set<Watch> watches : watchesMap.values()) {
124                 for (final Iterator<Watch> i = watches.iterator(); i.hasNext();) {
125                     final Watch w = i.next();
126                     eligibleWatches = move(eligibleWatches, i, w);
127                 }
128             }
129         }
130 
131         if (eligibleWatches == null) {
132             return;
133         }
134 
135         // Notify the matching promises found above.
136         final CentralDogmaException cause = causeSupplier.get();
137         final int numEligiblePromises = eligibleWatches.size();
138         for (int i = 0; i < numEligiblePromises; i++) {
139             eligibleWatches.get(i).future.completeExceptionally(cause);
140         }
141     }
142 
143     private static List<Watch> move(@Nullable List<Watch> watches, Iterator<Watch> i, Watch w) {
144         i.remove();
145         w.removed = true;
146 
147         if (watches == null) {
148             watches = new ArrayList<>();
149         }
150 
151         watches.add(w);
152         return watches;
153     }
154 
155     private static void logIneligibleFuture(Revision lastKnownRevision, Revision newRevision) {
156         logger.debug("Not notifying a future with same or newer lastKnownRevision: {} (newRevision: {})",
157                      lastKnownRevision, newRevision);
158     }
159 
160     private static final class WatcherMap
161             extends LinkedHashMap<PathPatternFilter, Set<Watch>> {
162 
163         private static final long serialVersionUID = 6793455658134063005L;
164 
165         private final int maxEntries;
166 
167         WatcherMap(int maxEntries) {
168             super(maxEntries, 0.75f, true);
169             this.maxEntries = maxEntries;
170         }
171 
172         @Override
173         protected boolean removeEldestEntry(Entry<PathPatternFilter, Set<Watch>> eldest) {
174             // Remove only the entries with empty watchers.
175             return size() > maxEntries && eldest.getValue().isEmpty();
176         }
177     }
178 
179     private static final class Watch {
180 
181         final Revision lastKnownRevision;
182         final CompletableFuture<Revision> future;
183         volatile boolean removed;
184 
185         Watch(Revision lastKnownRevision, CompletableFuture<Revision> future) {
186             this.lastKnownRevision = lastKnownRevision;
187             this.future = future;
188         }
189     }
190 }