cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r933465 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/io/
Date Tue, 13 Apr 2010 02:25:56 GMT
Author: jbellis
Date: Tue Apr 13 02:25:56 2010
New Revision: 933465

URL: http://svn.apache.org/viewvc?rev=933465&view=rev
Log:
fix index scans that cross the 2GB mmap boundaries for both mmap and standard i/o modes.
patch by jbellis; tested by Todd Burruss and reviewed by gdusbabek for CASSANDRA-866


Added:
    cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java
Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Tue Apr 13 02:25:56 2010
@@ -16,6 +16,8 @@
    (CASSANDRA-969)
  * Retrieve the correct number of undeleted columns, if any, from
    a supercolumn in a row that had been deleted previously (CASSANDRA-920)
+ * fix index scans that cross the 2GB mmap boundaries for both mmap
+   and standard i/o modes (CASSANDRA-866)
 
 
 0.6.0-RC1

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java Tue Apr 13 02:25:56 2010
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
 
 public class IndexSummary
@@ -36,11 +37,13 @@ public class IndexSummary
     private ArrayList<KeyPosition> indexPositions;
     private Map<KeyPosition, SSTable.PositionSize> spannedIndexDataPositions;
     private Map<Long, KeyPosition> spannedIndexPositions;
-    int keysWritten = 0;
+    private int keysWritten = 0;
+    private long lastIndexPosition;
 
     public void maybeAddEntry(DecoratedKey decoratedKey, long dataPosition, long dataSize, long indexPosition, long nextIndexPosition)
     {
-        boolean spannedIndexEntry = SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(nextIndexPosition);
+        boolean spannedIndexEntry = DatabaseDescriptor.getIndexAccessMode() == DatabaseDescriptor.DiskAccessMode.mmap
+                                    && SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(nextIndexPosition);
         if (keysWritten++ % INDEX_INTERVAL == 0 || spannedIndexEntry)
         {
             if (indexPositions == null)
@@ -61,6 +64,7 @@ public class IndexSummary
                 spannedIndexPositions.put(info.indexPosition, info);
             }
         }
+        lastIndexPosition = indexPosition;
     }
 
     public List<KeyPosition> getIndexPositions()
@@ -73,14 +77,19 @@ public class IndexSummary
         indexPositions.trimToSize();
     }
 
-    public SSTable.PositionSize getSpannedPosition(KeyPosition sampledPosition)
+    public SSTable.PositionSize getSpannedDataPosition(KeyPosition sampledPosition)
     {
         if (spannedIndexDataPositions == null)
             return null;
         return spannedIndexDataPositions.get(sampledPosition);
     }
 
-    public SSTable.PositionSize getSpannedPosition(long nextIndexPosition)
+    public KeyPosition getSpannedIndexPosition(long nextIndexPosition)
+    {
+        return spannedIndexPositions == null ? null : spannedIndexPositions.get(nextIndexPosition);
+    }
+
+    public SSTable.PositionSize getSpannedDataPosition(long nextIndexPosition)
     {
         if (spannedIndexDataPositions == null)
             return null;
@@ -92,6 +101,12 @@ public class IndexSummary
         return spannedIndexDataPositions.get(info);
     }
 
