cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r919171 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/service/ test/unit/org/apache/cassandra/db/
Date Thu, 04 Mar 2010 20:49:46 GMT
Author: jbellis
Date: Thu Mar  4 20:49:45 2010
New Revision: 919171

URL: http://svn.apache.org/viewvc?rev=919171&view=rev
Log:
extract SSTableReader as an abstract superclass; the concrete subclass is RowIndexedReader
patch by Stu Hood; reviewed by jbellis for CASSANDRA-777

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java
      - copied, changed from r919170, incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java
      - copied, changed from r919170, incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java.orig
      - copied, changed from r919170, incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Mar  4 20:49:45 2010
@@ -1084,16 +1084,16 @@
         return Iterables.concat(stores);
     }
 
-    public Iterable<SSTable.KeyPosition> allIndexPositions()
+    public Iterable<DecoratedKey> allKeySamples()
     {
         Collection<SSTableReader> sstables = getSSTables();
-        Iterable<SSTable.KeyPosition>[] positions = new Iterable[sstables.size()];
+        Iterable<DecoratedKey>[] samples = new Iterable[sstables.size()];
         int i = 0;
         for (SSTableReader sstable: sstables)
         {
-            positions[i++] = sstable.getIndexPositions();
+            samples[i++] = sstable.getKeySamples();
         }
-        return Iterables.concat(positions);
+        return Iterables.concat(samples);
     }
 
     /**

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java (from r919170, incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java&r1=919170&r2=919171&rev=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedReader.java Thu Mar  4 20:49:45 2010
@@ -28,10 +28,9 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.commons.lang.StringUtils;
-
 import org.apache.cassandra.cache.InstrumentedCache;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.utils.BloomFilter;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.service.StorageService;
@@ -45,110 +44,32 @@
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+
 /**
- * SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
- * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
+ * Pre 0.7 SSTable implementation, using per row indexes.
  */
