1   /*
2    * Copyright 2021 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import static com.linecorp.centraldogma.internal.Util.unsafeCast;
19  
20  import java.util.concurrent.CompletableFuture;
21  import java.util.concurrent.Executor;
22  import java.util.concurrent.ScheduledExecutorService;
23  import java.util.function.Function;
24  
25  import javax.annotation.Nullable;
26  
27  import com.linecorp.centraldogma.common.PathPattern;
28  import com.linecorp.centraldogma.common.Revision;
29  
30  final class FilesWatcher<T> extends AbstractWatcher<T> {
31  
32      private final CentralDogma centralDogma;
33      private final String projectName;
34      private final String repositoryName;
35      private final PathPattern pathPattern;
36      private final long timeoutMillis;
37      private final boolean errorOnEntryNotFound;
38      @Nullable
39      private final Function<Revision, ? extends T> mapper;
40      @Nullable
41      private final Executor mapperExecutor;
42  
43      FilesWatcher(CentralDogma centralDogma, ScheduledExecutorService watchScheduler,
44                   String projectName, String repositoryName, PathPattern pathPattern,
45                   long timeoutMillis, boolean errorOnEntryNotFound,
46                   @Nullable Function<Object, ? extends T> mapper, Executor mapperExecutor,
47                   long delayOnSuccessMillis, long initialDelayMillis, long maxDelayMillis,
48                   double multiplier, double jitterRate) {
49          super(watchScheduler, projectName, repositoryName, pathPattern.patternString(), errorOnEntryNotFound,
50                delayOnSuccessMillis, initialDelayMillis, maxDelayMillis, multiplier, jitterRate);
51          this.centralDogma = centralDogma;
52          this.projectName = projectName;
53          this.repositoryName = repositoryName;
54          this.pathPattern = pathPattern;
55          this.timeoutMillis = timeoutMillis;
56          this.errorOnEntryNotFound = errorOnEntryNotFound;
57          this.mapper = mapper != null ? unsafeCast(mapper) : null;
58          this.mapperExecutor = mapperExecutor;
59      }
60  
61      @Override
62      CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision) {
63          final CompletableFuture<Revision> future = centralDogma.watchRepository(
64                  projectName, repositoryName, lastKnownRevision,
65                  pathPattern, timeoutMillis, errorOnEntryNotFound);
66          if (mapper == null) {
67              return future.thenApply(revision -> {
68                  if (revision == null) {
69                      return null;
70                  }
71                  //noinspection unchecked
72                  return new Latest<>(revision, (T) revision);
73              });
74          }
75          return future.thenApplyAsync(revision -> {
76              if (revision == null) {
77                  return null;
78              }
79              return new Latest<>(revision, mapper.apply(revision));
80          }, mapperExecutor);
81      }
82  }