cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r814756 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/dht/ test/unit/org/apache/cassandra/db/
Date Mon, 14 Sep 2009 18:04:47 GMT
Author: jbellis
Date: Mon Sep 14 18:04:46 2009
New Revision: 814756

URL: http://svn.apache.org/viewvc?rev=814756&view=rev
Log:
make anticompaction return a list of the split-out sstables, instead of a mostly-useless boolean
patch by jbellis; reviewed by goffinet for CASSANDRA-431

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
    incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=814756&r1=814755&r2=814756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon
Sep 14 18:04:46 2009
@@ -245,38 +245,22 @@
      * This method forces a compaction of the SSTables on disk. We wait
      * for the process to complete by waiting on a future pointer.
     */
-    boolean forceCompaction(List<Range> ranges, EndPoint target, long skip, List<String>
fileList)
+    List<SSTableReader> forceAntiCompaction(List<Range> ranges, EndPoint target,
long skip)
     {
-        Future<Boolean> futurePtr = null;
-        if (ranges != null)
-        {
-            futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this,
ranges, target, fileList);
-        }
-        else
-        {
-            MinorCompactionManager.instance().submitMajor(ColumnFamilyStore.this, skip);
-        }
+        assert ranges != null;
+        Future<List<SSTableReader>> futurePtr = MinorCompactionManager.instance().submit(ColumnFamilyStore.this,
ranges, target);
 
-        boolean result = true;
+        List<SSTableReader> result;
         try
         {
             /* Waiting for the compaction to complete. */
-            if (futurePtr != null)
-            {
-                result = futurePtr.get();
-            }
+            result = futurePtr.get();
             if (logger_.isDebugEnabled())
               logger_.debug("Done forcing compaction ...");
         }
-        catch (ExecutionException ex)
-        {
-            if (logger_.isDebugEnabled())
-              logger_.debug(LogUtil.throwableToString(ex));
-        }
-        catch (InterruptedException ex2)
+        catch (Exception ex)
         {
-            if (logger_.isDebugEnabled())
-              logger_.debug(LogUtil.throwableToString(ex2));
+            throw new RuntimeException(ex);
         }
         return result;
     }
@@ -723,9 +707,9 @@
         return maxFile;
     }
 
-    boolean doAntiCompaction(List<Range> ranges, EndPoint target, List<String>
fileList) throws IOException
+    List<SSTableReader> doAntiCompaction(List<Range> ranges, EndPoint target)
throws IOException
     {
-        return doFileAntiCompaction(ssTables_.getSSTables(), ranges, target, fileList);
+        return doFileAntiCompaction(ssTables_.getSSTables(), ranges, target);
     }
 
     void forceCleanup()
@@ -756,10 +740,14 @@
     {
         assert sstable != null;
         List<Range> myRanges;
-        List<String> newFiles = new ArrayList<String>();
         Map<EndPoint, List<Range>> endPointtoRangeMap = StorageService.instance().constructEndPointToRangesMap();
         myRanges = endPointtoRangeMap.get(StorageService.getLocalStorageEndPoint());
-        doFileAntiCompaction(Arrays.asList(sstable), myRanges, null, newFiles);
+        List<SSTableReader> sstables = doFileAntiCompaction(Arrays.asList(sstable),
myRanges, null);
+        if (!sstables.isEmpty())
+        {
+            assert sstables.size() == 1;
+            addSSTable(sstables.get(0));
+        }
         if (logger_.isDebugEnabled())
           logger_.debug("Original file : " + sstable + " of size " + sstable.length());
         ssTables_.markCompacted(Arrays.asList(sstable));
@@ -772,13 +760,12 @@
      * @param sstables
      * @param ranges
      * @param target
-     * @param fileList
      * @return
      * @throws IOException
      */
-    boolean doFileAntiCompaction(Collection<SSTableReader> sstables, List<Range>
ranges, EndPoint target, List<String> fileList) throws IOException
+    List<SSTableReader> doFileAntiCompaction(Collection<SSTableReader> sstables,
List<Range> ranges, EndPoint target) throws IOException
     {
-        boolean result = false;
+        List<SSTableReader> results = new ArrayList<SSTableReader>();
         long startTime = System.currentTimeMillis();
         long totalBytesRead = 0;
         long totalBytesWritten = 0;
@@ -796,12 +783,12 @@
         {
             logger_.error("Total bytes to be written for range compaction  ..."
                           + expectedRangeFileSize + "   is greater than the safe limit of
the disk space available.");
-            return result;
+            return results;
         }
         PriorityQueue<FileStruct> pq = initializePriorityQueue(sstables, ranges);
         if (pq.isEmpty())
         {
-            return result;
+            return results;
         }
 
         mergedFileName = getTempSSTableFileName();
@@ -912,17 +899,7 @@
 
         if (rangeWriter != null)
         {
-            rangeWriter.closeAndOpenReader();
-            if (fileList != null)
-            {
-                //Retain order. The -Data.db file needs to be last because 
-                //the receiving end checks for this file before opening the SSTable
-                //and adding this to the list of SSTables.
-                fileList.add(rangeWriter.indexFilename());
-                fileList.add(rangeWriter.filterFilename());
-                fileList.add(rangeWriter.getFilename());
-            }
-            result = true;
+            results.add(rangeWriter.closeAndOpenReader());
         }
 
         if (logger_.isDebugEnabled())
@@ -932,7 +909,7 @@
             logger_.debug("Total bytes written for range split  ..."
                           + totalBytesWritten + "   Total keys read ..." + totalkeysRead);
         }
-        return result;
+        return results;
     }
 
     /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=814756&r1=814755&r2=814756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon
Sep 14 18:04:46 2009
@@ -19,7 +19,6 @@
 package org.apache.cassandra.db;
 
 import java.util.Collection;
-import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -188,7 +187,7 @@
             }
         }
         hintStore.forceFlush();