-public class SSTableReader extends SSTable implements Comparable<SSTableReader>
+class RowIndexedReader extends SSTableReader
 {
-    private static final Logger logger = Logger.getLogger(SSTableReader.class);
+    private static final Logger logger = Logger.getLogger(RowIndexedReader.class);
 
-    // `finalizers` is required to keep the PhantomReferences alive after the enclosing SSTR is itself
-    // unreferenced.  otherwise they will never get enqueued.
-    private static final Set<Reference<SSTableReader>> finalizers = new HashSet<Reference<SSTableReader>>();
-    private static final ReferenceQueue<SSTableReader> finalizerQueue = new ReferenceQueue<SSTableReader>()
-    {{
-        Runnable runnable = new Runnable()
-        {
-            public void run()
-            {
-                while (true)
-                {
-                    SSTableDeletingReference r = null;
-                    try
-                    {
-                        r = (SSTableDeletingReference) finalizerQueue.remove();
-                        finalizers.remove(r);
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new RuntimeException(e);
-                    }
-                    try
-                    {
-                        r.cleanup();
-                    }
-                    catch (IOException e)
-                    {
-                        logger.error("Error deleting " + r.path, e);
-                    }
-                }
-            }
-        };
-        new Thread(runnable, "SSTABLE-DELETER").start();
-    }};
     private static final long BUFFER_SIZE = Integer.MAX_VALUE;
 
-    public static int indexInterval()
-    {
-        return INDEX_INTERVAL;
-    }
-
-    public static long getApproximateKeyCount(Iterable<SSTableReader> sstables)
-    {
-        long count = 0;
-
-        for (SSTableReader sstable : sstables)
-        {
-            int indexKeyCount = sstable.getIndexPositions().size();
-            count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
-            if (logger.isDebugEnabled())
-                logger.debug("index size for bloom filter calc for file  : " + sstable.getFilename() + "   : " + count);
-        }
-
-        return count;
-    }
-
-    public static SSTableReader open(Descriptor desc) throws IOException
-    {
-        return open(desc.filenameFor(COMPONENT_DATA));
-    }
-
-    public static SSTableReader open(String dataFileName) throws IOException
-    {
-        return open(dataFileName, StorageService.getPartitioner());
-    }
-
-    public static SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
-    {
-        assert partitioner != null;
-
-        long start = System.currentTimeMillis();
-        SSTableReader sstable = new SSTableReader(dataFileName, partitioner);
-        logger.info("Sampling index for " + dataFileName);
-        sstable.loadIndexFile();
-        sstable.loadBloomFilter();
-
-        if (logger.isDebugEnabled())
-            logger.debug("INDEX LOAD TIME for "  + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
-
-        return sstable;
-    }
-
-    private volatile SSTableDeletingReference phantomReference;
     // jvm can only map up to 2GB at a time, so we split index/data into segments of that size when using mmap i/o
     private final MappedByteBuffer[] indexBuffers;
     private final MappedByteBuffer[] buffers;
 
-    private InstrumentedCache<Pair<Descriptor,DecoratedKey>,PositionSize> keyCache;
+    private InstrumentedCache<Pair<Descriptor,DecoratedKey>, PositionSize> keyCache;
 
-    SSTableReader(String filename,
-                  IPartitioner partitioner,
-                  List<KeyPosition> indexPositions, Map<KeyPosition, PositionSize> spannedIndexDataPositions,
-                  BloomFilter bloomFilter)
-    throws IOException
+    RowIndexedReader(Descriptor desc,
+                     IPartitioner partitioner,
+                     List<KeyPosition> indexPositions,
+                     Map<KeyPosition, PositionSize> spannedIndexDataPositions,
+                     BloomFilter bloomFilter)
+            throws IOException
     {
-        super(filename, partitioner);
+        super(desc, partitioner);
 
         if (DatabaseDescriptor.getIndexAccessMode() == DatabaseDescriptor.DiskAccessMode.mmap)
         {
@@ -190,48 +111,34 @@
         this.bf = bloomFilter;
     }
 
-    public void setTrackedBy(SSTableTracker tracker)
+    RowIndexedReader(Descriptor desc, IPartitioner partitioner) throws IOException
     {
-        phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue);
-        finalizers.add(phantomReference);
-        keyCache = tracker.getKeyCache();
+        this(desc, partitioner, null, null, null);
     }
 
-    private static MappedByteBuffer mmap(String filename, long start, int size) throws IOException
+    public static RowIndexedReader open(Descriptor desc, IPartitioner partitioner) throws IOException
     {
-        RandomAccessFile raf;
-        try
-        {
-            raf = new RandomAccessFile(filename, "r");
-        }
-        catch (FileNotFoundException e)
-        {
-            throw new IOError(e);
-        }
-
-        try
-        {
-            return raf.getChannel().map(FileChannel.MapMode.READ_ONLY, start, size);
-        }
-        finally
-        {
-            raf.close();
-        }
-    }
+        RowIndexedReader sstable = new RowIndexedReader(desc, partitioner);
+        sstable.loadIndexFile();
+        sstable.loadBloomFilter();
 
-    private SSTableReader(String filename, IPartitioner partitioner) throws IOException
-    {
-        this(filename, partitioner, null, null, null);
+        return sstable;
     }
 
-    public List<KeyPosition> getIndexPositions()
+    public long estimatedKeys()
     {
-        return indexPositions;
+        return (indexPositions.size() + 1) * INDEX_INTERVAL;
     }
 
-    public long estimatedKeys()
+    public Collection<DecoratedKey> getKeySamples()
     {
-        return indexPositions.size() * INDEX_INTERVAL;
+        return Collections2.transform(indexPositions,
+                                      new Function<KeyPosition,DecoratedKey>(){
+                                          public DecoratedKey apply(KeyPosition kp)
+                                          {
+                                              return kp.key;
+                                          }
+                                      });
     }
 
     void loadBloomFilter() throws IOException
@@ -296,6 +203,13 @@
         }
     }
 
+    @Override
+    public void setTrackedBy(SSTableTracker tracker)
+    {
+        super.setTrackedBy(tracker);
+        keyCache = tracker.getKeyCache();
+    }
+
     /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
     private KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
     {
@@ -455,31 +369,14 @@
         return desc.generation - o.desc.generation;
     }
 
-    public void markCompacted() throws IOException
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("Marking " + getFilename() + " compacted");
-        if (!new File(compactedFilename()).createNewFile())
-        {
-            throw new IOException("Unable to create compaction marker");
-        }
-        phantomReference.deleteOnCleanup();
-    }
-
-    /** obviously only for testing */
-    public void forceBloomFilterFailures()
+    public void forceFilterFailures()
     {
         bf = BloomFilter.alwaysMatchingBloomFilter();
     }
 
-    public IPartitioner getPartitioner()
-    {
-        return partitioner;
-    }
-
     public SSTableScanner getScanner(int bufferSize) throws IOException
     {
-        return new SSTableScanner(this, bufferSize);
+        return new RowIndexedScanner(this, bufferSize);
     }
 
     public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize) throws IOException
@@ -502,70 +399,8 @@
         return (int) (position / BUFFER_SIZE);
     }
 
