1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.linecorp.centraldogma.client.armeria;
18
19 import static com.google.common.base.MoreObjects.toStringHelper;
20 import static java.util.Objects.requireNonNull;
21
22 import java.util.List;
23 import java.util.concurrent.CompletableFuture;
24
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import com.linecorp.armeria.client.Endpoint;
29 import com.linecorp.armeria.client.endpoint.DynamicEndpointGroup;
30 import com.linecorp.armeria.client.endpoint.EndpointGroup;
31 import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;
32 import com.linecorp.centraldogma.client.CentralDogma;
33 import com.linecorp.centraldogma.client.Watcher;
34 import com.linecorp.centraldogma.common.Query;
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61 public final class CentralDogmaEndpointGroup<T> extends DynamicEndpointGroup {
62 private static final Logger logger = LoggerFactory.getLogger(CentralDogmaEndpointGroup.class);
63
64 private final Watcher<T> instanceListWatcher;
65 private final EndpointListDecoder<T> endpointListDecoder;
66
67
68
69
70
71
72
73 public static <T> CentralDogmaEndpointGroup<T> ofWatcher(Watcher<T> watcher,
74 EndpointListDecoder<T> endpointListDecoder) {
75 return new CentralDogmaEndpointGroup<>(EndpointSelectionStrategy.weightedRoundRobin(),
76 watcher, endpointListDecoder);
77 }
78
79
80
81
82
83
84
85
86
87
88 public static <T> CentralDogmaEndpointGroup<T> of(CentralDogma centralDogma,
89 String projectName, String repositoryName,
90 Query<T> query,
91 EndpointListDecoder<T> endpointListDecoder) {
92 return ofWatcher(centralDogma.forRepo(projectName, repositoryName)
93 .watcher(query)
94 .start(),
95 endpointListDecoder);
96 }
97
98
99
100
101
102
103
104
105
106
107
108 public static <T> CentralDogmaEndpointGroupBuilder<T> builder(Watcher<T> watcher,
109 EndpointListDecoder<T> endpointListDecoder) {
110 return new CentralDogmaEndpointGroupBuilder<>(watcher, endpointListDecoder);
111 }
112
113 CentralDogmaEndpointGroup(EndpointSelectionStrategy strategy,
114 Watcher<T> instanceListWatcher,
115 EndpointListDecoder<T> endpointListDecoder) {
116 super(strategy);
117 this.instanceListWatcher = requireNonNull(instanceListWatcher, "instanceListWatcher");
118 this.endpointListDecoder = requireNonNull(endpointListDecoder, "endpointListDecoder");
119 registerWatcher();
120 }
121
122 private void registerWatcher() {
123 instanceListWatcher.watch((revision, instances) -> {
124 try {
125 final List<Endpoint> newEndpoints = endpointListDecoder.decode(instances);
126 if (newEndpoints.isEmpty()) {
127 logger.info("Not refreshing the endpoint list of {} because it's empty. {}",
128 instanceListWatcher, revision);
129 return;
130 }
131 setEndpoints(newEndpoints);
132 } catch (Exception e) {
133 logger.warn("Failed to re-retrieve the endpoint list from Central Dogma.", e);
134 }
135 });
136 instanceListWatcher.initialValueFuture().exceptionally(e -> {
137 logger.warn("Failed to retrieve the initial instance list from Central Dogma.", e);
138 return null;
139 });
140 }
141
142 @Override
143 protected void doCloseAsync(CompletableFuture<?> future) {
144 instanceListWatcher.close();
145 future.complete(null);
146 }
147
148 @Override
149 public String toString() {
150 return toStringHelper(this)
151 .add("instanceListWatcher", instanceListWatcher)
152 .add("endpointListDecoder", endpointListDecoder)
153 .add("endpointGroup", super.toString())
154 .toString();
155 }
156 }