cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject svn commit: r1041105 - in /cassandra/branches/cassandra-0.6: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/io/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/streaming/
Date Wed, 01 Dec 2010 17:50:06 GMT
Author: jbellis
Date: Wed Dec  1 17:50:06 2010
New Revision: 1041105

URL: http://svn.apache.org/viewvc?rev=1041105&view=rev
Log:
avoid opening readers on anticompacted to-be-streamed temporary files
patch by thobbs; reviewed by mdennis and jbellis for CASSANDRA-1752

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed Dec  1 17:50:06 2010
@@ -10,6 +10,8 @@
  * detect and warn when obsolete version of JNA is present (CASSANDRA-1770)
  * fix live-column-count of slice ranges including tombstoned supercolumn 
    with live subcolumn (CASSANDRA-1591)
+ * avoid opening readers on anticompacted to-be-streamed temporary
+   files (CASSANDRA-1752)
 
 
 0.6.8

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
Wed Dec  1 17:50:06 2010
@@ -133,11 +133,11 @@ public class CompactionManager implement
         return executor.submit(runnable);
     }
 
-    public Future<List<SSTableReader>> submitAnticompaction(final ColumnFamilyStore
cfStore, final Collection<Range> ranges, final InetAddress target)
+    public Future<List<String>> submitAnticompaction(final ColumnFamilyStore
cfStore, final Collection<Range> ranges, final InetAddress target)
     {
-        Callable<List<SSTableReader>> callable = new Callable<List<SSTableReader>>()
+        Callable<List<String>> callable = new Callable<List<String>>()
         {
-            public List<SSTableReader> call() throws IOException
+            public List<String> call() throws IOException
             {
                 return doAntiCompaction(cfStore, cfStore.getSSTables(), ranges, target);
             }
@@ -320,18 +320,7 @@ public class CompactionManager implement
         return sstables.size();
     }
 
-    /**
-     * This function is used to do the anti compaction process , it spits out the file which
has keys that belong to a given range
-     * If the target is not specified it spits out the file as a compacted file with the
unecessary ranges wiped out.
-     *
-     * @param cfs
-     * @param sstables
-     * @param ranges
-     * @param target
-     * @return
-     * @throws java.io.IOException
-     */
-    private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, Collection<SSTableReader>
sstables, Collection<Range> ranges, InetAddress target)
+    private SSTableWriter antiCompactionHelper(ColumnFamilyStore cfs, Collection<SSTableReader>
sstables, Collection<Range> ranges, InetAddress target)
             throws IOException
     {
         Table table = cfs.getTable();
@@ -348,10 +337,9 @@ public class CompactionManager implement
             // compacting for streaming: send to subdirectory
             compactionFileLocation = compactionFileLocation + File.separator + DatabaseDescriptor.STREAMING_SUBDIR;
         }
-        List<SSTableReader> results = new ArrayList<SSTableReader>();
 
         long startTime = System.currentTimeMillis();
-        long totalkeysWritten = 0;
+        long totalKeysWritten = 0;
 
         int expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstables)
/ 2));
         if (logger.isDebugEnabled())
@@ -364,11 +352,6 @@ public class CompactionManager implement
 
         try
         {
-            if (!nni.hasNext())
-            {
-                return results;
-            }
-
             while (nni.hasNext())
             {
                 CompactionIterator.CompactedRow row = nni.next();
@@ -379,7 +362,7 @@ public class CompactionManager implement
                     writer = new SSTableWriter(newFilename, expectedBloomFilterSize, StorageService.getPartitioner());
                 }
                 writer.append(row.key, row.buffer);
-                totalkeysWritten++;
+                totalKeysWritten++;
             }
         }
         finally
@@ -389,12 +372,53 @@ public class CompactionManager implement
 
         if (writer != null)
         {
-            results.add(writer.closeAndOpenReader());
             String format = "AntiCompacted to %s.  %d/%d bytes for %d keys.  Time: %dms.";
             long dTime = System.currentTimeMillis() - startTime;
-            logger.info(String.format(format, writer.getFilename(), SSTable.getTotalBytes(sstables),
results.get(0).length(), totalkeysWritten, dTime));
+            List<String> filenames = writer.getAllFilenames();
+            long length = new File(filenames.get(filenames.size() -1)).length(); // Data
file is last in the list
+            logger.info(String.format(format, writer.getFilename(), SSTable.getTotalBytes(sstables),
length, totalKeysWritten, dTime));
+        }
+        return writer;
+    }
+
+    /**
+     * This function is used to do the anti compaction process.  It spits out a file which
has keys
+     * that belong to a given range. If the target is not specified it spits out the file
as a compacted file with the
+     * unnecessary ranges wiped out.
+     *
+     * @param cfs
+     * @param sstables
+     * @param ranges
+     * @param target
+     * @return
+     * @throws java.io.IOException
+     */
+    private List<String> doAntiCompaction(ColumnFamilyStore cfs, Collection<SSTableReader>
sstables, Collection<Range> ranges, InetAddress target)
+            throws IOException
+    {
+        List<String> filenames = new ArrayList<String>(SSTable.FILES_ON_DISK);
+        SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, target);
+        if (writer != null)
+        {
+            writer.close();
+            filenames = writer.getAllFilenames();
         }
