cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From c...@apache.org
Subject [6/6] cassandra git commit: Merge branch 'cassandra-3.3' into trunk
Date Mon, 01 Feb 2016 22:07:23 GMT
Merge branch 'cassandra-3.3' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6e7d739d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6e7d739d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6e7d739d

Branch: refs/heads/trunk
Commit: 6e7d739d12f6e7cab5fc9d33a28b40ad150c84e6
Parents: 4a241f6 2e965f0
Author: Carl Yeksigian <carl@apache.org>
Authored: Mon Feb 1 17:04:15 2016 -0500
Committer: Carl Yeksigian <carl@apache.org>
Committed: Mon Feb 1 17:04:15 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 32 +++++++++++++-------
 src/java/org/apache/cassandra/db/Mutation.java  | 19 ++++++++++--
 .../cassandra/db/MutationVerbHandler.java       | 25 ++++++++++++---
 .../db/commitlog/CommitLogReplayer.java         |  4 ++-
 .../cassandra/service/paxos/PaxosState.java     | 12 ++++++--
 6 files changed, 72 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e7d739d/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e7d739d/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index 6122479,2b62f0e..72bcc82
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -423,48 -412,25 +427,52 @@@ public class Keyspac
          if (TEST_FAIL_WRITES && metadata.name.equals(TEST_FAIL_WRITES_KS))
              throw new RuntimeException("Testing write failures");
  
 -        Lock lock = null;
 +        Lock[] locks = null;
          boolean requiresViewUpdate = updateIndexes && viewManager.updatesAffectView(Collections.singleton(mutation), false);
+         final CompletableFuture<?> mark = future == null ? new CompletableFuture<>() : future;
  
          if (requiresViewUpdate)
          {
              mutation.viewLockAcquireStart.compareAndSet(0L, System.currentTimeMillis());
 -            lock = ViewManager.acquireLockFor(mutation.key().getKey());
  
 -            if (lock == null)
 +            // the order of lock acquisition doesn't matter (from a deadlock perspective) because we only use tryLock()
 +            Collection<UUID> columnFamilyIds = mutation.getColumnFamilyIds();
 +            Iterator<UUID> idIterator = columnFamilyIds.iterator();
 +            locks = new Lock[columnFamilyIds.size()];
 +
 +            for (int i = 0; i < columnFamilyIds.size(); i++)
              {
 -                if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
 +                UUID cfid = idIterator.next();
 +                int lockKey = Objects.hash(mutation.key().getKey(), cfid);
 +                Lock lock = ViewManager.acquireLockFor(lockKey);
 +                if (lock == null)
                  {
 -                    logger.trace("Could not acquire lock for {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()));
 -                    Tracing.trace("Could not acquire MV lock");
 -                    if (future != null)
 -                        future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
 +                    // we will either time out or retry, so release all acquired locks
 +                    for (int j = 0; j < i; j++)
 +                        locks[j].unlock();
 +
 +                    if ((System.currentTimeMillis() - mutation.createdAt) > DatabaseDescriptor.getWriteRpcTimeout())
 +                    {
 +                        logger.trace("Could not acquire lock for {} and table {}", ByteBufferUtil.bytesToHex(mutation.key().getKey()), columnFamilyStores.get(cfid).name);
 +                        Tracing.trace("Could not acquire MV lock");
-                         throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
++                        if (future != null)
++                            future.completeExceptionally(new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1));
++                        else
++                            throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
 +                    }
                      else
 -                        throw new WriteTimeoutException(WriteType.VIEW, ConsistencyLevel.LOCAL_ONE, 0, 1);
 +                    {
 +                        // This view update can't happen right now. so rather than keep this thread busy
 +                        // we will re-apply ourself to the queue and try again later
 +                        StageManager.getStage(Stage.MUTATION).execute(() -> {
 +                            if (writeCommitLog)
-                                 mutation.apply();
++                                apply(mutation, true, true, isClReplay, mark);
 +                            else
-                                 mutation.applyUnsafe();
++                                apply(mutation, false, true, isClReplay, mark);
 +                        });
 +
-                         return;
++                        return mark;
 +                    }
                  }
                  else
                  {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6e7d739d/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 971133f,3a9f5e6..3ee70cb
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@@ -23,17 -23,9 +23,18 @@@ import java.io.EOFException
  import java.io.File;
  import java.io.FileOutputStream;
  import java.io.IOException;
 -import java.nio.ByteBuffer;
 -import java.util.*;
 +import java.util.ArrayDeque;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Queue;
 +import java.util.Set;
 +import java.util.UUID;
+ import java.util.concurrent.ExecutionException;
  import java.util.concurrent.Future;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.zip.CRC32;
@@@ -44,11 -36,11 +45,12 @@@ import com.google.common.collect.HashMu
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Multimap;
  import com.google.common.collect.Ordering;
+ import com.google.common.util.concurrent.Uninterruptibles;
 -
  import org.apache.commons.lang3.StringUtils;
 +import org.cliffc.high_scale_lib.NonBlockingHashSet;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 +
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
  import org.apache.cassandra.config.CFMetaData;


Mime
View raw message