Return-Path: X-Original-To: apmail-cassandra-commits-archive@www.apache.org Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 49427F352 for ; Tue, 26 Mar 2013 18:12:26 +0000 (UTC) Received: (qmail 98857 invoked by uid 500); 26 Mar 2013 18:12:26 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 98828 invoked by uid 500); 26 Mar 2013 18:12:26 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 98803 invoked by uid 99); 26 Mar 2013 18:12:25 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Mar 2013 18:12:25 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A95B0821014; Tue, 26 Mar 2013 18:12:25 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jbellis@apache.org To: commits@cassandra.apache.org Date: Tue, 26 Mar 2013 18:12:25 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] git commit: revert 27ed655fd0552055bd1c26c62c098c16501bc32b Updated Branches: refs/heads/cassandra-1.2 e580ef104 -> 01bc564bb refs/heads/trunk d394ab14a -> b114ce74c revert 27ed655fd0552055bd1c26c62c098c16501bc32b Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/01bc564b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/01bc564b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/01bc564b Branch: refs/heads/cassandra-1.2 Commit: 01bc564bb2e89684b445f217a83c39f32113560a Parents: e580ef1 Author: Jonathan Ellis Authored: Tue Mar 26 13:11:19 2013 -0500 Committer: Jonathan Ellis Committed: Tue Mar 26 13:11:19 2013 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 - conf/cassandra.yaml | 6 - src/java/org/apache/cassandra/config/Config.java | 2 - .../cassandra/config/DatabaseDescriptor.java | 5 - .../cassandra/db/commitlog/CommitLogReplayer.java | 2 +- .../db/compaction/AbstractCompactionStrategy.java | 2 +- .../cassandra/db/compaction/CompactionManager.java | 2 +- .../cassandra/db/compaction/CompactionTask.java | 5 +- .../db/compaction/LeveledCompactionStrategy.java | 6 +- .../apache/cassandra/db/compaction/Scrubber.java | 4 +- .../io/compress/CompressedRandomAccessReader.java | 10 +- .../apache/cassandra/io/sstable/KeyIterator.java | 2 +- .../io/sstable/SSTableBoundedScanner.java | 4 +- .../apache/cassandra/io/sstable/SSTableReader.java | 90 ++------------- .../cassandra/io/sstable/SSTableScanner.java | 10 +- .../cassandra/io/util/RandomAccessReader.java | 54 +++++++-- .../apache/cassandra/streaming/FileStreamTask.java | 2 +- .../compress/CompressedFileStreamTask.java | 2 +- .../org/apache/cassandra/tools/SSTableExport.java | 4 +- src/java/org/apache/cassandra/utils/CLibrary.java | 21 ---- .../unit/org/apache/cassandra/db/KeyCacheTest.java | 4 +- .../compress/CompressedRandomAccessReaderTest.java | 8 +- .../apache/cassandra/io/sstable/SSTableTest.java | 4 +- .../apache/cassandra/io/sstable/SSTableUtils.java | 4 +- .../io/util/BufferedRandomAccessFileTest.java | 4 +- 25 files changed, 98 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e0d3869..184e70d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,8 +7,6 @@ * Improve asynchronous hint delivery (CASSANDRA-5179) * Fix Guava dependency version (12.0 -> 13.0.1) for Maven (CASSANDRA-5364) * Validate that provided CQL3 collection value are < 64K (CASSANDRA-5355) - * Change Kernel Page Cache skipping into row preheating (disabled by default) - (CASSANDRA-4937) * Make upgradeSSTable skip current version sstables by default (CASSANDRA-5366) * Optimize min/max timestamp collection (CASSANDRA-5373) Merged from 1.1: http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 76e5901..2bf23a6 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -684,9 +684,3 @@ internode_compression: all # reducing overhead from the TCP protocol itself, at the cost of increasing # latency if you block for cross-datacenter responses. inter_dc_tcp_nodelay: true - -# Enable or disable kernel page cache preheating from contents of the key cache after compaction. -# When enabled it would preheat only first "page" (4KB) of each row to optimize -# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937 -# for further details on that topic. -preheat_kernel_page_cache: false http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 5061042..34b26f5 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -167,8 +167,6 @@ public class Config public boolean inter_dc_tcp_nodelay = true; - public boolean preheat_kernel_page_cache = false; - private static boolean loadYaml = true; private static boolean outboundBindAny = false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 31e0b5a..ead60a0 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1263,9 +1263,4 @@ public class DatabaseDescriptor { return conf.inter_dc_tcp_nodelay; } - - public static boolean shouldPreheatPageCache() - { - return conf.preheat_kernel_page_cache; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 8dcdaad..2728970 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -121,7 +121,7 @@ public class CommitLogReplayer CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); final long segment = desc.id; int version = desc.getMessagingVersion(); - RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath())); + RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true); try { assert reader.length() <= Integer.MAX_VALUE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java index 85b09c1..cb15109 100644 --- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java @@ -152,7 +152,7 @@ public abstract class AbstractCompactionStrategy { ArrayList scanners = new ArrayList(); for (SSTableReader sstable : sstables) - scanners.add(sstable.getScanner(range)); + scanners.add(sstable.getDirectScanner(range)); return scanners; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index db75bc1..f94c260 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -571,7 +571,7 @@ public class CompactionManager implements CompactionManagerMBean if (compactionFileLocation == null) throw new IOException("disk full"); - SSTableScanner scanner = sstable.getScanner(); + SSTableScanner scanner = sstable.getDirectScanner(); long rowsRead = 0; List indexedColumnsInRow = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 8c2af4d..75ea1cb 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -231,7 +231,10 @@ public class CompactionTask extends AbstractCompactionTask cfs.replaceCompactedSSTables(toCompact, sstables, compactionType); // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up for (SSTableReader sstable : sstables) - sstable.preheat(cachedKeyMap.get(sstable.descriptor)); + { + for (Map.Entry entry : cachedKeyMap.get(sstable.descriptor).entrySet()) + sstable.cacheKey(entry.getKey(), entry.getValue()); + } if (logger.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java index d916c48..6a1bf4b 100644 --- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java @@ -179,7 +179,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem { // L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each for (SSTableReader sstable : byLevel.get(level)) - scanners.add(sstable.getScanner(range)); + scanners.add(sstable.getDirectScanner(range)); } else { @@ -209,7 +209,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem this.sstables = new ArrayList(sstables); Collections.sort(this.sstables, SSTable.sstableComparator); sstableIterator = this.sstables.iterator(); - currentScanner = sstableIterator.next().getScanner(range); + currentScanner = sstableIterator.next().getDirectScanner(range); long length = 0; for (SSTableReader sstable : sstables) @@ -234,7 +234,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem currentScanner = null; return endOfData(); } - currentScanner = sstableIterator.next().getScanner(range); + currentScanner = sstableIterator.next().getDirectScanner(range); } } catch (IOException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 30929cc..0601857 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -94,8 +94,8 @@ public class Scrubber implements Closeable // we'll also loop through the index at the same time, using the position from the index to recover if the // row header (key or data size) is corrupt. (This means our position in the index file will be one row // "ahead" of the data file.) - this.dataFile = sstable.openDataReader(); - this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX))); + this.dataFile = sstable.openDataReader(true); + this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true); this.scrubInfo = new ScrubInfo(dataFile, sstable); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index aa686c2..bbd2466 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -42,7 +42,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader { try { - return new CompressedRandomAccessReader(path, metadata, owner); + return new CompressedRandomAccessReader(path, metadata, false, owner); } catch (FileNotFoundException e) { @@ -50,11 +50,11 @@ public class CompressedRandomAccessReader extends RandomAccessReader } } - public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata) + public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache) { try { - return new CompressedRandomAccessReader(dataFilePath, metadata, null); + return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache, null); } catch (FileNotFoundException e) { @@ -73,9 +73,9 @@ public class CompressedRandomAccessReader extends RandomAccessReader // raw checksum bytes private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]); - private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException + private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException { - super(new File(dataFilePath), metadata.chunkLength(), owner); + super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, owner); this.metadata = metadata; compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java index 9fe1309..e581b22 100644 --- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java @@ -38,7 +38,7 @@ public class KeyIterator extends AbstractIterator implements Close { this.desc = desc; File path = new File(desc.filenameFor(SSTable.COMPONENT_INDEX)); - in = RandomAccessReader.open(path); + in = RandomAccessReader.open(path, true); } protected DecoratedKey computeNext() http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java index d5bec82..a571901 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java @@ -35,9 +35,9 @@ public class SSTableBoundedScanner extends SSTableScanner private final Iterator> rangeIterator; private Pair currentRange; - SSTableBoundedScanner(SSTableReader sstable, Iterator> rangeIterator) + SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Iterator> rangeIterator) { - super(sstable); + super(sstable, skipCache); this.rangeIterator = rangeIterator; assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise currentRange = rangeIterator.next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index cec2e42..21a8673 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -350,7 +350,7 @@ public class SSTableReader extends SSTable : SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode()); // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary. - RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); + RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true); // try to load summaries from the disk and check if we need // to read primary index because we should re-create a BloomFilter or pre-load KeyCache @@ -696,29 +696,6 @@ public class SSTableReader extends SSTable keyCache.put(cacheKey, info); } - public void preheat(Map cachedKeys) throws IOException - { - RandomAccessFile f = new RandomAccessFile(getFilename(), "r"); - - try - { - int fd = CLibrary.getfd(f.getFD()); - - for (Map.Entry entry : cachedKeys.entrySet()) - { - cacheKey(entry.getKey(), entry.getValue()); - - // add to the cache but don't do actual preheating if we have it disabled in the config - if (DatabaseDescriptor.shouldPreheatPageCache() && fd > 0) - CLibrary.preheatPage(fd, entry.getValue().position); - } - } - finally - { - FileUtils.closeQuietly(f); - } - } - public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats) { return getCachedPosition(new KeyCacheKey(descriptor, key.key), updateStats); @@ -918,15 +895,6 @@ public class SSTableReader extends SSTable { if (references.decrementAndGet() == 0 && isCompacted.get()) { - /** - * Make OS a favour and suggest (using fadvice call) that we - * don't want to see pages of this SSTable in memory anymore. - * - * NOTE: We can't use madvice in java because it requires address of - * the mapping, so instead we always open a file and run fadvice(fd, 0, 0) on it - */ - dropPageCache(); - // Force finalizing mmapping if necessary ifile.cleanup(); dfile.cleanup(); @@ -978,12 +946,12 @@ public class SSTableReader extends SSTable } /** - * I/O SSTableScanner + * Direct I/O SSTableScanner * @return A Scanner for seeking over the rows of the SSTable. */ - public SSTableScanner getScanner() + public SSTableScanner getDirectScanner() { - return new SSTableScanner(this); + return new SSTableScanner(this, true); } /** @@ -992,14 +960,14 @@ public class SSTableReader extends SSTable * @param range the range of keys to cover * @return A Scanner for seeking over the rows of the SSTable. */ - public ICompactionScanner getScanner(Range range) + public ICompactionScanner getDirectScanner(Range range) { if (range == null) - return getScanner(); + return getDirectScanner(); Iterator> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator(); return rangeIterator.hasNext() - ? new SSTableBoundedScanner(this, rangeIterator) + ? new SSTableBoundedScanner(this, true, rangeIterator) : new EmptyCompactionScanner(getFilename()); } @@ -1152,16 +1120,16 @@ public class SSTableReader extends SSTable return sstableMetadata.ancestors; } - public RandomAccessReader openDataReader() + public RandomAccessReader openDataReader(boolean skipIOCache) { return compression - ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata()) - : RandomAccessReader.open(new File(getFilename())); + ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata(), skipIOCache) + : RandomAccessReader.open(new File(getFilename()), skipIOCache); } - public RandomAccessReader openIndexReader() + public RandomAccessReader openIndexReader(boolean skipIOCache) { - return RandomAccessReader.open(new File(getIndexFilename())); + return RandomAccessReader.open(new File(getIndexFilename()), skipIOCache); } /** @@ -1252,38 +1220,4 @@ public class SSTableReader extends SSTable throw new UnsupportedOperationException(); } } - - private void dropPageCache() - { - dropPageCache(dfile.path); - dropPageCache(ifile.path); - } - - private void dropPageCache(String filePath) - { - RandomAccessFile file = null; - - try - { - file = new RandomAccessFile(filePath, "r"); - - int fd = CLibrary.getfd(file.getFD()); - - if (fd > 0) - { - if (logger.isDebugEnabled()) - logger.debug(String.format("Dropping page cache of file %s.", filePath)); - - CLibrary.trySkipCache(fd, 0, 0); - } - } - catch (IOException e) - { - // we don't care if cache cleanup fails - } - finally - { - FileUtils.closeQuietly(file); - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java index 949acda..22ac485 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java @@ -46,10 +46,10 @@ public class SSTableScanner implements ICompactionScanner /** * @param sstable SSTable to scan. */ - SSTableScanner(SSTableReader sstable) + SSTableScanner(SSTableReader sstable, boolean skipCache) { - this.dfile = sstable.openDataReader(); - this.ifile = sstable.openIndexReader(); + this.dfile = sstable.openDataReader(skipCache); + this.ifile = sstable.openIndexReader(skipCache); this.sstable = sstable; this.filter = null; } @@ -60,8 +60,8 @@ public class SSTableScanner implements ICompactionScanner */ SSTableScanner(SSTableReader sstable, QueryFilter filter) { - this.dfile = sstable.openDataReader(); - this.ifile = sstable.openIndexReader(); + this.dfile = sstable.openDataReader(false); + this.ifile = sstable.openIndexReader(false); this.sstable = sstable; this.filter = filter; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java index 4d7bfbb..3210372 100644 --- a/src/java/org/apache/cassandra/io/util/RandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/util/RandomAccessReader.java @@ -50,11 +50,19 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu // channel liked with the file, used to retrieve data and force updates. protected final FileChannel channel; + private final boolean skipIOCache; + + // file descriptor + private final int fd; + + // used if skip I/O cache was enabled + private long bytesSinceCacheFlush = 0; + private final long fileLength; protected final PoolingSegmentedFile owner; - protected RandomAccessReader(File file, int bufferSize, PoolingSegmentedFile owner) throws FileNotFoundException + protected RandomAccessReader(File file, int bufferSize, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException { super(file, "r"); @@ -66,9 +74,19 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu // allocating required size of the buffer if (bufferSize <= 0) throw new IllegalArgumentException("bufferSize must be positive"); - buffer = new byte[bufferSize]; + this.skipIOCache = skipIOCache; + try + { + fd = CLibrary.getfd(getFD()); + } + catch (IOException e) + { + // fd == null, Not Supposed To Happen + throw new RuntimeException(e); + } + // we can cache file length in read-only mode try { @@ -81,22 +99,27 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu validBufferBytes = -1; // that will trigger reBuffer() on demand by read/seek operations } + public static RandomAccessReader open(File file) + { + return open(file, false); + } + public static RandomAccessReader open(File file, PoolingSegmentedFile owner) { - return open(file, DEFAULT_BUFFER_SIZE, owner); + return open(file, DEFAULT_BUFFER_SIZE, false, owner); } - public static RandomAccessReader open(File file) + public static RandomAccessReader open(File file, boolean skipIOCache) { - return open(file, DEFAULT_BUFFER_SIZE, null); + return open(file, DEFAULT_BUFFER_SIZE, skipIOCache, null); } @VisibleForTesting - static RandomAccessReader open(File file, int bufferSize, PoolingSegmentedFile owner) + static RandomAccessReader open(File file, int bufferSize, boolean skipIOCache, PoolingSegmentedFile owner) { try { - return new RandomAccessReader(file, bufferSize, owner); + return new RandomAccessReader(file, bufferSize, skipIOCache, owner); } catch (FileNotFoundException e) { @@ -107,7 +130,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu @VisibleForTesting static RandomAccessReader open(SequentialWriter writer) { - return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, null); + return open(new File(writer.getPath()), DEFAULT_BUFFER_SIZE, false, null); } /** @@ -135,11 +158,21 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu } validBufferBytes = read; + bytesSinceCacheFlush += read; } catch (IOException e) { throw new FSReadError(e, filePath); } + + if (skipIOCache && bytesSinceCacheFlush >= CACHE_FLUSH_INTERVAL_IN_BYTES) + { + // with random I/O we can't control what we are skipping so + // it will be more appropriate to just skip a whole file after + // we reach threshold + CLibrary.trySkipCache(this.fd, 0, 0); + bytesSinceCacheFlush = 0; + } } @Override @@ -231,6 +264,9 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu { buffer = null; // makes sure we don't use this after it's ostensibly closed + if (skipIOCache && bytesSinceCacheFlush > 0) + CLibrary.trySkipCache(fd, 0, 0); + try { super.close(); @@ -244,7 +280,7 @@ public class RandomAccessReader extends RandomAccessFile implements FileDataInpu @Override public String toString() { - return getClass().getSimpleName() + "(" + "filePath='" + filePath + "')"; + return getClass().getSimpleName() + "(" + "filePath='" + filePath + "'" + ", skipIOCache=" + skipIOCache + ")"; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/streaming/FileStreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java index 979b2e1..67d5c35 100644 --- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java @@ -139,7 +139,7 @@ public class FileStreamTask extends WrappedRunnable return; // try to skip kernel page cache if possible - RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename())); + RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()), true); // setting up data compression stream compressedoutput = new LZFOutputStream(output); http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java index c1818ed..dda9d7d 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java @@ -65,7 +65,7 @@ public class CompressedFileStreamTask extends FileStreamTask ByteBuffer headerBuffer = MessagingService.instance().constructStreamHeader(header, false, MessagingService.instance().getVersion(to)); socket.getOutputStream().write(ByteBufferUtil.getArray(headerBuffer)); - RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename())); + RandomAccessReader file = RandomAccessReader.open(new File(header.file.getFilename()), true); FileChannel fc = file.getChannel(); StreamingMetrics.activeStreamsOutbound.inc(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/tools/SSTableExport.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java index 2e117ef..51cdc72 100644 --- a/src/java/org/apache/cassandra/tools/SSTableExport.java +++ b/src/java/org/apache/cassandra/tools/SSTableExport.java @@ -349,7 +349,7 @@ public class SSTableExport public static void export(Descriptor desc, PrintStream outs, Collection toExport, String[] excludes) throws IOException { SSTableReader reader = SSTableReader.open(desc); - SSTableScanner scanner = reader.getScanner(); + SSTableScanner scanner = reader.getDirectScanner(); IPartitioner partitioner = reader.partitioner; @@ -406,7 +406,7 @@ public class SSTableExport SSTableIdentityIterator row; - SSTableScanner scanner = reader.getScanner(); + SSTableScanner scanner = reader.getDirectScanner(); outs.println("["); http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/src/java/org/apache/cassandra/utils/CLibrary.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/CLibrary.java b/src/java/org/apache/cassandra/utils/CLibrary.java index 2f6e088..f8ad9d1 100644 --- a/src/java/org/apache/cassandra/utils/CLibrary.java +++ b/src/java/org/apache/cassandra/utils/CLibrary.java @@ -326,25 +326,4 @@ public final class CLibrary return -1; } - - /** - * Suggest kernel to preheat one page for the given file. - * - * @param fd The file descriptor of file to preheat. - * @param position The offset of the block. - * - * @return On success, zero is returned. On error, an error number is returned. - */ - public static int preheatPage(int fd, long position) - { - try - { - // 4096 is good for SSD because they operate on "Pages" 4KB in size - return posix_fadvise(fd, position, 4096, POSIX_FADV_WILLNEED); - } - catch (UnsatisfiedLinkError e) - { - return -1; - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/db/KeyCacheTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java index b05a607..93f1fea 100644 --- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java +++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java @@ -130,13 +130,13 @@ public class KeyCacheTest extends SchemaLoader false, 10)); - assertEquals(2, CacheService.instance.keyCache.size()); + assert CacheService.instance.keyCache.size() == 2; Util.compactAll(cfs).get(); keyCacheSize = CacheService.instance.keyCache.size(); // after compaction cache should have entries for // new SSTables, if we had 2 keys in cache previously it should become 4 - assertEquals(4, keyCacheSize); + assert keyCacheSize == 4 : keyCacheSize; // re-read same keys to verify that key cache didn't grow further cfs.getColumnFamily(QueryFilter.getSliceFilter(key1, http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java index 437b778..830c3e1 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java @@ -75,7 +75,7 @@ public class CompressedRandomAccessReaderTest assert f.exists(); RandomAccessReader reader = compressed - ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length())) + ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length()), false) : RandomAccessReader.open(f); String expected = "The quick brown fox jumps over the lazy dog"; assertEquals(expected.length(), reader.length()); @@ -115,7 +115,7 @@ public class CompressedRandomAccessReaderTest CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length()); CompressionMetadata.Chunk chunk = meta.chunkFor(0); - RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta); + RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta, false); // read and verify compressed data assertEquals(CONTENT, reader.readLine()); // close reader @@ -142,7 +142,7 @@ public class CompressedRandomAccessReaderTest checksumModifier.write(random.nextInt()); checksumModifier.getFD().sync(); // making sure that change was synced with disk - final RandomAccessReader r = CompressedRandomAccessReader.open(file.getPath(), meta); + final RandomAccessReader r = CompressedRandomAccessReader.open(file.getPath(), meta, false); Throwable exception = null; try @@ -163,7 +163,7 @@ public class CompressedRandomAccessReaderTest // lets write original checksum and check if we can read data updateChecksum(checksumModifier, chunk.length, checksum); - reader = CompressedRandomAccessReader.open(file.getPath(), meta); + reader = CompressedRandomAccessReader.open(file.getPath(), meta, false); // read and verify compressed data assertEquals(CONTENT, reader.readLine()); // close reader http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java index e944ed2..12f2747 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java @@ -56,7 +56,7 @@ public class SSTableTest extends SchemaLoader private void verifySingle(SSTableReader sstable, ByteBuffer bytes, ByteBuffer key) throws IOException { - RandomAccessReader file = sstable.openDataReader(); + RandomAccessReader file = sstable.openDataReader(false); file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position); assert key.equals(ByteBufferUtil.readWithShortLength(file)); int size = (int)SSTableReader.readRowSize(file, sstable.descriptor); @@ -98,7 +98,7 @@ public class SSTableTest extends SchemaLoader { List keys = new ArrayList(map.keySet()); //Collections.shuffle(keys); - RandomAccessReader file = sstable.openDataReader(); + RandomAccessReader file = sstable.openDataReader(false); for (ByteBuffer key : keys) { file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position); http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index fcfaeec..2b0a13a 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -72,8 +72,8 @@ public class SSTableUtils public static void assertContentEquals(SSTableReader lhs, SSTableReader rhs) throws IOException { - SSTableScanner slhs = lhs.getScanner(); - SSTableScanner srhs = rhs.getScanner(); + SSTableScanner slhs = lhs.getDirectScanner(); + SSTableScanner srhs = rhs.getDirectScanner(); while (slhs.hasNext()) { OnDiskAtomIterator ilhs = slhs.next(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/01bc564b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java index 90c27e3..8059bbd 100644 --- a/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java +++ b/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java @@ -338,7 +338,7 @@ public class BufferedRandomAccessFileTest for (final int offset : Arrays.asList(0, 8)) { File file1 = writeTemporaryFile(new byte[16]); - final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, null); + final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, false, null); expectEOF(new Callable() { public Object call() throws IOException @@ -353,7 +353,7 @@ public class BufferedRandomAccessFileTest for (final int n : Arrays.asList(1, 2, 4, 8)) { File file1 = writeTemporaryFile(new byte[16]); - final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, null); + final RandomAccessReader file = RandomAccessReader.open(file1, bufferSize, false, null); expectEOF(new Callable() { public Object call() throws IOException