1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.client.armeria;
18
19 import static com.google.common.base.MoreObjects.firstNonNull;
20 import static com.google.common.base.Preconditions.checkArgument;
21 import static com.google.common.collect.ImmutableList.toImmutableList;
22 import static com.google.common.collect.ImmutableMap.toImmutableMap;
23 import static com.google.common.collect.ImmutableSet.toImmutableSet;
24 import static com.linecorp.centraldogma.internal.Util.unsafeCast;
25 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.PROJECTS_PREFIX;
26 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.REMOVED;
27 import static com.linecorp.centraldogma.internal.api.v1.HttpApiV1Constants.REPOS;
28 import static com.spotify.futures.CompletableFutures.exceptionallyCompletedFuture;
29 import static java.util.Objects.requireNonNull;
30
31 import java.io.UnsupportedEncodingException;
32 import java.net.URLEncoder;
33 import java.nio.charset.Charset;
34 import java.nio.charset.StandardCharsets;
35 import java.time.Instant;
36 import java.time.format.DateTimeFormatter;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Set;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.concurrent.ScheduledExecutorService;
42 import java.util.function.BiFunction;
43 import java.util.function.Function;
44
45 import javax.annotation.Nullable;
46
47 import com.fasterxml.jackson.core.JsonParseException;
48 import com.fasterxml.jackson.core.JsonProcessingException;
49 import com.fasterxml.jackson.databind.JsonNode;
50 import com.fasterxml.jackson.databind.node.ArrayNode;
51 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
52 import com.fasterxml.jackson.databind.node.JsonNodeType;
53 import com.fasterxml.jackson.databind.node.ObjectNode;
54 import com.google.common.collect.ImmutableList;
55 import com.google.common.collect.ImmutableMap;
56 import com.google.common.collect.ImmutableSet;
57 import com.google.common.collect.Iterables;
58 import com.google.common.collect.Streams;
59 import com.google.common.math.LongMath;
60
61 import com.linecorp.armeria.client.Clients;
62 import com.linecorp.armeria.client.WebClient;
63 import com.linecorp.armeria.common.AggregatedHttpResponse;
64 import com.linecorp.armeria.common.HttpHeaderNames;
65 import com.linecorp.armeria.common.HttpMethod;
66 import com.linecorp.armeria.common.HttpStatus;
67 import com.linecorp.armeria.common.HttpStatusClass;
68 import com.linecorp.armeria.common.MediaType;
69 import com.linecorp.armeria.common.RequestHeaders;
70 import com.linecorp.armeria.common.RequestHeadersBuilder;
71 import com.linecorp.armeria.common.stream.ClosedStreamException;
72 import com.linecorp.armeria.common.util.Exceptions;
73 import com.linecorp.armeria.common.util.SafeCloseable;
74 import com.linecorp.armeria.common.util.TimeoutMode;
75 import com.linecorp.centraldogma.client.AbstractCentralDogma;
76 import com.linecorp.centraldogma.client.CentralDogmaRepository;
77 import com.linecorp.centraldogma.client.RepositoryInfo;
78 import com.linecorp.centraldogma.common.Author;
79 import com.linecorp.centraldogma.common.AuthorizationException;
80 import com.linecorp.centraldogma.common.CentralDogmaException;
81 import com.linecorp.centraldogma.common.Change;
82 import com.linecorp.centraldogma.common.ChangeConflictException;
83 import com.linecorp.centraldogma.common.ChangeType;
84 import com.linecorp.centraldogma.common.Commit;
85 import com.linecorp.centraldogma.common.Entry;
86 import com.linecorp.centraldogma.common.EntryNotFoundException;
87 import com.linecorp.centraldogma.common.EntryType;
88 import com.linecorp.centraldogma.common.InvalidPushException;
89 import com.linecorp.centraldogma.common.Markup;
90 import com.linecorp.centraldogma.common.MergeQuery;
91 import com.linecorp.centraldogma.common.MergedEntry;
92 import com.linecorp.centraldogma.common.PathPattern;
93 import com.linecorp.centraldogma.common.ProjectExistsException;
94 import com.linecorp.centraldogma.common.ProjectNotFoundException;
95 import com.linecorp.centraldogma.common.PushResult;
96 import com.linecorp.centraldogma.common.Query;
97 import com.linecorp.centraldogma.common.QueryExecutionException;
98 import com.linecorp.centraldogma.common.QueryType;
99 import com.linecorp.centraldogma.common.ReadOnlyException;
100 import com.linecorp.centraldogma.common.RedundantChangeException;
101 import com.linecorp.centraldogma.common.RepositoryExistsException;
102 import com.linecorp.centraldogma.common.RepositoryNotFoundException;
103 import com.linecorp.centraldogma.common.Revision;
104 import com.linecorp.centraldogma.common.RevisionNotFoundException;
105 import com.linecorp.centraldogma.common.ShuttingDownException;
106 import com.linecorp.centraldogma.internal.HistoryConstants;
107 import com.linecorp.centraldogma.internal.Jackson;
108 import com.linecorp.centraldogma.internal.Util;
109 import com.linecorp.centraldogma.internal.api.v1.WatchTimeout;
110
111 final class ArmeriaCentralDogma extends AbstractCentralDogma {
112
113 private static final MediaType JSON_PATCH_UTF8 = MediaType.JSON_PATCH.withCharset(StandardCharsets.UTF_8);
114
115 private static final byte[] UNREMOVE_PATCH = toBytes(JsonNodeFactory.instance.arrayNode(1).add(
116 JsonNodeFactory.instance.objectNode().put("op", "replace")
117 .put("path", "/status")
118 .put("value", "active")));
119 private static final String REMOVED_PARAM = "?status=removed";
120
121 private static final Map<String, Function<String, CentralDogmaException>> EXCEPTION_FACTORIES =
122 ImmutableMap.<String, Function<String, CentralDogmaException>>builder()
123 .put(ProjectExistsException.class.getName(), ProjectExistsException::new)
124 .put(ProjectNotFoundException.class.getName(), ProjectNotFoundException::new)
125 .put(QueryExecutionException.class.getName(), QueryExecutionException::new)
126 .put(RedundantChangeException.class.getName(), RedundantChangeException::new)
127 .put(RevisionNotFoundException.class.getName(), RevisionNotFoundException::new)
128 .put(EntryNotFoundException.class.getName(), EntryNotFoundException::new)
129 .put(ChangeConflictException.class.getName(), ChangeConflictException::new)
130 .put(RepositoryNotFoundException.class.getName(), RepositoryNotFoundException::new)
131 .put(AuthorizationException.class.getName(), AuthorizationException::new)
132 .put(ShuttingDownException.class.getName(), ShuttingDownException::new)
133 .put(RepositoryExistsException.class.getName(), RepositoryExistsException::new)
134 .put(InvalidPushException.class.getName(), InvalidPushException::new)
135 .put(ReadOnlyException.class.getName(), ReadOnlyException::new)
136 .build();
137
138 private final WebClient client;
139 private final String authorization;
140
/**
 * Creates a new instance.
 *
 * @param blockingTaskExecutor the executor handed to the superclass constructor
 * @param client the {@link WebClient} used for all requests; must not be {@code null}
 * @param accessToken the token sent as {@code "Bearer <token>"}; must not be {@code null}
 * @throws NullPointerException if {@code client} or {@code accessToken} is {@code null}
 */
ArmeriaCentralDogma(ScheduledExecutorService blockingTaskExecutor, WebClient client, String accessToken) {
    super(blockingTaskExecutor);
    requireNonNull(client, "client");
    requireNonNull(accessToken, "accessToken");
    this.client = client;
    authorization = "Bearer " + accessToken;
}
146
147 @Override
148 public CompletableFuture<Void> whenEndpointReady() {
149 return client.endpointGroup().whenReady().thenRun(() -> {});
150 }
151
152 @Override
153 public CompletableFuture<Void> createProject(String projectName) {
154 validateProjectName(projectName);
155 try {
156 final ObjectNode root = JsonNodeFactory.instance.objectNode();
157 root.put("name", projectName);
158
159 return client.execute(headers(HttpMethod.POST, PROJECTS_PREFIX), toBytes(root))
160 .aggregate()
161 .thenApply(ArmeriaCentralDogma::createProject);
162 } catch (Exception e) {
163 return exceptionallyCompletedFuture(e);
164 }
165 }
166
167 private static Void createProject(AggregatedHttpResponse res) {
168 switch (res.status().code()) {
169 case 200:
170 case 201:
171 return null;
172 }
173 return handleErrorResponse(res);
174 }
175
176 @Override
177 public CompletableFuture<Void> removeProject(String projectName) {
178 validateProjectName(projectName);
179 try {
180 return client.execute(headers(HttpMethod.DELETE, pathBuilder(projectName).toString()))
181 .aggregate()
182 .thenApply(ArmeriaCentralDogma::removeProject);
183 } catch (Exception e) {
184 return exceptionallyCompletedFuture(e);
185 }
186 }
187
188 private static Void removeProject(AggregatedHttpResponse res) {
189 switch (res.status().code()) {
190 case 200:
191 case 204:
192 return null;
193 }
194 return handleErrorResponse(res);
195 }
196
197 @Override
198 public CompletableFuture<Void> purgeProject(String projectName) {
199 validateProjectName(projectName);
200 try {
201 return client.execute(headers(HttpMethod.DELETE, pathBuilder(projectName).append(REMOVED)
202 .toString()))
203 .aggregate()
204 .thenApply(ArmeriaCentralDogma::handlePurgeResult);
205 } catch (Exception e) {
206 return exceptionallyCompletedFuture(e);
207 }
208 }
209
210 @Override
211 public CompletableFuture<Void> unremoveProject(String projectName) {
212 validateProjectName(projectName);
213 try {
214 return client.execute(headers(HttpMethod.PATCH, pathBuilder(projectName).toString()),
215 UNREMOVE_PATCH)
216 .aggregate()
217 .thenApply(ArmeriaCentralDogma::unremoveProject);
218 } catch (Exception e) {
219 return exceptionallyCompletedFuture(e);
220 }
221 }
222
223 private static Void unremoveProject(AggregatedHttpResponse res) {
224 if (res.status().code() == 200) {
225 return null;
226 }
227 return handleErrorResponse(res);
228 }
229
230 @Override
231 public CompletableFuture<Set<String>> listProjects() {
232 return client.execute(headers(HttpMethod.GET, PROJECTS_PREFIX))
233 .aggregate()
234 .thenApply(ArmeriaCentralDogma::handleNameList);
235 }
236
237 @Override
238 public CompletableFuture<Set<String>> listRemovedProjects() {
239 return client.execute(headers(HttpMethod.GET, PROJECTS_PREFIX + REMOVED_PARAM))
240 .aggregate()
241 .thenApply(ArmeriaCentralDogma::handleNameList);
242 }
243
244 @Override
245 public CompletableFuture<CentralDogmaRepository> createRepository(String projectName,
246 String repositoryName) {
247 validateProjectAndRepositoryName(projectName, repositoryName);
248 try {
249 final String path = pathBuilder(projectName).append(REPOS).toString();
250 final ObjectNode root = JsonNodeFactory.instance.objectNode();
251 root.put("name", repositoryName);
252
253 return client.execute(headers(HttpMethod.POST, path), toBytes(root))
254 .aggregate()
255 .thenApply(res -> {
256 switch (res.status().code()) {
257 case 200:
258 case 201:
259 return forRepo(projectName, repositoryName);
260 }
261 return handleErrorResponse(res);
262 });
263 } catch (Exception e) {
264 return exceptionallyCompletedFuture(e);
265 }
266 }
267
268 @Override
269 public CompletableFuture<Void> removeRepository(String projectName, String repositoryName) {
270 validateProjectAndRepositoryName(projectName, repositoryName);
271 try {
272 return client.execute(headers(HttpMethod.DELETE,
273 pathBuilder(projectName, repositoryName).toString()))
274 .aggregate()
275 .thenApply(ArmeriaCentralDogma::removeRepository);
276 } catch (Exception e) {
277 return exceptionallyCompletedFuture(e);
278 }
279 }
280
281 private static Void removeRepository(AggregatedHttpResponse res) {
282 switch (res.status().code()) {
283 case 200:
284 case 204:
285 return null;
286 }
287 return handleErrorResponse(res);
288 }
289
290 @Override
291 public CompletableFuture<Void> purgeRepository(String projectName, String repositoryName) {
292 validateProjectAndRepositoryName(projectName, repositoryName);
293 try {
294 return client.execute(headers(HttpMethod.DELETE,
295 pathBuilder(projectName, repositoryName).append(REMOVED).toString()))
296 .aggregate()
297 .thenApply(ArmeriaCentralDogma::handlePurgeResult);
298 } catch (Exception e) {
299 return exceptionallyCompletedFuture(e);
300 }
301 }
302
303 private static Void handlePurgeResult(AggregatedHttpResponse res) {
304 switch (res.status().code()) {
305 case 200:
306 case 204:
307 return null;
308 }
309 return handleErrorResponse(res);
310 }
311
312 @Override
313 public CompletableFuture<CentralDogmaRepository> unremoveRepository(String projectName,
314 String repositoryName) {
315 validateProjectAndRepositoryName(projectName, repositoryName);
316 try {
317 return client.execute(headers(HttpMethod.PATCH,
318 pathBuilder(projectName, repositoryName).toString()),
319 UNREMOVE_PATCH)
320 .aggregate()
321 .thenApply(res -> {
322 if (res.status().code() == 200) {
323 return forRepo(projectName, repositoryName);
324 }
325 return handleErrorResponse(res);
326 });
327 } catch (Exception e) {
328 return exceptionallyCompletedFuture(e);
329 }
330 }
331
332 @Override
333 public CompletableFuture<Map<String, RepositoryInfo>> listRepositories(String projectName) {
334 validateProjectName(projectName);
335 try {
336 return client.execute(headers(HttpMethod.GET, pathBuilder(projectName).append(REPOS).toString()))
337 .aggregate()
338 .thenApply(ArmeriaCentralDogma::listRepositories);
339 } catch (Exception e) {
340 return exceptionallyCompletedFuture(e);
341 }
342 }
343
344 private static Map<String, RepositoryInfo> listRepositories(AggregatedHttpResponse res) {
345 switch (res.status().code()) {
346 case 200:
347 return Streams.stream(toJson(res, JsonNodeType.ARRAY))
348 .map(node -> {
349 final String name = getField(node, "name").asText();
350 final Revision headRevision =
351 new Revision(getField(node, "headRevision").asInt());
352 return new RepositoryInfo(name, headRevision);
353 })
354 .collect(toImmutableMap(RepositoryInfo::name, Function.identity()));
355 case 204:
356 return ImmutableMap.of();
357 }
358 return handleErrorResponse(res);
359 }
360
361 @Override
362 public CompletableFuture<Set<String>> listRemovedRepositories(String projectName) {
363 validateProjectName(projectName);
364 try {
365 return client.execute(headers(HttpMethod.GET,
366 pathBuilder(projectName).append(REPOS)
367 .append(REMOVED_PARAM).toString()))
368 .aggregate()
369 .thenApply(ArmeriaCentralDogma::handleNameList);
370 } catch (Exception e) {
371 return exceptionallyCompletedFuture(e);
372 }
373 }
374
375 @Override
376 public CompletableFuture<Revision> normalizeRevision(String projectName, String repositoryName,
377 Revision revision) {
378 validateProjectAndRepositoryName(projectName, repositoryName);
379 requireNonNull(revision, "revision");
380 try {
381 final String path = pathBuilder(projectName, repositoryName)
382 .append("/revision/")
383 .append(revision.text())
384 .toString();
385
386 return client.execute(headers(HttpMethod.GET, path))
387 .aggregate()
388 .thenApply(ArmeriaCentralDogma::normalizeRevision);
389 } catch (Exception e) {
390 return exceptionallyCompletedFuture(e);
391 }
392 }
393
394 private static Revision normalizeRevision(AggregatedHttpResponse res) {
395 if (res.status().code() == 200) {
396 return new Revision(getField(toJson(res, JsonNodeType.OBJECT), "revision").asInt());
397 }
398 return handleErrorResponse(res);
399 }
400
401 @Override
402 public CompletableFuture<Map<String, EntryType>> listFiles(String projectName, String repositoryName,
403 Revision revision, PathPattern pathPattern) {
404 validateProjectAndRepositoryName(projectName, repositoryName);
405 requireNonNull(revision, "revision");
406 requireNonNull(pathPattern, "pathPattern");
407 try {
408 final StringBuilder path = pathBuilder(projectName, repositoryName);
409 path.append("/list").append(pathPattern.encoded()).append("?revision=").append(revision.major());
410
411 return client.execute(headers(HttpMethod.GET, path.toString()))
412 .aggregate()
413 .thenApply(ArmeriaCentralDogma::listFiles);
414 } catch (Exception e) {
415 return exceptionallyCompletedFuture(e);
416 }
417 }
418
419 private static Map<String, EntryType> listFiles(AggregatedHttpResponse res) {
420 switch (res.status().code()) {
421 case 200:
422 final ImmutableMap.Builder<String, EntryType> builder = ImmutableMap.builder();
423 final JsonNode node = toJson(res, JsonNodeType.ARRAY);
424 node.forEach(e -> builder.put(
425 getField(e, "path").asText(),
426 EntryType.valueOf(getField(e, "type").asText())));
427 return builder.build();
428 case 204:
429 return ImmutableMap.of();
430 }
431
432 return handleErrorResponse(res);
433 }
434
435 @Override
436 public <T> CompletableFuture<Entry<T>> getFile(String projectName, String repositoryName, Revision revision,
437 Query<T> query) {
438 validateProjectAndRepositoryName(projectName, repositoryName);
439 requireNonNull(revision, "revision");
440 requireNonNull(query, "query");
441 try {
442
443 return maybeNormalizeRevision(projectName, repositoryName, revision).thenCompose(normRev -> {
444 final StringBuilder path = pathBuilder(projectName, repositoryName);
445 path.append("/contents").append(query.path());
446 path.append("?revision=").append(normRev.text());
447 appendJsonPaths(path, query.type(), query.expressions());
448
449 return client.execute(headers(HttpMethod.GET, path.toString()))
450 .aggregate()
451 .thenApply(res -> getFile(normRev, res, query));
452 });
453 } catch (Exception e) {
454 return exceptionallyCompletedFuture(e);
455 }
456 }
457
458 private static <T> Entry<T> getFile(Revision normRev, AggregatedHttpResponse res, Query<T> query) {
459 if (res.status().code() == 200) {
460 final JsonNode node = toJson(res, JsonNodeType.OBJECT);
461 return toEntry(normRev, node, query.type());
462 }
463
464 return handleErrorResponse(res);
465 }
466
467 @Override
468 public CompletableFuture<Map<String, Entry<?>>> getFiles(String projectName, String repositoryName,
469 Revision revision, PathPattern pathPattern) {
470 validateProjectAndRepositoryName(projectName, repositoryName);
471 requireNonNull(revision, "revision");
472 requireNonNull(pathPattern, "pathPattern");
473 try {
474
475 return maybeNormalizeRevision(projectName, repositoryName, revision).thenCompose(normRev -> {
476 final StringBuilder path = pathBuilder(projectName, repositoryName);
477 path.append("/contents")
478 .append(pathPattern.encoded())
479 .append("?revision=")
480 .append(normRev.major());
481
482 return client.execute(headers(HttpMethod.GET, path.toString()))
483 .aggregate()
484 .thenApply(res -> getFiles(normRev, res));
485 });
486 } catch (Exception e) {
487 return exceptionallyCompletedFuture(e);
488 }
489 }
490
491 private static Map<String, Entry<?>> getFiles(Revision normRev, AggregatedHttpResponse res) {
492 switch (res.status().code()) {
493 case 200:
494 final JsonNode node = toJson(res, null);
495 final ImmutableMap.Builder<String, Entry<?>> builder = ImmutableMap.builder();
496 if (node.isObject()) {
497 final Entry<?> entry = toEntry(normRev, node, QueryType.IDENTITY);
498 builder.put(entry.path(), entry);
499 } else if (node.isArray()) {
500 node.forEach(e -> {
501 final Entry<?> entry = toEntry(normRev, e, QueryType.IDENTITY);
502 builder.put(entry.path(), entry);
503 });
504 } else {
505 return rejectNeitherArrayNorObject(res);
506 }
507 return builder.build();
508 case 204:
509 return ImmutableMap.of();
510 }
511
512 return handleErrorResponse(res);
513 }
514
515 @Override
516 public <T> CompletableFuture<MergedEntry<T>> mergeFiles(String projectName, String repositoryName,
517 Revision revision, MergeQuery<T> mergeQuery) {
518 validateProjectAndRepositoryName(projectName, repositoryName);
519 requireNonNull(revision, "revision");
520 requireNonNull(mergeQuery, "mergeQuery");
521 try {
522 final StringBuilder path = pathBuilder(projectName, repositoryName);
523 path.append("/merge?revision=").append(revision.major());
524 mergeQuery.mergeSources().forEach(
525 src -> path.append(src.isOptional() ? "&optional_path=" : "&path=")
526 .append(encodeParam(src.path())));
527 appendJsonPaths(path, mergeQuery.type(), mergeQuery.expressions());
528 return client.execute(headers(HttpMethod.GET, path.toString()))
529 .aggregate()
530 .thenApply(ArmeriaCentralDogma::mergeFiles);
531 } catch (Exception e) {
532 return exceptionallyCompletedFuture(e);
533 }
534 }
535
536 private static <T> MergedEntry<T> mergeFiles(AggregatedHttpResponse res) {
537 if (res.status().code() == 200) {
538 final JsonNode node = toJson(res, JsonNodeType.OBJECT);
539
540
541 final ImmutableList.Builder<String> pathsBuilder = ImmutableList.builder();
542 for (JsonNode path : getField(node, "paths")) {
543 if (path.getNodeType() != JsonNodeType.STRING) {
544 throw new CentralDogmaException("Received a merged entry with a non-string path: " + node);
545 }
546 pathsBuilder.add(path.asText());
547 }
548 final List<String> paths = pathsBuilder.build();
549 if (paths.isEmpty()) {
550 throw new CentralDogmaException("Received a merged entry with empty paths: " + node);
551 }
552
553
554 final Revision revision = new Revision(getField(node, "revision").asInt());
555 final EntryType type = EntryType.valueOf(getField(node, "type").asText());
556 final JsonNode content = getField(node, "content");
557 switch (type) {
558 case JSON: {
559 @SuppressWarnings("unchecked")
560 final MergedEntry<T> cast =
561 (MergedEntry<T>) MergedEntry.of(revision, type, content, paths);
562 return cast;
563 }
564 case TEXT: {
565 if (content.getNodeType() != JsonNodeType.STRING) {
566 throw new CentralDogmaException(
567 "Received a TEXT merged entry whose content is not a string: " + node);
568 }
569 @SuppressWarnings("unchecked")
570 final MergedEntry<T> cast =
571 (MergedEntry<T>) MergedEntry.of(revision, type, content.asText(), paths);
572 return cast;
573 }
574 default:
575 throw new CentralDogmaException(
576 "Received a merged entry whose type is neither JSON nor TEXT: " + node);
577 }
578 }
579
580 return handleErrorResponse(res);
581 }
582
583 @Override
584 public CompletableFuture<List<Commit>> getHistory(String projectName, String repositoryName,
585 Revision from, Revision to,
586 PathPattern pathPattern,
587 int maxCommits) {
588 requireNonNull(from, "from");
589 requireNonNull(to, "to");
590 requireNonNull(pathPattern, "pathPattern");
591 validateProjectAndRepositoryName(projectName, repositoryName);
592 checkArgument(maxCommits >= 0 && maxCommits <= HistoryConstants.MAX_MAX_COMMITS,
593 "maxCommits: %s (expected: 0 <= maxCommits <= %s)",
594 maxCommits, HistoryConstants.MAX_MAX_COMMITS);
595 try {
596 final StringBuilder path = pathBuilder(projectName, repositoryName);
597 path.append("/commits/").append(from.text());
598 path.append("?to=").append(to.text());
599 path.append("&path=").append(pathPattern.encoded());
600 if (maxCommits > 0) {
601 path.append("&maxCommits=").append(maxCommits);
602 }
603
604 return client.execute(headers(HttpMethod.GET, path.toString()))
605 .aggregate()
606 .thenApply(ArmeriaCentralDogma::getHistory);
607 } catch (Exception e) {
608 return exceptionallyCompletedFuture(e);
609 }
610 }
611
612 private static List<Commit> getHistory(AggregatedHttpResponse res) {
613 switch (res.status().code()) {
614 case 200:
615 final JsonNode node = toJson(res, null);
616 if (node.isObject()) {
617 return ImmutableList.of(toCommit(node));
618 } else if (node.isArray()) {
619 return Streams.stream(node)
620 .map(ArmeriaCentralDogma::toCommit)
621 .collect(toImmutableList());
622 } else {
623 return rejectNeitherArrayNorObject(res);
624 }
625 case 204:
626 return ImmutableList.of();
627 }
628
629 return handleErrorResponse(res);
630 }
631
632 @Override
633 public <T> CompletableFuture<Change<T>> getDiff(String projectName, String repositoryName, Revision from,
634 Revision to, Query<T> query) {
635 validateProjectAndRepositoryName(projectName, repositoryName);
636 requireNonNull(from, "from");
637 requireNonNull(to, "to");
638 requireNonNull(query, "query");
639 try {
640 final StringBuilder path = pathBuilder(projectName, repositoryName);
641 path.append("/compare");
642 path.append("?path=").append(encodeParam(query.path()));
643 path.append("&from=").append(from.text());
644 path.append("&to=").append(to.text());
645 appendJsonPaths(path, query.type(), query.expressions());
646
647 return client.execute(headers(HttpMethod.GET, path.toString()))
648 .aggregate()
649 .thenApply(ArmeriaCentralDogma::getDiff);
650 } catch (Exception e) {
651 return exceptionallyCompletedFuture(e);
652 }
653 }
654
655 @Nullable
656 private static <T> Change<T> getDiff(AggregatedHttpResponse res) {
657 switch (res.status().code()) {
658 case 200:
659 return toChange(toJson(res, JsonNodeType.OBJECT));
660 case 204:
661 return null;
662 }
663 return handleErrorResponse(res);
664 }
665
666 @Override
667 public CompletableFuture<List<Change<?>>> getDiff(String projectName, String repositoryName, Revision from,
668 Revision to, PathPattern pathPattern) {
669 validateProjectAndRepositoryName(projectName, repositoryName);
670 requireNonNull(from, "from");
671 requireNonNull(to, "to");
672 requireNonNull(pathPattern, "pathPattern");
673 try {
674 final StringBuilder path = pathBuilder(projectName, repositoryName);
675 path.append("/compare");
676 path.append("?pathPattern=").append(pathPattern.encoded());
677 path.append("&from=").append(from.text());
678 path.append("&to=").append(to.text());
679
680 return client.execute(headers(HttpMethod.GET, path.toString()))
681 .aggregate()
682 .thenApply(res -> {
683 if (res.status().code() == 200) {
684 final JsonNode node = toJson(res, null);
685 if (node.isObject()) {
686 return ImmutableList.of(toChange(node));
687 } else if (node.isArray()) {
688 return Streams.stream(node)
689 .map(ArmeriaCentralDogma::toChange)
690 .collect(toImmutableList());
691 } else {
692 return rejectNeitherArrayNorObject(res);
693 }
694 }
695
696 return handleErrorResponse(res);
697 });
698 } catch (Exception e) {
699 return exceptionallyCompletedFuture(e);
700 }
701 }
702
703 @Override
704 public CompletableFuture<List<Change<?>>> getPreviewDiffs(String projectName, String repositoryName,
705 Revision baseRevision,
706 Iterable<? extends Change<?>> changes) {
707 validateProjectAndRepositoryName(projectName, repositoryName);
708 requireNonNull(baseRevision, "baseRevision");
709 requireNonNull(changes, "changes");
710 try {
711 final String path = pathBuilder(projectName, repositoryName)
712 .append("/preview?revision=")
713 .append(baseRevision.text())
714 .toString();
715
716 final ArrayNode changesNode = toJson(changes);
717 return client.execute(headers(HttpMethod.POST, path), toBytes(changesNode))
718 .aggregate()
719 .thenApply(ArmeriaCentralDogma::getPreviewDiffs);
720 } catch (Exception e) {
721 return exceptionallyCompletedFuture(e);
722 }
723 }
724
725 private static List<Change<?>> getPreviewDiffs(AggregatedHttpResponse res) {
726 switch (res.status().code()) {
727 case 200:
728 final JsonNode node = toJson(res, JsonNodeType.ARRAY);
729 final ImmutableList.Builder<Change<?>> builder = ImmutableList.builder();
730 node.forEach(e -> builder.add(toChange(e)));
731 return builder.build();
732 case 204:
733 return ImmutableList.of();
734 }
735
736 return handleErrorResponse(res);
737 }
738
739 @Override
740 public CompletableFuture<PushResult> push(String projectName, String repositoryName, Revision baseRevision,
741 String summary, String detail, Markup markup,
742 Iterable<? extends Change<?>> changes) {
743 validateProjectAndRepositoryName(projectName, repositoryName);
744 requireNonNull(baseRevision, "baseRevision");
745 requireNonNull(summary, "summary");
746 checkArgument(!summary.isEmpty(), "summary is empty.");
747 requireNonNull(markup, "markup");
748 requireNonNull(changes, "changes");
749 checkArgument(!Iterables.isEmpty(changes), "changes is empty.");
750 try {
751 final String path = pathBuilder(projectName, repositoryName)
752 .append("/contents?revision=")
753 .append(baseRevision.text())
754 .toString();
755
756 final ObjectNode commitNode = JsonNodeFactory.instance.objectNode();
757 commitNode.set("commitMessage",
758 JsonNodeFactory.instance.objectNode()
759 .put("summary", summary)
760 .put("detail", detail)
761 .put("markup", markup.name()));
762 commitNode.set("changes", toJson(changes));
763
764 return client.execute(headers(HttpMethod.POST, path), toBytes(commitNode))
765 .aggregate()
766 .thenApply(ArmeriaCentralDogma::push);
767 } catch (Exception e) {
768 return exceptionallyCompletedFuture(e);
769 }
770 }
771
772 private static PushResult push(AggregatedHttpResponse res) {
773 if (res.status().code() == 200) {
774 final JsonNode node = toJson(res, JsonNodeType.OBJECT);
775 return new PushResult(
776 new Revision(getField(node, "revision").asInt()),
777 Instant.parse(getField(node, "pushedAt").asText()).toEpochMilli());
778 }
779
780 return handleErrorResponse(res);
781 }
782
783 @Override
784 public CompletableFuture<PushResult> push(String projectName, String repositoryName, Revision baseRevision,
785 Author author, String summary, String detail, Markup markup,
786 Iterable<? extends Change<?>> changes) {
787
788
789 return push(projectName, repositoryName, baseRevision, summary, detail, markup, changes);
790 }
791
792 @Override
793 public CompletableFuture<Revision> watchRepository(String projectName, String repositoryName,
794 Revision lastKnownRevision, PathPattern pathPattern,
795 long timeoutMillis, boolean errorOnEntryNotFound) {
796 validateProjectAndRepositoryName(projectName, repositoryName);
797 requireNonNull(lastKnownRevision, "lastKnownRevision");
798 requireNonNull(pathPattern, "pathPattern");
799 checkArgument(timeoutMillis > 0, "timeoutMillis: %s (expected: > 0)", timeoutMillis);
800 try {
801 final StringBuilder path = pathBuilder(projectName, repositoryName);
802 path.append("/contents").append(pathPattern.encoded());
803
804 return watch(lastKnownRevision, timeoutMillis, path.toString(), QueryType.IDENTITY,
805 ArmeriaCentralDogma::watchRepository, errorOnEntryNotFound);
806 } catch (Exception e) {
807 return exceptionallyCompletedFuture(e);
808 }
809 }
810
811 @Nullable
812 private static Revision watchRepository(AggregatedHttpResponse res, QueryType unused) {
813 switch (res.status().code()) {
814 case 200:
815 final JsonNode node = toJson(res, JsonNodeType.OBJECT);
816 return new Revision(getField(node, "revision").asInt());
817 case 304:
818 return null;
819 }
820
821 return handleErrorResponse(res);
822 }
823
824 @Override
825 public <T> CompletableFuture<Entry<T>> watchFile(String projectName, String repositoryName,
826 Revision lastKnownRevision, Query<T> query,
827 long timeoutMillis, boolean errorOnEntryNotFound) {
828 validateProjectAndRepositoryName(projectName, repositoryName);
829 requireNonNull(lastKnownRevision, "lastKnownRevision");
830 requireNonNull(query, "query");
831 checkArgument(timeoutMillis > 0, "timeoutMillis: %s (expected: > 0)", timeoutMillis);
832 try {
833
834 final StringBuilder path = pathBuilder(projectName, repositoryName);
835 path.append("/contents").append(query.path());
836 if (query.type() == QueryType.JSON_PATH) {
837 path.append('?');
838 query.expressions().forEach(expr -> path.append("jsonpath=").append(encodeParam(expr))
839 .append('&'));
840
841
842 path.setLength(path.length() - 1);
843 }
844
845 return watch(lastKnownRevision, timeoutMillis, path.toString(), query.type(),
846 ArmeriaCentralDogma::watchFile, errorOnEntryNotFound);
847 } catch (Exception e) {
848 return exceptionallyCompletedFuture(e);
849 }
850 }
851
852 @Nullable
853 private static <T> Entry<T> watchFile(AggregatedHttpResponse res, QueryType queryType) {
854 switch (res.status().code()) {
855 case 200:
856 final JsonNode node = toJson(res, JsonNodeType.OBJECT);
857 final Revision revision = new Revision(getField(node, "revision").asInt());
858 return toEntry(revision, getField(node, "entry"), queryType);
859 case 304:
860 return null;
861 }
862
863 return handleErrorResponse(res);
864 }
865
866 private <T> CompletableFuture<T> watch(Revision lastKnownRevision, long timeoutMillis,
867 String path, QueryType queryType,
868 BiFunction<AggregatedHttpResponse, QueryType, T> func,
869 boolean errorOnEntryNotFound) {
870 final RequestHeadersBuilder builder = headersBuilder(HttpMethod.GET, path);
871 builder.set(HttpHeaderNames.IF_NONE_MATCH, lastKnownRevision.text())
872 .set(HttpHeaderNames.PREFER, "wait=" + LongMath.saturatedAdd(timeoutMillis, 999) / 1000L +
873 ", notify-entry-not-found=" + errorOnEntryNotFound);
874
875 try (SafeCloseable ignored = Clients.withContextCustomizer(ctx -> {
876 final long responseTimeoutMillis = ctx.responseTimeoutMillis();
877 final long adjustmentMillis = WatchTimeout.availableTimeout(timeoutMillis, responseTimeoutMillis);
878 if (responseTimeoutMillis > 0) {
879 ctx.setResponseTimeoutMillis(TimeoutMode.EXTEND, adjustmentMillis);
880 } else {
881 ctx.setResponseTimeoutMillis(adjustmentMillis);
882 }
883 })) {
884 return client.execute(builder.build()).aggregate()
885 .handle((res, cause) -> {
886 if (cause == null) {
887 return func.apply(res, queryType);
888 }
889
890 if ((cause instanceof ClosedStreamException) &&
891 client.options().factory().isClosing()) {
892
893 return null;
894 }
895
896 return Exceptions.throwUnsafely(cause);
897 });
898 }
899 }
900
901 private static void validateProjectName(String projectName) {
902 Util.validateProjectName(projectName, "projectName");
903 }
904
905 private static void validateProjectAndRepositoryName(String projectName, String repositoryName) {
906 validateProjectName(projectName);
907 Util.validateRepositoryName(repositoryName, "repositoryName");
908 }
909
910 private RequestHeaders headers(HttpMethod method, String path) {
911 return headersBuilder(method, path).build();
912 }
913
914 private RequestHeadersBuilder headersBuilder(HttpMethod method, String path) {
915 final RequestHeadersBuilder builder = RequestHeaders.builder();
916 builder.method(method)
917 .path(path)
918 .set(HttpHeaderNames.AUTHORIZATION, authorization)
919 .setObject(HttpHeaderNames.ACCEPT, MediaType.JSON);
920
921 switch (method) {
922 case POST:
923 case PUT:
924 builder.contentType(MediaType.JSON_UTF_8);
925 break;
926 case PATCH:
927 builder.contentType(JSON_PATCH_UTF8);
928 break;
929 }
930
931 return builder;
932 }
933
934 private static StringBuilder pathBuilder(String projectName) {
935 return new StringBuilder().append(PROJECTS_PREFIX).append('/').append(projectName);
936 }
937
938 private static StringBuilder pathBuilder(String projectName, String repositoryName) {
939 return pathBuilder(projectName).append(REPOS).append('/').append(repositoryName);
940 }
941
942 private static void appendJsonPaths(StringBuilder path, QueryType queryType, Iterable<String> expressions) {
943 if (queryType == QueryType.JSON_PATH) {
944 expressions.forEach(expr -> path.append("&jsonpath=").append(encodeParam(expr)));
945 }
946 }
947
948 @SuppressWarnings("CharsetObjectCanBeUsed")
949 private static String encodeParam(String param) {
950 try {
951 return URLEncoder.encode(param, "UTF-8");
952 } catch (UnsupportedEncodingException e) {
953 throw new Error();
954 }
955 }
956
957
958
959
960 private static byte[] toBytes(JsonNode content) {
961 try {
962 return Jackson.writeValueAsBytes(content);
963 } catch (JsonProcessingException e) {
964
965 throw new Error(e);
966 }
967 }
968
969
970
971
972 private static ArrayNode toJson(Iterable<? extends Change<?>> changes) {
973 final ArrayNode changesNode = JsonNodeFactory.instance.arrayNode();
974 changes.forEach(c -> {
975 final ObjectNode changeNode = JsonNodeFactory.instance.objectNode();
976 changeNode.put("path", c.path());
977 changeNode.put("type", c.type().name());
978 final Class<?> contentType = c.type().contentType();
979 if (contentType == JsonNode.class) {
980 changeNode.set("content", (JsonNode) c.content());
981 } else if (contentType == String.class) {
982 changeNode.put("content", (String) c.content());
983 }
984 changesNode.add(changeNode);
985 });
986 return changesNode;
987 }
988
989
990
991
992 private static JsonNode toJson(AggregatedHttpResponse res, @Nullable JsonNodeType expectedNodeType) {
993 final String content = toString(res);
994 final JsonNode node;
995 try {
996 node = Jackson.readTree(content);
997 } catch (JsonParseException e) {
998 throw new CentralDogmaException("failed to parse the response JSON", e);
999 }
1000
1001 if (expectedNodeType != null && node.getNodeType() != expectedNodeType) {
1002 throw new CentralDogmaException(
1003 "invalid server response; expected: " + expectedNodeType +
1004 ", actual: " + node.getNodeType() + ", content: " + content);
1005 }
1006 return node;
1007 }
1008
1009 private static <T> T rejectNeitherArrayNorObject(AggregatedHttpResponse res) {
1010 throw new CentralDogmaException(
1011 "invalid server response; expected: " + JsonNodeType.OBJECT + " or " + JsonNodeType.ARRAY +
1012 ", content: " + toString(res));
1013 }
1014
1015 private static String toString(AggregatedHttpResponse res) {
1016 final MediaType contentType = firstNonNull(res.headers().contentType(), MediaType.JSON_UTF_8);
1017 final Charset charset = contentType.charset(StandardCharsets.UTF_8);
1018 return res.content(charset);
1019 }
1020
1021 private static <T> Entry<T> toEntry(Revision revision, JsonNode node, QueryType queryType) {
1022 final String entryPath = getField(node, "path").asText();
1023 final EntryType receivedEntryType = EntryType.valueOf(getField(node, "type").asText());
1024 switch (queryType) {
1025 case IDENTITY_TEXT:
1026 return entryAsText(revision, node, entryPath);
1027 case IDENTITY_JSON:
1028 case JSON_PATH:
1029 if (receivedEntryType != EntryType.JSON) {
1030 throw new CentralDogmaException("invalid entry type. entry type: " + receivedEntryType +
1031 " (expected: " + queryType + ')');
1032 }
1033 return entryAsJson(revision, node, entryPath);
1034 case IDENTITY:
1035 switch (receivedEntryType) {
1036 case JSON:
1037 return entryAsJson(revision, node, entryPath);
1038 case TEXT:
1039 return entryAsText(revision, node, entryPath);
1040 case DIRECTORY:
1041 return unsafeCast(Entry.ofDirectory(revision, entryPath));
1042 }
1043 }
1044 throw new Error();
1045 }
1046
1047 private static <T> Entry<T> entryAsText(Revision revision, JsonNode node, String entryPath) {
1048 final JsonNode content = getField(node, "content");
1049 final String content0;
1050 if (content.isContainerNode()) {
1051 content0 = content.toString();
1052 } else {
1053 content0 = content.asText();
1054 }
1055 return unsafeCast(Entry.ofText(revision, entryPath, content0));
1056 }
1057
1058 private static <T> Entry<T> entryAsJson(Revision revision, JsonNode node, String entryPath) {
1059 return unsafeCast(Entry.ofJson(revision, entryPath, getField(node, "content")));
1060 }
1061
1062 private static Commit toCommit(JsonNode node) {
1063 final Revision revision = new Revision(getField(node, "revision").asInt());
1064 final JsonNode authorNode = getField(node, "author");
1065 final Author author = new Author(getField(authorNode, "name").asText(),
1066 getField(authorNode, "email").asText());
1067 final long pushedAt = Instant.from(DateTimeFormatter.ISO_INSTANT.parse(
1068 getField(node, "pushedAt").asText())).toEpochMilli();
1069 final JsonNode commitMessageNode = getField(node, "commitMessage");
1070 final String summary = getField(commitMessageNode, "summary").asText();
1071 final String detail = getField(commitMessageNode, "detail").asText();
1072 final Markup markup = Markup.valueOf(getField(commitMessageNode, "markup").asText());
1073 return new Commit(revision, author, pushedAt, summary, detail, markup);
1074 }
1075
1076 private static <T> Change<T> toChange(JsonNode node) {
1077 final String actualPath = getField(node, "path").asText();
1078 final ChangeType type = ChangeType.valueOf(getField(node, "type").asText());
1079 switch (type) {
1080 case UPSERT_JSON:
1081 return unsafeCast(Change.ofJsonUpsert(actualPath, getField(node, "content")));
1082 case UPSERT_TEXT:
1083 return unsafeCast(Change.ofTextUpsert(actualPath, getField(node, "content").asText()));
1084 case REMOVE:
1085 return unsafeCast(Change.ofRemoval(actualPath));
1086 case RENAME:
1087 return unsafeCast(Change.ofRename(actualPath, getField(node, "content").asText()));
1088 case APPLY_JSON_PATCH:
1089 return unsafeCast(Change.ofJsonPatch(actualPath, getField(node, "content")));
1090 case APPLY_TEXT_PATCH:
1091 return unsafeCast(Change.ofTextPatch(actualPath, getField(node, "content").asText()));
1092 }
1093
1094 throw new Error();
1095 }
1096
1097 private static Set<String> handleNameList(AggregatedHttpResponse res) {
1098 switch (res.status().code()) {
1099 case 200:
1100 return Streams.stream(toJson(res, JsonNodeType.ARRAY))
1101 .map(node -> getField(node, "name").asText())
1102 .collect(toImmutableSet());
1103 case 204:
1104 return ImmutableSet.of();
1105 }
1106 return handleErrorResponse(res);
1107 }
1108
1109 private static JsonNode getField(JsonNode node, String fieldName) {
1110 final JsonNode field = node.get(fieldName);
1111 if (field == null) {
1112 throw new CentralDogmaException(
1113 "invalid server response; field '" + fieldName + "' does not exist: " + node);
1114 }
1115 return field;
1116 }
1117
1118 private static <T> T handleErrorResponse(AggregatedHttpResponse res) {
1119 final HttpStatus status = res.status();
1120 if (status.codeClass() != HttpStatusClass.SUCCESS) {
1121 final JsonNode node = toJson(res, JsonNodeType.OBJECT);
1122 final JsonNode exceptionNode = node.get("exception");
1123 final JsonNode messageNode = node.get("message");
1124
1125 if (exceptionNode != null) {
1126 final String typeName = exceptionNode.textValue();
1127 if (typeName != null) {
1128 final Function<String, CentralDogmaException> exceptionFactory =
1129 EXCEPTION_FACTORIES.get(typeName);
1130 if (exceptionFactory != null) {
1131 throw exceptionFactory.apply(messageNode.textValue());
1132 }
1133 }
1134 }
1135 }
1136
1137 throw new CentralDogmaException("unexpected response: " + res.headers() + ", " + res.contentUtf8());
1138 }
1139 }