cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1053927 - in /cassandra/branches/cassandra-0.7: ./ interface/thrift/gen-java/org/apache/cassandra/thrift/ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ test/unit/org/apache/cassandra/db/
Date Thu, 30 Dec 2010 16:18:34 GMT
Author: jbellis
Date: Thu Dec 30 16:18:33 2010
New Revision: 1053927

URL: http://svn.apache.org/viewvc?rev=1053927&view=rev
Log:
merge from 0.7.0

Added:
    cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java
Modified:
    cassandra/branches/cassandra-0.7/   (props changed)
    cassandra/branches/cassandra-0.7/CHANGES.txt
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
  (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
  (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
  (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
  (props changed)
    cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
  (props changed)
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
    cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java

Propchange: cassandra/branches/cassandra-0.7/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
 /cassandra/branches/cassandra-0.6:922689-1053763
 /cassandra/branches/cassandra-0.7:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0:1053690-1053891
+/cassandra/branches/cassandra-0.7.0:1053690-1053891,1053922
 /cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
 /cassandra/trunk:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3:774578-796573

Modified: cassandra/branches/cassandra-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/CHANGES.txt?rev=1053927&r1=1053926&r2=1053927&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.7/CHANGES.txt Thu Dec 30 16:18:33 2010
@@ -22,6 +22,7 @@ dev
  * change RandomPartitioner min token to -1 to avoid collision w/
    tokens on actual nodes (CASSANDRA-1901)
  * examine the right nibble when validating TimeUUID (CASSANDRA-1910)
+ * include secondary indexes in cleanup (CASSANDRA-1916)
  * CFS.scrubDataDirectories should also cleanup invalid secondary indexes
    (CASSANDRA-1904)
 

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1053763
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1053891,1053922
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1053763
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1053891,1053922
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1053763
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1053891,1053922
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1053763
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1053891,1053922
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573

Propchange: cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Dec 30 16:18:33 2010
@@ -1,6 +1,6 @@
 /cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1053763
 /cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516,1035666,1050269
-/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1053891
+/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1053891,1053922
 /cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
 /cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1026734,1028929
 /incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1053927&r1=1053926&r2=1053927&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java
Thu Dec 30 16:18:33 2010
@@ -18,10 +18,12 @@
 
 package org.apache.cassandra.db;
 
+import java.io.DataOutput;
 import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
@@ -31,7 +33,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.collections.PredicateUtils;
-import org.apache.commons.collections.iterators.CollatingIterator;
 import org.apache.commons.collections.iterators.FilterIterator;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
@@ -47,7 +48,6 @@ import org.apache.cassandra.io.sstable.*
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.service.AntiEntropyService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -368,96 +368,109 @@ public class CompactionManager implement
     }
 
     /**
-     * This function is used to do the anti compaction process , it spits out the file which
has keys that belong to a given range
-     * If the target is not specified it spits out the file as a compacted file with the
unecessary ranges wiped out.
+     * This function goes over each file and removes the keys that the node is not responsible
for
+     * and only keeps keys that this node is responsible for.
      *
-     * @param cfs
-     * @param sstables
-     * @param ranges
-     * @param target
-     * @return
-     * @throws java.io.IOException
+     * @throws IOException
      */
-    private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, Collection<SSTableReader>
sstables, Collection<Range> ranges, InetAddress target)
-            throws IOException
+    private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
     {
+        assert !cfs.isIndex();
         Table table = cfs.table;
-        logger.info("AntiCompacting [" + StringUtils.join(sstables, ",") + "]");
-        // Calculate the expected compacted filesize
-        long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(sstables) / 2;
-        String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
-        if (compactionFileLocation == null)
-        {
-            throw new UnsupportedOperationException("disk full");
-        }
+        Collection<Range> ranges = StorageService.instance.getLocalRanges(table.name);
 
-        List<SSTableReader> results = new ArrayList<SSTableReader>();
-        long startTime = System.currentTimeMillis();
-        long totalkeysWritten = 0;
-
-        int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstables)
/ 2));
-        if (logger.isDebugEnabled())
-          logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
-
-        SSTableWriter writer = null;
-        CompactionIterator ci = new AntiCompactionIterator(cfs, sstables, ranges, (int) (System.currentTimeMillis()
/ 1000) - cfs.metadata.getGcGraceSeconds(), cfs.isCompleteSSTables(sstables));
-        Iterator<AbstractCompactedRow> nni = new FilterIterator(ci, PredicateUtils.notNullPredicate());
-        executor.beginCompaction(cfs, ci);
-
-        try
+        for (SSTableReader sstable : cfs.getSSTables())
         {
-            if (!nni.hasNext())
+            logger.info("AntiCompacting " + sstable);
+            // Calculate the expected compacted filesize
+            long expectedRangeFileSize = cfs.getExpectedCompactedFileSize(Arrays.asList(sstable))
/ 2;
+            String compactionFileLocation = table.getDataFileLocation(expectedRangeFileSize);
+            if (compactionFileLocation == null)
+                throw new UnsupportedOperationException("disk full");
+
+            long startTime = System.currentTimeMillis();
+            long totalkeysWritten = 0;
+
+            int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(),
+                                                   (int)(SSTableReader.getApproximateKeyCount(Arrays.asList(sstable))
/ 2));
+            if (logger.isDebugEnabled())
+              logger.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+
+            SSTableWriter writer = null;
+            SSTableScanner scanner = sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
+            SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
+            executor.beginCompaction(cfs, new CleanupInfo(sstable, scanner));
+            try
             {
-                return results;
+                while (scanner.hasNext())
+                {
+                    SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
+                    if (Range.isTokenInRanges(row.getKey().token, ranges))
+                    {
+                        writer = maybeCreateWriter(cfs, compactionFileLocation, expectedBloomFilterSize,
writer);
+                        writer.append(new EchoedRow(row));
+                        totalkeysWritten++;
+                    }
+                    else
+                    {
+                        while (row.hasNext())
+                        {
+                            IColumn column = row.next();
+                            if (indexedColumns.contains(column.name()))
+                                Table.cleanupIndexEntry(cfs, row.getKey().key, column);
+                        }
+                    }
+                }
+            }
+            finally
+            {
+                scanner.close();
             }
 
