cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From marc...@apache.org
Subject [1/2] git commit: Use OpOrder to guard sstable references for reads.
Date Wed, 23 Apr 2014 12:20:03 GMT
Repository: cassandra
Updated Branches:
  refs/heads/trunk 902925716 -> c7d604bdf


Use OpOrder to guard sstable references for reads.

Patch by benedict; reviewed by marcuse for CASSANDRA-6919


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/13910dc4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/13910dc4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/13910dc4

Branch: refs/heads/trunk
Commit: 13910dc40077d5d0dadb541c043047f1b7a37be2
Parents: ad57cb0
Author: belliottsmith <github@sub.laerad.com>
Authored: Tue Mar 25 10:09:45 2014 +0000
Committer: Marcus Eriksson <marcuse@apache.org>
Committed: Wed Apr 23 14:16:04 2014 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +-
 .../org/apache/cassandra/config/Schema.java     |  11 ++
 .../cassandra/db/CollationController.java       |   6 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 117 ++++++++-----------
 .../cassandra/io/sstable/SSTableReader.java     |  79 +++++++++----
 .../cassandra/streaming/StreamSession.java      |   2 +-
 6 files changed, 119 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 844df95..07fc3f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -48,7 +48,8 @@
  * Clean up IndexInfo on keyspace/table drops (CASSANDRA-6924)
  * Only snapshot relative SSTables when sequential repair (CASSANDRA-7024)
  * Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
- * fix cassandra stress errors on reads with native protocol (CASANDRA-7033)
+ * fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
+ * Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
 Merged from 2.0:
  * Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
  * Log a warning for large batches (CASSANDRA-6487)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java
index c606388..b1e0f2f 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -129,6 +129,17 @@ public class Schema
         return keyspaceInstances.get(keyspaceName);
     }
 
+    public ColumnFamilyStore getColumnFamilyStoreInstance(UUID cfId)
+    {
+        Pair<String, String> pair = cfIdMap.inverse().get(cfId);
+        if (pair == null)
+            return null;
+        Keyspace instance = getKeyspaceInstance(pair.left);
+        if (instance == null)
+            return null;
+        return instance.getColumnFamilyStore(cfId);
+    }
+
     /**
      * Store given Keyspace instance to the schema
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/db/CollationController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java
index 151a7c5..36a9ebf 100644
--- a/src/java/org/apache/cassandra/db/CollationController.java
+++ b/src/java/org/apache/cassandra/db/CollationController.java
@@ -69,7 +69,7 @@ public class CollationController
         final ColumnFamily container = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
         List<OnDiskAtomIterator> iterators = new ArrayList<>();
         Tracing.trace("Acquiring sstable references");
-        ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key);
+        ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
 
         try
         {
@@ -159,7 +159,6 @@ public class CollationController
         {
             for (OnDiskAtomIterator iter : iterators)
                 FileUtils.closeQuietly(iter);
-            SSTableReader.releaseReferences(view.sstables);
         }
     }
 
@@ -187,7 +186,7 @@ public class CollationController
     private ColumnFamily collectAllData(boolean copyOnHeap)
     {
         Tracing.trace("Acquiring sstable references");
-        ColumnFamilyStore.ViewFragment view = cfs.markReferenced(filter.key);
+        ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(filter.key));
         List<OnDiskAtomIterator> iterators = new ArrayList<>(Iterables.size(view.memtables) + view.sstables.size());
         ColumnFamily returnCF = ArrayBackedSortedColumns.factory.create(cfs.metadata, filter.filter.isReversed());
         DeletionInfo returnDeletionInfo = returnCF.deletionInfo();
@@ -311,7 +310,6 @@ public class CollationController
         {
             for (OnDiskAtomIterator iter : iterators)
                 FileUtils.closeQuietly(iter);
-            SSTableReader.releaseReferences(view.sstables);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 8f96765..ea49250 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1780,68 +1780,64 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return repairedSSTables;
     }
 
-    private ViewFragment markReferenced(Function<DataTracker.View, List<SSTableReader>> filter)
+    public ViewFragment selectAndReference(Function<DataTracker.View, List<SSTableReader>> filter)
     {
-        List<SSTableReader> sstables;
-        DataTracker.View view;
-
         while (true)
         {
-            view = data.getView();
-
-            if (view.intervalTree.isEmpty())
-            {
-                sstables = Collections.emptyList();
-                break;
-            }
-
-            sstables = filter.apply(view);
-            if (SSTableReader.acquireReferences(sstables))
-                break;
-            // retry w/ new view
+            ViewFragment view = select(filter);
+            if (view.sstables.isEmpty() || SSTableReader.acquireReferences(view.sstables))
+                return view;
         }
+    }
 
+    public ViewFragment select(Function<DataTracker.View, List<SSTableReader>> filter)
+    {
+        DataTracker.View view = data.getView();
+        List<SSTableReader> sstables = view.intervalTree.isEmpty()
+                                       ? Collections.<SSTableReader>emptyList()
+                                       : filter.apply(view);
         return new ViewFragment(sstables, view.getAllMemtables());
     }
 
+
     /**
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for the given @param key, according to the interval tree
      */
