cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r955923 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ src/java/org/apache/c...
Date Fri, 18 Jun 2010 10:29:54 GMT
Author: jbellis
Date: Fri Jun 18 10:29:54 2010
New Revision: 955923

URL: http://svn.apache.org/viewvc?rev=955923&view=rev
Log:
Stream sstables without anticompaction
patch by Stu Hood; reviewed by jbellis for CASSANDRA-579

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/NEWS.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Jun 18 10:29:54 2010
@@ -29,6 +29,7 @@ dev
    segments that always contain entire entries/rows (CASSANDRA-1117)
  * avoid reading large rows into memory during compaction (CASSANDRA-16)
  * added hadoop OutputFormat (CASSANDRA-1101)
+ * efficient Streaming (no more anticompaction) (CASSANDRA-579)
 
 
 0.6.3

Modified: cassandra/trunk/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/NEWS.txt?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/NEWS.txt (original)
+++ cassandra/trunk/NEWS.txt Fri Jun 18 10:29:54 2010
@@ -15,6 +15,8 @@ Features
       `cassandra.yaml.`
     - row size limit increased from 2GB to 2 billion columns
     - Hadoop OutputFormat support
+    - Streaming data for repair or node movement no longer requires
+      an anticompaction step first
 
 Configuraton
 ------------

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Jun 18 10:29:54 2010
@@ -191,40 +191,21 @@ public class Table 
             }
         }
     }
