cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pa...@apache.org
Subject [1/2] cassandra git commit: Retry acquire MV lock on failure instead of throwing WTE on streaming
Date Thu, 15 Dec 2016 19:15:27 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 9fc1ffb63 -> 48abc0369


Retry acquire MV lock on failure instead of throwing WTE on streaming

Patch by Benjamin Roth; Reviewed by Paulo Motta for CASSANDRA-12905


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3faa0d92
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3faa0d92
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3faa0d92

Branch: refs/heads/cassandra-3.0
Commit: 3faa0d925791be085b92949a0a0ec20f7e6ae368
Parents: 9fc1ffb
Author: brstgt <brstgt@googlemail.com>
Authored: Thu Dec 15 12:42:31 2016 -0200
Committer: Paulo Motta <paulo@apache.org>
Committed: Thu Dec 15 16:46:00 2016 -0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 92 +++++++++++---------
 src/java/org/apache/cassandra/db/Mutation.java  | 17 ++--
 .../db/commitlog/CommitLogReplayer.java         | 10 +--
 .../cassandra/service/paxos/PaxosState.java     |  9 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  5 +-
 6 files changed, 63 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e69bf08..63e095d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.11
+ * Improve contention handling on failure to acquire MV lock for streaming and hints (CASSANDRA-12905)
  * Fix DELETE and UPDATE queries with empty IN restrictions (CASSANDRA-12829)
  * Mark MVs as built after successful bootstrap (CASSANDRA-12984)
  * Estimated TS drop-time histogram updated with Cell.NO_DELETION_TIME (CASSANDRA-13040)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index ec5102b..3715995 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -26,6 +26,8 @@ import java.util.concurrent.locks.Lock;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
@@ -50,8 +52,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * It represents a Keyspace.
@@ -62,7 +62,7 @@ public class Keyspace
 
     private static final String TEST_FAIL_WRITES_KS = System.getProperty("cassandra.test.fail_writes_ks",
"");
     private static final boolean TEST_FAIL_WRITES = !TEST_FAIL_WRITES_KS.isEmpty();
-    private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger(System.getProperty("cassandra.test.fail_mv_locks_count",
"0"), 0);
+    private static int TEST_FAIL_MV_LOCKS_COUNT = Integer.getInteger("cassandra.test.fail_mv_locks_count",
0);
 
     public final KeyspaceMetrics metric;
 
@@ -379,42 +379,40 @@ public class Keyspace
         }
     }
 
-    public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog)
-    {
-        return apply(mutation, writeCommitLog, true, false, null);
-    }
-
-    /**
-     * Should be used if caller is blocking and runs in mutation stage.
-     * Otherwise there is a race condition where ALL mutation workers are beeing blocked
ending
-     * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
-     *
-     * @param mutation
-     * @param writeCommitLog
-     * @return
-     */
-    public CompletableFuture<?> applyNotDeferrable(Mutation mutation, boolean writeCommitLog)
+    public CompletableFuture<?> applyFuture(Mutation mutation, boolean writeCommitLog,
boolean updateIndexes)
     {
-        return apply(mutation, writeCommitLog, true, false, false, null);
+        return apply(mutation, writeCommitLog, updateIndexes, true, true, null);
     }
 
-    public CompletableFuture<?> apply(Mutation mutation, boolean writeCommitLog, boolean
updateIndexes)
+    public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        return apply(mutation, writeCommitLog, updateIndexes, false, null);
+        apply(mutation, writeCommitLog, updateIndexes, true);
     }
 
-    public CompletableFuture<?> applyFromCommitLog(Mutation mutation)
+    public void apply(final Mutation mutation,
+                      final boolean writeCommitLog)
     {
-        return apply(mutation, false, true, true, null);
+        apply(mutation, writeCommitLog, true, true);
     }
 
