cassandra-pr mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jolynch <...@git.apache.org>
Subject [GitHub] cassandra pull request #212: Rework the wait for healthy logic to count down...
Date Wed, 07 Nov 2018 22:07:37 GMT
Github user jolynch commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/212#discussion_r231696207
  
    --- Diff: src/java/org/apache/cassandra/net/StartupClusterConnectivityChecker.java ---
    @@ -48,81 +51,133 @@
     {
         private static final Logger logger = LoggerFactory.getLogger(StartupClusterConnectivityChecker.class);
     
    -    private final int targetPercent;
    +    private final boolean blockForRemoteDcs;
         private final long timeoutNanos;
     
    -    public static StartupClusterConnectivityChecker create(int targetPercent, int timeoutSecs)
    +    public static StartupClusterConnectivityChecker create(long timeoutSecs, boolean blockForRemoteDcs)
         {
    -        timeoutSecs = Math.max(1, timeoutSecs);
    +        if (timeoutSecs < 0)
    +            logger.warn("skipping block-for-peers due to negative timeout. You may encounter errors or timeouts on" +
    +                        " the first user query");
             if (timeoutSecs > 100)
                 logger.warn("setting the block-for-peers timeout (in seconds) to {} might be a bit excessive, but using it nonetheless", timeoutSecs);
             long timeoutNanos = TimeUnit.SECONDS.toNanos(timeoutSecs);
     
    -        return new StartupClusterConnectivityChecker(targetPercent, timeoutNanos);
    +        return new StartupClusterConnectivityChecker(timeoutNanos, blockForRemoteDcs);
         }
     
         @VisibleForTesting
    -    StartupClusterConnectivityChecker(int targetPercent, long timeoutNanos)
    +    StartupClusterConnectivityChecker(long timeoutNanos, boolean blockForRemoteDcs)
         {
    -        this.targetPercent = Math.min(100, Math.max(0, targetPercent));
    +        this.blockForRemoteDcs = blockForRemoteDcs;
             this.timeoutNanos = timeoutNanos;
         }
     
         /**
          * @param peers The currently known peers in the cluster; argument is not modified.
    +     * @param getDatacenterSource A function for mapping peers to their datacenter.
           * @return true if the requested percentage of peers are marked ALIVE in gossip and have their connections opened;
          * else false.
          */
    -    public boolean execute(Set<InetAddressAndPort> peers)
    +    public boolean execute(Set<InetAddressAndPort> peers, Function<InetAddressAndPort, String> getDatacenterSource)
         {
    -        if (targetPercent == 0 || peers == null)
    +        if (peers == null || this.timeoutNanos < 0)
                 return true;
     
             // make a copy of the set, to avoid mucking with the input (in case it's a sensitive collection)
             peers = new HashSet<>(peers);
    -        peers.remove(FBUtilities.getBroadcastAddressAndPort());
    +        InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort();
    +        String localDc = getDatacenterSource.apply(localAddress);
     
    +        peers.remove(localAddress);
             if (peers.isEmpty())
                 return true;
     
    -        logger.info("choosing to block until {}% of the {} known peers are marked alive and connections are established; max time to wait = {} seconds",
    -                    targetPercent, peers.size(), TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
    +        // make a copy of the datacenter mapping (in case gossip updates happen during this method or some such)
    +        Map<InetAddressAndPort, String> datacenterMap = peers.stream()
    +                                                             .collect(Collectors.toMap(k -> k, getDatacenterSource));
    +        Function<InetAddressAndPort, String> getDatacenter = datacenterMap::get;
     
    -        long startNanos = System.nanoTime();
    +        Map<String, Set<InetAddressAndPort>> peersByDc = peers.stream()
    +                                                              .collect(Collectors.groupingBy(getDatacenter,
    +                                                                                             Collectors.toSet()));
    +
    +        if (!blockForRemoteDcs)
    +        {
    +            peersByDc.keySet().retainAll(Collections.singleton(localDc));
    +            logger.info("Blocking coordination until only a single peer is DOWN in the local datacenter, timeout={}s",
    +                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
    +        }
    +        else
    +        {
    +            logger.info("Blocking coordination until only a single peer is DOWN in each datacenter, timeout={}s",
    +                        TimeUnit.NANOSECONDS.toSeconds(timeoutNanos));
    +        }
     
             AckMap acks = new AckMap(3);
    -        int target = (int) ((targetPercent / 100.0) * peers.size());
    -        CountDownLatch latch = new CountDownLatch(target);
    +        Map<String, CountDownLatch> latchMap = new HashMap<>(peersByDc.size());
    +        for (Map.Entry<String, Set<InetAddressAndPort>> entry: peersByDc.entrySet())
    +        {
    +            latchMap.put(entry.getKey(), new CountDownLatch(Math.max(entry.getValue().size() - 1, 0)));
    +        }
    +
    +        long startNanos = System.nanoTime();
     
             // set up a listener to react to new nodes becoming alive (in gossip), and account for all the nodes that are already alive
    -        Set<InetAddressAndPort> alivePeers = Sets.newSetFromMap(new ConcurrentHashMap<>());
    -        AliveListener listener = new AliveListener(alivePeers, latch, acks);
    +        Set<InetAddressAndPort> alivePeers = Collections.newSetFromMap(new ConcurrentHashMap<>());
    +        AliveListener listener = new AliveListener(alivePeers, latchMap, acks, getDatacenter);
             Gossiper.instance.register(listener);
     
             // send out a ping message to open up the non-gossip connections
    -        sendPingMessages(peers, latch, acks);
    +        sendPingMessages(peers, latchMap, acks, getDatacenter);
    --- End diff --
    
    Done.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


Mime
View raw message