Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 00D1418C8F for ; Wed, 15 Jul 2015 08:36:24 +0000 (UTC) Received: (qmail 15737 invoked by uid 500); 15 Jul 2015 08:36:23 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 15701 invoked by uid 500); 15 Jul 2015 08:36:23 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 15680 invoked by uid 99); 15 Jul 2015 08:36:23 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 15 Jul 2015 08:36:23 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 70808E051A; Wed, 15 Jul 2015 08:36:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: slebresne@apache.org To: commits@cassandra.apache.org Date: Wed, 15 Jul 2015 08:36:23 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] cassandra git commit: Fix comparison contract violation in the dynamic snitch sorting Repository: cassandra Updated Branches: refs/heads/trunk 4c12f1961 -> a83407f8c Fix comparison contract violation in the dynamic snitch sorting patch by slebresne; reviewed by benedict for CASSANDRA-9519 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9d44186e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9d44186e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9d44186e Branch: refs/heads/trunk Commit: 
9d44186ee3d95ae5e75ce5fa88961dadf5a03016 Parents: 1eccced Author: Sylvain Lebresne Authored: Thu Jul 9 13:28:38 2015 +0200 Committer: Sylvain Lebresne Committed: Wed Jul 15 10:10:45 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../locator/DynamicEndpointSnitch.java | 34 ++++++++-- .../locator/DynamicEndpointSnitchTest.java | 69 +++++++++++++++++++- 3 files changed, 95 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d44186e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5f4fdf2..1e21c8d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.9 + * Complete CASSANDRA-8448 fix (CASSANDRA-9519) * Handle corrupt files on startup (CASSANDRA-9686) * Fix clientutil jar and tests (CASSANDRA-9760) * (cqlsh) Allow the SSL protocol version to be specified through the http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d44186e/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index aefce0d..6b6286f 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -43,9 +43,9 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa private static final double ALPHA = 0.75; // set to 0.75 to make EDS more biased to towards the newer values private static final int WINDOW_SIZE = 100; - private int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval(); - private int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval(); - private double BADNESS_THRESHOLD = 
DatabaseDescriptor.getDynamicBadnessThreshold(); + private final int UPDATE_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicUpdateInterval(); + private final int RESET_INTERVAL_IN_MS = DatabaseDescriptor.getDynamicResetInterval(); + private final double BADNESS_THRESHOLD = DatabaseDescriptor.getDynamicBadnessThreshold(); // the score for a merged set of endpoints must be this much worse than the score for separate endpoints to // warrant not merging two ranges into a single range @@ -155,7 +155,18 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa private void sortByProximityWithScore(final InetAddress address, List<InetAddress> addresses) { - super.sortByProximity(address, addresses); + // Scores can change concurrently from a call to this method. But Collections.sort() expects + // its comparator to be "stable", that is 2 endpoint should compare the same way for the duration + // of the sort() call. As we copy the scores map on write, it is thus enough to alias the current + // version of it during this call. + final HashMap<InetAddress, Double> scores = this.scores; + Collections.sort(addresses, new Comparator<InetAddress>() + { + public int compare(InetAddress a1, InetAddress a2) + { + return compareEndpoints(address, a1, a2, scores); + } + }); } private void sortByProximityWithBadness(final InetAddress address, List<InetAddress> addresses) @@ -164,6 +175,8 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return; subsnitch.sortByProximity(address, addresses); + HashMap<InetAddress, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below + // (which wouldn't really matter here but its cleaner that way). 
ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(addresses.size()); for (InetAddress inet : addresses) { @@ -190,7 +203,8 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } } - public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + // Compare endpoints given an immutable snapshot of the scores + private int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2, Map<InetAddress, Double> scores) { Double scored1 = scores.get(a1); Double scored2 = scores.get(a2); @@ -215,6 +229,14 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return 1; } + public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2) + { + // That function is fundamentally unsafe because the scores can change at any time and so the result of that + // method is not stable for identical arguments. This is why we don't rely on super.sortByProximity() in + // sortByProximityWithScore(). + throw new UnsupportedOperationException("You shouldn't wrap the DynamicEndpointSnitch (within itself or otherwise)"); + } + public void receiveTiming(InetAddress host, long latency) // this is cheap { ExponentiallyDecayingSample sample = samples.get(host); @@ -264,7 +286,6 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa scores = newScores; } - private void reset() { for (ExponentiallyDecayingSample sample : samples.values()) @@ -288,6 +309,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa { return BADNESS_THRESHOLD; } + public String getSubsnitchClassName() { return subsnitch.getClass().getName(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d44186e/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java 
b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java index e23bcfa..3f90532 100644 --- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java @@ -21,9 +21,9 @@ package org.apache.cassandra.locator; import java.io.IOException; import java.net.InetAddress; -import java.util.Arrays; -import java.util.List; +import java.util.*; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.service.StorageService; import org.junit.Test; @@ -90,4 +90,67 @@ public class DynamicEndpointSnitchTest order = Arrays.asList(host1, host3, host2); assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3))); } -} \ No newline at end of file + + @Test + public void testConcurrency() throws InterruptedException, IOException, ConfigurationException + { + // The goal of this test is to check for CASSANDRA-8448/CASSANDRA-9519 + double badness = DatabaseDescriptor.getDynamicBadnessThreshold(); + DatabaseDescriptor.setDynamicBadnessThreshold(0.0); + + final int ITERATIONS = 10; + + // do this because SS needs to be initialized before DES can work properly. + StorageService.instance.initClient(0); + SimpleSnitch ss = new SimpleSnitch(); + DynamicEndpointSnitch dsnitch = new DynamicEndpointSnitch(ss, String.valueOf(ss.hashCode())); + InetAddress self = FBUtilities.getBroadcastAddress(); + + List<InetAddress> hosts = new ArrayList<>(); + // We want a giant list of hosts so that sorting it takes time, making it much more likely to reproduce the + // problem we're looking for. 
+ for (int i = 0; i < 10; i++) + for (int j = 0; j < 256; j++) + for (int k = 0; k < 256; k++) + hosts.add(InetAddress.getByAddress(new byte[]{127, (byte)i, (byte)j, (byte)k})); + + ScoreUpdater updater = new ScoreUpdater(dsnitch, hosts); + updater.start(); + + List<InetAddress> result = null; + for (int i = 0; i < ITERATIONS; i++) + result = dsnitch.getSortedListByProximity(self, hosts); + + updater.stopped = true; + updater.join(); + + DatabaseDescriptor.setDynamicBadnessThreshold(badness); + } + + public static class ScoreUpdater extends Thread + { + private static final int SCORE_RANGE = 100; + + public volatile boolean stopped; + + private final DynamicEndpointSnitch dsnitch; + private final List<InetAddress> hosts; + private final Random random = new Random(); + + public ScoreUpdater(DynamicEndpointSnitch dsnitch, List<InetAddress> hosts) + { + this.dsnitch = dsnitch; + this.hosts = hosts; + } + + public void run() + { + while (!stopped) + { + InetAddress host = hosts.get(random.nextInt(hosts.size())); + int score = random.nextInt(SCORE_RANGE); + dsnitch.receiveTiming(host, score); + } + } + } +}