cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [1/3] git commit: revert 27ed655fd0552055bd1c26c62c098c16501bc32b
Date Tue, 26 Mar 2013 18:12:25 GMT
Updated Branches:
  refs/heads/cassandra-1.2 e580ef104 -> 01bc564bb
  refs/heads/trunk d394ab14a -> b114ce74c


revert 27ed655fd0552055bd1c26c62c098c16501bc32b


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01bc564b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01bc564b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01bc564b

Branch: refs/heads/cassandra-1.2
Commit: 01bc564bb2e89684b445f217a83c39f32113560a
Parents: e580ef1
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Tue Mar 26 13:11:19 2013 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Tue Mar 26 13:11:19 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    2 -
 conf/cassandra.yaml                                |    6 -
 src/java/org/apache/cassandra/config/Config.java   |    2 -
 .../cassandra/config/DatabaseDescriptor.java       |    5 -
 .../cassandra/db/commitlog/CommitLogReplayer.java  |    2 +-
 .../db/compaction/AbstractCompactionStrategy.java  |    2 +-
 .../cassandra/db/compaction/CompactionManager.java |    2 +-
 .../cassandra/db/compaction/CompactionTask.java    |    5 +-
 .../db/compaction/LeveledCompactionStrategy.java   |    6 +-
 .../apache/cassandra/db/compaction/Scrubber.java   |    4 +-
 .../io/compress/CompressedRandomAccessReader.java  |   10 +-
 .../apache/cassandra/io/sstable/KeyIterator.java   |    2 +-
 .../io/sstable/SSTableBoundedScanner.java          |    4 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |   90 ++-------------
 .../cassandra/io/sstable/SSTableScanner.java       |   10 +-
 .../cassandra/io/util/RandomAccessReader.java      |   54 +++++++--
 .../apache/cassandra/streaming/FileStreamTask.java |    2 +-
 .../compress/CompressedFileStreamTask.java         |    2 +-
 .../org/apache/cassandra/tools/SSTableExport.java  |    4 +-
 src/java/org/apache/cassandra/utils/CLibrary.java  |   21 ----
 .../unit/org/apache/cassandra/db/KeyCacheTest.java |    4 +-
 .../compress/CompressedRandomAccessReaderTest.java |    8 +-
 .../apache/cassandra/io/sstable/SSTableTest.java   |    4 +-
 .../apache/cassandra/io/sstable/SSTableUtils.java  |    4 +-
 .../io/util/BufferedRandomAccessFileTest.java      |    4 +-
 25 files changed, 98 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e0d3869..184e70d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,8 +7,6 @@
  * Improve asynchronous hint delivery (CASSANDRA-5179)
  * Fix Guava dependency version (12.0 -> 13.0.1) for Maven (CASSANDRA-5364)
  * Validate that provided CQL3 collection value are < 64K (CASSANDRA-5355)
- * Change Kernel Page Cache skipping into row preheating (disabled by default)
-   (CASSANDRA-4937)
  * Make upgradeSSTable skip current version sstables by default (CASSANDRA-5366)
  * Optimize min/max timestamp collection (CASSANDRA-5373)
 Merged from 1.1:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 76e5901..2bf23a6 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -684,9 +684,3 @@ internode_compression: all
 # reducing overhead from the TCP protocol itself, at the cost of increasing
 # latency if you block for cross-datacenter responses.
 inter_dc_tcp_nodelay: true
-
-# Enable or disable kernel page cache preheating from contents of the key cache after compaction.
-# When enabled it would preheat only first "page" (4KB) of each row to optimize
-# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937
-# for further details on that topic.
-preheat_kernel_page_cache: false

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 5061042..34b26f5 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -167,8 +167,6 @@ public class Config
 
     public boolean inter_dc_tcp_nodelay = true;
 
-    public boolean preheat_kernel_page_cache = false;
-
     private static boolean loadYaml = true;
     private static boolean outboundBindAny = false;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 31e0b5a..ead60a0 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1263,9 +1263,4 @@ public class DatabaseDescriptor
     {
         return conf.inter_dc_tcp_nodelay;
     }
