cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r951813 - in /cassandra/trunk: src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/io/util/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/db/ test/unit/org/apache/cassandra/io/sstable/
Date Sun, 06 Jun 2010 04:29:54 GMT
Author: jbellis
Date: Sun Jun  6 04:29:54 2010
New Revision: 951813

URL: http://svn.apache.org/viewvc?rev=951813&view=rev
Log:
Replace constant-size 2GB mmapped segments, and the special casing for index entries spanning segment boundaries, with SegmentedFile, which computes segments that always contain entire entries (or rows). Patch by Stu Hood and jbellis for CASSANDRA-1117.

Added:
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java
Modified:
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexSummary.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexSummary.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/IndexSummary.java Sun Jun  6 04:29:54 2010
@@ -36,16 +36,12 @@ public class IndexSummary
     public static final int INDEX_INTERVAL = 128;/* Required extension for temporary files created during compactions. */
 
     private ArrayList<KeyPosition> indexPositions;
-    private Map<KeyPosition, SSTable.PositionSize> spannedIndexDataPositions;
-    private Map<Long, KeyPosition> spannedIndexPositions;
     private int keysWritten = 0;
     private long lastIndexPosition;
 
-    public void maybeAddEntry(DecoratedKey decoratedKey, long dataPosition, long rowSize, long indexPosition, long nextIndexPosition)
+    public void maybeAddEntry(DecoratedKey decoratedKey, long indexPosition)
     {
-        boolean spannedIndexEntry = DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap
-                                    && RowIndexedReader.bufferIndex(indexPosition) != RowIndexedReader.bufferIndex(nextIndexPosition);
-        if ((keysWritten++ % INDEX_INTERVAL == 0) || spannedIndexEntry)
+        if (keysWritten++ % INDEX_INTERVAL == 0)
         {
             if (indexPositions == null)
             {
@@ -53,26 +49,10 @@ public class IndexSummary
             }
             KeyPosition info = new KeyPosition(decoratedKey, indexPosition);
             indexPositions.add(info);
-
-            if (spannedIndexEntry)
-            {
-                if (spannedIndexDataPositions == null)
-                {
-                    spannedIndexDataPositions = new HashMap<KeyPosition, SSTable.PositionSize>();
-                    spannedIndexPositions = new HashMap<Long, KeyPosition>();
-                }
-                spannedIndexDataPositions.put(info, new SSTable.PositionSize(dataPosition, rowSize));
-                spannedIndexPositions.put(info.indexPosition, info);
-            }
         }
         lastIndexPosition = indexPosition;
     }
 
-    public Map<KeyPosition, SSTable.PositionSize> getSpannedIndexDataPositions()
-    {
-        return spannedIndexDataPositions;
-    }
-
     public List<KeyPosition> getIndexPositions()
     {
         return indexPositions;
@@ -83,36 +63,11 @@ public class IndexSummary
         indexPositions.trimToSize();
     }
 
-    public SSTable.PositionSize getSpannedDataPosition(KeyPosition sampledPosition)
-    {
-        if (spannedIndexDataPositions == null)
-            return null;
-        return spannedIndexDataPositions.get(sampledPosition);
-    }
-
-    public KeyPosition getSpannedIndexPosition(long nextIndexPosition)
-    {
-        return spannedIndexPositions == null ? null : spannedIndexPositions.get(nextIndexPosition);
-    }
-
-    public SSTable.PositionSize getSpannedDataPosition(long nextIndexPosition)
-    {
-        if (spannedIndexDataPositions == null)
-            return null;
-
-        KeyPosition info = spannedIndexPositions.get(nextIndexPosition);
-        if (info == null)
-            return null;
-
-        return spannedIndexDataPositions.get(info);
-    }
-
     public long getLastIndexPosition()
     {
         return lastIndexPosition;
     }
 
