cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bened...@apache.org
Subject cassandra git commit: Transient Replication support for EACH_QUORUM, and correction of behaviour for LOCAL_QUORUM [Forced Update!]
Date Fri, 30 Nov 2018 17:07:44 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk c683e4ff6 -> c277fc56b (forced update)


Transient Replication support for EACH_QUORUM, and correction of behaviour for LOCAL_QUORUM

patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for CASSANDRA-14727


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c277fc56
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c277fc56
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c277fc56

Branch: refs/heads/trunk
Commit: c277fc56b586d7c6db1f0d42fd2253f5484ca3d8
Parents: d8164c6
Author: Benedict Elliott Smith <benedict@apple.com>
Authored: Wed Sep 19 12:52:27 2018 +0100
Committer: Benedict Elliott Smith <benedict@apache.org>
Committed: Fri Nov 30 17:06:32 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ConsistencyLevel.java   |  18 ++-
 .../apache/cassandra/locator/ReplicaPlans.java  |  50 +++++---
 .../org/apache/cassandra/locator/Replicas.java  |  24 +++-
 .../locator/ReplicaCollectionTest.java          |  42 +------
 .../cassandra/locator/ReplicaLayoutTest.java    |   2 +-
 .../cassandra/locator/ReplicaPlansTest.java     | 120 +++++++++++++++++++
 .../apache/cassandra/locator/ReplicaUtils.java  |  63 ++++++++++
 .../WriteResponseHandlerTransientTest.java      |  12 +-
 9 files changed, 265 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f145a06..c147eca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Transient Replication: support EACH_QUORUM (CASSANDRA-14727)
  * BufferPool: allocating thread for new chunks should acquire directly (CASSANDRA-14832)
  * Send correct messaging version in internode messaging handshake's third message (CASSANDRA-14896)
  * Make Read and Write Latency columns consistent for proxyhistograms and tablehistograms
(CASSANDRA-11939)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 9e884a7..4973915 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.transport.ProtocolException;
 
+import static org.apache.cassandra.locator.Replicas.addToCountPerDc;
 import static org.apache.cassandra.locator.Replicas.countInOurDc;
 
 public enum ConsistencyLevel
@@ -92,7 +93,12 @@ public enum ConsistencyLevel
              : quorumFor(keyspace);
     }
 
-    public static ObjectIntOpenHashMap<String> eachQuorumFor(Keyspace keyspace)
+    public static int localQuorumForOurDc(Keyspace keyspace)
+    {
+        return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
+    }
+
+    public static ObjectIntOpenHashMap<String> eachQuorumForRead(Keyspace keyspace)
     {
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
         ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(strategy.getDatacenters().size());
@@ -101,6 +107,13 @@ public enum ConsistencyLevel
         return perDc;
     }
 
