1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server.internal.storage;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.collect.ImmutableList.toImmutableList;
20 import static com.linecorp.centraldogma.server.internal.ExecutorServiceUtil.terminate;
21 import static java.util.Objects.requireNonNull;
22
23 import java.time.Duration;
24 import java.time.Instant;
25 import java.util.List;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.ScheduledExecutorService;
28 import java.util.concurrent.TimeUnit;
29 import java.util.function.Predicate;
30
31 import javax.annotation.Nullable;
32
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 import com.google.common.annotations.VisibleForTesting;
37 import com.google.common.util.concurrent.FutureCallback;
38 import com.google.common.util.concurrent.Futures;
39 import com.google.common.util.concurrent.ListenableScheduledFuture;
40 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
41 import com.google.common.util.concurrent.MoreExecutors;
42
43 import com.linecorp.centraldogma.common.Author;
44 import com.linecorp.centraldogma.server.command.Command;
45 import com.linecorp.centraldogma.server.command.CommandExecutor;
46 import com.linecorp.centraldogma.server.metadata.MetadataService;
47 import com.linecorp.centraldogma.server.metadata.Token;
48 import com.linecorp.centraldogma.server.metadata.Tokens;
49 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
50
51
52
53
54 public class PurgeSchedulingService {
55
56 private static final Logger logger = LoggerFactory.getLogger(PurgeSchedulingService.class);
57
58
59
60
61 private static final Duration TICK = Duration.ofMinutes(1);
62
63 private final ProjectManager projectManager;
64 private final ScheduledExecutorService purgeWorker;
65 private final long maxRemovedRepositoryAgeMillis;
66 private final StoragePurgingScheduler storagePurgingScheduler = new StoragePurgingScheduler();
67
68 public PurgeSchedulingService(ProjectManager projectManager, ScheduledExecutorService purgeWorker,
69 long maxRemovedRepositoryAgeMillis) {
70 this.projectManager = requireNonNull(projectManager, "projectManager");
71 this.purgeWorker = requireNonNull(purgeWorker, "purgeWorker");
72 checkArgument(maxRemovedRepositoryAgeMillis >= 0,
73 "maxRemovedRepositoryAgeMillis: %s (expected: >= 0)", maxRemovedRepositoryAgeMillis);
74 this.maxRemovedRepositoryAgeMillis = maxRemovedRepositoryAgeMillis;
75 }
76
77 public void start(CommandExecutor commandExecutor,
78 MetadataService metadataService) {
79 if (isDisabled()) {
80 return;
81 }
82 requireNonNull(commandExecutor, "commandExecutor");
83 requireNonNull(metadataService, "metadataService");
84 cleanPurgedFiles();
85 storagePurgingScheduler.start(() -> {
86 try {
87 purgeProjectAndRepository(commandExecutor, metadataService);
88 purgeToken(metadataService);
89 } catch (Exception e) {
90 logger.warn("Unexpected purging service failure", e);
91 }
92 });
93 }
94
95 public boolean isStarted() {
96 return storagePurgingScheduler.isStarted();
97 }
98
99 public void stop() {
100 if (!isDisabled()) {
101 storagePurgingScheduler.stop();
102 }
103 }
104
105 private void cleanPurgedFiles() {
106 projectManager.purgeMarked();
107 projectManager.list().forEach((unused, project) -> project.repos().purgeMarked());
108 }
109
110 @VisibleForTesting
111 void purgeProjectAndRepository(CommandExecutor commandExecutor,
112 MetadataService metadataService) {
113 final long minAllowedTimestamp = System.currentTimeMillis() - maxRemovedRepositoryAgeMillis;
114 final Predicate<Instant> olderThanMinAllowed =
115 removedAt -> removedAt.toEpochMilli() < minAllowedTimestamp;
116
117 purgeProject(commandExecutor, olderThanMinAllowed);
118 purgeRepository(commandExecutor, metadataService, olderThanMinAllowed);
119 }
120
121 private void purgeProject(CommandExecutor commandExecutor,
122 Predicate<Instant> olderThanMinAllowed) {
123 projectManager
124 .listRemoved()
125 .forEach((projectName, removal) -> {
126 if (olderThanMinAllowed.test(removal)) {
127 commandExecutor
128 .execute(Command.purgeProject(Author.SYSTEM, projectName)).join();
129 }
130 });
131 }
132
133 private void purgeRepository(CommandExecutor commandExecutor,
134 MetadataService metadataService,
135 Predicate<Instant> olderThanMinAllowed) {
136 projectManager
137 .list()
138 .forEach((unused, project) -> {
139 project.repos().listRemoved()
140 .forEach((repoName, removal) -> {
141 if (olderThanMinAllowed.test(removal)) {
142 commandExecutor.execute(Command.purgeRepository(Author.SYSTEM,
143 project.name(),
144 repoName))
145 .join();
146 metadataService.purgeRepo(Author.SYSTEM, project.name(), repoName).join();
147 }
148 });
149 });
150 }
151
152 private static void purgeToken(MetadataService metadataService) {
153 final Tokens tokens = metadataService.getTokens();
154 final List<String> purging = tokens.appIds().values()
155 .stream()
156 .filter(Token::isDeleted)
157 .map(Token::appId)
158 .collect(toImmutableList());
159
160 if (!purging.isEmpty()) {
161 logger.info("Purging {} tokens: {}", purging.size(), purging);
162 purging.forEach(appId -> metadataService.purgeToken(Author.SYSTEM, appId));
163 }
164 }
165
166 private boolean isDisabled() {
167 return maxRemovedRepositoryAgeMillis == 0;
168 }
169
170 private final class StoragePurgingScheduler {
171
172 @Nullable
173 private volatile ListeningScheduledExecutorService scheduler;
174
175 public boolean isStarted() {
176 return scheduler != null;
177 }
178
179 public synchronized void start(Runnable task) {
180 if (isStarted()) {
181 return;
182 }
183 requireNonNull(task, "task");
184 final ListeningScheduledExecutorService scheduler = MoreExecutors.listeningDecorator(purgeWorker);
185 this.scheduler = scheduler;
186 @SuppressWarnings("UnstableApiUsage")
187 final ListenableScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
188 task,
189 TICK.getSeconds(), TICK.getSeconds(), TimeUnit.SECONDS);
190
191 Futures.addCallback(future, new FutureCallback<Object>() {
192 @Override
193 public void onSuccess(@Nullable Object result) {}
194
195 @Override
196 public void onFailure(Throwable cause) {
197 logger.error("Storage purge scheduler stopped due to an unexpected exception:", cause);
198 }
199 }, purgeWorker);
200 }
201
202 public synchronized void stop() {
203 final ExecutorService scheduler = this.scheduler;
204
205 try {
206 final boolean interrupted = terminate(scheduler);
207 if (interrupted) {
208 Thread.currentThread().interrupt();
209 }
210 } finally {
211 this.scheduler = null;
212 }
213 }
214 }
215 }