-            while (nni.hasNext())
+            List<SSTableReader> results = new ArrayList<SSTableReader>();
+            if (writer != null)
             {
-                AbstractCompactedRow row = nni.next();
-                if (writer == null)
+                SSTableReader newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+                results.add(newSstable);
+
+                String format = "AntiCompacted to %s.  %,d to %,d (~%d%% of original) bytes
for %,d keys.  Time: %,dms.";
+                long dTime = System.currentTimeMillis() - startTime;
+                long startsize = sstable.length();
+                long endsize = newSstable.length();
+                double ratio = (double)endsize / (double)startsize;
+                logger.info(String.format(format, writer.getFilename(), startsize, endsize,
(int)(ratio*100), totalkeysWritten, dTime));
+            }
+
+            // flush to ensure we don't lose the tombstones on a restart, since they are
not commitlog'd
+            for (ByteBuffer columnName : cfs.getIndexedColumns())
+            {
+                try
                 {
-                    FileUtils.createDirectory(compactionFileLocation);
-                    String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
-                    writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata,
cfs.partitioner);
+                    cfs.getIndexedColumnFamilyStore(columnName).forceBlockingFlush();
+                }
+                catch (ExecutionException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
                 }
-                writer.append(row);
-                totalkeysWritten++;
             }
+            cfs.replaceCompactedSSTables(Arrays.asList(sstable), results);
         }
-        finally
-        {
-            ci.close();
-        }
-
-        if (writer != null)
-        {
-            results.add(writer.closeAndOpenReader(getMaxDataAge(sstables)));
-
-            String format = "AntiCompacted to %s.  %,d to %,d (~%d%% of original) bytes for
%,d keys.  Time: %,dms.";
-            long dTime = System.currentTimeMillis() - startTime;
-            long startsize = SSTable.getTotalBytes(sstables);
-            long endsize = results.get(0).length();
-            double ratio = (double)endsize / (double)startsize;
-            logger.info(String.format(format, writer.getFilename(), startsize, endsize, (int)(ratio*100),
totalkeysWritten, dTime));
-        }
-
-        return results;
     }
 
-    /**
-     * This function goes over each file and removes the keys that the node is not responsible
for
-     * and only keeps keys that this node is responsible for.
-     *
-     * @throws IOException
-     */
-    private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
+    private SSTableWriter maybeCreateWriter(ColumnFamilyStore cfs, String compactionFileLocation,
int expectedBloomFilterSize, SSTableWriter writer)
+            throws IOException
     {
-        Collection<SSTableReader> originalSSTables = cfs.getSSTables();
-        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, StorageService.instance.getLocalRanges(cfs.table.name),
null);
-        if (!sstables.isEmpty())
+        if (writer == null)
         {
-            cfs.replaceCompactedSSTables(originalSSTables, sstables);
+            FileUtils.createDirectory(compactionFileLocation);
+            String newFilename = new File(cfs.getTempSSTablePath(compactionFileLocation)).getAbsolutePath();
+            writer = new SSTableWriter(newFilename, expectedBloomFilterSize, cfs.metadata,
cfs.partitioner);
         }