-        hintStore.forceCompaction(null, null, 0, null);
+        hintStore.forceAntiCompaction(null, null, 0);
 
         if (logger_.isDebugEnabled())
           logger_.debug("Finished deliverAllHints");

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java?rev=814756&r1=814755&r2=814756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/MinorCompactionManager.java
Mon Sep 14 18:04:46 2009
@@ -30,6 +30,7 @@
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.io.SSTableReader;
 
 import org.apache.log4j.Logger;
 
@@ -60,29 +61,27 @@
         return instance_;
     }
 
-    static class FileCompactor2 implements Callable<Boolean>
+    static class FileCompactor2 implements Callable<List<SSTableReader>>
     {
         private ColumnFamilyStore columnFamilyStore_;
         private List<Range> ranges_;
         private EndPoint target_;
-        private List<String> fileList_;
 
-        FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint
target,List<String> fileList)
+        FileCompactor2(ColumnFamilyStore columnFamilyStore, List<Range> ranges, EndPoint
target)
         {
             columnFamilyStore_ = columnFamilyStore;
             ranges_ = ranges;
             target_ = target;
-            fileList_ = fileList;
         }
 
-        public Boolean call()
+        public List<SSTableReader> call()
         {
-        	boolean result;
+        	List<SSTableReader> results;
             if (logger_.isDebugEnabled())
               logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
             try
             {
-                result = columnFamilyStore_.doAntiCompaction(ranges_, target_,fileList_);
+                results = columnFamilyStore_.doAntiCompaction(ranges_, target_);
             }
             catch (IOException e)
             {
@@ -90,7 +89,7 @@
             }
             if (logger_.isDebugEnabled())
               logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
-            return result;
+            return results;
         }
     }
 
@@ -178,9 +177,9 @@
         compactor_.submit(new CleanupCompactor(columnFamilyStore));
     }
 
-    public Future<Boolean> submit(ColumnFamilyStore columnFamilyStore, List<Range>
ranges, EndPoint target, List<String> fileList)
+    public Future<List<SSTableReader>> submit(ColumnFamilyStore columnFamilyStore,
List<Range> ranges, EndPoint target)
     {
-        return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target, fileList)
);
+        return compactor_.submit( new FileCompactor2(columnFamilyStore, ranges, target) );
     }
 
     public void  submitMajor(ColumnFamilyStore columnFamilyStore, long skip)

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=814756&r1=814755&r2=814756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Mon Sep 14 18:04:46
2009
@@ -459,9 +459,9 @@
      * do a complete compaction since we can figure out based on the ranges
      * whether the files need to be split.
     */
-    public boolean forceCompaction(List<Range> ranges, EndPoint target, List<String>
fileList)
+    public List<SSTableReader> forceAntiCompaction(List<Range> ranges, EndPoint
target)
     {
-        boolean result = true;
+        List<SSTableReader> allResults = new ArrayList<SSTableReader>();
         Set<String> columnFamilies = tableMetadata_.getColumnFamilies();
         for ( String columnFamily : columnFamilies )
         {
@@ -469,12 +469,9 @@
                 continue;
             
             ColumnFamilyStore cfStore = columnFamilyStores_.get( columnFamily );
-            if ( cfStore != null )
-            {
-                cfStore.forceCompaction(ranges, target, 0, fileList);                
-            }
+            allResults.addAll(cfStore.forceAntiCompaction(ranges, target, 0));
         }
-        return result;
+        return allResults;
     }
     
     /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java?rev=814756&r1=814755&r2=814756&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/dht/BootstrapMetadataVerbHandler.java
Mon Sep 14 18:04:46 2009
@@ -26,6 +26,7 @@
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.io.DataInputBuffer;
+import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
@@ -130,7 +131,12 @@
               logger_.debug("Forcing compaction ...");
             /* Get the counting bloom filter for each endpoint and the list of files that
need to be streamed */
             List<String> fileList = new ArrayList<String>();
-            table.forceCompaction(ranges, target, fileList);
+            for (SSTableReader sstable : table.forceAntiCompaction(ranges, target))
+            {
+                fileList.add(sstable.indexFilename());
+                fileList.add(sstable.filterFilename());
+                fileList.add(sstable.getFilename());
+            }
             doHandoff(target, fileList, tName);
             //In Handoff, Streaming the file also deletes the file, so no cleanup needed
           
         }

Modified: incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java?rev=814756&r1=814755&r2=814756&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java (original)
+++ incubator/cassandra/trunk/test/unit/org/apache/cassandra/db/BootstrapTest.java Mon Sep
14 18:04:46 2009
@@ -32,6 +32,8 @@
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.io.StreamContextManager;
+import org.apache.cassandra.io.SSTableReader;
+
 import org.junit.Test;
 
 public class BootstrapTest
@@ -54,16 +56,13 @@
         }
         
         store.forceBlockingFlush();
-        List<String> fileList = new ArrayList<String>();
         List<Range> ranges  = new ArrayList<Range>();
         IPartitioner partitioner = new CollatingOrderPreservingPartitioner();
         Range r = new Range(partitioner.getToken("0"), partitioner.getToken("zzzzzzz"));
         ranges.add(r);
 
-        boolean result = store.forceCompaction(ranges, new EndPoint("127.0.0.1", 9150), 0,
fileList);
-
-        assertEquals(true, result); // some keys should have qualified
-        assertEquals(true, fileList.size() >= 3); //Data, index, filter files
+        List<SSTableReader> fileList = store.forceAntiCompaction(ranges, new EndPoint("127.0.0.1",
9150), 0);
+        assert fileList.size() >= 1;
     }
 
     @Test



Mime
View raw message