1   /*
2    * Copyright 2018 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.client.armeria;
18  
19  import static com.google.common.base.MoreObjects.toStringHelper;
20  import static java.util.Objects.requireNonNull;
21  
22  import java.util.List;
23  import java.util.concurrent.CompletableFuture;
24  
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import com.linecorp.armeria.client.Endpoint;
29  import com.linecorp.armeria.client.endpoint.DynamicEndpointGroup;
30  import com.linecorp.armeria.client.endpoint.EndpointGroup;
31  import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;
32  import com.linecorp.centraldogma.client.CentralDogma;
33  import com.linecorp.centraldogma.client.Watcher;
34  import com.linecorp.centraldogma.common.Query;
35  
36  /**
37   * A {@link DynamicEndpointGroup} implementation that retrieves the {@link Endpoint} list from an entry in
38   * Central Dogma. The entry can be a JSON file or a plain text file.
39   *
40   * <p>For example, the following JSON array will be served as a list of {@link Endpoint}s:
41   * <pre>{@code
42   *  [
43   *      "host1:port1",
44   *      "host2:port2",
45   *      "host3:port3"
46   *  ]
47   * }</pre>
48   *
49   * <p>The JSON array file could be retrieved as an {@link EndpointGroup} using the following code:
50   * <pre>{@code
51   * CentralDogmaEndpointGroup<JsonNode> endpointGroup = CentralDogmaEndpointGroup.of(
52   *      centralDogma, "myProject", "myRepo",
53   *      Query.ofJson("/endpoints.json"),
54   *      EndpointListDecoder.JSON);
55   * endpointGroup.awaitInitialEndpoints();
56   * endpointGroup.endpoints();
57   * }</pre>
58   *
59   * @param <T> the type of the file in Central Dogma
60   */
61  public final class CentralDogmaEndpointGroup<T> extends DynamicEndpointGroup {
62      private static final Logger logger = LoggerFactory.getLogger(CentralDogmaEndpointGroup.class);
63  
64      private final Watcher<T> instanceListWatcher;
65      private final EndpointListDecoder<T> endpointListDecoder;
66  
67      /**
68       * Creates a new {@link CentralDogmaEndpointGroup}.
69       *
70       * @param watcher a {@link Watcher}
71       * @param endpointListDecoder an {@link EndpointListDecoder}
72       */
73      public static <T> CentralDogmaEndpointGroup<T> ofWatcher(Watcher<T> watcher,
74                                                               EndpointListDecoder<T> endpointListDecoder) {
75          return new CentralDogmaEndpointGroup<>(EndpointSelectionStrategy.weightedRoundRobin(),
76                                                 watcher, endpointListDecoder);
77      }
78  
79      /**
80       * Creates a new {@link CentralDogmaEndpointGroup}.
81       *
82       * @param centralDogma a {@link CentralDogma}
83       * @param projectName a Central Dogma project name
84       * @param repositoryName a Central Dogma repository name
85       * @param query a {@link Query} to route file
86       * @param endpointListDecoder an {@link EndpointListDecoder}
87       */
88      public static <T> CentralDogmaEndpointGroup<T> of(CentralDogma centralDogma,
89                                                        String projectName, String repositoryName,
90                                                        Query<T> query,
91                                                        EndpointListDecoder<T> endpointListDecoder) {
92          return ofWatcher(centralDogma.forRepo(projectName, repositoryName)
93                                       .watcher(query)
94                                       .start(),
95                           endpointListDecoder);
96      }
97  
98      /**
99       * Returns a new {@link CentralDogmaEndpointGroupBuilder} with the {@link Watcher}
100      * and {@link EndpointListDecoder}. You can create a {@link Watcher} using {@link CentralDogma}:
101      *
102      * <pre>{@code
103      * CentralDogma centralDogma = ...
104      * Query<T> query = ... // The query to the entry that contains the list of endpoints.
105      * Watcher watcher = centralDogma.fileWatcher(projectName, repositoryName, query);
106      * }</pre>
107      */
108     public static <T> CentralDogmaEndpointGroupBuilder<T> builder(Watcher<T> watcher,
109                                                                   EndpointListDecoder<T> endpointListDecoder) {
110         return new CentralDogmaEndpointGroupBuilder<>(watcher, endpointListDecoder);
111     }
112 
113     CentralDogmaEndpointGroup(EndpointSelectionStrategy strategy,
114                               Watcher<T> instanceListWatcher,
115                               EndpointListDecoder<T> endpointListDecoder) {
116         super(strategy);
117         this.instanceListWatcher = requireNonNull(instanceListWatcher, "instanceListWatcher");
118         this.endpointListDecoder = requireNonNull(endpointListDecoder, "endpointListDecoder");
119         registerWatcher();
120     }
121 
122     private void registerWatcher() {
123         instanceListWatcher.watch((revision, instances) -> {
124             try {
125                 final List<Endpoint> newEndpoints = endpointListDecoder.decode(instances);
126                 if (newEndpoints.isEmpty()) {
127                     logger.info("Not refreshing the endpoint list of {} because it's empty. {}",
128                                 instanceListWatcher, revision);
129                     return;
130                 }
131                 setEndpoints(newEndpoints);
132             } catch (Exception e) {
133                 logger.warn("Failed to re-retrieve the endpoint list from Central Dogma.", e);
134             }
135         });
136         instanceListWatcher.initialValueFuture().exceptionally(e -> {
137             logger.warn("Failed to retrieve the initial instance list from Central Dogma.", e);
138             return null;
139         });
140     }
141 
142     @Override
143     protected void doCloseAsync(CompletableFuture<?> future) {
144         instanceListWatcher.close();
145         future.complete(null);
146     }
147 
148     @Override
149     public String toString() {
150         return toStringHelper(this)
151                 .add("instanceListWatcher", instanceListWatcher)
152                 .add("endpointListDecoder", endpointListDecoder)
153                 .add("endpointGroup", super.toString())
154                 .toString();
155     }
156 }