cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r764678 - in /incubator/cassandra/trunk: src/org/apache/cassandra/db/ColumnFamilyStore.java src/org/apache/cassandra/db/MinorCompactionManager.java test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Date Tue, 14 Apr 2009 05:12:33 GMT
Author: jbellis
Date: Tue Apr 14 05:12:32 2009
New Revision: 764678

URL: http://svn.apache.org/viewvc?rev=764678&view=rev
Log:
add compaction test showing regression.  patch by jbellis; reviewed by Todd Lipcon
and Jun Rao for #80.

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
    incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java?rev=764678&r1=764677&r2=764678&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnFamilyStore.java Tue Apr 14
05:12:32 2009
@@ -74,7 +74,7 @@
     private AtomicReference<BinaryMemtable> binaryMemtable_;
 
     /* SSTables on disk for this column family */
-    private Set<String> ssTables_ = new HashSet<String>();
+    Set<String> ssTables_ = new HashSet<String>();
 
     /* Modification lock used for protecting reads from compactions. */
     private ReentrantReadWriteLock lock_ = new ReentrantReadWriteLock(true);
@@ -162,6 +162,7 @@
         {
         	HintedHandOffManager.instance().submit(this);
         }
+        // TODO this seems unnecessary -- each memtable flush checks to see if it needs to
compact, too
         MinorCompactionManager.instance().submitPeriodicCompaction(this);
     }
 
@@ -671,7 +672,7 @@
             	}
             	catch ( Exception ex)
             	{
-            		ex.printStackTrace();
+                    logger_.warn("corrupt file?  or are you just blowing away data files
manually out from under me?", ex);
             		try
             		{
             			if (fs != null)
@@ -734,7 +735,7 @@
     /*
      * Break the files into buckets and then compact.
      */
-    void doCompaction()
+    void doCompaction() throws IOException
     {
         isCompacting_.set(true);
         List<String> files = new ArrayList<String>(ssTables_);
@@ -755,17 +756,10 @@
 	    				if( count == threshHold_ )
 	    					break;
 	    			}
-	    	        try
-	    	        {
-	    	        	// For each bucket if it has crossed the threshhold do the compaction
-	    	        	// In case of range  compaction merge the counting bloom filters also.
-	    	        	if( count == threshHold_)
-	    	        		doFileCompaction(files, bufSize_);
-	    	        }
-	    	        catch ( Exception ex)
-	    	        {
-                		logger_.warn(LogUtil.throwableToString(ex));
-	    	        }
+                    // For each bucket if it has crossed the threshhold do the compaction
+                    // In case of range  compaction merge the counting bloom filters also.
+                    if( count == threshHold_)
+                        doFileCompaction(files, bufSize_);
 	    		}
 	    	}
         }
@@ -1175,7 +1169,7 @@
      * to get the latest data.
      *
      */
