cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r925514 - in /cassandra/branches/cassandra-0.6: ./ contrib/ contrib/client_only/ contrib/word_count/ interface/ src/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/util/ src/java/org/apa...
Date Sat, 20 Mar 2010 01:14:53 GMT
Author: jbellis
Date: Sat Mar 20 01:14:52 2010
New Revision: 925514

URL: http://svn.apache.org/viewvc?rev=925514&view=rev
Log: (empty)

Added:
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/contrib/   (props changed)
    cassandra/branches/cassandra-0.6/contrib/client_only/   (props changed)
    cassandra/branches/cassandra-0.6/contrib/word_count/   (props changed)
    cassandra/branches/cassandra-0.6/interface/   (props changed)
    cassandra/branches/cassandra-0.6/src/   (props changed)
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
    cassandra/branches/cassandra-0.6/test/   (props changed)

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Sat Mar 20 01:14:52 2010
@@ -3,6 +3,7 @@
  * Bootstrapping can skip ranges under the right conditions (CASSANDRA-902)
  * fix merging row versions in range_slice for CL > ONE (CASSANDRA-884)
  * default write ConsistencyLeven chaned from ZERO to ONE
+ * fix for index entries spanning mmap buffer boundaries (CASSANDRA-857)
  * use lexical comparison if time part of TimeUUIDs are the same 
    (CASSANDRA-907)
  * bound read, mutation, and response stages to fix possible OOM

Propchange: cassandra/branches/cassandra-0.6/contrib/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -0,0 +1 @@
+*.iml

Propchange: cassandra/branches/cassandra-0.6/contrib/client_only/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -0,0 +1 @@
+*.iml

Propchange: cassandra/branches/cassandra-0.6/contrib/word_count/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -0,0 +1 @@
+*.iml

Propchange: cassandra/branches/cassandra-0.6/interface/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -1 +1,2 @@
 avro
+*.iml

Propchange: cassandra/branches/cassandra-0.6/src/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -1,2 +1,2 @@
 gen-java
-
+*.iml

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Sat Mar 20 01:14:52 2010
@@ -49,10 +49,7 @@ import org.apache.cassandra.dht.Abstract
 import org.apache.cassandra.dht.Bounds;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.SSTable;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.SSTableScanner;
-import org.apache.cassandra.io.SSTableTracker;
+import org.apache.cassandra.io.*;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.SliceRange;
@@ -1128,10 +1125,10 @@ public class ColumnFamilyStore implement
         return Iterables.concat(stores);
     }
 
