cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/2] git commit: Make OpOrder AutoCloseable
Date Fri, 21 Mar 2014 14:06:31 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 74290df23 -> 0b68b6dd7


Make OpOrder AutoCloseable

Patch by benedict, reviewed by marcuse for CASSANDRA-6901.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/269f8105
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/269f8105
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/269f8105

Branch: refs/heads/trunk
Commit: 269f81052e42d36f9a3bee684464543b7074b6b9
Parents: 53e2212
Author: belliottsmith <github@sub.laerad.com>
Authored: Fri Mar 21 15:04:36 2014 +0100
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Fri Mar 21 15:05:39 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Keyspace.java  | 27 ++++------
 .../db/commitlog/CommitLogSegment.java          | 13 ++---
 .../db/compaction/CompactionManager.java        | 54 ++++++++++++++------
 .../db/index/SecondaryIndexManager.java         | 34 ++++++++----
 .../db/index/composites/CompositesIndex.java    |  7 +--
 .../cassandra/db/index/keys/KeysSearcher.java   |  9 +---
 .../cassandra/utils/concurrent/OpOrder.java     | 21 +++-----
 .../cassandra/concurrent/LongOpOrderTest.java   | 24 ++++-----
 9 files changed, 99 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7e2ed4d..2949b6a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,7 @@
  * Proper compare function for CollectionType (CASSANDRA-6783)
  * Update native server to Netty 4 (CASSANDRA-6236)
  * Fix off-by-one error in stress (CASSANDRA-6883)
+ * Make OpOrder AutoCloseable (CASSANDRA-6901)
 Merged from 2.0:
  * Add uuid() function (CASSANDRA-6473)
  * Omit tombstones from schema digests (CASSANDRA-6862)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java
index 436aca0..fabd433 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -19,16 +19,19 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,6 +40,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.KSMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
@@ -45,6 +49,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.pager.QueryPagers;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
  * It represents a Keyspace.
@@ -338,8 +343,7 @@ public class Keyspace
      */
     public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
     {
-        final OpOrder.Group opGroup = writeOrder.start();
-        try
+        try (OpOrder.Group opGroup = writeOrder.start())
         {
             // write the mutation to the commitlog and memtables
             final ReplayPosition replayPosition;
@@ -370,10 +374,6 @@ public class Keyspace
                 cfs.apply(key, cf, updater, opGroup, replayPosition);
             }
         }
-        finally
-        {
-            opGroup.finishOne();
-        }
     }
 
     public AbstractReplicationStrategy getReplicationStrategy()
@@ -391,8 +391,7 @@ public class Keyspace
         if (logger.isDebugEnabled())
             logger.debug("Indexing row {} ", cfs.metadata.getKeyValidator().getString(key.key));
 
-        final OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
-        try
+        try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
         {
             Set<SecondaryIndex> indexes = cfs.indexManager.getIndexesByNames(idxNames);
 
@@ -409,10 +408,6 @@ public class Keyspace
                 cfs.indexManager.indexRow(key.key, cf2, opGroup);
             }
         }
-        finally
-        {
-            opGroup.finishOne();
-        }
     }
 
     public List<Future<?>> flush()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
index cabd886..9436a5a 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -178,7 +178,7 @@ public class CommitLogSegment
             int position = allocate(size);
             if (position < 0)
             {
-                commandOrder.finishOne();
+                commandOrder.close();
                 return false;
             }
             alloc.buffer = (ByteBuffer) buffer.duplicate().position(position).limit(position
+ size);
@@ -190,7 +190,7 @@ public class CommitLogSegment
         }
         catch (Throwable t)
         {
-            commandOrder.finishOne();
+            commandOrder.close();
             throw t;
         }
     }
@@ -216,8 +216,7 @@ public class CommitLogSegment
         // this actually isn't strictly necessary, as currently all calls to discardUnusedTail
occur within a block
         // already protected by this OpOrdering, but to prevent future potential mistakes,
we duplicate the protection here
         // so that the contract between discardUnusedTail() and sync() is more explicit.