-    void  doFileCompaction(List<String> files,  int minBufferSize)
+    void  doFileCompaction(List<String> files,  int minBufferSize) throws IOException
     {
     	String newfile = null;
         long startTime = System.currentTimeMillis();
@@ -1183,175 +1177,168 @@
         long totalBytesWritten = 0;
         long totalkeysRead = 0;
         long totalkeysWritten = 0;
-        try
-        {
-	        // Calculate the expected compacted filesize
-	    	long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
-	        String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
-	        // If the compaction file path is null that means we have no space left for this
compaction.
-	        if( compactionFileLocation == null )
-	        {
-        		String maxFile = getMaxSizeFile( files );
-        		files.remove( maxFile );
-        		doFileCompaction(files , minBufferSize);
-        		return;
-	        }
-	        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
-	        if (pq.size() > 0)
-	        {
-	            String mergedFileName = getTempFileName( files );
-	            SSTable ssTable = null;
-	            String lastkey = null;
-	            List<FileStruct> lfs = new ArrayList<FileStruct>();
-	            DataOutputBuffer bufOut = new DataOutputBuffer();
-	            int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
-	            expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize
: SSTable.indexInterval();
-	            logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
-	            /* Create the bloom filter for the compacted file. */
-	            BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize,
15);
-	            List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
+        // Calculate the expected compacted filesize
+        long expectedCompactedFileSize = getExpectedCompactedFileSize(files);
+        String compactionFileLocation = DatabaseDescriptor.getCompactionFileLocation(expectedCompactedFileSize);
+        // If the compaction file path is null that means we have no space left for this
compaction.
+        if( compactionFileLocation == null )
+        {
+            String maxFile = getMaxSizeFile( files );
+            files.remove( maxFile );
+            doFileCompaction(files , minBufferSize);
+            return;
+        }
+        PriorityQueue<FileStruct> pq = initializePriorityQueue(files, null, minBufferSize);
+        if (pq.size() > 0)
+        {
+            String mergedFileName = getTempFileName( files );
+            SSTable ssTable = null;
+            String lastkey = null;
+            List<FileStruct> lfs = new ArrayList<FileStruct>();
+            DataOutputBuffer bufOut = new DataOutputBuffer();
+            int expectedBloomFilterSize = SSTable.getApproximateKeyCount(files);
+            expectedBloomFilterSize = (expectedBloomFilterSize > 0) ? expectedBloomFilterSize
: SSTable.indexInterval();
+            logger_.debug("Expected bloom filter size : " + expectedBloomFilterSize);
+            /* Create the bloom filter for the compacted file. */
+            BloomFilter compactedBloomFilter = new BloomFilter(expectedBloomFilterSize, 15);
+            List<ColumnFamily> columnFamilies = new ArrayList<ColumnFamily>();
 
-	            while (pq.size() > 0 || lfs.size() > 0)
-	            {
-	                FileStruct fs = null;
-	                if (pq.size() > 0)
-	                {
-	                    fs = pq.poll();                        
-	                }
-	                if (fs != null
-	                        && (lastkey == null || lastkey.equals(fs.getKey())))
-	                {
-	                    // The keys are the same so we need to add this to the
-	                    // ldfs list
-	                    lastkey = fs.getKey();
-	                    lfs.add(fs);
-	                }
-	                else
-	                {
-	                    Collections.sort(lfs, new FileStructComparator());
-	                    ColumnFamily columnFamily;
-	                    bufOut.reset();
-	                    if(lfs.size() > 1)
-	                    {
-		                    for (FileStruct filestruct : lfs)
-		                    {
-		                    	try
-		                    	{
-	                                /* read the length although we don't need it */
-	                                filestruct.getBufIn().readInt();
-	                                // Skip the Index
-                                    IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
-	                                // We want to add only 2 and resolve them right there in
order to save on memory footprint
-	                                if(columnFamilies.size() > 1)
-	                                {
-	    		                        merge(columnFamilies);
-	                                }
-			                        // deserialize into column families                          
         
-			                        columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
-		                    	}
-		                    	catch ( Exception ex)
-		                    	{
-                                    logger_.warn("error in filecompaction", ex);
+            while (pq.size() > 0 || lfs.size() > 0)
+            {
+                FileStruct fs = null;
+                if (pq.size() > 0)
+                {
+                    fs = pq.poll();
+                }
+                if (fs != null
+                        && (lastkey == null || lastkey.equals(fs.getKey())))
+                {
+                    // The keys are the same so we need to add this to the
+                    // ldfs list
+                    lastkey = fs.getKey();
+                    lfs.add(fs);
+                }
+                else
+                {
+                    Collections.sort(lfs, new FileStructComparator());
+                    ColumnFamily columnFamily;
+                    bufOut.reset();
+                    if(lfs.size() > 1)
+                    {
+                        for (FileStruct filestruct : lfs)
+                        {
+                            try
+                            {
+                                /* read the length although we don't need it */
+                                filestruct.getBufIn().readInt();
+                                // Skip the Index
+                                IndexHelper.skipBloomFilterAndIndex(filestruct.getBufIn());
+                                // We want to add only 2 and resolve them right there in
order to save on memory footprint
+                                if(columnFamilies.size() > 1)
+                                {
+                                    merge(columnFamilies);
                                 }
-		                    }
-		                    // Now after merging all crap append to the sstable
-		                    columnFamily = resolveAndRemoveDeleted(columnFamilies);
-		                    columnFamilies.clear();
-		                    if( columnFamily != null )
-		                    {
-			                	/* serialize the cf with column indexes */
-			                    ColumnFamily.serializerWithIndexes().serialize(columnFamily, bufOut);
-		                    }
-	                    }
-	                    else
-	                    {
-		                    FileStruct filestruct = lfs.get(0);
-	                    	try
-	                    	{
-		                        /* read the length although we don't need it */
-		                        int size = filestruct.getBufIn().readInt();
-		                        bufOut.write(filestruct.getBufIn(), size);
-	                    	}
-	                    	catch ( Exception ex)
-	                    	{
-	                    		ex.printStackTrace();
-	                            filestruct.close();
-	                            continue;
-	                    	}
-	                    }
-	                    	         
-	                    if ( ssTable == null )
-	                    {
-	                    	ssTable = new SSTable(compactionFileLocation, mergedFileName);	   
                	
-	                    }
-                        ssTable.append(lastkey, bufOut);
+                                // deserialize into column families
+                                columnFamilies.add(ColumnFamily.serializer().deserialize(filestruct.getBufIn()));
+                            }
+                            catch ( Exception ex)
+                            {
+                                logger_.warn("error in filecompaction", ex);
+                            }
+                        }
+                        // Now after merging all crap append to the sstable
+                        columnFamily = resolveAndRemoveDeleted(columnFamilies);
+                        columnFamilies.clear();
+                        if( columnFamily != null )
+                        {
+                            /* serialize the cf with column indexes */
+                            ColumnFamily.serializerWithIndexes().serialize(columnFamily,
bufOut);
+                        }
+                    }
+                    else
+                    {
+                        FileStruct filestruct = lfs.get(0);
+                        try
+                        {
+                            /* read the length although we don't need it */
+                            int size = filestruct.getBufIn().readInt();
+                            bufOut.write(filestruct.getBufIn(), size);
+                        }
+                        catch ( Exception ex)
+                        {
+                            ex.printStackTrace();
+                            filestruct.close();
+                            continue;
+                        }
+                    }
 
-                        /* Fill the bloom filter with the key */
-	                    doFill(compactedBloomFilter, lastkey);
-	                    totalkeysWritten++;
-	                    for (FileStruct filestruct : lfs)
-	                    {
-	                    	try
-	                    	{
-                                filestruct.advance();
-	                    		if (filestruct.isExhausted())
-	                    		{
-	                    			continue;
-	                    		}
-	                    		pq.add(filestruct);
-		                        totalkeysRead++;
-	                    	}
-	                    	catch ( Throwable ex )
-	                    	{
-	                    		// Ignore the exception as it might be a corrupted file
-	                    		// in any case we have read as far as possible from it
-	                    		// and it will be deleted after compaction.
-	                            filestruct.close();
+                    if ( ssTable == null )
+                    {
+                        ssTable = new SSTable(compactionFileLocation, mergedFileName);
+                    }
+                    ssTable.append(lastkey, bufOut);
+
+                    /* Fill the bloom filter with the key */
+                    doFill(compactedBloomFilter, lastkey);
+                    totalkeysWritten++;
+                    for (FileStruct filestruct : lfs)
+                    {
+                        try
+                        {
+                            filestruct.advance();
+                            if (filestruct.isExhausted())
+                            {
+                                continue;
                             }
-	                    }
-	                    lfs.clear();
-	                    lastkey = null;
-	                    if (fs != null)
-	                    {
-	                        /* Add back the fs since we processed the rest of filestructs */
-	                        pq.add(fs);
-	                    }
-	                }
-	            }
-	            if ( ssTable != null )
-	            {
-	                ssTable.closeRename(compactedBloomFilter);
-	                newfile = ssTable.getDataFileLocation();
-	            }
-	            lock_.writeLock().lock();
-	            try
-	            {
-	                for (String file : files)
-	                {
-	                    ssTables_.remove(file);
-	                    SSTable.removeAssociatedBloomFilter(file);
-	                }
-	                if ( newfile != null )
-	                {
-	                    ssTables_.add(newfile);
-	                    logger_.debug("Inserting bloom filter for file " + newfile);
-	                    SSTable.storeBloomFilter(newfile, compactedBloomFilter);
-	                    totalBytesWritten = (new File(newfile)).length();
-	                }
-	            }
-	            finally
-	            {
-	                lock_.writeLock().unlock();
-	            }
-	            for (String file : files)
-	            {
-	                SSTable.delete(file);
-	            }
-	        }
-        }
-        catch ( Exception ex)
-        {
-            logger_.warn( LogUtil.throwableToString(ex) );
+                            pq.add(filestruct);
+                            totalkeysRead++;
+                        }
+                        catch ( Throwable ex )
+                        {
+                            // Ignore the exception as it might be a corrupted file
+                            // in any case we have read as far as possible from it
+                            // and it will be deleted after compaction.
+                            filestruct.close();
+                        }
+                    }
+                    lfs.clear();
+                    lastkey = null;
+                    if (fs != null)
+                    {
+                        /* Add back the fs since we processed the rest of filestructs */
+                        pq.add(fs);
+                    }
+                }
+            }
+            if ( ssTable != null )
+            {
+                ssTable.closeRename(compactedBloomFilter);
+                newfile = ssTable.getDataFileLocation();
+            }
+            lock_.writeLock().lock();
+            try
+            {
+                for (String file : files)
+                {
+                    ssTables_.remove(file);
+                    SSTable.removeAssociatedBloomFilter(file);
+                }
+                if ( newfile != null )
+                {
+                    ssTables_.add(newfile);
+                    logger_.debug("Inserting bloom filter for file " + newfile);
+                    SSTable.storeBloomFilter(newfile, compactedBloomFilter);
+                    totalBytesWritten = (new File(newfile)).length();
+                }
+            }
+            finally
+            {
+                lock_.writeLock().unlock();
+            }
+            for (String file : files)
+            {
+                SSTable.delete(file);
+            }
         }
         logger_.debug("Total time taken for compaction  ..."
                 + (System.currentTimeMillis() - startTime));

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java?rev=764678&r1=764677&r2=764678&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/MinorCompactionManager.java Tue
Apr 14 05:12:32 2009
@@ -81,16 +81,16 @@
 
         public void run()
         {
+                logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
             try
             {
-                logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
-            	columnFamilyStore_.doCompaction();
-                logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
+                columnFamilyStore_.doCompaction();
             }
-            catch (Throwable th)
+            catch (IOException e)
             {
-                logger_.error( LogUtil.throwableToString(th) );
+                throw new RuntimeException(e);
             }
+            logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
         }
     }
 
@@ -149,16 +149,9 @@
 
         public void run()
         {
-            try
-            {
-                logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
-            	columnFamilyStore_.doCleanupCompaction();
-                logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
-            }
-            catch (Throwable th)
-            {
-                logger_.error( LogUtil.throwableToString(th) );
-            }
+            logger_.debug("Started  compaction ..."+columnFamilyStore_.columnFamily_);
+            columnFamilyStore_.doCleanupCompaction();
+            logger_.debug("Finished compaction ..."+columnFamilyStore_.columnFamily_);
         }
     }
     
@@ -181,9 +174,9 @@
     			MinorCompactionManager.intervalInMins_, TimeUnit.MINUTES);       
     }
 