+        return writer;
     }
 
     /**
@@ -633,55 +646,6 @@ public class CompactionManager implement
         }
     }
 
-    private static class AntiCompactionIterator extends CompactionIterator
-    {
-        private Set<SSTableScanner> scanners;
-
-        public AntiCompactionIterator(ColumnFamilyStore cfStore, Collection<SSTableReader>
sstables, Collection<Range> ranges, int gcBefore, boolean isMajor)
-                throws IOException
-        {
-            super(cfStore, getCollatedRangeIterator(sstables, ranges), gcBefore, isMajor);
-        }
-
-        private static Iterator getCollatedRangeIterator(Collection<SSTableReader>
sstables, final Collection<Range> ranges)
-                throws IOException
-        {
-            org.apache.commons.collections.Predicate rangesPredicate = new org.apache.commons.collections.Predicate()
-            {
-                public boolean evaluate(Object row)
-                {
-                    return Range.isTokenInRanges(((SSTableIdentityIterator)row).getKey().token,
ranges);
-                }
-            };
-            // TODO CollatingIterator iter = FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
-            CollatingIterator iter = FBUtilities.getCollatingIterator();
-            for (SSTableReader sstable : sstables)
-            {
-                SSTableScanner scanner = sstable.getDirectScanner(FILE_BUFFER_SIZE);
-                iter.addIterator(new FilterIterator(scanner, rangesPredicate));
-            }
-            return iter;
-        }
-
-        public Iterable<SSTableScanner> getScanners()
-        {
-            if (scanners == null)
-            {
-                scanners = new HashSet<SSTableScanner>();
-                for (Object o : ((CollatingIterator)source).getIterators())
-                {
-                    scanners.add((SSTableScanner)((FilterIterator)o).getIterator());
-                }
-            }
-            return scanners;
-        }
-
-        public String getTaskType()
-        {
-            return "Cleanup";
-        }
-    }
-
     public void checkAllColumnFamilies() throws IOException
     {
         // perform estimates
@@ -821,4 +785,63 @@ public class CompactionManager implement
             throw new IllegalStateException("May not call SimpleFuture.get(long, TimeUnit)");
         }
     }
+
+    private static class EchoedRow extends AbstractCompactedRow
+    {
+        private final SSTableIdentityIterator row;
+
+        public EchoedRow(SSTableIdentityIterator row)
+        {
+            super(row.getKey());
+            this.row = row;
+        }
+
+        public void write(DataOutput out) throws IOException
+        {
+            row.echoData(out);
+        }
+
+        public void update(MessageDigest digest)
+        {
+            // EchoedRow is not used in anti-entropy validation
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isEmpty()
+        {
+            return !row.hasNext();
+        }
+
+        public int columnCount()
+        {
+            return row.columnCount;
+        }
+    }
+
+    private static class CleanupInfo implements ICompactionInfo
+    {
+        private final SSTableReader sstable;
+        private final SSTableScanner scanner;
+
+        public CleanupInfo(SSTableReader sstable, SSTableScanner scanner)
+        {
+            this.sstable = sstable;
+            this.scanner = scanner;
+        }
+
+        public long getTotalBytes()
+        {
+            return scanner.getFileLength();
+        }
+
+        public long getBytesRead()
+        {
+            return scanner.getFilePointer();
+        }
+
+        public String getTaskType()
+        {
+            return "Cleanup of " + sstable.getColumnFamilyName();
+        }
+    }
 }

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java?rev=1053927&r1=1053926&r2=1053927&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/Table.java Thu Dec 30
16:18:33 2010
@@ -571,6 +571,21 @@ public class Table
         return fullMemtables;
     }
 
+    public static void cleanupIndexEntry(ColumnFamilyStore cfs, ByteBuffer key, IColumn column)
+    {
+        if (column.isMarkedForDelete())
+            return;
+        int localDeletionTime = (int) (System.currentTimeMillis() / 1000);
+        DecoratedKey<LocalToken> valueKey = cfs.getIndexKeyFor(column.name(), column.value());
+        ColumnFamily cfi = cfs.newIndexedColumnFamily(column.name());
+        cfi.addTombstone(key, localDeletionTime, column.timestamp());
+        Memtable fullMemtable = cfs.getIndexedColumnFamilyStore(column.name()).apply(valueKey,
cfi);
+        if (logger.isDebugEnabled())
+            logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, cfi);
+        if (fullMemtable != null)
+            fullMemtable.cfs.maybeSwitchMemtable(fullMemtable, false);
+    }
+
     public IndexBuilder createIndexBuilder(ColumnFamilyStore cfs, SortedSet<ByteBuffer>
columns, ReducingKeyIterator iter)
     {
         return new IndexBuilder(cfs, columns, iter);

Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java?rev=1053927&r1=1053926&r2=1053927&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/CompactionIterator.java
Thu Dec 30 16:18:33 2010
@@ -44,7 +44,7 @@ implements Closeable, ICompactionInfo
 {
     private static Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
 
-    protected static final int FILE_BUFFER_SIZE = 1024 * 1024;
+    public static final int FILE_BUFFER_SIZE = 1024 * 1024;
 
     protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
     private final ColumnFamilyStore cfs;

Added: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java?rev=1053927&view=auto
==============================================================================
--- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java (added)
+++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java Thu
Dec 30 16:18:33 2010
@@ -0,0 +1,124 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+import org.apache.cassandra.CleanupHelper;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
+import org.apache.cassandra.db.filter.IFilter;
+import org.apache.cassandra.db.filter.QueryFilter;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.IndexClause;
+import org.apache.cassandra.thrift.IndexExpression;
+import org.apache.cassandra.thrift.IndexOperator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class CleanupTest extends CleanupHelper
+{
+    public static final int LOOPS = 800;
+    public static final String TABLE1 = "Keyspace1";
+    public static final String CF1 = "Indexed1";
+    public static final ByteBuffer COLUMN = ByteBuffer.wrap("birthdate".getBytes());
+    public static final ByteBuffer VALUE = ByteBuffer.allocate(8);
+    static
+    {
+        VALUE.putLong(20101229);
+        VALUE.flip();
+    }
+
+    @Test
+    public void testCleanup() throws IOException, ExecutionException, InterruptedException
+    {
+        Table table = Table.open(TABLE1);
+
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF1);
+        fillCF(cfs, LOOPS);
+
+        assertEquals(cfs.getIndexedColumns().iterator().next(), COLUMN);
+
+        ColumnFamilyStore cfi = cfs.getIndexedColumnFamilyStore(COLUMN);
+
+        assertTrue(cfi.isIndexBuilt());
+
+        IndexExpression expr = new IndexExpression(COLUMN, IndexOperator.EQ, VALUE);
+        IndexClause clause = new IndexClause(Arrays.asList(expr), FBUtilities.EMPTY_BYTE_BUFFER,
Integer.MAX_VALUE);
+        IFilter filter = new IdentityQueryFilter();
+        IPartitioner p = StorageService.getPartitioner();
+        Range range = new Range(p.getMinimumToken(), p.getMinimumToken());
+        List<Row> rows = table.getColumnFamilyStore(CF1).scan(clause, range, filter);
+
+        assertEquals(LOOPS, rows.size());
+
+        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
+
+        assertNotNull(tmd);
+        assertEquals(0, tmd.getTokenToEndpointMap().size());
+
+        // Since this test has no ring, cleanup will remove all rows
+        CompactionManager.instance.performCleanup(cfs);
+
+        // row data should be gone
+        rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter());
+        assertEquals(0, rows.size());
+
+        // not only should it be gone but there should be no data on disk, not even tombstones
+        assert cfs.getSSTables().isEmpty();
+
+        // secondary index queries should also return no results
+        rows = cfs.scan(clause, range, filter);
+        assertEquals(0, rows.size());
+    }
+
+    protected void fillCF(ColumnFamilyStore store, int rowsPerSSTable) throws ExecutionException,
InterruptedException, IOException
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        for (int i = 0; i < rowsPerSSTable; i++)
+        {
+            String key = String.valueOf(i);
+
+            // insert a row whose indexed birthdate column is set to VALUE
+            RowMutation rm;
+            rm = new RowMutation(TABLE1, ByteBufferUtil.bytes(key));
+            rm.add(new QueryPath(CF1, null, COLUMN), VALUE, System.currentTimeMillis());
+            rm.apply();
+        }
+
+        store.forceBlockingFlush();        
+        store.buildSecondaryIndexes(store.getSSTables(), store.getIndexedColumns());
+    }
+}



Mime
View raw message