cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From c...@apache.org
Subject [3/6] cassandra git commit: Mutations do not block for completion under view lock contention
Date Mon, 01 Feb 2016 22:07:20 GMT
Mutations do not block for completion under view lock contention

Patch by Carl Yeksigian; reviewed by Tyler Hobbs for CASSANDRA-10779


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/839a5bab
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/839a5bab
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/839a5bab

Branch: refs/heads/trunk
Commit: 839a5bab2a7f5385a878e5dc5f8b01bda28fa777
Parents: b554cb3
Author: Carl Yeksigian <carl@apache.org>
Authored: Mon Feb 1 16:51:15 2016 -0500
Committer: Carl Yeksigian <carl@apache.org>
Committed: Mon Feb 1 16:59:57 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 32 +++++++++++++-------
 src/java/org/apache/cassandra/db/Mutation.java  | 19 ++++++++++--
 .../cassandra/db/MutationVerbHandler.java       | 25 ++++++++++++---
 .../db/commitlog/CommitLogReplayer.java         |  7 +++--
 .../cassandra/service/paxos/PaxosState.java     | 12 ++++++--
 6 files changed, 73 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7a42916..bed8703 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.3
+ * Mutations do not block for completion under view lock contention (CASSANDRA-10779)
  * Invalidate legacy schema tables when unloading them (CASSANDRA-11071)
  * (cqlsh) handle INSERT and UPDATE statements with LWT conditions correctly
    (CASSANDRA-11003)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 7b4f79b..2b62f0e 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -379,19 +379,19 @@ public class Keyspace
         }
     }
 
-    public void apply(Mutation mutation, boolean writeCommitLog)
+    public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog)
     {
-        apply(mutation, writeCommitLog, true, false);
+        return apply(mutation, writeCommitLog, true, false, null);
     }
 
-    public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
+    public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        apply(mutation, writeCommitLog, updateIndexes, false);
+        return apply(mutation, writeCommitLog, updateIndexes, false, null);
     }
 
-    public void applyFromCommitLog(Mutation mutation)
+    public CompletableFuture<?> applyFromCommitLog(Mutation mutation)
     {
-        apply(mutation, false, true, true);
+        return apply(mutation, false, true, true, null);
     }
 
     /**
@@ -403,13 +403,18 @@ public class Keyspace
      * @param updateIndexes  false to disable index updates (used by CollationController "defragmenting")
      * @param isClReplay     true if caller is the commitlog replayer
      */
-    public void apply(final Mutation mutation, final boolean writeCommitLog, boolean updateIndexes, boolean isClReplay)
+    public CompletableFuture<?> apply(final Mutation mutation,
+                                      final boolean writeCommitLog,
+                                      boolean updateIndexes,
+                                      boolean isClReplay,
+                                      CompletableFuture<?> future)
     {
         if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
             throw new RuntimeException("Testing write failures");
 
         Lock lock = null;
         boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
+        final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
 
         if (requiresViewUpdate)
         {
@@ -422,7 +427,10 @@ public class Keyspace
                 {
                     logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
                     Tracing.trace("Could not acquire MV lock");
-                    throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
+                    if (future != null)
+                        future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
+                    else
+                        throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
                 }
                 else
                 {
@@ -430,12 +438,12 @@ public class Keyspace
                     // we will re-apply ourself to the queue and try again later
                     StageManager.getStage(Stage.MUTATION).execute(() -> {
                         if (writeCommitLog)
-                            mutation.apply();
+                            apply(mutation, true, true, isClReplay, mark);
                         else
-                            mutation.applyUnsafe();
+                            apply(mutation, false, true, isClReplay, mark);
                     });
 
-                    return;
+                    return mark;
                 }
             }
             else
@@ -495,6 +503,8 @@ public class Keyspace
                 if (requiresViewUpdate)
                     baseComplete.set(System.currentTimeMillis());
             }