-
-    public static boolean shouldPreheatPageCache()
-    {
-        return conf.preheat_kernel_page_cache;
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
index 8dcdaad..2728970 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -121,7 +121,7 @@ public class CommitLogReplayer
         CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
         final long segment = desc.id;
         int version = desc.getMessagingVersion();
-        RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
+        RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()),
true);
         try
         {
             assert reader.length() <= Integer.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 85b09c1..cb15109 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -152,7 +152,7 @@ public abstract class AbstractCompactionStrategy
     {
         ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
         for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getScanner(range));
+            scanners.add(sstable.getDirectScanner(range));
         return scanners;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index db75bc1..f94c260 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -571,7 +571,7 @@ public class CompactionManager implements CompactionManagerMBean
             if (compactionFileLocation == null)
                 throw new IOException("disk full");
 
-            SSTableScanner scanner = sstable.getScanner();
+            SSTableScanner scanner = sstable.getDirectScanner();
             long rowsRead = 0;
             List<IColumn> indexedColumnsInRow = null;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 8c2af4d..75ea1cb 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -231,7 +231,10 @@ public class CompactionTask extends AbstractCompactionTask
         cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
         // TODO: this doesn't belong here, it should be part of the reader to load when the
tracker is wired up
         for (SSTableReader sstable : sstables)
-            sstable.preheat(cachedKeyMap.get(sstable.descriptor));
+        {
+            for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeyMap.get(sstable.descriptor).entrySet())
+               sstable.cacheKey(entry.getKey(), entry.getValue());
+        }
 
         if (logger.isInfoEnabled())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index d916c48..6a1bf4b 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -179,7 +179,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             {
                 // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner
for each
                 for (SSTableReader sstable : byLevel.get(level))
-                    scanners.add(sstable.getScanner(range));
+                    scanners.add(sstable.getDirectScanner(range));
             }
             else
             {
@@ -209,7 +209,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             this.sstables = new ArrayList<SSTableReader>(sstables);
             Collections.sort(this.sstables, SSTable.sstableComparator);
             sstableIterator = this.sstables.iterator();
-            currentScanner = sstableIterator.next().getScanner(range);
+            currentScanner = sstableIterator.next().getDirectScanner(range);
 
             long length = 0;
             for (SSTableReader sstable : sstables)
@@ -234,7 +234,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
                         currentScanner = null;
                         return endOfData();
                     }
-                    currentScanner = sstableIterator.next().getScanner(range);
+                    currentScanner = sstableIterator.next().getDirectScanner(range);
                 }
             }
             catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 30929cc..0601857 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -94,8 +94,8 @@ public class Scrubber implements Closeable
         // we'll also loop through the index at the same time, using the position from the
index to recover if the
         // row header (key or data size) is corrupt. (This means our position in the index
file will be one row
         // "ahead" of the data file.)
-        this.dataFile = sstable.openDataReader();
-        this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)));
+        this.dataFile = sstable.openDataReader(true);
+        this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)),
true);
         this.scrubInfo = new ScrubInfo(dataFile, sstable);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index aa686c2..bbd2466 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -42,7 +42,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     {
         try
         {
-            return new CompressedRandomAccessReader(path, metadata, owner);
+            return new CompressedRandomAccessReader(path, metadata, false, owner);
         }
         catch (FileNotFoundException e)
         {
@@ -50,11 +50,11 @@ public class CompressedRandomAccessReader extends RandomAccessReader
         }
     }
 
-    public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata
metadata)
+    public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata
metadata, boolean skipIOCache)
     {
         try
         {
-            return new CompressedRandomAccessReader(dataFilePath, metadata, null);
+            return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache,
null);
         }
         catch (FileNotFoundException e)
         {
@@ -73,9 +73,9 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     // raw checksum bytes
     private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
 
-    private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata,
PoolingSegmentedFile owner) throws FileNotFoundException
+    private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata,
boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException
     {
-        super(new File(dataFilePath), metadata.chunkLength(), owner);
+        super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, owner);
         this.metadata = metadata;
         compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 9fe1309..e581b22 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -38,7 +38,7 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close
     {
         this.desc = desc;
         File path = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
-        in = RandomAccessReader.open(path);
+        in = RandomAccessReader.open(path, true);
     }
 
     protected DecoratedKey computeNext()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index d5bec82..a571901 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -35,9 +35,9 @@ public class SSTableBoundedScanner extends SSTableScanner
     private final Iterator<Pair<Long, Long>> rangeIterator;
     private Pair<Long, Long> currentRange;
 