+        return filenames;
+    }
 
+    /**
+     * Like doAntiCompaction(), but returns an List of SSTableReaders instead of a list of
filenames.
+     * @throws java.io.IOException
+     */
+    private List<SSTableReader> doAntiCompactionReturnReaders(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress target)
+            throws IOException
+    {
+        List<SSTableReader> results = new ArrayList<SSTableReader>(1);
+        SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, target);
+        if (writer != null)
+        {
+            results.add(writer.closeAndOpenReader());
+        }
         return results;
     }
 
@@ -407,7 +431,7 @@ public class CompactionManager implement
     private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
     {
         Collection<SSTableReader> originalSSTables = cfs.getSSTables();
-        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name),
null);
+        List<SSTableReader> sstables = doAntiCompactionReturnReaders(cfs, originalSSTables,
StorageService.instance.getLocalRanges(cfs.getTable().name), null);
         if (!sstables.isEmpty())
         {
             cfs.replaceCompactedSSTables(originalSSTables, sstables);

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java Wed Dec 
1 17:50:06 2010
@@ -282,9 +282,9 @@ public class Table 
      * do a complete compaction since we can figure out based on the ranges
      * whether the files need to be split.
     */
-    public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges,
InetAddress target)
+    public List<String> forceAntiCompaction(Collection<Range> ranges, InetAddress
target)
     {
-        List<SSTableReader> allResults = new ArrayList<SSTableReader>();
+        List<String> allResults = new ArrayList<String>();
         Set<String> columnFamilies = tableMetadata.getColumnFamilies();
         for ( String columnFamily : columnFamilies )
         {

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java Wed
Dec  1 17:50:06 2010
@@ -114,7 +114,7 @@ public class SSTableWriter extends SSTab
     /**
      * Renames temporary SSTable files to valid data, index, and bloom filter files
      */
-    public SSTableReader closeAndOpenReader() throws IOException
+    public void close() throws IOException
     {
         // bloom filter
         FileOutputStream fos = new FileOutputStream(filterFilename());
@@ -136,6 +136,14 @@ public class SSTableWriter extends SSTab
         path = rename(path); // important to do this last since index & filter file names
are derived from it
 
         indexSummary.complete();
+    }
+    
+    /**
+     * Renames temporary SSTable files to valid data, index, and bloom filter files and returns
an SSTableReader
+     */
+    public SSTableReader closeAndOpenReader() throws IOException
+    {
+        this.close();
         return new SSTableReader(path, partitioner, indexSummary, bf);
     }
 

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
Wed Dec  1 17:50:06 2010
@@ -485,12 +485,12 @@ public class AntiEntropyService
             try
             {
                 List<Range> ranges = new ArrayList<Range>(differences);
-                final List<SSTableReader> sstables = CompactionManager.instance.submitAnticompaction(cfstore,
ranges, remote).get();
+                final List<String> filenames = CompactionManager.instance.submitAnticompaction(cfstore,
ranges, remote).get();
                 Future f = StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable()

                 {
                     protected void runMayThrow() throws Exception
                     {
-                        StreamOut.transferSSTables(remote, sstables, cf.left);
+                        StreamOut.transferSSTables(remote, filenames, cf.left);
                         StreamOutManager.remove(remote);
                     }
                 });

Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Wed Dec  1 17:50:06 2010
@@ -112,19 +112,16 @@ public class StreamOut
      * Transfers a group of sstables from a single table to the target endpoint
      * and then marks them as ready for local deletion.
      */
-    public static void transferSSTables(InetAddress target, List<SSTableReader> sstables,
String table) throws IOException
+    public static void transferSSTables(InetAddress target, List<String> filenames,
String table) throws IOException
     {
-        PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK * sstables.size()];
+        PendingFile[] pendingFiles = new PendingFile[filenames.size()];
         int i = 0;
-        for (SSTableReader sstable : sstables)
+        for (String filename : filenames)
         {
-            for (String filename : sstable.getAllFilenames())
-            {
-                File file = new File(filename);
-                pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), file.length(),
table);
-            }
+            File file = new File(filename);
+            pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), file.length(), table);
         }
-        logger.info("Stream context metadata " + StringUtils.join(pendingFiles, ", " + "
" + sstables.size() + " sstables."));
+        logger.info("Stream context metadata " + StringUtils.join(pendingFiles, ", " + "
" + filenames.size() + " sstables."));
         StreamOutManager.get(target).addFilesToStream(pendingFiles);
         StreamInitiateMessage biMessage = new StreamInitiateMessage(pendingFiles);
         Message message = StreamInitiateMessage.makeStreamInitiateMessage(biMessage);



Mime
View raw message