+            mark.complete(null);
+            return mark;
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index cbc7e17..6b4c8e9 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -20,8 +20,11 @@ package org.apache.cassandra.db;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -195,14 +198,26 @@ public class Mutation implements IMutation
         return new Mutation(ks, key, modifications);
     }
 
+    public CompletableFuture<?> applyFuture()
+    {
+        Keyspace ks = Keyspace.open(keyspaceName);
+        return ks.apply(this, ks.getMetadata().params.durableWrites);
+    }
+
     /*
      * This is equivalent to calling commit. Applies the changes to
      * to the keyspace that is obtained by calling Keyspace.open().
      */
     public void apply()
     {
-        Keyspace ks = Keyspace.open(keyspaceName);
-        ks.apply(this, ks.getMetadata().params.durableWrites);
+        try
+        {
+            Uninterruptibles.getUninterruptibly(applyFuture());
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e.getCause());
+        }
     }
 
     public void apply(boolean durableWrites)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/db/MutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index d4670a2..74dd625 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.util.function.Consumer;
 
 import org.apache.cassandra.batchlog.LegacyBatchlogMigrator;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
@@ -29,6 +30,17 @@ import org.apache.cassandra.tracing.Tracing;
 
 public class MutationVerbHandler implements IVerbHandler<Mutation>
 {
+    private void reply(int id, InetAddress replyTo)
+    {
+        Tracing.trace("Enqueuing response to {}", replyTo);
+        MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo);
+    }
+
+    private void failed()
+    {
+        Tracing.trace("Payload application resulted in WriteTimeout, not replying");
+    }
+
     public void doVerb(MessageIn<Mutation> message, int id)  throws IOException
     {
         // Check if there were any forwarding headers in this message
@@ -49,16 +61,19 @@ public class MutationVerbHandler implements IVerbHandler<Mutation>
         try
         {
             if (message.version < MessagingService.VERSION_30 && LegacyBatchlogMigrator.isLegacyBatchlogMutation(message.payload))
+            {
                 LegacyBatchlogMigrator.handleLegacyMutation(message.payload);
+                reply(id, replyTo);
+            }
             else
-                message.payload.apply();
-
-            Tracing.trace("Enqueuing response to {}", replyTo);
-            MessagingService.instance().sendReply(WriteResponse.createMessage(), id, replyTo);
+                message.payload.applyFuture().thenAccept(o -> reply(id, replyTo)).exceptionally(wto -> {
+                    failed();
+                    return null;
+                });
         }
         catch (WriteTimeoutException wto)
         {
-            Tracing.trace("Payload application resulted in WriteTimeout, not replying");
+            failed();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 2668bba..b4472ed 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -25,6 +25,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.CRC32;
@@ -34,6 +35,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -54,7 +56,6 @@ import org.apache.cassandra.io.compress.ICompressor;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -567,7 +568,7 @@ public class CommitLogReplayer
 
         Runnable runnable = new WrappedRunnable()
         {
-            public void runMayThrow() throws IOException
+            public void runMayThrow() throws ExecutionException
             {
                 if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
                     return;
@@ -602,7 +603,7 @@ public class CommitLogReplayer
                 if (newMutation != null)
                 {
                     assert !newMutation.isEmpty();
-                    Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation);
+                    Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
                     keyspacesRecovered.add(keyspace);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/839a5bab/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 20ccb90..3ecac99 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -20,10 +20,11 @@
  */
 package org.apache.cassandra.service.paxos;
 
-import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Lock;
 
 import com.google.common.util.concurrent.Striped;
+import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -138,7 +139,14 @@ public class PaxosState
             {
                 Tracing.trace("Committing proposal {}", proposal);
                 Mutation mutation = proposal.makeMutation();
-                Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
+                try
+                {
+                    Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true));
+                }
+                catch (ExecutionException e)
+                {
+                    throw new RuntimeException(e.getCause());
+                }
             }
             else
             {


Mime
View raw message