cassandra-pr mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jolynch <...@git.apache.org>
Subject [GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n...
Date Fri, 30 Nov 2018 19:38:24 GMT
Github user jolynch commented on a diff in the pull request:

    https://github.com/apache/cassandra/pull/283#discussion_r237978698
  
    --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---
    @@ -154,31 +326,203 @@ private void registerMBean()
     
         public void close()
         {
    -        updateSchedular.cancel(false);
    -        resetSchedular.cancel(false);
    +        // Cancel the score/sample update tasks; the null checks let close() run even if they were never created.
    +        if (updateScoresScheduler != null)
    +            updateScoresScheduler.cancel(false);
    +        if (updateSamplesScheduler != null)
    +            updateSamplesScheduler.cancel(false);
    +
    +        // Cancel any outstanding per-endpoint probe and reset the measurement's counters. Setting
    +        // millisSinceLastRequest to MAX_PROBE_INTERVAL_MS means calculateProbes (which requires
    +        // lastRequest < MAX_PROBE_INTERVAL_MS) will not assign these endpoints a new probe timer.
    +        for (AnnotatedMeasurement measurement : samples.values())
    +        {
    +            if (measurement.probeFuture != null)
    +                measurement.probeFuture.cancel(false);
    +
    +            measurement.millisSinceLastMeasure.set(0);
    +            measurement.millisSinceLastRequest.set(MAX_PROBE_INTERVAL_MS);
    +            measurement.probeTimerMillis = 0;
    +        }
     
             MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
             try
             {
    -            mbs.unregisterMBean(new ObjectName(mbeanName));
    +            // Only unregister when mbeanRegistered is set (presumably by a successful registerMBean()); otherwise
    +            // unregisterMBean would throw and the catch below would rethrow it as a RuntimeException.
    +            if (mbeanRegistered)
    +                mbs.unregisterMBean(new ObjectName(mbeanName));
             }
             catch (Exception e)
             {
                 throw new RuntimeException(e);
             }
         }
     
    +    /**
    +     * Background task running on the samples dictionary. The default implementation sends latency probe (PING)
    +     * messages to explore nodes that we have not received timings for recently but have ranked in
    +     * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}.
    +     *
    +     * Probe timers are recalculated on every run; probes are only sent while gossip is active.
    +     */
    +    protected void updateSamples()
    +    {
    +        // Split calculation of probe timers from sending probes for testability
    +        calculateProbes(samples, dynamicLatencyProbeInterval);
    +
    +        // With gossip inactive, skip sending probes; the timers computed above have still been advanced.
    +        if (!StorageService.instance.isGossipActive())
    +            return;
    +
    +        schedulePings(samples);
    +    }
    +
    +    /**
    +     * This method mutates the passed AnnotatedMeasurements to implement capped exponential backoff per endpoint.
    +     *
    +     * The algorithm is as follows:
    +     * 1. All samples except the local node (the broadcast address, which is skipped) get their
    +     *    millisSinceLastMeasure and millisSinceLastRequest fields incremented by the passed interval
    +     * 2. Any recently requested (ranked) endpoints that have not been measured recently (e.g. because the snitch
    +     *    has sent them no traffic) get probes with exponential backoff.
    +     *
    +     * The backoff is capped at MAX_PROBE_INTERVAL_MS. Furthermore the probes are stopped after
    +     * MAX_PROBE_INTERVAL_MS of no ranking requests as well.
    +     *
    +     * At the end of this method, any passed AnnotatedMeasurements that need latency probes will have non zero
    +     * probeTimerMillis members set; all other non-local entries have probeTimerMillis reset to 0.
    +     *
    +     * @param samples        endpoint to measurement map; every non-local entry is mutated in place
    +     * @param intervalMillis amount added to each entry's counters, and the initial probe delay for an endpoint
    +     *                       that starts being probed
    +     */
    +    @VisibleForTesting
    +    static void calculateProbes(Map<InetAddressAndPort, AnnotatedMeasurement> samples, long intervalMillis) {
    +        for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry: samples.entrySet())
    +        {
    +            // Skip the local node; its measurement is left untouched.
    +            if (entry.getKey().equals(FBUtilities.getBroadcastAddressAndPort()))
    +                continue;
    +
    +            AnnotatedMeasurement measurement = entry.getValue();
    +            // getAndAdd returns the value from *before* this interval is added.
    +            long lastMeasure = measurement.millisSinceLastMeasure.getAndAdd(intervalMillis);
    +            long lastRequest = measurement.millisSinceLastRequest.getAndAdd(intervalMillis);
    +
    +            // Probe only endpoints with no measurement for at least MIN_PROBE_INTERVAL_MS that were still
    +            // requested (ranked) within the last MAX_PROBE_INTERVAL_MS.
    +            if (lastMeasure >= MIN_PROBE_INTERVAL_MS && lastRequest < MAX_PROBE_INTERVAL_MS)
    +            {
    +                if (measurement.probeTimerMillis == 0)
    +                {
    +                    // Not currently probing this endpoint: start the backoff at one interval.
    +                    measurement.probeTimerMillis = intervalMillis;
    +                }
    +                else if (measurement.probeFuture != null && measurement.probeFuture.isDone())
    +                {
    +                    // The previously scheduled probe is done: double the delay, capped at MAX_PROBE_INTERVAL_MS.
    +                    measurement.probeTimerMillis = Math.min(MAX_PROBE_INTERVAL_MS, measurement.probeTimerMillis * 2);
    +                }
    +            }
    +            else
    +            {
    +                // Recently measured or no longer requested: stop probing this endpoint.
    +                measurement.probeTimerMillis = 0;
    +            }
    +        }
    +    }
    +
    +    @VisibleForTesting
    +    void schedulePings(Map<InetAddressAndPort, AnnotatedMeasurement> samples)
    +    {
    +        for (Map.Entry<InetAddressAndPort, AnnotatedMeasurement> entry: samples.entrySet())
    +        {
    +            AnnotatedMeasurement measurement = entry.getValue();
    +            long delay = measurement.probeTimerMillis;
    +            long millisSinceLastRequest = measurement.millisSinceLastRequest.get();
    +
    +            if (millisSinceLastRequest > MAX_PROBE_INTERVAL_MS && !Gossiper.instance.isAlive(entry.getKey()))
    +            {
    +                samples.remove(entry.getKey());
    +            }
    +
    +            if (delay > 0 && millisSinceLastRequest < MAX_PROBE_INTERVAL_MS
&&
    --- End diff --
    
    Yeah, I don't think this check is needed anymore since I separated out the two pieces; removed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org
For additional commands, e-mail: pr-help@cassandra.apache.org


Mime
View raw message