-    public CompletableFuture<?> apply(final Mutation mutation,
-                                      final boolean writeCommitLog,
-                                      boolean updateIndexes,
-                                      boolean isClReplay,
-                                      CompletableFuture<?> future)
+    /**
+     * If apply is blocking, apply must not be deferred
+     * Otherwise there is a race condition where ALL mutation workers are beeing blocked
ending
+     * in a complete deadlock of the mutation stage. See CASSANDRA-12689.
+     *
+     * @param mutation       the row to write.  Must not be modified after calling apply,
since commitlog append
+     *                       may happen concurrently, depending on the CL Executor type.
+     * @param writeCommitLog false to disable commitlog append entirely
+     * @param updateIndexes  false to disable index updates (used by CollationController
"defragmenting")
+     * @param isDroppable    true if this should throw WriteTimeoutException if it does not
acquire lock within write_request_timeout_in_ms
+     * @throws ExecutionException
+     */
+    public void apply(final Mutation mutation,
+                      final boolean writeCommitLog,
+                      boolean updateIndexes,
+                      boolean isDroppable)
     {
-        return apply(mutation, writeCommitLog, updateIndexes, isClReplay, true, future);
+        apply(mutation, writeCommitLog, updateIndexes, isDroppable, false, null);
     }
 
     /**
@@ -424,13 +422,13 @@ public class Keyspace
      *                       may happen concurrently, depending on the CL Executor type.
      * @param writeCommitLog false to disable commitlog append entirely
      * @param updateIndexes  false to disable index updates (used by CollationController
"defragmenting")
-     * @param isClReplay     true if caller is the commitlog replayer
+     * @param isDroppable    true if this should throw WriteTimeoutException if it does not
acquire lock within write_request_timeout_in_ms
      * @param isDeferrable   true if caller is not waiting for future to complete, so that
future may be deferred
      */
-    public CompletableFuture<?> apply(final Mutation mutation,
+    private CompletableFuture<?> apply(final Mutation mutation,
                                       final boolean writeCommitLog,
                                       boolean updateIndexes,
-                                      boolean isClReplay,
+                                      boolean isDroppable,
                                       boolean isDeferrable,
                                       CompletableFuture<?> future)
     {
@@ -438,7 +436,11 @@ public class Keyspace
             throw new RuntimeException("Testing write failures");
 
         boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation),
false);
-        final CompletableFuture<?> mark = future == null ? new CompletableFuture<>()
: future;
+
+        // If apply is not deferrable, no future is required, returns always null
+        if (isDeferrable && future == null) {
+            future = new CompletableFuture<>();
+        }
 
         Lock lock = null;
         if (requiresViewUpdate)
