cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r999742 - in /cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/streaming/ test/unit/org/apache/cassandra/io/sstable/
Date Wed, 22 Sep 2010 03:41:52 GMT
Author: jbellis
Date: Wed Sep 22 03:41:52 2010
New Revision: 999742

URL: http://svn.apache.org/viewvc?rev=999742&view=rev
Log:
split 2ary index build out from bloom/row index build, and move into stream session post-processing.
 bloom/row index construction moved into SSTableWriter.Builder and is now run on CompactionManager
executor
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415


Modified:
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
    cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Sep 22 03:41:52
2010
@@ -33,6 +33,7 @@ import javax.management.ObjectName;
 import com.google.common.collect.Iterables;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -205,12 +206,9 @@ public class ColumnFamilyStore implement
             logger.info("Creating index {}.{}", table, indexedCfMetadata.cfName);
             Runnable runnable = new WrappedRunnable()
             {
-                public void runMayThrow() throws IOException, ExecutionException, InterruptedException
+                public void runMayThrow() throws IOException
                 {
-                    logger.debug("Submitting index build to compactionmanager");
-                    ReducingKeyIterator iter = new ReducingKeyIterator(getSSTables());
-                    Future future = CompactionManager.instance.submitIndexBuild(ColumnFamilyStore.this,
FBUtilities.getSingleColumnSet(info.name), iter);
-                    future.get();
+                    buildSecondaryIndexes(getSSTables(), FBUtilities.getSingleColumnSet(info.name));
                     logger.info("Index {} complete", indexedCfMetadata.cfName);
                     SystemTable.setIndexBuilt(table, indexedCfMetadata.cfName);
                 }
@@ -220,6 +218,26 @@ public class ColumnFamilyStore implement
         indexedColumns.put(info.name, indexedCfs);
     }
 
+    public void buildSecondaryIndexes(Collection<SSTableReader> sstables, SortedSet<byte[]>
columns)
+    {
+        logger.debug("Submitting index build to compactionmanager");
+        Future future = CompactionManager.instance.submitIndexBuild(this, columns, new ReducingKeyIterator(sstables));
+        try
+        {
+            future.get();
+            for (byte[] column : columns)
+                getIndexedColumnFamilyStore(column).forceBlockingFlush();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     // called when dropping or renaming a CF. Performs mbean housekeeping.
     void unregisterMBean()
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed Sep 22 03:41:52
2010
@@ -18,38 +18,38 @@
 
 package org.apache.cassandra.db;
 
-import java.io.IOException;
 import java.io.File;
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
-import javax.management.*;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
 
+import org.apache.commons.collections.PredicateUtils;
+import org.apache.commons.collections.iterators.CollatingIterator;
+import org.apache.commons.collections.iterators.FilterIterator;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.io.*;
+import org.apache.cassandra.io.AbstractCompactedRow;
+import org.apache.cassandra.io.CompactionIterator;
+import org.apache.cassandra.io.ICompactionInfo;
 import org.apache.cassandra.io.sstable.*;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.service.AntiEntropyService;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
-import java.net.InetAddress;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.collections.iterators.FilterIterator;
-import org.apache.commons.collections.iterators.CollatingIterator;
-import org.apache.commons.collections.PredicateUtils;
-
 public class CompactionManager implements CompactionManagerMBean
 {
     public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.db:type=CompactionManager";
@@ -508,6 +508,20 @@ public class CompactionManager implement
         return executor.submit(runnable);
     }
 
+    public Future<SSTableReader> submitSSTableBuild(Descriptor desc)
+    {
+        final SSTableWriter.Builder builder = SSTableWriter.createBuilder(desc);
+        Callable<SSTableReader> callable = new Callable<SSTableReader>()
+        {
+            public SSTableReader call() throws IOException
+            {
+                executor.beginCompaction(builder.cfs, builder);
+                return builder.build();
+            }
+        };
+        return executor.submit(callable);
+    }
+
     private static class AntiCompactionIterator extends CompactionIterator
     {
         private Set<SSTableScanner> scanners;
@@ -550,6 +564,11 @@ public class CompactionManager implement
             }
             return scanners;
         }
+
+        public String getTaskType()
+        {
+            return "Anticompaction";
+        }
     }
 
     public void checkAllColumnFamilies() throws IOException

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22 03:41:52 2010
@@ -456,7 +456,8 @@ public class Table
                 synchronized (indexLockFor(key.key))
                 {
                     ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns);