-        OpOrder.Group group = appendOrder.start();
-        try
+        try (OpOrder.Group group = appendOrder.start())
         {
             while (true)
             {
@@ -233,10 +232,6 @@ public class CommitLogSegment
                 }
             }
         }
-        finally
-        {
-            group.finishOne();
-        }
     }
 
     /**
@@ -581,7 +576,7 @@ public class CommitLogSegment
         // but must not be called more than once
         void markWritten()
         {
-            appendOp.finishOne();
+            appendOp.close();
         }
 
         void awaitDiskSync()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index e28cfef..20fc747 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -20,19 +20,37 @@ package org.apache.cassandra.db.compaction;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.*;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.RateLimiter;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,20 +60,31 @@ import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.OnDiskAtom;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionInfo.Holder;
 import org.apache.cassandra.db.index.SecondaryIndexBuilder;
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.CompactionMetrics;
 import org.apache.cassandra.repair.Validator;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
  * A singleton which manages a private executor of ongoing compactions.
@@ -831,15 +860,10 @@ public class CompactionManager implements CompactionManagerMBean
                 if (indexedColumnsInRow != null && !indexedColumnsInRow.isEmpty())
                 {
                     // acquire memtable lock here because secondary index deletion may cause
a race. See CASSANDRA-3712
-                    final OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
-                    try
+                    try (OpOrder.Group opGroup = cfs.keyspace.writeOrder.start())
                     {
                         cfs.indexManager.deleteFromIndexes(row.getKey(), indexedColumnsInRow,
opGroup);
                     }
-                    finally
-                    {
-                        opGroup.finishOne();
-                    }
                 }
                 return null;
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index c628a74..a97007e 100644
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@ -18,10 +18,20 @@
 package org.apache.cassandra.db.index;
 
 import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.apache.cassandra.utils.concurrent.OpOrder;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -29,7 +39,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.IndexType;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IndexExpression;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.composites.CellName;
 import org.apache.cassandra.db.filter.ExtendedFilter;
@@ -37,6 +53,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.ReducingKeyIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 /**
  * Manages all the indexes associated with a given CFS
@@ -646,15 +663,10 @@ public class SecondaryIndexManager
             {
                 if (index instanceof PerColumnSecondaryIndex)
                 {
-                    OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start();
-                    try
+                    try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
                     {
                         ((PerColumnSecondaryIndex) index).delete(key.key, cell, opGroup);
                     }
-                    finally
-                    {
-                        opGroup.finishOne();
-                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
index ec1118b..52b76ea 100644
--- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
+++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java
@@ -132,8 +132,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
     public void delete(IndexedEntry entry)
     {
         // start a mini-transaction for this delete, to ensure safe memtable updates
-        OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start();
-        try
+        try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
         {
             int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
             ColumnFamily cfi = ArrayBackedSortedColumns.factory.create(indexCfs.metadata);
@@ -142,10 +141,6 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn
             if (logger.isDebugEnabled())
                 logger.debug("removed index entry for cleaned-up value {}:{}", entry.indexValue,
cfi);
         }
-        finally
-        {
-            opGroup.finishOne();
-        }
     }
 
     protected AbstractType<?> getExpressionComparator()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
index ce5fe30..af780d3 100644
--- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
+++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java
@@ -187,14 +187,9 @@ public class KeysSearcher extends SecondaryIndexSearcher
                         {
                             // delete the index entry w/ its own timestamp
                             Cell dummyCell = new Cell(primaryColumn, indexKey.key, cell.timestamp());
-                            OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start();
-                            try
+                            try (OpOrder.Group opGroup = baseCfs.keyspace.writeOrder.start())
                             {
-                                ((PerColumnSecondaryIndex)index).delete(dk.key, dummyCell,
opGroup);
-                            }
-                            finally
-                            {
-                                opGroup.finishOne();
+                                ((PerColumnSecondaryIndex) index).delete(dk.key, dummyCell,
opGroup);
                             }
                             continue;
                         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
index d42f996..bc43e10 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/OpOrder.java
@@ -64,18 +64,13 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
         public void produce()
         {
-            Group opGroup = order.start();
-            try
+            try (Group opGroup = order.start())
             {
                 SharedState s = state;
                 while (s.barrier != null && !s.barrier.isAfter(opGroup))
                     s = s.getReplacement();
                 s.doProduceWork();
             }
-            finally
-            {
-                opGroup.finishOne();
-            }
         }
     }
  * </pre>
@@ -97,7 +92,7 @@ public class OpOrder
 
     /**
      * Start an operation against this OpOrder.
-     * Once the operation is completed Ordered.finishOne() MUST be called EXACTLY once for
this operation.
+     * Once the operation is completed Ordered.close() MUST be called EXACTLY once for this
operation.
      *
      * @return the Ordered instance that manages this OpOrder
      */
