cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject cassandra git commit: Only optimize large ranges when figuring out where to stream from
Date Mon, 14 Aug 2017 12:35:18 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 62d39f654 -> ff06424fa


Only optimize large ranges when figuring out where to stream from

Patch by marcuse; reviewed by Ariel Weisberg for CASSANDRA-13664


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ff06424f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ff06424f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ff06424f

Branch: refs/heads/trunk
Commit: ff06424faccc8acedd027c71e955a38fd8ddee6c
Parents: 62d39f6
Author: Marcus Eriksson <marcuse@apache.org>
Authored: Mon Jul 3 15:16:56 2017 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Mon Aug 14 14:32:19 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/dht/RangeFetchMapCalculator.java  |  79 +++++++-
 .../dht/RangeFetchMapCalculatorTest.java        | 186 +++++++++++++------
 3 files changed, 208 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff06424f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a6428d3..a59c00b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664)
  * Use an ExecutorService for repair commands instead of new Thread(..).start() (CASSANDRA-13594)
  * Fix race / ref leak in anticompaction (CASSANDRA-13688)
  * Expose tasks queue length via JMX (CASSANDRA-12758)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff06424f/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
index 1186eab..d407212 100644
--- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -18,9 +18,17 @@
 
 package org.apache.cassandra.dht;
 
+import java.math.BigInteger;
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 
@@ -63,22 +71,54 @@ import org.psjava.ds.math.Function;
 public class RangeFetchMapCalculator
 {
     private static final Logger logger = LoggerFactory.getLogger(RangeFetchMapCalculator.class);
+    private static final long TRIVIAL_RANGE_LIMIT = 1000;
     private final Multimap<Range<Token>, InetAddress> rangesWithSources;
     private final Collection<RangeStreamer.ISourceFilter> sourceFilters;
     private final String keyspace;
     //We need two Vertices to act as source and destination in the algorithm
     private final Vertex sourceVertex = OuterVertex.getSourceVertex();
     private final Vertex destinationVertex = OuterVertex.getDestinationVertex();
+    private final Set<Range<Token>> trivialRanges;
 
-    public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddress> rangesWithSources, Collection<RangeStreamer.ISourceFilter> sourceFilters, String keyspace)
+    public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddress> rangesWithSources,
+                                   Collection<RangeStreamer.ISourceFilter> sourceFilters,
+                                   String keyspace)
     {
         this.rangesWithSources = rangesWithSources;
         this.sourceFilters = sourceFilters;
         this.keyspace = keyspace;
+        this.trivialRanges = rangesWithSources.keySet()
+                                              .stream()
+                                              .filter(RangeFetchMapCalculator::isTrivial)
+                                              .collect(Collectors.toSet());
+    }
+
+    static boolean isTrivial(Range<Token> range)
+    {
+        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+        if (partitioner.splitter().isPresent())
+        {
+            BigInteger l = partitioner.splitter().get().valueForToken(range.left);
+            BigInteger r = partitioner.splitter().get().valueForToken(range.right);
+            if (r.compareTo(l) <= 0)
+                return false;
+            if (r.subtract(l).compareTo(BigInteger.valueOf(TRIVIAL_RANGE_LIMIT)) < 0)
+                return true;
+        }
+        return false;
     }
 
     public Multimap<InetAddress, Range<Token>> getRangeFetchMap()
     {
+        Multimap<InetAddress, Range<Token>> fetchMap = HashMultimap.create();
+        fetchMap.putAll(getRangeFetchMapForNonTrivialRanges());
+        fetchMap.putAll(getRangeFetchMapForTrivialRanges(fetchMap));
+        return fetchMap;
+    }
+
+    @VisibleForTesting
+    Multimap<InetAddress, Range<Token>> getRangeFetchMapForNonTrivialRanges()
+    {
         //Get the graph with edges between ranges and their source endpoints
         MutableCapacityGraph<Vertex, Integer> graph = getGraph();
         //Add source and destination vertex and edges
@@ -107,6 +147,37 @@ public class RangeFetchMapCalculator
         return getRangeFetchMapFromGraphResult(graph, result);
     }
 