-                    applyIndexUpdates(key.key, memtablesToFlush, cf, cfs, cf.getColumnNames(),
null);
+                    if (cf != null)
+                        applyIndexUpdates(key.key, memtablesToFlush, cf, cfs, cf.getColumnNames(),
null);
                 }
             }
             finally

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/CompactionIterator.java Wed Sep 22 03:41:52
2010
@@ -161,4 +161,8 @@ implements Closeable, ICompactionInfo
         return bytesRead;
     }
 
+    public String getTaskType()
+    {
+        return "Compaction";
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/ICompactionInfo.java Wed Sep 22 03:41:52
2010
@@ -5,4 +5,6 @@ public interface ICompactionInfo
     public long getTotalBytes();
 
     public long getBytesRead();
+
+    public String getTaskType();
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java Wed Sep 22 03:41:52
2010
@@ -4,16 +4,16 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
+import java.util.Iterator;
 
 import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.io.ICompactionInfo;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-public class KeyIterator extends AbstractIterator<DecoratedKey> implements IKeyIterator
+public class KeyIterator extends AbstractIterator<DecoratedKey> implements Iterator<DecoratedKey>,
Closeable
 {
     private final BufferedRandomAccessFile in;
     private final Descriptor desc;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Wed
Sep 22 03:41:52 2010
@@ -66,6 +66,11 @@ public class ReducingKeyIterator impleme
         return m;
     }
 
+    public String getTaskType()
+    {
+        return "Secondary index build";
+    }
+
     public boolean hasNext()
     {
         return iter.hasNext();
@@ -73,7 +78,7 @@ public class ReducingKeyIterator impleme
 
     public DecoratedKey next()
     {
-        return (DecoratedKey) iter.next();
+        return iter.next();
     }
 
     public void remove()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Wed Sep 22
03:41:52 2010
@@ -32,6 +32,7 @@ import org.apache.cassandra.config.Datab
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.AbstractCompactedRow;
+import org.apache.cassandra.io.ICompactionInfo;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.io.util.SegmentedFile;
 import org.apache.cassandra.service.StorageService;
@@ -211,62 +212,35 @@ public class SSTableWriter extends SSTab
         return dfile.length() / (dataPosition / keys);
     }
 
+    public static Builder createBuilder(Descriptor desc)
+    {
+        if (!desc.isLatestVersion)
+            // TODO: streaming between different versions will fail: need support for
+            // recovering other versions to provide a stable streaming api
+            throw new RuntimeException(String.format("Cannot recover SSTable with version
%s (current version %s).",
+                                                     desc.version, Descriptor.CURRENT_VERSION));
+
+        return new Builder(desc);
+    }
+
     /**
-     * If either of the index or filter files are missing, rebuilds both.
-     * TODO: Builds most of the in-memory state of the sstable, but doesn't actually open
it.
+     * Removes the given SSTable from temporary status and opens it, rebuilding the
+     * bloom filter and row index from the data file.
      */
-    private static void maybeRecover(Descriptor desc) throws IOException
+    public static class Builder implements ICompactionInfo
     {
-        logger.debug("In maybeRecover with Descriptor {}", desc);
-        File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
-        File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
-        if (ifile.exists() && ffile.exists())
-            // nothing to do
-            return;
-
-        ColumnFamilyStore cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
-
-        // remove existing files
-        ifile.delete();
-        ffile.delete();
-
-        // open the data file for input, and an IndexWriter for output
-        BufferedRandomAccessFile dfile = new BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA),
"r", 8 * 1024 * 1024);
-        IndexWriter iwriter;
-        long estimatedRows;
-        try
-        {            
-            estimatedRows = estimateRows(desc, dfile);            
-            iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
-        }
-        catch(IOException e)
-        {
-            dfile.close();
-            throw e;
-        }
+        private final Descriptor desc;
+        public final ColumnFamilyStore cfs;
+        private BufferedRandomAccessFile dfile;
 
-        // build the index and filter
-        long rows = 0;
-        try
-        {
-            DecoratedKey key;
-            long dataPosition = 0;
-            while (dataPosition < dfile.length())
-            {
-                key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc, FBUtilities.readShortByteArray(dfile));
-                long dataSize = SSTableReader.readRowSize(dfile, desc);
-                iwriter.afterAppend(key, dataPosition);
-                dataPosition = dfile.getFilePointer() + dataSize;
-                dfile.seek(dataPosition);
-                rows++;
-            }
-        }
-        finally
+        public Builder(Descriptor desc)
         {
+
+            this.desc = desc;
+            cfs = Table.open(desc.ksname).getColumnFamilyStore(desc.cfname);
             try
             {
-                dfile.close();
-                iwriter.close();
+                dfile = new BufferedRandomAccessFile(desc.filenameFor(SSTable.COMPONENT_DATA),
"r", 8 * 1024 * 1024);
             }
             catch (IOException e)
             {
@@ -274,44 +248,80 @@ public class SSTableWriter extends SSTab
             }
         }
 
