1   /*
2    * Copyright 2017 LINE Corporation
3    *
4    * LINE Corporation licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package com.linecorp.centraldogma.server;
18  
19  import static com.google.common.base.MoreObjects.firstNonNull;
20  import static com.google.common.base.Preconditions.checkArgument;
21  import static com.linecorp.centraldogma.server.CentralDogmaConfig.convertValue;
22  import static java.util.Objects.requireNonNull;
23  
24  import java.net.InetAddress;
25  import java.net.NetworkInterface;
26  import java.net.SocketException;
27  import java.net.UnknownHostException;
28  import java.util.Enumeration;
29  import java.util.Map;
30  import java.util.Map.Entry;
31  import java.util.concurrent.TimeUnit;
32  
33  import javax.annotation.Nullable;
34  
35  import com.fasterxml.jackson.annotation.JsonCreator;
36  import com.fasterxml.jackson.annotation.JsonProperty;
37  import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
38  import com.google.common.annotations.VisibleForTesting;
39  import com.google.common.base.MoreObjects;
40  import com.google.common.collect.ImmutableMap;
41  
42  import io.netty.util.NetUtil;
43  
44  /**
45   * ZooKeeper-based replication configuration.
46   */
47  public final class ZooKeeperReplicationConfig implements ReplicationConfig {
48  
49      private static final int DEFAULT_TIMEOUT_MILLIS = 10000;
50      private static final int DEFAULT_NUM_WORKERS = 16;
51      private static final int DEFAULT_MAX_LOG_COUNT = 1024;
52      private static final long DEFAULT_MIN_LOG_AGE_MILLIS = TimeUnit.DAYS.toMillis(1);
53      private static final String DEFAULT_SECRET = "ch4n63m3";
54  
55      private final int serverId;
56      private final Map<Integer, ZooKeeperServerConfig> servers;
57      @Nullable
58      private final String secret;
59      private final Map<String, String> additionalProperties;
60      private final int timeoutMillis;
61      private final int numWorkers;
62      private final int maxLogCount;
63      private final long minLogAgeMillis;
64  
65      /**
66       * Creates a new replication configuration.
67       *
68       * @param serverId the ID of this ZooKeeper server in {@code servers}
69       * @param servers the ZooKeeper server addresses, keyed by their ZooKeeper server IDs
70       */
71      public ZooKeeperReplicationConfig(int serverId, Map<Integer, ZooKeeperServerConfig> servers) {
72          this(serverId, servers, null, null, null, null, null, null);
73      }
74  
75      @VisibleForTesting
76      ZooKeeperReplicationConfig(
77              int serverId, Map<Integer, ZooKeeperServerConfig> servers, String secret,
78              Map<String, String> additionalProperties,
79              int timeoutMillis, int numWorkers, int maxLogCount, long minLogAgeMillis) {
80          this(Integer.valueOf(serverId), servers, secret, additionalProperties, Integer.valueOf(timeoutMillis),
81               Integer.valueOf(numWorkers), Integer.valueOf(maxLogCount), Long.valueOf(minLogAgeMillis));
82      }
83  
84      @JsonCreator
85      ZooKeeperReplicationConfig(@JsonProperty("serverId") @Nullable Integer serverId,
86                                 @JsonProperty(value = "servers", required = true)
87                                 @JsonDeserialize(keyAs = Integer.class, contentAs = ZooKeeperServerConfig.class)
88                                 Map<Integer, ZooKeeperServerConfig> servers,
89                                 @JsonProperty("secret") @Nullable String secret,
90                                 @JsonProperty("additionalProperties")
91                                 @JsonDeserialize(keyAs = String.class, contentAs = String.class)
92                                 @Nullable Map<String, String> additionalProperties,
93                                 @JsonProperty("timeoutMillis") @Nullable Integer timeoutMillis,
94                                 @JsonProperty("numWorkers") @Nullable Integer numWorkers,
95                                 @JsonProperty("maxLogCount") @Nullable Integer maxLogCount,
96                                 @JsonProperty("minLogAgeMillis") @Nullable Long minLogAgeMillis) {
97  
98          requireNonNull(servers, "servers");
99          this.serverId = serverId != null ? serverId : findServerId(servers);
100         checkArgument(this.serverId > 0, "serverId: %s (expected: > 0)", serverId);
101         this.secret = secret;
102         checkArgument(!secret().isEmpty(), "secret is empty.");
103 
104         servers.forEach((id, server) -> {
105             checkArgument(id > 0, "'servers' contains non-positive server ID: %s (expected: > 0)", id);
106         });
107         this.servers = ImmutableMap.copyOf(servers);
108 
109         checkArgument(!this.servers.isEmpty(), "servers is empty.");
110         checkArgument(this.servers.containsKey(this.serverId),
111                       "servers must contain the server '%s'.", this.serverId);
112 
113         this.additionalProperties = firstNonNull(additionalProperties, ImmutableMap.of());
114 
115         this.timeoutMillis =
116                 timeoutMillis == null || timeoutMillis <= 0 ? DEFAULT_TIMEOUT_MILLIS : timeoutMillis;
117 
118         this.numWorkers =
119                 numWorkers == null || numWorkers <= 0 ? DEFAULT_NUM_WORKERS : numWorkers;
120 
121         this.maxLogCount =
122                 maxLogCount == null || maxLogCount <= 0 ? DEFAULT_MAX_LOG_COUNT : maxLogCount;
123 
124         this.minLogAgeMillis =
125                 minLogAgeMillis == null || minLogAgeMillis <= 0 ? DEFAULT_MIN_LOG_AGE_MILLIS : minLogAgeMillis;
126     }
127 
128     private static int findServerId(Map<Integer, ZooKeeperServerConfig> servers) {
129         int serverId = -1;
130         try {
131             for (final Enumeration<NetworkInterface> e = NetworkInterface.getNetworkInterfaces();
132                  e.hasMoreElements();) {
133                 serverId = findServerId(servers, serverId, e.nextElement());
134             }
135         } catch (SocketException e) {
136             throw new IllegalStateException("failed to retrieve the network interface list", e);
137         }
138 
139         if (serverId < 0) {
140             throw new IllegalStateException(
141                     "failed to auto-detect server ID because there is no matching IP address.");
142         }
143 
144         return serverId;
145     }
146 
147     private static int findServerId(Map<Integer, ZooKeeperServerConfig> servers, int currentServerId,
148                                     NetworkInterface iface) {
149         for (final Enumeration<InetAddress> ea = iface.getInetAddresses(); ea.hasMoreElements();) {
150             currentServerId = findServerId(servers, currentServerId, ea.nextElement());
151         }
152         return currentServerId;
153     }
154 
155     private static int findServerId(Map<Integer, ZooKeeperServerConfig> servers, int currentServerId,
156                                     InetAddress addr) {
157         final String ip = NetUtil.toAddressString(addr, true);
158         for (Entry<Integer, ZooKeeperServerConfig> entry : servers.entrySet()) {
159             final String zkAddr;
160             try {
161                 zkAddr = NetUtil.toAddressString(InetAddress.getByName(entry.getValue().host()), true);
162             } catch (UnknownHostException uhe) {
163                 throw new IllegalStateException(
164                         "failed to resolve the IP address of the server name: " + entry.getValue().host());
165             }
166 
167             if (zkAddr.equals(ip)) {
168                 final int serverId = entry.getKey().intValue();
169                 if (currentServerId < 0) {
170                     currentServerId = serverId;
171                 } else if (currentServerId != serverId) {
172                     throw new IllegalStateException(
173                             "cannot auto-detect server ID because there are more than one IP address match. " +
174                             "Both server ID " + currentServerId + " and " + serverId +
175                             " have a matching IP address. Consider specifying server ID explicitly.");
176                 }
177             }
178         }
179         return currentServerId;
180     }
181 
182     @Override
183     public ReplicationMethod method() {
184         return ReplicationMethod.ZOOKEEPER;
185     }
186 
187     /**
188      * Returns the ID of this ZooKeeper server in {@link #servers()}.
189      */
190     @JsonProperty
191     public int serverId() {
192         return serverId;
193     }
194 
195     /**
196      * Returns the configuration of this ZooKeeper server in {@link #servers()}.
197      */
198     public ZooKeeperServerConfig serverConfig() {
199         return servers.get(serverId);
200     }
201 
202     /**
203      * Returns the configuration of all ZooKeeper servers, keyed by their server IDs.
204      */
205     @JsonProperty
206     public Map<Integer, ZooKeeperServerConfig> servers() {
207         return servers;
208     }
209 
210     /**
211      * Returns the secret string used for authenticating the ZooKeeper peers.
212      */
213     public String secret() {
214         return firstNonNull(convertValue(secret, "replication.secret"), DEFAULT_SECRET);
215     }
216 
217     /**
218      * Returns the additional ZooKeeper properties.
219      * If unspecified, an empty {@link Map} is returned.
220      */
221     @JsonProperty
222     public Map<String, String> additionalProperties() {
223         return additionalProperties;
224     }
225 
226     /**
227      * Returns the ZooKeeper timeout, in milliseconds.
228      * If unspecified, the default of {@value #DEFAULT_TIMEOUT_MILLIS} is returned.
229      */
230     @JsonProperty
231     public int timeoutMillis() {
232         return timeoutMillis;
233     }
234 
235     /**
236      * Returns the number of worker threads dedicated for replication.
237      * If unspecified, the default of {@value #DEFAULT_NUM_WORKERS} is returned.
238      */
239     @JsonProperty
240     public int numWorkers() {
241         return numWorkers;
242     }
243 
244     /**
245      * Returns the maximum number of log items to keep in ZooKeeper. Note that the log items will still not be
246      * removed if they are younger than {@link #minLogAgeMillis()}.
247      * If unspecified, the default of {@value #DEFAULT_MAX_LOG_COUNT} is returned.
248      */
249     @JsonProperty
250     public int maxLogCount() {
251         return maxLogCount;
252     }
253 
254     /**
255      * Returns the minimum allowed age of log items before they are removed from ZooKeeper.
256      * If unspecified, the default of 1 hour is returned.
257      */
258     @JsonProperty
259     public long minLogAgeMillis() {
260         return minLogAgeMillis;
261     }
262 
263     @Override
264     public int hashCode() {
265         return serverId;
266     }
267 
268     @Override
269     public boolean equals(Object obj) {
270         if (!(obj instanceof ZooKeeperReplicationConfig)) {
271             return false;
272         }
273 
274         if (obj == this) {
275             return true;
276         }
277 
278         final ZooKeeperReplicationConfig that = (ZooKeeperReplicationConfig) obj;
279 
280         return serverId() == that.serverId() &&
281                servers().equals(that.servers()) &&
282                additionalProperties().equals(that.additionalProperties()) &&
283                timeoutMillis() == that.timeoutMillis() &&
284                numWorkers() == that.numWorkers() &&
285                maxLogCount() == that.maxLogCount() &&
286                minLogAgeMillis() == that.minLogAgeMillis();
287     }
288 
289     @Override
290     public String toString() {
291         return MoreObjects.toStringHelper(this)
292                           .add("serverId", serverId())
293                           .add("servers", servers())
294                           .add("additionalProperties", additionalProperties())
295                           .add("timeoutMillis", timeoutMillis())
296                           .add("numWorkers", numWorkers())
297                           .add("maxLogCount", maxLogCount())
298                           .add("minLogAgeMillis", minLogAgeMillis()).toString();
299     }
300 }