1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package com.linecorp.centraldogma.server.internal.storage.repository.git;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkState;
20 import static com.linecorp.centraldogma.server.internal.storage.repository.git.GitRepository.R_HEADS_MASTER;
21 import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_CORE_SECTION;
22
23 import java.io.EOFException;
24 import java.io.File;
25 import java.io.IOException;
26 import java.nio.ByteBuffer;
27 import java.nio.channels.FileChannel;
28 import java.nio.file.Path;
29 import java.nio.file.StandardOpenOption;
30
31 import javax.annotation.Nullable;
32
33 import org.eclipse.jgit.lib.ObjectId;
34 import org.eclipse.jgit.lib.Repository;
35 import org.eclipse.jgit.revwalk.RevCommit;
36 import org.eclipse.jgit.revwalk.RevWalk;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import com.google.common.annotations.VisibleForTesting;
41
42 import com.linecorp.centraldogma.common.Revision;
43 import com.linecorp.centraldogma.common.RevisionNotFoundException;
44 import com.linecorp.centraldogma.server.storage.StorageException;
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 final class CommitIdDatabase implements AutoCloseable {
67
68 private static final Logger logger = LoggerFactory.getLogger(CommitIdDatabase.class);
69
70 private static final int RECORD_LEN = 4 + 20;
71
72 private static final ThreadLocal<ByteBuffer> threadLocalBuffer =
73 ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(RECORD_LEN));
74
75 private final Path path;
76 private final FileChannel channel;
77 private final boolean fsync;
78 private volatile Revision headRevision;
79
80 CommitIdDatabase(Repository repo) {
81
82
83
84 this(repo.getDirectory(), repo.getConfig().getBoolean(CONFIG_CORE_SECTION, "fsyncObjectFiles", false));
85 }
86
87 @VisibleForTesting
88 CommitIdDatabase(File rootDir) {
89 this(rootDir, false);
90 }
91
92 private CommitIdDatabase(File rootDir, boolean fsync) {
93 path = new File(rootDir, "commit_ids.dat").toPath();
94 try {
95 channel = FileChannel.open(path,
96 StandardOpenOption.CREATE,
97 StandardOpenOption.READ,
98 StandardOpenOption.WRITE);
99 } catch (IOException e) {
100 throw new StorageException("failed to open a commit ID database: " + path, e);
101 }
102
103 this.fsync = fsync;
104 boolean success = false;
105 try {
106 final long size;
107 try {
108 size = channel.size();
109 } catch (IOException e) {
110 throw new StorageException("failed to get the file length: " + path, e);
111 }
112
113 if (size % RECORD_LEN != 0) {
114 throw new StorageException("incorrect file length: " + path + " (" + size + " bytes)");
115 }
116
117 final int numRecords = (int) (size / RECORD_LEN);
118 headRevision = numRecords > 0 ? new Revision(numRecords) : null;
119 success = true;
120 } finally {
121 if (!success) {
122 close();
123 }
124 }
125 }
126
127 @Nullable Revision headRevision() {
128 return headRevision;
129 }
130
131 ObjectId get(Revision revision) {
132 final Revision headRevision = this.headRevision;
133 checkState(headRevision != null, "initial commit not available yet: %s", path);
134 checkArgument(!revision.isRelative(), "revision: %s (expected: an absolute revision)", revision);
135 if (revision.major() > headRevision.major()) {
136 throw new RevisionNotFoundException(revision);
137 }
138
139 final ByteBuffer buf = threadLocalBuffer.get();
140 buf.clear();
141 long pos = (long) (revision.major() - 1) * RECORD_LEN;
142 try {
143 do {
144 final int readBytes = channel.read(buf, pos);
145 if (readBytes < 0) {
146 throw new EOFException();
147 }
148 pos += readBytes;
149 } while (buf.hasRemaining());
150 } catch (IOException e) {
151 throw new StorageException("failed to read the commit ID database: " + path, e);
152 }
153
154 buf.flip();
155
156 final int actualRevision = buf.getInt();
157 if (actualRevision != revision.major()) {
158 throw new StorageException("incorrect revision number in the commit ID database: " + path +
159 "(actual: " + actualRevision + ", expected: " + revision.major() + ')');
160 }
161
162 return new ObjectId(buf.getInt(), buf.getInt(), buf.getInt(), buf.getInt(), buf.getInt());
163 }
164
165 void put(Revision revision, ObjectId commitId) {
166 put(revision, commitId, true);
167 }
168
169 private synchronized void put(Revision revision, ObjectId commitId, boolean safeMode) {
170 if (safeMode) {
171 final Revision expected;
172 if (headRevision == null) {
173 expected = Revision.INIT;
174 } else {
175 expected = headRevision.forward(1);
176 }
177 checkState(revision.equals(expected), "incorrect revision: %s (expected: %s)", revision, expected);
178 }
179
180
181 final ByteBuffer buf = threadLocalBuffer.get();
182 buf.clear();
183 buf.putInt(revision.major());
184 commitId.copyRawTo(buf);
185 buf.flip();
186
187
188 long pos = (long) (revision.major() - 1) * RECORD_LEN;
189 try {
190 do {
191 pos += channel.write(buf, pos);
192 } while (buf.hasRemaining());
193
194 if (safeMode && fsync) {
195 channel.force(true);
196 }
197 } catch (IOException e) {
198 throw new StorageException("failed to update the commit ID database: " + path, e);
199 }
200
201 if (safeMode ||
202 headRevision == null ||
203 headRevision.major() < revision.major()) {
204 headRevision = revision;
205 }
206 }
207
208 void rebuild(Repository gitRepo) {
209 logger.warn("Rebuilding the commit ID database ..");
210
211
212 try {
213 channel.truncate(0);
214 } catch (IOException e) {
215 throw new StorageException("failed to drop the commit ID database: " + path, e);
216 }
217 headRevision = null;
218
219
220 try (RevWalk revWalk = new RevWalk(gitRepo)) {
221 final Revision headRevision;
222 final ObjectId headCommitId = gitRepo.resolve(R_HEADS_MASTER);
223 if (headCommitId == null) {
224 throw new StorageException("failed to determine the HEAD: " + gitRepo.getDirectory());
225 }
226
227 RevCommit revCommit = revWalk.parseCommit(headCommitId);
228 headRevision = CommitUtil.extractRevision(revCommit.getFullMessage());
229
230
231
232
233 ObjectId currentId;
234 Revision previousRevision = headRevision;
235 loop: for (;;) {
236 switch (revCommit.getParentCount()) {
237 case 0:
238
239 break loop;
240 case 1:
241 currentId = revCommit.getParent(0);
242 break;
243 default:
244 throw new StorageException("found more than one parent: " +
245 gitRepo.getDirectory());
246 }
247
248 revCommit = revWalk.parseCommit(currentId);
249
250 final Revision currentRevision = CommitUtil.extractRevision(revCommit.getFullMessage());
251 final Revision expectedRevision = previousRevision.backward(1);
252 if (!currentRevision.equals(expectedRevision)) {
253 throw new StorageException("mismatching revision: " + gitRepo.getDirectory() +
254 " (actual: " + currentRevision.major() +
255 ", expected: " + expectedRevision.major() + ')');
256 }
257
258 put(currentRevision, currentId, false);
259 previousRevision = currentRevision;
260 }
261
262
263 put(headRevision, headCommitId);
264 } catch (Exception e) {
265 throw new StorageException("failed to rebuild the commit ID database", e);
266 }
267
268 logger.info("Rebuilt the commit ID database.");
269 }
270
271 @Override
272 public void close() {
273 try {
274 channel.close();
275 } catch (IOException e) {
276 logger.warn("Failed to close the commit ID database: {}", path, e);
277 }
278 }
279 }