+    @VisibleForTesting
+    Multimap<InetAddress, Range<Token>> getRangeFetchMapForTrivialRanges(Multimap<InetAddress, Range<Token>> optimisedMap)
+    {
+        Multimap<InetAddress, Range<Token>> fetchMap = HashMultimap.create();
+        for (Range<Token> trivialRange : trivialRanges)
+        {
+            boolean added = false;
+            boolean localDCCheck = true;
+            while (!added)
+            {
+                List<InetAddress> srcs = new ArrayList<>(rangesWithSources.get(trivialRange));
+                // sort with the endpoint having the least number of streams first:
+                srcs.sort(Comparator.comparingInt(o -> optimisedMap.get(o).size()));
+                for (InetAddress src : srcs)
+                {
+                    if (passFilters(src, localDCCheck))
+                    {
+                        fetchMap.put(src, trivialRange);
+                        added = true;
+                        break;
+                    }
+                }
+                if (!added && !localDCCheck)
+                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + trivialRange + " in keyspace " + keyspace);
+                if (!added)
+                    logger.info("Using other DC endpoints for streaming for range: {} and keyspace {}", trivialRange, keyspace);
+                localDCCheck = false;
+            }
+        }
+        return fetchMap;
+    }
     /*
         Return the total number of range vertices in the graph
      */
@@ -240,6 +311,12 @@ public class RangeFetchMapCalculator
         //Connect all ranges with all source endpoints
         for (Range<Token> range : rangesWithSources.keySet())
         {
+            if (trivialRanges.contains(range))
+            {
+                logger.debug("Not optimising trivial range {} for keyspace {}", range, keyspace);
+                continue;
+            }
+
             final RangeVertex rangeVertex = new RangeVertex(range);
 
             //Try to only add source endpoints from same DC

http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff06424f/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
index 8d3d74f..1b3fe03 100644
--- a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
+++ b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 import com.google.common.collect.HashMultimap;
@@ -34,12 +35,16 @@ import org.junit.Test;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class RangeFetchMapCalculatorTest
 {
     @BeforeClass
     public static void setupUpSnitch()
     {
         DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setPartitionerUnsafe(RandomPartitioner.instance);
         DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
         {
             //Odd IPs are in DC1 and Even are in DC2. Endpoints upto .14 will have unique racks and
@@ -74,11 +79,11 @@ public class RangeFetchMapCalculatorTest
     public void testWithSingleSource() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
-        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.4");
-        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.5");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
+        addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.4");
+        addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.5");
 
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
         Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
@@ -91,11 +96,11 @@ public class RangeFetchMapCalculatorTest
     public void testWithNonOverlappingSource() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.4");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.5", "127.0.0.6");
-        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.7", "127.0.0.8");
-        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.9", "127.0.0.10");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.4");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.5", "127.0.0.6");
+        addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.7", "127.0.0.8");
+        addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.9", "127.0.0.10");
 
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
         Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
@@ -108,9 +113,9 @@ public class RangeFetchMapCalculatorTest
     public void testWithRFThreeReplacement() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2", "127.0.0.3");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3", "127.0.0.4");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2", "127.0.0.3");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3", "127.0.0.4");
 
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
         Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
@@ -124,11 +129,11 @@ public class RangeFetchMapCalculatorTest
     public void testForMultipleRoundsComputation() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
-        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.3");
-        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.3", "127.0.0.2");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
+        addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.3");
+        addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.3", "127.0.0.2");
 
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
         Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
@@ -137,20 +142,20 @@ public class RangeFetchMapCalculatorTest
         //We should validate that it streamed from 2 unique sources
         Assert.assertEquals(2, map.asMap().keySet().size());
 
