1   /*
2    * Copyright 2021 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import static com.linecorp.centraldogma.internal.Util.unsafeCast;
19  
20  import java.util.concurrent.CompletableFuture;
21  import java.util.concurrent.Executor;
22  import java.util.concurrent.ScheduledExecutorService;
23  import java.util.function.Function;
24  
25  import javax.annotation.Nullable;
26  
27  import com.linecorp.centraldogma.common.PathPattern;
28  import com.linecorp.centraldogma.common.Revision;
29  
30  import io.micrometer.core.instrument.MeterRegistry;
31  
32  final class FilesWatcher<T> extends AbstractWatcher<T> {
33  
34      private final CentralDogma centralDogma;
35      private final String projectName;
36      private final String repositoryName;
37      private final PathPattern pathPattern;
38      private final long timeoutMillis;
39      private final boolean errorOnEntryNotFound;
40      @Nullable
41      private final Function<Revision, ? extends T> mapper;
42      @Nullable
43      private final Executor mapperExecutor;
44  
45      FilesWatcher(CentralDogma centralDogma, ScheduledExecutorService watchScheduler,
46                   String projectName, String repositoryName, PathPattern pathPattern,
47                   long timeoutMillis, boolean errorOnEntryNotFound,
48                   @Nullable Function<Object, ? extends T> mapper, Executor mapperExecutor,
49                   long delayOnSuccessMillis, long initialDelayMillis, long maxDelayMillis,
50                   double multiplier, double jitterRate, @Nullable MeterRegistry meterRegistry) {
51          super(watchScheduler, projectName, repositoryName, pathPattern.patternString(), errorOnEntryNotFound,
52                delayOnSuccessMillis, initialDelayMillis, maxDelayMillis, multiplier, jitterRate, meterRegistry);
53          this.centralDogma = centralDogma;
54          this.projectName = projectName;
55          this.repositoryName = repositoryName;
56          this.pathPattern = pathPattern;
57          this.timeoutMillis = timeoutMillis;
58          this.errorOnEntryNotFound = errorOnEntryNotFound;
59          this.mapper = mapper != null ? unsafeCast(mapper) : null;
60          this.mapperExecutor = mapperExecutor;
61      }
62  
63      @Override
64      CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision) {
65          final CompletableFuture<Revision> future = centralDogma.watchRepository(
66                  projectName, repositoryName, lastKnownRevision,
67                  pathPattern, timeoutMillis, errorOnEntryNotFound);
68          if (mapper == null) {
69              return future.thenApply(revision -> {
70                  if (revision == null) {
71                      return null;
72                  }
73                  //noinspection unchecked
74                  return new Latest<>(revision, (T) revision);
75              });
76          }
77          return future.thenApplyAsync(revision -> {
78              if (revision == null) {
79                  return null;
80              }
81              return new Latest<>(revision, mapper.apply(revision));
82          }, mapperExecutor);
83      }
84  }