-    public Iterable<SSTable.KeyPosition> allIndexPositions()
+    public Iterable<IndexSummary.KeyPosition> allIndexPositions()
     {
         Collection<SSTableReader> sstables = getSSTables();
-        Iterable<SSTable.KeyPosition>[] positions = new Iterable[sstables.size()];
+        Iterable<IndexSummary.KeyPosition>[] positions = new Iterable[sstables.size()];
         int i = 0;
         for (SSTableReader sstable: sstables)
         {

Added: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java?rev=925514&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java (added)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/IndexSummary.java Sat
Mar 20 01:14:52 2010
@@ -0,0 +1,101 @@
+package org.apache.cassandra.io;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.db.DecoratedKey;
+
+public class IndexSummary
+{
+    /** Every 128th index entry is loaded into memory so we know where to start looking for
the actual key w/o seeking */
+    public static final int INDEX_INTERVAL = 128;/* Required extension for temporary files
created during compactions. */
+
+    private ArrayList<KeyPosition> indexPositions;
+    private Map<KeyPosition, SSTable.PositionSize> spannedIndexDataPositions;
+    private Map<Long, KeyPosition> spannedIndexPositions;
+    int keysWritten = 0;
+
+    public void maybeAddEntry(DecoratedKey decoratedKey, long dataPosition, long dataSize,
long indexPosition, long nextIndexPosition)
+    {
+        boolean spannedIndexEntry = SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(nextIndexPosition);
+        if (keysWritten++ % INDEX_INTERVAL == 0 || spannedIndexEntry)
+        {
+            if (indexPositions == null)
+            {
+                indexPositions  = new ArrayList<KeyPosition>();
+            }
+            KeyPosition info = new KeyPosition(decoratedKey, indexPosition);
+            indexPositions.add(info);
+
+            if (spannedIndexEntry)
+            {
+                if (spannedIndexDataPositions == null)
+                {
+                    spannedIndexDataPositions = new HashMap<KeyPosition, SSTable.PositionSize>();
+                    spannedIndexPositions = new HashMap<Long, KeyPosition>();
+                }
+                spannedIndexDataPositions.put(info, new SSTable.PositionSize(dataPosition,
dataSize));
+                spannedIndexPositions.put(info.indexPosition, info);
+            }
+        }
+    }
+
+    public List<KeyPosition> getIndexPositions()
+    {
+        return indexPositions;
+    }
+
+    public void complete()
+    {
+        indexPositions.trimToSize();
+    }
+
+    public SSTable.PositionSize getSpannedPosition(KeyPosition sampledPosition)
+    {
+        if (spannedIndexDataPositions == null)
+            return null;
+        return spannedIndexDataPositions.get(sampledPosition);
+    }
+
+    public SSTable.PositionSize getSpannedPosition(long nextIndexPosition)
+    {
+        if (spannedIndexDataPositions == null)
+            return null;
+
+        KeyPosition info = spannedIndexPositions.get(nextIndexPosition);
+        if (info == null)
+            return null;
+
+        return spannedIndexDataPositions.get(info);
+    }
+
+    /**
+     * This is a simple container for the index Key and its corresponding position
+     * in the index file. Binary search is performed on a list of these objects
+     * to find where to start looking for the index entry containing the data position
+     * (which will be turned into a PositionSize object)
+     */
+    public static class KeyPosition implements Comparable<KeyPosition>
+    {
+        public final DecoratedKey key;
+        public final long indexPosition;
+
+        public KeyPosition(DecoratedKey key, long indexPosition)
+        {
+            this.key = key;
+            this.indexPosition = indexPosition;
+        }
+
+        public int compareTo(KeyPosition kp)
+        {
+            return key.compareTo(kp.key);
+        }
+
+        public String toString()
+        {
+            return key + ":" + indexPosition;
+        }
+    }
+}
\ No newline at end of file

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTable.java Sat Mar
20 01:14:52 2010
@@ -25,7 +25,6 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Arrays;
-import java.util.Map;
 
 import org.apache.log4j.Logger;
 import org.apache.commons.lang.StringUtils;
@@ -56,12 +55,9 @@ public abstract class SSTable
     protected String path;
     protected IPartitioner partitioner;
     protected BloomFilter bf;
-    protected List<KeyPosition> indexPositions;
-    protected Map<KeyPosition, PositionSize> spannedIndexDataPositions; // map of index
position, to data position, for index entries spanning mmap segments
     protected String columnFamilyName;
+    protected IndexSummary indexSummary;
 
-    /* Every 128th index entry is loaded into memory so we know where to start looking for
the actual key w/o seeking */
-    public static final int INDEX_INTERVAL = 128;/* Required extension for temporary files
created during compactions. */
     public static final String TEMPFILE_MARKER = "tmp";
 
     public SSTable(String filename, IPartitioner partitioner)
@@ -173,33 +169,6 @@ public abstract class SSTable
         return sum;
     }
 
-    /**
-     * This is a simple container for the index Key and its corresponding position
-     * in the data file. Binary search is performed on a list of these objects
-     * to lookup keys within the SSTable data file.
-     */
-    public class KeyPosition implements Comparable<KeyPosition>
-    {
-        public final DecoratedKey key;
-        public final long position;
-
-        public KeyPosition(DecoratedKey key, long position)
-        {
-            this.key = key;
-            this.position = position;
-        }
-
-        public int compareTo(KeyPosition kp)
-        {
-            return key.compareTo(kp.key);
-        }
-
-        public String toString()
-        {
-            return key + ":" + position;
-        }
-    }
-
     public long bytesOnDisk()
     {
         long bytes = 0;

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableReader.java Sat
Mar 20 01:14:52 2010
@@ -89,7 +89,7 @@ public class SSTableReader extends SSTab
 
     public static int indexInterval()
     {
-        return INDEX_INTERVAL;
+        return IndexSummary.INDEX_INTERVAL;
     }
 
     public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
@@ -99,7 +99,7 @@ public class SSTableReader extends SSTab
         for (SSTableReader sstable : sstables)
         {
             int indexKeyCount = sstable.getIndexPositions().size();
-            count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
+            count = count + (indexKeyCount + 1) * IndexSummary.INDEX_INTERVAL;
             if (logger.isDebugEnabled())
                 logger.debug("index size for bloom filter calc for file  : " + sstable.getFilename()
+ "   : " + count);
         }
@@ -136,10 +136,7 @@ public class SSTableReader extends SSTab
 
     private InstrumentedCache<Pair<String, DecoratedKey>, PositionSize> keyCache;
 
-    SSTableReader(String filename,
-                  IPartitioner partitioner,
-                  List<KeyPosition> indexPositions, Map<KeyPosition, PositionSize>
spannedIndexDataPositions,
-                  BloomFilter bloomFilter)
+    SSTableReader(String filename, IPartitioner partitioner, IndexSummary indexSummary, BloomFilter
bloomFilter)
     throws IOException
     {
         super(filename, partitioner);
@@ -179,8 +176,7 @@ public class SSTableReader extends SSTab
             buffers = null;
         }
 
-        this.indexPositions = indexPositions;
-        this.spannedIndexDataPositions = spannedIndexDataPositions;
+        this.indexSummary = indexSummary;
         this.bf = bloomFilter;
     }
 
@@ -217,17 +213,17 @@ public class SSTableReader extends SSTab
 
     private SSTableReader(String filename, IPartitioner partitioner) throws IOException
     {
-        this(filename, partitioner, null, null, null);
+        this(filename, partitioner, null, null);
     }
 
-    public List<KeyPosition> getIndexPositions()
+    public List<IndexSummary.KeyPosition> getIndexPositions()
     {
-        return indexPositions;
+        return indexSummary.getIndexPositions();
     }
 
     public long estimatedKeys()
     {
-        return indexPositions.size() * INDEX_INTERVAL;
+        return indexSummary.getIndexPositions().size() * IndexSummary.INDEX_INTERVAL;
     }
 
     void loadBloomFilter() throws IOException
@@ -245,14 +241,13 @@ public class SSTableReader extends SSTab
 
     void loadIndexFile() throws IOException
     {
-        indexPositions = new ArrayList<KeyPosition>();
         // we read the positions in a BRAF so we don't have to worry about an entry spanning
a mmap boundary.
         // any entries that do, we force into the in-memory sample so key lookup can always
bsearch within
         // a single mmapped segment.
+        indexSummary = new IndexSummary();
         BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
         try
         {
-            int i = 0;
             long indexSize = input.length();
             while (true)
             {
@@ -264,27 +259,21 @@ public class SSTableReader extends SSTab
                 DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
                 long dataPosition = input.readLong();
                 long nextIndexPosition = input.getFilePointer();
-                boolean spannedEntry = bufferIndex(indexPosition) != bufferIndex(nextIndexPosition);
-                if (i++ % INDEX_INTERVAL == 0 || spannedEntry)
+                // read the next index entry to see how big the row is
+                long nextDataPosition;
+                if (input.isEOF())
+                {
+                    nextDataPosition = length();
+                }
+                else
                 {
-                    KeyPosition info;
-                    info = new KeyPosition(decoratedKey, indexPosition);
-                    indexPositions.add(info);
-
-                    if (spannedEntry)
-                    {
-                        if (spannedIndexDataPositions == null)
-                        {
-                            spannedIndexDataPositions = new HashMap<KeyPosition, PositionSize>();
-                        }
-                        // read the next index entry to see how big the row is corresponding
to the current, mmap-segment-spanning one
-                        input.readUTF();
-                        long nextDataPosition = input.readLong();
-                        input.seek(nextIndexPosition);
-                        spannedIndexDataPositions.put(info, new PositionSize(dataPosition,
nextDataPosition - dataPosition));
-                    }
+                    input.readUTF();
+                    nextDataPosition = input.readLong();
+                    input.seek(nextIndexPosition);
                 }
+                indexSummary.maybeAddEntry(decoratedKey, dataPosition, nextDataPosition -
dataPosition, indexPosition, nextIndexPosition);
             }
+            indexSummary.complete();
         }
         finally
         {
@@ -293,10 +282,10 @@ public class SSTableReader extends SSTab
     }
 
     /** get the position in the index file to start scanning to find the given key (at most
indexInterval keys away) */
-    private KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
+    private IndexSummary.KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
     {
-        assert indexPositions != null && indexPositions.size() > 0;
-        int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey,
-1));
+        assert indexSummary.getIndexPositions() != null && indexSummary.getIndexPositions().size()
> 0;
+        int index = Collections.binarySearch(indexSummary.getIndexPositions(), new IndexSummary.KeyPosition(decoratedKey,
-1));
         if (index < 0)
         {
             // binary search gives us the first index _greater_ than the key searched for,
@@ -304,11 +293,11 @@ public class SSTableReader extends SSTab
             int greaterThan = (index + 1) * -1;
             if (greaterThan == 0)
                 return null;
-            return indexPositions.get(greaterThan - 1);
+            return indexSummary.getIndexPositions().get(greaterThan - 1);
         }
         else
         {
-            return indexPositions.get(index);
+            return indexSummary.getIndexPositions().get(index);
         }
     }
 
