1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.internal.storage.repository.git;
18  
19  import java.util.ArrayList;
20  import java.util.Collections;
21  import java.util.IdentityHashMap;
22  import java.util.Iterator;
23  import java.util.LinkedHashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.Map.Entry;
27  import java.util.Set;
28  import java.util.concurrent.CompletableFuture;
29  import java.util.function.Supplier;
30  
31  import javax.annotation.Nullable;
32  
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import com.google.common.annotations.VisibleForTesting;
37  
38  import com.linecorp.centraldogma.common.CentralDogmaException;
39  import com.linecorp.centraldogma.common.Revision;
40  import com.linecorp.centraldogma.server.internal.storage.repository.git.Watch.WatchListener;
41  
42  final class CommitWatchers {
43  
44      private static final Logger logger = LoggerFactory.getLogger(CommitWatchers.class);
45  
46      @VisibleForTesting
47      final Map<PathPatternFilter, Set<Watch>> watchesMap = new WatcherMap(8192);
48  
49      void add(Revision lastKnownRev, String pathPattern,
50               @Nullable CompletableFuture<Revision> future, @Nullable WatchListener listener) {
51          add0(PathPatternFilter.of(pathPattern), new Watch(lastKnownRev, future, listener));
52      }
53  
54      private void add0(final PathPatternFilter pathPattern, Watch watch) {
55          synchronized (watchesMap) {
56              final Set<Watch> watches =
57                      watchesMap.computeIfAbsent(pathPattern,
58                                                 k -> Collections.newSetFromMap(new IdentityHashMap<>()));
59              watches.add(watch);
60          }
61  
62          final CompletableFuture<Revision> future = watch.future();
63          if (future == null) {
64              return;
65          }
66          future.whenComplete((revision, cause) -> {
67              if (watch.wasRemoved()) {
68                  return;
69              }
70  
71              // Remove manually only when the watch was not removed from the set successfully.
72              // This usually happens when a user cancels the promise.
73              synchronized (watchesMap) {
74                  final Set<Watch> watches = watchesMap.get(pathPattern);
75                  watches.remove(watch);
76                  if (watches.isEmpty()) {
77                      watchesMap.remove(pathPattern);
78                  }
79              }
80          });
81      }
82  
83      void notify(Revision revision, String path) {
84          List<Watch> eligibleWatches = null;
85          synchronized (watchesMap) {
86              if (watchesMap.isEmpty()) {
87                  return;
88              }
89  
90              for (final Iterator<Entry<PathPatternFilter, Set<Watch>>> mapIt = watchesMap.entrySet().iterator();
91                   mapIt.hasNext();) {
92  
93                  final Entry<PathPatternFilter, Set<Watch>> entry = mapIt.next();
94                  if (!entry.getKey().matches(path)) {
95                      continue;
96                  }
97  
98                  final Set<Watch> watches = entry.getValue();
99                  for (final Iterator<Watch> i = watches.iterator(); i.hasNext();) {
100                     final Watch w = i.next();
101                     final Revision lastKnownRevision = w.lastKnownRevision();
102                     if (lastKnownRevision.compareTo(revision) < 0) {
103                         eligibleWatches = move(eligibleWatches, i, w);
104                     } else {
105                         logIneligibleFuture(lastKnownRevision, revision);
106                     }
107                 }
108 
109                 if (watches.isEmpty()) {
110                     mapIt.remove();
111                 }
112             }
113         }
114 
115         if (eligibleWatches == null) {
116             return;
117         }
118 
119         // Notify the matching promises found above.
120         final int numEligiblePromises = eligibleWatches.size();
121         for (int i = 0; i < numEligiblePromises; i++) {
122             eligibleWatches.get(i).notify(revision);
123         }
124     }
125 
126     void close(Supplier<CentralDogmaException> causeSupplier) {
127         List<Watch> eligibleWatches = null;
128         synchronized (watchesMap) {
129             for (final Set<Watch> watches : watchesMap.values()) {
130                 for (final Iterator<Watch> i = watches.iterator(); i.hasNext();) {
131                     final Watch w = i.next();
132                     if (!w.canRemove()) {
133                         // ResponseListener does not need to propagate errors when closing.
134                         i.remove();
135                     } else {
136                         eligibleWatches = move(eligibleWatches, i, w);
137                     }
138                 }
139             }
140         }
141 
142         if (eligibleWatches == null) {
143             return;
144         }
145 
146         // Notify the matching promises found above.
147         final CentralDogmaException cause = causeSupplier.get();
148         final int numEligiblePromises = eligibleWatches.size();
149         for (int i = 0; i < numEligiblePromises; i++) {
150             eligibleWatches.get(i).notifyFailure(cause);
151         }
152     }
153 
154     private static List<Watch> move(@Nullable List<Watch> watches, Iterator<Watch> i, Watch w) {
155         if (w.canRemove()) {
156             i.remove();
157             w.remove();
158         }
159 
160         if (watches == null) {
161             watches = new ArrayList<>();
162         }
163 
164         watches.add(w);
165         return watches;
166     }
167 
168     private static void logIneligibleFuture(Revision lastKnownRevision, Revision newRevision) {
169         logger.debug("Not notifying a future with same or newer lastKnownRevision: {} (newRevision: {})",
170                      lastKnownRevision, newRevision);
171     }
172 
173     private static final class WatcherMap
174             extends LinkedHashMap<PathPatternFilter, Set<Watch>> {
175 
176         private static final long serialVersionUID = 6793455658134063005L;
177 
178         private final int maxEntries;
179 
180         WatcherMap(int maxEntries) {
181             super(maxEntries, 0.75f, true);
182             this.maxEntries = maxEntries;
183         }
184 
185         @Override
186         protected boolean removeEldestEntry(Entry<PathPatternFilter, Set<Watch>> eldest) {
187             // Remove only the entries with empty watchers.
188             return size() > maxEntries && eldest.getValue().isEmpty();
189         }
190     }
191 }