-        if (!cfs.getIndexedColumns().isEmpty())
+        public SSTableReader build() throws IOException
         {
-            Future future = CompactionManager.instance.submitIndexBuild(cfs, cfs.getIndexedColumns(),
new KeyIterator(desc));
+            File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+            File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+            assert !ifile.exists();
+            assert !ffile.exists();
+
+            IndexWriter iwriter;
+            long estimatedRows;
             try
             {
-                future.get();
-                for (byte[] column : cfs.getIndexedColumns())
-                    cfs.getIndexedColumnFamilyStore(column).forceBlockingFlush();
+                estimatedRows = estimateRows(desc, dfile);
+                iwriter = new IndexWriter(desc, StorageService.getPartitioner(), estimatedRows);
+            }
+            catch(IOException e)
+            {
+                dfile.close();
+                throw e;
             }
-            catch (InterruptedException e)
+
+            // build the index and filter
+            long rows = 0;
+            try
             {
-                throw new AssertionError(e);
+                DecoratedKey key;
+                long dataPosition = 0;
+                while (dataPosition < dfile.length())
+                {
+                    key = SSTableReader.decodeKey(StorageService.getPartitioner(), desc,
FBUtilities.readShortByteArray(dfile));
+                    long dataSize = SSTableReader.readRowSize(dfile, desc);
+                    iwriter.afterAppend(key, dataPosition);
+                    dataPosition = dfile.getFilePointer() + dataSize;
+                    dfile.seek(dataPosition);
+                    rows++;
+                }
             }
-            catch (ExecutionException e)
+            finally
             {
-                throw new RuntimeException(e);
+                try
+                {
+                    dfile.close();
+                    iwriter.close();
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
             }
+
+            logger.debug("estimated row count was %s of real count", ((double)estimatedRows)
/ rows);
+            return SSTableReader.open(rename(desc, SSTable.componentsFor(desc)));
         }
 
-        logger.debug("estimated row count was %s of real count", ((double)estimatedRows)
/ rows);
-    }
+        public long getTotalBytes()
+        {
+            try
+            {
+                return dfile.length();
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+        }
 
-    /**
-     * Removes the given SSTable from temporary status and opens it, rebuilding the non-essential
portions of the
-     * file if necessary.
-     */
-    public static SSTableReader recoverAndOpen(Descriptor desc) throws IOException
-    {
-        if (!desc.isLatestVersion)
-            // TODO: streaming between different versions will fail: need support for
-            // recovering other versions to provide a stable streaming api
-            throw new RuntimeException(String.format("Cannot recover SSTable with version
%s (current version %s).",
-                                                     desc.version, Descriptor.CURRENT_VERSION));
+        public long getBytesRead()
+        {
+            return dfile.getFilePointer();
+        }
 
-        // FIXME: once maybeRecover is recovering BMIs, it should return the recovered
-        // components
-        maybeRecover(desc);
-        return SSTableReader.open(rename(desc, SSTable.componentsFor(desc)));
+        public String getTaskType()
+        {
+            return "SSTable rebuild";
+        }
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java Wed
Sep 22 03:41:52 2010
@@ -100,24 +100,6 @@ public class IncomingStreamReader
             fc.close();
         }
 
-        addSSTable(localFile);
-        session.finished(remoteFile);
-    }
-
-    public static void addSSTable(PendingFile pendingFile)
-    {
-        // file was successfully streamed
-        Descriptor desc = pendingFile.desc;
-        try
-        {
-            SSTableReader sstable = SSTableWriter.recoverAndOpen(pendingFile.desc);
-            Table.open(desc.ksname).getColumnFamilyStore(desc.cfname).addSSTable(sstable);
-            logger.info("Streaming added " + sstable);
-        }
-        catch (IOException e)
-        {
-            logger.error("Failed adding {}", pendingFile, e);
-            throw new RuntimeException("Not able to add streamed file " + pendingFile.getFilename(),
e);
-        }
+        session.finished(remoteFile, localFile);
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamInSession.java Wed Sep 22
03:41:52 2010
@@ -22,7 +22,13 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.net.MessagingService;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.apache.cassandra.utils.Pair;
@@ -41,6 +47,8 @@ public class StreamInSession
     private final Pair<InetAddress, Long> context;
     private final Runnable callback;
     private String table;
+    private final List<Future<SSTableReader>> buildFutures = new ArrayList<Future<SSTableReader>>();
+    private ColumnFamilyStore cfs;
 
     private StreamInSession(Pair<InetAddress, Long> context, Runnable callback)
     {
@@ -84,13 +92,19 @@ public class StreamInSession
             if(logger.isDebugEnabled())
                 logger.debug("Adding file {} to Stream Request queue", file.getFilename());
             this.files.add(file);
+            if (cfs == null)
+                cfs = Table.open(file.desc.ksname).getColumnFamilyStore(file.desc.cfname);
         }
     }
 
