cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [2/8] git commit: Fix schema concurrency exceptions patch by Benedict Elliott Smith; reviewed by Carl Yeksigian for CASSANDRA-6841
Date Fri, 14 Mar 2014 18:17:41 GMT
Fix schema concurrency exceptions
patch by Benedict Elliott Smith; reviewed by Carl Yeksigian for CASSANDRA-6841


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bfd03f19
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bfd03f19
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bfd03f19

Branch: refs/heads/trunk
Commit: bfd03f193938104cc86d59ccda137eece5cfd176
Parents: c49d336
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Fri Mar 14 13:11:25 2014 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Fri Mar 14 13:12:05 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/config/Schema.java     |   8 +-
 .../db/commitlog/CommitLogAllocator.java        |   2 +-
 .../db/commitlog/CommitLogSegment.java          |  15 ++-
 .../apache/cassandra/utils/ConcurrentBiMap.java | 131 +++++++++++++++++++
 5 files changed, 147 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e208e21..6483012 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.7
+ * Fix schema concurrency exceptions (CASSANDRA-6841)
  * Fix leaking validator FH in StreamWriter (CASSANDRA-6832)
  * Fix saving triggers to schema (CASSANDRA-6789)
  * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index d822704..0907177 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -23,7 +23,9 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.*;
 
-import com.google.common.collect.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +34,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.ConcurrentBiMap;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -58,7 +60,7 @@ public class Schema
     private final Map<String, Keyspace> keyspaceInstances = new NonBlockingHashMap<String,
Keyspace>();
 
     /* metadata map for faster ColumnFamily lookup */
-    private final BiMap<Pair<String, String>, UUID> cfIdMap = HashBiMap.create();
+    private final ConcurrentBiMap<Pair<String, String>, UUID> cfIdMap = new ConcurrentBiMap<>();
 
     private volatile UUID version;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
index 575e3c3..3009a63 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java
@@ -304,7 +304,7 @@ public class CommitLogAllocator
     {
         CommitLogSegment oldestSegment = activeSegments.peek();
 
-        if (oldestSegment != null)
+        if (oldestSegment != null && oldestSegment != CommitLog.instance.activeSegment)
         {
             for (UUID dirtyCFId : oldestSegment.getDirtyCFIDs())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index 25658ed..5b8bcfa 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -25,10 +25,13 @@ import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.zip.Checksum;
 
@@ -63,7 +66,7 @@ public class CommitLogSegment
     static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8;
 
     // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions
to decide if we can delete this segment
-    private final HashMap<UUID, Integer> cfLastWrite = new HashMap<UUID, Integer>();
+    private final Map<UUID, Integer> cfLastWrite = new HashMap<>();
 
     public final long id;
 
@@ -355,7 +358,7 @@ public class CommitLogSegment
      * @param cfId    the column family ID that is now clean
      * @param context the optional clean offset
      */
-    public void markClean(UUID cfId, ReplayPosition context)
+    public synchronized void markClean(UUID cfId, ReplayPosition context)
     {
         Integer lastWritten = cfLastWrite.get(cfId);
 
@@ -368,15 +371,15 @@ public class CommitLogSegment
     /**
      * @return a collection of dirty CFIDs for this segment file.
      */
-    public Collection<UUID> getDirtyCFIDs()
+    public synchronized Collection<UUID> getDirtyCFIDs()
     {
-        return cfLastWrite.keySet();
+        return new ArrayList<>(cfLastWrite.keySet());
     }
 
     /**
      * @return true if this segment is unused and safe to recycle or delete
      */
-    public boolean isUnused()
+    public synchronized boolean isUnused()
     {
         return cfLastWrite.isEmpty();
     }
@@ -396,7 +399,7 @@ public class CommitLogSegment
     public String dirtyString()
     {
         StringBuilder sb = new StringBuilder();
-        for (UUID cfId : cfLastWrite.keySet())
+        for (UUID cfId : getDirtyCFIDs())
         {
             CFMetaData m = Schema.instance.getCFMetaData(cfId);
             sb.append(m == null ? "<deleted>" : m.cfName).append(" (").append(cfId).append("),
");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java b/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java
new file mode 100644
index 0000000..b4dfa2e
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ * A variant of BiMap that permits concurrent access, and expects uniqueness of values in
both domain and range.
+ * We synchronize on _modifications only_, and use ConcurrentHashMap so that readers can
lookup safely. This does mean there
+ * could be races to lookup the inverse, but we aren't too worried about that.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class ConcurrentBiMap<K, V> implements Map<K, V>
+{
+    protected final Map<K, V> forwardMap;
+    protected final Map<V, K> reverseMap;
+
+    public ConcurrentBiMap()
+    {
+        this(new ConcurrentHashMap<K, V>(16, 0.5f, 1), new ConcurrentHashMap<V,
K>(16, 0.5f, 1));
+    }
+
+    protected ConcurrentBiMap(Map<K, V> forwardMap, Map<V, K> reverseMap)
+    {
+        this.forwardMap = forwardMap;
+        this.reverseMap = reverseMap;
+    }
+
+    public Map<V, K> inverse()
+    {
+        return Collections.unmodifiableMap(reverseMap);
+    }
+
+    public void clear()
+    {
+        forwardMap.clear();
+        reverseMap.clear();
+    }
+
+    public boolean containsKey(Object key)
+    {
+        return forwardMap.containsKey(key);
+    }
+
+    public boolean containsValue(Object value)
+    {
+        return reverseMap.containsKey(value);
+    }
+
+    public Set<Entry<K, V>> entrySet()
+    {
+        return forwardMap.entrySet();
+    }
+
+    public V get(Object key)
+    {
+        return forwardMap.get(key);
+    }
+
+    public boolean isEmpty()
+    {
+        return forwardMap.isEmpty();
+    }
+
+    public Set<K> keySet()
+    {
+        return forwardMap.keySet();
+    }
+
+    public synchronized V put(K key, V value)
+    {
+        K oldKey = reverseMap.get(value);
+        if (oldKey != null && !key.equals(oldKey))
+            throw new IllegalArgumentException(value + " is already bound in reverseMap to
" + oldKey);
+        V oldVal = forwardMap.put(key, value);
+        if (oldVal != null && !Objects.equals(reverseMap.remove(oldVal), key))
+            throw new IllegalStateException(); // for the prior mapping to be correct, we
MUST get back the key from the reverseMap
+        reverseMap.put(value, key);
+        return oldVal;
+    }
+
+    public synchronized void putAll(Map<? extends K, ? extends V> m)
+    {
+        for (Entry<? extends K, ? extends V> entry : m.entrySet())
+            put(entry.getKey(), entry.getValue());
+    }
+
+    public synchronized V remove(Object key)
+    {
+        V oldVal = forwardMap.remove(key);
+        if (oldVal == null)
+            return null;
+        Object oldKey = reverseMap.remove(oldVal);
+        if (oldKey == null || !oldKey.equals(key))
+            throw new IllegalStateException(); // for the prior mapping to be correct, we
MUST get back the key from the reverseMap
+        return oldVal;
+    }
+
+    public int size()
+    {
+        return forwardMap.size();
+    }
+
+    public Collection<V> values()
+    {
+        return reverseMap.keySet();
+    }
+}


Mime
View raw message