cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject cassandra git commit: RangeStreamer should be smarter when picking endpoints for streaming
Date Fri, 12 May 2017 07:16:10 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk d8871bd5b -> bf911cc6a


RangeStreamer should be smarter when picking endpoints for streaming

Patch by Sankalp Kohli; reviewed by marcuse for CASSANDRA-4650


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf911cc6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf911cc6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf911cc6

Branch: refs/heads/trunk
Commit: bf911cc6a852f9ef068318a3545611d9daa5112c
Parents: d8871bd
Author: Sankalp Kohli <sankalp@apple.com>
Authored: Thu Apr 27 09:26:32 2017 +0200
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Fri May 12 09:15:15 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   1 +
 lib/licenses/psjava-0.1.19.txt                  |  21 +
 lib/psjava-0.1.19.jar                           | Bin 0 -> 466947 bytes
 .../cassandra/dht/RangeFetchMapCalculator.java  | 465 +++++++++++++++++++
 .../org/apache/cassandra/dht/RangeStreamer.java |  61 ++-
 .../dht/RangeFetchMapCalculatorTest.java        | 330 +++++++++++++
 7 files changed, 872 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf911cc6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cae5f62..d06b06d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * RangeStreamer should be smarter when picking endpoints for streaming (CASSANDRA-4650)
  * Avoid rewrapping an exception thrown for cache load functions (CASSANDRA-13367)
  * Log time elapsed for each incremental repair phase (CASSANDRA-13498)
  * Add multiple table operation support to cassandra-stress (CASSANDRA-8780)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf911cc6/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 5b71bd9..feb8273 100644
--- a/build.xml
+++ b/build.xml
@@ -494,6 +494,7 @@
                 artifactId="cassandra-parent"
                 version="${version}"/>
         <dependency groupId="junit" artifactId="junit"/>
+        <dependency groupId="org.psjava" artifactId="psjava" version="0.1.19" />
         <dependency groupId="org.apache.rat" artifactId="apache-rat"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core"/>
       	<dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf911cc6/lib/licenses/psjava-0.1.19.txt