-    SSTableBoundedScanner(SSTableReader sstable, Iterator<Pair<Long, Long>> rangeIterator)
+    SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Iterator<Pair<Long,
Long>> rangeIterator)
     {
-        super(sstable);
+        super(sstable, skipCache);
         this.rangeIterator = rangeIterator;
         assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise
         currentRange = rangeIterator.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index cec2e42..21a8673 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -350,7 +350,7 @@ public class SSTableReader extends SSTable
                                           : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
 
         // we read the positions in a BRAF so we don't have to worry about an entry spanning
a mmap boundary.
-        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
+        RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)),
true);
 
         // try to load summaries from the disk and check if we need
         // to read primary index because we should re-create a BloomFilter or pre-load KeyCache
@@ -696,29 +696,6 @@ public class SSTableReader extends SSTable
         keyCache.put(cacheKey, info);
     }
 
-    public void preheat(Map<DecoratedKey, RowIndexEntry> cachedKeys) throws IOException
-    {
-        RandomAccessFile f = new RandomAccessFile(getFilename(), "r");
-
-        try
-        {
-            int fd = CLibrary.getfd(f.getFD());
-
-            for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeys.entrySet())
-            {
-                cacheKey(entry.getKey(), entry.getValue());
-
-                // add to the cache but don't do actual preheating if we have it disabled
in the config
-                if (DatabaseDescriptor.shouldPreheatPageCache() && fd > 0)
-                    CLibrary.preheatPage(fd, entry.getValue().position);
-            }
-        }
-        finally
-        {
-            FileUtils.closeQuietly(f);
-        }
-    }
-
     public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
     {
         return getCachedPosition(new KeyCacheKey(descriptor, key.key), updateStats);
@@ -918,15 +895,6 @@ public class SSTableReader extends SSTable
     {
         if (references.decrementAndGet() == 0 && isCompacted.get())
         {
-            /**
-             * Make OS a favour and suggest (using fadvice call) that we
-             * don't want to see pages of this SSTable in memory anymore.
-             *
-             * NOTE: We can't use madvice in java because it requires address of
-             * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on
it
-             */
-            dropPageCache();
-
             // Force finalizing mmapping if necessary
             ifile.cleanup();
             dfile.cleanup();
@@ -978,12 +946,12 @@ public class SSTableReader extends SSTable
     }
 
    /**
-    * I/O SSTableScanner
+    * Direct I/O SSTableScanner
     * @return A Scanner for seeking over the rows of the SSTable.
     */
-    public SSTableScanner getScanner()
+    public SSTableScanner getDirectScanner()
     {
-        return new SSTableScanner(this);
+        return new SSTableScanner(this, true);
     }
 
    /**
@@ -992,14 +960,14 @@ public class SSTableReader extends SSTable
     * @param range the range of keys to cover
     * @return A Scanner for seeking over the rows of the SSTable.
     */
-    public ICompactionScanner getScanner(Range<Token> range)
+    public ICompactionScanner getDirectScanner(Range<Token> range)
     {
         if (range == null)
-            return getScanner();
+            return getDirectScanner();
 
         Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
         return rangeIterator.hasNext()
-               ? new SSTableBoundedScanner(this, rangeIterator)
+               ? new SSTableBoundedScanner(this, true, rangeIterator)
                : new EmptyCompactionScanner(getFilename());
     }
 
@@ -1152,16 +1120,16 @@ public class SSTableReader extends SSTable
         return sstableMetadata.ancestors;
     }
 
-    public RandomAccessReader openDataReader()
+    public RandomAccessReader openDataReader(boolean skipIOCache)
     {
         return compression
-               ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
-               : RandomAccessReader.open(new File(getFilename()));
+               ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata(),
skipIOCache)
+               : RandomAccessReader.open(new File(getFilename()), skipIOCache);
     }
 