-
-    /*
-     * This method is invoked only during a bootstrap process. We basically
-     * do a complete compaction since we can figure out based on the ranges
-     * whether the files need to be split.
-    */
-    public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, InetAddress target)
-    {
-        List<SSTableReader> allResults = new ArrayList<SSTableReader>();
-        for (ColumnFamilyStore cfStore : columnFamilyStores.values())
-        {
-            try
-            {
-                allResults.addAll(CompactionManager.instance.submitAnticompaction(cfStore, ranges, target).get());
-            }
-            catch (Exception e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-        return allResults;
-    }
     
     /*
      * This method is an ADMIN operation to force compaction
      * of all SSTables on disk. 
-    */
+     */
     public void forceCompaction()
     {
         for (ColumnFamilyStore cfStore : columnFamilyStores.values())
             CompactionManager.instance.submitMajor(cfStore);
     }
 
-    List<SSTableReader> getAllSSTablesOnDisk()
+    /**
+     * @return A list of open SSTableReaders (TODO: ensure that the caller doesn't modify these).
+     */
+    public List<SSTableReader> getAllSSTables()
     {
         List<SSTableReader> list = new ArrayList<SSTableReader>();
         for (ColumnFamilyStore cfStore : columnFamilyStores.values())

Modified: cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/dht/AbstractBounds.java Fri Jun 18 10:29:54 2010
@@ -25,8 +25,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.cassandra.io.ICompactSerializer2;
 
@@ -72,6 +71,27 @@ public abstract class AbstractBounds imp
 
     public abstract List<AbstractBounds> unwrap();
 
+    /**
+     * @return A copy of the given list of non-intersecting bounds with all bounds unwrapped, sorted by bound.left.
+     */
+    public static List<AbstractBounds> normalize(Collection<? extends AbstractBounds> bounds)
+    {
+        // unwrap all
+        List<AbstractBounds> output = new ArrayList<AbstractBounds>();
+        for (AbstractBounds bound : bounds)
+            output.addAll(bound.unwrap());
+
+        // sort by left
+        Collections.sort(output, new Comparator<AbstractBounds>()
+        {
+            public int compare(AbstractBounds b1, AbstractBounds b2)
+            {
+                return b1.left.compareTo(b2.left);
+            }
+        });
+        return output;
+    }
+
     private static class AbstractBoundsSerializer implements ICompactSerializer2<AbstractBounds>
     {
         public void serialize(AbstractBounds range, DataOutput out) throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java Fri Jun 18 10:29:54 2010
@@ -42,9 +42,12 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.clock.AbstractReconciler;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.ICompactSerializer2;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.utils.BloomFilter;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * SSTableReaders are open()ed by Table.onStart; after that they are created by SSTableWriter.renameAndOpen.
@@ -344,15 +347,40 @@ public class SSTableReader extends SSTab
     }
 
     /**
-     * Returns the position in the data file to find the given key, or -1 if the
-     * key is not present.
-     * FIXME: should not be public: use Scanner.
+     * Determine the minimal set of sections that can be extracted from this SSTable to cover the given ranges.
+     * @return A sorted list of (offset,end) pairs that cover the given ranges in the datafile for this SSTable.
      */
-    @Deprecated
-    public long getPosition(DecoratedKey decoratedKey)
+    public List<Pair<Long,Long>> getPositionsForRanges(Collection<Range> ranges)
+    {
+        // use the index to determine a minimal section for each range
+        List<Pair<Long,Long>> positions = new ArrayList<Pair<Long,Long>>();
+        for (AbstractBounds range : AbstractBounds.normalize(ranges))
+        {
+            long left = getPosition(new DecoratedKey(range.left, null), Operator.GT);
+            if (left == -1)
+                // left is past the end of the file
+                continue;
+            long right = getPosition(new DecoratedKey(range.right, null), Operator.GT);
+            if (right == -1 || Range.isWrapAround(range.left, range.right))
+                // right is past the end of the file, or it wraps
+                right = length();
+            if (left == right)
+                // empty range
+                continue;
+            positions.add(new Pair(Long.valueOf(left), Long.valueOf(right)));
+        }
+        return positions;
+    }
+
+    /**
+     * @param decoratedKey The key to apply as the rhs to the given Operator.
+     * @param op The Operator defining matching keys: the nearest key to the target matching the operator wins.
+     * @return The position in the data file of the nearest key matching the operator, or -1 if no key matches
+     */
+    public long getPosition(DecoratedKey decoratedKey, Operator op)
     {
         // first, check bloom filter
-        if (!bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
+        if (op == Operator.EQ && !bf.isPresent(partitioner.convertToDiskFormat(decoratedKey)))
             return -1;
 
         // next, the key cache
@@ -369,30 +397,32 @@ public class SSTableReader extends SSTab
         // next, see if the sampled index says it's impossible for the key to be present
         IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
         if (sampledPosition == null)
-            return -1;
+            // we matched the -1th position: if the operator might match forward, return the 0th position
+            return op.apply(1) >= 0 ? 0 : -1;
 
         // scan the on-disk index, starting at the nearest sampled position
-        int i = 0;
         Iterator<FileDataInput> segments = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES);
         while (segments.hasNext())
         {
             FileDataInput input = segments.next();
             try
             {
-                while (!input.isEOF() && i++ < IndexSummary.INDEX_INTERVAL)
+                while (!input.isEOF())
                 {
                     // read key & data position from index entry
                     DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
                     long dataPosition = input.readLong();
 
-                    int v = indexDecoratedKey.compareTo(decoratedKey);
+                    int comparison = indexDecoratedKey.compareTo(decoratedKey);
+                    int v = op.apply(comparison);
                     if (v == 0)
                     {
-                        if (keyCache != null && keyCache.getCapacity() > 0)
+                        if (comparison == 0 && keyCache != null && keyCache.getCapacity() > 0)
+                            // store exact match for the key
                             keyCache.put(unifiedKey, Long.valueOf(dataPosition));
                         return dataPosition;
                     }
-                    if (v > 0)
+                    if (v < 0)
                         return -1;
                 }
             }
@@ -416,53 +446,6 @@ public class SSTableReader extends SSTab
     }
 
     /**
-     * Like getPosition, but if key is not found will return the location of the
-     * first key _greater_ than the desired one, or -1 if no such key exists.
-     * FIXME: should not be public: use Scanner.
-     */
-    @Deprecated
-    public long getNearestPosition(DecoratedKey decoratedKey)
-    {
-        IndexSummary.KeyPosition sampledPosition = getIndexScanPosition(decoratedKey);
-        if (sampledPosition == null)
-            return 0;
-
-        // scan the on-disk index, starting at the nearest sampled position
-        Iterator<FileDataInput> segiter = ifile.iterator(sampledPosition.indexPosition, INDEX_FILE_BUFFER_BYTES);
-        while (segiter.hasNext())
-        {
-            FileDataInput input = segiter.next();
-            try
-            {
-                while (!input.isEOF())
-                {
-                    DecoratedKey indexDecoratedKey = partitioner.convertFromDiskFormat(FBUtilities.readShortByteArray(input));
-                    long position = input.readLong();
-                    int v = indexDecoratedKey.compareTo(decoratedKey);
-                    if (v >= 0)
-                        return position;
-                }
-            }
-            catch (IOException e)
-            {
-                throw new IOError(e);
-            }
-            finally
-            {
-                try
-                {
-                    input.close();
-                }
-                catch (IOException e)
-                {
-                    logger.error("error closing file", e);
-                }
-            }
-        }
-        return -1;
-    }
-
-    /**
      * @return The length in bytes of the data file for this SSTable.
      */
     public long length()
@@ -507,7 +490,7 @@ public class SSTableReader extends SSTab
 
     public FileDataInput getFileDataInput(DecoratedKey decoratedKey, int bufferSize)
     {
-        long position = getPosition(decoratedKey);
+        long position = getPosition(decoratedKey, Operator.EQ);
         if (position < 0)
             return null;
 
@@ -557,4 +540,35 @@ public class SSTableReader extends SSTab
             return in.readInt();
         return in.readLong();
     }
+
+    /**
+     * TODO: Move someplace reusable
+     */
+    public abstract static class Operator
+    {
+        public static final Operator EQ = new Equals();
+        public static final Operator GE = new GreaterThanOrEqualTo();
+        public static final Operator GT = new GreaterThan();
+
+        /**
+         * @param comparison The result of a call to compare/compareTo, with the desired field on the rhs.
+         * @return less than 0 if the operator cannot match forward, 0 if it matches, greater than 0 if it might match forward.
+         */
+        public abstract int apply(int comparison);
+
+        final static class Equals extends Operator
+        {
+            public int apply(int comparison) { return -comparison; }
+        }
+
+        final static class GreaterThanOrEqualTo extends Operator
+        {
+            public int apply(int comparison) { return comparison >= 0 ? 0 : -comparison; }
+        }
+
+        final static class GreaterThan extends Operator
+        {
+            public int apply(int comparison) { return comparison > 0 ? 0 : 1; }
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Fri Jun 18 10:29:54 2010
@@ -90,7 +90,7 @@ public class SSTableScanner implements I
     {
         try
         {
-            long position = sstable.getNearestPosition(seekKey);
+            long position = sstable.getPosition(seekKey, SSTableReader.Operator.GE);
             if (position < 0)
             {
                 exhausted = true;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Jun 18 10:29:54 2010
@@ -39,13 +39,13 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.*;
 
-import org.apache.cassandra.io.AbstractCompactedRow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.AbstractCompactedRow;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.SegmentedFile;

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Fri Jun 18 10:29:54 2010
@@ -25,12 +25,13 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 
-import org.apache.cassandra.streaming.StreamOutManager;
+import org.apache.cassandra.streaming.PendingFile;
 import org.apache.cassandra.utils.FBUtilities;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 public class FileStreamTask extends WrappedRunnable
@@ -41,21 +42,12 @@ public class FileStreamTask extends Wrap
     // around 10 minutes at the default rpctimeout
     public static final int MAX_CONNECT_ATTEMPTS = 8;
 
-    private final String file;
-    private final long startPosition;
-    private final long endPosition;
+    private final PendingFile file;
     private final InetAddress to;
     
-    FileStreamTask(String file, InetAddress to)
-    {
-        this(file, 0, new File(file).length(), to);
-    }
-
-    private FileStreamTask(String file, long startPosition, long endPosition, InetAddress to)
+    FileStreamTask(PendingFile file, InetAddress to)
     {
         this.file = file;
-        this.startPosition = startPosition;
-        this.endPosition = endPosition;
         this.to = to;
     }
     
@@ -87,8 +79,7 @@ public class FileStreamTask extends Wrap
 
     private void stream(SocketChannel channel) throws IOException
     {
-        long start = startPosition;
-        RandomAccessFile raf = new RandomAccessFile(new File(file), "r");
+        RandomAccessFile raf = new RandomAccessFile(new File(file.getFilename()), "r");
         try
         {
             FileChannel fc = raf.getChannel();
@@ -96,14 +87,16 @@ public class FileStreamTask extends Wrap
             ByteBuffer buffer = MessagingService.constructStreamHeader(false);
             channel.write(buffer);
             assert buffer.remaining() == 0;
-
-            while (start < endPosition)
+            
+            // stream sections of the file as returned by PendingFile.currentSection
+            Pair<Long,Long> section;
+            while ((section = file.currentSection()) != null)
             {
-                long bytesTransferred = fc.transferTo(start, CHUNK_SIZE, channel);
+                long length = Math.min(CHUNK_SIZE, section.right - section.left);
+                long bytesTransferred = fc.transferTo(section.left, length, channel);
                 if (logger.isDebugEnabled())
                     logger.debug("Bytes transferred " + bytesTransferred);
-                start += bytesTransferred;
-                StreamOutManager.get(to).update(file, start);
+                file.update(section.left + bytesTransferred);
             }
         }
         finally

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Jun 18 10:29:54 2010
@@ -27,6 +27,7 @@ import org.apache.cassandra.io.util.Data
 import org.apache.cassandra.net.io.SerializerType;
 import org.apache.cassandra.net.sink.SinkManager;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PendingFile;
 import org.apache.cassandra.utils.ExpiringMap;
 import org.apache.cassandra.utils.GuidGenerator;
 import org.apache.cassandra.utils.SimpleCondition;
@@ -325,15 +326,14 @@ public class MessagingService implements
     /**
      * Stream a file from source to destination. This is highly optimized
      * to not hold any of the contents of the file in memory.
-     * @param file name of file to stream.
+     * @param file file to stream.
      * @param to endpoint to which we need to stream the file.
     */
 
-    public void stream(String file, InetAddress to)
+    public void stream(PendingFile file, InetAddress to)
     {
         /* Streaming asynchronously on streamExector_ threads. */
-        Runnable streamingTask = new FileStreamTask(file, to);
-        streamExecutor_.execute(streamingTask);
+        streamExecutor_.execute(new FileStreamTask(file, to));
     }
     
     /** blocks until the processing pools are empty and done. */

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Jun 18 10:29:54 2010
@@ -660,13 +660,13 @@ public class AntiEntropyService
             ColumnFamilyStore cfstore = Table.open(cf.left).getColumnFamilyStore(cf.right);
             try
             {
-                List<Range> ranges = new ArrayList<Range>(differences);
-                final List<SSTableReader> sstables = CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
+                final List<Range> ranges = new ArrayList<Range>(differences);
+                final Collection<SSTableReader> sstables = cfstore.getSSTables();
                 Future f = StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable() 
                 {
                     protected void runMayThrow() throws Exception
                     {
-                        StreamOut.transferSSTables(remote, sstables, cf.left);
+                        StreamOut.transferSSTables(remote, cf.left, sstables, ranges);
                         StreamOutManager.remove(remote);
                     }
                 });

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatus.java Fri Jun 18 10:29:54 2010
@@ -54,16 +54,14 @@ class FileStatus
     }
 
     private final String file_;
-    private final long expectedBytes_;
     private Action action_;
 
     /**
      * Create a FileStatus with the default Action: STREAM.
      */
-    public FileStatus(String file, long expectedBytes)
+    public FileStatus(String file)
     {
         file_ = file;
-        expectedBytes_ = expectedBytes;
         action_ = Action.STREAM;
     }
 
@@ -72,11 +70,6 @@ class FileStatus
         return file_;
     }
 
-    public long getExpectedBytes()
-    {
-        return expectedBytes_;
-    }
-
     public void setAction(Action action)
     {
         action_ = action;
@@ -100,15 +93,13 @@ class FileStatus
         public void serialize(FileStatus streamStatus, DataOutputStream dos) throws IOException
         {
             dos.writeUTF(streamStatus.getFile());
-            dos.writeLong(streamStatus.getExpectedBytes());
             dos.writeInt(streamStatus.getAction().ordinal());
         }
 
         public FileStatus deserialize(DataInputStream dis) throws IOException
         {
             String targetFile = dis.readUTF();
-            long expectedBytes = dis.readLong();
-            FileStatus streamStatus = new FileStatus(targetFile, expectedBytes);
+            FileStatus streamStatus = new FileStatus(targetFile);
 
             int ordinal = dis.readInt();
             if (ordinal == Action.DELETE.ordinal())

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/FileStatusHandler.java Fri Jun 18 10:29:54 2010
@@ -64,6 +64,7 @@ class FileStatusHandler
         }
         catch (IOException e)
         {
+            logger.error("Failed adding " + pendingFile, e);
             throw new RuntimeException("Not able to add streamed file " + pendingFile.getFilename(), e);
         }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Fri Jun 18 10:29:54 2010
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.FileStreamTask;
+import org.apache.cassandra.utils.Pair;
 
 public class IncomingStreamReader
 {
@@ -58,39 +59,43 @@ public class IncomingStreamReader
         FileOutputStream fos = new FileOutputStream(pendingFile.getFilename(), true);
         FileChannel fc = fos.getChannel();
 
-        long bytesRead = 0;
+        long offset = 0;
         try
         {
-            while (bytesRead < pendingFile.getExpectedBytes()) {
-                bytesRead += fc.transferFrom(socketChannel, bytesRead, FileStreamTask.CHUNK_SIZE);
-                pendingFile.update(bytesRead);
+            Pair<Long,Long> section;
+            while ((section = pendingFile.currentSection()) != null)
+            {
+                long length = Math.min(FileStreamTask.CHUNK_SIZE, section.right - section.left);
+                long bytesRead = fc.transferFrom(socketChannel, offset, length);
+                // offset in the remote file
+                pendingFile.update(section.left + bytesRead);
+                // offset in the local file
+                offset += bytesRead;
             }
-            logger.debug("Receiving stream: finished reading chunk, awaiting more");
         }
         catch (IOException ex)
         {
+            logger.debug("Receiving stream: recovering from IO error");
             /* Ask the source node to re-stream this file. */
             streamStatus.setAction(FileStatus.Action.STREAM);
             handleFileStatus(remoteAddress.getAddress());
             /* Delete the orphaned file. */
             File file = new File(pendingFile.getFilename());
             file.delete();
-            logger.debug("Receiving stream: recovering from IO error");
+            /* Reset our state. */
+            pendingFile.update(0);
             throw ex;
         }
         finally
         {
+            fc.close();
             StreamInManager.activeStreams.remove(remoteAddress.getAddress(), pendingFile);
         }
 
-        if (bytesRead == pendingFile.getExpectedBytes())
-        {
-            if (logger.isDebugEnabled())
-                logger.debug("Removing stream context " + pendingFile);
-            fc.close();
-            streamStatus.setAction(FileStatus.Action.DELETE);
-            handleFileStatus(remoteAddress.getAddress());
-        }
+        if (logger.isDebugEnabled())
+            logger.debug("Removing stream context " + pendingFile);
+        streamStatus.setAction(FileStatus.Action.DELETE);
+        handleFileStatus(remoteAddress.getAddress());
     }
 
     private void handleFileStatus(InetAddress remoteHost) throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/PendingFile.java Fri Jun 18 10:29:54 2010
@@ -24,17 +24,23 @@ package org.apache.cassandra.streaming;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.Pair;
 
-class PendingFile
+/**
+ * Represents portions of a file to be streamed between nodes.
+ */
+public class PendingFile
 {
     private static ICompactSerializer<PendingFile> serializer_;
 
     static
     {
-        serializer_ = new InitiatedFileSerializer();
+        serializer_ = new PendingFileSerializer();
     }
 
     public static ICompactSerializer<PendingFile> serializer()
@@ -42,16 +48,21 @@ class PendingFile
         return serializer_;
     }
 
-    private Descriptor desc;
-    private String component;
-    private long expectedBytes;                     
+    private final Descriptor desc;
+    private final String component;
+    private final List<Pair<Long,Long>> sections;
     private long ptr;
 
-    public PendingFile(Descriptor desc, String component, long expectedBytes)
+    public PendingFile(Descriptor desc, PendingFile pf)
+    {
+        this(desc, pf.component, pf.sections);
+    }
+
+    public PendingFile(Descriptor desc, String component, List<Pair<Long,Long>> sections)
     {
         this.desc = desc;
         this.component = component;
-        this.expectedBytes = expectedBytes;         
+        this.sections = sections;
         ptr = 0;
     }
 
@@ -60,9 +71,16 @@ class PendingFile
         this.ptr = ptr;
     }
 
-    public long getPtr()
-    {
-        return ptr;
+    /**
+     * @return The current section of the file, as an (offset,end) pair, or null if nothing is left to stream.
+     */
+    public Pair<Long,Long> currentSection()
+    {
+        // linear search for the first appropriate section
+        for (Pair<Long,Long> section : sections)
+            if (ptr < section.right)
+                return new Pair<Long,Long>(Long.valueOf(Math.max(ptr, section.left)), section.right);
+        return null;
     }
 
     public String getComponent()
@@ -80,11 +98,6 @@ class PendingFile
         return desc.filenameFor(component);
     }
     
-    public long getExpectedBytes()
-    {
-        return expectedBytes;
-    }
-
     public boolean equals(Object o)
     {
         if ( !(o instanceof PendingFile) )
@@ -96,29 +109,36 @@ class PendingFile
 
     public int hashCode()
     {
-        return toString().hashCode();
+        return getFilename().hashCode();
     }
 
     public String toString()
     {
-        return getFilename() + ":" + expectedBytes;
+        return getFilename() + ":" + ptr + "/" + sections;
     }
 
-    private static class InitiatedFileSerializer implements ICompactSerializer<PendingFile>
+    private static class PendingFileSerializer implements ICompactSerializer<PendingFile>
     {
         public void serialize(PendingFile sc, DataOutputStream dos) throws IOException
         {
             dos.writeUTF(sc.desc.filenameFor(sc.component));
             dos.writeUTF(sc.component);
-            dos.writeLong(sc.expectedBytes);            
+            dos.writeInt(sc.sections.size());
+            for (Pair<Long,Long> section : sc.sections)
+            {
+                dos.writeLong(section.left); dos.writeLong(section.right);
+            }
         }
 
         public PendingFile deserialize(DataInputStream dis) throws IOException
         {
             Descriptor desc = Descriptor.fromFilename(dis.readUTF());
             String component = dis.readUTF();
-            long expectedBytes = dis.readLong();           
-            return new PendingFile(desc, component, expectedBytes);
+            int count = dis.readInt();
+            List<Pair<Long,Long>> sections = new ArrayList<Pair<Long,Long>>(count);
+            for (int i = 0; i < count; i++)
+                sections.add(new Pair<Long,Long>(Long.valueOf(dis.readLong()), Long.valueOf(dis.readLong())));
+            return new PendingFile(desc, component, sections);
         }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamFinishedVerbHandler.java Fri Jun 18 10:29:54 2010
@@ -49,7 +49,7 @@ public class StreamFinishedVerbHandler i
             switch (streamStatus.getAction())
             {
                 case DELETE:
-                    StreamOutManager.get(message.getFrom()).finishAndStartNext(streamStatus.getFile());
+                    StreamOutManager.get(message.getFrom()).finishAndStartNext();
                     break;
 
                 case STREAM:

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateMessage.java Fri Jun 18 10:29:54 2010
@@ -77,15 +77,9 @@ class StreamInitiateMessage
         public StreamInitiateMessage deserialize(DataInputStream dis) throws IOException
         {
             int size = dis.readInt();
-            PendingFile[] pendingFiles = new PendingFile[0];
-            if ( size > 0 )
-            {
-                pendingFiles = new PendingFile[size];
-                for ( int i = 0; i < size; ++i )
-                {
-                    pendingFiles[i] = PendingFile.serializer().deserialize(dis);
-                }
-            }
+            PendingFile[] pendingFiles = new PendingFile[size];
+            for (int i = 0; i < size; i++)
+                pendingFiles[i] = PendingFile.serializer().deserialize(dis);
             return new StreamInitiateMessage(pendingFiles);
         }
     }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInitiateVerbHandler.java Fri Jun 18 10:29:54 2010
@@ -76,7 +76,7 @@ public class StreamInitiateVerbHandler i
                 PendingFile remoteFile = pendingFile.getKey();
                 PendingFile localFile = pendingFile.getValue();
 
-                FileStatus streamStatus = new FileStatus(remoteFile.getFilename(), remoteFile.getExpectedBytes());
+                FileStatus streamStatus = new FileStatus(remoteFile.getFilename());
 
                 if (logger.isDebugEnabled())
                   logger.debug("Preparing to receive stream from " + message.getFrom() + ": " + remoteFile + " -> " + localFile);
@@ -114,7 +114,7 @@ public class StreamInitiateVerbHandler i
             Descriptor localdesc = Descriptor.fromFilename(cfStore.getFlushPath());
 
             // add a local file for this component
-            mapping.put(remote, new PendingFile(localdesc, remote.getComponent(), remote.getExpectedBytes()));
+            mapping.put(remote, new PendingFile(localdesc, remote));
         }
 
         return mapping;

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOut.java Fri Jun 18 10:29:54 2010
@@ -38,7 +38,7 @@ import org.apache.cassandra.io.sstable.S
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-
+import org.apache.cassandra.utils.Pair;
 
 /**
  * This class handles streaming data from one node to another.
@@ -73,7 +73,7 @@ public class StreamOut
 
         /*
          * (1) dump all the memtables to disk.
-         * (2) anticompaction -- split out the keys in the range specified
+         * (2) determine the minimal file sections we need to send for the given ranges
          * (3) transfer the data.
         */
         try
@@ -95,9 +95,8 @@ public class StreamOut
                     throw new RuntimeException(e);
                 }
             }
-            logger.info("Performing anticompaction ...");
-            /* Get the list of files that need to be streamed */
-            transferSSTables(target, table.forceAntiCompaction(ranges, target), tableName); // SSTR GC deletes the file when done
+            // send the matching portion of every sstable in the keyspace
+            transferSSTables(target, tableName, table.getAllSSTables(), ranges);
         }
         catch (IOException e)
         {
@@ -112,20 +111,23 @@ public class StreamOut
     }
 
     /**
-     * Transfers a group of sstables from a single table to the target endpoint
-     * and then marks them as ready for local deletion.
+     * Transfers matching portions of a group of sstables from a single table to the target endpoint.
      */
-    public static void transferSSTables(InetAddress target, List<SSTableReader> sstables, String table) throws IOException
+    public static void transferSSTables(InetAddress target, String table, Collection<SSTableReader> sstables, Collection<Range> ranges) throws IOException
     {
-        PendingFile[] pendingFiles = new PendingFile[sstables.size()];
+        List<PendingFile> pending = new ArrayList<PendingFile>();
         int i = 0;
         for (SSTableReader sstable : sstables)
         {
             Descriptor desc = sstable.getDescriptor();
-            long filelen = new File(desc.filenameFor(SSTable.COMPONENT_DATA)).length();
-            pendingFiles[i++] = new PendingFile(desc, SSTable.COMPONENT_DATA, filelen);
+            List<Pair<Long,Long>> sections = sstable.getPositionsForRanges(ranges);
+            if (sections.isEmpty())
+                continue;
+            pending.add(new PendingFile(desc, SSTable.COMPONENT_DATA, sections));
         }
-        logger.info("Stream context metadata " + StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
+        logger.info("Stream context metadata " + pending + " " + sstables.size() + " sstables.");
+
+        PendingFile[] pendingFiles = pending.toArray(new PendingFile[pending.size()]);
         StreamOutManager.get(target).addFilesToStream(pendingFiles);
         StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles);
         Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamOutManager.java Fri Jun 18 10:29:54 2010
@@ -35,6 +35,7 @@ import java.net.InetAddress;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.SimpleCondition;
 
 import org.slf4j.Logger;
@@ -96,7 +97,6 @@ public class StreamOutManager
     private final Map<String, PendingFile> fileMap = new HashMap<String, PendingFile>();
     
     private final InetAddress to;
-    private long totalBytes = 0L;
     private final SimpleCondition condition = new SimpleCondition();
     
     private StreamOutManager(InetAddress to)
@@ -114,41 +114,39 @@ public class StreamOutManager
               logger.debug("Adding file " + pendingFile.getFilename() + " to be streamed.");
             files.add(pendingFile);
             fileMap.put(pendingFile.getFilename(), pendingFile);
-            totalBytes += pendingFile.getExpectedBytes();
         }
     }
 
+    /**
+     * An (offset,end) pair representing the current section of the file to stream.
+     */
+    public Pair<Long,Long> currentSection(String path)
+    {
+        return fileMap.get(path).currentSection();
+    }
+
     public void update(String path, long pos)
     {
-        PendingFile pf = fileMap.get(path);
-        if (pf != null)
-            pf.update(pos);
+        fileMap.get(path).update(pos);
     }
     
     public void startNext()
     {
         if (files.size() > 0)
         {
-            File file = new File(files.get(0).getFilename());
+            PendingFile pf = files.get(0);
             if (logger.isDebugEnabled())
-              logger.debug("Streaming " + file.length() + " length file " + file + " ...");
-            MessagingService.instance.stream(file.getAbsolutePath(), to);
+              logger.debug("Streaming " + pf + " ...");
+            MessagingService.instance.stream(pf, to);
         }
     }
 
-    public void finishAndStartNext(String file) throws IOException
+    public void finishAndStartNext() throws IOException
     {
-        File f = new File(file);
-        if (logger.isDebugEnabled())
-          logger.debug("Deleting file " + file + " after streaming " + f.length() + "/" + totalBytes + " bytes.");
-        FileUtils.delete(file);
         PendingFile pf = files.remove(0);
-        if (pf != null)
-            fileMap.remove(pf.getFilename());
+        fileMap.remove(pf.getFilename());
         if (files.size() > 0)
-        {
             startNext();
-        }
         else
         {
             if (logger.isDebugEnabled())

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingService.java Fri Jun 18 10:29:54 2010
@@ -58,7 +58,7 @@ public class StreamingService implements
             sb.append(String.format(" %s:\n", source.getHostAddress()));
             for (PendingFile pf : StreamInManager.getIncomingFiles(source))
             {
-                sb.append(String.format("  %s %d/%d\n", pf.getFilename(), pf.getPtr(), pf.getExpectedBytes()));
+                sb.append(String.format("  %s\n", pf.toString()));
             }
         }
         sb.append("Sending to:\n");
@@ -67,7 +67,7 @@ public class StreamingService implements
             sb.append(String.format(" %s:\n", dest.getHostAddress()));
             for (PendingFile pf : StreamOutManager.getPendingFiles(dest))
             {
-                sb.append(String.format("  %s %d/%d\n", pf.getFilename(), pf.getPtr(), pf.getExpectedBytes()));
+                sb.append(String.format("  %s\n", pf.toString()));
             }
         }
         return sb.toString();
@@ -92,7 +92,7 @@ public class StreamingService implements
         
         StreamOutManager manager = StreamOutManager.get(dest);
         for (PendingFile f : manager.getFiles())
-            files.add(String.format("%s %d/%d", f.getFilename(), f.getPtr(), f.getExpectedBytes()));
+            files.add(String.format("%s", f.toString()));
         return files;
     }
 
@@ -108,7 +108,7 @@ public class StreamingService implements
         List<String> files = new ArrayList<String>();
         for (PendingFile pf : StreamInManager.getIncomingFiles(InetAddress.getByName(host)))
         {
-            files.add(String.format("%s: %s %d/%d", pf.getDescriptor().ksname, pf.getFilename(), pf.getPtr(), pf.getExpectedBytes()));
+            files.add(String.format("%s: %s", pf.getDescriptor().ksname, pf.toString()));
         }
         return files;
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java Fri Jun 18 10:29:54 2010
@@ -64,7 +64,7 @@ public class ColumnFamilyStoreTest exten
         ColumnFamilyStore store = Util.writeColumnFamily(rms);
 
         Table table = Table.open("Keyspace1");
-        List<SSTableReader> ssTables = table.getAllSSTablesOnDisk();
+        List<SSTableReader> ssTables = table.getAllSSTables();
         assertEquals(1, ssTables.size());
         ssTables.get(0).forceFilterFailures();
         ColumnFamily cf = store.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key2"), new QueryPath("Standard1", null, "Column1".getBytes())));

Modified: cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/db/TableTest.java Fri Jun 18 10:29:54 2010
@@ -431,7 +431,7 @@ public class TableTest extends CleanupHe
             CompactionManager.instance.submitMajor(cfStore).get();
         }
         SSTableReader sstable = cfStore.getSSTables().iterator().next();
-        long position = sstable.getPosition(key);
+        long position = sstable.getPosition(key, SSTableReader.Operator.EQ);
         BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
         file.seek(position);
         assert Arrays.equals(FBUtilities.readShortByteArray(file), key.key);

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/StreamingTest.java Fri Jun 18 10:29:54 2010
@@ -26,6 +26,11 @@ import java.util.*;
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableUtils;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.service.StorageService;
@@ -46,17 +51,28 @@ public class StreamingTest extends Clean
         // write a temporary SSTable, but don't register it
         Set<String> content = new HashSet<String>();
         content.add("key");
+        content.add("key2");
+        content.add("key3");
         SSTableReader sstable = SSTableUtils.writeSSTable(content);
         String tablename = sstable.getTableName();
         String cfname = sstable.getColumnFamilyName();
 
-        // transfer
-        StreamOut.transferSSTables(LOCAL, Arrays.asList(sstable), tablename);
+        // transfer the first and last key
+        IPartitioner p = StorageService.getPartitioner();
+        List<Range> ranges = new ArrayList<Range>();
+        ranges.add(new Range(p.getMinimumToken(), p.getToken("key".getBytes())));
+        ranges.add(new Range(p.getToken("key2".getBytes()), p.getMinimumToken()));
+        StreamOut.transferSSTables(LOCAL, tablename, Arrays.asList(sstable), ranges);
 
         // confirm that the SSTable was transferred and registered
         ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname);
         List<Row> rows = Util.getRangeSlice(cfstore);
-        assert rows.size() == 1;
+        assertEquals(2, rows.size());
         assert Arrays.equals(rows.get(0).key.key, "key".getBytes());
+        assert Arrays.equals(rows.get(1).key.key, "key3".getBytes());
+
+        // and that the index and filter were properly recovered
+        assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key"), new QueryPath("Standard1")));
+        assert null != cfstore.getColumnFamily(QueryFilter.getIdentityFilter(Util.dk("key3"), new QueryPath("Standard1")));
     }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java Fri Jun 18 10:29:54 2010
@@ -102,7 +102,7 @@ public class LegacySSTableTest extends C
         for (byte[] key : keys)
         {
             // confirm that the bloom filter does not reject any keys
-            file.seek(reader.getPosition(reader.partitioner.decorateKey(key)));
+            file.seek(reader.getPosition(reader.partitioner.decorateKey(key), SSTableReader.Operator.EQ));
             assert Arrays.equals(key, FBUtilities.readShortByteArray(file));
         }
     }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java Fri Jun 18 10:29:54 2010
@@ -2,6 +2,9 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.List;
 
 import org.junit.Test;
 
@@ -9,9 +12,14 @@ import org.apache.cassandra.CleanupHelpe
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.MmappedSegmentedFile;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import org.apache.cassandra.Util;
 
@@ -19,6 +27,50 @@ import static org.junit.Assert.assertEqu
 
 public class SSTableReaderTest extends CleanupHelper
 {
+    static Token t(int i)
+    {
+        return StorageService.getPartitioner().getToken(String.valueOf(i).getBytes());
+    }
+
+    @Test
+    public void testGetPositionsForRanges() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open("Keyspace1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard2");
+
+        // insert data and compact to a single sstable
+        CompactionManager.instance.disableAutoCompaction();
+        for (int j = 0; j < 10; j++)
+        {
+            byte[] key = String.valueOf(j).getBytes();
+            RowMutation rm = new RowMutation("Keyspace1", key);
+            rm.add(new QueryPath("Standard2", null, "0".getBytes()), new byte[0], new TimestampClock(j));
+            rm.apply();
+        }
+        store.forceBlockingFlush();
+        CompactionManager.instance.submitMajor(store).get();
+
+        List<Range> ranges = new ArrayList<Range>();
+        // 1 key
+        ranges.add(new Range(t(0), t(1)));
+        // 2 keys
+        ranges.add(new Range(t(2), t(4)));
+        // wrapping range from key to end
+        ranges.add(new Range(t(6), StorageService.getPartitioner().getMinimumToken()));
+        // empty range (should be ignored)
+        ranges.add(new Range(t(9), t(91)));
+
+        // confirm that sections are non-empty, do not overlap, and appear in increasing order
+        SSTableReader sstable = store.getSSTables().iterator().next();
+        long previous = -1;
+        for (Pair<Long,Long> section : sstable.getPositionsForRanges(ranges))
+        {
+            assert previous <= section.left : previous + " ! < " + section.left;
+            assert section.left < section.right : section.left + " ! < " + section.right;
+            previous = section.right;
+        }
+    }
+
     @Test
     public void testSpannedIndexPositions() throws IOException, ExecutionException, InterruptedException
     {
@@ -53,7 +105,7 @@ public class SSTableReaderTest extends C
         for (int j = 1; j < 110; j += 2)
         {
             DecoratedKey dk = Util.dk(String.valueOf(j));
-            assert sstable.getPosition(dk) == -1;
+            assert sstable.getPosition(dk, SSTableReader.Operator.EQ) == -1;
         }
     }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java Fri Jun 18 10:29:54 2010
@@ -50,7 +50,7 @@ public class SSTableTest extends Cleanup
     private void verifySingle(SSTableReader sstable, byte[] bytes, byte[] key) throws IOException
     {
         BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
-        file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)));
+        file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ));
         assert Arrays.equals(key, FBUtilities.readShortByteArray(file));
         int size = (int)SSTableReader.readRowSize(file, sstable.getDescriptor());
         byte[] bytes2 = new byte[size];