-    public ViewFragment markReferenced(final DecoratedKey key)
+    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final DecoratedKey key)
     {
         assert !key.isMinimum(partitioner);
-        return markReferenced(new Function<DataTracker.View, List<SSTableReader>>()
+        return new Function<DataTracker.View, List<SSTableReader>>()
         {
             public List<SSTableReader> apply(DataTracker.View view)
             {
                 return compactionStrategy.filterSSTablesForReads(view.intervalTree.search(key));
             }
-        });
+        };
     }
 
     /**
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for rows within @param rowBounds, inclusive, according to the interval tree.
      */
-    public ViewFragment markReferenced(final AbstractBounds<RowPosition> rowBounds)
+    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final AbstractBounds<RowPosition> rowBounds)
     {
-        return markReferenced(new Function<DataTracker.View, List<SSTableReader>>()
+        return new Function<DataTracker.View, List<SSTableReader>>()
         {
             public List<SSTableReader> apply(DataTracker.View view)
             {
                 return compactionStrategy.filterSSTablesForReads(view.sstablesInBounds(rowBounds));
             }
-        });
+        };
     }
 
     /**
      * @return a ViewFragment containing the sstables and memtables that may need to be merged
      * for rows for all of @param rowBoundsCollection, inclusive, according to the interval tree.
      */
-    public ViewFragment markReferenced(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
+    public Function<DataTracker.View, List<SSTableReader>> viewFilter(final Collection<AbstractBounds<RowPosition>> rowBoundsCollection)
     {
-        return markReferenced(new Function<DataTracker.View, List<SSTableReader>>()
+        return new Function<DataTracker.View, List<SSTableReader>>()
         {
             public List<SSTableReader> apply(DataTracker.View view)
             {
@@ -1851,17 +1847,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
                 return ImmutableList.copyOf(sstables);
             }
-        });
+        };
     }
 
     public List<String> getSSTablesForKey(String key)
     {
         DecoratedKey dk = partitioner.decorateKey(metadata.getKeyValidator().fromString(key));
-        ViewFragment view = markReferenced(dk);
-        try
+        try (OpOrder.Group op = readOrdering.start())
         {
-            List<String> files = new ArrayList<String>();
-            for (SSTableReader sstr : view.sstables)
+            List<String> files = new ArrayList<>();
+            for (SSTableReader sstr : select(viewFilter(dk)).sstables)
             {
                 // check if the key actually exists in this sstable, without updating cache and stats
                 if (sstr.getPosition(dk, SSTableReader.Operator.EQ, false) != null)
@@ -1869,10 +1864,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             }
             return files;
         }
-        finally
-        {
-            SSTableReader.releaseReferences(view.sstables);
-        }
     }
 
     public ColumnFamily getTopLevelColumns(QueryFilter filter, int gcBefore)
@@ -1927,51 +1918,41 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         assert !(range.keyRange() instanceof Range) || !((Range)range.keyRange()).isWrapAround() || range.keyRange().right.isMinimum(partitioner) : range.keyRange();
 
-        final ViewFragment view = markReferenced(range.keyRange());
+        final ViewFragment view = select(viewFilter(range.keyRange()));
         Tracing.trace("Executing seq scan across {} sstables for {}", view.sstables.size(), range.keyRange().getString(metadata.getKeyValidator()));
 
-        try
-        {
-            final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now);
+        final CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(view.memtables, view.sstables, range, this, now);
 
-            // todo this could be pushed into SSTableScanner
-            return new AbstractScanIterator()
+        // todo this could be pushed into SSTableScanner
+        return new AbstractScanIterator()
+        {
+            protected Row computeNext()
             {
-                protected Row computeNext()
-                {
-                    // pull a row out of the iterator
-                    if (!iterator.hasNext())
-                        return endOfData();
+                // pull a row out of the iterator
+                if (!iterator.hasNext())
+                    return endOfData();
 
-                    Row current = iterator.next();
-                    DecoratedKey key = current.key;
+                Row current = iterator.next();
+                DecoratedKey key = current.key;
 
-                    if (!range.stopKey().isMinimum(partitioner) && range.stopKey().compareTo(key) < 0)
-                        return endOfData();
+                if (!range.stopKey().isMinimum(partitioner) && range.stopKey().compareTo(key) < 0)
+                    return endOfData();
 
-                    // skipping outside of assigned range
-                    if (!range.contains(key))
-                        return computeNext();
+                // skipping outside of assigned range
+                if (!range.contains(key))
+                    return computeNext();
 
-                    if (logger.isTraceEnabled())
-                        logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key));
+                if (logger.isTraceEnabled())
+                    logger.trace("scanned {}", metadata.getKeyValidator().getString(key.key));
 
-                    return current;
-                }
+                return current;
+            }
 