+    public static ObjectIntOpenHashMap<String> eachQuorumForWrite(Keyspace keyspace,
Endpoints<?> pendingWithDown)
+    {
+        ObjectIntOpenHashMap<String> perDc = eachQuorumForRead(keyspace);
+        addToCountPerDc(perDc, pendingWithDown, 1);
+        return perDc;
+    }
+
     public int blockFor(Keyspace keyspace)
     {
         switch (this)
@@ -121,7 +134,7 @@ public enum ConsistencyLevel
                 return keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
             case LOCAL_QUORUM:
             case LOCAL_SERIAL:
-                return localQuorumFor(keyspace, DatabaseDescriptor.getLocalDataCenter());
+                return localQuorumForOurDc(keyspace);
             case EACH_QUORUM:
                 if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
                 {
@@ -164,6 +177,7 @@ public enum ConsistencyLevel
 
     /**
      * Determine if this consistency level meets or exceeds the consistency requirements
of the given cl for the given keyspace
+     * WARNING: this is not locality aware; you cannot safely use this with mixed locality
consistency levels (e.g. LOCAL_QUORUM and QUORUM)
      */
     public boolean satisfies(ConsistencyLevel other, Keyspace keyspace)
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 5551e5f..a6fe53f 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.locator;
 
 import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import com.carrotsearch.hppc.cursors.ObjectIntCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ArrayListMultimap;
@@ -57,9 +58,13 @@ import java.util.function.Function;
 import java.util.function.Predicate;
 
 import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.filter;
 import static org.apache.cassandra.db.ConsistencyLevel.EACH_QUORUM;
-import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumFor;
+import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForRead;
+import static org.apache.cassandra.db.ConsistencyLevel.eachQuorumForWrite;
 import static org.apache.cassandra.db.ConsistencyLevel.localQuorumFor;
+import static org.apache.cassandra.db.ConsistencyLevel.localQuorumForOurDc;
+import static org.apache.cassandra.locator.Replicas.addToCountPerDc;
 import static org.apache.cassandra.locator.Replicas.countInOurDc;
 import static org.apache.cassandra.locator.Replicas.countPerDc;
 
@@ -77,7 +82,7 @@ public class ReplicaPlans
             case LOCAL_ONE:
                 return countInOurDc(liveReplicas).hasAtleast(1, 1);
             case LOCAL_QUORUM:
-                return countInOurDc(liveReplicas).hasAtleast(consistencyLevel.blockFor(keyspace),
1);
+                return countInOurDc(liveReplicas).hasAtleast(localQuorumForOurDc(keyspace),
1);
             case EACH_QUORUM:
                 if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
                 {
@@ -143,7 +148,7 @@ public class ReplicaPlans
                     Collection<String> dcs = ((NetworkTopologyStrategy) keyspace.getReplicationStrategy()).getDatacenters();
                     for (ObjectObjectCursor<String, Replicas.ReplicaCount> entry :
countPerDc(dcs, allLive))
                     {
-                        int dcBlockFor = ConsistencyLevel.localQuorumFor(keyspace, entry.key);
+                        int dcBlockFor = localQuorumFor(keyspace, entry.key);
                         Replicas.ReplicaCount dcCount = entry.value;
                         if (!dcCount.hasAtleast(dcBlockFor, 0))
                             throw UnavailableException.create(consistencyLevel, entry.key,
dcBlockFor, dcCount.allReplicas(), 0, dcCount.fullReplicas());
@@ -340,7 +345,7 @@ public class ReplicaPlans
 
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel
consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live,
Selector selector) throws UnavailableException
     {
-        EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown,
live);
+        EndpointsForToken contacts = selector.select(keyspace, liveAndDown, live);
         assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending());
         return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(),
liveAndDown.all(), live.all(), contacts);
     }
@@ -348,20 +353,20 @@ public class ReplicaPlans
     public interface Selector
     {
         <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
+        E select(Keyspace keyspace, L liveAndDown, L live);
     }
 
     /**
      * Select all nodes, transient or otherwise, as targets for the operation.
      *
-     * This is may no longer be useful until we finish implementing transient replication
support, however
+     * This is may no longer be useful once we finish implementing transient replication
support, however
      * it can be of value to stipulate that a location writes to all nodes without regard
to transient status.
      */
     public static final Selector writeAll = new Selector()
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+        E select(Keyspace keyspace, L liveAndDown, L live)
         {
             return liveAndDown.all();
         }
@@ -380,22 +385,33 @@ public class ReplicaPlans
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+        E select(Keyspace keyspace, L liveAndDown, L live)
         {
             if (!any(liveAndDown.all(), Replica::isTransient))
                 return liveAndDown.all();
 
-            assert consistencyLevel != EACH_QUORUM;
-
             ReplicaCollection.Builder<E> contacts = liveAndDown.all().newBuilder(liveAndDown.all().size());
-            contacts.addAll(liveAndDown.natural().filterLazily(Replica::isFull));
+            contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
             contacts.addAll(liveAndDown.pending());
 
-            // TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all)
-            int liveCount = contacts.count(live.all()::contains);
-            int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending())
- liveCount;
-            if (requiredTransientCount > 0)
-                contacts.addAll(live.natural().filterLazily(Replica::isTransient, requiredTransientCount));
+            /**
+             * Per CASSANDRA-14768, we ensure we write to at least a QUORUM of nodes in every
DC,
+             * regardless of how many responses we need to wait for and our requested consistencyLevel.
+             * This is to minimally surprise users with transient replication; with normal
writes, we
+             * soft-ensure that we reach QUORUM in all DCs we are able to, by writing to
every node;
+             * even if we don't wait for ACK, we have in both cases sent sufficient messages.
+              */
+            ObjectIntOpenHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace,
liveAndDown.pending());
+            addToCountPerDc(requiredPerDc, live.natural().filter(Replica::isFull), -1);
+            addToCountPerDc(requiredPerDc, live.pending(), -1);
+
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            for (Replica replica : filter(live.natural(), Replica::isTransient))
+            {
+                String dc = snitch.getDatacenter(replica);
+                if (requiredPerDc.addTo(dc, -1) >= 0)
+                    contacts.add(replica);
+            }
             return contacts.build();
         }
     };
