Return-Path: Delivered-To: apmail-cassandra-commits-archive@www.apache.org Received: (qmail 8210 invoked from network); 13 Apr 2010 02:26:18 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 13 Apr 2010 02:26:18 -0000 Received: (qmail 19121 invoked by uid 500); 13 Apr 2010 02:26:18 -0000 Delivered-To: apmail-cassandra-commits-archive@cassandra.apache.org Received: (qmail 19099 invoked by uid 500); 13 Apr 2010 02:26:18 -0000 Mailing-List: contact commits-help@cassandra.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cassandra.apache.org Delivered-To: mailing list commits@cassandra.apache.org Received: (qmail 19091 invoked by uid 99); 13 Apr 2010 02:26:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Apr 2010 02:26:18 +0000 X-ASF-Spam-Status: No, hits=-1214.5 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Apr 2010 02:26:16 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B8B38238899B; Tue, 13 Apr 2010 02:25:56 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r933465 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/io/ Date: Tue, 13 Apr 2010 02:25:56 -0000 To: commits@cassandra.apache.org From: jbellis@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100413022556.B8B38238899B@eris.apache.org> Author: jbellis Date: Tue Apr 13 02:25:56 2010 New Revision: 933465 URL: http://svn.apache.org/viewvc?rev=933465&view=rev Log: fix index scans that cross the 2GB mmap boundaries for both mmap and standard i/o modes. patch by jbellis; tested by Todd Burruss and reviewed by gdusbabek for CASSANDRA-866 Added: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java Modified: cassandra/branches/cassandra-0.6/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=933465&r1=933464&r2=933465&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/CHANGES.txt (original) +++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Apr 13 02:25:56 2010 @@ -16,6 +16,8 @@ (CASSANDRA-969) * Retrieve the correct number of undeleted columns, if any, from a supercolumn in a row that had been deleted previously (CASSANDRA-920) + * fix index scans that cross the 2GB mmap boundaries for both mmap + and standard i/o modes (CASSANDRA-866) 0.6.0-RC1 Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java?rev=933465&r1=933464&r2=933465&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java Tue Apr 13 02:25:56 2010 @@ -26,6 +26,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.DecoratedKey; public class IndexSummary @@ -36,11 +37,13 @@ public class IndexSummary private ArrayList indexPositions; private Map spannedIndexDataPositions; private Map spannedIndexPositions; - int keysWritten = 0; + private int keysWritten = 0; + private long lastIndexPosition; public void maybeAddEntry(DecoratedKey decoratedKey, long dataPosition, long dataSize, long indexPosition, long nextIndexPosition) { - boolean spannedIndexEntry = SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(nextIndexPosition); + boolean spannedIndexEntry = DatabaseDescriptor.getIndexAccessMode() == DatabaseDescriptor.DiskAccessMode.mmap + && SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(nextIndexPosition); if (keysWritten++ % INDEX_INTERVAL == 0 || spannedIndexEntry) { if (indexPositions == null) @@ -61,6 +64,7 @@ public class IndexSummary spannedIndexPositions.put(info.indexPosition, info); } } + lastIndexPosition = indexPosition; } public List getIndexPositions() @@ -73,14 +77,19 @@ public class IndexSummary indexPositions.trimToSize(); } - public SSTable.PositionSize getSpannedPosition(KeyPosition sampledPosition) + public SSTable.PositionSize getSpannedDataPosition(KeyPosition sampledPosition) { if (spannedIndexDataPositions == null) return null; return spannedIndexDataPositions.get(sampledPosition); } - public SSTable.PositionSize getSpannedPosition(long nextIndexPosition) + public KeyPosition getSpannedIndexPosition(long nextIndexPosition) + { + return spannedIndexPositions == null ? null : spannedIndexPositions.get(nextIndexPosition); + } + + public SSTable.PositionSize getSpannedDataPosition(long nextIndexPosition) { if (spannedIndexDataPositions == null) return null; @@ -92,6 +101,12 @@ public class IndexSummary return spannedIndexDataPositions.get(info); } + public long getLastIndexPosition() + { + return lastIndexPosition; + } + + /** * This is a simple container for the index Key and its corresponding position * in the index file. Binary search is performed on a list of these objects Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=933465&r1=933464&r2=933465&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java Tue Apr 13 02:25:56 2010 @@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUti import org.apache.cassandra.cache.InstrumentedCache; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.utils.BloomFilter; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.config.DatabaseDescriptor; @@ -85,7 +86,8 @@ public class SSTableReader extends SSTab }; new Thread(runnable, "SSTABLE-DELETER").start(); }}; - private static final long BUFFER_SIZE = Integer.MAX_VALUE; + // in a perfect world, BUFFER_SIZE would be final, but we need to test with a smaller size to stay sane. + static long BUFFER_SIZE = Integer.MAX_VALUE; public static int indexInterval() { @@ -326,15 +328,9 @@ public class SSTableReader extends SSTab if (sampledPosition == null) return null; - // handle exact sampled index hit - PositionSize info = indexSummary.getSpannedPosition(sampledPosition); - if (info != null) - return info; - // get either a buffered or a mmap'd input for the on-disk index long p = sampledPosition.indexPosition; FileDataInput input; - int bufferIndex = bufferIndex(p); if (indexBuffers == null) { input = new BufferedRandomAccessFile(indexFilename(), "r"); @@ -342,7 +338,7 @@ public class SSTableReader extends SSTab } else { - input = new MappedFileDataInput(indexBuffers[bufferIndex], indexFilename(), BUFFER_SIZE * bufferIndex, (int)(p % BUFFER_SIZE)); + input = indexInputAt(p); } // scan the on-disk index, starting at the nearest sampled position @@ -351,12 +347,33 @@ public class SSTableReader extends SSTab int i = 0; do { + // handle exact sampled index hit + IndexSummary.KeyPosition kp = indexSummary.getSpannedIndexPosition(input.getAbsolutePosition()); + if (kp != null && kp.key.equals(decoratedKey)) + return indexSummary.getSpannedDataPosition(kp); + // if using mmapped i/o, skip to the next mmap buffer if necessary - if (input.isEOF() || indexSummary.getSpannedPosition(input.getAbsolutePosition()) != null) + if (input.isEOF() || kp != null) { - if (indexBuffers == null || ++bufferIndex == indexBuffers.length) + if (indexBuffers == null) // not mmap-ing, just one index input + break; + + FileDataInput oldInput = input; + if (kp == null) + { + input = indexInputAt(input.getAbsolutePosition()); + } + else + { + long nextUnspannedPostion = input.getAbsolutePosition() + + 2 + FBUtilities.encodedUTF8Length(StorageService.getPartitioner().convertToDiskFormat(kp.key)) + + 8; + input = indexInputAt(nextUnspannedPostion); + } + oldInput.close(); + if (input == null) break; - input = new MappedFileDataInput(indexBuffers[bufferIndex], indexFilename(), BUFFER_SIZE * bufferIndex, 0); + continue; } @@ -367,7 +384,7 @@ public class SSTableReader extends SSTab int v = indexDecoratedKey.compareTo(decoratedKey); if (v == 0) { - info = getDataPositionSize(input, dataPosition); + PositionSize info = getDataPositionSize(input, dataPosition); if (keyCache != null && keyCache.getCapacity() > 0) keyCache.put(unifiedKey, info); return info; @@ -378,11 +395,20 @@ public class SSTableReader extends SSTab } finally { - input.close(); + if (input != null) + input.close(); } return null; } + private FileDataInput indexInputAt(long indexPosition) + { + if (indexPosition > indexSummary.getLastIndexPosition()) + return null; + int bufferIndex = bufferIndex(indexPosition); + return new MappedFileDataInput(indexBuffers[bufferIndex], indexFilename(), BUFFER_SIZE * bufferIndex, (int)(indexPosition % BUFFER_SIZE)); + } + private PositionSize getDataPositionSize(FileDataInput input, long dataPosition) throws IOException { // if we've reached the end of the index, then the row size is "the rest of the data file" @@ -392,7 +418,7 @@ public class SSTableReader extends SSTab // otherwise, row size is the start of the next row (in next index entry), minus the start of this one. long nextIndexPosition = input.getAbsolutePosition(); // if next index entry would span mmap boundary, get the next row position from the summary instead - PositionSize nextPositionSize = indexSummary.getSpannedPosition(nextIndexPosition); + PositionSize nextPositionSize = indexSummary.getSpannedDataPosition(nextIndexPosition); if (nextPositionSize != null) return new PositionSize(dataPosition, nextPositionSize.position - dataPosition); Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=933465&r1=933464&r2=933465&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java Tue Apr 13 02:25:56 2010 @@ -36,6 +36,7 @@ import org.apache.cassandra.dht.IPartiti import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; public class ThriftValidation { @@ -46,18 +47,7 @@ public class ThriftValidation throw new InvalidRequestException("Key may not be empty"); } // check that writeUTF will be able to handle it -- encoded length must fit in 2 bytes - int strlen = key.length(); - int utflen = 0; - for (int i = 0; i < strlen; i++) - { - int c = key.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) - utflen++; - else if (c > 0x07FF) - utflen += 3; - else - utflen += 2; - } + int utflen = FBUtilities.encodedUTF8Length(key); if (utflen > 65535) throw new InvalidRequestException("Encoded key length of " + utflen + " is longer than maximum of 65535"); } Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=933465&r1=933464&r2=933465&view=diff ============================================================================== --- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java (original) +++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java Tue Apr 13 02:25:56 2010 @@ -463,4 +463,21 @@ public class FBUtilities else return a.equals(b); } + + public static int encodedUTF8Length(String st) + { + int strlen = st.length(); + int utflen = 0; + for (int i = 0; i < strlen; i++) + { + int c = st.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) + utflen++; + else if (c > 0x07FF) + utflen += 3; + else + utflen += 2; + } + return utflen; + } } Added: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java?rev=933465&view=auto ============================================================================== --- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java (added) +++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java Tue Apr 13 02:25:56 2010 @@ -0,0 +1,58 @@ +package org.apache.cassandra.io; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.QueryPath; +import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.service.StorageService; + + +public class SSTableReaderTest +{ + @Test + public void testSpannedIndexPositions() throws IOException, ExecutionException, InterruptedException + { + SSTableReader.BUFFER_SIZE = 40; + + Table table = Table.open("Keyspace1"); + ColumnFamilyStore store = table.getColumnFamilyStore("Standard1"); + + // insert a bunch of data + CompactionManager.instance.disableAutoCompaction(); + for (int j = 0; j < 100; j += 2) + { + String key = String.valueOf(j); + RowMutation rm = new RowMutation("Keyspace1", key); + rm.add(new QueryPath("Standard1", null, "0".getBytes()), new byte[0], j); + rm.apply(); + } + store.forceBlockingFlush(); + CompactionManager.instance.submitMajor(store).get(); + + // check that all our keys are found correctly + SSTableReader sstable = store.getSSTables().iterator().next(); + for (int j = 0; j < 100; j += 2) + { + String key = String.valueOf(j); + DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + FileDataInput file = sstable.getFileDataInput(dk, DatabaseDescriptor.getIndexedReadBufferSizeInKB() * 1024); + DecoratedKey keyInDisk = sstable.getPartitioner().convertFromDiskFormat(file.readUTF()); + assert keyInDisk.equals(dk) : String.format("%s != %s in %s", keyInDisk, dk, file.getPath()); + } + + // check no false positives + for (int j = 1; j < 110; j += 2) + { + String key = String.valueOf(j); + DecoratedKey dk = StorageService.getPartitioner().decorateKey(key); + assert sstable.getPosition(dk) == null; + } + } +}