@@ -453,15 +455,15 @@ public class Keyspace
 
                 if (lock == null)
                 {
-                    // avoid throwing a WTE during commitlog replay
-                    if (!isClReplay && (System.currentTimeMillis() - mutation.createdAt)
> DatabaseDescriptor.getWriteRpcTimeout())
+                    //throw WTE only if request is droppable
+                    if (isDroppable && (System.currentTimeMillis() - mutation.createdAt)
> DatabaseDescriptor.getWriteRpcTimeout())
                     {
                         logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
                         Tracing.trace("Could not acquire MV lock");
                         if (future != null)
                         {
                             future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW,
ConsistencyLevel.LOCAL_ONE, 0, 1));
-                            return mark;
+                            return future;
                         }
                         else
                         {
@@ -472,11 +474,12 @@ public class Keyspace
                     {
                         //This view update can't happen right now. so rather than keep this
thread busy
                         // we will re-apply ourself to the queue and try again later
+                        final CompletableFuture<?> mark = future;
                         StageManager.getStage(Stage.MUTATION).execute(() ->
-                                apply(mutation, writeCommitLog, true, isClReplay, mark)
+                                apply(mutation, writeCommitLog, true, isDroppable, true,
mark)
                         );
 
-                        return mark;
+                        return future;
                     }
                     else
                     {
@@ -499,7 +502,9 @@ public class Keyspace
                 else
                 {
                     long acquireTime = System.currentTimeMillis() - mutation.viewLockAcquireStart.get();
-                    if (!isClReplay)
+                    // Metrics are only collected for droppable write operations
+                    // Bulk non-droppable operations (e.g. commitlog replay, hint delivery)
are not measured
+                    if (isDroppable)
                     {
                         for (UUID cfid : mutation.getColumnFamilyIds())
                             columnFamilyStores.get(cfid).metric.viewLockAcquireTime.update(acquireTime,
TimeUnit.MILLISECONDS);
@@ -534,7 +539,7 @@ public class Keyspace
                     try
                     {
                         Tracing.trace("Creating materialized view mutations from base table
replica");
-                        viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd,
writeCommitLog && !isClReplay, baseComplete);
+                        viewManager.forTable(upd.metadata()).pushViewReplicaUpdates(upd,
writeCommitLog, baseComplete);
                     }
                     catch (Throwable t)
                     {
@@ -553,8 +558,11 @@ public class Keyspace
                 if (requiresViewUpdate)
                     baseComplete.set(System.currentTimeMillis());
             }
-            mark.complete(null);
-            return mark;
+
+            if (future != null) {
+                future.complete(null);
+            }
+            return future;
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/Mutation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java
index 2955677..7ed69c0 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -202,20 +202,17 @@ public class Mutation implements IMutation
     public CompletableFuture<?> applyFuture()
     {
         Keyspace ks = Keyspace.open(keyspaceName);
-        return ks.apply(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites);
+        return ks.applyFuture(this, Keyspace.open(keyspaceName).getMetadata().params.durableWrites,
true);
+    }
+
+    public void apply(boolean durableWrites, boolean isDroppable)
+    {
+        Keyspace.open(keyspaceName).apply(this, durableWrites, true, isDroppable);
     }
 
     public void apply(boolean durableWrites)
     {
-        try
-        {
-            Keyspace ks = Keyspace.open(keyspaceName);
-            Uninterruptibles.getUninterruptibly(ks.applyNotDeferrable(this, durableWrites));
-        }
-        catch (ExecutionException e)
-        {
-            throw Throwables.propagate(e.getCause());
-        }
+        apply(durableWrites, true);
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index af8efb4..d53f0f8 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -642,15 +642,7 @@ public class CommitLogReplayer
                 {
                     assert !newMutation.isEmpty();
 
-                    try
-                    {
-                        Uninterruptibles.getUninterruptibly(Keyspace.open(newMutation.getKeyspaceName()).applyFromCommitLog(newMutation));
-                    }
-                    catch (ExecutionException e)
-                    {
-                        throw Throwables.propagate(e.getCause());
-                    }
-
+                    Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false,
true, false);
                     keyspacesRecovered.add(keyspace);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0940950..ee1ba6a 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -147,14 +147,7 @@ public class PaxosState
             {
                 Tracing.trace("Committing proposal {}", proposal);
                 Mutation mutation = proposal.makeMutation();
-                try
-                {
-                    Uninterruptibles.getUninterruptibly(Keyspace.open(mutation.getKeyspaceName()).applyNotDeferrable(mutation,
true));
-                }
-                catch (ExecutionException e)
-                {
-                    throw Throwables.propagate(e.getCause());
-                }
+                Keyspace.open(mutation.getKeyspaceName()).apply(mutation, true);
             }
             else
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3faa0d92/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 040906b..b6b8387 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -178,14 +178,15 @@ public class StreamReceiveTask extends StreamTask
                     {
                         for (SSTableReader reader : readers)
                         {
+                            Keyspace ks = Keyspace.open(reader.getKeyspaceName());
                             try (ISSTableScanner scanner = reader.getScanner())
                             {
                                 while (scanner.hasNext())
                                 {
                                     try (UnfilteredRowIterator rowIterator = scanner.next())
                                     {
-                                        //Apply unsafe (we will flush below before transaction
is done)
-                                        new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+                                        // MV *can* be applied unsafe as we flush below before
transaction is done.
+                                        ks.apply(new Mutation(PartitionUpdate.fromIterator(rowIterator)),
false, true, false);
                                     }
                                 }
                             }


Mime
View raw message