@@ -453,7 +469,7 @@ public class ReplicaPlans
     private static <E extends Endpoints<E>> E contactForEachQuorumRead(Keyspace
keyspace, E candidates)
     {
         assert keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy;
-        ObjectIntOpenHashMap<String> perDc = eachQuorumFor(keyspace);
+        ObjectIntOpenHashMap<String> perDc = eachQuorumForRead(keyspace);
 
         final IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
         return candidates.filter(replica -> {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/src/java/org/apache/cassandra/locator/Replicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replicas.java b/src/java/org/apache/cassandra/locator/Replicas.java
index 6c80134..9e6048a 100644
--- a/src/java/org/apache/cassandra/locator/Replicas.java
+++ b/src/java/org/apache/cassandra/locator/Replicas.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.function.Predicate;
 
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
 import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
 import com.google.common.collect.Iterables;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -84,21 +85,38 @@ public class Replicas
         return count;
     }
 
-    public static ObjectObjectOpenHashMap<String, ReplicaCount> countPerDc(Collection<String>
dataCenters, Iterable<Replica> liveReplicas)
+    /**
+     * count the number of full and transient replicas, separately, for each DC
+     */
+    public static ObjectObjectOpenHashMap<String, ReplicaCount> countPerDc(Collection<String>
dataCenters, Iterable<Replica> replicas)
     {
         ObjectObjectOpenHashMap<String, ReplicaCount> perDc = new ObjectObjectOpenHashMap<>(dataCenters.size());
         for (String dc: dataCenters)
             perDc.put(dc, new ReplicaCount());
 
-        for (Replica replica : liveReplicas)
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        for (Replica replica : replicas)
         {
-            String dc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(replica);
+            String dc = snitch.getDatacenter(replica);
             perDc.get(dc).increment(replica);
         }
         return perDc;
     }
 
     /**
+     * increment each of the map's DC entries for each matching replica provided
+     */
+    public static void addToCountPerDc(ObjectIntOpenHashMap<String> perDc, Iterable<Replica>
replicas, int add)
+    {
+        IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+        for (Replica replica : replicas)
+        {
+            String dc = snitch.getDatacenter(replica);
+            perDc.addTo(dc, add);
+        }
+    }
+
+    /**
      * A placeholder for areas of the code that cannot yet handle transient replicas, but
should do so in future
      */
     public static void temporaryAssertFull(Replica replica)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
index ced49e2..e2d4797 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -46,51 +46,11 @@ import static com.google.common.collect.Iterables.*;
 import static com.google.common.collect.Iterables.filter;
 import static org.apache.cassandra.locator.Replica.fullReplica;
 import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.*;
 
 public class ReplicaCollectionTest
 {
 
-    static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, BROADCAST_EP, NULL_EP;
-    static final Range<Token> R1, R2, R3, R4, R5, BROADCAST_RANGE, NULL_RANGE;
-    static final List<InetAddressAndPort> ALL_EP;
-    static final List<Range<Token>> ALL_R;
-    static
-    {
-        try
-        {
-            EP1 = InetAddressAndPort.getByName("127.0.0.1");
-            EP2 = InetAddressAndPort.getByName("127.0.0.2");
-            EP3 = InetAddressAndPort.getByName("127.0.0.3");
-            EP4 = InetAddressAndPort.getByName("127.0.0.4");
-            EP5 = InetAddressAndPort.getByName("127.0.0.5");
-            BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort();
-            NULL_EP = InetAddressAndPort.getByName("127.255.255.255");
-            R1 = range(0, 1);
-            R2 = range(1, 2);
-            R3 = range(2, 3);
-            R4 = range(3, 4);
-            R5 = range(4, 0);
-            BROADCAST_RANGE = range(10, 11);
-            NULL_RANGE = range(10000, 10001);
-            ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP);
-            ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE);
-        }
-        catch (UnknownHostException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    static Token tk(long t)
-    {
-        return new Murmur3Partitioner.LongToken(t);
-    }
-
-    static Range<Token> range(long left, long right)
-    {
-        return new Range<>(tk(left), tk(right));
-    }
-
     static class TestCase<C extends AbstractReplicaCollection<C>>
     {
         final boolean isBuilder;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java b/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java
index 9f2ac58..b5b60e3 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaLayoutTest.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.dht.Token;
 import org.junit.Assert;
 import org.junit.Test;
 
-import static org.apache.cassandra.locator.ReplicaCollectionTest.*;
+import static org.apache.cassandra.locator.ReplicaUtils.*;
 
 public class ReplicaLayoutTest
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java b/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java
new file mode 100644
index 0000000..4d0dd47
--- /dev/null
+++ b/test/unit/org/apache/cassandra/locator/ReplicaPlansTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.*;
+
+public class ReplicaPlansTest
+{
+
+    static
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+
+    static class Snitch extends AbstractNetworkTopologySnitch
+    {
+        final Set<InetAddressAndPort> dc1;
+        Snitch(Set<InetAddressAndPort> dc1)
+        {
+            this.dc1 = dc1;
+        }
+        @Override
+        public String getRack(InetAddressAndPort endpoint)
+        {
+            return dc1.contains(endpoint) ? "R1" : "R2";
+        }
+
+        @Override
+        public String getDatacenter(InetAddressAndPort endpoint)
+        {
+            return dc1.contains(endpoint) ? "DC1" : "DC2";
+        }
+    }
+
+    private static Keyspace ks(Set<InetAddressAndPort> dc1, Map<String, String>
replication)
+    {
+        replication = ImmutableMap.<String, String>builder().putAll(replication).put("class",
"NetworkTopologyStrategy").build();
+        Keyspace keyspace = Keyspace.mockKS(KeyspaceMetadata.create("blah", KeyspaceParams.create(false,
replication)));
+        Snitch snitch = new Snitch(dc1);
+        DatabaseDescriptor.setEndpointSnitch(snitch);
+        keyspace.getReplicationStrategy().snitch = snitch;
+        return keyspace;
+    }
+
+    private static Replica full(InetAddressAndPort ep) { return fullReplica(ep, R1); }
+
+
+
+    @Test
+    public void testWriteEachQuorum()
+    {
+        IEndpointSnitch stash = DatabaseDescriptor.getEndpointSnitch();
+        final Token token = tk(1L);
+        try
+        {
+            {
+                // all full natural
+                Keyspace ks = ks(ImmutableSet.of(EP1, EP2, EP3), ImmutableMap.of("DC1", "3",
"DC2", "3"));
+                EndpointsForToken natural = EndpointsForToken.of(token, full(EP1), full(EP2),
full(EP3), full(EP4), full(EP5), full(EP6));
+                EndpointsForToken pending = EndpointsForToken.empty(token);
+                ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(ks, ConsistencyLevel.EACH_QUORUM,
natural, pending, Predicates.alwaysTrue(), ReplicaPlans.writeNormal);
+                assertEquals(natural, plan.liveAndDown);
+                assertEquals(natural, plan.live);
+                assertEquals(natural, plan.contacts());
+            }
+            {
+                // all natural and up, one transient in each DC
+                Keyspace ks = ks(ImmutableSet.of(EP1, EP2, EP3), ImmutableMap.of("DC1", "3",
"DC2", "3"));
+                EndpointsForToken natural = EndpointsForToken.of(token, full(EP1), full(EP2),
trans(EP3), full(EP4), full(EP5), trans(EP6));
+                EndpointsForToken pending = EndpointsForToken.empty(token);
+                ReplicaPlan.ForTokenWrite plan = ReplicaPlans.forWrite(ks, ConsistencyLevel.EACH_QUORUM,
natural, pending, Predicates.alwaysTrue(), ReplicaPlans.writeNormal);
+                assertEquals(natural, plan.liveAndDown);
+                assertEquals(natural, plan.live);
+                EndpointsForToken expectContacts = EndpointsForToken.of(token, full(EP1),
full(EP2), full(EP4), full(EP5));
+                assertEquals(expectContacts, plan.contacts());
+            }
+        }
+        finally
+        {
+            DatabaseDescriptor.setEndpointSnitch(stash);
+        }
+
+        {
+            // test simple
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
index c5350dc..72c0a06 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaUtils.java
@@ -18,11 +18,17 @@
 
 package org.apache.cassandra.locator;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.FBUtilities;
+import org.junit.Assert;
+
+import java.net.UnknownHostException;
+import java.util.List;
 
 import static org.apache.cassandra.locator.Replica.fullReplica;
 import static org.apache.cassandra.locator.Replica.transientReplica;
@@ -51,4 +57,61 @@ public class ReplicaUtils
     {
         return transientReplica(endpoint, new Range<>(token, token));
     }
+
+    static final InetAddressAndPort EP1, EP2, EP3, EP4, EP5, EP6, EP7, EP8, EP9, BROADCAST_EP,
NULL_EP;
+    static final Range<Token> R1, R2, R3, R4, R5, R6, R7, R8, R9, BROADCAST_RANGE,
NULL_RANGE, WRAP_RANGE;
+    static final List<InetAddressAndPort> ALL_EP;
+    static final List<Range<Token>> ALL_R;
+
+    static
+    {
+        try
+        {
+            EP1 = InetAddressAndPort.getByName("127.0.0.1");
+            EP2 = InetAddressAndPort.getByName("127.0.0.2");
+            EP3 = InetAddressAndPort.getByName("127.0.0.3");
+            EP4 = InetAddressAndPort.getByName("127.0.0.4");
+            EP5 = InetAddressAndPort.getByName("127.0.0.5");
+            EP6 = InetAddressAndPort.getByName("127.0.0.6");
+            EP7 = InetAddressAndPort.getByName("127.0.0.7");
+            EP8 = InetAddressAndPort.getByName("127.0.0.8");
+            EP9 = InetAddressAndPort.getByName("127.0.0.9");
+            BROADCAST_EP = FBUtilities.getBroadcastAddressAndPort();
+            NULL_EP = InetAddressAndPort.getByName("127.255.255.255");
+            R1 = range(0, 1);
+            R2 = range(1, 2);
+            R3 = range(2, 3);
+            R4 = range(3, 4);
+            R5 = range(4, 5);
+            R6 = range(5, 6);
+            R7 = range(6, 7);
+            R8 = range(7, 8);
+            R9 = range(8, 9);
+            BROADCAST_RANGE = range(10, 11);
+            NULL_RANGE = range(10000, 10001);
+            WRAP_RANGE = range(100000, 0);
+            ALL_EP = ImmutableList.of(EP1, EP2, EP3, EP4, EP5, BROADCAST_EP);
+            ALL_R = ImmutableList.of(R1, R2, R3, R4, R5, BROADCAST_RANGE, WRAP_RANGE);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static Token tk(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    static Range<Token> range(long left, long right)
+    {
+        return new Range<>(tk(left), tk(right));
+    }
+
+    static void assertEquals(AbstractReplicaCollection<?> a, AbstractReplicaCollection<?>
b)
+    {
+        Assert.assertEquals(a.list, b.list);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c277fc56/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
index d31d3f1..15fbd27 100644
--- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
@@ -173,9 +173,15 @@ public class WriteResponseHandlerTransientTest
     private static void assertSpeculationReplicas(ReplicaPlan.ForTokenWrite expected, EndpointsForToken
replicas, Predicate<InetAddressAndPort> livePredicate)
     {
         ReplicaPlan.ForTokenWrite actual = getSpeculationContext(replicas, livePredicate);
-        Assert.assertTrue(Iterables.elementsEqual(expected.pending(), actual.pending()));
-        Assert.assertTrue(Iterables.elementsEqual(expected.live(), actual.live()));
-        Assert.assertTrue(Iterables.elementsEqual(expected.contacts(), actual.contacts()));
+        assertEquals(expected.pending(), actual.pending());
+        assertEquals(expected.live(), actual.live());
+        assertEquals(expected.contacts(), actual.contacts());
+    }
+
+    private static void assertEquals(ReplicaCollection<?> a, ReplicaCollection<?>
b)
+    {
+        if (!Iterables.elementsEqual(a, b))
+            Assert.assertTrue(a + " vs " + b, false);
     }
 
     private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... endpoints)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


Mime
View raw message