@@ -333,23 +322,19 @@ public class SSTableReader extends SSTab
         }
 
         // next, see if the sampled index says it's impossible for the key to be present
-        KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+        IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
         if (sampledPosition == null)
-        {
             return null;
-        }
 
         // handle exact sampled index hit
-        if (spannedIndexDataPositions != null)
-        {
-            PositionSize info = spannedIndexDataPositions.get(sampledPosition);
-            if (info != null)
-                return info;
-        }
+        PositionSize info = indexSummary.getSpannedPosition(sampledPosition);
+        if (info != null)
+            return info;
 
-        // scan the on-disk index, starting at the nearest sampled position
-        long p = sampledPosition.position;
+        // get either a buffered or a mmap'd input for the on-disk index
+        long p = sampledPosition.indexPosition;
         FileDataInput input;
+        int bufferIndex = bufferIndex(p);
         if (indexBuffers == null)
         {
             input = new BufferedRandomAccessFile(indexFilename(), "r");
@@ -357,45 +342,39 @@ public class SSTableReader extends SSTab
         }
         else
         {
-            input = new MappedFileDataInput(indexBuffers[bufferIndex(p)], indexFilename(),
(int)(p % BUFFER_SIZE));
+            input = new MappedFileDataInput(indexBuffers[bufferIndex], indexFilename(), BUFFER_SIZE
* bufferIndex, (int)(p % BUFFER_SIZE));
         }