-    public void submit(ColumnFamilyStore columnFamilyStore)
+    public Future submit(ColumnFamilyStore columnFamilyStore)
     {
-        compactor_.submit(new FileCompactor(columnFamilyStore));
+        return compactor_.submit(new FileCompactor(columnFamilyStore));
     }
     
     public void submitCleanup(ColumnFamilyStore columnFamilyStore)

Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=764678&r1=764677&r2=764678&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ColumnFamilyStoreTest.java Tue
Apr 14 05:12:32 2009
@@ -314,4 +314,26 @@
         fos.close();
         return f.getAbsolutePath();
     }
+
+    @Test
+    public void testCompaction() throws IOException, ColumnFamilyNotDefinedException, ExecutionException,
InterruptedException
+    {
+        Table table = Table.open("Table1");
+        ColumnFamilyStore store = table.getColumnFamilyStore("Standard1");
+        store.ssTables_.clear(); // TODO integrate this better into test setup/teardown
+
+        for (int j = 0; j < 5; j++) {
+            for (int i = 0; i < 10; i++) {
+                long epoch = System.currentTimeMillis()  /  1000;
+                String key = String.format("%s.%s.%s",  epoch,  1,  i);
+                RowMutation rm = new RowMutation("Table1", key);
+                rm.add("Standard1:A", new byte[0], epoch);
+                rm.apply();
+            }
+            store.forceFlush();
+            waitForFlush();
+        }
+        Future ft = MinorCompactionManager.instance().submit(store);
+        ft.get();
+    }
 }



Mime
View raw message