-    public AbstractType getColumnComparator()
-    {
-        return DatabaseDescriptor.getComparator(getTableName(), getColumnFamilyName());
-    }
-
-    public ColumnFamily makeColumnFamily()
-    {
-        return ColumnFamily.create(getTableName(), getColumnFamilyName());
-    }
-
-    public ICompactSerializer2<IColumn> getColumnSerializer()
-    {
-        return DatabaseDescriptor.getColumnFamilyType(getTableName(), getColumnFamilyName()).equals("Standard")
-               ? Column.serializer()
-               : SuperColumn.serializer(getColumnComparator());
-    }
-}
-
-class FileSSTableMap
-{
-    private final Map<String, SSTableReader> map = new NonBlockingHashMap<String, SSTableReader>();
-
-    public SSTableReader get(String filename)
-    {
-        try
-        {
-            return map.get(new File(filename).getCanonicalPath());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public SSTableReader put(String filename, SSTableReader value)
-    {
-        try
-        {
-            return map.put(new File(filename).getCanonicalPath(), value);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Collection<SSTableReader> values()
-    {
-        return map.values();
-    }
-
-    public void clear()
-    {
-        map.clear();
-    }
-
-    public void remove(String filename) throws IOException
-    {
-        map.remove(new File(filename).getCanonicalPath());
-    }
-
-    @Override
-    public String toString()
+    public InstrumentedCache getKeyCache()
     {
-        return "FileSSTableMap {" + StringUtils.join(map.keySet(), ", ") + "}";
+        return keyCache;
     }
 }

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java (from r919170, incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java&r1=919170&r2=919171&rev=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/RowIndexedScanner.java Thu Mar  4 20:49:45 2010
@@ -33,9 +33,9 @@
 import org.apache.log4j.Logger;
 
 
-public class SSTableScanner implements Iterator<IteratingRow>, Closeable
+public class RowIndexedScanner extends SSTableScanner
 {
-    private static Logger logger = Logger.getLogger(SSTableScanner.class);
+    private static Logger logger = Logger.getLogger(RowIndexedScanner.class);
 
     private final BufferedRandomAccessFile file;
     private final SSTableReader sstable;
@@ -46,7 +46,7 @@
     /**
      * @param sstable SSTable to scan.
      */
-    SSTableScanner(SSTableReader sstable, int bufferSize) throws IOException
+    RowIndexedScanner(SSTableReader sstable, int bufferSize) throws IOException
     {
         this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize);
         this.sstable = sstable;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Thu Mar  4 20:49:45 2010
@@ -76,6 +76,17 @@
         this.partitioner = partitioner;
     }
 
+    protected SSTable(Descriptor desc, IPartitioner partitioner)
+    {
+        this.desc = desc;
+        this.partitioner = partitioner;
+    }
+
+    public IPartitioner getPartitioner()
+    {
+        return partitioner;
+    }
+
     public Descriptor getDescriptor()
     {
         return desc;
@@ -335,6 +346,14 @@
         }
         
         /**
+         * @return A clone of this descriptor with the given 'temporary' status.
+         */
+        public Descriptor asTemporary(boolean temporary)
+        {
+            return new Descriptor(version, directory, ksname, cfname, generation, temporary);
+        }
+
+        /**
          * @return True if the given version string is not empty, and
          * contains all lowercase letters, as defined by java.lang.Character.
          */

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Thu Mar  4 20:49:45 2010
@@ -28,8 +28,6 @@
 
 import org.apache.log4j.Logger;
 
-import org.apache.commons.lang.StringUtils;
-
 import org.apache.cassandra.cache.InstrumentedCache;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.utils.BloomFilter;
@@ -49,7 +47,7 @@
  * SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
  * Do not re-call open() on existing SSTable files; use the references kept by ColumnFamilyStore post-start instead.
  */
-public class SSTableReader extends SSTable implements Comparable<SSTableReader>
+public abstract class SSTableReader extends SSTable implements Comparable<SSTableReader>
 {
     private static final Logger logger = Logger.getLogger(SSTableReader.class);
 
@@ -87,7 +85,6 @@
         };
         new Thread(runnable, "SSTABLE-DELETER").start();
     }};
-    private static final long BUFFER_SIZE = Integer.MAX_VALUE;
 
     public static int indexInterval()
     {
@@ -100,7 +97,7 @@
 
         for (SSTableReader sstable : sstables)
         {
-            int indexKeyCount = sstable.getIndexPositions().size();
+            int indexKeyCount = sstable.getKeySamples().size();
             count = count + (indexKeyCount + 1) * INDEX_INTERVAL;
             if (logger.isDebugEnabled())
                 logger.debug("index size for bloom filter calc for file  : " + sstable.getFilename() + "   : " + count);
@@ -109,95 +106,48 @@
         return count;
     }
 
-    public static SSTableReader open(Descriptor desc) throws IOException
+    public static SSTableReader open(String dataFileName) throws IOException
     {
-        return open(desc.filenameFor(COMPONENT_DATA));
+        return open(Descriptor.fromFilename(dataFileName));
     }
 
-    public static SSTableReader open(String dataFileName) throws IOException
+    public static SSTableReader open(Descriptor desc) throws IOException
     {
-        return open(dataFileName, StorageService.getPartitioner());
+        return open(desc, StorageService.getPartitioner());
     }
 
     public static SSTableReader open(String dataFileName, IPartitioner partitioner) throws IOException
     {
-        assert partitioner != null;
-
-        long start = System.currentTimeMillis();
-        SSTableReader sstable = new SSTableReader(dataFileName, partitioner);
-        logger.info("Sampling index for " + dataFileName);
-        sstable.loadIndexFile();
-        sstable.loadBloomFilter();
-
-        if (logger.isDebugEnabled())
-            logger.debug("INDEX LOAD TIME for "  + dataFileName + ": " + (System.currentTimeMillis() - start) + " ms.");
-
-        return sstable;
+        return open(Descriptor.fromFilename(dataFileName), partitioner);
     }
 
-    private volatile SSTableDeletingReference phantomReference;
-    // jvm can only map up to 2GB at a time, so we split index/data into segments of that size when using mmap i/o
-    private final MappedByteBuffer[] indexBuffers;
-    private final MappedByteBuffer[] buffers;
-
-    private InstrumentedCache<Pair<Descriptor,DecoratedKey>,PositionSize> keyCache;
-
-    SSTableReader(String filename,
-                  IPartitioner partitioner,
-                  List<KeyPosition> indexPositions, Map<KeyPosition, PositionSize> spannedIndexDataPositions,
-                  BloomFilter bloomFilter)
-    throws IOException
+    public static SSTableReader open(Descriptor descriptor, IPartitioner partitioner) throws IOException
     {
-        super(filename, partitioner);
+        assert partitioner != null;
 
-        if (DatabaseDescriptor.getIndexAccessMode() == DatabaseDescriptor.DiskAccessMode.mmap)
-        {
-            long indexLength = new File(indexFilename()).length();
-            int bufferCount = 1 + (int) (indexLength / BUFFER_SIZE);
-            indexBuffers = new MappedByteBuffer[bufferCount];
-            long remaining = indexLength;
-            for (int i = 0; i < bufferCount; i++)
-            {
-                indexBuffers[i] = mmap(indexFilename(), i * BUFFER_SIZE, (int) Math.min(remaining, BUFFER_SIZE));
-                remaining -= BUFFER_SIZE;
-            }
-        }
-        else
-        {
-            assert DatabaseDescriptor.getIndexAccessMode() == DatabaseDescriptor.DiskAccessMode.standard;
-            indexBuffers = null;
-        }
+        long start = System.currentTimeMillis();
+        logger.info("Sampling index for " + descriptor);
 
-        if (DatabaseDescriptor.getDiskAccessMode() == DatabaseDescriptor.DiskAccessMode.mmap)
+        SSTableReader sstable;
+        // FIXME: version conditional readers here
+        if (true)
         {
-            int bufferCount = 1 + (int) (new File(getFilename()).length() / BUFFER_SIZE);
-            buffers = new MappedByteBuffer[bufferCount];
-            long remaining = length();
-            for (int i = 0; i < bufferCount; i++)
-            {
-                buffers[i] = mmap(getFilename(), i * BUFFER_SIZE, (int) Math.min(remaining, BUFFER_SIZE));
-                remaining -= BUFFER_SIZE;
-            }
-        }
-        else
-        {
-            assert DatabaseDescriptor.getDiskAccessMode() == DatabaseDescriptor.DiskAccessMode.standard;
-            buffers = null;
+            sstable = RowIndexedReader.open(descriptor, partitioner);
         }
 
-        this.indexPositions = indexPositions;
-        this.spannedIndexDataPositions = spannedIndexDataPositions;
-        this.bf = bloomFilter;
+        if (logger.isDebugEnabled())
+            logger.debug("INDEX LOAD TIME for " + descriptor + ": " + (System.currentTimeMillis() - start) + " ms.");
+
+        return sstable;
     }
 
     public void setTrackedBy(SSTableTracker tracker)
     {
         phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue);
         finalizers.add(phantomReference);
-        keyCache = tracker.getKeyCache();
     }
 
-    private static MappedByteBuffer mmap(String filename, long start, int size) throws IOException
+    protected static MappedByteBuffer mmap(String filename, long start, int size) throws IOException
     {
         RandomAccessFile raf;
         try
@@ -219,241 +169,61 @@
         }
     }
 
-    private SSTableReader(String filename, IPartitioner partitioner) throws IOException
-    {
-        this(filename, partitioner, null, null, null);
-    }
-
-    public List<KeyPosition> getIndexPositions()
-    {
-        return indexPositions;
-    }
+    /*************************************************************************/
 
-    public long estimatedKeys()
+    protected SSTableReader(Descriptor desc, IPartitioner partitioner)
     {
-        return indexPositions.size() * INDEX_INTERVAL;
+        super(desc, partitioner);
     }
 
-    void loadBloomFilter() throws IOException
-    {
-        DataInputStream stream = new DataInputStream(new FileInputStream(filterFilename()));
-        try
-        {
-            bf = BloomFilter.serializer().deserialize(stream);
-        }
-        finally
-        {
-            stream.close();
-        }
-    }
-
-    void loadIndexFile() throws IOException
-    {
-        indexPositions = new ArrayList<KeyPosition>();
-        // we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
-        // any entries that do, we force into the in-memory sample so key lookup can always bsearch within
-        // a single mmapped segment.
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
-        try
-        {
-            int i = 0;
-            long indexSize = input.length();
-            while (true)
-            {
-                long indexPosition = input.getFilePointer();
-                if (indexPosition == indexSize)
-                {
-                    break;
-                }
-                DecoratedKey decoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
-                long dataPosition = input.readLong();
-                long nextIndexPosition = input.getFilePointer();
-                boolean spannedEntry = bufferIndex(indexPosition) != bufferIndex(nextIndexPosition);
-                if (i++ % INDEX_INTERVAL == 0 || spannedEntry)
-                {
-                    KeyPosition info;
-                    info = new KeyPosition(decoratedKey, indexPosition);
-                    indexPositions.add(info);
-
-                    if (spannedEntry)
-                    {
-                        if (spannedIndexDataPositions == null)
-                        {
-                            spannedIndexDataPositions = new HashMap<KeyPosition, PositionSize>();
-                        }
-                        // read the next index entry to see how big the row is corresponding to the current, mmap-segment-spanning one
-                        input.readUTF();
-                        long nextDataPosition = input.readLong();
-                        input.seek(nextIndexPosition);
-                        spannedIndexDataPositions.put(info, new PositionSize(dataPosition, nextDataPosition - dataPosition));
-                    }
-                }
-            }
-        }
-        finally
-        {
-            input.close();
-        }
-    }
+    private volatile SSTableDeletingReference phantomReference;
 
-    /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */
-    private KeyPosition getIndexScanPosition(DecoratedKey decoratedKey)
+    void addFinalizingReference(SSTableTracker tracker)
     {
-        assert indexPositions != null && indexPositions.size() > 0;
-        int index = Collections.binarySearch(indexPositions, new KeyPosition(decoratedKey, -1));
-        if (index < 0)
-        {
-            // binary search gives us the first index _greater_ than the key searched for,
-            // i.e., its insertion position
-            int greaterThan = (index + 1) * -1;
-            if (greaterThan == 0)
-                return null;
-            return indexPositions.get(greaterThan - 1);
-        }
-        else
-        {
-            return indexPositions.get(index);
-        }
+        phantomReference = new SSTableDeletingReference(tracker, this, finalizerQueue);
+        finalizers.add(phantomReference);
     }
 
     /**
-     * returns the position in the data file to find the given key, or -1 if the key is not present
+     * For testing purposes only.
      */
-    public PositionSize getPosition(DecoratedKey decoratedKey) throws IOException
-    {
-        // first, check bloom filter
-        if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
-            return null;
-
-        // next, the key cache
-        Pair<Descriptor, DecoratedKey> unifiedKey = new Pair<Descriptor, DecoratedKey>(desc, decoratedKey);
-        if (keyCache != null && keyCache.getCapacity() > 0)
-        {
-            PositionSize cachedPosition = keyCache.get(unifiedKey);
-            if (cachedPosition != null)
-            {
-                return cachedPosition;
-            }
-        }
-
-        // next, see if the sampled index says it's impossible for the key to be present
-        KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
-        if (sampledPosition == null)
-        {
-            return null;
-        }
+    public abstract void forceFilterFailures();
 
-        // handle exact sampled index hit
-        if (spannedIndexDataPositions != null)
-        {
-            PositionSize info = spannedIndexDataPositions.get(sampledPosition);
-            if (info != null)
-                return info;
-        }
+    /**
+     * @return The key cache: for monitoring purposes.
+     */
+    public abstract InstrumentedCache getKeyCache();
 
-        // scan the on-disk index, starting at the nearest sampled position
-        long p = sampledPosition.position;
-        FileDataInput input;
-        if (indexBuffers == null)
-        {
-            input = new BufferedRandomAccessFile(indexFilename(), "r");
-            ((BufferedRandomAccessFile)input).seek(p);
-        }
-        else
-        {
-            input = new MappedFileDataInput(indexBuffers[bufferIndex(p)], indexFilename(), (int)(p % BUFFER_SIZE));
-        }
-        try
-        {
-            int i = 0;
-            do
-            {
-                DecoratedKey indexDecoratedKey;
-                try
-                {
-                    indexDecoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
-                }
-                catch (EOFException e)
-                {
-                    return null;
-                }
-                long position = input.readLong();
-                int v = indexDecoratedKey.compareTo(decoratedKey);
-                if (v == 0)
-                {
-                    PositionSize info;
-                    if (!input.isEOF())
-                    {
-                        int utflen = input.readUnsignedShort();
-                        if (utflen != input.skipBytes(utflen))
-                            throw new EOFException();
-                        info = new PositionSize(position, input.readLong() - position);
-                    }
-                    else
-                    {
-                        info = new PositionSize(position, length() - position);
-                    }
-                    if (keyCache != null && keyCache.getCapacity() > 0)
-                        keyCache.put(unifiedKey, info);
-                    return info;
-                }
-                if (v > 0)
-                    return null;
-            } while  (++i < INDEX_INTERVAL);
-        }
-        finally
-        {
-            input.close();
-        }
-        return null;
-    }
+    /**
+     * @return An estimate of the number of keys in this SSTable.
+     */
+    public abstract long estimatedKeys();
 
-    /** like getPosition, but if key is not found will return the location of the first key _greater_ than the desired one, or -1 if no such key exists. */
-    public long getNearestPosition(DecoratedKey decoratedKey) throws IOException
-    {
-        KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
-        if (sampledPosition == null)
-        {
-            return 0;
-        }
+    /**
+     * @return Approximately 1/INDEX_INTERVALth of the keys in this SSTable.
+     */
+    public abstract Collection<DecoratedKey> getKeySamples();
 
-        // can't use a MappedFileDataInput here, since we might cross a segment boundary while scanning
-        BufferedRandomAccessFile input = new BufferedRandomAccessFile(indexFilename(), "r");
-        input.seek(sampledPosition.position);
-        try
-        {
-            while (true)
-            {
-                DecoratedKey indexDecoratedKey;
-                try
-                {
-                    indexDecoratedKey = partitioner.convertFromDiskFormat(input.readUTF());
-                }
-                catch (EOFException e)
-                {
-                    return -1;
-                }
-                long position = input.readLong();
-                int v = indexDecoratedKey.compareTo(decoratedKey);
-                if (v >= 0)
-                    return position;
-            }
-        }
-        finally
-        {
-            input.close();
-        }
-    }
+    /**
+     * Returns the position in the data file to find the given key, or -1 if the
+     * key is not present.
+     * FIXME: should not be public: use Scanner.
+     */
+    @Deprecated
+    public abstract PositionSize getPosition(DecoratedKey decoratedKey) throws IOException;
 
-    public long length()
-    {
-        return new File(getFilename()).length();
-    }
+    /**
+     * Like getPosition, but if key is not found will return the location of the
+     * first key _greater_ than the desired one, or -1 if no such key exists.
+     * FIXME: should not be public: use Scanner.
+     */
+    @Deprecated
+    public abstract long getNearestPosition(DecoratedKey decoratedKey) throws IOException;
 
-    public int compareTo(SSTableReader o)
-    {
-        return desc.generation - o.desc.generation;
-    }
+    /**
+     * @return The length in bytes of the data file for this SSTable.
+     */
+    public abstract long length();
 
     public void markCompacted() throws IOException
     {
@@ -466,41 +236,17 @@
         phantomReference.deleteOnCleanup();
     }
 
-    /** obviously only for testing */
-    public void forceBloomFilterFailures()
-    {
-        bf = BloomFilter.alwaysMatchingBloomFilter();
-    }
-
-    public IPartitioner getPartitioner()
-    {
-        return partitioner;
-    }
-
-    public SSTableScanner getScanner(int bufferSize) throws IOException
-    {
-        return new SSTableScanner(this, bufferSize);
-    }
-
-    public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize) throws IOException
-    {
-        PositionSize info = getPosition(decoratedKey);
-        if (info == null)
-            return null;
-
-        if (buffers == null || (bufferIndex(info.position) != bufferIndex(info.position + info.size)))
-        {
-            BufferedRandomAccessFile file = new BufferedRandomAccessFile(getFilename(), "r", bufferSize);
-            file.seek(info.position);
-            return file;
-        }
-        return new MappedFileDataInput(buffers[bufferIndex(info.position)], getFilename(), (int) (info.position % BUFFER_SIZE));
-    }
+    /**
+     * @param bufferSize Buffer size in bytes for this Scanner.
+     * @return A Scanner for seeking over the rows of the SSTable.
+     */
+    public abstract SSTableScanner getScanner(int bufferSize) throws IOException;
 
-    static int bufferIndex(long position)
-    {
-        return (int) (position / BUFFER_SIZE);
-    }
+    /**
+     * FIXME: should not be public: use Scanner.
+     */
+    @Deprecated
+    public abstract FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize) throws IOException;
 
     public AbstractType getColumnComparator()
     {
@@ -519,53 +265,3 @@
                : SuperColumn.serializer(getColumnComparator());
     }
 }