-        assertArrays(Arrays.asList(generateRange(1, 10), generateRange(11, 20), generateRange(21, 30), generateRange(31, 40)),
+        assertArrays(Arrays.asList(generateNonTrivialRange(1, 10), generateNonTrivialRange(11, 20), generateNonTrivialRange(21, 30), generateNonTrivialRange(31, 40)),
                 map.asMap().get(InetAddress.getByName("127.0.0.3")));
-        assertArrays(Arrays.asList(generateRange(41, 50)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+        assertArrays(Arrays.asList(generateNonTrivialRange(41, 50)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
     }
 
     @Test
     public void testForMultipleRoundsComputationWithLocalHost() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
-        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
-        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1", "127.0.0.2");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1", "127.0.0.2");
 
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
         Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
@@ -159,32 +164,32 @@ public class RangeFetchMapCalculatorTest
         //We should validate that it streamed from only non local host and only one range
         Assert.assertEquals(1, map.asMap().keySet().size());
 
-        assertArrays(Arrays.asList(generateRange(41, 50)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+        assertArrays(Arrays.asList(generateNonTrivialRange(41, 50)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
     }
 
     @Test
     public void testForEmptyGraph() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
-        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
-        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
+        addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1");
 
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
         Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
         //All ranges map to local host so we will not stream anything.
-        Assert.assertTrue(map.isEmpty());
+        assertTrue(map.isEmpty());
     }
 
     @Test
     public void testWithNoSourceWithLocal() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.5");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.5");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
 
         //Return false for all except 127.0.0.5
         final RangeStreamer.ISourceFilter filter = new RangeStreamer.ISourceFilter()
@@ -213,17 +218,17 @@ public class RangeFetchMapCalculatorTest
         //We should validate that it streamed from only non local host and only one range
         Assert.assertEquals(2, map.asMap().keySet().size());
 
-        assertArrays(Arrays.asList(generateRange(11, 20)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
-        assertArrays(Arrays.asList(generateRange(21, 30)), map.asMap().get(InetAddress.getByName("127.0.0.3")));
+        assertArrays(Arrays.asList(generateNonTrivialRange(11, 20)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+        assertArrays(Arrays.asList(generateNonTrivialRange(21, 30)), map.asMap().get(InetAddress.getByName("127.0.0.3")));
     }
 
     @Test (expected = IllegalStateException.class)
     public void testWithNoLiveSource() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10,  "127.0.0.5");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.5");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
 
         final RangeStreamer.ISourceFilter allDeadFilter = new RangeStreamer.ISourceFilter()
         {
@@ -241,9 +246,9 @@ public class RangeFetchMapCalculatorTest
     public void testForLocalDC() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.3", "127.0.0.53");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1", "127.0.0.3", "127.0.0.57");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59", "127.0.0.61");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.3", "127.0.0.53");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1", "127.0.0.3", "127.0.0.57");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59", "127.0.0.61");
 
         RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<>(), "Test");
         Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
@@ -251,17 +256,17 @@ public class RangeFetchMapCalculatorTest
         Assert.assertEquals(2, map.asMap().size());
 
         //Should have streamed from local DC endpoints
-        assertArrays(Arrays.asList(generateRange(21, 30)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
-        assertArrays(Arrays.asList(generateRange(1, 10), generateRange(11, 20)), map.asMap().get(InetAddress.getByName("127.0.0.3")));
+        assertArrays(Arrays.asList(generateNonTrivialRange(21, 30)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+        assertArrays(Arrays.asList(generateNonTrivialRange(1, 10), generateNonTrivialRange(11, 20)), map.asMap().get(InetAddress.getByName("127.0.0.3")));
     }
 
     @Test
     public void testForRemoteDC() throws Exception
     {
         Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
-        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
-        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
-        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
 
         //Reject only 127.0.0.3 and accept everyone else
         final RangeStreamer.ISourceFilter localHostFilter = new RangeStreamer.ISourceFilter()
@@ -288,30 +293,89 @@ public class RangeFetchMapCalculatorTest
         Assert.assertEquals(3, map.asMap().size());
 
         //Should have streamed from remote DC endpoint
-        assertArrays(Arrays.asList(generateRange(1, 10)), map.asMap().get(InetAddress.getByName("127.0.0.51")));
-        assertArrays(Arrays.asList(generateRange(11, 20)), map.asMap().get(InetAddress.getByName("127.0.0.55")));
-        assertArrays(Arrays.asList(generateRange(21, 30)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+        assertArrays(Arrays.asList(generateNonTrivialRange(1, 10)), map.asMap().get(InetAddress.getByName("127.0.0.51")));
+        assertArrays(Arrays.asList(generateNonTrivialRange(11, 20)), map.asMap().get(InetAddress.getByName("127.0.0.55")));
+        assertArrays(Arrays.asList(generateNonTrivialRange(21, 30)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+    }
+
+    @Test
+    public void testTrivialRanges() throws UnknownHostException
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        // add non-trivial ranges
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
+        // and a trivial one:
+        addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Collections.emptyList(), "Test");
+        Multimap<InetAddress, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();
+        Multimap<InetAddress, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap);
+        assertTrue(trivialMap.get(InetAddress.getByName("127.0.0.3")).contains(generateTrivialRange(1,10)) ^
+                   trivialMap.get(InetAddress.getByName("127.0.0.51")).contains(generateTrivialRange(1,10)));
+        assertFalse(optMap.containsKey(generateTrivialRange(1, 10)));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testNotEnoughEndpointsForTrivialRange() throws UnknownHostException
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        // add non-trivial ranges
+        addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
+        addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
+        addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
+        // and a trivial one:
+        addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
+
+        RangeStreamer.ISourceFilter filter = new RangeStreamer.ISourceFilter()
+        {
+            public boolean shouldInclude(InetAddress endpoint)
+            {
+                try
+                {
+                    if (endpoint.equals(InetAddress.getByName("127.0.0.3")))
+                        return false;
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                return true;
+            }
+        };
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Collections.singleton(filter), "Test");
+        Multimap<InetAddress, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges();
+        Multimap<InetAddress, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap);
+
     }
 
     private void assertArrays(Collection<Range<Token>> expected, Collection<Range<Token>> result)
     {
         Assert.assertEquals(expected.size(), result.size());
-        Assert.assertTrue(result.containsAll(expected));
+        assertTrue(result.containsAll(expected));
     }
 
     private void validateRange(Multimap<Range<Token>, InetAddress> rangesWithSources, Multimap<InetAddress, Range<Token>> result)
     {
         for (Map.Entry<InetAddress, Range<Token>> entry : result.entries())
         {
-            Assert.assertTrue(rangesWithSources.get(entry.getValue()).contains(entry.getKey()));
+            assertTrue(rangesWithSources.get(entry.getValue()).contains(entry.getKey()));
+        }
+    }
+
+    private void addNonTrivialRangeAndSources(Multimap<Range<Token>, InetAddress> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
+    {
+        for (InetAddress endpoint : makeAddrs(hosts))
+        {
+            rangesWithSources.put(generateNonTrivialRange(left, right), endpoint);
         }
     }
 
-    private void addRangeAndSources(Multimap<Range<Token>, InetAddress> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
+    private void addTrivialRangeAndSources(Multimap<Range<Token>, InetAddress> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
     {
         for (InetAddress endpoint : makeAddrs(hosts))
         {
-            rangesWithSources.put(generateRange(left, right), endpoint);
+            rangesWithSources.put(generateTrivialRange(left, right), endpoint);
         }
     }
 
@@ -323,8 +387,16 @@ public class RangeFetchMapCalculatorTest
         return addrs;
     }
 
-    private Range<Token> generateRange(int left, int right)
+    private Range<Token> generateNonTrivialRange(int left, int right)
+    {
+        // * 1000 to make sure we dont filter away any trivial ranges:
+        return new Range<>(new RandomPartitioner.BigIntegerToken(String.valueOf(left * 10000)), new RandomPartitioner.BigIntegerToken(String.valueOf(right * 10000)));
+    }
+
+    private Range<Token> generateTrivialRange(int left, int right)
     {
-        return new Range<Token>(new RandomPartitioner.BigIntegerToken(String.valueOf(left)), new RandomPartitioner.BigIntegerToken(String.valueOf(right)));
+        Range<Token> r = new Range<>(new RandomPartitioner.BigIntegerToken(String.valueOf(left)), new RandomPartitioner.BigIntegerToken(String.valueOf(right)));
+        assertTrue(RangeFetchMapCalculator.isTrivial(r));
+        return r;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message