+    public long getLastIndexPosition()
+    {
+        return lastIndexPosition;
+    }
+
+
     /**
      * This is a simple container for the index Key and its corresponding position
      * in the index file. Binary search is performed on a list of these objects

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java Tue Apr 13 02:25:56 2010
@@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUti
 import org.apache.cassandra.cache.InstrumentedCache;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -85,7 +86,8 @@ public class SSTableReader extends SSTab
         };
         new Thread(runnable, "SSTABLE-DELETER").start();
     }};
-    private static final long BUFFER_SIZE = Integer.MAX_VALUE;
+    // in a perfect world, BUFFER_SIZE would be final, but we need to test with a smaller size to stay sane.
+    static long BUFFER_SIZE = Integer.MAX_VALUE;
 
     public static int indexInterval()
     {
@@ -326,15 +328,9 @@ public class SSTableReader extends SSTab
         if (sampledPosition == null)
             return null;
 
-        // handle exact sampled index hit
-        PositionSize info = indexSummary.getSpannedPosition(sampledPosition);
-        if (info != null)
-            return info;
-
         // get either a buffered or a mmap'd input for the on-disk index
         long p = sampledPosition.indexPosition;
         FileDataInput input;
-        int bufferIndex = bufferIndex(p);
         if (indexBuffers == null)
         {
             input = new BufferedRandomAccessFile(indexFilename(), "r");
@@ -342,7 +338,7 @@ public class SSTableReader extends SSTab
         }
         else
         {
-            input = new MappedFileDataInput(indexBuffers[bufferIndex], indexFilename(), BUFFER_SIZE * bufferIndex, (int)(p % BUFFER_SIZE));
+            input = indexInputAt(p);
         }
 
         // scan the on-disk index, starting at the nearest sampled position
@@ -351,12 +347,33 @@ public class SSTableReader extends SSTab
             int i = 0;
             do
             {
+                // handle exact sampled index hit
+                IndexSummary.KeyPosition kp = indexSummary.getSpannedIndexPosition(input.getAbsolutePosition());
+                if (kp != null && kp.key.equals(decoratedKey))
+                    return indexSummary.getSpannedDataPosition(kp);
+
                 // if using mmapped i/o, skip to the next mmap buffer if necessary
-                if (input.isEOF() || indexSummary.getSpannedPosition(input.getAbsolutePosition()) != null)
+                if (input.isEOF() || kp != null)
                 {
-                    if (indexBuffers == null || ++bufferIndex == indexBuffers.length)
+                    if (indexBuffers == null) // not mmap-ing, just one index input
+                        break;
+
+                    FileDataInput oldInput = input;
+                    if (kp == null)
+                    {
+                        input = indexInputAt(input.getAbsolutePosition());
+                    }
+                    else
+                    {
+                        long nextUnspannedPostion = input.getAbsolutePosition()
+                                                    + 2 + FBUtilities.encodedUTF8Length(StorageService.getPartitioner().convertToDiskFormat(kp.key))
+                                                    + 8;
+                        input = indexInputAt(nextUnspannedPostion);
+                    }
+                    oldInput.close();
+                    if (input == null)
                         break;
-                    input = new MappedFileDataInput(indexBuffers[bufferIndex], indexFilename(), BUFFER_SIZE * bufferIndex, 0);
+
                     continue;
                 }
 
@@ -367,7 +384,7 @@ public class SSTableReader extends SSTab
                 int v = indexDecoratedKey.compareTo(decoratedKey);
                 if (v == 0)
                 {
-                    info = getDataPositionSize(input, dataPosition);
+                    PositionSize info = getDataPositionSize(input, dataPosition);
                     if (keyCache != null && keyCache.getCapacity() > 0)
                         keyCache.put(unifiedKey, info);
                     return info;
@@ -378,11 +395,20 @@ public class SSTableReader extends SSTab
         }
         finally
         {
-            input.close();
+            if (input != null)
+                input.close();
         }
         return null;
     }
 
+    private FileDataInput indexInputAt(long indexPosition)
+    {
+        if (indexPosition > indexSummary.getLastIndexPosition())
+            return null;
+        int bufferIndex = bufferIndex(indexPosition);
+        return new MappedFileDataInput(indexBuffers[bufferIndex], indexFilename(), BUFFER_SIZE * bufferIndex, (int)(indexPosition % BUFFER_SIZE));
+    }
+
     private PositionSize getDataPositionSize(FileDataInput input, long dataPosition) throws IOException
     {
         // if we've reached the end of the index, then the row size is "the rest of the data file"
@@ -392,7 +418,7 @@ public class SSTableReader extends SSTab
         // otherwise, row size is the start of the next row (in next index entry), minus the start of this one.
         long nextIndexPosition = input.getAbsolutePosition();
         // if next index entry would span mmap boundary, get the next row position from the summary instead
-        PositionSize nextPositionSize = indexSummary.getSpannedPosition(nextIndexPosition);
+        PositionSize nextPositionSize = indexSummary.getSpannedDataPosition(nextIndexPosition);
         if (nextPositionSize != null)
             return new PositionSize(dataPosition, nextPositionSize.position - dataPosition);
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/thrift/ThriftValidation.java Tue Apr 13 02:25:56 2010
@@ -36,6 +36,7 @@ import org.apache.cassandra.dht.IPartiti
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
 
 public class ThriftValidation
 {
@@ -46,18 +47,7 @@ public class ThriftValidation
             throw new InvalidRequestException("Key may not be empty");
         }
         // check that writeUTF will be able to handle it -- encoded length must fit in 2 bytes
-        int strlen = key.length();
-        int utflen = 0;
-        for (int i = 0; i < strlen; i++)
-        {
-            int c = key.charAt(i);
-            if ((c >= 0x0001) && (c <= 0x007F))
-                utflen++;
-            else if (c > 0x07FF)
-                utflen += 3;
-            else
-                utflen += 2;
-        }
+        int utflen = FBUtilities.encodedUTF8Length(key);
         if (utflen > 65535)
             throw new InvalidRequestException("Encoded key length of " + utflen + " is longer than maximum of 65535");
     }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=933465&r1=933464&r2=933465&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/utils/FBUtilities.java Tue Apr 13 02:25:56 2010
@@ -463,4 +463,21 @@ public class FBUtilities
         else
             return a.equals(b);
 }
+
+    public static int encodedUTF8Length(String st)
+    {
+        int strlen = st.length();
+        int utflen = 0;
+        for (int i = 0; i < strlen; i++)
+        {
+            int c = st.charAt(i);
+            if ((c >= 0x0001) && (c <= 0x007F))
+                utflen++;
+            else if (c > 0x07FF)
+                utflen += 3;
+            else
+                utflen += 2;
+        }
+        return utflen;
+    }
 }

Added: cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java?rev=933465&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java (added)
+++ cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/SSTableReaderTest.java Tue Apr 13 02:25:56 2010
@@ -0,0 +1,58 @@
+package org.apache.cassandra.io;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.io.util.FileDataInput;
+import org.apache.cassandra.service.StorageService;
+
+
+public class SSTableReaderTest
+{
+    @Test
+    public void testSpannedIndexPositions() throws IOException, ExecutionException, InterruptedException
+    {
+        SSTableReader.BUFFER_SIZE = 40;
+
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+
+        // insert a bunch of data
+        CompactionManager.instance.disableAutoCompaction();
+        for (int j = 0; j < 100; j += 2)
+        {
+            String key = String.valueOf(j);
+            RowMutation rm = new RowMutation("Keyspace1", key);
+            rm.add(new QueryPath("Standard1", null, "0".getBytes()), new byte[0], j);
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.submitMajor(store).get();
+
+        // check that all our keys are found correctly
+        SSTableReader sstable = store.getSSTables().iterator().next();
+        for (int j = 0; j < 100; j += 2)
+        {
+            String key = String.valueOf(j);
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+            FileDataInput file = sstable.getFileDataInput(dk, DatabaseDescriptor.getIndexedReadBufferSizeInKB() * 1024);
+            DecoratedKey keyInDisk = sstable.getPartitioner().convertFromDiskFormat(file.readUTF());
+            assert keyInDisk.equals(dk) : String.format("%s != %s in %s", keyInDisk, dk, file.getPath());
+        }
+
+        // check no false positives
+        for (int j = 1; j < 110; j += 2)
+        {
+            String key = String.valueOf(j);
+            DecoratedKey dk = StorageService.getPartitioner().decorateKey(key);
+            assert sstable.getPosition(dk) == null;
+        }
+    }
+}



Mime
View raw message