-
     /**
      * This is a simple container for the index Key and its corresponding position
      * in the index file. Binary search is performed on a list of these objects

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java Sun Jun  6 04:29:54 2010
@@ -21,32 +21,20 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.*;
 import java.util.*;
-import java.lang.ref.ReferenceQueue;
-import java.lang.ref.Reference;
-import java.nio.channels.FileChannel;
-import java.nio.MappedByteBuffer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.cache.InstrumentedCache;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.io.util.MappedFileDataInput;
-
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
+import org.apache.cassandra.io.util.SegmentedFile;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
@@ -58,17 +46,19 @@ class RowIndexedReader extends SSTableRe
 {
     private static final Logger logger = LoggerFactory.getLogger(RowIndexedReader.class);
 
-    // in a perfect world, BUFFER_SIZE would be final, but we need to test with a smaller size to stay sane.
-    static long BUFFER_SIZE = Integer.MAX_VALUE;
-
-    // jvm can only map up to 2GB at a time, so we split index/data into segments of that size when using mmap i/o
-    private final MappedByteBuffer[] indexBuffers;
-    private final MappedByteBuffer[] buffers;
-
-    private InstrumentedCache<Pair<Descriptor,DecoratedKey>, PositionSize> keyCache;
+    // guesstimated size of INDEX_INTERVAL index entries
+    private static final int INDEX_FILE_BUFFER_BYTES = 16 * IndexSummary.INDEX_INTERVAL;
+  
+    // indexfile and datafile: might be null before a call to load()
+    private SegmentedFile ifile;
+    private SegmentedFile dfile;
+  
+    private InstrumentedCache<Pair<Descriptor,DecoratedKey>, Long> keyCache;
 
     RowIndexedReader(Descriptor desc,
                      IPartitioner partitioner,
+                     SegmentedFile ifile,
+                     SegmentedFile dfile,
                      IndexSummary indexSummary,
                      BloomFilter bloomFilter,
                      long maxDataAge)
@@ -76,68 +66,39 @@ class RowIndexedReader extends SSTableRe
     {
         super(desc, partitioner, maxDataAge);
 
-        if (DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.mmap)
-        {
-            long indexLength = new File(indexFilename()).length();
-            int bufferCount = 1 + (int) (indexLength / BUFFER_SIZE);
-            indexBuffers = new MappedByteBuffer[bufferCount];
-            long remaining = indexLength;
-            for (int i = 0; i < bufferCount; i++)
-            {
-                indexBuffers[i] = mmap(indexFilename(), i * BUFFER_SIZE, (int) Math.min(remaining, BUFFER_SIZE));
-                remaining -= BUFFER_SIZE;
-            }
-        }
-        else
-        {
-            assert DatabaseDescriptor.getIndexAccessMode() == Config.DiskAccessMode.standard;
-            indexBuffers = null;
-        }
-
-        if (DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
-        {
-            int bufferCount = 1 + (int) (new File(getFilename()).length() / BUFFER_SIZE);
-            buffers = new MappedByteBuffer[bufferCount];
-            long remaining = length();
-            for (int i = 0; i < bufferCount; i++)
-            {
-                buffers[i] = mmap(getFilename(), i * BUFFER_SIZE, (int) Math.min(remaining, BUFFER_SIZE));
-                remaining -= BUFFER_SIZE;
-            }
-        }
-        else
-        {
-            assert DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.standard;
-            buffers = null;
-        }
 
+        this.ifile = ifile;
+        this.dfile = dfile;
         this.indexSummary = indexSummary;
         this.bf = bloomFilter;
     }
 
-    RowIndexedReader(Descriptor desc, IPartitioner partitioner) throws IOException
-    {
-        this(desc, partitioner, null, null, System.currentTimeMillis());
-    }
-
-    public static RowIndexedReader open(Descriptor desc, IPartitioner partitioner) throws IOException
+    /** Open a RowIndexedReader which needs its state loaded from disk. */
+    static RowIndexedReader internalOpen(Descriptor desc, IPartitioner partitioner) throws IOException
     {
-        RowIndexedReader sstable = new RowIndexedReader(desc, partitioner);
+        RowIndexedReader sstable = new RowIndexedReader(desc, partitioner, null, null, null, null, System.currentTimeMillis());
 
+        // versions before 'c' encoded keys as utf-16 before hashing to the filter
         if (desc.versionCompareTo("c") < 0)
-        {
-            // versions before 'c' encoded keys as utf-16 before hashing to the filter
-            sstable.loadIndexFile(true);
-        }
+            sstable.load(true);
         else
         {
-            sstable.loadIndexFile(false);
+            sstable.load(false);
             sstable.loadBloomFilter();
         }
 
         return sstable;
     }
 
+    /**
+     * Open a RowIndexedReader which already has its state initialized (by SSTableWriter).
+     */
+    static RowIndexedReader internalOpen(Descriptor desc, IPartitioner partitioner, SegmentedFile ifile, SegmentedFile dfile, IndexSummary isummary, BloomFilter bf, long maxDataAge) throws IOException
+    {
+        assert desc != null && partitioner != null && ifile != null && dfile != null && isummary != null && bf != null;
+        return new RowIndexedReader(desc, partitioner, ifile, dfile, isummary, bf, maxDataAge);
+    }
+
     public long estimatedKeys()
     {
         return indexSummary.getIndexPositions().size() * IndexSummary.INDEX_INTERVAL;
@@ -168,51 +129,36 @@ class RowIndexedReader extends SSTableRe
     }
 
     /**
-     * @param recreatebloom If true, rebuild the bloom filter based on keys from the index.
+     * Loads ifile, dfile and indexSummary, and optionally recreates the bloom filter.
      */
-    void loadIndexFile(boolean recreatebloom) throws IOException
+    private void load(boolean recreatebloom) throws IOException
     {
+        SegmentedFile.Builder ibuilder = SegmentedFile.getBuilder();
+        SegmentedFile.Builder dbuilder = SegmentedFile.getBuilder();
 
         // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
-        // any entries that do, we force into the in-memory sample so key lookup can always bsearch within
-        // a single mmapped segment.
         indexSummary = new IndexSummary();
         BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
-        if (recreatebloom)
-        {
-            // estimate key count based on index length
-            bf = BloomFilter.getFilter((int)(input.length() / 32), 15);
-        }
         try
         {
             long indexSize = input.length();
+            if (recreatebloom)
+                // estimate key count based on index length
+                bf = BloomFilter.getFilter((int)(input.length() / 32), 15);
             while (true)
             {
                 long indexPosition = input.getFilePointer();
                 if (indexPosition == indexSize)
-                {
                     break;
-                }
+
                 DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
                 if (recreatebloom)
-                {
                     bf.add(decoratedKey.key);
-                }
                 long dataPosition = input.readLong();
-                long nextIndexPosition = input.getFilePointer();
-                // read the next index entry to see how big the row is
-                long nextDataPosition;
-                if (input.isEOF())
-                {
-                    nextDataPosition = length();
-                }
-                else
-                {
-                    FBUtilities.readShortByteArray(input);
-                    nextDataPosition = input.readLong();
-                    input.seek(nextIndexPosition);
-                }
-                indexSummary.maybeAddEntry(decoratedKey, dataPosition, nextDataPosition - dataPosition, indexPosition, nextIndexPosition);
+
+                indexSummary.maybeAddEntry(decoratedKey, indexPosition);
+                ibuilder.addPotentialBoundary(indexPosition);
+                dbuilder.addPotentialBoundary(dataPosition);
             }
             indexSummary.complete();
         }
@@ -220,6 +166,11 @@ class RowIndexedReader extends SSTableRe
         {
             input.close();
         }
+
+        // finalize the state of the reader
+        indexSummary.complete();
+        ifile = ibuilder.complete(indexFilename());
+        dfile = dbuilder.complete(getFilename());
     }
 
     @Override
