From pr-return-1591-archive-asf-public=cust-asf.ponee.io@cassandra.apache.org Sat Nov 10 02:37:21 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id EBAA6180627 for ; Sat, 10 Nov 2018 02:37:20 +0100 (CET) Received: (qmail 11353 invoked by uid 500); 10 Nov 2018 01:37:20 -0000 Mailing-List: contact pr-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pr@cassandra.apache.org Delivered-To: mailing list pr@cassandra.apache.org Received: (qmail 11340 invoked by uid 99); 10 Nov 2018 01:37:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Nov 2018 01:37:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5769CE1267; Sat, 10 Nov 2018 01:37:19 +0000 (UTC) From: jolynch To: pr@cassandra.apache.org Reply-To: pr@cassandra.apache.org References: In-Reply-To: Subject: [GitHub] cassandra pull request #283: CASSANDRA-14459: DynamicEndpointSnitch should n... 
Content-Type: text/plain Message-Id: <20181110013719.5769CE1267@git1-us-west.apache.org> Date: Sat, 10 Nov 2018 01:37:19 +0000 (UTC) Github user jolynch commented on a diff in the pull request: https://github.com/apache/cassandra/pull/283#discussion_r232435926 --- Diff: src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java --- @@ -23,128 +23,300 @@ import java.net.UnknownHostException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import com.codahale.metrics.ExponentiallyDecayingReservoir; import javax.management.MBeanServer; import javax.management.ObjectName; -import com.codahale.metrics.Snapshot; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.SettableFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.dynamicsnitch.DynamicEndpointSnitchHistogram; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.LatencyMeasurementType; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; +import 
org.apache.cassandra.net.PingMessage; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.net.MessagingService.Verb.PING; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.LARGE_MESSAGE; +import static org.apache.cassandra.net.async.OutboundConnectionIdentifier.ConnectionType.SMALL_MESSAGE; + + /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector + * Note that the subclasses (e.g. {@link DynamicEndpointSnitchHistogram}) are responsible for actually measuring + * latency and providing an ISnitchMeasurement implementation back to this class. */ -public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean +public abstract class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILatencySubscriber, DynamicEndpointSnitchMBean { - private static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); - - private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values - private static final int WINDOW_SIZE = 100; - - private volatile int dynamicUpdateInterval = DatabaseDescriptor.getDynamicUpdateInterval(); - private volatile int dynamicResetInterval = DatabaseDescriptor.getDynamicResetInterval(); - private volatile double dynamicBadnessThreshold = DatabaseDescriptor.getDynamicBadnessThreshold(); + private static final Logger logger = LoggerFactory.getLogger(DynamicEndpointSnitch.class); + + // Latency measurement and ranking. 
The samples contain latency measurements and the scores are used for ranking + protected boolean registered = false; + protected static final boolean USE_SEVERITY = !Boolean.getBoolean("cassandra.ignore_dynamic_snitch_severity"); + protected volatile Map scores = new HashMap<>(); + protected final Map samples = new ConcurrentHashMap<>(); + + // Latency probe functionality for actively probing endpoints that we haven't measured recently but are ranking + public static final long MAX_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_max_probe_interval_ms", 60 * 10 * 1000L); + public static final long MIN_PROBE_INTERVAL_MS = Long.getLong("cassandra.dynamic_snitch_min_probe_interval_ms", 60 * 1000L) ; + // The probe rate is set later when configuration is read + protected static final RateLimiter probeRateLimiter = RateLimiter.create(1); + protected static final DebuggableScheduledThreadPoolExecutor latencyProbeExecutor = new DebuggableScheduledThreadPoolExecutor(1, "LatencyProbes", Thread.MIN_PRIORITY); + + // User configuration of the snitch tunables + protected volatile int dynamicUpdateInterval = -1; + protected volatile int dynamicLatencyProbeInterval = -1; + protected volatile double dynamicBadnessThreshold = 0; // the score for a merged set of endpoints must be this much worse than the score for separate endpoints to // warrant not merging two ranges into a single range private static final double RANGE_MERGING_PREFERENCE = 1.5; private String mbeanName; - private boolean registered = false; - - private volatile HashMap scores = new HashMap<>(); - private final ConcurrentHashMap samples = new ConcurrentHashMap<>(); + private boolean mbeanRegistered = false; public final IEndpointSnitch subsnitch; - private volatile ScheduledFuture updateSchedular; - private volatile ScheduledFuture resetSchedular; - - private final Runnable update; - private final Runnable reset; + private volatile ScheduledFuture updateScoresScheduler; + private volatile 
ScheduledFuture updateSamplesScheduler; public DynamicEndpointSnitch(IEndpointSnitch snitch) { this(snitch, null); } - public DynamicEndpointSnitch(IEndpointSnitch snitch, String instance) + protected DynamicEndpointSnitch(IEndpointSnitch snitch, String instance) { mbeanName = "org.apache.cassandra.db:type=DynamicEndpointSnitch"; if (instance != null) mbeanName += ",instance=" + instance; subsnitch = snitch; - update = new Runnable() + + if (DatabaseDescriptor.isDaemonInitialized()) { - public void run() - { - updateScores(); - } - }; - reset = new Runnable() + applyConfigChanges(DatabaseDescriptor.getDynamicUpdateInterval(), + DatabaseDescriptor.getDynamicSampleUpdateInterval(), + DatabaseDescriptor.getDynamicBadnessThreshold()); + open(); + } + } + + /** + * Allows subclasses to inject new ways of measuring latency back to this abstract base class. + */ + protected interface ISnitchMeasurement + { + void sample(long value); + double measure(); + Iterable measurements(); + } + + /** + * Adds some bookkeeping that the DES uses over top of the various metrics techniques used by the + * implementations. This is used to allow CASSANDRA-14459 latency probes as well as further experimentation + * on new latency measurement techniques in CASSANDRA-14817 + * + * {@link AnnotatedMeasurement#millisSinceLastRequest} is set to zero through + * {@link DynamicEndpointSnitch#sortedByProximity(InetAddressAndPort, ReplicaCollection)}. It defaults to + * the maximum interval so that we only start probing once it has been ranked at least once + * {@link AnnotatedMeasurement#millisSinceLastMeasure} is set to zero from + * {@link DynamicEndpointSnitch#receiveTiming(InetAddressAndPort, long, LatencyMeasurementType)}. 
+ * + * {@link AnnotatedMeasurement#millisSinceLastMeasure} and {@link AnnotatedMeasurement#probeTimerMillis} + * are incremented via {@link DynamicEndpointSnitch#updateSamples()} + */ + protected static class AnnotatedMeasurement + { + // Used to optimally target latency probes only on nodes that are both requested for ranking + // and are not being measured. For example with token aware clients a large portion of the cluster will never + // be ranked at all and therefore we won't probe them. + public AtomicLong millisSinceLastRequest = new AtomicLong(MAX_PROBE_INTERVAL_MS); + public AtomicLong millisSinceLastMeasure = new AtomicLong(0); + public long probeTimerMillis = 0; + public ScheduledFuture probeFuture = null; --- End diff -- Done. --- --------------------------------------------------------------------- To unsubscribe, e-mail: pr-unsubscribe@cassandra.apache.org For additional commands, e-mail: pr-help@cassandra.apache.org