-    public RandomAccessReader openIndexReader()
+    public RandomAccessReader openIndexReader(boolean skipIOCache)
     {
-        return RandomAccessReader.open(new File(getIndexFilename()));
+        return RandomAccessReader.open(new File(getIndexFilename()), skipIOCache);
     }
 
     /**
@@ -1252,38 +1220,4 @@ public class SSTableReader extends SSTable
             throw new UnsupportedOperationException();
         }
     }
-
-    private void dropPageCache()
-    {
-        dropPageCache(dfile.path);
-        dropPageCache(ifile.path);
-    }
-
-    private void dropPageCache(String filePath)
-    {
-        RandomAccessFile file = null;
-
-        try
-        {
-            file = new RandomAccessFile(filePath, "r");
-
-            int fd = CLibrary.getfd(file.getFD());
-
-            if (fd > 0)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug(String.format("Dropping page cache of file %s.", filePath));
-
-                CLibrary.trySkipCache(fd, 0, 0);
-            }
-        }
-        catch (IOException e)
-        {
-            // we don't care if cache cleanup fails
-        }
-        finally
-        {
-            FileUtils.closeQuietly(file);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 949acda..22ac485 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -46,10 +46,10 @@ public class SSTableScanner implements ICompactionScanner
     /**
      * @param sstable SSTable to scan.
      */
-    SSTableScanner(SSTableReader sstable)
+    SSTableScanner(SSTableReader sstable, boolean skipCache)
     {
-        this.dfile = sstable.openDataReader();
-        this.ifile = sstable.openIndexReader();
+        this.dfile = sstable.openDataReader(skipCache);
+        this.ifile = sstable.openIndexReader(skipCache);
         this.sstable = sstable;
         this.filter = null;
     }