+
+        // scan the on-disk index, starting at the nearest sampled position
         try
         {
             int i = 0;
             do
             {
-                DecoratedKey indexDecoratedKey;
-                try
+                // if using mmapped i/o, skip to the next mmap buffer if necessary
+                if (input.isEOF() || indexSummary.getSpannedPosition(input.getAbsolutePosition())
!= null)
                 {
-                    indexDecoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
-                }
-                catch (EOFException e)
-                {
-                    return null;
+                    if (indexBuffers == null || ++bufferIndex == indexBuffers.length)
+                        break;
+                    input = new MappedFileDataInput(indexBuffers[bufferIndex], indexFilename(),
BUFFER_SIZE * bufferIndex, 0);
+                    continue;
                 }
-                long position = input.readLong();
+
+                // read key & data position from index entry
+                DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
+                long dataPosition = input.readLong();
+
                 int v = indexDecoratedKey.compareTo(decoratedKey);
                 if (v == 0)
                 {
-                    PositionSize info;
-                    if (!input.isEOF())
-                    {
-                        int utflen = input.readUnsignedShort();
-                        if (utflen != input.skipBytes(utflen))
-                            throw new EOFException();
-                        info = new PositionSize(position, input.readLong() - position);
-                    }
-                    else
-                    {
-                        info = new PositionSize(position, length() - position);
-                    }
+                    info = getDataPositionSize(input, dataPosition);
                     if (keyCache != null && keyCache.getCapacity() > 0)
                         keyCache.put(unifiedKey, info);
                     return info;
                 }
                 if (v > 0)
                     return null;
-            } while  (++i < INDEX_INTERVAL);
+            } while  (++i < IndexSummary.INDEX_INTERVAL);
         }
         finally
         {
@@ -404,10 +383,30 @@ public class SSTableReader extends SSTab
         return null;
     }
 