-                public void close() throws IOException
-                {
-                    SSTableReader.releaseReferences(view.sstables);
-                    iterator.close();
-                }
-            };
-        }
-        catch (RuntimeException e)
-        {
-            // In case getIterator() throws, otherwise the iteror close method releases the references.
-            SSTableReader.releaseReferences(view.sstables);
-            throw e;
-        }
+            public void close() throws IOException
+            {
+                iterator.close();
+            }
+        };
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index e70fd60..8e359bd 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -111,6 +111,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FilterFactory;
 import org.apache.cassandra.utils.IFilter;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.OpOrder;
 
 import static org.apache.cassandra.db.Directories.SECONDARY_INDEX_NAME_SEPARATOR;
 
@@ -554,13 +555,15 @@ public class SSTableReader extends SSTable
 
         synchronized (replaceLock)
         {
-            boolean closeBf = true, closeSummary = true, closeFiles = true;
+            boolean closeBf = true, closeSummary = true, closeFiles = true, deleteFile = false;
 
             if (replacedBy != null)
             {
                 closeBf = replacedBy.bf != bf;
                 closeSummary = replacedBy.indexSummary != indexSummary;
                 closeFiles = replacedBy.dfile != dfile;
+                // if the replacement sstablereader uses a different path, clean up our paths
+                deleteFile = !dfile.path.equals(replacedBy.dfile.path);
             }
 
             if (replaces != null)
@@ -568,6 +571,7 @@ public class SSTableReader extends SSTable
                 closeBf &= replaces.bf != bf;
                 closeSummary &= replaces.indexSummary != indexSummary;
                 closeFiles &= replaces.dfile != dfile;
+                deleteFile &= !dfile.path.equals(replaces.dfile.path);
             }
 
             boolean deleteAll = false;
@@ -593,32 +597,57 @@ public class SSTableReader extends SSTable
                     replacedBy.replaces = replaces;
             }
 
-            if (references.get() != 0)
-            {
-                throw new IllegalStateException("SSTable is not fully released (" + references.get() + " references)");
-            }
-            if (closeBf)
-                bf.close();
-            if (closeSummary)
-                indexSummary.close();
-            if (closeFiles)
-            {
-                ifile.cleanup();
-                dfile.cleanup();
-            }
-            if (deleteAll)
+            scheduleTidy(closeBf, closeSummary, closeFiles, deleteFile, deleteAll);
+        }
+    }
+
+    private void scheduleTidy(final boolean closeBf, final boolean closeSummary, final boolean closeFiles, final boolean deleteFiles, final boolean deleteAll)
+    {
+        final ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.cfId);
+        final OpOrder.Barrier barrier;
+        if (cfs != null)
+        {
+            barrier = cfs.readOrdering.newBarrier();
+            barrier.issue();
+        }
+        else
+            barrier = null;
+
+        StorageService.tasks.execute(new Runnable()
+        {
+            public void run()
             {
-                /**
-                 * Do the OS a favour and suggest (using fadvice call) that we
-                 * don't want to see pages of this SSTable in memory anymore.
-                 *
-                 * NOTE: We can't use madvice in java because it requires the address of
-                 * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
-                 */
-                dropPageCache();
-                deletingTask.schedule();
+                if (barrier != null)
+                    barrier.await();
+                assert references.get() == 0;
+                if (closeBf)
+                    bf.close();
+                if (closeSummary)
+                    indexSummary.close();
+                if (closeFiles)
+                {
+                    ifile.cleanup();
+                    dfile.cleanup();
+                }
+                if (deleteAll)
+                {
+                    /**
+                     * Do the OS a favour and suggest (using fadvice call) that we
+                     * don't want to see pages of this SSTable in memory anymore.
+                     *
+                     * NOTE: We can't use madvice in java because it requires the address of
+                     * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it
+                     */
+                    dropPageCache();
+                    deletingTask.run();
+                }
+                else if (deleteFiles)
+                {
+                    FileUtils.deleteWithConfirm(new File(dfile.path));
+                    FileUtils.deleteWithConfirm(new File(ifile.path));
+                }
             }
-        }
+        });
     }
 
     public String getFilename()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/13910dc4/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index e8879f8..1ef24e3 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -269,7 +269,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber, IFailureDe
             List<AbstractBounds<RowPosition>> rowBoundsList = Lists.newLinkedList();
             for (Range<Token> range : normalizedRanges)
                 rowBoundsList.add(range.toRowBounds());
-            ColumnFamilyStore.ViewFragment view = cfStore.markReferenced(rowBoundsList);
+            ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
             sstables.addAll(view.sstables);
         }
         return sstables;


Mime
View raw message