@@ -60,8 +60,8 @@ public class SSTableScanner implements ICompactionScanner
      */
     SSTableScanner(SSTableReader sstable, QueryFilter filter)
     {
-        this.dfile = sstable.openDataReader();
-        this.ifile = sstable.openIndexReader();
+        this.dfile = sstable.openDataReader(false);
+        this.ifile = sstable.openIndexReader(false);
         this.sstable = sstable;
         this.filter = filter;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
index 4d7bfbb..3210372 100644
--- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java
@@ -50,11 +50,19 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     // channel liked with the file, used to retrieve data and force updates.
     protected final FileChannel channel;
 
+    private final boolean skipIOCache;
+
+    // file descriptor
+    private final int fd;
+
+    // used if skip I/O cache was enabled
+    private long bytesSinceCacheFlush = 0;
+
     private final long fileLength;
 
     protected final PoolingSegmentedFile owner;
 
-    protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws
FileNotFoundException
+    protected RandomAccessReader(File file, int bufferSize, boolean skipIOCache, PoolingSegmentedFile
owner) throws FileNotFoundException
     {
         super(file, "r");
 
@@ -66,9 +74,19 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         // allocating required size of the buffer
         if (bufferSize <= 0)
             throw new IllegalArgumentException("bufferSize must be positive");
-
         buffer = new byte[bufferSize];
 
+        this.skipIOCache = skipIOCache;
+        try
+        {
+            fd = CLibrary.getfd(getFD());
+        }
+        catch (IOException e)
+        {
+            // fd == null, Not Supposed To Happen
+            throw new RuntimeException(e);
+        }
+
         // we can cache file length in read-only mode
         try
         {
@@ -81,22 +99,27 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
         validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations
     }
 
+    public static RandomAccessReader open(File file)
+    {
+        return open(file, false);
+    }
+
     public static RandomAccessReader open(File file, PoolingSegmentedFile owner)
     {
-        return open(file, DEFAULT_BUFFER_SIZE, owner);
+        return open(file, DEFAULT_BUFFER_SIZE, false, owner);
     }
 
-    public static RandomAccessReader open(File file)
+    public static RandomAccessReader open(File file, boolean skipIOCache)
     {
-        return open(file, DEFAULT_BUFFER_SIZE, null);
+        return open(file, DEFAULT_BUFFER_SIZE, skipIOCache, null);
     }
 
     @VisibleForTesting
-    static RandomAccessReader open(File file, int bufferSize, PoolingSegmentedFile owner)
+    static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache, PoolingSegmentedFile
owner)
     {
         try
         {
-            return new RandomAccessReader(file, bufferSize, owner);
+            return new RandomAccessReader(file, bufferSize, skipIOCache, owner);
         }
         catch (FileNotFoundException e)
         {
@@ -107,7 +130,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     @VisibleForTesting
     static RandomAccessReader open(SequentialWriter writer)
     {
-        return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, null);
+        return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, false, null);
     }
 
     /**
@@ -135,11 +158,21 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
             }
 
             validBufferBytes = read;
+            bytesSinceCacheFlush += read;
         }
         catch (IOException e)
         {
             throw new FSReadError(e, filePath);
         }
+
+        if (skipIOCache && bytesSinceCacheFlush >= CACHE_FLUSH_INTERVAL_IN_BYTES)
+        {
+            // with random I/O we can't control what we are skipping so
+            // it will be more appropriate to just skip a whole file after
+            // we reach threshold
+            CLibrary.trySkipCache(this.fd, 0, 0);
+            bytesSinceCacheFlush = 0;
+        }
     }
 
     @Override
@@ -231,6 +264,9 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     {
         buffer = null; // makes sure we don't use this after it's ostensibly closed
 
+        if (skipIOCache && bytesSinceCacheFlush > 0)
+            CLibrary.trySkipCache(fd, 0, 0);
+
         try
         {
             super.close();
@@ -244,7 +280,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu
     @Override
     public String toString()
     {
-        return getClass().getSimpleName() + "(" + "filePath='" + filePath + "')";
+        return getClass().getSimpleName() + "(" + "filePath='" + filePath + "'" + ", skipIOCache="
+ skipIOCache + ")";
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
index 979b2e1..67d5c35 100644
--- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java
@@ -139,7 +139,7 @@ public class FileStreamTask extends WrappedRunnable
             return;
 
         // try to skip kernel page cache if possible
-        RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()));
+        RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()),
true);
 
         // setting up data compression stream
         compressedoutput = new LZFOutputStream(output);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
index c1818ed..dda9d7d 100644
--- a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
+++ b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
@@ -65,7 +65,7 @@ public class CompressedFileStreamTask extends FileStreamTask
         ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(header,
false, MessagingService.instance().getVersion(to));
         socket.getOutputStream().write(ByteBufferUtil.getArray(headerBuffer));
 
-        RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()));
+        RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()),
true);
         FileChannel fc = file.getChannel();
 
         StreamingMetrics.activeStreamsOutbound.inc();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 2e117ef..51cdc72 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -349,7 +349,7 @@ public class SSTableExport
     public static void export(Descriptor desc, PrintStream outs, Collection<String>
toExport, String[] excludes) throws IOException
     {
         SSTableReader reader = SSTableReader.open(desc);
-        SSTableScanner scanner = reader.getScanner();
+        SSTableScanner scanner = reader.getDirectScanner();
 
         IPartitioner<?> partitioner = reader.partitioner;
 
@@ -406,7 +406,7 @@ public class SSTableExport
 
 
         SSTableIdentityIterator row;
-        SSTableScanner scanner = reader.getScanner();
+        SSTableScanner scanner = reader.getDirectScanner();
 
         outs.println("[");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/utils/CLibrary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java
index 2f6e088..f8ad9d1 100644
--- a/src/java/org/apache/cassandra/utils/CLibrary.java
+++ b/src/java/org/apache/cassandra/utils/CLibrary.java
@@ -326,25 +326,4 @@ public final class CLibrary
 
         return -1;
     }
-
-    /**
-     * Suggest kernel to preheat one page for the given file.
-     *
-     * @param fd The file descriptor of file to preheat.
-     * @param position The offset of the block.
-     *
-     * @return On success, zero is returned. On error, an error number is returned.
-     */
-    public static int preheatPage(int fd, long position)
-    {
-        try
-        {
-            // 4096 is good for SSD because they operate on "Pages" 4KB in size
-            return posix_fadvise(fd, position, 4096, POSIX_FADV_WILLNEED);
-        }
-        catch (UnsatisfiedLinkError e)
-        {
-            return -1;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index b05a607..93f1fea 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -130,13 +130,13 @@ public class KeyCacheTest extends SchemaLoader
                                                        false,
                                                        10));
 
-        assertEquals(2, CacheService.instance.keyCache.size());
+        assert CacheService.instance.keyCache.size() == 2;
 
         Util.compactAll(cfs).get();
         keyCacheSize = CacheService.instance.keyCache.size();
         // after compaction cache should have entries for
         // new SSTables, if we had 2 keys in cache previously it should become 4
-        assertEquals(4, keyCacheSize);
+        assert keyCacheSize == 4 : keyCacheSize;
 
         // re-read same keys to verify that key cache didn't grow further
         cfs.getColumnFamily(QueryFilter.getSliceFilter(key1,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index 437b778..830c3e1 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -75,7 +75,7 @@ public class CompressedRandomAccessReaderTest
 
             assert f.exists();
             RandomAccessReader reader = compressed
-                                      ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename
+ ".metadata", f.length()))
+                                      ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename
+ ".metadata", f.length()), false)
                                       : RandomAccessReader.open(f);
             String expected = "The quick brown fox jumps over the lazy dog";
             assertEquals(expected.length(), reader.length());