@@ -250,19 +201,19 @@ class RowIndexedReader extends SSTableRe
     }
 
     /**
-     * returns the position in the data file to find the given key, or -1 if the key is not present
+     * @return The position in the data file to find the given key, or -1 if the key is not present
      */
-    public PositionSize getPosition(DecoratedKey decoratedKey)
+    public long getPosition(DecoratedKey decoratedKey)
     {
         // first, check bloom filter
         if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
-            return null;
+            return -1;
 
         // next, the key cache
         Pair<Descriptor, DecoratedKey> unifiedKey = new Pair<Descriptor, DecoratedKey>(desc, decoratedKey);
         if (keyCache != null && keyCache.getCapacity() > 0)
         {
-            PositionSize cachedPosition = keyCache.get(unifiedKey);
+            Long cachedPosition = keyCache.get(unifiedKey);
             if (cachedPosition != null)
             {
                 return cachedPosition;
@@ -272,168 +223,99 @@ class RowIndexedReader extends SSTableRe
         // next, see if the sampled index says it's impossible for the key to be present
         IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
         if (sampledPosition == null)
-            return null;
-
-        // get either a buffered or a mmap'd input for the on-disk index
-        long p = sampledPosition.indexPosition;
-        FileDataInput input;
-        try
-        {
-            if (indexBuffers == null)
-            {
-                input = new BufferedRandomAccessFile(indexFilename(), "r");
-                ((BufferedRandomAccessFile)input).seek(p);
-            }
-            else
-            {
-                input = indexInputAt(p);
-            }
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
+            return -1;
 
         // scan the on-disk index, starting at the nearest sampled position
-        try
+        int i = 0;
+        Iterator<FileDataInput> segments = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES);
+        while (segments.hasNext())
         {
-            int i = 0;
-            do
+            FileDataInput input = segments.next();
+            try
             {
-                // handle exact sampled index hit
-                IndexSummary.KeyPosition kp = indexSummary.getSpannedIndexPosition(input.getAbsolutePosition());
-                if (kp != null && kp.key.equals(decoratedKey))
-                    return indexSummary.getSpannedDataPosition(kp);
-
-                // if using mmapped i/o, skip to the next mmap buffer if necessary
-                if (input.isEOF() || kp != null)
+                while (!input.isEOF() && i++ < IndexSummary.INDEX_INTERVAL)
                 {
-                    if (indexBuffers == null) // not mmap-ing, just one index input
-                        break;
+                    // read key & data position from index entry
+                    DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
+                    long dataPosition = input.readLong();
 
-                    FileDataInput oldInput = input;
-                    if (kp == null)
-                    {
-                        input = indexInputAt(input.getAbsolutePosition());
-                    }
-                    else
+                    int v = indexDecoratedKey.compareTo(decoratedKey);
+                    if (v == 0)
                     {
-                        int keylength = StorageService.getPartitioner().convertToDiskFormat(kp.key).length;
-                        long nextUnspannedPostion = input.getAbsolutePosition()
-                                                    + DBConstants.shortSize_ + keylength
-                                                    + DBConstants.longSize_;
-                        input = indexInputAt(nextUnspannedPostion);
+                        if (keyCache != null && keyCache.getCapacity() > 0)
+                            keyCache.put(unifiedKey, Long.valueOf(dataPosition));
+                        return dataPosition;
                     }
-                    oldInput.close();
-                    if (input == null)
-                        break;
-
-                    continue;
-                }
-
-                // read key & data position from index entry
-                DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
-                long dataPosition = input.readLong();
-
-                int v = indexDecoratedKey.compareTo(decoratedKey);
-                if (v == 0)
-                {
-                    PositionSize info = getDataPositionSize(input, dataPosition);
-                    if (keyCache != null && keyCache.getCapacity() > 0)
-                        keyCache.put(unifiedKey, info);
-                    return info;
+                    if (v > 0)
+                        return -1;
                 }
-                if (v > 0)
-                    return null;
-            } while  (++i < IndexSummary.INDEX_INTERVAL);
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-        finally
-        {
-            try
-            {
-                if (input != null)
-                    input.close();
             }
             catch (IOException e)
             {
-                logger.error("error closing file", e);
+                throw new IOError(e);
+            }
+            finally
+            {
+                try
+                {
+                    input.close();
+                }
+                catch (IOException e)
+                {
+                    logger.error("error closing file", e);
+                }
             }
         }
-        return null;
-    }
-
-    private FileDataInput indexInputAt(long indexPosition)
-    {
-        if (indexPosition > indexSummary.getLastIndexPosition())
-            return null;
-        int bufferIndex = bufferIndex(indexPosition);
-        return new MappedFileDataInput(indexBuffers[bufferIndex], indexFilename(), BUFFER_SIZE * bufferIndex, (int)(indexPosition % BUFFER_SIZE));
+        return -1;
     }
 
-    private PositionSize getDataPositionSize(FileDataInput input, long dataPosition) throws IOException
-    {
-        // if we've reached the end of the index, then the row size is "the rest of the data file"
-        if (input.isEOF())
-            return new PositionSize(dataPosition, length() - dataPosition);
-
-        // otherwise, row size is the start of the next row (in next index entry), minus the start of this one.
-        long nextIndexPosition = input.getAbsolutePosition();
-        // if next index entry would span mmap boundary, get the next row position from the summary instead
-        PositionSize nextPositionSize = indexSummary.getSpannedDataPosition(nextIndexPosition);
-        if (nextPositionSize != null)
-            return new PositionSize(dataPosition, nextPositionSize.position - dataPosition);
-
-        // read next entry directly
-        int utflen = input.readUnsignedShort();
-        if (utflen != input.skipBytes(utflen))
-            throw new EOFException();
-        return new PositionSize(dataPosition, input.readLong() - dataPosition);
-    }
-
-    /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
-    public long getNearestPosition(DecoratedKey decoratedKey) throws IOException
+    /**
+     * @return The location of the first key _greater_ than the desired one, or -1 if no such key exists.
+     */
+    public long getNearestPosition(DecoratedKey decoratedKey)
     {
         IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
         if (sampledPosition == null)
-        {
             return 0;
-        }
 
-        // can't use a MappedFileDataInput here, since we might cross a segment boundary while scanning
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
-        input.seek(sampledPosition.indexPosition);
-        try
+        // scan the on-disk index, starting at the nearest sampled position
+        Iterator<FileDataInput> segiter = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES);
+        while (segiter.hasNext())
         {
-            while (true)
+            FileDataInput input = segiter.next();
+            try
+            {
+                while (!input.isEOF())
+                {
+                    DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
+                    long position = input.readLong();
+                    int v = indexDecoratedKey.compareTo(decoratedKey);
+                    if (v >= 0)
+                        return position;
+                }
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            finally
             {
-                DecoratedKey indexDecoratedKey;
                 try
                 {
-                    indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
+                    input.close();
                 }
-                catch (EOFException e)
+                catch (IOException e)
                 {
-                    return -1;
+                    logger.error("error closing file", e);
                 }
-                long position = input.readLong();
-                int v = indexDecoratedKey.compareTo(decoratedKey);
-                if (v >= 0)
-                    return position;
             }
         }
-        finally
-        {
-            input.close();
-        }
+        return -1;
     }
 
     public long length()
     {
-        return new File(getFilename()).length();
+        return dfile.length;
     }
 
     public int compareTo(SSTableReader o)
@@ -458,29 +340,11 @@ class RowIndexedReader extends SSTableRe
     
     public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize)
     {
-        PositionSize info = getPosition(decoratedKey);
-        if (info == null)
+        long position = getPosition(decoratedKey);
+        if (position < 0)
             return null;
 
-        if (buffers == null || (bufferIndex(info.position) != bufferIndex(info.position + info.size)))
-        {
-            try
-            {
-                BufferedRandomAccessFile file = new BufferedRandomAccessFile(getFilename(), "r", bufferSize);
-                file.seek(info.position);
-                return file;
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-        }
-        return new MappedFileDataInput(buffers[bufferIndex(info.position)], getFilename(), BUFFER_SIZE * (info.position / BUFFER_SIZE), (int) (info.position % BUFFER_SIZE));
-    }
-
-    static int bufferIndex(long position)
-    {
-        return (int) (position / BUFFER_SIZE);
+        return dfile.getSegment(position, bufferSize);
     }
 
     public InstrumentedCache getKeyCache()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Sun Jun  6 04:29:54 2010
@@ -22,18 +22,17 @@ package org.apache.cassandra.io.sstable;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Arrays;
 import java.util.StringTokenizer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.StringUtils;
 
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.db.DecoratedKey;
 
 import com.google.common.base.Objects;
 
@@ -67,6 +66,9 @@ public abstract class SSTable
 
     public static final String TEMPFILE_MARKER = "tmp";
 
+    // TODO streaming relies on the -Data (getFilename) file to be last, this is clunky
+    public static List<String> components = Collections.unmodifiableList(Arrays.asList(COMPONENT_FILTER, COMPONENT_INDEX, COMPONENT_DATA));
+
     protected SSTable(String filename, IPartitioner partitioner)
     {
         assert filename.endsWith("-" + COMPONENT_DATA);
@@ -90,11 +92,6 @@ public abstract class SSTable
         return desc;
     }
 
-    protected static String parseColumnFamilyName(String filename)
-    {
-        return new File(filename).getName().split("-")[0];
-    }
-
     public static String indexFilename(String dataFile)
     {
         return Descriptor.fromFilename(dataFile).filenameFor(COMPONENT_INDEX);
@@ -160,13 +157,6 @@ public abstract class SSTable
         return desc.filenameFor(COMPONENT_DATA);
     }
 
-    /** @return component names for files associated w/ this SSTable */
-    public List<String> getAllComponents()
-    {
-        // TODO streaming relies on the -Data (getFilename) file to be last, this is clunky
-        return Arrays.asList(COMPONENT_FILTER, COMPONENT_INDEX, COMPONENT_DATA);
-    }
-
     public String getColumnFamilyName()
     {
         return desc.cfname;
@@ -195,7 +185,7 @@ public abstract class SSTable
     public long bytesOnDisk()
     {
         long bytes = 0;
-        for (String cname : getAllComponents())
+        for (String cname : components)
         {
             bytes += new File(desc.filenameFor(cname)).length();
         }
@@ -210,18 +200,6 @@ public abstract class SSTable
                ')';
     }
 
-    public static class PositionSize
-    {
-        public final long position;
-        public final long size;
-
-        public PositionSize(long position, long size)
-        {
-            this.position = position;
-            this.size = size;
-        }
-    }
-
     /**
      * A SSTable is described by the keyspace and column family it contains data
      * for, a generation (where higher generations contain more recent data) and

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Sun Jun  6 04:29:54 2010
@@ -144,7 +144,7 @@ public abstract class SSTableReader exte
         // FIXME: version conditional readers here
         if (true)
         {
-            sstable = RowIndexedReader.open(descriptor, partitioner);
+            sstable = RowIndexedReader.internalOpen(descriptor, partitioner);
         }
 
         if (logger.isDebugEnabled())
@@ -159,28 +159,6 @@ public abstract class SSTableReader exte
         finalizers.add(phantomReference);
     }
 
-    protected static MappedByteBuffer mmap(String filename, long start, int size) throws IOException
-    {
-        RandomAccessFile raf;
-        try
-        {
-            raf = new RandomAccessFile(filename, "r");
-        }
-        catch (FileNotFoundException e)
-        {
-            throw new IOError(e);
-        }
-
-        try
-        {
-            return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, size);
-        }
-        finally
-        {
-            raf.close();
-        }
-    }
-
     protected SSTableReader(Descriptor desc, IPartitioner partitioner, long maxDataAge)
     {
         super(desc, partitioner);
@@ -215,7 +193,7 @@ public abstract class SSTableReader exte
      * FIXME: should not be public: use Scanner.
      */
     @Deprecated
-    public abstract PositionSize getPosition(DecoratedKey decoratedKey) throws IOException;
+    public abstract long getPosition(DecoratedKey decoratedKey) throws IOException;
 
     /**
      * Like getPosition, but if key is not found will return the location of the
@@ -259,10 +237,6 @@ public abstract class SSTableReader exte
      */
     public abstract SSTableScanner getScanner(int bufferSize, QueryFilter filter);
     
-    /**
-     * FIXME: should not be public: use Scanner.
-     */
-    @Deprecated
     public abstract FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize);
 
     public AbstractType getColumnComparator()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java Sun Jun  6 04:29:54 2010
@@ -42,7 +42,7 @@ public class SSTableTracker implements I
     private final String ksname;
     private final String cfname;
 
-    private final JMXInstrumentedCache<Pair<SSTable.Descriptor,DecoratedKey>,SSTable.PositionSize> keyCache;
+    private final JMXInstrumentedCache<Pair<SSTable.Descriptor,DecoratedKey>,Long> keyCache;
     private final JMXInstrumentedCache<DecoratedKey, ColumnFamily> rowCache;
 
     public SSTableTracker(String ksname, String cfname)
@@ -50,7 +50,7 @@ public class SSTableTracker implements I
         this.ksname = ksname;
         this.cfname = cfname;
         sstables = Collections.emptySet();
-        keyCache = new JMXInstrumentedCache<Pair<SSTable.Descriptor,DecoratedKey>,SSTable.PositionSize>(ksname, cfname + "KeyCache", 0);
+        keyCache = new JMXInstrumentedCache<Pair<SSTable.Descriptor,DecoratedKey>,Long>(ksname, cfname + "KeyCache", 0);
         rowCache = new JMXInstrumentedCache<DecoratedKey, ColumnFamily>(ksname, cfname + "RowCache", 0);
     }
 
@@ -176,7 +176,7 @@ public class SSTableTracker implements I
         totalSize.addAndGet(-size);
     }
 
-    public JMXInstrumentedCache<Pair<SSTable.Descriptor, DecoratedKey>, SSTable.PositionSize> getKeyCache()
+    public JMXInstrumentedCache<Pair<SSTable.Descriptor, DecoratedKey>, Long> getKeyCache()
     {
         return keyCache;
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Sun Jun  6 04:29:54 2010
@@ -41,8 +41,6 @@ import java.io.DataOutputStream;
 import java.io.FileOutputStream;
 import java.io.IOError;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,6 +50,7 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -59,15 +58,19 @@ public class SSTableWriter extends SSTab
 {
     private static Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
 
-    private BufferedRandomAccessFile dataFile;
-    private BufferedRandomAccessFile indexFile;
+    private SegmentedFile.Builder ibuilder;
+    private SegmentedFile.Builder dbuilder;
+    private final BufferedRandomAccessFile dataFile;
+    private final BufferedRandomAccessFile indexFile;
+    private final BloomFilter bf;
     private DecoratedKey lastWrittenKey;
-    private BloomFilter bf;
 
     public SSTableWriter(String filename, long keyCount, IPartitioner partitioner) throws IOException
     {
         super(filename, partitioner);
         indexSummary = new IndexSummary();
+        ibuilder = SegmentedFile.getBuilder();
+        dbuilder = SegmentedFile.getBuilder();
         dataFile = new BufferedRandomAccessFile(getFilename(), "rw", (int)(DatabaseDescriptor.getFlushDataBufferSizeInMB() * 1024 * 1024));
         indexFile = new BufferedRandomAccessFile(indexFilename(), "rw", (int)(DatabaseDescriptor.getFlushIndexBufferSizeInMB() * 1024 * 1024));
         bf = BloomFilter.getFilter(keyCount, 15);
@@ -102,8 +105,9 @@ public class SSTableWriter extends SSTab
         if (logger.isTraceEnabled())
             logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
 
-        int rowSize = (int)(dataFile.getFilePointer() - dataPosition);
-        indexSummary.maybeAddEntry(decoratedKey, dataPosition, rowSize, indexPosition, indexFile.getFilePointer());
+        indexSummary.maybeAddEntry(decoratedKey, indexPosition);
+        ibuilder.addPotentialBoundary(indexPosition);
+        dbuilder.addPotentialBoundary(dataPosition);
     }
 
     // TODO make this take a DataOutputStream and wrap the byte[] version to combine them
@@ -150,27 +154,31 @@ public class SSTableWriter extends SSTab
         // main data
         dataFile.close(); // calls force
 
-        Descriptor newdesc = desc.asTemporary(false);
-        rename(indexFilename());
-        rename(filterFilename());
-        rename(getFilename());
+        // remove the 'tmp' marker from all components
+        Descriptor newdesc = rename(desc);
 
+        // finalize in-memory state for the reader
         indexSummary.complete();
-        return new RowIndexedReader(newdesc, partitioner, indexSummary, bf, maxDataAge);
+        SegmentedFile ifile = ibuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_INDEX));
+        SegmentedFile dfile = dbuilder.complete(newdesc.filenameFor(SSTable.COMPONENT_DATA));
+        ibuilder = null;
+        dbuilder = null;
+        return RowIndexedReader.internalOpen(newdesc, partitioner, ifile, dfile, indexSummary, bf, maxDataAge);
     }
 
