1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server.internal.storage;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.google.common.collect.ImmutableList.toImmutableList;
20  import static com.linecorp.centraldogma.server.internal.ExecutorServiceUtil.terminate;
21  import static java.util.Objects.requireNonNull;
22  
23  import java.time.Duration;
24  import java.time.Instant;
25  import java.util.List;
26  import java.util.concurrent.ExecutorService;
27  import java.util.concurrent.ScheduledExecutorService;
28  import java.util.concurrent.TimeUnit;
29  import java.util.function.Predicate;
30  
31  import javax.annotation.Nullable;
32  
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  import com.google.common.annotations.VisibleForTesting;
37  import com.google.common.util.concurrent.FutureCallback;
38  import com.google.common.util.concurrent.Futures;
39  import com.google.common.util.concurrent.ListenableScheduledFuture;
40  import com.google.common.util.concurrent.ListeningScheduledExecutorService;
41  import com.google.common.util.concurrent.MoreExecutors;
42  
43  import com.linecorp.centraldogma.common.Author;
44  import com.linecorp.centraldogma.server.command.Command;
45  import com.linecorp.centraldogma.server.command.CommandExecutor;
46  import com.linecorp.centraldogma.server.metadata.MetadataService;
47  import com.linecorp.centraldogma.server.metadata.Token;
48  import com.linecorp.centraldogma.server.metadata.Tokens;
49  import com.linecorp.centraldogma.server.storage.project.ProjectManager;
50  
51  /**
52   * A service class for purging projects and repositories that were removed before.
53   */
54  public class PurgeSchedulingService {
55  
56      private static final Logger logger = LoggerFactory.getLogger(PurgeSchedulingService.class);
57  
58      /**
59       * How often to run the storage purge schedules. i.e. every minute.
60       */
61      private static final Duration TICK = Duration.ofMinutes(1);
62  
63      private final ProjectManager projectManager;
64      private final ScheduledExecutorService purgeWorker;
65      private final long maxRemovedRepositoryAgeMillis;
66      private final StoragePurgingScheduler storagePurgingScheduler = new StoragePurgingScheduler();
67  
68      public PurgeSchedulingService(ProjectManager projectManager, ScheduledExecutorService purgeWorker,
69                                    long maxRemovedRepositoryAgeMillis) {
70          this.projectManager = requireNonNull(projectManager, "projectManager");
71          this.purgeWorker = requireNonNull(purgeWorker, "purgeWorker");
72          checkArgument(maxRemovedRepositoryAgeMillis >= 0,
73                        "maxRemovedRepositoryAgeMillis: %s (expected: >= 0)", maxRemovedRepositoryAgeMillis);
74          this.maxRemovedRepositoryAgeMillis = maxRemovedRepositoryAgeMillis;
75      }
76  
77      public void start(CommandExecutor commandExecutor,
78                        MetadataService metadataService) {
79          if (isDisabled()) {
80              return;
81          }
82          requireNonNull(commandExecutor, "commandExecutor");
83          requireNonNull(metadataService, "metadataService");
84          cleanPurgedFiles();
85          storagePurgingScheduler.start(() -> {
86              try {
87                  purgeProjectAndRepository(commandExecutor, metadataService);
88                  purgeToken(metadataService);
89              } catch (Exception e) {
90                  logger.warn("Unexpected purging service failure", e);
91              }
92          });
93      }
94  
95      public boolean isStarted() {
96          return storagePurgingScheduler.isStarted();
97      }
98  
99      public void stop() {
100         if (!isDisabled()) {
101             storagePurgingScheduler.stop();
102         }
103     }
104 
105     private void cleanPurgedFiles() {
106         projectManager.purgeMarked();
107         projectManager.list().forEach((unused, project) -> project.repos().purgeMarked());
108     }
109 
110     @VisibleForTesting
111     void purgeProjectAndRepository(CommandExecutor commandExecutor,
112                                    MetadataService metadataService) {
113         final long minAllowedTimestamp = System.currentTimeMillis() - maxRemovedRepositoryAgeMillis;
114         final Predicate<Instant> olderThanMinAllowed =
115                 removedAt -> removedAt.toEpochMilli() < minAllowedTimestamp;
116 
117         purgeProject(commandExecutor, olderThanMinAllowed);
118         purgeRepository(commandExecutor, metadataService, olderThanMinAllowed);
119     }
120 
121     private void purgeProject(CommandExecutor commandExecutor,
122                               Predicate<Instant> olderThanMinAllowed) {
123         projectManager
124                 .listRemoved()
125                 .forEach((projectName, removal) -> {
126                     if (olderThanMinAllowed.test(removal)) {
127                         commandExecutor
128                                 .execute(Command.purgeProject(Author.SYSTEM, projectName)).join();
129                     }
130                 });
131     }
132 
133     private void purgeRepository(CommandExecutor commandExecutor,
134                                  MetadataService metadataService,
135                                  Predicate<Instant> olderThanMinAllowed) {
136         projectManager
137                 .list()
138                 .forEach((unused, project) -> {
139                     project.repos().listRemoved()
140                            .forEach((repoName, removal) -> {
141                                if (olderThanMinAllowed.test(removal)) {
142                                    commandExecutor.execute(Command.purgeRepository(Author.SYSTEM,
143                                                                                    project.name(),
144                                                                                    repoName))
145                                                   .join();
146                                    metadataService.purgeRepo(Author.SYSTEM, project.name(), repoName).join();
147                                }
148                            });
149                 });
150     }
151 
152     private static void purgeToken(MetadataService metadataService) {
153         final Tokens tokens = metadataService.getTokens();
154         final List<String> purging = tokens.appIds().values()
155                                            .stream()
156                                            .filter(Token::isDeleted)
157                                            .map(Token::appId)
158                                            .collect(toImmutableList());
159 
160         if (!purging.isEmpty()) {
161             logger.info("Purging {} tokens: {}", purging.size(), purging);
162             purging.forEach(appId -> metadataService.purgeToken(Author.SYSTEM, appId));
163         }
164     }
165 
166     private boolean isDisabled() {
167         return maxRemovedRepositoryAgeMillis == 0;
168     }
169 
170     private final class StoragePurgingScheduler {
171 
172         @Nullable
173         private volatile ListeningScheduledExecutorService scheduler;
174 
175         public boolean isStarted() {
176             return scheduler != null;
177         }
178 
179         public synchronized void start(Runnable task) {
180             if (isStarted()) {
181                 return;
182             }
183             requireNonNull(task, "task");
184             final ListeningScheduledExecutorService scheduler = MoreExecutors.listeningDecorator(purgeWorker);
185             this.scheduler = scheduler;
186             @SuppressWarnings("UnstableApiUsage")
187             final ListenableScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
188                     task,
189                     TICK.getSeconds(), TICK.getSeconds(), TimeUnit.SECONDS);
190 
191             Futures.addCallback(future, new FutureCallback<Object>() {
192                 @Override
193                 public void onSuccess(@Nullable Object result) {}
194 
195                 @Override
196                 public void onFailure(Throwable cause) {
197                     logger.error("Storage purge scheduler stopped due to an unexpected exception:", cause);
198                 }
199             }, purgeWorker);
200         }
201 
202         public synchronized void stop() {
203             final ExecutorService scheduler = this.scheduler;
204 
205             try {
206                 final boolean interrupted = terminate(scheduler);
207                 if (interrupted) {
208                     Thread.currentThread().interrupt();
209                 }
210             } finally {
211                 this.scheduler = null;
212             }
213         }
214     }
215 }