cassandra-commits mailing list archives

From marc...@apache.org
Subject [1/3] Preemptive open of compaction results
Date Wed, 23 Apr 2014 14:29:29 GMT
Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 b3a225ef1 -> 4e95953f2


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/service/FileCacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/FileCacheService.java b/src/java/org/apache/cassandra/service/FileCacheService.java
index d22763b..9f57995 100644
--- a/src/java/org/apache/cassandra/service/FileCacheService.java
+++ b/src/java/org/apache/cassandra/service/FileCacheService.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.cache.*;
 import org.slf4j.Logger;
@@ -41,36 +42,66 @@ public class FileCacheService
 
     public static FileCacheService instance = new FileCacheService();
 
-    private static final Callable<Queue<RandomAccessReader>> cacheForPathCreator = new Callable<Queue<RandomAccessReader>>()
+    private static final AtomicLong cacheKeyIdCounter = new AtomicLong();
+    public static final class CacheKey
+    {
+        final long id;
+        public CacheKey()
+        {
+            this.id = cacheKeyIdCounter.incrementAndGet();
+        }
+        public boolean equals(Object that)
+        {
+            return that instanceof CacheKey && ((CacheKey) that).id == this.id;
+        }
+        public int hashCode()
+        {
+            return (int) id;
+        }
+    }
+
+    private static final Callable<CacheBucket> cacheForPathCreator = new Callable<CacheBucket>()
     {
         @Override
-        public Queue<RandomAccessReader> call()
+        public CacheBucket call()
         {
-            return new ConcurrentLinkedQueue<RandomAccessReader>();
+            return new CacheBucket();
         }
     };
 
     private static final AtomicInteger memoryUsage = new AtomicInteger();
 
-    private final Cache<String, Queue<RandomAccessReader>> cache;
+    private final Cache<CacheKey, CacheBucket> cache;
     private final FileCacheMetrics metrics = new FileCacheMetrics();
 
+    private static final class CacheBucket
+    {
+        final ConcurrentLinkedQueue<RandomAccessReader> queue = new ConcurrentLinkedQueue<>();
+        volatile boolean discarded = false;
+    }
+
     protected FileCacheService()
     {
-        RemovalListener<String, Queue<RandomAccessReader>> onRemove = new RemovalListener<String, Queue<RandomAccessReader>>()
+        RemovalListener<CacheKey, CacheBucket> onRemove = new RemovalListener<CacheKey, CacheBucket>()
         {
             @Override
-            public void onRemoval(RemovalNotification<String, Queue<RandomAccessReader>> notification)
+            public void onRemoval(RemovalNotification<CacheKey, CacheBucket> notification)
             {
-                Queue<RandomAccessReader> cachedInstances = notification.getValue();
-                if (cachedInstances == null)
+                CacheBucket bucket = notification.getValue();
+                if (bucket == null)
                     return;
 
-                if (cachedInstances.size() > 0)
-                    logger.debug("Evicting cold readers for {}", cachedInstances.peek().getPath());
-
-                for (RandomAccessReader reader : cachedInstances)
+                // set discarded before deallocating the readers, to ensure we don't leak any
+                bucket.discarded = true;
+                Queue<RandomAccessReader> q = bucket.queue;
+                boolean first = true;
+                for (RandomAccessReader reader = q.poll() ; reader != null ; reader = q.poll())
                 {
+                    if (logger.isDebugEnabled() && first)
+                    {
+                        logger.debug("Evicting cold readers for {}", reader.getPath());
+                        first = false;
+                    }
                     memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
                     reader.deallocate();
                 }
@@ -81,15 +112,16 @@ public class FileCacheService
                 .expireAfterAccess(AFTER_ACCESS_EXPIRATION, TimeUnit.MILLISECONDS)
                 .concurrencyLevel(DatabaseDescriptor.getConcurrentReaders())
                 .removalListener(onRemove)
+                .initialCapacity(16 << 10)
                 .build();
     }
 
-    public RandomAccessReader get(String path)
+    public RandomAccessReader get(CacheKey key)
     {
         metrics.requests.mark();
 
-        Queue<RandomAccessReader> instances = getCacheFor(path);
-        RandomAccessReader result = instances.poll();
+        CacheBucket bucket = getCacheFor(key);
+        RandomAccessReader result = bucket.queue.poll();
         if (result != null)
         {
             metrics.hits.mark();
@@ -99,11 +131,11 @@ public class FileCacheService
         return result;
     }
 
-    private Queue<RandomAccessReader> getCacheFor(String path)
+    private CacheBucket getCacheFor(CacheKey key)
     {
         try
         {
-            return cache.get(path, cacheForPathCreator);
+            return cache.get(key, cacheForPathCreator);
         }
         catch (ExecutionException e)
         {
@@ -111,34 +143,46 @@ public class FileCacheService
         }
     }
 
-    public void put(RandomAccessReader instance)
+    public void put(CacheKey cacheKey, RandomAccessReader instance)
     {
         int memoryUsed = memoryUsage.get();
         if (logger.isDebugEnabled())
             logger.debug("Estimated memory usage is {} compared to actual usage {}", memoryUsed, sizeInBytes());
 
-        if (memoryUsed >= MEMORY_USAGE_THRESHOLD)
+        CacheBucket bucket = cache.getIfPresent(cacheKey);
+        if (memoryUsed >= MEMORY_USAGE_THRESHOLD || bucket == null)
         {
             instance.deallocate();
         }
         else
         {
             memoryUsage.addAndGet(instance.getTotalBufferSize());
-            getCacheFor(instance.getPath()).add(instance);
+            bucket.queue.add(instance);
+            if (bucket.discarded)
+            {
+                RandomAccessReader reader = bucket.queue.poll();
+                if (reader != null)
+                {
+                    memoryUsage.addAndGet(-1 * reader.getTotalBufferSize());
+                    reader.deallocate();
+                }
+            }
         }
     }
 
-    public void invalidate(String path)
+    public void invalidate(CacheKey cacheKey, String path)
     {
-        logger.debug("Invalidating cache for {}", path);
-        cache.invalidate(path);
+        if (logger.isDebugEnabled())
+            logger.debug("Invalidating cache for {}", path);
+        cache.invalidate(cacheKey);
     }
 
+    // TODO: this method is unsafe, as it calls getTotalBufferSize() on items that can have been discarded
     public long sizeInBytes()
     {
         long n = 0;
-        for (Queue<RandomAccessReader> queue : cache.asMap().values())
-            for (RandomAccessReader reader : queue)
+        for (CacheBucket bucket : cache.asMap().values())
+            for (RandomAccessReader reader : bucket.queue)
                 n += reader.getTotalBufferSize();
         return n;
     }
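
The put()/onRemoval() changes above hinge on a publish-then-recheck pattern: a reader is first added to the bucket's queue, and the volatile discarded flag is only checked afterwards, so a concurrent eviction can never strand a reader in a bucket that nobody will drain. Below is a minimal, self-contained sketch of that pattern; the class and method names are illustrative and are not the Cassandra API.

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative sketch of the publish-then-recheck idiom used by FileCacheService above.
final class RecycleBucket<T extends Closeable>
{
    final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    volatile boolean discarded = false;

    // Removal-listener side: mark the bucket dead first, then drain whatever is visible.
    void discard() throws IOException
    {
        discarded = true;
        for (T item = queue.poll(); item != null; item = queue.poll())
            item.close();
    }

    // Pool-return side: publish the item, then re-check the flag. If the bucket was
    // discarded concurrently, pull an element back out so nothing is leaked.
    void recycle(T item) throws IOException
    {
        queue.add(item);
        if (discarded)
        {
            T victim = queue.poll();
            if (victim != null)
                victim.close();
        }
    }
}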

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/streaming/StreamLockfile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamLockfile.java b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
index 0eb01c5..4d20479 100644
--- a/src/java/org/apache/cassandra/streaming/StreamLockfile.java
+++ b/src/java/org/apache/cassandra/streaming/StreamLockfile.java
@@ -21,15 +21,20 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.StandardOpenOption;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
 
 import com.google.common.base.Charsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Encapsulates the behavior for 'locking' any streamed sttables to a node.
@@ -69,7 +74,7 @@ public class StreamLockfile
             /* write out the file names *without* the 'tmp-file' flag in the file name.
                this class will not need to clean up tmp files (on restart), CassandraDaemon does that already,
                just make sure we delete the fully-formed SSTRs. */
-            sstablePaths.add(writer.descriptor.asTemporary(false).baseFilename());
+            sstablePaths.add(writer.descriptor.asType(Descriptor.Type.FINAL).baseFilename());
         }
 
         try
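
This hunk, like several below, replaces the boolean 'temporary' flag on Descriptor with the new Descriptor.Type enum (TEMP/FINAL) and the asType(...) converter. The enum itself is defined in Descriptor.java and is not part of this message; the sketch below only shows the shape implied by the call sites in this diff, so anything beyond the two constants is an assumption.

// Rough sketch, inferred only from the call sites visible in this diff
// (Descriptor.Type.TEMP, Descriptor.Type.FINAL, asType(...)); the real enum lives in
// org.apache.cassandra.io.sstable.Descriptor and may carry additional state, such as
// the "tmp" marker used in temporary file names (an assumption, not shown here).
enum Type
{
    TEMP,
    FINAL
}

// In spirit, the StreamLockfile line above becomes
//     writer.descriptor.asType(Descriptor.Type.FINAL).baseFilename()
// where it previously read
//     writer.descriptor.asTemporary(false).baseFilename()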

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
index e91f58f..78d4d9e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneScrubber.java
@@ -117,17 +117,6 @@ public class StandaloneScrubber
                             scrubber.close();
                         }
 
-                        if (manifest != null)
-                        {
-                            if (scrubber.getNewInOrderSSTable() != null)
-                                manifest.add(scrubber.getNewInOrderSSTable());
-
-                        List<SSTableReader> added = scrubber.getNewSSTable() == null
-                                ? Collections.<SSTableReader>emptyList()
-                                : Collections.singletonList(scrubber.getNewSSTable());
-                            manifest.replace(Collections.singletonList(sstable), added);
-                        }
-
                         // Remove the sstable (it's been copied by scrub and snapshotted)
                         sstable.markObsolete();
                         sstable.releaseReference();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
index 8b92586..9353ce9 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneSplitter.java
@@ -140,10 +140,6 @@ public class StandaloneSplitter
                 try
                 {
                     new SSTableSplitter(cfs, sstable, options.sizeInMB).split();
-
-                    // Remove the sstable
-                    sstable.markObsolete();
-                    sstable.releaseReference();
                 }
                 catch (Exception e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
index a00245b..55f206e 100644
--- a/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
+++ b/src/java/org/apache/cassandra/tools/StandaloneUpgrader.java
@@ -97,9 +97,6 @@ public class StandaloneUpgrader
                 {
                     Upgrader upgrader = new Upgrader(cfs, sstable, handler);
                     upgrader.upgrade();
-
-                    sstable.markObsolete();
-                    sstable.releaseReference();
                 }
                 catch (Exception e)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java
index ac9f863..1d3c014 100644
--- a/src/java/org/apache/cassandra/utils/CLibrary.java
+++ b/src/java/org/apache/cassandra/utils/CLibrary.java
@@ -18,6 +18,9 @@
 package org.apache.cassandra.utils;
 
 import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.lang.reflect.Field;
 
 import org.slf4j.Logger;
@@ -139,6 +142,25 @@ public final class CLibrary
         }
     }
 
+    public static void trySkipCache(String path, long offset, long len)
+    {
+        trySkipCache(getfd(path), offset, len);
+    }
+
+    public static void trySkipCache(int fd, long offset, long len)
+    {
+        if (len == 0)
+            trySkipCache(fd, 0, 0);
+
+        while (len > 0)
+        {
+            int sublen = (int) Math.min(Integer.MAX_VALUE, len);
+            trySkipCache(fd, offset, sublen);
+            len -= sublen;
+            offset -= sublen;
+        }
+    }
+
     public static void trySkipCache(int fd, long offset, int len)
     {
         if (fd < 0)
@@ -280,33 +302,30 @@ public final class CLibrary
         return -1;
     }
 
-    /**
-     * Suggest kernel to preheat one page for the given file.
-     *
-     * @param fd The file descriptor of file to preheat.
-     * @param position The offset of the block.
-     *
-     * @return On success, zero is returned. On error, an error number is returned.
-     */
-    public static int preheatPage(int fd, long position)
+    public static int getfd(String path)
     {
+        RandomAccessFile file = null;
         try
         {
-            // 4096 is good for SSD because they operate on "Pages" 4KB in size
-            return posix_fadvise(fd, position, 4096, POSIX_FADV_WILLNEED);
+            file = new RandomAccessFile(path, "r");
+            return getfd(file.getFD());
         }
-        catch (UnsatisfiedLinkError e)
+        catch (Throwable t)
         {
-            // JNA is unavailable just skipping
+            // ignore
+            return -1;
         }
-        catch (RuntimeException e)
+        finally
         {
-            if (!(e instanceof LastErrorException))
-                throw e;
-
-            logger.warn(String.format("posix_fadvise(%d, %d) failed, errno (%d).", fd, position, errno(e)));
+            try
+            {
+                if (file != null)
+                    file.close();
+            }
+            catch (Throwable t)
+            {
+                // ignore
+            }
         }
-
-        return -1;
     }
 }
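
The new trySkipCache(String, long, long) overload lets callers drop page-cache contents by path, splitting lengths larger than Integer.MAX_VALUE into repeated calls to the existing int-sized overload, while getfd(String) resolves the descriptor by briefly opening the file read-only. A hypothetical caller might look like the following; the path and class name are illustrative, getfd returns -1 when the file cannot be opened, and the existing overload guards against invalid descriptors (see the fd < 0 check above), so the call degrades to a no-op.

import java.io.File;
import org.apache.cassandra.utils.CLibrary;

// Hypothetical usage sketch: after a large sequential write that will not be re-read
// soon, hint the kernel that its cached pages for the file can be evicted.
public class SkipCacheExample
{
    public static void main(String[] args)
    {
        File f = new File("/tmp/example-Data.db"); // illustrative path
        CLibrary.trySkipCache(f.getAbsolutePath(), 0, f.length());
    }
}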

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
index de8da01..3007292 100644
--- a/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
+++ b/src/java/org/apache/cassandra/utils/obs/OffHeapBitSet.java
@@ -21,7 +21,6 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.cassandra.cache.RefCountedMemory;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.Memory;
 
@@ -42,7 +41,7 @@ public class OffHeapBitSet implements IBitSet
         try
         {
             long byteCount = wordCount * 8L;
-            bytes = RefCountedMemory.allocate(byteCount);
+            bytes = Memory.allocate(byteCount);
         }
         catch (OutOfMemoryError e)
         {
@@ -123,7 +122,7 @@ public class OffHeapBitSet implements IBitSet
     public static OffHeapBitSet deserialize(DataInput in) throws IOException
     {
         long byteCount = in.readInt() * 8L;
-        Memory memory = RefCountedMemory.allocate(byteCount);
+        Memory memory = Memory.allocate(byteCount);
         for (long i = 0; i < byteCount;)
         {
             long v = in.readLong();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
index d68ba10..35c2b5e 100644
--- a/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
+++ b/test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
@@ -99,7 +99,7 @@ public class LongCompactionsTest extends SchemaLoader
 
         long start = System.nanoTime();
         final int gcBefore = (int) (System.currentTimeMillis() / 1000) - Schema.instance.getCFMetaData(KEYSPACE1, "Standard1").getGcGraceSeconds();
-        new CompactionTask(store, sstables, gcBefore).execute(null);
+        new CompactionTask(store, sstables, gcBefore, false).execute(null);
         System.out.println(String.format("%s: sstables=%d rowsper=%d colsper=%d: %d ms",
                                          this.getClass().getName(),
                                          sstableCount,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index b3f7429..d180b82 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -22,15 +22,26 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.io.util.FileUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.Test;
@@ -41,28 +52,55 @@ import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.IndexType;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.composites.CellName;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.composites.Composites;
+import org.apache.cassandra.db.filter.ColumnSlice;
+import org.apache.cassandra.db.filter.IDiskAtomFilter;
+import org.apache.cassandra.db.filter.NamesQueryFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.SliceQueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.db.marshal.LexicalUUIDType;
 import org.apache.cassandra.db.marshal.LongType;
-import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.dht.ExcludingBounds;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.IncludingExcludingBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableDeletingTask;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableSimpleWriter;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.ThriftValidation;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
-import static org.junit.Assert.*;
-import static org.apache.cassandra.Util.*;
+import static org.apache.cassandra.Util.cellname;
+import static org.apache.cassandra.Util.column;
+import static org.apache.cassandra.Util.dk;
+import static org.apache.cassandra.Util.rp;
 import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class ColumnFamilyStoreTest extends SchemaLoader
@@ -916,8 +954,8 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         for (int version = 1; version <= 2; ++version)
         {
-            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, false);
-            Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, false);
+            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForCompactedSSTables(), "Keyspace2", "Standard1", version, Descriptor.Type.FINAL);
+            Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), "Keyspace2", "Standard1", version, Descriptor.Type.FINAL);
             for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
                 assertTrue("can not find backedup file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
         }
@@ -1697,7 +1735,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
                 MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
                 for (int ancestor : ancestors)
                     collector.addAncestor(ancestor);
-                String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+                String file = new Descriptor(directory, ks, cf, 3, Descriptor.Type.TEMP).filenameFor(Component.DATA);
                 return new SSTableWriter(file,
                                          0,
                                          ActiveRepairService.UNREPAIRED_SSTABLE,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 97cd21c..05e0beb 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -19,20 +19,25 @@ package org.apache.cassandra.db;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Config.DiskFailurePolicy;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Directories.DataDirectory;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
@@ -99,7 +104,7 @@ public class DirectoriesTest
 
     private static void createFakeSSTable(File dir, String cf, int gen, boolean temp, List<File> addTo) throws IOException
     {
-        Descriptor desc = new Descriptor(dir, KS, cf, gen, temp);
+        Descriptor desc = new Descriptor(dir, KS, cf, gen, temp ? Descriptor.Type.TEMP : Descriptor.Type.FINAL);
         for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER })
         {
             File f = new File(desc.filenameFor(c));
@@ -122,7 +127,7 @@ public class DirectoriesTest
             Directories directories = new Directories(cfm);
             assertEquals(cfDir(cfm), directories.getDirectoryForCompactedSSTables());
 
-            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false);
+            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
             File snapshotDir = new File(cfDir(cfm),  File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
             assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, "42"));
 
@@ -224,7 +229,7 @@ public class DirectoriesTest
             final String n = Long.toString(System.nanoTime());
             Callable<File> directoryGetter = new Callable<File>() {
                 public File call() throws Exception {
-                    Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, false);
+                    Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
                     return Directories.getSnapshotDirectory(desc, n);
                 }
             };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index 6ca5487..c48a728 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -17,8 +17,10 @@
  */
 package org.apache.cassandra.db;
 
+import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.junit.AfterClass;
@@ -30,7 +32,9 @@ import org.apache.cassandra.cache.KeyCacheKey;
 import org.apache.cassandra.db.composites.*;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.CacheService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.junit.Assert.assertEquals;
@@ -145,11 +149,22 @@ public class KeyCacheTest extends SchemaLoader
 
         assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
 
+        Set<SSTableReader> readers = cfs.getDataTracker().getSSTables();
+        for (SSTableReader reader : readers)
+            reader.acquireReference();
+
         Util.compactAll(cfs, Integer.MAX_VALUE).get();
-        // after compaction cache should have entries for
-        // new SSTables, if we had 2 keys in cache previously it should become 4
+        // after compaction cache should have entries for new SSTables,
+        // but since we have kept a reference to the old sstables,
+        // if we had 2 keys in cache previously it should become 4
         assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
 
+        for (SSTableReader reader : readers)
+            reader.releaseReference();
+
+        // after releasing the reference this should drop to 2
+        assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
+
         // re-read same keys to verify that key cache didn't grow further
         cfs.getColumnFamily(QueryFilter.getSliceFilter(key1,
                                                        COLUMN_FAMILY1,
@@ -167,7 +182,7 @@ public class KeyCacheTest extends SchemaLoader
                                                        10,
                                                        System.currentTimeMillis()));
 
-        assertKeyCacheSize(4, KEYSPACE1, COLUMN_FAMILY1);
+        assertKeyCacheSize(2, KEYSPACE1, COLUMN_FAMILY1);
     }
 
     private void assertKeyCacheSize(int expected, String keyspace, String columnFamily)
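
The KeyCacheTest change above relies on the fact that an obsoleted sstable's cached keys survive until its last reference is released, which is why the count stays at 4 while the test holds references and drops back to 2 afterwards. The sketch below shows reference counting with deferred cleanup in isolation; it is illustrative only and not SSTableReader's actual implementation.

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch: cleanup (for example, dropping a reader's key-cache entries)
// is deferred until the resource is both obsolete and no longer referenced.
final class RefCountedResource
{
    private final AtomicInteger references = new AtomicInteger(1);
    private volatile boolean obsolete = false;
    private final Runnable cleanup;

    RefCountedResource(Runnable cleanup)
    {
        this.cleanup = cleanup;
    }

    boolean acquireReference()
    {
        while (true)
        {
            int refs = references.get();
            if (refs <= 0)
                return false;          // already fully released
            if (references.compareAndSet(refs, refs + 1))
                return true;
        }
    }

    void markObsolete()
    {
        obsolete = true;
    }

    void releaseReference()
    {
        if (references.decrementAndGet() == 0 && obsolete)
            cleanup.run();             // only now do the cached entries disappear
    }
}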

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index b8c7980..e820fc2 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -20,8 +20,10 @@ package org.apache.cassandra.db;
  *
  */
 
-import java.io.*;
-import java.util.Collections;
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -29,7 +31,6 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.cql3.QueryProcessor;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.utils.UUIDGen;
 import org.apache.commons.lang3.StringUtils;
@@ -37,17 +38,18 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.db.compaction.Scrubber;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.Util.cellname;
@@ -112,7 +114,7 @@ public class ScrubTest extends SchemaLoader
         file.close();
 
         // with skipCorrupted == false, the scrub is expected to fail
-        Scrubber scrubber = new Scrubber(cfs, sstable, false);
+        Scrubber scrubber = new Scrubber(cfs, sstable, false, false);
         try
         {
             scrubber.scrub();
@@ -121,10 +123,9 @@ public class ScrubTest extends SchemaLoader
         catch (IOError err) {}
 
         // with skipCorrupted == true, the corrupt row will be skipped
-        scrubber = new Scrubber(cfs, sstable, true);
+        scrubber = new Scrubber(cfs, sstable, true, false);
         scrubber.scrub();
         scrubber.close();
-        cfs.replaceCompactedSSTables(Collections.singletonList(sstable), Collections.singletonList(scrubber.getNewSSTable()), OperationType.SCRUB);
         assertEquals(1, cfs.getSSTables().size());
 
         // verify that we can read all of the rows, and there is now one less row
@@ -206,7 +207,7 @@ public class ScrubTest extends SchemaLoader
         assert root != null;
         File rootDir = new File(root);
         assert rootDir.isDirectory();
-        Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, false);
+        Descriptor desc = new Descriptor(new Descriptor.Version("jb"), rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL);
         CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
 
         try
@@ -227,7 +228,7 @@ public class ScrubTest extends SchemaLoader
         components.add(Component.TOC);
         SSTableReader sstable = SSTableReader.openNoValidation(desc, components, metadata);
 
-        Scrubber scrubber = new Scrubber(cfs, sstable, false);
+        Scrubber scrubber = new Scrubber(cfs, sstable, false, true);
         scrubber.scrub();
 
         cfs.loadNewSSTables();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index f8fcf76..900abd8 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -18,19 +18,22 @@
  */
 package org.apache.cassandra.io.compress;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.util.Collections;
 import java.util.Random;
 
 import org.junit.Test;
 
-import org.apache.cassandra.db.composites.*;
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
-import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
-import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.FileMark;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -61,7 +64,7 @@ public class CompressedRandomAccessReaderTest
         {
 
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
-            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector);
+            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance, 32, Collections.<String, String>emptyMap()), sstableMetadataCollector);
 
             for (int i = 0; i < 20; i++)
                 writer.write("x".getBytes());
@@ -101,8 +104,8 @@ public class CompressedRandomAccessReaderTest
         {
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
             SequentialWriter writer = compressed
-                ? new CompressedSequentialWriter(f, filename + ".metadata", false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
-                : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
+                ? new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector)
+                : new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH);
 
             writer.write("The quick ".getBytes());
             FileMark mark = writer.mark();
@@ -151,7 +154,7 @@ public class CompressedRandomAccessReaderTest
         metadata.deleteOnExit();
 
         MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
-        SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), false, new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
+        SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
 
         writer.write(CONTENT.getBytes());
         writer.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index e26d0f5..2dc07ec 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -79,7 +79,7 @@ public class LegacySSTableTest extends SchemaLoader
     protected Descriptor getDescriptor(String ver)
     {
         File directory = new File(LEGACY_SSTABLE_ROOT + File.separator + ver + File.separator + KSNAME);
-        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, false);
+        return new Descriptor(ver, directory, KSNAME, CFNAME, 0, Descriptor.Type.FINAL);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index 292a51e..19a0b13 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -60,7 +60,7 @@ public class SSTableUtils
         File keyspaceDir = new File(tempdir, keyspaceName);
         keyspaceDir.mkdir();
         keyspaceDir.deleteOnExit();
-        File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, cfname, generation, false).filenameFor("Data.db"));
+        File datafile = new File(new Descriptor(keyspaceDir, keyspaceName, cfname, generation, Descriptor.Type.FINAL).filenameFor("Data.db"));
         if (!datafile.createNewFile())
             throw new IOException("unable to create file " + datafile);
         datafile.deleteOnExit();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index da0e31a..7751a51 100644
--- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.sstable.metadata;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -28,8 +27,8 @@ import java.util.Set;
 import com.google.common.collect.Sets;
 import org.junit.Test;
 
-import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.io.sstable.Component;
@@ -76,7 +75,7 @@ public class MetadataSerializerTest
             serializer.serialize(originalMetadata, out);
         }
 
-        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, false);
+        Descriptor desc = new Descriptor(Descriptor.Version.CURRENT, statsFile.getParentFile(), "", "", 0, Descriptor.Type.FINAL);
         try (RandomAccessReader in = RandomAccessReader.open(statsFile))
         {
             Map<MetadataType, MetadataComponent> deserialized = serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
index 2a8c7a9..fb45dd3 100644
--- a/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
+++ b/test/unit/org/apache/cassandra/io/util/DataOutputTest.java
@@ -132,7 +132,7 @@ public class DataOutputTest
     public void testSequentialWriter() throws IOException
     {
         File file = FileUtils.createTempFile("dataoutput", "test");
-        final SequentialWriter writer = new SequentialWriter(file, 32, true);
+        final SequentialWriter writer = new SequentialWriter(file, 32);
         DataOutputStreamAndChannel write = new DataOutputStreamAndChannel(writer, writer);
         DataInput canon = testWrite(write);
         write.flush();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4e95953f/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
index 8d9480b..dbc1ec2 100644
--- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
+++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java
@@ -63,7 +63,7 @@ public class CompressedInputStreamTest
         Descriptor desc = Descriptor.fromFilename(tmp.getAbsolutePath());
         MetadataCollector collector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance));
         CompressionParameters param = new CompressionParameters(SnappyCompressor.instance, 32, Collections.EMPTY_MAP);
-        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), false, param, collector);
+        CompressedSequentialWriter writer = new CompressedSequentialWriter(tmp, desc.filenameFor(Component.COMPRESSION_INFO), param, collector);
         Map<Long, Long> index = new HashMap<Long, Long>();
         for (long l = 0L; l < 1000; l++)
         {

