1   /*
2    * Copyright 2021 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import java.util.concurrent.CompletableFuture;
19  import java.util.concurrent.Executor;
20  import java.util.concurrent.ScheduledExecutorService;
21  import java.util.function.Function;
22  
23  import javax.annotation.Nullable;
24  
25  import com.linecorp.centraldogma.common.Entry;
26  import com.linecorp.centraldogma.common.Query;
27  import com.linecorp.centraldogma.common.Revision;
28  
29  final class FileWatcher<T> extends AbstractWatcher<T> {
30  
31      private final CentralDogma centralDogma;
32      private final String projectName;
33      private final String repositoryName;
34      private final Query<T> query;
35      private final long timeoutMillis;
36      private final boolean errorOnEntryNotFound;
37      @Nullable
38      private final Function<Object, ? extends T> mapper;
39      @Nullable
40      private final Executor mapperExecutor;
41  
42      FileWatcher(CentralDogma centralDogma, ScheduledExecutorService watchScheduler,
43                  String projectName, String repositoryName, Query<T> query,
44                  long timeoutMillis, boolean errorOnEntryNotFound,
45                  @Nullable Function<Object, ? extends T> mapper, Executor mapperExecutor,
46                  long delayOnSuccessMillis, long initialDelayMillis, long maxDelayMillis,
47                  double multiplier, double jitterRate) {
48          super(watchScheduler, projectName, repositoryName, query.path(), errorOnEntryNotFound,
49                delayOnSuccessMillis, initialDelayMillis, maxDelayMillis, multiplier, jitterRate);
50          this.centralDogma = centralDogma;
51          this.projectName = projectName;
52          this.repositoryName = repositoryName;
53          this.query = query;
54          this.timeoutMillis = timeoutMillis;
55          this.errorOnEntryNotFound = errorOnEntryNotFound;
56          this.mapper = mapper;
57          this.mapperExecutor = mapperExecutor;
58      }
59  
60      @Override
61      CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision) {
62          final CompletableFuture<Entry<T>> future = centralDogma.watchFile(
63                  projectName, repositoryName, lastKnownRevision, query, timeoutMillis, errorOnEntryNotFound);
64          if (mapper == null) {
65              return future.thenApply(entry -> {
66                  if (entry == null) {
67                      return null;
68                  }
69                  return new Latest<>(entry.revision(), entry.content());
70              });
71          }
72          return future.thenApplyAsync(entry -> {
73              if (entry == null) {
74                  return null;
75              }
76              return new Latest<>(entry.revision(), mapper.apply(entry.content()));
77          }, mapperExecutor);
78      }
79  }