1   /*
2    * Copyright 2019 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.client.armeria;
18  
19  import static com.google.common.base.MoreObjects.firstNonNull;
20  import static com.google.common.base.Preconditions.checkArgument;
21  import static com.google.common.collect.ImmutableList.toImmutableList;
22  import static com.google.common.collect.ImmutableMap.toImmutableMap;
23  import static com.google.common.collect.ImmutableSet.toImmutableSet;
24  import static com.linecorp.centraldogma.internal.Util.unsafeCast;
25  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.PROJECTS_PREFIX;
26  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.REMOVED;
27  import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.REPOS;
28  import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
29  import static java.util.Objects.requireNonNull;
30  
31  import java.io.UnsupportedEncodingException;
32  import java.net.URLEncoder;
33  import java.nio.charset.Charset;
34  import java.nio.charset.StandardCharsets;
35  import java.time.Instant;
36  import java.time.format.DateTimeFormatter;
37  import java.util.List;
38  import java.util.Map;
39  import java.util.Set;
40  import java.util.concurrent.CompletableFuture;
41  import java.util.concurrent.ScheduledExecutorService;
42  import java.util.function.BiFunction;
43  import java.util.function.Function;
44  
45  import javax.annotation.Nullable;
46  
47  import com.fasterxml.jackson.core.JsonParseException;
48  import com.fasterxml.jackson.core.JsonProcessingException;
49  import com.fasterxml.jackson.databind.JsonNode;
50  import com.fasterxml.jackson.databind.node.ArrayNode;
51  import com.fasterxml.jackson.databind.node.JsonNodeFactory;
52  import com.fasterxml.jackson.databind.node.JsonNodeType;
53  import com.fasterxml.jackson.databind.node.ObjectNode;
54  import com.google.common.collect.ImmutableList;
55  import com.google.common.collect.ImmutableMap;
56  import com.google.common.collect.ImmutableSet;
57  import com.google.common.collect.Iterables;
58  import com.google.common.collect.Streams;
59  import com.google.common.math.LongMath;
60  
61  import com.linecorp.armeria.client.Clients;
62  import com.linecorp.armeria.client.WebClient;
63  import com.linecorp.armeria.common.AggregatedHttpResponse;
64  import com.linecorp.armeria.common.HttpHeaderNames;
65  import com.linecorp.armeria.common.HttpMethod;
66  import com.linecorp.armeria.common.HttpStatus;
67  import com.linecorp.armeria.common.HttpStatusClass;
68  import com.linecorp.armeria.common.MediaType;
69  import com.linecorp.armeria.common.RequestHeaders;
70  import com.linecorp.armeria.common.RequestHeadersBuilder;
71  import com.linecorp.armeria.common.stream.ClosedStreamException;
72  import com.linecorp.armeria.common.util.Exceptions;
73  import com.linecorp.armeria.common.util.SafeCloseable;
74  import com.linecorp.armeria.common.util.TimeoutMode;
75  import com.linecorp.centraldogma.client.AbstractCentralDogma;
76  import com.linecorp.centraldogma.client.CentralDogmaRepository;
77  import com.linecorp.centraldogma.client.RepositoryInfo;
78  import com.linecorp.centraldogma.common.Author;
79  import com.linecorp.centraldogma.common.AuthorizationException;
80  import com.linecorp.centraldogma.common.CentralDogmaException;
81  import com.linecorp.centraldogma.common.Change;
82  import com.linecorp.centraldogma.common.ChangeConflictException;
83  import com.linecorp.centraldogma.common.ChangeType;
84  import com.linecorp.centraldogma.common.Commit;
85  import com.linecorp.centraldogma.common.Entry;
86  import com.linecorp.centraldogma.common.EntryNotFoundException;
87  import com.linecorp.centraldogma.common.EntryType;
88  import com.linecorp.centraldogma.common.InvalidPushException;
89  import com.linecorp.centraldogma.common.Markup;
90  import com.linecorp.centraldogma.common.MergeQuery;
91  import com.linecorp.centraldogma.common.MergedEntry;
92  import com.linecorp.centraldogma.common.PathPattern;
93  import com.linecorp.centraldogma.common.ProjectExistsException;
94  import com.linecorp.centraldogma.common.ProjectNotFoundException;
95  import com.linecorp.centraldogma.common.PushResult;
96  import com.linecorp.centraldogma.common.Query;
97  import com.linecorp.centraldogma.common.QueryExecutionException;
98  import com.linecorp.centraldogma.common.QueryType;
99  import com.linecorp.centraldogma.common.ReadOnlyException;
100 import com.linecorp.centraldogma.common.RedundantChangeException;
101 import com.linecorp.centraldogma.common.RepositoryExistsException;
102 import com.linecorp.centraldogma.common.RepositoryNotFoundException;
103 import com.linecorp.centraldogma.common.Revision;
104 import com.linecorp.centraldogma.common.RevisionNotFoundException;
105 import com.linecorp.centraldogma.common.ShuttingDownException;
106 import com.linecorp.centraldogma.internal.HistoryConstants;
107 import com.linecorp.centraldogma.internal.Jackson;
108 import com.linecorp.centraldogma.internal.Util;
109 import com.linecorp.centraldogma.internal.api.v1.WatchTimeout;
110 
111 final class ArmeriaCentralDogma extends AbstractCentralDogma {
112 
113     private static final MediaType JSON_PATCH_UTF8 = MediaType.JSON_PATCH.withCharset(StandardCharsets.UTF_8);
114 
115     private static final byte[] UNREMOVE_PATCH = toBytes(JsonNodeFactory.instance.arrayNode(1).add(
116             JsonNodeFactory.instance.objectNode().put("op", "replace")
117                                     .put("path", "/status")
118                                     .put("value", "active")));
119     private static final String REMOVED_PARAM = "?status=removed";
120 
121     private static final Map<String, Function<String, CentralDogmaException>> EXCEPTION_FACTORIES =
122             ImmutableMap.<String, Function<String, CentralDogmaException>>builder()
123                         .put(ProjectExistsException.class.getName(), ProjectExistsException::new)
124                         .put(ProjectNotFoundException.class.getName(), ProjectNotFoundException::new)
125                         .put(QueryExecutionException.class.getName(), QueryExecutionException::new)
126                         .put(RedundantChangeException.class.getName(), RedundantChangeException::new)
127                         .put(RevisionNotFoundException.class.getName(), RevisionNotFoundException::new)
128                         .put(EntryNotFoundException.class.getName(), EntryNotFoundException::new)
129                         .put(ChangeConflictException.class.getName(), ChangeConflictException::new)
130                         .put(RepositoryNotFoundException.class.getName(), RepositoryNotFoundException::new)
131                         .put(AuthorizationException.class.getName(), AuthorizationException::new)
132                         .put(ShuttingDownException.class.getName(), ShuttingDownException::new)
133                         .put(RepositoryExistsException.class.getName(), RepositoryExistsException::new)
134                         .put(InvalidPushException.class.getName(), InvalidPushException::new)
135                         .put(ReadOnlyException.class.getName(), ReadOnlyException::new)
136                         .build();
137 
138     private final WebClient client;
139     private final String authorization;
140 
141     ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, WebClient client, String accessToken) {
142         super(blockingTaskExecutor);
143         this.client = requireNonNull(client, "client");
144         authorization = "Bearer " + requireNonNull(accessToken, "accessToken");
145     }
146 
147     @Override
148     public CompletableFuture<Void> whenEndpointReady() {
149         return client.endpointGroup().whenReady().thenRun(() -> {});
150     }
151 
152     @Override
153     public CompletableFuture<Void> createProject(String projectName) {
154         validateProjectName(projectName);
155         try {
156             final ObjectNode root = JsonNodeFactory.instance.objectNode();
157             root.put("name", projectName);
158 
159             return client.execute(headers(HttpMethod.POST, PROJECTS_PREFIX), toBytes(root))
160                          .aggregate()
161                          .thenApply(ArmeriaCentralDogma::createProject);
162         } catch (Exception e) {
163             return exceptionallyCompletedFuture(e);
164         }
165     }
166 
167     private static Void createProject(AggregatedHttpResponse res) {
168         switch (res.status().code()) {
169             case 200:
170             case 201:
171                 return null;
172         }
173         return handleErrorResponse(res);
174     }
175 
176     @Override
177     public CompletableFuture<Void> removeProject(String projectName) {
178         validateProjectName(projectName);
179         try {
180             return client.execute(headers(HttpMethod.DELETE, pathBuilder(projectName).toString()))
181                          .aggregate()
182                          .thenApply(ArmeriaCentralDogma::removeProject);
183         } catch (Exception e) {
184             return exceptionallyCompletedFuture(e);
185         }
186     }
187 
188     private static Void removeProject(AggregatedHttpResponse res) {
189         switch (res.status().code()) {
190             case 200:
191             case 204:
192                 return null;
193         }
194         return handleErrorResponse(res);
195     }
196 
197     @Override
198     public CompletableFuture<Void> purgeProject(String projectName) {
199         validateProjectName(projectName);
200         try {
201             return client.execute(headers(HttpMethod.DELETE, pathBuilder(projectName).append(REMOVED)
202                                                                                      .toString()))
203                          .aggregate()
204                          .thenApply(ArmeriaCentralDogma::handlePurgeResult);
205         } catch (Exception e) {
206             return exceptionallyCompletedFuture(e);
207         }
208     }
209 
210     @Override
211     public CompletableFuture<Void> unremoveProject(String projectName) {
212         validateProjectName(projectName);
213         try {
214             return client.execute(headers(HttpMethod.PATCH, pathBuilder(projectName).toString()),
215                                   UNREMOVE_PATCH)
216                          .aggregate()
217                          .thenApply(ArmeriaCentralDogma::unremoveProject);
218         } catch (Exception e) {
219             return exceptionallyCompletedFuture(e);
220         }
221     }
222 
223     private static Void unremoveProject(AggregatedHttpResponse res) {
224         if (res.status().code() == 200) { // OK
225             return null;
226         }
227         return handleErrorResponse(res);
228     }
229 
230     @Override
231     public CompletableFuture<Set<String>> listProjects() {
232         return client.execute(headers(HttpMethod.GET, PROJECTS_PREFIX))
233                      .aggregate()
234                      .thenApply(ArmeriaCentralDogma::handleNameList);
235     }
236 
237     @Override
238     public CompletableFuture<Set<String>> listRemovedProjects() {
239         return client.execute(headers(HttpMethod.GET, PROJECTS_PREFIX + REMOVED_PARAM))
240                      .aggregate()
241                      .thenApply(ArmeriaCentralDogma::handleNameList);
242     }
243 
244     @Override
245     public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
246                                                                       String repositoryName) {
247         validateProjectAndRepositoryName(projectName, repositoryName);
248         try {
249             final String path = pathBuilder(projectName).append(REPOS).toString();
250             final ObjectNode root = JsonNodeFactory.instance.objectNode();
251             root.put("name", repositoryName);
252 
253             return client.execute(headers(HttpMethod.POST, path), toBytes(root))
254                          .aggregate()
255                          .thenApply(res -> {
256                              switch (res.status().code()) {
257                                  case 200:
258                                  case 201:
259                                      return forRepo(projectName, repositoryName);
260                              }
261                              return handleErrorResponse(res);
262                          });
263         } catch (Exception e) {
264             return exceptionallyCompletedFuture(e);
265         }
266     }
267 
268     @Override
269     public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
270         validateProjectAndRepositoryName(projectName, repositoryName);
271         try {
272             return client.execute(headers(HttpMethod.DELETE,
273                                           pathBuilder(projectName, repositoryName).toString()))
274                          .aggregate()
275                          .thenApply(ArmeriaCentralDogma::removeRepository);
276         } catch (Exception e) {
277             return exceptionallyCompletedFuture(e);
278         }
279     }
280 
281     private static Void removeRepository(AggregatedHttpResponse res) {
282         switch (res.status().code()) {
283             case 200:
284             case 204:
285                 return null;
286         }
287         return handleErrorResponse(res);
288     }
289 
290     @Override
291     public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
292         validateProjectAndRepositoryName(projectName, repositoryName);
293         try {
294             return client.execute(headers(HttpMethod.DELETE,
295                                           pathBuilder(projectName, repositoryName).append(REMOVED).toString()))
296                          .aggregate()
297                          .thenApply(ArmeriaCentralDogma::handlePurgeResult);
298         } catch (Exception e) {
299             return exceptionallyCompletedFuture(e);
300         }
301     }
302 
303     private static Void handlePurgeResult(AggregatedHttpResponse res) {
304         switch (res.status().code()) {
305             case 200:
306             case 204:
307                 return null;
308         }
309         return handleErrorResponse(res);
310     }
311 
312     @Override
313     public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
314                                                                         String repositoryName) {
315         validateProjectAndRepositoryName(projectName, repositoryName);
316         try {
317             return client.execute(headers(HttpMethod.PATCH,
318                                           pathBuilder(projectName, repositoryName).toString()),
319                                   UNREMOVE_PATCH)
320                          .aggregate()
321                          .thenApply(res -> {
322                              if (res.status().code() == 200) {
323                                  return forRepo(projectName, repositoryName);
324                              }
325                              return handleErrorResponse(res);
326                          });
327         } catch (Exception e) {
328             return exceptionallyCompletedFuture(e);
329         }
330     }
331 
332     @Override
333     public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
334         validateProjectName(projectName);
335         try {
336             return client.execute(headers(HttpMethod.GET, pathBuilder(projectName).append(REPOS).toString()))
337                          .aggregate()
338                          .thenApply(ArmeriaCentralDogma::listRepositories);
339         } catch (Exception e) {
340             return exceptionallyCompletedFuture(e);
341         }
342     }
343 
344     private static Map<String, RepositoryInfo> listRepositories(AggregatedHttpResponse res) {
345         switch (res.status().code()) {
346             case 200:
347                 return Streams.stream(toJson(res, JsonNodeType.ARRAY))
348                               .map(node -> {
349                                   final String name = getField(node, "name").asText();
350                                   final Revision headRevision =
351                                           new Revision(getField(node, "headRevision").asInt());
352                                   return new RepositoryInfo(name, headRevision);
353                               })
354                               .collect(toImmutableMap(RepositoryInfo::name, Function.identity()));
355             case 204:
356                 return ImmutableMap.of();
357         }
358         return handleErrorResponse(res);
359     }
360 
361     @Override
362     public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
363         validateProjectName(projectName);
364         try {
365             return client.execute(headers(HttpMethod.GET,
366                                           pathBuilder(projectName).append(REPOS)
367                                                                   .append(REMOVED_PARAM).toString()))
368                          .aggregate()
369                          .thenApply(ArmeriaCentralDogma::handleNameList);
370         } catch (Exception e) {
371             return exceptionallyCompletedFuture(e);
372         }
373     }
374 
375     @Override
376     public CompletableFuture<Revision> normalizeRevision(String projectName, String repositoryName,
377                                                          Revision revision) {
378         validateProjectAndRepositoryName(projectName, repositoryName);
379         requireNonNull(revision, "revision");
380         try {
381             final String path = pathBuilder(projectName, repositoryName)
382                     .append("/revision/")
383                     .append(revision.text())
384                     .toString();
385 
386             return client.execute(headers(HttpMethod.GET, path))
387                          .aggregate()
388                          .thenApply(ArmeriaCentralDogma::normalizeRevision);
389         } catch (Exception e) {
390             return exceptionallyCompletedFuture(e);
391         }
392     }
393 
394     private static Revision normalizeRevision(AggregatedHttpResponse res) {
395         if (res.status().code() == 200) {
396             return new Revision(getField(toJson(res, JsonNodeType.OBJECT), "revision").asInt());
397         }
398         return handleErrorResponse(res);
399     }
400 
401     @Override
402     public CompletableFuture<Map<String, EntryType>> listFiles(String projectName, String repositoryName,
403                                                                Revision revision, PathPattern pathPattern) {
404         validateProjectAndRepositoryName(projectName, repositoryName);
405         requireNonNull(revision, "revision");
406         requireNonNull(pathPattern, "pathPattern");
407         try {
408             final StringBuilder path = pathBuilder(projectName, repositoryName);
409             path.append("/list").append(pathPattern.encoded()).append("?revision=").append(revision.major());
410 
411             return client.execute(headers(HttpMethod.GET, path.toString()))
412                          .aggregate()
413                          .thenApply(ArmeriaCentralDogma::listFiles);
414         } catch (Exception e) {
415             return exceptionallyCompletedFuture(e);
416         }
417     }
418 
419     private static Map<String, EntryType> listFiles(AggregatedHttpResponse res) {
420         switch (res.status().code()) {
421             case 200:
422                 final ImmutableMap.Builder<String, EntryType> builder = ImmutableMap.builder();
423                 final JsonNode node = toJson(res, JsonNodeType.ARRAY);
424                 node.forEach(e -> builder.put(
425                         getField(e, "path").asText(),
426                         EntryType.valueOf(getField(e, "type").asText())));
427                 return builder.build();
428             case 204:
429                 return ImmutableMap.of();
430         }
431 
432         return handleErrorResponse(res);
433     }
434 
435     @Override
436     public <T> CompletableFuture<Entry<T>> getFile(String projectName, String repositoryName, Revision revision,
437                                                    Query<T> query) {
438         validateProjectAndRepositoryName(projectName, repositoryName);
439         requireNonNull(revision, "revision");
440         requireNonNull(query, "query");
441         try {
442             // TODO(trustin) No need to normalize a revision once server response contains it.
443             return maybeNormalizeRevision(projectName, repositoryName, revision).thenCompose(normRev -> {
444                 final StringBuilder path = pathBuilder(projectName, repositoryName);
445                 path.append("/contents").append(query.path());
446                 path.append("?revision=").append(normRev.text());
447                 appendJsonPaths(path, query.type(), query.expressions());
448 
449                 return client.execute(headers(HttpMethod.GET, path.toString()))
450                              .aggregate()
451                              .thenApply(res -> getFile(normRev, res, query));
452             });
453         } catch (Exception e) {
454             return exceptionallyCompletedFuture(e);
455         }
456     }
457 
458     private static <T> Entry<T> getFile(Revision normRev, AggregatedHttpResponse res, Query<T> query) {
459         if (res.status().code() == 200) {
460             final JsonNode node = toJson(res, JsonNodeType.OBJECT);
461             return toEntry(normRev, node, query.type());
462         }
463 
464         return handleErrorResponse(res);
465     }
466 
467     @Override
468     public CompletableFuture<Map<String, Entry<?>>> getFiles(String projectName, String repositoryName,
469                                                              Revision revision, PathPattern pathPattern) {
470         validateProjectAndRepositoryName(projectName, repositoryName);
471         requireNonNull(revision, "revision");
472         requireNonNull(pathPattern, "pathPattern");
473         try {
474             // TODO(trustin) No need to normalize a revision once server response contains it.
475             return maybeNormalizeRevision(projectName, repositoryName, revision).thenCompose(normRev -> {
476                 final StringBuilder path = pathBuilder(projectName, repositoryName);
477                 path.append("/contents")
478                     .append(pathPattern.encoded())
479                     .append("?revision=")
480                     .append(normRev.major());
481 
482                 return client.execute(headers(HttpMethod.GET, path.toString()))
483                              .aggregate()
484                              .thenApply(res -> getFiles(normRev, res));
485             });
486         } catch (Exception e) {
487             return exceptionallyCompletedFuture(e);
488         }
489     }
490 
491     private static Map<String, Entry<?>> getFiles(Revision normRev, AggregatedHttpResponse res) {
492         switch (res.status().code()) {
493             case 200:
494                 final JsonNode node = toJson(res, null);
495                 final ImmutableMap.Builder<String, Entry<?>> builder = ImmutableMap.builder();
496                 if (node.isObject()) { // Single entry
497                     final Entry<?> entry = toEntry(normRev, node, QueryType.IDENTITY);
498                     builder.put(entry.path(), entry);
499                 } else if (node.isArray()) { // Multiple entries
500                     node.forEach(e -> {
501                         final Entry<?> entry = toEntry(normRev, e, QueryType.IDENTITY);
502                         builder.put(entry.path(), entry);
503                     });
504                 } else {
505                     return rejectNeitherArrayNorObject(res);
506                 }
507                 return builder.build();
508             case 204:
509                 return ImmutableMap.of();
510         }
511 
512         return handleErrorResponse(res);
513     }
514 
515     @Override
516     public <T> CompletableFuture<MergedEntry<T>> mergeFiles(String projectName, String repositoryName,
517                                                             Revision revision, MergeQuery<T> mergeQuery) {
518         validateProjectAndRepositoryName(projectName, repositoryName);
519         requireNonNull(revision, "revision");
520         requireNonNull(mergeQuery, "mergeQuery");
521         try {
522             final StringBuilder path = pathBuilder(projectName, repositoryName);
523             path.append("/merge?revision=").append(revision.major());
524             mergeQuery.mergeSources().forEach(
525                     src -> path.append(src.isOptional() ? "&optional_path=" : "&path=")
526                                .append(encodeParam(src.path())));
527             appendJsonPaths(path, mergeQuery.type(), mergeQuery.expressions());
528             return client.execute(headers(HttpMethod.GET, path.toString()))
529                          .aggregate()
530                          .thenApply(ArmeriaCentralDogma::mergeFiles);
531         } catch (Exception e) {
532             return exceptionallyCompletedFuture(e);
533         }
534     }
535 
536     private static <T> MergedEntry<T> mergeFiles(AggregatedHttpResponse res) {
537         if (res.status().code() == 200) {
538             final JsonNode node = toJson(res, JsonNodeType.OBJECT);
539 
540             // Build path list.
541             final ImmutableList.Builder<String> pathsBuilder = ImmutableList.builder();
542             for (JsonNode path : getField(node, "paths")) {
543                 if (path.getNodeType() != JsonNodeType.STRING) {
544                     throw new CentralDogmaException("Received a merged entry with a non-string path: " + node);
545                 }
546                 pathsBuilder.add(path.asText());
547             }
548             final List<String> paths = pathsBuilder.build();
549             if (paths.isEmpty()) {
550                 throw new CentralDogmaException("Received a merged entry with empty paths: " + node);
551             }
552 
553             // Build the merged entry.
554             final Revision revision = new Revision(getField(node, "revision").asInt());
555             final EntryType type = EntryType.valueOf(getField(node, "type").asText());
556             final JsonNode content = getField(node, "content");
557             switch (type) {
558                 case JSON: {
559                     @SuppressWarnings("unchecked")
560                     final MergedEntry<T> cast =
561                             (MergedEntry<T>) MergedEntry.of(revision, type, content, paths);
562                     return cast;
563                 }
564                 case TEXT: {
565                     if (content.getNodeType() != JsonNodeType.STRING) {
566                         throw new CentralDogmaException(
567                                 "Received a TEXT merged entry whose content is not a string: " + node);
568                     }
569                     @SuppressWarnings("unchecked")
570                     final MergedEntry<T> cast =
571                             (MergedEntry<T>) MergedEntry.of(revision, type, content.asText(), paths);
572                     return cast;
573                 }
574                 default:
575                     throw new CentralDogmaException(
576                             "Received a merged entry whose type is neither JSON nor TEXT: " + node);
577             }
578         }
579 
580         return handleErrorResponse(res);
581     }
582 
583     @Override
584     public CompletableFuture<List<Commit>> getHistory(String projectName, String repositoryName,
585                                                       Revision from, Revision to,
586                                                       PathPattern pathPattern,
587                                                       int maxCommits) {
588         requireNonNull(from, "from");
589         requireNonNull(to, "to");
590         requireNonNull(pathPattern, "pathPattern");
591         validateProjectAndRepositoryName(projectName, repositoryName);
592         checkArgument(maxCommits >= 0 && maxCommits <= HistoryConstants.MAX_MAX_COMMITS,
593                       "maxCommits: %s (expected: 0 <= maxCommits <= %s)",
594                       maxCommits, HistoryConstants.MAX_MAX_COMMITS);
595         try {
596             final StringBuilder path = pathBuilder(projectName, repositoryName);
597             path.append("/commits/").append(from.text());
598             path.append("?to=").append(to.text());
599             path.append("&path=").append(pathPattern.encoded());
600             if (maxCommits > 0) {
601                 path.append("&maxCommits=").append(maxCommits);
602             }
603 
604             return client.execute(headers(HttpMethod.GET, path.toString()))
605                          .aggregate()
606                          .thenApply(ArmeriaCentralDogma::getHistory);
607         } catch (Exception e) {
608             return exceptionallyCompletedFuture(e);
609         }
610     }
611 
612     private static List<Commit> getHistory(AggregatedHttpResponse res) {
613         switch (res.status().code()) {
614             case 200:
615                 final JsonNode node = toJson(res, null);
616                 if (node.isObject()) {
617                     return ImmutableList.of(toCommit(node));
618                 } else if (node.isArray()) {
619                     return Streams.stream(node)
620                                   .map(ArmeriaCentralDogma::toCommit)
621                                   .collect(toImmutableList());
622                 } else {
623                     return rejectNeitherArrayNorObject(res);
624                 }
625             case 204:
626                 return ImmutableList.of();
627         }
628 
629         return handleErrorResponse(res);
630     }
631 
632     @Override
633     public <T> CompletableFuture<Change<T>> getDiff(String projectName, String repositoryName, Revision from,
634                                                     Revision to, Query<T> query) {
635         validateProjectAndRepositoryName(projectName, repositoryName);
636         requireNonNull(from, "from");
637         requireNonNull(to, "to");
638         requireNonNull(query, "query");
639         try {
640             final StringBuilder path = pathBuilder(projectName, repositoryName);
641             path.append("/compare");
642             path.append("?path=").append(encodeParam(query.path()));
643             path.append("&from=").append(from.text());
644             path.append("&to=").append(to.text());
645             appendJsonPaths(path, query.type(), query.expressions());
646 
647             return client.execute(headers(HttpMethod.GET, path.toString()))
648                          .aggregate()
649                          .thenApply(ArmeriaCentralDogma::getDiff);
650         } catch (Exception e) {
651             return exceptionallyCompletedFuture(e);
652         }
653     }
654 
655     @Nullable
656     private static <T> Change<T> getDiff(AggregatedHttpResponse res) {
657         switch (res.status().code()) {
658             case 200:
659                 return toChange(toJson(res, JsonNodeType.OBJECT));
660             case 204:
661                 return null;
662         }
663         return handleErrorResponse(res);
664     }
665 
666     @Override
667     public CompletableFuture<List<Change<?>>> getDiff(String projectName, String repositoryName, Revision from,
668                                                       Revision to, PathPattern pathPattern) {
669         validateProjectAndRepositoryName(projectName, repositoryName);
670         requireNonNull(from, "from");
671         requireNonNull(to, "to");
672         requireNonNull(pathPattern, "pathPattern");
673         try {
674             final StringBuilder path = pathBuilder(projectName, repositoryName);
675             path.append("/compare");
676             path.append("?pathPattern=").append(pathPattern.encoded());
677             path.append("&from=").append(from.text());
678             path.append("&to=").append(to.text());
679 
680             return client.execute(headers(HttpMethod.GET, path.toString()))
681                          .aggregate()
682                          .thenApply(res -> {
683                              if (res.status().code() == 200) {
684                                  final JsonNode node = toJson(res, null);
685                                  if (node.isObject()) {
686                                      return ImmutableList.of(toChange(node));
687                                  } else if (node.isArray()) {
688                                      return Streams.stream(node)
689                                                    .map(ArmeriaCentralDogma::toChange)
690                                                    .collect(toImmutableList());
691                                  } else {
692                                      return rejectNeitherArrayNorObject(res);
693                                  }
694                              }
695 
696                              return handleErrorResponse(res);
697                          });
698         } catch (Exception e) {
699             return exceptionallyCompletedFuture(e);
700         }
701     }
702 
703     @Override
704     public CompletableFuture<List<Change<?>>> getPreviewDiffs(String projectName, String repositoryName,
705                                                               Revision baseRevision,
706                                                               Iterable<? extends Change<?>> changes) {
707         validateProjectAndRepositoryName(projectName, repositoryName);
708         requireNonNull(baseRevision, "baseRevision");
709         requireNonNull(changes, "changes");
710         try {
711             final String path = pathBuilder(projectName, repositoryName)
712                     .append("/preview?revision=")
713                     .append(baseRevision.text())
714                     .toString();
715 
716             final ArrayNode changesNode = toJson(changes);
717             return client.execute(headers(HttpMethod.POST, path), toBytes(changesNode))
718                          .aggregate()
719                          .thenApply(ArmeriaCentralDogma::getPreviewDiffs);
720         } catch (Exception e) {
721             return exceptionallyCompletedFuture(e);
722         }
723     }
724 
725     private static List<Change<?>> getPreviewDiffs(AggregatedHttpResponse res) {
726         switch (res.status().code()) {
727             case 200:
728                 final JsonNode node = toJson(res, JsonNodeType.ARRAY);
729                 final ImmutableList.Builder<Change<?>> builder = ImmutableList.builder();
730                 node.forEach(e -> builder.add(toChange(e)));
731                 return builder.build();
732             case 204:
733                 return ImmutableList.of();
734         }
735 
736         return handleErrorResponse(res);
737     }
738 
739     @Override
740     public CompletableFuture<PushResult> push(String projectName, String repositoryName, Revision baseRevision,
741                                               String summary, String detail, Markup markup,
742                                               Iterable<? extends Change<?>> changes) {
743         validateProjectAndRepositoryName(projectName, repositoryName);
744         requireNonNull(baseRevision, "baseRevision");
745         requireNonNull(summary, "summary");
746         checkArgument(!summary.isEmpty(), "summary is empty.");
747         requireNonNull(markup, "markup");
748         requireNonNull(changes, "changes");
749         checkArgument(!Iterables.isEmpty(changes), "changes is empty.");
750         try {
751             final String path = pathBuilder(projectName, repositoryName)
752                     .append("/contents?revision=")
753                     .append(baseRevision.text())
754                     .toString();
755 
756             final ObjectNode commitNode = JsonNodeFactory.instance.objectNode();
757             commitNode.set("commitMessage",
758                            JsonNodeFactory.instance.objectNode()
759                                                    .put("summary", summary)
760                                                    .put("detail", detail)
761                                                    .put("markup", markup.name()));
762             commitNode.set("changes", toJson(changes));
763 
764             return client.execute(headers(HttpMethod.POST, path), toBytes(commitNode))
765                          .aggregate()
766                          .thenApply(ArmeriaCentralDogma::push);
767         } catch (Exception e) {
768             return exceptionallyCompletedFuture(e);
769         }
770     }
771 
772     private static PushResult push(AggregatedHttpResponse res) {
773         if (res.status().code() == 200) {
774             final JsonNode node = toJson(res, JsonNodeType.OBJECT);
775             return new PushResult(
776                     new Revision(getField(node, "revision").asInt()),
777                     Instant.parse(getField(node, "pushedAt").asText()).toEpochMilli());
778         }
779 
780         return handleErrorResponse(res);
781     }
782 
783     @Override
784     public CompletableFuture<PushResult> push(String projectName, String repositoryName, Revision baseRevision,
785                                               Author author, String summary, String detail, Markup markup,
786                                               Iterable<? extends Change<?>> changes) {
787         // The author specified by the client will be ignored.
788         // The server will determine the author.
789         return push(projectName, repositoryName, baseRevision, summary, detail, markup, changes);
790     }
791 
792     @Override
793     public CompletableFuture<Revision> watchRepository(String projectName, String repositoryName,
794                                                        Revision lastKnownRevision, PathPattern pathPattern,
795                                                        long timeoutMillis, boolean errorOnEntryNotFound) {
796         validateProjectAndRepositoryName(projectName, repositoryName);
797         requireNonNull(lastKnownRevision, "lastKnownRevision");
798         requireNonNull(pathPattern, "pathPattern");
799         checkArgument(timeoutMillis > 0, "timeoutMillis: %s (expected: > 0)", timeoutMillis);
800         try {
801             final StringBuilder path = pathBuilder(projectName, repositoryName);
802             path.append("/contents").append(pathPattern.encoded());
803 
804             return watch(lastKnownRevision, timeoutMillis, path.toString(), QueryType.IDENTITY,
805                          ArmeriaCentralDogma::watchRepository, errorOnEntryNotFound);
806         } catch (Exception e) {
807             return exceptionallyCompletedFuture(e);
808         }
809     }
810 
811     @Nullable
812     private static Revision watchRepository(AggregatedHttpResponse res, QueryType unused) {
813         switch (res.status().code()) {
814             case 200: // OK
815                 final JsonNode node = toJson(res, JsonNodeType.OBJECT);
816                 return new Revision(getField(node, "revision").asInt());
817             case 304: // Not Modified
818                 return null;
819         }
820 
821         return handleErrorResponse(res);
822     }
823 
824     @Override
825     public <T> CompletableFuture<Entry<T>> watchFile(String projectName, String repositoryName,
826                                                      Revision lastKnownRevision, Query<T> query,
827                                                      long timeoutMillis, boolean errorOnEntryNotFound) {
828         validateProjectAndRepositoryName(projectName, repositoryName);
829         requireNonNull(lastKnownRevision, "lastKnownRevision");
830         requireNonNull(query, "query");
831         checkArgument(timeoutMillis > 0, "timeoutMillis: %s (expected: > 0)", timeoutMillis);
832         try {
833 
834             final StringBuilder path = pathBuilder(projectName, repositoryName);
835             path.append("/contents").append(query.path());
836             if (query.type() == QueryType.JSON_PATH) {
837                 path.append('?');
838                 query.expressions().forEach(expr -> path.append("jsonpath=").append(encodeParam(expr))
839                                                         .append('&'));
840 
841                 // Remove the trailing '?' or '&'.
842                 path.setLength(path.length() - 1);
843             }
844 
845             return watch(lastKnownRevision, timeoutMillis, path.toString(), query.type(),
846                          ArmeriaCentralDogma::watchFile, errorOnEntryNotFound);
847         } catch (Exception e) {
848             return exceptionallyCompletedFuture(e);
849         }
850     }
851 
852     @Nullable
853     private static <T> Entry<T> watchFile(AggregatedHttpResponse res, QueryType queryType) {
854         switch (res.status().code()) {
855             case 200: // OK
856                 final JsonNode node = toJson(res, JsonNodeType.OBJECT);
857                 final Revision revision = new Revision(getField(node, "revision").asInt());
858                 return toEntry(revision, getField(node, "entry"), queryType);
859             case 304: // Not Modified
860                 return null;
861         }
862 
863         return handleErrorResponse(res);
864     }
865 
866     private <T> CompletableFuture<T> watch(Revision lastKnownRevision, long timeoutMillis,
867                                            String path, QueryType queryType,
868                                            BiFunction<AggregatedHttpResponse, QueryType, T> func,
869                                            boolean errorOnEntryNotFound) {
870         final RequestHeadersBuilder builder = headersBuilder(HttpMethod.GET, path);
871         builder.set(HttpHeaderNames.IF_NONE_MATCH, lastKnownRevision.text())
872                .set(HttpHeaderNames.PREFER, "wait=" + LongMath.saturatedAdd(timeoutMillis, 999) / 1000L +
873                                             ", notify-entry-not-found=" + errorOnEntryNotFound);
874 
875         try (SafeCloseable ignored = Clients.withContextCustomizer(ctx -> {
876             final long responseTimeoutMillis = ctx.responseTimeoutMillis();
877             final long adjustmentMillis = WatchTimeout.availableTimeout(timeoutMillis, responseTimeoutMillis);
878             if (responseTimeoutMillis > 0) {
879                 ctx.setResponseTimeoutMillis(TimeoutMode.EXTEND, adjustmentMillis);
880             } else {
881                 ctx.setResponseTimeoutMillis(adjustmentMillis);
882             }
883         })) {
884             return client.execute(builder.build()).aggregate()
885                          .handle((res, cause) -> {
886                              if (cause == null) {
887                                  return func.apply(res, queryType);
888                              }
889 
890                              if ((cause instanceof ClosedStreamException) &&
891                                  client.options().factory().isClosing()) {
892                                  // A user closed the client factory while watching.
893                                  return null;
894                              }
895 
896                              return Exceptions.throwUnsafely(cause);
897                          });
898         }
899     }
900 
901     private static void validateProjectName(String projectName) {
902         Util.validateProjectName(projectName, "projectName");
903     }
904 
905     private static void validateProjectAndRepositoryName(String projectName, String repositoryName) {
906         validateProjectName(projectName);
907         Util.validateRepositoryName(repositoryName, "repositoryName");
908     }
909 
910     private RequestHeaders headers(HttpMethod method, String path) {
911         return headersBuilder(method, path).build();
912     }
913 
914     private RequestHeadersBuilder headersBuilder(HttpMethod method, String path) {
915         final RequestHeadersBuilder builder = RequestHeaders.builder();
916         builder.method(method)
917                .path(path)
918                .set(HttpHeaderNames.AUTHORIZATION, authorization)
919                .setObject(HttpHeaderNames.ACCEPT, MediaType.JSON);
920 
921         switch (method) {
922             case POST:
923             case PUT:
924                 builder.contentType(MediaType.JSON_UTF_8);
925                 break;
926             case PATCH:
927                 builder.contentType(JSON_PATCH_UTF8);
928                 break;
929         }
930 
931         return builder;
932     }
933 
934     private static StringBuilder pathBuilder(String projectName) {
935         return new StringBuilder().append(PROJECTS_PREFIX).append('/').append(projectName);
936     }
937 
938     private static StringBuilder pathBuilder(String projectName, String repositoryName) {
939         return pathBuilder(projectName).append(REPOS).append('/').append(repositoryName);
940     }
941 
942     private static void appendJsonPaths(StringBuilder path, QueryType queryType, Iterable<String> expressions) {
943         if (queryType == QueryType.JSON_PATH) {
944             expressions.forEach(expr -> path.append("&jsonpath=").append(encodeParam(expr)));
945         }
946     }
947 
948     @SuppressWarnings("CharsetObjectCanBeUsed") // We target Java 8.
949     private static String encodeParam(String param) {
950         try {
951             return URLEncoder.encode(param, "UTF-8");
952         } catch (UnsupportedEncodingException e) {
953             throw new Error(); // Never reaches here.
954         }
955     }
956 
957     /**
958      * Encodes the specified {@link JsonNode} into a byte array.
959      */
960     private static byte[] toBytes(JsonNode content) {
961         try {
962             return Jackson.writeValueAsBytes(content);
963         } catch (JsonProcessingException e) {
964             // Should never reach here.
965             throw new Error(e);
966         }
967     }
968 
969     /**
970      * Encodes a list of {@link Change}s into a JSON array.
971      */
972     private static ArrayNode toJson(Iterable<? extends Change<?>> changes) {
973         final ArrayNode changesNode = JsonNodeFactory.instance.arrayNode();
974         changes.forEach(c -> {
975             final ObjectNode changeNode = JsonNodeFactory.instance.objectNode();
976             changeNode.put("path", c.path());
977             changeNode.put("type", c.type().name());
978             final Class<?> contentType = c.type().contentType();
979             if (contentType == JsonNode.class) {
980                 changeNode.set("content", (JsonNode) c.content());
981             } else if (contentType == String.class) {
982                 changeNode.put("content", (String) c.content());
983             }
984             changesNode.add(changeNode);
985         });
986         return changesNode;
987     }
988 
989     /**
990      * Parses the content of the specified {@link AggregatedHttpResponse} into a {@link JsonNode}.
991      */
992     private static JsonNode toJson(AggregatedHttpResponse res, @Nullable JsonNodeType expectedNodeType) {
993         final String content = toString(res);
994         final JsonNode node;
995         try {
996             node = Jackson.readTree(content);
997         } catch (JsonParseException e) {
998             throw new CentralDogmaException("failed to parse the response JSON", e);
999         }
1000 
1001         if (expectedNodeType != null && node.getNodeType() != expectedNodeType) {
1002             throw new CentralDogmaException(
1003                     "invalid server response; expected: " + expectedNodeType +
1004                     ", actual: " + node.getNodeType() + ", content: " + content);
1005         }
1006         return node;
1007     }
1008 
1009     private static <T> T rejectNeitherArrayNorObject(AggregatedHttpResponse res) {
1010         throw new CentralDogmaException(
1011                 "invalid server response; expected: " + JsonNodeType.OBJECT + " or " + JsonNodeType.ARRAY +
1012                 ", content: " + toString(res));
1013     }
1014 
1015     private static String toString(AggregatedHttpResponse res) {
1016         final MediaType contentType = firstNonNull(res.headers().contentType(), MediaType.JSON_UTF_8);
1017         final Charset charset = contentType.charset(StandardCharsets.UTF_8);
1018         return res.content(charset);
1019     }
1020 
1021     private static <T> Entry<T> toEntry(Revision revision, JsonNode node, QueryType queryType) {
1022         final String entryPath = getField(node, "path").asText();
1023         final EntryType receivedEntryType = EntryType.valueOf(getField(node, "type").asText());
1024         switch (queryType) {
1025             case IDENTITY_TEXT:
1026                 return entryAsText(revision, node, entryPath);
1027             case IDENTITY_JSON:
1028             case JSON_PATH:
1029                 if (receivedEntryType != EntryType.JSON) {
1030                     throw new CentralDogmaException("invalid entry type. entry type: " + receivedEntryType +
1031                                                     " (expected: " + queryType + ')');
1032                 }
1033                 return entryAsJson(revision, node, entryPath);
1034             case IDENTITY:
1035                 switch (receivedEntryType) {
1036                     case JSON:
1037                         return entryAsJson(revision, node, entryPath);
1038                     case TEXT:
1039                         return entryAsText(revision, node, entryPath);
1040                     case DIRECTORY:
1041                         return unsafeCast(Entry.ofDirectory(revision, entryPath));
1042                 }
1043         }
1044         throw new Error(); // Should never reach here.
1045     }
1046 
1047     private static <T> Entry<T> entryAsText(Revision revision, JsonNode node, String entryPath) {
1048         final JsonNode content = getField(node, "content");
1049         final String content0;
1050         if (content.isContainerNode()) {
1051             content0 = content.toString();
1052         } else {
1053             content0 = content.asText();
1054         }
1055         return unsafeCast(Entry.ofText(revision, entryPath, content0));
1056     }
1057 
1058     private static <T> Entry<T> entryAsJson(Revision revision, JsonNode node, String entryPath) {
1059         return unsafeCast(Entry.ofJson(revision, entryPath, getField(node, "content")));
1060     }
1061 
1062     private static Commit toCommit(JsonNode node) {
1063         final Revision revision = new Revision(getField(node, "revision").asInt());
1064         final JsonNode authorNode = getField(node, "author");
1065         final Author author = new Author(getField(authorNode, "name").asText(),
1066                                          getField(authorNode, "email").asText());
1067         final long pushedAt = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(
1068                 getField(node, "pushedAt").asText())).toEpochMilli();
1069         final JsonNode commitMessageNode = getField(node, "commitMessage");
1070         final String summary = getField(commitMessageNode, "summary").asText();
1071         final String detail = getField(commitMessageNode, "detail").asText();
1072         final Markup markup = Markup.valueOf(getField(commitMessageNode, "markup").asText());
1073         return new Commit(revision, author, pushedAt, summary, detail, markup);
1074     }
1075 
1076     private static <T> Change<T> toChange(JsonNode node) {
1077         final String actualPath = getField(node, "path").asText();
1078         final ChangeType type = ChangeType.valueOf(getField(node, "type").asText());
1079         switch (type) {
1080             case UPSERT_JSON:
1081                 return unsafeCast(Change.ofJsonUpsert(actualPath, getField(node, "content")));
1082             case UPSERT_TEXT:
1083                 return unsafeCast(Change.ofTextUpsert(actualPath, getField(node, "content").asText()));
1084             case REMOVE:
1085                 return unsafeCast(Change.ofRemoval(actualPath));
1086             case RENAME:
1087                 return unsafeCast(Change.ofRename(actualPath, getField(node, "content").asText()));
1088             case APPLY_JSON_PATCH:
1089                 return unsafeCast(Change.ofJsonPatch(actualPath, getField(node, "content")));
1090             case APPLY_TEXT_PATCH:
1091                 return unsafeCast(Change.ofTextPatch(actualPath, getField(node, "content").asText()));
1092         }
1093 
1094         throw new Error(); // Never reaches here.
1095     }
1096 
1097     private static Set<String> handleNameList(AggregatedHttpResponse res) {
1098         switch (res.status().code()) {
1099             case 200:
1100                 return Streams.stream(toJson(res, JsonNodeType.ARRAY))
1101                               .map(node -> getField(node, "name").asText())
1102                               .collect(toImmutableSet());
1103             case 204:
1104                 return ImmutableSet.of();
1105         }
1106         return handleErrorResponse(res);
1107     }
1108 
1109     private static JsonNode getField(JsonNode node, String fieldName) {
1110         final JsonNode field = node.get(fieldName);
1111         if (field == null) {
1112             throw new CentralDogmaException(
1113                     "invalid server response; field '" + fieldName + "' does not exist: " + node);
1114         }
1115         return field;
1116     }
1117 
1118     private static <T> T handleErrorResponse(AggregatedHttpResponse res) {
1119         final HttpStatus status = res.status();
1120         if (status.codeClass() != HttpStatusClass.SUCCESS) {
1121             final JsonNode node = toJson(res, JsonNodeType.OBJECT);
1122             final JsonNode exceptionNode = node.get("exception");
1123             final JsonNode messageNode = node.get("message");
1124 
1125             if (exceptionNode != null) {
1126                 final String typeName = exceptionNode.textValue();
1127                 if (typeName != null) {
1128                     final Function<String, CentralDogmaException> exceptionFactory =
1129                             EXCEPTION_FACTORIES.get(typeName);
1130                     if (exceptionFactory != null) {
1131                         throw exceptionFactory.apply(messageNode.textValue());
1132                     }
1133                 }
1134             }
1135         }
1136 
1137         throw new CentralDogmaException("unexpected response: " + res.headers() + ", " + res.contentUtf8());
1138     }
1139 }