Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 2AD0E10A79 for ; Fri, 21 Mar 2014 14:06:34 +0000 (UTC) Received: (qmail 43264 invoked by uid 500); 21 Mar 2014 14:06:33 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 43053 invoked by uid 500); 21 Mar 2014 14:06:32 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 42991 invoked by uid 99); 21 Mar 2014 14:06:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Mar 2014 14:06:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 3904F987EDE; Fri, 21 Mar 2014 14:06:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: marcuse@apache.org To: commits@cassandra.apache.org Date: Fri, 21 Mar 2014 14:06:31 -0000 Message-Id: <12b85efc15e44ac1ad43978410e85e68@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: Make OpOrder AutoCloseable Repository: cassandra Updated Branches: refs/heads/trunk 74290df23 -> 0b68b6dd7 Make OpOrder AutoCloseable Patch by benedict, reviewed by marcuse for CASSANDRA-6901. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/269f8105 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/269f8105 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/269f8105 Branch: refs/heads/trunk Commit: 269f81052e42d36f9a3bee684464543b7074b6b9 Parents: 53e2212 Author: belliottsmith Authored: Fri Mar 21 15:04:36 2014 +0100 Committer: Marcus Eriksson Committed: Fri Mar 21 15:05:39 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Keyspace.java | 27 ++++------ .../db/commitlog/CommitLogSegment.java | 13 ++--- .../db/compaction/CompactionManager.java | 54 ++++++++++++++------ .../db/index/SecondaryIndexManager.java | 34 ++++++++---- .../db/index/composites/CompositesIndex.java | 7 +-- .../cassandra/db/index/keys/KeysSearcher.java | 9 +--- .../cassandra/utils/concurrent/OpOrder.java | 21 +++----- .../cassandra/concurrent/LongOpOrderTest.java | 24 ++++----- 9 files changed, 99 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7e2ed4d..2949b6a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -27,6 +27,7 @@ * Proper compare function for CollectionType (CASSANDRA-6783) * Update native server to Netty 4 (CASSANDRA-6236) * Fix off-by-one error in stress (CASSANDRA-6883) + * Make OpOrder AutoCloseable (CASSANDRA-6901) Merged from 2.0: * Add uuid() function (CASSANDRA-6473) * Omit tombstones from schema digests (CASSANDRA-6862) http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 436aca0..fabd433 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -19,16 +19,19 @@ package org.apache.cassandra.db; import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import com.google.common.base.Function; import com.google.common.collect.Iterables; - -import org.apache.cassandra.utils.concurrent.OpOrder; -import org.apache.cassandra.db.commitlog.ReplayPosition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +40,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; @@ -45,6 +49,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.pager.QueryPagers; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.concurrent.OpOrder; /** * It represents a Keyspace. @@ -338,8 +343,7 @@ public class Keyspace */ public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes) { - final OpOrder.Group opGroup = writeOrder.start(); - try + try (OpOrder.Group opGroup = writeOrder.start()) { // write the mutation to the commitlog and memtables final ReplayPosition replayPosition; @@ -370,10 +374,6 @@ public class Keyspace cfs.apply(key, cf, updater, opGroup, replayPosition); } } - finally - { - opGroup.finishOne(); - } } public AbstractReplicationStrategy getReplicationStrategy() @@ -391,8 +391,7 @@ public class Keyspace if (logger.isDebugEnabled()) logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key)); - final OpOrder.Group opGroup = cfs.keyspace.writeOrder.start(); - try + try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start()) { Set indexes = cfs.indexManager.getIndexesByNames(idxNames); @@ -409,10 +408,6 @@ public class Keyspace cfs.indexManager.indexRow(key.key, cf2, opGroup); } } - finally - { - opGroup.finishOne(); - } } public List> flush() http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index cabd886..9436a5a 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -178,7 +178,7 @@ public class CommitLogSegment int position = allocate(size); if (position < 0) { - commandOrder.finishOne(); + commandOrder.close(); return false; } alloc.buffer = (ByteBuffer) buffer.duplicate().position(position).limit(position + size); @@ -190,7 +190,7 @@ public class CommitLogSegment } catch (Throwable t) { - commandOrder.finishOne(); + commandOrder.close(); throw t; } } @@ -216,8 +216,7 @@ public class CommitLogSegment // this actually isn't strictly necessary, as currently all calls to discardUnusedTail occur within a block // already protected by this OpOrdering, but to prevent future potential mistakes, we duplicate the protection here // so that the contract between discardUnusedTail() and sync() is more explicit. - OpOrder.Group group = appendOrder.start(); - try + try (OpOrder.Group group = appendOrder.start()) { while (true) { @@ -233,10 +232,6 @@ public class CommitLogSegment } } } - finally - { - group.finishOne(); - } } /** @@ -581,7 +576,7 @@ public class CommitLogSegment // but must not be called more than once void markWritten() { - appendOp.finishOne(); + appendOp.close(); } void awaitDiskSync() http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index e28cfef..20fc747 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -20,19 +20,37 @@ package org.apache.cassandra.db.compaction; import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; -import java.util.*; -import java.util.concurrent.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import com.google.common.base.Throwables; -import com.google.common.collect.*; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ConcurrentHashMultiset; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multiset; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; - -import org.apache.cassandra.utils.concurrent.OpOrder; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,20 +60,31 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.OnDiskAtom; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionInfo.Holder; import org.apache.cassandra.db.index.SecondaryIndexBuilder; import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableIdentityIterator; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.io.sstable.SSTableWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.CompactionMetrics; import org.apache.cassandra.repair.Validator; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.CloseableIterator; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.concurrent.OpOrder; /** * A singleton which manages a private executor of ongoing compactions. @@ -831,15 +860,10 @@ public class CompactionManager implements CompactionManagerMBean if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty()) { // acquire memtable lock here because secondary index deletion may cause a race. See CASSANDRA-3712 - final OpOrder.Group opGroup = cfs.keyspace.writeOrder.start(); - try + try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start()) { cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow, opGroup); } - finally - { - opGroup.finishOne(); - } } return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index c628a74..a97007e 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -18,10 +18,20 @@ package org.apache.cassandra.db.index; import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.*; - -import org.apache.cassandra.utils.concurrent.OpOrder; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Future; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -29,7 +39,13 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.IndexType; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IndexExpression; +import org.apache.cassandra.db.Row; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.filter.ExtendedFilter; @@ -37,6 +53,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.sstable.ReducingKeyIterator; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.concurrent.OpOrder; /** * Manages all the indexes associated with a given CFS @@ -646,15 +663,10 @@ public class SecondaryIndexManager { if (index instanceof PerColumnSecondaryIndex) { - OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start(); - try + try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start()) { ((PerColumnSecondaryIndex) index).delete(key.key, cell, opGroup); } - finally - { - opGroup.finishOne(); - } } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java index ec1118b..52b76ea 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java @@ -132,8 +132,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn public void delete(IndexedEntry entry) { // start a mini-transaction for this delete, to ensure safe memtable updates - OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start(); - try + try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start()) { int localDeletionTime = (int) (System.currentTimeMillis() / 1000); ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata); @@ -142,10 +141,6 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn if (logger.isDebugEnabled()) logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue, cfi); } - finally - { - opGroup.finishOne(); - } } protected AbstractType getExpressionComparator() http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index ce5fe30..af780d3 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -187,14 +187,9 @@ public class KeysSearcher extends SecondaryIndexSearcher { // delete the index entry w/ its own timestamp Cell dummyCell = new Cell(primaryColumn, indexKey.key, cell.timestamp()); - OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start(); - try + try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start()) { - ((PerColumnSecondaryIndex)index).delete(dk.key, dummyCell, opGroup); - } - finally - { - opGroup.finishOne(); + ((PerColumnSecondaryIndex) index).delete(dk.key, dummyCell, opGroup); } continue; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java index d42f996..bc43e10 100644 --- a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java +++ b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java @@ -64,18 +64,13 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; public void produce() { - Group opGroup = order.start(); - try + try (Group opGroup = order.start()) { SharedState s = state; while (s.barrier != null && !s.barrier.isAfter(opGroup)) s = s.getReplacement(); s.doProduceWork(); } - finally - { - opGroup.finishOne(); - } } } * @@ -97,7 +92,7 @@ public class OpOrder /** * Start an operation against this OpOrder. - * Once the operation is completed Ordered.finishOne() MUST be called EXACTLY once for this operation. + * Once the operation is completed Ordered.close() MUST be called EXACTLY once for this operation. * * @return the Ordered instance that manages this OpOrder */ @@ -131,17 +126,17 @@ public class OpOrder /** * Represents a group of identically ordered operations, i.e. all operations started in the interval between - * two barrier issuances. For each register() call this is returned, finishOne() must be called exactly once. + * two barrier issuances. For each register() call this is returned, close() must be called exactly once. * It should be treated like taking a lock(). */ - public static final class Group implements Comparable + public static final class Group implements Comparable, AutoCloseable { /** * In general this class goes through the following stages: - * 1) LIVE: many calls to register() and finishOne() + * 1) LIVE: many calls to register() and close() * 2) FINISHING: a call to expire() (after a barrier issue), means calls to register() will now fail, * and we are now 'in the past' (new operations will be started against a new Ordered) - * 3) FINISHED: once the last finishOne() is called, this Ordered is done. We call unlink(). + * 3) FINISHED: once the last close() is called, this Ordered is done. We call unlink(). * 4) ZOMBIE: all our operations are finished, but some operations against an earlier Ordered are still * running, or tidying up, so unlink() fails to remove us * 5) COMPLETE: all operations started on or before us are FINISHED (and COMPLETE), so we are unlinked @@ -176,7 +171,7 @@ public class OpOrder } // prevents any further operations starting against this Ordered instance - // if there are no running operations, calls unlink; otherwise, we let the last op to finishOne call it. + // if there are no running operations, calls unlink; otherwise, we let the last op to close call it. // this means issue() won't have to block for ops to finish. private void expire() { @@ -212,7 +207,7 @@ public class OpOrder * To be called exactly once for each register() call this object is returned for, indicating the operation * is complete */ - public void finishOne() + public void close() { while (true) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java ---------------------------------------------------------------------- diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java index ec00d48..d7105df 100644 --- a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java +++ b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java @@ -21,14 +21,6 @@ package org.apache.cassandra.concurrent; */ -import org.apache.cassandra.utils.concurrent.OpOrder; - -import org.cliffc.high_scale_lib.NonBlockingHashMap; -import org.junit.*; -import org.slf4j.*; - -import static org.junit.Assert.*; - import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -37,6 +29,15 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.utils.concurrent.OpOrder; + +import static org.junit.Assert.assertTrue; + // TODO: we don't currently test SAFE functionality at all! // TODO: should also test markBlocking and SyncOrdered public class LongOpOrderTest @@ -202,8 +203,7 @@ public class LongOpOrderTest while (true) { AtomicInteger c; - OpOrder.Group opGroup = order.start(); - try + try (OpOrder.Group opGroup = order.start()) { if (null == (c = count.get(opGroup))) { @@ -215,10 +215,6 @@ public class LongOpOrderTest while (!s.accept(opGroup)) s = s.replacement; } - finally - { - opGroup.finishOne(); - } } } }