cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] cassandra git commit: Avoid MV race during node decommission
Date Fri, 04 Dec 2015 20:53:57 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.1 2b8cdc234 -> ee46dfbf9


Avoid MV race during node decommission

Patch by Paulo Motta; reviewed by Joel Knighton for CASSANDRA-10674


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c184e8c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c184e8c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c184e8c1

Branch: refs/heads/cassandra-3.1
Commit: c184e8c14b28eddc20cbdd098f5e47d1ed832898
Parents: a4da379
Author: Paulo Motta <pauloricardomg@gmail.com>
Authored: Wed Nov 25 14:50:31 2015 -0800
Committer: T Jake Luciani <jake@apache.org>
Committed: Fri Dec 4 15:50:48 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/view/ViewUtils.java | 25 ++------
 .../apache/cassandra/service/StorageProxy.java  | 67 ++++++++++++--------
 .../apache/cassandra/db/view/ViewUtilsTest.java | 42 ++++++++++--
 4 files changed, 88 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f3f182..b95aa76 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.1
+ * Avoid MV race during node decommission (CASSANDRA-10674)
  * Disable reloading of GossipingPropertyFileSnitch (CASSANDRA-9474)
  * Handle single-column deletions correction in materialized views
    when the column is part of the view primary key (CASSANDRA-10796)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/src/java/org/apache/cassandra/db/view/ViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewUtils.java b/src/java/org/apache/cassandra/db/view/ViewUtils.java
index 089a3b7..4d9517f 100644
--- a/src/java/org/apache/cassandra/db/view/ViewUtils.java
+++ b/src/java/org/apache/cassandra/db/view/ViewUtils.java
@@ -21,13 +21,13 @@ package org.apache.cassandra.db.view;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
 public final class ViewUtils
@@ -56,9 +56,9 @@ public final class ViewUtils
      *  B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
      *  C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
      *
-     * @throws RuntimeException if this method is called using a base token which does not
belong to this replica
+     * @return Optional.empty() if this method is called using a base token which does not
belong to this replica
      */
-    public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken,
Token viewToken)
+    public static Optional<InetAddress> getViewNaturalEndpoint(String keyspaceName,
Token baseToken, Token viewToken)
     {
         AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
 
@@ -77,7 +77,7 @@ public final class ViewUtils
         {
             // If we are a base endpoint which is also a view replica, we use ourselves as
our view replica
             if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
-                return viewEndpoint;
+                return Optional.of(viewEndpoint);
 
             // We have to remove any endpoint which is shared between the base and the view,
as it will select itself
             // and throw off the counts otherwise.
@@ -95,20 +95,9 @@ public final class ViewUtils
         int baseIdx = baseEndpoints.indexOf(FBUtilities.getBroadcastAddress());
 
         if (baseIdx < 0)
-        {
-
-            if (StorageService.instance.getTokenMetadata().pendingEndpointsFor(viewToken,
keyspaceName).size() > 0)
-            {
-                //Since there are pending endpoints we are going to write to the batchlog
regardless.
-                //So we can pretend we are the views endpoint.
-
-                return FBUtilities.getBroadcastAddress();
-            }
-
-            throw new RuntimeException("Trying to get the view natural endpoint on a non-data
replica");
-        }
-
+            //This node is not a base replica of this key, so we return empty
+            return Optional.empty();
 
-        return viewEndpoints.get(baseIdx);
+        return Optional.of(viewEndpoints.get(baseIdx));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 15be7c6..397b8b9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -670,12 +670,12 @@ public class StorageProxy implements StorageProxyMBean
             if (StorageService.instance.isStarting() || StorageService.instance.isJoining()
|| StorageService.instance.isMoving())
             {
                 BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(),
-                                                        mutations),
-                                      writeCommitLog);
+                                                        mutations), writeCommitLog);
             }
             else
             {
                 List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
+                List<Mutation> nonPairedMutations = new LinkedList<>();
                 Token baseToken = StorageService.instance.getTokenMetadata().partitioner.getToken(dataKey);
 
                 ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
@@ -684,40 +684,51 @@ public class StorageProxy implements StorageProxyMBean
                 final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress());
                 BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
                                                                                         
                     () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
-
                 // add a handler for each mutation - includes checking availability, but