-
-class FileSSTableMap
-{
-    private final Map<String, SSTableReader> map = new NonBlockingHashMap<String, SSTableReader>();
-
-    public SSTableReader get(String filename)
-    {
-        try
-        {
-            return map.get(new File(filename).getCanonicalPath());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public SSTableReader put(String filename, SSTableReader value)
-    {
-        try
-        {
-            return map.put(new File(filename).getCanonicalPath(), value);
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Collection<SSTableReader> values()
-    {
-        return map.values();
-    }
-
-    public void clear()
-    {
-        map.clear();
-    }
-
-    public void remove(String filename) throws IOException
-    {
-        map.remove(new File(filename).getCanonicalPath());
-    }
-
-    @Override
-    public String toString()
-    {
-        return "FileSSTableMap {" + StringUtils.join(map.keySet(), ", ") + "}";
-    }
-}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Thu Mar  4 20:49:45 2010
@@ -33,119 +33,11 @@
 import org.apache.log4j.Logger;
 
 
-public class SSTableScanner implements Iterator<IteratingRow>, Closeable
+public abstract class SSTableScanner implements Iterator<IteratingRow>, Closeable
 {
-    private static Logger logger = Logger.getLogger(SSTableScanner.class);
+    public abstract void seekTo(DecoratedKey seekKey);
 
-    private final BufferedRandomAccessFile file;
-    private final SSTableReader sstable;
-    private IteratingRow row;
-    private boolean exhausted = false;
-    private Iterator<IteratingRow> iterator;
+    public abstract long getFileLength();
 
-    /**
-     * @param sstable SSTable to scan.
-     */
-    SSTableScanner(SSTableReader sstable, int bufferSize) throws IOException
-    {
-        this.file = new BufferedRandomAccessFile(sstable.getFilename(), "r", bufferSize);
-        this.sstable = sstable;
-    }
-
-    public void close() throws IOException
-    {
-        file.close();
-    }
-
-    public void seekTo(DecoratedKey seekKey)
-    {
-        try
-        {
-            long position = sstable.getNearestPosition(seekKey);
-            if (position < 0)
-            {
-                exhausted = true;
-                return;
-            }
-            file.seek(position);
-            row = null;
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException("corrupt sstable", e);
-        }
-    }
-
-    public long getFileLength()
-    {
-        try
-        {
-            return file.length();
-        }
-        catch (IOException e)
-        {
-            throw new IOError(e);
-        }
-    }
-
-    public long getFilePointer()
-    {
-        return file.getFilePointer();
-    }
-
-    public boolean hasNext()
-    {
-        if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new IteratingRow[0]).iterator() : new KeyScanningIterator();
-        return iterator.hasNext();
-    }
-
-    public IteratingRow next()
-    {
-        if (iterator == null)
-            iterator = exhausted ? Arrays.asList(new IteratingRow[0]).iterator() : new KeyScanningIterator();
-        return iterator.next();
-    }
-
-    public void remove()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    private class KeyScanningIterator implements Iterator<IteratingRow>
-    {
-        public boolean hasNext()
-        {
-            try
-            {
-                if (row == null)
-                    return !file.isEOF();
-                return row.getEndPosition() < file.length();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        public IteratingRow next()
-        {
-            try
-            {
-                if (row != null)
-                    row.skipRemaining();
-                assert !file.isEOF();
-                return row = new IteratingRow(file, sstable);
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
+    public abstract long getFilePointer();
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableTracker.java Thu Mar  4 20:49:45 2010
@@ -60,7 +60,7 @@
 
         for (SSTableReader sstable : replacements)
         {
-            assert sstable.getIndexPositions() != null;
+            assert sstable.getKeySamples() != null;
             sstablesNew.add(sstable);
             long size = sstable.bytesOnDisk();
             liveSize.addAndGet(size);

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Thu Mar  4 20:49:45 2010
@@ -84,7 +84,7 @@
         if (logger.isTraceEnabled())
             logger.trace("wrote index of " + decoratedKey + " at " + indexPosition);
 
-        boolean spannedEntry = SSTableReader.bufferIndex(indexPosition) != SSTableReader.bufferIndex(indexFile.getFilePointer());
+        boolean spannedEntry = RowIndexedReader.bufferIndex(indexPosition) != RowIndexedReader.bufferIndex(indexFile.getFilePointer());
         if (keysWritten++ % INDEX_INTERVAL == 0 || spannedEntry)
         {
             if (indexPositions == null)
@@ -144,12 +144,12 @@
         // main data
         dataFile.close(); // calls force
 
-        String newpath = getFilename();
+        Descriptor newdesc = desc.asTemporary(false);
         rename(indexFilename());
         rename(filterFilename());
-        newpath = rename(newpath); // important to do this last since index & filter file names are derived from it
+        rename(getFilename());
 
-        return new SSTableReader(newpath, partitioner, indexPositions, spannedIndexDataPositions, bf);
+        return new RowIndexedReader(newdesc, partitioner, indexPositions, spannedIndexDataPositions, bf);
     }
 
     static String rename(String tmpFilename)
@@ -176,8 +176,7 @@
         SSTableWriter.rename(indexFilename(dataFileName));
         SSTableWriter.rename(filterFilename(dataFileName));
         dataFileName = SSTableWriter.rename(dataFileName);
-        return SSTableReader.open(dataFileName,
-                                  StorageService.getPartitioner());
+        return SSTableReader.open(dataFileName);
     }
 
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Thu Mar  4 20:49:45 2010
@@ -367,8 +367,8 @@
             }
             if (cfs != null) // TODO test w/ valid CF definitions, this if{} shouldn't be necessary
             {
-                for (SSTable.KeyPosition info: cfs.allIndexPositions())
-                    keys.add(info.key);
+                for (DecoratedKey sample : cfs.allKeySamples())
+                    keys.add(sample);
             }
 
             if (keys.isEmpty())

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Thu Mar  4 20:49:45 2010
@@ -1221,10 +1221,10 @@
         List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            for (SSTable.KeyPosition info: cfs.allIndexPositions())
+            for (DecoratedKey sample : cfs.allKeySamples())
             {
-                if (range.contains(info.key.token))
-                    keys.add(info.key);
+                if (range.contains(sample.token))
+                    keys.add(sample);
             }
         }
         FBUtilities.sortSampledKeys(keys, range);
@@ -1250,10 +1250,10 @@
         List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
         {
-            for (SSTable.KeyPosition info: cfs.allIndexPositions())
+            for (DecoratedKey key : cfs.allKeySamples())
             {
-                if (range.contains(info.key.token))
-                    keys.add(info.key);
+                if (range.contains(key.token))
+                    keys.add(key);
             }
         }
         FBUtilities.sortSampledKeys(keys, range);

Copied: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java.orig (from r919170, incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java)
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java.orig?p2=incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java.orig&p1=incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java&r1=919170&r2=919171&rev=919171&view=diff
==============================================================================
    (empty)

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Thu Mar  4 20:49:45 2010
@@ -69,7 +69,7 @@
         Table table = Table.open("Keyspace1");
         List<SSTableReader> ssTables = table.getAllSSTablesOnDisk();
         assertEquals(1, ssTables.size());
-        ssTables.get(0).forceBloomFilterFailures();
+        ssTables.get(0).forceFilterFailures();
         ColumnFamily cf = store.getColumnFamily(new IdentityQueryFilter("key2", new QueryPath("Standard1", null, "Column1".getBytes())));
         assertNull(cf);
     }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=919171&r1=919170&r2=919171&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Thu Mar  4 20:49:45 2010
@@ -163,7 +163,7 @@
 
         Collection<SSTableReader> ssTables = table.getColumnFamilyStore("Standard2").getSSTables();
         assertEquals(1, ssTables.size());
-        ssTables.iterator().next().forceBloomFilterFailures();
+        ssTables.iterator().next().forceFilterFailures();
         validateGetSliceNoMatch(table);
     }
 



Mime
View raw message