1   /*
2    * Copyright 2021 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package com.linecorp.centraldogma.client;
17  
18  import java.util.concurrent.CompletableFuture;
19  import java.util.concurrent.Executor;
20  import java.util.concurrent.ScheduledExecutorService;
21  import java.util.function.Function;
22  
23  import javax.annotation.Nullable;
24  
25  import com.linecorp.centraldogma.common.Entry;
26  import com.linecorp.centraldogma.common.Query;
27  import com.linecorp.centraldogma.common.Revision;
28  
29  import io.micrometer.core.instrument.MeterRegistry;
30  
31  final class FileWatcher<T> extends AbstractWatcher<T> {
32  
33      private final CentralDogma centralDogma;
34      private final String projectName;
35      private final String repositoryName;
36      private final Query<T> query;
37      private final long timeoutMillis;
38      private final boolean errorOnEntryNotFound;
39      @Nullable
40      private final Function<Object, ? extends T> mapper;
41      @Nullable
42      private final Executor mapperExecutor;
43  
44      FileWatcher(CentralDogma centralDogma, ScheduledExecutorService watchScheduler, String projectName,
45                  String repositoryName, Query<T> query, long timeoutMillis, boolean errorOnEntryNotFound,
46                  @Nullable Function<Object, ? extends T> mapper, Executor mapperExecutor,
47                  long delayOnSuccessMillis, long initialDelayMillis, long maxDelayMillis, double multiplier,
48                  double jitterRate, @Nullable MeterRegistry meterRegistry) {
49          super(watchScheduler, projectName, repositoryName, query.path(), errorOnEntryNotFound,
50                delayOnSuccessMillis, initialDelayMillis, maxDelayMillis, multiplier, jitterRate, meterRegistry);
51          this.centralDogma = centralDogma;
52          this.projectName = projectName;
53          this.repositoryName = repositoryName;
54          this.query = query;
55          this.timeoutMillis = timeoutMillis;
56          this.errorOnEntryNotFound = errorOnEntryNotFound;
57          this.mapper = mapper;
58          this.mapperExecutor = mapperExecutor;
59      }
60  
61      @Override
62      CompletableFuture<Latest<T>> doWatch(Revision lastKnownRevision) {
63          final CompletableFuture<Entry<T>> future = centralDogma.watchFile(projectName, repositoryName,
64                                                                            lastKnownRevision, query,
65                                                                            timeoutMillis, errorOnEntryNotFound);
66          if (mapper == null) {
67              return future.thenApply(entry -> {
68                  if (entry == null) {
69                      return null;
70                  }
71                  return new Latest<>(entry.revision(), entry.content());
72              });
73          }
74          return future.thenApplyAsync(entry -> {
75              if (entry == null) {
76                  return null;
77              }
78              return new Latest<>(entry.revision(), mapper.apply(entry.content()));
79          }, mapperExecutor);
80      }
81  }