1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.server.internal.storage.repository.git;
17  
18  import static com.google.common.base.Preconditions.checkArgument;
19  import static com.google.common.base.Preconditions.checkState;
20  import static com.linecorp.centraldogma.server.internal.storage.repository.git.GitRepository.R_HEADS_MASTER;
21  import static org.eclipse.jgit.lib.ConfigConstants.CONFIG_CORE_SECTION;
22  
23  import java.io.EOFException;
24  import java.io.File;
25  import java.io.IOException;
26  import java.nio.ByteBuffer;
27  import java.nio.channels.FileChannel;
28  import java.nio.file.Path;
29  import java.nio.file.StandardOpenOption;
30  
31  import javax.annotation.Nullable;
32  
33  import org.eclipse.jgit.lib.ObjectId;
34  import org.eclipse.jgit.lib.Repository;
35  import org.eclipse.jgit.revwalk.RevCommit;
36  import org.eclipse.jgit.revwalk.RevWalk;
37  import org.slf4j.Logger;
38  import org.slf4j.LoggerFactory;
39  
40  import com.google.common.annotations.VisibleForTesting;
41  
42  import com.linecorp.centraldogma.common.Revision;
43  import com.linecorp.centraldogma.common.RevisionNotFoundException;
44  import com.linecorp.centraldogma.server.storage.StorageException;
45  
46  /**
47   * Simple file-based database of {@link Revision}-to-{@link ObjectId} mappings.
48   *
49   * <h3>File layout</h3>
50   *
51   * <pre>{@code
52   * database = record*
53   * record = revision commitId (24 bytes)
54   * revision = 32-bit signed big-endian integer (4 bytes)
55   * commitId = 160-bit SHA1 hash (20 bytes)
56   * }</pre>
57   *
58   * {@link CommitIdDatabase} makes use of the invariant where:
59   * <ul>
60   *   <li>A {@link Revision} in a repository always starts from 1 and monotonically increases by 1.</li>
61   *   <li>A record has fixed length of 24 bytes.</li>
62   * </ul>
63   * Therefore, {@link #put(Revision, ObjectId)} is always appending at the end of the database file and
64   * {@link #get(Revision)} is always reading a record at the offset {@code (revision - 1) * 24}.
65   */
66  final class CommitIdDatabase implements AutoCloseable {
67  
68      private static final Logger logger = LoggerFactory.getLogger(CommitIdDatabase.class);
69  
70      private static final int RECORD_LEN = 4 + 20; // 32-bit integer + 160-bit SHA1 hash
71  
72      private static final ThreadLocal<ByteBuffer> threadLocalBuffer =
73              ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(RECORD_LEN));
74  
75      private final Path path;
76      private final FileChannel channel;
77      private final boolean fsync;
78      private volatile Revision headRevision;
79  
80      CommitIdDatabase(Repository repo) {
81          // NB: We enable fsync only when our Git repository has been configured so,
82          //     because there's no point of doing fsync only on this file when the
83          //     Git repository does not.
84          this(repo.getDirectory(), repo.getConfig().getBoolean(CONFIG_CORE_SECTION, "fsyncObjectFiles", false));
85      }
86  
87      @VisibleForTesting
88      CommitIdDatabase(File rootDir) {
89          this(rootDir, false);
90      }
91  
92      private CommitIdDatabase(File rootDir, boolean fsync) {
93          path = new File(rootDir, "commit_ids.dat").toPath();
94          try {
95              channel = FileChannel.open(path,
96                                         StandardOpenOption.CREATE,
97                                         StandardOpenOption.READ,
98                                         StandardOpenOption.WRITE);
99          } catch (IOException e) {
100             throw new StorageException("failed to open a commit ID database: " + path, e);
101         }
102 
103         this.fsync = fsync;
104         boolean success = false;
105         try {
106             final long size;
107             try {
108                 size = channel.size();
109             } catch (IOException e) {
110                 throw new StorageException("failed to get the file length: " + path, e);
111             }
112 
113             if (size % RECORD_LEN != 0) {
114                 throw new StorageException("incorrect file length: " + path + " (" + size + " bytes)");
115             }
116 
117             final int numRecords = (int) (size / RECORD_LEN);
118             headRevision = numRecords > 0 ? new Revision(numRecords) : null;
119             success = true;
120         } finally {
121             if (!success) {
122                 close();
123             }
124         }
125     }
126 
127     @Nullable Revision headRevision() {
128         return headRevision;
129     }
130 
131     ObjectId get(Revision revision) {
132         final Revision headRevision = this.headRevision;
133         checkState(headRevision != null, "initial commit not available yet: %s", path);
134         checkArgument(!revision.isRelative(), "revision: %s (expected: an absolute revision)", revision);
135         if (revision.major() > headRevision.major()) {
136             throw new RevisionNotFoundException(revision);
137         }
138 
139         final ByteBuffer buf = threadLocalBuffer.get();
140         buf.clear();
141         long pos = (long) (revision.major() - 1) * RECORD_LEN;
142         try {
143             do {
144                 final int readBytes = channel.read(buf, pos);
145                 if (readBytes < 0) {
146                     throw new EOFException();
147                 }
148                 pos += readBytes;
149             } while (buf.hasRemaining());
150         } catch (IOException e) {
151             throw new StorageException("failed to read the commit ID database: " + path, e);
152         }
153 
154         buf.flip();
155 
156         final int actualRevision = buf.getInt();
157         if (actualRevision != revision.major()) {
158             throw new StorageException("incorrect revision number in the commit ID database: " + path +
159                                        "(actual: " + actualRevision + ", expected: " + revision.major() + ')');
160         }
161 
162         return new ObjectId(buf.getInt(), buf.getInt(), buf.getInt(), buf.getInt(), buf.getInt());
163     }
164 
165     void put(Revision revision, ObjectId commitId) {
166         put(revision, commitId, true);
167     }
168 
169     private synchronized void put(Revision revision, ObjectId commitId, boolean safeMode) {
170         if (safeMode) {
171             final Revision expected;
172             if (headRevision == null) {
173                 expected = Revision.INIT;
174             } else {
175                 expected = headRevision.forward(1);
176             }
177             checkState(revision.equals(expected), "incorrect revision: %s (expected: %s)", revision, expected);
178         }
179 
180         // Build a record.
181         final ByteBuffer buf = threadLocalBuffer.get();
182         buf.clear();
183         buf.putInt(revision.major());
184         commitId.copyRawTo(buf);
185         buf.flip();
186 
187         // Append a record to the file.
188         long pos = (long) (revision.major() - 1) * RECORD_LEN;
189         try {
190             do {
191                 pos += channel.write(buf, pos);
192             } while (buf.hasRemaining());
193 
194             if (safeMode && fsync) {
195                 channel.force(true);
196             }
197         } catch (IOException e) {
198             throw new StorageException("failed to update the commit ID database: " + path, e);
199         }
200 
201         if (safeMode ||
202             headRevision == null ||
203             headRevision.major() < revision.major()) {
204             headRevision = revision;
205         }
206     }
207 
208     void rebuild(Repository gitRepo) {
209         logger.warn("Rebuilding the commit ID database ..");
210 
211         // Drop everything.
212         try {
213             channel.truncate(0);
214         } catch (IOException e) {
215             throw new StorageException("failed to drop the commit ID database: " + path, e);
216         }
217         headRevision = null;
218 
219         // Get the commit IDs of all revisions.
220         try (RevWalk revWalk = new RevWalk(gitRepo)) {
221             final Revision headRevision;
222             final ObjectId headCommitId = gitRepo.resolve(R_HEADS_MASTER);
223             if (headCommitId == null) {
224                 throw new StorageException("failed to determine the HEAD: " + gitRepo.getDirectory());
225             }
226 
227             RevCommit revCommit = revWalk.parseCommit(headCommitId);
228             headRevision = CommitUtil.extractRevision(revCommit.getFullMessage());
229 
230             // NB: We did not store the last commit ID until all commit IDs are stored,
231             //     so that the partially built database always has mismatching head revision.
232 
233             ObjectId currentId;
234             Revision previousRevision = headRevision;
235             loop: for (;;) {
236                 switch (revCommit.getParentCount()) {
237                     case 0:
238                         // End of the history
239                         break loop;
240                     case 1:
241                         currentId = revCommit.getParent(0);
242                         break;
243                     default:
244                         throw new StorageException("found more than one parent: " +
245                                                    gitRepo.getDirectory());
246                 }
247 
248                 revCommit = revWalk.parseCommit(currentId);
249 
250                 final Revision currentRevision = CommitUtil.extractRevision(revCommit.getFullMessage());
251                 final Revision expectedRevision = previousRevision.backward(1);
252                 if (!currentRevision.equals(expectedRevision)) {
253                     throw new StorageException("mismatching revision: " + gitRepo.getDirectory() +
254                                                " (actual: " + currentRevision.major() +
255                                                ", expected: " + expectedRevision.major() + ')');
256                 }
257 
258                 put(currentRevision, currentId, false);
259                 previousRevision = currentRevision;
260             }
261 
262             // All commit IDs except the head have been stored. Store the head finally.
263             put(headRevision, headCommitId);
264         } catch (Exception e) {
265             throw new StorageException("failed to rebuild the commit ID database", e);
266         }
267 
268         logger.info("Rebuilt the commit ID database.");
269     }
270 
271     @Override
272     public void close() {
273         try {
274             channel.close();
275         } catch (IOException e) {
276             logger.warn("Failed to close the commit ID database: {}", path, e);
277         }
278     }
279 }