1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server.metadata;
18  
19  import static com.linecorp.armeria.common.util.Functions.voidFunction;
20  import static java.util.Objects.requireNonNull;
21  
22  import java.util.concurrent.CompletableFuture;
23  import java.util.concurrent.CompletionStage;
24  import java.util.function.Function;
25  import java.util.function.Supplier;
26  
27  import com.fasterxml.jackson.databind.JsonNode;
28  import com.google.common.collect.ImmutableList;
29  
30  import com.linecorp.armeria.common.util.Exceptions;
31  import com.linecorp.centraldogma.common.Author;
32  import com.linecorp.centraldogma.common.Change;
33  import com.linecorp.centraldogma.common.ChangeConflictException;
34  import com.linecorp.centraldogma.common.Entry;
35  import com.linecorp.centraldogma.common.Markup;
36  import com.linecorp.centraldogma.common.RedundantChangeException;
37  import com.linecorp.centraldogma.common.Revision;
38  import com.linecorp.centraldogma.internal.Jackson;
39  import com.linecorp.centraldogma.server.command.Command;
40  import com.linecorp.centraldogma.server.command.CommandExecutor;
41  import com.linecorp.centraldogma.server.command.CommitResult;
42  import com.linecorp.centraldogma.server.storage.project.ProjectManager;
43  import com.linecorp.centraldogma.server.storage.repository.Repository;
44  
45  final class RepositorySupport<T> {
46  
47      private final ProjectManager projectManager;
48      private final CommandExecutor executor;
49      private final Function<Entry<?>, T> entryConverter;
50  
51      RepositorySupport(ProjectManager projectManager, CommandExecutor executor,
52                        Function<Entry<?>, T> entryConverter) {
53          this.projectManager = requireNonNull(projectManager, "projectManager");
54          this.executor = requireNonNull(executor, "executor");
55          this.entryConverter = requireNonNull(entryConverter, "entryConverter");
56      }
57  
58      public ProjectManager projectManager() {
59          return projectManager;
60      }
61  
62      CompletableFuture<HolderWithRevision<T>> fetch(String projectName, String repoName, String path) {
63          requireNonNull(projectName, "projectName");
64          requireNonNull(repoName, "repoName");
65          return fetch(projectManager().get(projectName).repos().get(repoName), path);
66      }
67  
68      CompletableFuture<HolderWithRevision<T>> fetch(String projectName, String repoName, String path,
69                                                     Revision revision) {
70          requireNonNull(projectName, "projectName");
71          requireNonNull(repoName, "repoName");
72          requireNonNull(revision, "revision");
73          return fetch(projectManager().get(projectName).repos().get(repoName), path, revision);
74      }
75  
76      private CompletableFuture<HolderWithRevision<T>> fetch(Repository repository, String path) {
77          requireNonNull(path, "path");
78          final Revision revision = normalize(repository);
79          return fetch(repository, path, revision);
80      }
81  
82      private CompletableFuture<HolderWithRevision<T>> fetch(Repository repository, String path,
83                                                             Revision revision) {
84          requireNonNull(repository, "repository");
85          requireNonNull(path, "path");
86          requireNonNull(revision, "revision");
87          return repository.get(revision, path)
88                           .thenApply(entryConverter)
89                           .thenApply((T obj) -> HolderWithRevision.of(obj, revision));
90      }
91  
92      CompletableFuture<Revision> push(String projectName, String repoName,
93                                       Author author, String commitSummary, Change<?> change) {
94          return push(projectName, repoName, author, commitSummary, change, Revision.HEAD);
95      }
96  
97      private CompletableFuture<Revision> push(String projectName, String repoName, Author author,
98                                               String commitSummary, Change<?> change, Revision revision) {
99          requireNonNull(projectName, "projectName");
100         requireNonNull(repoName, "repoName");
101         requireNonNull(author, "author");
102         requireNonNull(commitSummary, "commitSummary");
103         requireNonNull(change, "change");
104 
105         return executor.execute(
106                 Command.push(author, projectName, repoName, revision, commitSummary, "",
107                              Markup.PLAINTEXT, ImmutableList.of(change)))
108                        .thenApply(CommitResult::revision);
109     }
110 
111     CompletableFuture<Revision> push(String projectName, String repoName, Author author, String commitSummary,
112                                      Supplier<CompletionStage<HolderWithRevision<Change<?>>>> changeSupplier) {
113         requireNonNull(projectName, "projectName");
114         requireNonNull(repoName, "repoName");
115         requireNonNull(author, "author");
116         requireNonNull(commitSummary, "commitSummary");
117         requireNonNull(changeSupplier, "changeSupplier");
118 
119         final CompletableFuture<Revision> future = new CompletableFuture<>();
120         push(projectName, repoName, author, commitSummary, changeSupplier, future);
121         return future;
122     }
123 
124     private void push(String projectName, String repoName, Author author, String commitSummary,
125                       Supplier<CompletionStage<HolderWithRevision<Change<?>>>> changeSupplier,
126                       CompletableFuture<Revision> future) {
127         changeSupplier.get().thenAccept(changeWithRevision -> {
128             final Revision revision = changeWithRevision.revision();
129             final Change<?> change = changeWithRevision.object();
130 
131             push(projectName, repoName, author, commitSummary, change, revision)
132                     .thenAccept(future::complete)
133                     .exceptionally(voidFunction(cause -> {
134                         cause = Exceptions.peel(cause);
135                         if (cause instanceof ChangeConflictException) {
136                             final Revision latestRevision;
137                             try {
138                                 latestRevision = projectManager().get(projectName).repos().get(repoName)
139                                                                  .normalizeNow(Revision.HEAD);
140                             } catch (Throwable cause1) {
141                                 future.completeExceptionally(cause1);
142                                 return;
143                             }
144 
145                             if (revision.equals(latestRevision)) {
146                                 future.completeExceptionally(cause);
147                                 return;
148                             }
149                             // Try again.
150                             push(projectName, repoName, author, commitSummary, changeSupplier, future);
151                         } else if (cause instanceof RedundantChangeException) {
152                             future.complete(revision);
153                         } else {
154                             future.completeExceptionally(cause);
155                         }
156                     }));
157         }).exceptionally(voidFunction(future::completeExceptionally));
158     }
159 
160     Revision normalize(Repository repository) {
161         requireNonNull(repository, "repository");
162         try {
163             return repository.normalizeNow(Revision.HEAD);
164         } catch (Throwable cause) {
165             return Exceptions.throwUnsafely(cause);
166         }
167     }
168 
169     @SuppressWarnings("unchecked")
170     static <T> T convertWithJackson(Entry<?> entry, Class<T> clazz) {
171         requireNonNull(entry, "entry");
172         requireNonNull(clazz, "clazz");
173         try {
174             return Jackson.treeToValue(((Entry<JsonNode>) entry).content(), clazz);
175         } catch (Throwable cause) {
176             return Exceptions.throwUnsafely(cause);
177         }
178     }
179 }