@@ -115,7 +115,7 @@ public class CompressedRandomAccessReaderTest
         CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length());
         CompressionMetadata.Chunk chunk = meta.chunkFor(0);
 
-        RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta);
+        RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta,
false);
         // read and verify compressed data
         assertEquals(CONTENT, reader.readLine());
         // close reader
@@ -142,7 +142,7 @@ public class CompressedRandomAccessReaderTest
                 checksumModifier.write(random.nextInt());
                 checksumModifier.getFD().sync(); // making sure that change was synced with
disk
 
-                final RandomAccessReader r = CompressedRandomAccessReader.open(file.getPath(),
meta);
+                final RandomAccessReader r = CompressedRandomAccessReader.open(file.getPath(),
meta, false);
 
                 Throwable exception = null;
                 try
@@ -163,7 +163,7 @@ public class CompressedRandomAccessReaderTest
             // lets write original checksum and check if we can read data
             updateChecksum(checksumModifier, chunk.length, checksum);
 
-            reader = CompressedRandomAccessReader.open(file.getPath(), meta);
+            reader = CompressedRandomAccessReader.open(file.getPath(), meta, false);
             // read and verify compressed data
             assertEquals(CONTENT, reader.readLine());
             // close reader

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
index e944ed2..12f2747 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
@@ -56,7 +56,7 @@ public class SSTableTest extends SchemaLoader
 
     private void verifySingle(SSTableReader sstable, ByteBuffer bytes, ByteBuffer key) throws
IOException
     {
-        RandomAccessReader file = sstable.openDataReader();
+        RandomAccessReader file = sstable.openDataReader(false);
         file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position);
         assert key.equals(ByteBufferUtil.readWithShortLength(file));
         int size = (int)SSTableReader.readRowSize(file, sstable.descriptor);
@@ -98,7 +98,7 @@ public class SSTableTest extends SchemaLoader
     {
         List<ByteBuffer> keys = new ArrayList<ByteBuffer>(map.keySet());
         //Collections.shuffle(keys);
-        RandomAccessReader file = sstable.openDataReader();
+        RandomAccessReader file = sstable.openDataReader(false);
         for (ByteBuffer key : keys)
         {
             file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
index fcfaeec..2b0a13a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
@@ -72,8 +72,8 @@ public class SSTableUtils
 
     public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs) throws IOException
     {
-        SSTableScanner slhs = lhs.getScanner();
-        SSTableScanner srhs = rhs.getScanner();
+        SSTableScanner slhs = lhs.getDirectScanner();
+        SSTableScanner srhs = rhs.getDirectScanner();
         while (slhs.hasNext())
         {
             OnDiskAtomIterator ilhs = slhs.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
index 90c27e3..8059bbd 100644
--- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
+++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
@@ -338,7 +338,7 @@ public class BufferedRandomAccessFileTest
             for (final int offset : Arrays.asList(0, 8))
             {
                 File file1 = writeTemporaryFile(new byte[16]);
-                final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize,
null);
+                final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize,
false, null);
                 expectEOF(new Callable<Object>()
                 {
                     public Object call() throws IOException
@@ -353,7 +353,7 @@ public class BufferedRandomAccessFileTest
             for (final int n : Arrays.asList(1, 2, 4, 8))
             {
                 File file1 = writeTemporaryFile(new byte[16]);
-                final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize,
null);
+                final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize,
false, null);
                 expectEOF(new Callable<Object>()
                 {
                     public Object call() throws IOException


Mime
View raw message