@@ -131,17 +126,17 @@ public class OpOrder
 
     /**
      * Represents a group of identically ordered operations, i.e. all operations started
in the interval between
-     * two barrier issuances. For each register() call this is returned, finishOne() must
be called exactly once.
+     * two barrier issuances. For each register() call this is returned, close() must be
called exactly once.
      * It should be treated like taking a lock().
      */
-    public static final class Group implements Comparable<Group>
+    public static final class Group implements Comparable<Group>, AutoCloseable
     {
         /**
          * In general this class goes through the following stages:
-         * 1) LIVE:      many calls to register() and finishOne()
+         * 1) LIVE:      many calls to register() and close()
          * 2) FINISHING: a call to expire() (after a barrier issue), means calls to register()
will now fail,
          *               and we are now 'in the past' (new operations will be started against
a new Ordered)
-         * 3) FINISHED:  once the last finishOne() is called, this Ordered is done. We call
unlink().
+         * 3) FINISHED:  once the last close() is called, this Ordered is done. We call unlink().
          * 4) ZOMBIE:    all our operations are finished, but some operations against an
earlier Ordered are still
          *               running, or tidying up, so unlink() fails to remove us
          * 5) COMPLETE:  all operations started on or before us are FINISHED (and COMPLETE),
so we are unlinked
@@ -176,7 +171,7 @@ public class OpOrder
         }
 
         // prevents any further operations starting against this Ordered instance
-        // if there are no running operations, calls unlink; otherwise, we let the last op
to finishOne call it.
+        // if there are no running operations, calls unlink; otherwise, we let the last op
to close call it.
         // this means issue() won't have to block for ops to finish.
         private void expire()
         {
@@ -212,7 +207,7 @@ public class OpOrder
          * To be called exactly once for each register() call this object is returned for,
indicating the operation
          * is complete
          */
-        public void finishOne()
+        public void close()
         {
             while (true)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/269f8105/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
index ec00d48..d7105df 100644
--- a/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
+++ b/test/long/org/apache/cassandra/concurrent/LongOpOrderTest.java
@@ -21,14 +21,6 @@ package org.apache.cassandra.concurrent;
  */
 
 
-import org.apache.cassandra.utils.concurrent.OpOrder;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.junit.*;
-import org.slf4j.*;
-
-import static org.junit.Assert.*;
-
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -37,6 +29,15 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+import static org.junit.Assert.assertTrue;
+
 // TODO: we don't currently test SAFE functionality at all!
 // TODO: should also test markBlocking and SyncOrdered
 public class LongOpOrderTest
@@ -202,8 +203,7 @@ public class LongOpOrderTest
                 while (true)
                 {
                     AtomicInteger c;
-                    OpOrder.Group opGroup = order.start();
-                    try
+                    try (OpOrder.Group opGroup = order.start())
                     {
                         if (null == (c = count.get(opGroup)))
                         {
@@ -215,10 +215,6 @@ public class LongOpOrderTest
                         while (!s.accept(opGroup))
                             s = s.replacement;
                     }
-                    finally
-                    {
-                        opGroup.finishOne();
-                    }
                 }
             }
         }


Mime
View raw message