From notifications-return-2141-archive-asf-public=cust-asf.ponee.io@zookeeper.apache.org Fri Oct 4 10:51:35 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 3766E180651 for ; Fri, 4 Oct 2019 12:51:35 +0200 (CEST) Received: (qmail 26575 invoked by uid 500); 4 Oct 2019 10:51:34 -0000 Mailing-List: contact notifications-help@zookeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@zookeeper.apache.org Delivered-To: mailing list notifications@zookeeper.apache.org Received: (qmail 26566 invoked by uid 99); 4 Oct 2019 10:51:34 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Oct 2019 10:51:34 +0000 From: GitBox To: notifications@zookeeper.apache.org Subject: [GitHub] [zookeeper] symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network Message-ID: <157018629454.26005.11290011882728445564.gitbox@gitbox.apache.org> Date: Fri, 04 Oct 2019 10:51:34 -0000 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit symat commented on a change in pull request #1048: ZOOKEEPER-3188: Improve resilience to network URL: https://github.com/apache/zookeeper/pull/1048#discussion_r331445109 ########## File path: zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/MultipleAddresses.java ########## @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class allows to store several quorum and electing addresses. + * + * See ZOOKEEPER-3188 for a discussion of this feature. 
+ */ +public class MultipleAddresses { + private static final int DEFAULT_TIMEOUT = 100; + + private Set addresses; + private int timeout; + + public MultipleAddresses() { + addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + timeout = DEFAULT_TIMEOUT; + } + + public MultipleAddresses(List addresses) { + this(addresses, DEFAULT_TIMEOUT); + } + + public MultipleAddresses(InetSocketAddress address) { + this(address, DEFAULT_TIMEOUT); + } + + public MultipleAddresses(List addresses, int timeout) { + this.addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + this.addresses.addAll(addresses); + this.timeout = timeout; + } + + public MultipleAddresses(InetSocketAddress address, int timeout) { + addresses = Collections.newSetFromMap(new ConcurrentHashMap<>()); + addresses.add(address); + this.timeout = timeout; + } + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public boolean isEmpty() { + return addresses.isEmpty(); + } + + /** + * Returns all addresses. 
+ * + * @return set of all InetSocketAddress + */ + public Set getAllAddresses() { + return Collections.unmodifiableSet(addresses); + } + + /** + * Returns wildcard addresses for all ports + * + * @return set of InetSocketAddress with wildcards for all ports + */ + public Set getWildcardAddresses() { + return addresses.stream().map(a -> new InetSocketAddress(a.getPort())).collect(Collectors.toSet()); + } + + /** + * Returns all ports + * + * @return list of all ports + */ + public List getAllPorts() { + return addresses.stream().map(InetSocketAddress::getPort).distinct().collect(Collectors.toList()); + } + + /** + * Returns distinct list of all host strings + * + * @return list of all hosts + */ + public List getAllHostStrings() { + return addresses.stream().map(InetSocketAddress::getHostString).distinct().collect(Collectors.toList()); + } + + public void addAddress(InetSocketAddress address) { + addresses.add(address); + } + + /** + * Returns reachable address. If none is reachable than throws exception. + * + * @return address which is reachable. + * @throws NoRouteToHostException if none address is reachable + */ + public InetSocketAddress getReachableAddress() throws NoRouteToHostException { + AtomicReference address = new AtomicReference<>(null); + getInetSocketAddressStream().forEach(addr -> checkIfAddressIsReachableAndSet(addr, address)); + + if (address.get() != null) { + return address.get(); + } else { + throw new NoRouteToHostException("No valid address among " + addresses); + } + } + + /** + * Returns reachable address or first one, if none is reachable. + * + * @return address which is reachable or fist one. + */ + public InetSocketAddress getReachableOrOne() { + InetSocketAddress address; + try { + address = getReachableAddress(); + } catch (NoRouteToHostException e) { + address = getOne(); + } + return address; + } + + /** + * Performs a DNS lookup for addresses. + * + * If the DNS lookup fails, than address remain unmodified. 
+ */ + public void recreateSocketAddresses() { + Set temp = Collections.newSetFromMap(new ConcurrentHashMap<>()); + temp.addAll(getInetSocketAddressStream().map(this::recreateSocketAddress).collect(Collectors.toSet())); + addresses = temp; + } + + /** + * Returns first address from set. + * + * @return address from a set. + */ + public InetSocketAddress getOne() { + return addresses.iterator().next(); + } + + private void checkIfAddressIsReachableAndSet(InetSocketAddress address, + AtomicReference reachableAddress) { + for (int i = 0; i < 5 && reachableAddress.get() == null; i++) { + try { + if (address.getAddress().isReachable((i + 1) * timeout)) { + reachableAddress.compareAndSet(null, address); + break; + } + Thread.sleep(timeout); + } catch (NullPointerException | IOException | InterruptedException ignored) { + } + } + } + + private InetSocketAddress recreateSocketAddress(InetSocketAddress address) { + try { + return new InetSocketAddress(InetAddress.getByName(address.getHostString()), address.getPort()); + } catch (UnknownHostException e) { + return address; + } + } + + private Stream getInetSocketAddressStream() { + if (addresses.size() > 1) { + return addresses.parallelStream(); Review comment: Sure, I will add a comment. Basically, the idea is that by using parallelStream we can do hostname resolutions or isReachable checks in parallel on all addresses. I think this is a meaningful optimization; otherwise we can lose many seconds, e.g. during a leader election when the single working address is at the end of the list. That said, I don't think this function is really necessary. I think we can simply use parallelStream, which would work for sets with a single element too. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services