@@ -82,7 +82,7 @@ public class SSTableTest extends Cleanup
         BufferedRandomAccessFile file = new BufferedRandomAccessFile(sstable.getFilename(), "r");
         for (byte[] key : keys)
         {
-            file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key)));
+            file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ));
             assert Arrays.equals(key, FBUtilities.readShortByteArray(file));
             int size = (int)SSTableReader.readRowSize(file, sstable.getDescriptor());
             byte[] bytes2 = new byte[size];

Modified: cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java?rev=955923&r1=955922&r2=955923&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/streaming/BootstrapTest.java Fri Jun 18 10:29:54 2010
@@ -26,6 +26,9 @@ import java.util.Map;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.utils.Pair;
+
+import java.util.Arrays;
 
 import org.junit.Test;
 
@@ -35,7 +38,7 @@ public class BootstrapTest extends Schem
     public void testGetNewNames() throws IOException
     {
         Descriptor desc = Descriptor.fromFilename(new File("Keyspace1", "Standard1-500-Data.db").toString());
-        PendingFile[] pendingFiles = new PendingFile[]{ new PendingFile(desc, "Data.db", 100) };
+        PendingFile[] pendingFiles = new PendingFile[]{ new PendingFile(desc, "Data.db", Arrays.asList(new Pair<Long,Long>(0L, 1L))) };
         StreamInitiateVerbHandler bivh = new StreamInitiateVerbHandler();
 
         // map the input (remote) contexts to output (local) contexts



Mime
View raw message