1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server.internal.storage;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.google.common.collect.ImmutableList.toImmutableList;
20  import static java.util.Objects.requireNonNull;
21  
22  import java.time.Duration;
23  import java.time.Instant;
24  import java.util.List;
25  import java.util.concurrent.ExecutorService;
26  import java.util.concurrent.ScheduledExecutorService;
27  import java.util.concurrent.TimeUnit;
28  import java.util.function.Predicate;
29  
30  import javax.annotation.Nullable;
31  
32  import org.slf4j.Logger;
33  import org.slf4j.LoggerFactory;
34  
35  import com.google.common.annotations.VisibleForTesting;
36  import com.google.common.util.concurrent.FutureCallback;
37  import com.google.common.util.concurrent.Futures;
38  import com.google.common.util.concurrent.ListenableScheduledFuture;
39  import com.google.common.util.concurrent.ListeningScheduledExecutorService;
40  import com.google.common.util.concurrent.MoreExecutors;
41  
42  import com.linecorp.centraldogma.common.Author;
43  import com.linecorp.centraldogma.server.command.Command;
44  import com.linecorp.centraldogma.server.command.CommandExecutor;
45  import com.linecorp.centraldogma.server.metadata.MetadataService;
46  import com.linecorp.centraldogma.server.metadata.Token;
47  import com.linecorp.centraldogma.server.metadata.Tokens;
48  import com.linecorp.centraldogma.server.storage.project.ProjectManager;
49  
50  /**
51   * A service class for purging projects and repositories that were removed before.
52   */
53  public class PurgeSchedulingService {
54  
55      private static final Logger logger = LoggerFactory.getLogger(PurgeSchedulingService.class);
56  
57      /**
58       * How often to run the storage purge schedules. i.e. every minute.
59       */
60      private static final Duration TICK = Duration.ofMinutes(1);
61  
62      private final ProjectManager projectManager;
63      private final ScheduledExecutorService purgeWorker;
64      private final long maxRemovedRepositoryAgeMillis;
65      private final StoragePurgingScheduler storagePurgingScheduler = new StoragePurgingScheduler();
66  
67      public PurgeSchedulingService(ProjectManager projectManager, ScheduledExecutorService purgeWorker,
68                                    long maxRemovedRepositoryAgeMillis) {
69          this.projectManager = requireNonNull(projectManager, "projectManager");
70          this.purgeWorker = requireNonNull(purgeWorker, "purgeWorker");
71          checkArgument(maxRemovedRepositoryAgeMillis >= 0,
72                        "maxRemovedRepositoryAgeMillis: %s (expected: >= 0)", maxRemovedRepositoryAgeMillis);
73          this.maxRemovedRepositoryAgeMillis = maxRemovedRepositoryAgeMillis;
74      }
75  
76      public void start(CommandExecutor commandExecutor,
77                        MetadataService metadataService) {
78          if (isDisabled()) {
79              return;
80          }
81          requireNonNull(commandExecutor, "commandExecutor");
82          requireNonNull(metadataService, "metadataService");
83          cleanPurgedFiles();
84          storagePurgingScheduler.start(() -> {
85              try {
86                  purgeProjectAndRepository(commandExecutor, metadataService);
87                  purgeToken(metadataService);
88              } catch (Exception e) {
89                  logger.warn("Unexpected purging service failure", e);
90              }
91          });
92      }
93  
94      public boolean isStarted() {
95          return storagePurgingScheduler.isStarted();
96      }
97  
98      public void stop() {
99          if (!isDisabled()) {
100             storagePurgingScheduler.stop();
101         }
102     }
103 
104     private void cleanPurgedFiles() {
105         projectManager.purgeMarked();
106         projectManager.list().forEach((unused, project) -> project.repos().purgeMarked());
107     }
108 
109     @VisibleForTesting
110     void purgeProjectAndRepository(CommandExecutor commandExecutor,
111                                    MetadataService metadataService) {
112         final long minAllowedTimestamp = System.currentTimeMillis() - maxRemovedRepositoryAgeMillis;
113         final Predicate<Instant> olderThanMinAllowed =
114                 removedAt -> removedAt.toEpochMilli() < minAllowedTimestamp;
115 
116         purgeProject(commandExecutor, olderThanMinAllowed);
117         purgeRepository(commandExecutor, metadataService, olderThanMinAllowed);
118     }
119 
120     private void purgeProject(CommandExecutor commandExecutor,
121                               Predicate<Instant> olderThanMinAllowed) {
122         projectManager
123                 .listRemoved()
124                 .forEach((projectName, removal) -> {
125                     if (olderThanMinAllowed.test(removal)) {
126                         commandExecutor
127                                 .execute(Command.purgeProject(Author.SYSTEM, projectName)).join();
128                     }
129                 });
130     }
131 
132     private void purgeRepository(CommandExecutor commandExecutor,
133                                  MetadataService metadataService,
134                                  Predicate<Instant> olderThanMinAllowed) {
135         projectManager
136                 .list()
137                 .forEach((unused, project) -> {
138                     project.repos().listRemoved()
139                            .forEach((repoName, removal) -> {
140                                if (olderThanMinAllowed.test(removal)) {
141                                    commandExecutor.execute(Command.purgeRepository(Author.SYSTEM,
142                                                                                    project.name(),
143                                                                                    repoName))
144                                                   .join();
145                                    metadataService.purgeRepo(Author.SYSTEM, project.name(), repoName).join();
146                                }
147                            });
148                 });
149     }
150 
151     private static void purgeToken(MetadataService metadataService) {
152         final Tokens tokens = metadataService.getTokens().join();
153         final List<String> purging = tokens.appIds().values()
154                                            .stream()
155                                            .filter(Token::isDeleted)
156                                            .map(Token::appId)
157                                            .collect(toImmutableList());
158 
159         if (!purging.isEmpty()) {
160             logger.info("Purging {} tokens: {}", purging.size(), purging);
161             purging.forEach(appId -> metadataService.purgeToken(Author.SYSTEM, appId));
162         }
163     }
164 
165     private boolean isDisabled() {
166         return maxRemovedRepositoryAgeMillis == 0;
167     }
168 
169     private final class StoragePurgingScheduler {
170 
171         @Nullable
172         private volatile ListeningScheduledExecutorService scheduler;
173 
174         public boolean isStarted() {
175             return scheduler != null;
176         }
177 
178         public synchronized void start(Runnable task) {
179             if (isStarted()) {
180                 return;
181             }
182             requireNonNull(task, "task");
183             final ListeningScheduledExecutorService scheduler = MoreExecutors.listeningDecorator(purgeWorker);
184             this.scheduler = scheduler;
185             @SuppressWarnings("UnstableApiUsage")
186             final ListenableScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
187                     task,
188                     TICK.getSeconds(), TICK.getSeconds(), TimeUnit.SECONDS);
189 
190             Futures.addCallback(future, new FutureCallback<Object>() {
191                 @Override
192                 public void onSuccess(@Nullable Object result) {}
193 
194                 @Override
195                 public void onFailure(Throwable cause) {
196                     logger.error("Storage purge scheduler stopped due to an unexpected exception:", cause);
197                 }
198             }, purgeWorker);
199         }
200 
201         public synchronized void stop() {
202             final ExecutorService scheduler = this.scheduler;
203 
204             try {
205                 final boolean interrupted = terminate(scheduler);
206                 if (interrupted) {
207                     Thread.currentThread().interrupt();
208                 }
209             } finally {
210                 this.scheduler = null;
211             }
212         }
213 
214         private boolean terminate(@Nullable ExecutorService executor) {
215             if (executor == null) {
216                 return false;
217             }
218 
219             boolean interrupted = false;
220             for (;;) {
221                 executor.shutdownNow();
222                 try {
223                     if (executor.awaitTermination(1, TimeUnit.MINUTES)) {
224                         break;
225                     }
226                 } catch (InterruptedException e) {
227                     // Propagate later.
228                     interrupted = true;
229                 }
230             }
231 
232             return interrupted;
233         }
234     }
235 }