-    static String rename(String tmpFilename)
+    static Descriptor rename(Descriptor tmpdesc)
     {
-        String filename = tmpFilename.replace("-" + SSTable.TEMPFILE_MARKER, "");
+        Descriptor newdesc = tmpdesc.asTemporary(false);
         try
         {
-            FBUtilities.renameWithConfirm(tmpFilename, filename);
+            for (String component : components)
+                FBUtilities.renameWithConfirm(tmpdesc.filenameFor(component), newdesc.filenameFor(component));
         }
         catch (IOException e)
         {
             throw new IOError(e);
         }
-        return filename;
+        return newdesc;
     }
 
     public long getFilePointer()
@@ -178,11 +186,8 @@ public class SSTableWriter extends SSTab
         return dataFile.getFilePointer();
     }
     
-    public static SSTableReader renameAndOpen(String dataFileName) throws IOException
+    public static SSTableReader renameAndOpen(Descriptor tmpdesc) throws IOException
     {
-        SSTableWriter.rename(indexFilename(dataFileName));
-        SSTableWriter.rename(filterFilename(dataFileName));
-        dataFileName = SSTableWriter.rename(dataFileName);
-        return SSTableReader.open(dataFileName);
+        return SSTableReader.open(rename(tmpdesc));
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Sun Jun  6 04:29:54 2010
@@ -260,11 +260,6 @@ public class BufferedRandomAccessFile ex
         this.curr_ = pos;
     }
 
