1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server.internal.storage;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.collect.ImmutableList.toImmutableList;
20 import static java.util.Objects.requireNonNull;
21
22 import java.time.Duration;
23 import java.time.Instant;
24 import java.util.List;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.ScheduledExecutorService;
27 import java.util.concurrent.TimeUnit;
28 import java.util.function.Predicate;
29
30 import javax.annotation.Nullable;
31
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34
35 import com.google.common.annotations.VisibleForTesting;
36 import com.google.common.util.concurrent.FutureCallback;
37 import com.google.common.util.concurrent.Futures;
38 import com.google.common.util.concurrent.ListenableScheduledFuture;
39 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
40 import com.google.common.util.concurrent.MoreExecutors;
41
42 import com.linecorp.centraldogma.common.Author;
43 import com.linecorp.centraldogma.server.command.Command;
44 import com.linecorp.centraldogma.server.command.CommandExecutor;
45 import com.linecorp.centraldogma.server.metadata.MetadataService;
46 import com.linecorp.centraldogma.server.metadata.Token;
47 import com.linecorp.centraldogma.server.metadata.Tokens;
48 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
49
50
51
52
53 public class PurgeSchedulingService {
54
55 private static final Logger logger = LoggerFactory.getLogger(PurgeSchedulingService.class);
56
57
58
59
60 private static final Duration TICK = Duration.ofMinutes(1);
61
62 private final ProjectManager projectManager;
63 private final ScheduledExecutorService purgeWorker;
64 private final long maxRemovedRepositoryAgeMillis;
65 private final StoragePurgingScheduler storagePurgingScheduler = new StoragePurgingScheduler();
66
67 public PurgeSchedulingService(ProjectManager projectManager, ScheduledExecutorService purgeWorker,
68 long maxRemovedRepositoryAgeMillis) {
69 this.projectManager = requireNonNull(projectManager, "projectManager");
70 this.purgeWorker = requireNonNull(purgeWorker, "purgeWorker");
71 checkArgument(maxRemovedRepositoryAgeMillis >= 0,
72 "maxRemovedRepositoryAgeMillis: %s (expected: >= 0)", maxRemovedRepositoryAgeMillis);
73 this.maxRemovedRepositoryAgeMillis = maxRemovedRepositoryAgeMillis;
74 }
75
76 public void start(CommandExecutor commandExecutor,
77 MetadataService metadataService) {
78 if (isDisabled()) {
79 return;
80 }
81 requireNonNull(commandExecutor, "commandExecutor");
82 requireNonNull(metadataService, "metadataService");
83 cleanPurgedFiles();
84 storagePurgingScheduler.start(() -> {
85 try {
86 purgeProjectAndRepository(commandExecutor, metadataService);
87 purgeToken(metadataService);
88 } catch (Exception e) {
89 logger.warn("Unexpected purging service failure", e);
90 }
91 });
92 }
93
94 public boolean isStarted() {
95 return storagePurgingScheduler.isStarted();
96 }
97
98 public void stop() {
99 if (!isDisabled()) {
100 storagePurgingScheduler.stop();
101 }
102 }
103
104 private void cleanPurgedFiles() {
105 projectManager.purgeMarked();
106 projectManager.list().forEach((unused, project) -> project.repos().purgeMarked());
107 }
108
109 @VisibleForTesting
110 void purgeProjectAndRepository(CommandExecutor commandExecutor,
111 MetadataService metadataService) {
112 final long minAllowedTimestamp = System.currentTimeMillis() - maxRemovedRepositoryAgeMillis;
113 final Predicate<Instant> olderThanMinAllowed =
114 removedAt -> removedAt.toEpochMilli() < minAllowedTimestamp;
115
116 purgeProject(commandExecutor, olderThanMinAllowed);
117 purgeRepository(commandExecutor, metadataService, olderThanMinAllowed);
118 }
119
120 private void purgeProject(CommandExecutor commandExecutor,
121 Predicate<Instant> olderThanMinAllowed) {
122 projectManager
123 .listRemoved()
124 .forEach((projectName, removal) -> {
125 if (olderThanMinAllowed.test(removal)) {
126 commandExecutor
127 .execute(Command.purgeProject(Author.SYSTEM, projectName)).join();
128 }
129 });
130 }
131
132 private void purgeRepository(CommandExecutor commandExecutor,
133 MetadataService metadataService,
134 Predicate<Instant> olderThanMinAllowed) {
135 projectManager
136 .list()
137 .forEach((unused, project) -> {
138 project.repos().listRemoved()
139 .forEach((repoName, removal) -> {
140 if (olderThanMinAllowed.test(removal)) {
141 commandExecutor.execute(Command.purgeRepository(Author.SYSTEM,
142 project.name(),
143 repoName))
144 .join();
145 metadataService.purgeRepo(Author.SYSTEM, project.name(), repoName).join();
146 }
147 });
148 });
149 }
150
151 private static void purgeToken(MetadataService metadataService) {
152 final Tokens tokens = metadataService.getTokens().join();
153 final List<String> purging = tokens.appIds().values()
154 .stream()
155 .filter(Token::isDeleted)
156 .map(Token::appId)
157 .collect(toImmutableList());
158
159 if (!purging.isEmpty()) {
160 logger.info("Purging {} tokens: {}", purging.size(), purging);
161 purging.forEach(appId -> metadataService.purgeToken(Author.SYSTEM, appId));
162 }
163 }
164
165 private boolean isDisabled() {
166 return maxRemovedRepositoryAgeMillis == 0;
167 }
168
169 private final class StoragePurgingScheduler {
170
171 @Nullable
172 private volatile ListeningScheduledExecutorService scheduler;
173
174 public boolean isStarted() {
175 return scheduler != null;
176 }
177
178 public synchronized void start(Runnable task) {
179 if (isStarted()) {
180 return;
181 }
182 requireNonNull(task, "task");
183 final ListeningScheduledExecutorService scheduler = MoreExecutors.listeningDecorator(purgeWorker);
184 this.scheduler = scheduler;
185 @SuppressWarnings("UnstableApiUsage")
186 final ListenableScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
187 task,
188 TICK.getSeconds(), TICK.getSeconds(), TimeUnit.SECONDS);
189
190 Futures.addCallback(future, new FutureCallback<Object>() {
191 @Override
192 public void onSuccess(@Nullable Object result) {}
193
194 @Override
195 public void onFailure(Throwable cause) {
196 logger.error("Storage purge scheduler stopped due to an unexpected exception:", cause);
197 }
198 }, purgeWorker);
199 }
200
201 public synchronized void stop() {
202 final ExecutorService scheduler = this.scheduler;
203
204 try {
205 final boolean interrupted = terminate(scheduler);
206 if (interrupted) {
207 Thread.currentThread().interrupt();
208 }
209 } finally {
210 this.scheduler = null;
211 }
212 }
213
214 private boolean terminate(@Nullable ExecutorService executor) {
215 if (executor == null) {
216 return false;
217 }
218
219 boolean interrupted = false;
220 for (;;) {
221 executor.shutdownNow();
222 try {
223 if (executor.awaitTermination(1, TimeUnit.MINUTES)) {
224 break;
225 }
226 } catch (InterruptedException e) {
227
228 interrupted = true;
229 }
230 }
231
232 return interrupted;
233 }
234 }
235 }