+    private PositionSize getDataPositionSize(FileDataInput input, long dataPosition) throws
IOException
+    {
+        // if we've reached the end of the index, then the row size is "the rest of the data
file"
+        if (input.isEOF())
+            return new PositionSize(dataPosition, length() - dataPosition);
+
+        // otherwise, row size is the start of the next row (in next index entry), minus
the start of this one.
+        long nextIndexPosition = input.getAbsolutePosition();
+        // if next index entry would span mmap boundary, get the next row position from the
summary instead
+        PositionSize nextPositionSize = indexSummary.getSpannedPosition(nextIndexPosition);
+        if (nextPositionSize != null)
+            return new PositionSize(dataPosition, nextPositionSize.position - dataPosition);
+
+        // read next entry directly
+        int utflen = input.readUnsignedShort();
+        if (utflen != input.skipBytes(utflen))
+            throw new EOFException();
+        return new PositionSize(dataPosition, input.readLong() - dataPosition);
+    }
+
     /** like getPosition, but if key is not found will return the location of the first key
_greater_ than the desired one, or -1 if no such key exists. */
     public long getNearestPosition(DecoratedKey decoratedKey) throws IOException
     {
-        KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
+        IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
         if (sampledPosition == null)
         {
             return 0;
@@ -415,7 +414,7 @@ public class SSTableReader extends SSTab
 
         // can't use a MappedFileDataInput here, since we might cross a segment boundary
while scanning
         BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(path),
"r");
-        input.seek(sampledPosition.position);
+        input.seek(sampledPosition.indexPosition);
         try
         {
             while (true)
@@ -490,7 +489,7 @@ public class SSTableReader extends SSTab
             file.seek(info.position);
             return file;
         }
-        return new MappedFileDataInput(buffers[bufferIndex(info.position)], path, (int) (info.position
% BUFFER_SIZE));
+        return new MappedFileDataInput(buffers[bufferIndex(info.position)], path, BUFFER_SIZE
* (info.position / BUFFER_SIZE), (int) (info.position % BUFFER_SIZE));
     }
 
     static int bufferIndex(long position)

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java Sat
Mar 20 01:14:52 2010
@@ -35,7 +35,6 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -52,6 +51,7 @@ public class SSTableWriter extends SSTab
     public SSTableWriter(String filename, long keyCount, IPartitioner partitioner) throws
IOException
     {
         super(filename, partitioner);
+        indexSummary = new IndexSummary();
         dataFile = new BufferedRandomAccessFile(path, "rw", (int)(DatabaseDescriptor.getFlushDataBufferSizeInMB()
* 1024 * 1024));
         indexFile = new BufferedRandomAccessFile(indexFilename(), "rw", (int)(DatabaseDescriptor.getFlushIndexBufferSizeInMB()
* 1024 * 1024));
         bf = BloomFilter.getFilter(keyCount, 15);
@@ -86,25 +86,7 @@ public class SSTableWriter extends SSTab
         if (logger.isTraceEnabled())
             logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
 
-        boolean spannedIndexEntry = SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(indexFile.getFilePointer());
-        if (keysWritten++ % INDEX_INTERVAL == 0 || spannedIndexEntry)
-        {
-            if (indexPositions == null)
-            {
-                indexPositions = new ArrayList<KeyPosition>();
-            }
-            KeyPosition info = new KeyPosition(decoratedKey, indexPosition);
-            indexPositions.add(info);
-
-            if (spannedIndexEntry)
-            {
-                if (spannedIndexDataPositions == null)
-                {
-                    spannedIndexDataPositions = new HashMap<KeyPosition, PositionSize>();
-                }
-                spannedIndexDataPositions.put(info, new PositionSize(dataPosition, dataSize));
-            }
-        }
+        indexSummary.maybeAddEntry(decoratedKey, dataPosition, dataSize, indexPosition, indexFile.getFilePointer());
     }
 
     // TODO make this take a DataOutputStream and wrap the byte[] version to combine them
