Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3CAEACD11 for ; Fri, 14 Mar 2014 18:17:59 +0000 (UTC) Received: (qmail 47674 invoked by uid 500); 14 Mar 2014 18:17:41 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 47616 invoked by uid 500); 14 Mar 2014 18:17:41 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 47601 invoked by uid 99); 14 Mar 2014 18:17:40 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Mar 2014 18:17:40 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 738348B735A; Fri, 14 Mar 2014 18:17:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Fri, 14 Mar 2014 18:17:40 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/8] git commit: Fix schema concurrency exceptions patch by Benedict Elliott Smith; reviewed by Carl Yeksigian for CASSANDRA-6841 Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 c3aac4560 -> 22b94537e refs/heads/cassandra-2.1 5ddeba336 -> 9db5807f4 refs/heads/trunk 51a5106c9 -> 90783a112 Fix schema concurrency exceptions patch by Benedict Elliott Smith; reviewed by Carl Yeksigian for CASSANDRA-6841 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bfd03f19 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bfd03f19 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bfd03f19 Branch: refs/heads/cassandra-2.1 Commit: bfd03f193938104cc86d59ccda137eece5cfd176 Parents: c49d336 Author: Jonathan Ellis Authored: Fri Mar 14 13:11:25 2014 -0500 Committer: Jonathan Ellis Committed: Fri Mar 14 13:12:05 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/Schema.java | 8 +- .../db/commitlog/CommitLogAllocator.java | 2 +- .../db/commitlog/CommitLogSegment.java | 15 ++- .../apache/cassandra/utils/ConcurrentBiMap.java | 131 +++++++++++++++++++ 5 files changed, 147 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e208e21..6483012 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.7 + * Fix schema concurrency exceptions (CASSANDRA-6841) * Fix leaking validator FH in StreamWriter (CASSANDRA-6832) * Fix saving triggers to schema (CASSANDRA-6789) * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index d822704..0907177 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -23,7 +23,9 @@ import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.*; -import com.google.common.collect.*; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +34,7 @@ import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ConcurrentBiMap; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -58,7 +60,7 @@ public class Schema private final Map keyspaceInstances = new NonBlockingHashMap(); /* metadata map for faster ColumnFamily lookup */ - private final BiMap, UUID> cfIdMap = HashBiMap.create(); + private final ConcurrentBiMap, UUID> cfIdMap = new ConcurrentBiMap<>(); private volatile UUID version; http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java index 575e3c3..3009a63 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogAllocator.java @@ -304,7 +304,7 @@ public class CommitLogAllocator { CommitLogSegment oldestSegment = activeSegments.peek(); - if (oldestSegment != null) + if (oldestSegment != null && oldestSegment != CommitLog.instance.activeSegment) { for (UUID dirtyCFId : oldestSegment.getDirtyCFIDs()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index 25658ed..5b8bcfa 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -25,10 +25,13 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.Checksum; @@ -63,7 +66,7 @@ public class CommitLogSegment static final int ENTRY_OVERHEAD_SIZE = 4 + 8 + 8; // cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we can delete this segment - private final HashMap cfLastWrite = new HashMap(); + private final Map cfLastWrite = new HashMap<>(); public final long id; @@ -355,7 +358,7 @@ public class CommitLogSegment * @param cfId the column family ID that is now clean * @param context the optional clean offset */ - public void markClean(UUID cfId, ReplayPosition context) + public synchronized void markClean(UUID cfId, ReplayPosition context) { Integer lastWritten = cfLastWrite.get(cfId); @@ -368,15 +371,15 @@ public class CommitLogSegment /** * @return a collection of dirty CFIDs for this segment file. */ - public Collection getDirtyCFIDs() + public synchronized Collection getDirtyCFIDs() { - return cfLastWrite.keySet(); + return new ArrayList<>(cfLastWrite.keySet()); } /** * @return true if this segment is unused and safe to recycle or delete */ - public boolean isUnused() + public synchronized boolean isUnused() { return cfLastWrite.isEmpty(); } @@ -396,7 +399,7 @@ public class CommitLogSegment public String dirtyString() { StringBuilder sb = new StringBuilder(); - for (UUID cfId : cfLastWrite.keySet()) + for (UUID cfId : getDirtyCFIDs()) { CFMetaData m = Schema.instance.getCFMetaData(cfId); sb.append(m == null ? "" : m.cfName).append(" (").append(cfId).append("), "); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bfd03f19/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java b/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java new file mode 100644 index 0000000..b4dfa2e --- /dev/null +++ b/src/java/org/apache/cassandra/utils/ConcurrentBiMap.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.utils; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * + * A variant of BiMap that permits concurrent access, and expects uniqueness of values in both domain and range. + * We synchronize on _modifications only_, and use ConcurrentHashMap so that readers can lookup safely. This does mean there + * could be races to lookup the inverse, but we aren't too worried about that. + * + * @param + * @param + */ +public class ConcurrentBiMap implements Map +{ + protected final Map forwardMap; + protected final Map reverseMap; + + public ConcurrentBiMap() + { + this(new ConcurrentHashMap(16, 0.5f, 1), new ConcurrentHashMap(16, 0.5f, 1)); + } + + protected ConcurrentBiMap(Map forwardMap, Map reverseMap) + { + this.forwardMap = forwardMap; + this.reverseMap = reverseMap; + } + + public Map inverse() + { + return Collections.unmodifiableMap(reverseMap); + } + + public void clear() + { + forwardMap.clear(); + reverseMap.clear(); + } + + public boolean containsKey(Object key) + { + return forwardMap.containsKey(key); + } + + public boolean containsValue(Object value) + { + return reverseMap.containsKey(value); + } + + public Set> entrySet() + { + return forwardMap.entrySet(); + } + + public V get(Object key) + { + return forwardMap.get(key); + } + + public boolean isEmpty() + { + return forwardMap.isEmpty(); + } + + public Set keySet() + { + return forwardMap.keySet(); + } + + public synchronized V put(K key, V value) + { + K oldKey = reverseMap.get(value); + if (oldKey != null && !key.equals(oldKey)) + throw new IllegalArgumentException(value + " is already bound in reverseMap to " + oldKey); + V oldVal = forwardMap.put(key, value); + if (oldVal != null && !Objects.equals(reverseMap.remove(oldVal), key)) + throw new IllegalStateException(); // for the prior mapping to be correct, we MUST get back the key from the reverseMap + reverseMap.put(value, key); + return oldVal; + } + + public synchronized void putAll(Map m) + { + for (Entry entry : m.entrySet()) + put(entry.getKey(), entry.getValue()); + } + + public synchronized V remove(Object key) + { + V oldVal = forwardMap.remove(key); + if (oldVal == null) + return null; + Object oldKey = reverseMap.remove(oldVal); + if (oldKey == null || !oldKey.equals(key)) + throw new IllegalStateException(); // for the prior mapping to be correct, we MUST get back the key from the reverseMap + return oldVal; + } + + public int size() + { + return forwardMap.size(); + } + + public Collection values() + { + return reverseMap.keySet(); + } +}