1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.server.internal.mirror;
18
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static java.util.Objects.requireNonNull;
21
22 import java.io.File;
23 import java.time.Duration;
24 import java.time.ZonedDateTime;
25 import java.util.Set;
26 import java.util.concurrent.CompletableFuture;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.SynchronousQueue;
31 import java.util.concurrent.ThreadPoolExecutor;
32 import java.util.concurrent.TimeUnit;
33
34 import javax.annotation.Nullable;
35
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38
39 import com.google.common.util.concurrent.FutureCallback;
40 import com.google.common.util.concurrent.Futures;
41 import com.google.common.util.concurrent.ListenableFuture;
42 import com.google.common.util.concurrent.ListenableScheduledFuture;
43 import com.google.common.util.concurrent.ListeningExecutorService;
44 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
45 import com.google.common.util.concurrent.MoreExecutors;
46
47 import com.linecorp.centraldogma.server.MirrorException;
48 import com.linecorp.centraldogma.server.MirroringService;
49 import com.linecorp.centraldogma.server.command.CommandExecutor;
50 import com.linecorp.centraldogma.server.mirror.Mirror;
51 import com.linecorp.centraldogma.server.storage.project.Project;
52 import com.linecorp.centraldogma.server.storage.project.ProjectManager;
53
54 import io.micrometer.core.instrument.MeterRegistry;
55 import io.micrometer.core.instrument.binder.jvm.ExecutorServiceMetrics;
56 import io.netty.util.concurrent.DefaultThreadFactory;
57
58 public final class DefaultMirroringService implements MirroringService {
59
60 private static final Logger logger = LoggerFactory.getLogger(DefaultMirroringService.class);
61
62
63
64
65 private static final Duration TICK = Duration.ofSeconds(1);
66
67 private final File workDir;
68 private final ProjectManager projectManager;
69 private final int numThreads;
70 private final int maxNumFilesPerMirror;
71 private final long maxNumBytesPerMirror;
72
73 private volatile CommandExecutor commandExecutor;
74 private volatile ListeningScheduledExecutorService scheduler;
75 private volatile ListeningExecutorService worker;
76
77 private ZonedDateTime lastExecutionTime;
78 private final MeterRegistry meterRegistry;
79
80 DefaultMirroringService(File workDir, ProjectManager projectManager, MeterRegistry meterRegistry,
81 int numThreads, int maxNumFilesPerMirror, long maxNumBytesPerMirror) {
82
83 this.workDir = requireNonNull(workDir, "workDir");
84 this.projectManager = requireNonNull(projectManager, "projectManager");
85 this.meterRegistry = requireNonNull(meterRegistry, "meterRegistry");
86
87 checkArgument(numThreads > 0, "numThreads: %s (expected: > 0)", numThreads);
88 checkArgument(maxNumFilesPerMirror > 0,
89 "maxNumFilesPerMirror: %s (expected: > 0)", maxNumFilesPerMirror);
90 checkArgument(maxNumBytesPerMirror > 0,
91 "maxNumBytesPerMirror: %s (expected: > 0)", maxNumBytesPerMirror);
92 this.numThreads = numThreads;
93 this.maxNumFilesPerMirror = maxNumFilesPerMirror;
94 this.maxNumBytesPerMirror = maxNumBytesPerMirror;
95 }
96
97 public boolean isStarted() {
98 return scheduler != null;
99 }
100
101 public synchronized void start(CommandExecutor commandExecutor) {
102 if (isStarted()) {
103 return;
104 }
105
106 this.commandExecutor = requireNonNull(commandExecutor, "commandExecutor");
107
108 ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
109 new DefaultThreadFactory("mirroring-scheduler", true));
110 executorService = ExecutorServiceMetrics.monitor(meterRegistry, executorService,
111 "mirroringScheduler");
112 scheduler = MoreExecutors.listeningDecorator(executorService);
113
114
115
116 final SynchronousQueue<Runnable> workQueue = new SynchronousQueue<>();
117 worker = MoreExecutors.listeningDecorator(new ThreadPoolExecutor(
118 0, numThreads, 90, TimeUnit.SECONDS, workQueue,
119 new DefaultThreadFactory("mirroring-worker", true),
120 (rejectedTask, executor) -> {
121
122
123 try {
124 workQueue.put(rejectedTask);
125 } catch (InterruptedException e) {
126
127 Thread.currentThread().interrupt();
128 }
129 }));
130
131 final ListenableScheduledFuture<?> future = scheduler.scheduleWithFixedDelay(
132 this::schedulePendingMirrors,
133 TICK.getSeconds(), TICK.getSeconds(), TimeUnit.SECONDS);
134
135 Futures.addCallback(future, new FutureCallback<Object>() {
136 @Override
137 public void onSuccess(@Nullable Object result) {}
138
139 @Override
140 public void onFailure(Throwable cause) {
141 logger.error("Git mirroring scheduler stopped due to an unexpected exception:", cause);
142 }
143 }, MoreExecutors.directExecutor());
144 }
145
146 public synchronized void stop() {
147 final ExecutorService scheduler = this.scheduler;
148 final ExecutorService worker = this.worker;
149
150 try {
151 final boolean interrupted = terminate(scheduler) || terminate(worker);
152 if (interrupted) {
153 Thread.currentThread().interrupt();
154 }
155 } finally {
156 this.scheduler = null;
157 this.worker = null;
158 }
159 }
160
161 private static boolean terminate(ExecutorService executor) {
162 if (executor == null) {
163 return false;
164 }
165
166 boolean interrupted = false;
167 for (;;) {
168 executor.shutdownNow();
169 try {
170 if (executor.awaitTermination(1, TimeUnit.MINUTES)) {
171 break;
172 }
173 } catch (InterruptedException e) {
174
175 interrupted = true;
176 }
177 }
178
179 return interrupted;
180 }
181
182 private void schedulePendingMirrors() {
183 final ZonedDateTime now = ZonedDateTime.now();
184 if (lastExecutionTime == null) {
185 lastExecutionTime = now.minus(TICK);
186 }
187
188 final ZonedDateTime currentLastExecutionTime = lastExecutionTime;
189 lastExecutionTime = now;
190
191 projectManager.list()
192 .values()
193 .forEach(project -> {
194 final Set<Mirror> mirrors;
195 try {
196 mirrors = project.metaRepo().mirrors();
197 } catch (Exception e) {
198 logger.warn("Failed to load the mirror list from: {}", project.name(), e);
199 return;
200 }
201 mirrors.forEach(m -> {
202 try {
203 if (m.nextExecutionTime(currentLastExecutionTime).compareTo(now) < 0) {
204 run(project, m);
205 }
206 } catch (Exception e) {
207 logger.warn("Unexpected exception while mirroring: {}", m, e);
208 }
209 });
210 });
211 }
212
213 @Override
214 public CompletableFuture<Void> mirror() {
215 if (commandExecutor == null) {
216 return CompletableFuture.completedFuture(null);
217 }
218
219 return CompletableFuture.runAsync(
220 () -> projectManager.list().values()
221 .forEach(p -> p.metaRepo().mirrors()
222 .forEach(m -> run(m, p.name(), false))),
223 worker);
224 }
225
226 private void run(Project project, Mirror m) {
227 final ListenableFuture<?> future = worker.submit(() -> run(m, project.name(), true));
228 Futures.addCallback(future, new FutureCallback<Object>() {
229 @Override
230 public void onSuccess(@Nullable Object result) {}
231
232 @Override
233 public void onFailure(Throwable cause) {
234 logger.warn("Unexpected Git mirroring failure: {}", m, cause);
235 }
236 }, MoreExecutors.directExecutor());
237 }
238
239 private void run(Mirror m, String projectName, boolean logOnFailure) {
240 logger.info("Mirroring: {}", m);
241 try {
242 new MirroringTask(m, projectName, meterRegistry)
243 .run(workDir, commandExecutor, maxNumFilesPerMirror, maxNumBytesPerMirror);
244 } catch (Exception e) {
245 if (logOnFailure) {
246 logger.warn("Unexpected exception while mirroring: {}", m, e);
247 } else {
248 if (e instanceof MirrorException) {
249 throw (MirrorException) e;
250 } else {
251 throw new MirrorException(e);
252 }
253 }
254 }
255 }
256 }