@@ -153,7 +135,8 @@ public class SSTableWriter extends SSTab
         rename(filterFilename());
         path = rename(path); // important to do this last since index & filter file names
are derived from it
 
-        return new SSTableReader(path, partitioner, indexPositions, spannedIndexDataPositions,
bf);
+        indexSummary.complete();
+        return new SSTableReader(path, partitioner, indexSummary, bf);
     }
 
     static String rename(String tmpFilename)

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
Sat Mar 20 01:14:52 2010
@@ -252,7 +252,12 @@ public class BufferedRandomAccessFile ex
         }
         this.curr_ = pos;
     }
-    
+
+    public long getAbsolutePosition()
+    {
+        return getFilePointer();
+    }
+
     public long getFilePointer()
     {
         return this.curr_;

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/FileDataInput.java
Sat Mar 20 01:14:52 2010
@@ -36,4 +36,6 @@ public interface FileDataInput extends D
     public void reset() throws IOException;
 
     public int bytesPastMark();
+
+    long getAbsolutePosition();
 }

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
Sat Mar 20 01:14:52 2010
@@ -30,21 +30,28 @@ public class MappedFileDataInput extends
     private final String filename;
     private int position;
     private int markedPosition;
+    private final long absoluteStartPosition;
 
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename)
+    public MappedFileDataInput(MappedByteBuffer buffer, String filename, long absoluteStartPosition)
     {
-        this(buffer, filename, 0);
+        this(buffer, filename, absoluteStartPosition, 0);
     }
 
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position)
+    public MappedFileDataInput(MappedByteBuffer buffer, String filename, long absoluteStartPosition,
int position)
     {
         assert buffer != null;
+        this.absoluteStartPosition = absoluteStartPosition;
         this.buffer = buffer;
         this.filename = filename;
         this.position = position;
     }
 
-    // don't make this public, this is only for seeking WITHIN the current mapped segment
+    public long getAbsolutePosition()
+    {
+        return absoluteStartPosition + position;
+    }
+
+// don't make this public, this is only for seeking WITHIN the current mapped segment
     private void seekInternal(int pos) throws IOException
     {
         position = pos;

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
Sat Mar 20 01:14:52 2010
@@ -33,7 +33,7 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.CompactionIterator.CompactedRow;
 import org.apache.cassandra.io.ICompactSerializer;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.IndexSummary;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.streaming.StreamOut;
 import org.apache.cassandra.net.IVerbHandler;
@@ -44,7 +44,6 @@ import org.apache.cassandra.utils.*;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Collections2;
-import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 
 /**
@@ -367,7 +366,7 @@ public class AntiEntropyService
             }
             if (cfs != null) // TODO test w/ valid CF definitions, this if{} shouldn't be
necessary
             {
-                for (SSTable.KeyPosition info: cfs.allIndexPositions())
+                for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
                     keys.add(info.key);
             }
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java?rev=925514&r1=925513&r2=925514&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageService.java
Sat Mar 20 01:14:52 2010
@@ -27,7 +27,6 @@ import java.util.concurrent.CountDownLat
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.net.InetAddress;
@@ -41,7 +40,7 @@ import org.apache.cassandra.db.commitlog
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.DeletionService;
-import org.apache.cassandra.io.SSTable;
+import org.apache.cassandra.io.IndexSummary;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.locator.*;
 import org.apache.cassandra.net.*;
@@ -1233,7 +1232,7 @@ public class StorageService implements I
         List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            for (SSTable.KeyPosition info: cfs.allIndexPositions())
+            for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
             {
                 if (range.contains(info.key.token))
                     keys.add(info.key);
@@ -1262,7 +1261,7 @@ public class StorageService implements I
         List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            for (SSTable.KeyPosition info: cfs.allIndexPositions())
+            for (IndexSummary.KeyPosition info: cfs.allIndexPositions())
             {
                 if (range.contains(info.key.token))
                     keys.add(info.key);

Propchange: cassandra/branches/cassandra-0.6/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Sat Mar 20 01:14:52 2010
@@ -0,0 +1 @@
+*.iml



Mime
View raw message