----------------------------------------------------------------------
diff --git a/lib/licenses/psjava-0.1.19.txt b/lib/licenses/psjava-0.1.19.txt
new file mode 100644
index 0000000..5acf4c1
--- /dev/null
+++ b/lib/licenses/psjava-0.1.19.txt
@@ -0,0 +1,21 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 psjava authors
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf911cc6/lib/psjava-0.1.19.jar
----------------------------------------------------------------------
diff --git a/lib/psjava-0.1.19.jar b/lib/psjava-0.1.19.jar
new file mode 100644
index 0000000..6652b95
Binary files /dev/null and b/lib/psjava-0.1.19.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf911cc6/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
new file mode 100644
index 0000000..355d7a9
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.net.InetAddress;
+import java.util.Collection;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.psjava.algo.graph.flownetwork.FordFulkersonAlgorithm;
+import org.psjava.algo.graph.flownetwork.MaximumFlowAlgorithm;
+import org.psjava.algo.graph.flownetwork.MaximumFlowAlgorithmResult;
+import org.psjava.algo.graph.pathfinder.DFSPathFinder;
+import org.psjava.ds.graph.CapacityEdge;
+import org.psjava.ds.graph.MutableCapacityGraph;
+import org.psjava.ds.numbersystrem.IntegerNumberSystem;
+import org.psjava.ds.math.Function;
+
+/**
+ * We model the graph like this:
+ * * Each range we are about to stream is a vertex in the graph
+ * * Each node that can provide a range is a vertex in the graph
+ * * We add an edge from each range to the node that can provide the range
+ * * Then, to be able to solve the maximum flow problem using Ford-Fulkerson we add a super
source with edges to all range vertices
+ *   and a super sink with incoming edges from all the node vertices.
+ * * The capacity on the edges between the super source and the range-vertices is 1
+ * * The capacity on the edges between the range-vertices and the node vertices is infinite
+ * * The capacity on the edges between the nodes-vertices and the super sink is ceil(#range-vertices/#node-vertices)
+ *   - if we have more machines than ranges to stream the capacity will be 1 (each machine
will stream at most 1 range)
+ * * Since the sum of the capacity on the edges from the super source to the range-vertices
is less than or equal to the sum
+ *   of the capacities between the node-vertices and super sink we know that to get maximum
flow we will use all the
+ *   range-vertices. (Say we have x ranges, y machines to provide them, total supersource
-> range-vertex capacity will be x,
+ *   total node-vertex -> supersink capacity will be (y * ceil(x / y)) which worst case
is x if x==y). The capacity between
+ *   the range-vertices and node-vertices is infinite.
+ * * Then we try to solve the max-flow problem using psjava
+ * * If we can't find a solution where the total flow is = number of range-vertices, we bump
the capacity between the node-vertices
+ *   and the super sink and try again.
+ *
+ *
+ */
+public class RangeFetchMapCalculator
+{
+    private static final Logger logger = LoggerFactory.getLogger(RangeFetchMapCalculator.class);
+    private final Multimap<Range<Token>, InetAddress> rangesWithSources;
+    private final Collection<RangeStreamer.ISourceFilter> sourceFilters;
+    private final String keyspace;
+    //We need two Vertices to act as source and destination in the algorithm
+    private final Vertex sourceVertex = OuterVertex.getSourceVertex();
+    private final Vertex destinationVertex = OuterVertex.getDestinationVertex();
+
+    public RangeFetchMapCalculator(Multimap<Range<Token>, InetAddress> rangesWithSources,
Collection<RangeStreamer.ISourceFilter> sourceFilters, String keyspace)
+    {
+        this.rangesWithSources = rangesWithSources;
+        this.sourceFilters = sourceFilters;
+        this.keyspace = keyspace;
+    }
+
+    public Multimap<InetAddress, Range<Token>> getRangeFetchMap()
+    {
+        //Get the graph with edges between ranges and their source endpoints
+        MutableCapacityGraph<Vertex, Integer> graph = getGraph();
+        //Add source and destination vertex and edges
+        addSourceAndDestination(graph, getDestinationLinkCapacity(graph));
+
+        int flow = 0;
+        MaximumFlowAlgorithmResult<Integer, CapacityEdge<Vertex, Integer>> result
= null;
+
+        //We might not be working on all ranges
+        while (flow < getTotalRangeVertices(graph))
+        {
+            if (flow > 0)
+            {
+                //We could not find a path with previous graph. Bump the capacity b/w endpoint
vertices and destination by 1
+                incrementCapacity(graph, 1);
+            }
+
+            MaximumFlowAlgorithm fordFulkerson = FordFulkersonAlgorithm.getInstance(DFSPathFinder.getInstance());
+            result = fordFulkerson.calc(graph, sourceVertex, destinationVertex, IntegerNumberSystem.getInstance());
+
+            int newFlow = result.calcTotalFlow();
+            assert newFlow > flow;   //We are not making progress which should not happen
+            flow = newFlow;
+        }
+
+        return getRangeFetchMapFromGraphResult(graph, result);
+    }
+
+    /*
+        Return the total number of range vertices in the graph
+     */
+    private int getTotalRangeVertices(MutableCapacityGraph<Vertex, Integer> graph)
+    {
+        int count = 0;
+        for (Vertex vertex : graph.getVertices())
+        {
+            if (vertex.isRangeVertex())
+            {
+                count++;
+            }
+        }
+
+        return count;
+    }
+
+    /**
+     *  Convert the max flow graph to Multimap<InetAddress, Range<Token>>
+     *      We iterate over all range vertices and find an edge with flow of more than zero
connecting to endpoint vertex.
+     * @param graph  The graph to convert
+     * @param result Flow algorithm result
+     * @return  Multi Map of Machine to Ranges
+     */
+    private Multimap<InetAddress, Range<Token>> getRangeFetchMapFromGraphResult(MutableCapacityGraph<Vertex,
Integer> graph, MaximumFlowAlgorithmResult<Integer, CapacityEdge<Vertex, Integer>>
result)
+    {
+        final Multimap<InetAddress, Range<Token>> rangeFetchMapMap = HashMultimap.create();
+        if(result == null)
+            return rangeFetchMapMap;
+        final Function<CapacityEdge<Vertex, Integer>, Integer> flowFunction =
result.calcFlowFunction();
+
+        for (Vertex vertex : graph.getVertices())
+        {
+            if (vertex.isRangeVertex())
+            {
+                boolean sourceFound = false;
+                for (CapacityEdge<Vertex, Integer> e : graph.getEdges(vertex))
+                {
+                    if(flowFunction.get(e) > 0)
+                    {
+                        assert !sourceFound;
+                        sourceFound = true;
+                        if(e.to().isEndpointVertex())
+                            rangeFetchMapMap.put(((EndpointVertex)e.to()).getEndpoint(),
((RangeVertex)vertex).getRange());
+                        else if(e.from().isEndpointVertex())
+                            rangeFetchMapMap.put(((EndpointVertex)e.from()).getEndpoint(),
((RangeVertex)vertex).getRange());
+                    }
+                }
+
+                assert sourceFound;
+
+            }
+        }
+
+        return rangeFetchMapMap;
+    }
+
+    /**
+     * This will increase the capacity from endpoint vertices to destination by incrementalCapacity
+     * @param graph The graph to work on
+     * @param incrementalCapacity Amount by which to increment capacity
+     */
+    private void incrementCapacity(MutableCapacityGraph<Vertex, Integer> graph, int
incrementalCapacity)
+    {
+        for (Vertex vertex : graph.getVertices())
+        {
+            if (vertex.isEndpointVertex())
+            {
+                graph.addEdge(vertex, destinationVertex, incrementalCapacity);
+            }
+        }
+    }
+
+    /**
+     * Add source and destination vertices. Add edges of capacity 1 b/w source and range
vertices.
+     * Also add edges b/w endpoint vertices and destination vertex with capacity of 'destinationCapacity'
+     * @param graph Graph to work on
+     * @param destinationCapacity The capacity for edges b/w endpoint vertices and destination
+     */
+    private void addSourceAndDestination(MutableCapacityGraph<Vertex, Integer> graph,
int destinationCapacity)
+    {
+        graph.insertVertex(sourceVertex);
+        graph.insertVertex(destinationVertex);
+        for (Vertex vertex : graph.getVertices())
+        {
+            if (vertex.isRangeVertex())
+            {
+                graph.addEdge(sourceVertex, vertex, 1);
+            }
+            else if (vertex.isEndpointVertex())
+            {
+                graph.addEdge(vertex, destinationVertex, destinationCapacity);
+            }
+        }
+    }
+
+    /**
+     * Find the initial capacity which we want to use b/w machine vertices and destination
to keep things optimal
+     * @param graph Graph to work on
+     * @return  The initial capacity
+     */
+    private int getDestinationLinkCapacity(MutableCapacityGraph<Vertex, Integer> graph)
+    {
+        //Find total nodes which are endpoints and ranges
+        double endpointVertices = 0;
+        double rangeVertices = 0;
+        for (Vertex vertex : graph.getVertices())
+        {
+            if (vertex.isEndpointVertex())
+            {
+                endpointVertices++;
+            }
+            else if (vertex.isRangeVertex())
+            {
+                rangeVertices++;
+            }
+        }
+
+        return (int) Math.ceil(rangeVertices / endpointVertices);
+    }
+
+    /**
+     *  Generate a graph with all ranges and endpoints as vertices. It will create edges
b/w a range and its filtered source endpoints
+     *  It will try to use sources from local DC if possible
+     * @return  The generated graph
+     */
+    private MutableCapacityGraph<Vertex, Integer> getGraph()
+    {
+        MutableCapacityGraph<Vertex, Integer> capacityGraph = MutableCapacityGraph.create();
+
+        //Connect all ranges with all source endpoints
+        for (Range<Token> range : rangesWithSources.keySet())
+        {
+            final RangeVertex rangeVertex = new RangeVertex(range);
+
+            //Try to only add source endpoints from same DC
+            boolean sourceFound = addEndpoints(capacityGraph, rangeVertex, true);
+
+            if (!sourceFound)
+            {
+                logger.info("Using other DC endpoints for streaming for range: {} and keyspace
{}", range, keyspace);
+                sourceFound = addEndpoints(capacityGraph, rangeVertex, false);
+            }
+
+            //We could not find any source for this range which passed the filters. Ignore
if localhost is part of the endpoints for this range
+            if (!sourceFound && !rangesWithSources.get(range).contains(FBUtilities.getBroadcastAddress()))
+                throw new IllegalStateException("Unable to find sufficient sources for streaming
range " + range + " in keyspace " + keyspace);
+
+        }
+
+        return capacityGraph;
+    }
+
+    /**
+     * Create edges with infinite capacity b/w range vertex and all its source endpoints
which clear the filters
+     * @param capacityGraph The Capacity graph on which changes are made
+     * @param rangeVertex The range for which we need to add all its source endpoints
+     * @param localDCCheck Should add source endpoints from local DC only
+     * @return If we were able to add at least one source for this range after applying filters
to endpoints
+     */
+    private boolean addEndpoints(MutableCapacityGraph<Vertex, Integer> capacityGraph,
RangeVertex rangeVertex, boolean localDCCheck)
+    {
+        boolean sourceFound = false;
+        for (InetAddress endpoint : rangesWithSources.get(rangeVertex.getRange()))
+        {
+            if (passFilters(endpoint, localDCCheck))
+            {
+                sourceFound = true;
+                final Vertex endpointVertex = new EndpointVertex(endpoint);
+                capacityGraph.insertVertex(rangeVertex);
+                capacityGraph.insertVertex(endpointVertex);
+                capacityGraph.addEdge(rangeVertex, endpointVertex, Integer.MAX_VALUE);
+            }
+        }
+        return sourceFound;
+    }
+
+    private boolean isInLocalDC(InetAddress endpoint)
+    {
+        return DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(endpoint));
+    }
+
+    /**
+     *
+     * @param endpoint   Endpoint to check
+     * @param localDCCheck If true, only endpoints in the local DC pass; if false, endpoints from any DC are allowed
+     * @return   True if filters pass this endpoint
+     */
+    private boolean passFilters(final InetAddress endpoint, boolean localDCCheck)
+    {
+        for (RangeStreamer.ISourceFilter filter : sourceFilters)
+        {
+            if (!filter.shouldInclude(endpoint))
+                return false;
+        }
+
+        if(endpoint.equals(FBUtilities.getBroadcastAddress()))
+            return false;
+
+        return !localDCCheck || isInLocalDC(endpoint);
+    }
+
+    private static abstract class Vertex
+    {
+        public enum VERTEX_TYPE
+        {
+            ENDPOINT, RANGE, SOURCE, DESTINATION
+        }
+
+        public abstract VERTEX_TYPE getVertexType();
+
+        public boolean isEndpointVertex()
+        {
+            return getVertexType() == VERTEX_TYPE.ENDPOINT;
+        }
+
+        public boolean isRangeVertex()
+        {
+            return getVertexType() == VERTEX_TYPE.RANGE;
+        }
+    }
+
+    /*
+       This Vertex will contain the endpoints.
+     */
+    private static class EndpointVertex extends Vertex
+    {
+        private final InetAddress endpoint;
+
+        public EndpointVertex(InetAddress endpoint)
+        {
+            assert endpoint != null;
+            this.endpoint = endpoint;
+        }
+
+        public InetAddress getEndpoint()
+        {
+            return endpoint;
+        }
+
+
+        @Override
+        public VERTEX_TYPE getVertexType()
+        {
+            return VERTEX_TYPE.ENDPOINT;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            EndpointVertex that = (EndpointVertex) o;
+
+            return endpoint.equals(that.endpoint);
+
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return endpoint.hashCode();
+        }
+    }
+
+    /*
+       This Vertex will contain the Range
+     */
+    private static class RangeVertex extends Vertex
+    {
+        private final Range<Token> range;
+
+        public RangeVertex(Range<Token> range)
+        {
+            assert range != null;
+            this.range = range;
+        }
+
+        public Range<Token> getRange()
+        {
+            return range;
+        }
+
+        @Override
+        public VERTEX_TYPE getVertexType()
+        {
+            return VERTEX_TYPE.RANGE;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            RangeVertex that = (RangeVertex) o;
+
+            return range.equals(that.range);
+
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return range.hashCode();
+        }
+    }
+
+    /*
+       This denotes the source and destination Vertex we need for the flow graph
+     */
+    private static class OuterVertex extends Vertex
+    {
+        private final boolean source;
+
+        private OuterVertex(boolean source)
+        {
+            this.source = source;
+        }
+
+        public static Vertex getSourceVertex()
+        {
+            return new OuterVertex(true);
+        }
+
+        public static Vertex getDestinationVertex()
+        {
+            return new OuterVertex(false);
+        }
+
+        @Override
+        public VERTEX_TYPE getVertexType()
+        {
+            return source? VERTEX_TYPE.SOURCE : VERTEX_TYPE.DESTINATION;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            OuterVertex that = (OuterVertex) o;
+
+            return source == that.source;
+
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return (source ? 1 : 0);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf911cc6/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 85c2307..447446b 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -25,6 +25,8 @@ import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+
+import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -177,16 +179,24 @@ public class RangeStreamer
      */
     public void addRanges(String keyspaceName, Collection<Range<Token>> ranges)
     {
-        Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictSourcesForRanges(keyspaceName)
-                ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName,
ranges);
-
-        if (logger.isTraceEnabled())
+        if(Keyspace.open(keyspaceName).getReplicationStrategy() instanceof LocalStrategy)
         {
-            for (Map.Entry<Range<Token>, InetAddress> entry : rangesForKeyspace.entries())
-                logger.trace("{}: range {} exists on {}", description, entry.getKey(), entry.getValue());
+            logger.info("Not adding ranges for Local Strategy keyspace=" + keyspaceName);
+            return;
         }
 
-        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : getRangeFetchMap(rangesForKeyspace,
sourceFilters, keyspaceName, useStrictConsistency).asMap().entrySet())
+        boolean useStrictSource = useStrictSourcesForRanges(keyspaceName);
+        Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictSource
+                ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName,
ranges);
+
+        for (Map.Entry<Range<Token>, InetAddress> entry : rangesForKeyspace.entries())
+            logger.info(String.format("%s: range %s exists on %s for keyspace %s", description,
entry.getKey(), entry.getValue(), keyspaceName));
+
+
+        Multimap<InetAddress, Range<Token>> rangeFetchMap = useStrictSource ?
getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName, useStrictConsistency) :
+                getOptimizedRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName);
+
+        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : rangeFetchMap.asMap().entrySet())
         {
             if (logger.isTraceEnabled())
             {
@@ -359,6 +369,43 @@ public class RangeStreamer
         return rangeFetchMapMap;
     }
 
+
+    private static Multimap<InetAddress, Range<Token>> getOptimizedRangeFetchMap(Multimap<Range<Token>,
InetAddress> rangesWithSources,
+                                                                        Collection<ISourceFilter>
sourceFilters, String keyspace)
+    {
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources,
sourceFilters, keyspace);
+        Multimap<InetAddress, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap();
+        logger.info("Output from RangeFetchMapCalculator for keyspace " + keyspace);
+        validateRangeFetchMap(rangesWithSources, rangeFetchMapMap, keyspace);
+        return rangeFetchMapMap;
+    }
+
+    /**
+     * Verify that source returned for each range is correct
+     * @param rangesWithSources
+     * @param rangeFetchMapMap
+     * @param keyspace
+     */
+    private static void validateRangeFetchMap(Multimap<Range<Token>, InetAddress>
rangesWithSources, Multimap<InetAddress, Range<Token>> rangeFetchMapMap, String
keyspace)
+    {
+        for (Map.Entry<InetAddress, Range<Token>> entry : rangeFetchMapMap.entries())
+        {
+            if(entry.getKey().equals(FBUtilities.getBroadcastAddress()))
+            {
+                throw new IllegalStateException("Trying to stream locally. Range: " + entry.getValue()
+                                        + " in keyspace " + keyspace);
+            }
+
+            if (!rangesWithSources.get(entry.getValue()).contains(entry.getKey()))
+            {
+                throw new IllegalStateException("Trying to stream from wrong endpoint. Range:
" + entry.getValue()
+                                                + " in keyspace " + keyspace + " from endpoint:
" + entry.getKey());
+            }
+
+            logger.info("Streaming range {} from endpoint {} for keyspace {}", entry.getValue(),
entry.getKey(), keyspace);
+        }
+    }
+
     public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>,
InetAddress> rangesWithSourceTarget, String keyspace,
                                                                  IFailureDetector fd, boolean
useStrictConsistency)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf911cc6/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
new file mode 100644
index 0000000..8d3d74f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
+
+public class RangeFetchMapCalculatorTest
+{
+    @BeforeClass
+    public static void setupUpSnitch()
+    {
+        DatabaseDescriptor.daemonInitialization();
+        DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
+        {
+            // Every endpoint is in rack "RAC1". A last octet <= 50, or an even one, is in the local DC;
+            // an odd last octet above 50 is in the remote DC (local DC name + "Remote").
+            @Override
+            public String getRack(InetAddress endpoint)
+            {
+                return "RAC1";
+            }
+
+            @Override
+            public String getDatacenter(InetAddress endpoint)
+            {
+                if (getIPLastPart(endpoint) <= 50)
+                    return DatabaseDescriptor.getLocalDataCenter();
+                else if (getIPLastPart(endpoint) % 2 == 0)
+                    return DatabaseDescriptor.getLocalDataCenter();
+                else
+                    return DatabaseDescriptor.getLocalDataCenter() + "Remote";
+            }
+
+            private int getIPLastPart(InetAddress endpoint)
+            {
+                String str = endpoint.toString();
+                int index = str.lastIndexOf(".");
+                return Integer.parseInt(str.substring(index + 1).trim());
+            }
+        });
+    }
+
+    @Test
+    public void testWithSingleSource() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
+        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.4");
+        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.5");
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
+        validateRange(rangesWithSources, map);
+
+        Assert.assertEquals(4, map.asMap().keySet().size());
+    }
+
+    @Test
+    public void testWithNonOverlappingSource() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.4");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.5", "127.0.0.6");
+        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.7", "127.0.0.8");
+        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.9", "127.0.0.10");
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
+        validateRange(rangesWithSources, map);
+
+        Assert.assertEquals(5, map.asMap().keySet().size());
+    }
+
+    @Test
+    public void testWithRFThreeReplacement() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2", "127.0.0.3");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3", "127.0.0.4");
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
+        validateRange(rangesWithSources, map);
+
+        //We should validate that it streamed from 3 unique sources
+        Assert.assertEquals(3, map.asMap().keySet().size());
+    }
+
+    @Test
+    public void testForMultipleRoundsComputation() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
+        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.3");
+        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.3", "127.0.0.2");
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
+        validateRange(rangesWithSources, map);
+
+        //We should validate that it streamed from 2 unique sources
+        Assert.assertEquals(2, map.asMap().keySet().size());
+
+        assertArrays(Arrays.asList(generateRange(1, 10), generateRange(11, 20), generateRange(21, 30), generateRange(31, 40)),
+                map.asMap().get(InetAddress.getByName("127.0.0.3")));
+        assertArrays(Arrays.asList(generateRange(41, 50)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+    }
+
+    @Test
+    public void testForMultipleRoundsComputationWithLocalHost() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
+        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
+        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1", "127.0.0.2");
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
+        validateRange(rangesWithSources, map);
+
+        //We should validate that it streamed from only non local host and only one range
+        Assert.assertEquals(1, map.asMap().keySet().size());
+
+        assertArrays(Arrays.asList(generateRange(41, 50)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+    }
+
+    @Test
+    public void testForEmptyGraph() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1");
+        addRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1");
+        addRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1");
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test");
+        Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
+        //All ranges map to local host so we will not stream anything.
+        Assert.assertTrue(map.isEmpty());
+    }
+
+    @Test
+    public void testWithNoSourceWithLocal() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.5");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
+
+        //Return true for all except 127.0.0.5, which is filtered out
+        final RangeStreamer.ISourceFilter filter = new RangeStreamer.ISourceFilter()
+        {
+            public boolean shouldInclude(InetAddress endpoint)
+            {
+                try
+                {
+                    if (endpoint.equals(InetAddress.getByName("127.0.0.5")))
+                        return false;
+                    else
+                        return true;
+                }
+                catch (UnknownHostException e)
+                {
+                    return true;
+                }
+            }
+        };
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(filter), "Test");
+        Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
+
+        validateRange(rangesWithSources, map);
+
+        //We should validate that it streamed from exactly two sources (127.0.0.2 and 127.0.0.3), one range each
+        Assert.assertEquals(2, map.asMap().keySet().size());
+
+        assertArrays(Arrays.asList(generateRange(11, 20)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+        assertArrays(Arrays.asList(generateRange(21, 30)), map.asMap().get(InetAddress.getByName("127.0.0.3")));
+    }
+
+    @Test (expected = IllegalStateException.class)
+    public void testWithNoLiveSource() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10,  "127.0.0.5");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3");
+
+        final RangeStreamer.ISourceFilter allDeadFilter = new RangeStreamer.ISourceFilter()
+        {
+            public boolean shouldInclude(InetAddress endpoint)
+            {
+                return false;
+            }
+        };
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(allDeadFilter), "Test");
+        calculator.getRangeFetchMap();
+    }
+
+    @Test
+    public void testForLocalDC() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.3", "127.0.0.53");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1", "127.0.0.3", "127.0.0.57");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59", "127.0.0.61");
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<>(), "Test");
+        Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
+        validateRange(rangesWithSources, map);
+        Assert.assertEquals(2, map.asMap().size());
+
+        //Should have streamed from local DC endpoints
+        assertArrays(Arrays.asList(generateRange(21, 30)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+        assertArrays(Arrays.asList(generateRange(1, 10), generateRange(11, 20)), map.asMap().get(InetAddress.getByName("127.0.0.3")));
+    }
+
+    @Test
+    public void testForRemoteDC() throws Exception
+    {
+        Multimap<Range<Token>, InetAddress> rangesWithSources = HashMultimap.create();
+        addRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51");
+        addRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55");
+        addRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59");
+
+        //Reject only 127.0.0.3 and accept everyone else
+        final RangeStreamer.ISourceFilter localHostFilter = new RangeStreamer.ISourceFilter()
+        {
+            public boolean shouldInclude(InetAddress endpoint)
+            {
+                try
+                {
+                    if (endpoint.equals(InetAddress.getByName("127.0.0.3")))
+                        return false;
+                    else
+                        return true;
+                }
+                catch (UnknownHostException e)
+                {
+                    return true;
+                }
+            }
+        };
+
+        RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(localHostFilter), "Test");
+        Multimap<InetAddress, Range<Token>> map = calculator.getRangeFetchMap();
+        validateRange(rangesWithSources, map);
+        Assert.assertEquals(3, map.asMap().size());
+
+        //Should have streamed from remote DC endpoint
+        assertArrays(Arrays.asList(generateRange(1, 10)), map.asMap().get(InetAddress.getByName("127.0.0.51")));
+        assertArrays(Arrays.asList(generateRange(11, 20)), map.asMap().get(InetAddress.getByName("127.0.0.55")));
+        assertArrays(Arrays.asList(generateRange(21, 30)), map.asMap().get(InetAddress.getByName("127.0.0.2")));
+    }
+
+    private void assertArrays(Collection<Range<Token>> expected, Collection<Range<Token>> result)
+    {
+        Assert.assertEquals(expected.size(), result.size());
+        Assert.assertTrue(result.containsAll(expected));
+    }
+
+    private void validateRange(Multimap<Range<Token>, InetAddress> rangesWithSources, Multimap<InetAddress, Range<Token>> result)
+    {
+        for (Map.Entry<InetAddress, Range<Token>> entry : result.entries())
+        {
+            Assert.assertTrue(rangesWithSources.get(entry.getValue()).contains(entry.getKey()));
+        }
+    }
+
+    private void addRangeAndSources(Multimap<Range<Token>, InetAddress> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException
+    {
+        for (InetAddress endpoint : makeAddrs(hosts))
+        {
+            rangesWithSources.put(generateRange(left, right), endpoint);
+        }
+    }
+
+    private Collection<InetAddress> makeAddrs(String... hosts) throws UnknownHostException
+    {
+        ArrayList<InetAddress> addrs = new ArrayList<InetAddress>(hosts.length);
+        for (String host : hosts)
+            addrs.add(InetAddress.getByName(host));
+        return addrs;
+    }
+
+    private Range<Token> generateRange(int left, int right)
+    {
+        return new Range<Token>(new RandomPartitioner.BigIntegerToken(String.valueOf(left)), new RandomPartitioner.BigIntegerToken(String.valueOf(right)));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message