-    public void finished(PendingFile remoteFile) throws IOException
+    public void finished(PendingFile remoteFile, PendingFile localFile) throws IOException
     {
         if (logger.isDebugEnabled())
             logger.debug("Finished {}. Sending ack to {}", remoteFile, this);
+
+        Future future = CompactionManager.instance.submitSSTableBuild(localFile.desc);
+        buildFutures.add(future);
+
         files.remove(remoteFile);
         StreamReply reply = new StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_FINISHED);
         // send a StreamStatus message telling the source node it can delete this file
@@ -108,6 +122,31 @@ public class StreamInSession
     {
         if (files.isEmpty())
         {
+            // wait for bloom filters and row indexes to finish building
+            List<SSTableReader> sstables = new ArrayList<SSTableReader>(buildFutures.size());
+            for (Future<SSTableReader> future : buildFutures)
+            {
+                try
+                {
+                    SSTableReader sstable = future.get();
+                    cfs.addSSTable(sstable);
+                    sstables.add(sstable);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+                catch (ExecutionException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            // build secondary indexes
+            if (cfs != null && !cfs.getIndexedColumns().isEmpty())
+                cfs.buildSecondaryIndexes(sstables, cfs.getIndexedColumns());
+
+            // send reply to source that we're done
             StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FINISHED);
             logger.info("Finished streaming session {} from {}", getSessionId(), getHost());
             MessagingService.instance.sendOneWay(reply.createMessage(), getHost());

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java?rev=999742&r1=999741&r2=999742&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java Wed Sep
22 03:41:52 2010
@@ -29,21 +29,17 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.cassandra.CleanupHelper;
-import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.Table;
-import org.apache.cassandra.db.TimestampClock;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.IFilter;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryPath;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.IndexClause;
 import org.apache.cassandra.thrift.IndexExpression;
@@ -54,7 +50,7 @@ import org.junit.Test;
 public class SSTableWriterTest extends CleanupHelper {
 
     @Test
-    public void testRecoverAndOpen() throws IOException
+    public void testRecoverAndOpen() throws IOException, ExecutionException, InterruptedException
     {
         RowMutation rm;
 
@@ -80,13 +76,13 @@ public class SSTableWriterTest extends C
         
         SSTableReader orig = SSTableUtils.writeRawSSTable("Keyspace1", "Indexed1", entries);
       
         // whack the index to trigger the recover
-        new File(orig.desc.filenameFor(Component.PRIMARY_INDEX)).delete();
-        new File(orig.desc.filenameFor(Component.FILTER)).delete();
-        
-        SSTableReader sstr = SSTableWriter.recoverAndOpen(orig.desc);
-        
+        FileUtils.deleteWithConfirm(orig.desc.filenameFor(Component.PRIMARY_INDEX));
+        FileUtils.deleteWithConfirm(orig.desc.filenameFor(Component.FILTER));
+
+        SSTableReader sstr = CompactionManager.instance.submitSSTableBuild(orig.desc).get();
         ColumnFamilyStore cfs = Table.open("Keyspace1").getColumnFamilyStore("Indexed1");
         cfs.addSSTable(sstr);
+        cfs.buildSecondaryIndexes(cfs.getSSTables(), cfs.getIndexedColumns());
         
         IndexExpression expr = new IndexExpression("birthdate".getBytes("UTF8"), IndexOperator.EQ,
FBUtilities.toByteArray(1L));
         IndexClause clause = new IndexClause(Arrays.asList(expr), "".getBytes(), 100);
@@ -95,7 +91,7 @@ public class SSTableWriterTest extends C
         Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
         List<Row> rows = cfs.scan(clause, range, filter);
         
-        assertEquals("IndexExpression should return two rows on recoverAndOpen",2, rows.size());
+        assertEquals("IndexExpression should return two rows on recoverAndOpen", 2, rows.size());
         assertTrue("First result should be 'k1'",Arrays.equals("k1".getBytes(), rows.get(0).key.key));
     }
 }



Mime
View raw message