-    public long getAbsolutePosition()
-    {
-        return getFilePointer();
-    }
-
     public long getFilePointer()
     {
         return this.curr_;
@@ -408,6 +403,11 @@ public class BufferedRandomAccessFile ex
         return getFilePointer() == length();
     }
 
+    public long bytesRemaining() throws IOException
+    {
+        return length() - getFilePointer();
+    }
+
     public void mark()
     {
         markedPointer = getFilePointer();

Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java?rev=951813&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java Sun Jun  6 04:29:54 2010
@@ -0,0 +1,50 @@
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+
+public class BufferedSegmentedFile extends SegmentedFile
+{
+    public BufferedSegmentedFile(String path, long length)
+    {
+        super(path, length);
+    }
+
+    public static class Builder extends SegmentedFile.Builder
+    {
+        /**
+         * Adds a position that would be a safe place for a segment boundary in the file. For a block/row based file
+         * format, safe boundaries are block/row edges.
+         * @param boundary The absolute position of the potential boundary in the file.
+         */
+        public void addPotentialBoundary(long boundary)
+        {
+            // only one segment in a standard-io file
+        }
+
+        /**
+         * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
+         * @param path The file on disk.
+         */
+        public SegmentedFile complete(String path)
+        {
+            long length = new File(path).length();
+            return new BufferedSegmentedFile(path, length);
+        }
+    }
+
+    public FileDataInput getSegment(long position, int bufferSize)
+    {
+        try
+        {
+            BufferedRandomAccessFile file = new BufferedRandomAccessFile(path, "r", bufferSize);
+            file.seek(position);
+            return file;
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FileDataInput.java Sun Jun  6 04:29:54 2010
@@ -31,11 +31,11 @@ public interface FileDataInput extends D
 
     public boolean isEOF() throws IOException;
 
+    public long bytesRemaining() throws IOException;
+
     public void mark();
 
     public void reset() throws IOException;
 
     public int bytesPastMark();
-
-    long getAbsolutePosition();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MappedFileDataInput.java Sun Jun  6 04:29:54 2010
@@ -30,28 +30,16 @@ public class MappedFileDataInput extends
     private final String filename;
     private int position;
     private int markedPosition;
-    private final long absoluteStartPosition;
 
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename, long absoluteStartPosition)
-    {
-        this(buffer, filename, absoluteStartPosition, 0);
-    }
-
-    public MappedFileDataInput(MappedByteBuffer buffer, String filename, long absoluteStartPosition, int position)
+    public MappedFileDataInput(MappedByteBuffer buffer, String filename, int position)
     {
         assert buffer != null;
-        this.absoluteStartPosition = absoluteStartPosition;
         this.buffer = buffer;
         this.filename = filename;
         this.position = position;
     }
 
-    public long getAbsolutePosition()
-    {
-        return absoluteStartPosition + position;
-    }
-
-// don't make this public, this is only for seeking WITHIN the current mapped segment
+    // don't make this public, this is only for seeking WITHIN the current mapped segment
     private void seekInternal(int pos) throws IOException
     {
         position = pos;
@@ -91,6 +79,11 @@ public class MappedFileDataInput extends
         return position == buffer.capacity();
     }
 
+    public long bytesRemaining() throws IOException
+    {
+        return buffer.capacity() - position;
+    }
+
     public String getPath()
     {
         return filename;

Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java?rev=951813&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java Sun Jun  6 04:29:54 2010
@@ -0,0 +1,167 @@
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class MmappedSegmentedFile extends SegmentedFile
+{
+    // in a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size to stay sane.
+    public static long MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
+
+    /**
+     * Sorted array of segment offsets and MappedByteBuffers for segments. If mmap is completely disabled, or if the
+     * segment would be too long to mmap, the value for an offset will be null, indicating that we need to fall back
+     * to a RandomAccessFile.
+     */
+    private final Segment[] segments;
+
+    public MmappedSegmentedFile(String path, long length, Segment[] segments)
+    {
+        super(path, length);
+        this.segments = segments;
+    }
+
+    /**
+     * @return The segment entry for the given position.
+     */
+    private Segment floor(long position)
+    {
+        assert 0 <= position && position < length: position + " vs " + length;
+        Segment seg = new Segment(position, null);
+        int idx = Arrays.binarySearch(segments, seg);
+        assert idx != -1 : "Bad position " + position + " in segments " + Arrays.toString(segments);
+        if (idx < 0)
+            // round down to entry at insertion point
+            idx = -(idx + 2);
+        return segments[idx];
+    }
+
+    /**
+     * @return The segment containing the given position: must be closed after use.
+     */
+    public FileDataInput getSegment(long position, int bufferSize)
+    {
+        Segment segment = floor(position);
+        if (segment.right != null)
+        {
+            // segment is mmap'd
+            return new MappedFileDataInput(segment.right, path, (int) (position - segment.left));
+        }
+
+        // not mmap'd: open a braf covering the segment
+        try
+        {
+            // FIXME: brafs are unbounded, so this segment will cover the rest of the file, rather than just the row
+            BufferedRandomAccessFile file = new BufferedRandomAccessFile(path, "r", bufferSize);
+            file.seek(position);
+            return file;
+        }
+        catch (IOException e)
+        {
+            throw new IOError(e);
+        }
+    }
+
+    /**
+     * Overrides the default behaviour to create segments of a maximum size.
+     */
+    static class Builder extends SegmentedFile.Builder
+    {
+        // planned segment boundaries
+        private final List<Long> boundaries;
+
+        // offset of the open segment (first segment begins at 0).
+        private long currentStart = 0;
+
+        // current length of the open segment.
+        // used to allow merging multiple too-large-to-mmap segments into a single buffered segment.
+        private long currentSize = 0;
+
+        public Builder()
+        {
+            super();
+            boundaries = new ArrayList<Long>();
+            boundaries.add(0L);
+        }
+
+        @Override
+        public void addPotentialBoundary(long boundary)
+        {
+            if (boundary - currentStart <= MAX_SEGMENT_SIZE)
+            {
+                // boundary fits into current segment: expand it
+                currentSize = boundary - currentStart;
+                return;
+            }
+
+            // close the current segment to try and make room for the boundary
+            if (currentSize > 0)
+            {
+                currentStart += currentSize;
+                boundaries.add(currentStart);
+            }
+            currentSize = boundary - currentStart;
+
+            // if we couldn't make room, the boundary needs its own segment
+            if (currentSize > MAX_SEGMENT_SIZE)
+            {
+                currentStart = boundary;
+                boundaries.add(currentStart);
+                currentSize = 0;
+            }
+        }
+
+        @Override
+        public SegmentedFile complete(String path)
+        {
+            long length = new File(path).length();
+            // add a sentinel value == length
+            boundaries.add(Long.valueOf(length));
+            // create the segments
+            return new MmappedSegmentedFile(path, length, createSegments(path));
+        }
+
+        private Segment[] createSegments(String path)
+        {
+            int segcount = boundaries.size() - 1;
+            Segment[] segments = new Segment[segcount];
+            RandomAccessFile raf = null;
+            try
+            {
+                raf = new RandomAccessFile(path, "r");
+                for (int i = 0; i < segcount; i++)
+                {
+                    long start = boundaries.get(i);
+                    long size = boundaries.get(i + 1) - start;
+                    MappedByteBuffer segment = size <= MAX_SEGMENT_SIZE
+                                               ? raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, size)
+                                               : null;
+                    segments[i] = new Segment(start, segment);
+                }
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            finally
+            {
+                try
+                {
+                    if (raf != null) raf.close();
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
+            }
+            return segments;
+        }
+    }
+}

Added: cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java?rev=951813&view=auto
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java (added)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/SegmentedFile.java Sun Jun  6 04:29:54 2010
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.*;
+import java.util.*;
+import java.nio.MappedByteBuffer;
+
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Config;
+
+/**
+ * Abstracts a read-only file that has been split into segments, each of which can be represented by an independent
+ * FileDataInput. Allows for iteration over the FileDataInputs, or random access to the FileDataInput for a given
+ * position.
+ *
+ * The JVM can only map up to 2GB at a time, so each segment is at most that size when using mmap i/o. If a segment
+ * would need to be longer than 2GB, that segment will not be mmap'd, and a new RandomAccessFile will be created for
+ * each access to that segment.
+ */
+public abstract class SegmentedFile
+{
+    public final String path;
+    public final long length;
+
+    /**
+     * Use getBuilder to get a Builder to construct a SegmentedFile.
+     */
+    SegmentedFile(String path, long length)
+    {
+        this.path = path;
+        this.length = length;
+    }
+
+    /**
+     * @return A SegmentedFile.Builder.
+     */
+    public static Builder getBuilder()
+    {
+        if (DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.mmap)
+            return new MmappedSegmentedFile.Builder();
+        assert DatabaseDescriptor.getDiskAccessMode() == Config.DiskAccessMode.standard;
+        return new BufferedSegmentedFile.Builder();
+    }
+
+    public abstract FileDataInput getSegment(long position, int bufferSize);
+
+    /**
+     * @return An Iterator over segments, beginning with the segment containing the given position: each segment must be closed after use.
+     */
+    public Iterator<FileDataInput> iterator(long position, int bufferSize)
+    {
+        return new SegmentIterator(position, bufferSize);
+    }
+
+    /**
+     * Collects potential segmentation points in an underlying file, and builds a SegmentedFile to represent it.
+     */
+    public static abstract class Builder
+    {
+        /**
+         * Adds a position that would be a safe place for a segment boundary in the file. For a block/row based file
+         * format, safe boundaries are block/row edges.
+         * @param boundary The absolute position of the potential boundary in the file.
+         */
+        public abstract void addPotentialBoundary(long boundary);
+
+        /**
+         * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
+         * @param path The file on disk.
+         */
+        public abstract SegmentedFile complete(String path);
+    }
+
+    static final class Segment extends Pair<Long, MappedByteBuffer> implements Comparable<Segment>
+    {
+        public Segment(long offset, MappedByteBuffer segment)
+        {
+            super(offset, segment);
+        }
+
+        public final int compareTo(Segment that)
+        {
+            return (int)Math.signum(this.left - that.left);
+        }
+    }
+
+    /**
+     * A lazy Iterator over segments in forward order from the given position.
+     */
+    final class SegmentIterator implements Iterator<FileDataInput>
+    {
+        private long nextpos;
+        private final int bufferSize;
+        public SegmentIterator(long position, int bufferSize)
+        {
+            this.nextpos = position;
+            this.bufferSize = bufferSize;
+        }
+
+        public boolean hasNext()
+        {
+            return nextpos < length;
+        }
+
+        public FileDataInput next()
+        {
+            long position = nextpos;
+            if (position >= length)
+                throw new NoSuchElementException();
+
+            FileDataInput segment = getSegment(nextpos, bufferSize);
+            try
+            {
+                nextpos = nextpos + segment.bytesRemaining();
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+            return segment;
+        }
+
+        public void remove() { throw new UnsupportedOperationException(); }
+    }
+}

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java Sun Jun  6 04:29:54 2010
@@ -64,7 +64,7 @@ class FileStatusHandler
 
             try
             {
-                SSTableReader sstable = SSTableWriter.renameAndOpen(pendingFile.getFilename());
+                SSTableReader sstable = SSTableWriter.renameAndOpen(pendingFile.getDescriptor());
                 Table.open(tableName).getColumnFamilyStore(temp[0]).addSSTable(sstable);
                 logger.info("Streaming added " + sstable.getFilename());
             }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Sun Jun  6 04:29:54 2010
@@ -120,7 +120,7 @@ public class StreamOut
         int i = 0;
         for (SSTableReader sstable : sstables)
         {
-            for (String component : sstable.getAllComponents())
+            for (String component : SSTable.components)
             {
                 SSTable.Descriptor desc = sstable.getDescriptor();
                 long filelen = new File(desc.filenameFor(component)).length();

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Sun Jun  6 04:29:54 2010
@@ -431,9 +431,9 @@ public class TableTest extends CleanupHe
             CompactionManager.instance.submitMajor(cfStore).get();
         }
         SSTableReader sstable = cfStore.getSSTables().iterator().next();
-        SSTable.PositionSize info = sstable.getPosition(key);
+        long position = sstable.getPosition(key);
         BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
-        file.seek(info.position);
+        file.seek(position);
         assert Arrays.equals(FBUtilities.readShortByteArray(file), key.key);
         file.readInt();
         IndexHelper.skipBloomFilter(file);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java Sun Jun  6 04:29:54 2010
@@ -107,7 +107,7 @@ public class LegacySSTableTest extends C
         for (byte[] key : keys)
         {
             // confirm that the bloom filter does not reject any keys
-            file.seek(reader.getPosition(reader.partitioner.decorateKey(key)).position);
+            file.seek(reader.getPosition(reader.partitioner.decorateKey(key)));
             assert Arrays.equals(key, FBUtilities.readShortByteArray(file));
         }
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java Sun Jun  6 04:29:54 2010
@@ -2,7 +2,6 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
-import java.util.Map;
 
 import org.junit.Test;
 
@@ -10,9 +9,8 @@ import org.apache.cassandra.CleanupHelpe
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryPath;
-import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileDataInput;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.io.util.MmappedSegmentedFile;
 import org.apache.cassandra.utils.FBUtilities;
 
 import org.apache.cassandra.Util;
@@ -24,7 +22,7 @@ public class SSTableReaderTest extends C
     @Test
     public void testSpannedIndexPositions() throws IOException, ExecutionException, InterruptedException
     {
-        RowIndexedReader.BUFFER_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of spanned entries
+        MmappedSegmentedFile.MAX_SEGMENT_SIZE = 40; // each index entry is ~11 bytes, so this will generate lots of segments
 
         Table table = Table.open("Keyspace1");
         ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
@@ -55,24 +53,7 @@ public class SSTableReaderTest extends C
         for (int j = 1; j < 110; j += 2)
         {
             DecoratedKey dk = Util.dk(String.valueOf(j));
-            assert sstable.getPosition(dk) == null;
-        }
-
-        // check positionsize information
-        assert sstable.indexSummary.getSpannedIndexDataPositions().entrySet().size() > 0;
-        for (Map.Entry<IndexSummary.KeyPosition, SSTable.PositionSize> entry : sstable.indexSummary.getSpannedIndexDataPositions().entrySet())
-        {
-            IndexSummary.KeyPosition kp = entry.getKey();
-            SSTable.PositionSize info = entry.getValue();
-
-            long nextIndexPosition = kp.indexPosition + 2 + StorageService.getPartitioner().convertToDiskFormat(kp.key).length + 8;
-            BufferedRandomAccessFile indexFile = new BufferedRandomAccessFile(sstable.indexFilename(), "r");
-            indexFile.seek(nextIndexPosition);
-            String nextKey = indexFile.readUTF();
-
-            BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
-            file.seek(info.position + info.size);
-            assertEquals(nextKey, file.readUTF());
+            assert sstable.getPosition(dk) == -1;
         }
     }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java?rev=951813&r1=951812&r2=951813&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java Sun Jun  6 04:29:54 2010
@@ -50,7 +50,7 @@ public class SSTableTest extends Cleanup
     private void verifySingle(SSTableReader sstable, byte[] bytes, byte[] key) throws IOException
     {
         BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
-        file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)).position);
+        file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)));
         assert Arrays.equals(key, FBUtilities.readShortByteArray(file));
         int size = file.readInt();
         byte[] bytes2 = new byte[size];
@@ -82,7 +82,7 @@ public class SSTableTest extends Cleanup
         BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
         for (byte[] key : keys)
         {
-            file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)).position);
+            file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)));
             assert Arrays.equals(key, FBUtilities.readShortByteArray(file));
             int size = file.readInt();
             byte[] bytes2 = new byte[size];



Mime
View raw message