doesn't initiate any writes, yet
                 for (Mutation mutation : mutations)
                 {
                     String keyspaceName = mutation.getKeyspaceName();
                     Token tk = mutation.key().getToken();
-                    InetAddress pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName,
baseToken, tk);
-                    List<InetAddress> naturalEndpoints = Lists.newArrayList(pairedEndpoint);
-
-                    WriteResponseHandlerWrapper wrapper = wrapViewBatchResponseHandler(mutation,
-                                                                                       consistencyLevel,
-                                                                                       consistencyLevel,
-                                                                                       naturalEndpoints,
-                                                                                       baseComplete,
-                                                                                       WriteType.BATCH,
-                                                                                       cleanup);
-
-                    // When local node is the endpoint and there are no pending nodes we
can
-                    // Just apply the mutation locally.
-                    if (pairedEndpoint.equals(FBUtilities.getBroadcastAddress()) &&
wrapper.handler.pendingEndpoints.isEmpty() && StorageService.instance.isJoined())
+                    Optional<InetAddress> pairedEndpoint = ViewUtils.getViewNaturalEndpoint(keyspaceName,
baseToken, tk);
+                    Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk,
keyspaceName);
+
+                    if (pairedEndpoint.isPresent())
                     {
-                        try
-                        {
-                            mutation.apply(writeCommitLog);
-                        }
-                        catch (Exception exc)
+                        // When local node is the endpoint and there are no pending nodes
we can
+                        // Just apply the mutation locally.
+                        if (pairedEndpoint.get().equals(FBUtilities.getBroadcastAddress())
+                            && pendingEndpoints.isEmpty() && StorageService.instance.isJoined())
+                            try
+                            {
+                                mutation.apply(writeCommitLog);
+                            }
+                            catch (Exception exc)
+                            {
+                                logger.error("Error applying local view update to keyspace
{}: {}", mutation.getKeyspaceName(), mutation);
+                                throw exc;
+                            }
+                        else
                         {
-                            logger.error("Error applying local view update to keyspace {}:
{}", mutation.getKeyspaceName(), mutation);
-                            throw exc;
+                            wrappers.add(wrapViewBatchResponseHandler(mutation,
+                                                                      consistencyLevel,
+                                                                      consistencyLevel,
+                                                                      Collections.singletonList(pairedEndpoint.get()),
+                                                                      baseComplete,
+                                                                      WriteType.BATCH,
+                                                                      cleanup));
                         }
                     }
                     else
                     {
-                        wrappers.add(wrapper);
+                        //if there are no paired endpoints there are probably range movements
going on,
+                        //so we write to the local batchlog to replay later
+                        if (pendingEndpoints.isEmpty())
+                            logger.warn("Received base materialized view mutation for key
%s that does not belong " +
+                                        "to this node. There is probably a range movement
happening (move or decommission)," +
+                                        "but this node hasn't updated its ring metadata yet.
Adding mutation to " +
+                                        "local batchlog to be replayed later.",
+                                        mutation.key());
+                        nonPairedMutations.add(mutation);
                     }
                 }
 
@@ -730,6 +741,12 @@ public class StorageProxy implements StorageProxyMBean
                     // now actually perform the writes and wait for them to complete
                     asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.VIEW_MUTATION);
                 }
+
+                if (!nonPairedMutations.isEmpty())
+                {
+                    BatchlogManager.store(Batch.createLocal(batchUUID, FBUtilities.timestampMicros(),
nonPairedMutations),
+                                          writeCommitLog);
+                }
             }
         }
         finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c184e8c1/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
index 8fd0cfb..c238f36 100644
--- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
+++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.db.view;
 import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -74,11 +75,12 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false,
replicationMap));
         Schema.instance.setKeyspaceMetadata(meta);
 
-        InetAddress naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+        Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
                                                                        new StringToken("CA"),
                                                                        new StringToken("BB"));
 
-        Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint);
+        Assert.assertTrue(naturalEndpoint.isPresent());
+        Assert.assertEquals(InetAddress.getByName("127.0.0.2"), naturalEndpoint.get());
     }
 
 
@@ -106,10 +108,42 @@ public class ViewUtilsTest
         KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false,
replicationMap));
         Schema.instance.setKeyspaceMetadata(meta);
 
-        InetAddress naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+        Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
                                                                        new StringToken("CA"),
                                                                        new StringToken("BB"));
 
-        Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint);
+        Assert.assertTrue(naturalEndpoint.isPresent());
+        Assert.assertEquals(InetAddress.getByName("127.0.0.1"), naturalEndpoint.get());
+    }
+
+    @Test
+    public void testBaseTokenDoesNotBelongToLocalReplicaShouldReturnEmpty() throws Exception
+    {
+        TokenMetadata metadata = StorageService.instance.getTokenMetadata();
+        metadata.clearUnsafe();
+
+        // DC1
+        metadata.updateNormalToken(new StringToken("A"), InetAddress.getByName("127.0.0.1"));
+        metadata.updateNormalToken(new StringToken("C"), InetAddress.getByName("127.0.0.2"));
+
+        // DC2
+        metadata.updateNormalToken(new StringToken("B"), InetAddress.getByName("127.0.0.4"));
+        metadata.updateNormalToken(new StringToken("D"), InetAddress.getByName("127.0.0.5"));
+
+        Map<String, String> replicationMap = new HashMap<>();
+        replicationMap.put(ReplicationParams.CLASS, NetworkTopologyStrategy.class.getName());
+
+        replicationMap.put("DC1", "1");
+        replicationMap.put("DC2", "1");
+
+        Keyspace.clear("Keyspace1");
+        KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false,
replicationMap));
+        Schema.instance.setKeyspaceMetadata(meta);
+
+        Optional<InetAddress> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1",
+                                                                       new StringToken("AB"),
+                                                                       new StringToken("BB"));
+
+        Assert.assertFalse(naturalEndpoint.isPresent());
     }
 }


Mime
View raw message