cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vi...@apache.org
Subject git commit: Make scrub and cleanup operations throttled patch by Vijay; reviewed by yukim for CASSANDRA-4100
Date Wed, 11 Apr 2012 00:41:32 GMT
Updated Branches:
  refs/heads/trunk aa83c7f2f -> f774b7fc3


Make scrub and cleanup operations throttled
patch by Vijay; reviewed by yukim for CASSANDRA-4100

resolved Conflicts in:
	src/java/org/apache/cassandra/db/compaction/CompactionController.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f774b7fc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f774b7fc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f774b7fc

Branch: refs/heads/trunk
Commit: f774b7fc396f4fec611247159023c58863de5f85
Parents: aa83c7f
Author: Vijay Parthasarathy <vijay2win@gmail.com>
Authored: Tue Apr 10 17:34:18 2012 -0700
Committer: Vijay Parthasarathy <vijay2win@gmail.com>
Committed: Tue Apr 10 17:41:03 2012 -0700

----------------------------------------------------------------------
 .../db/compaction/AbstractCompactionIterable.java  |   20 --------------
 .../db/compaction/CompactionController.java        |   21 +++++++++++++++
 .../db/compaction/CompactionIterable.java          |    2 +-
 .../cassandra/db/compaction/CompactionManager.java |    6 ++++
 .../db/compaction/ParallelCompactionIterable.java  |    2 +-
 5 files changed, 29 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
index d5bd3d4..2a590bf 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionIterable.java
@@ -24,12 +24,9 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.CloseableIterator;
-import org.apache.cassandra.utils.Throttle;
 
 public abstract class AbstractCompactionIterable extends CompactionInfo.Holder implements Iterable<AbstractCompactedRow>
 {
@@ -41,8 +38,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
     protected volatile long bytesRead;
     protected final List<SSTableScanner> scanners;
 
-    protected final Throttle throttle;
-
     public AbstractCompactionIterable(CompactionController controller, OperationType type, List<SSTableScanner> scanners)
     {
         this.controller = controller;
@@ -54,21 +49,6 @@ public abstract class AbstractCompactionIterable extends CompactionInfo.Holder i
         for (SSTableScanner scanner : scanners)
             bytes += scanner.getFileLength();
         this.totalBytes = bytes;
-
-        this.throttle = new Throttle(toString(), new Throttle.ThroughputFunction()
-        {
-            /** @return Instantaneous throughput target in bytes per millisecond. */
-            public int targetThroughput()
-            {
-                if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
-                    // throttling disabled
-                    return 0;
-                // total throughput
-                int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
-                // per stream throughput (target bytes per MS)
-                return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
-            }
-        });
     }
 
     protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index cb4e87a..f7d5354 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -31,6 +31,8 @@ import org.apache.cassandra.db.DataTracker;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Throttle;
 import org.apache.cassandra.utils.IntervalTree.Interval;
 import org.apache.cassandra.utils.IntervalTree.IntervalTree;
 
@@ -46,6 +48,20 @@ public class CompactionController
 
     public final int gcBefore;
     public final int mergeShardBefore;
+    private final Throttle throttle = new Throttle("Cassandra_Throttle", new Throttle.ThroughputFunction()
+    {
+        /** @return Instantaneous throughput target in bytes per millisecond. */
+        public int targetThroughput()
+        {
+            if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
+                // throttling disabled
+                return 0;
+            // total throughput
+            int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() * 1024 * 1024 / 1000;
+            // per stream throughput (target bytes per MS)
+            return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
+        }
+    });
 
     public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
     {
@@ -119,4 +135,9 @@ public class CompactionController
     {
         return getCompactedRow(Collections.singletonList(row));
     }
+    
+    public void mayThrottle(long currentBytes)
+    {
+        throttle.throttle(currentBytes);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index ba4e3c2..b50066c 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -115,7 +115,7 @@ public class CompactionIterable extends AbstractCompactionIterable
                     for (SSTableScanner scanner : scanners)
                         n += scanner.getFilePointer();
                     bytesRead = n;
-                    throttle.throttle(bytesRead);
+                    controller.mayThrottle(bytesRead);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 36f05c1..5e19ce2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -467,6 +467,7 @@ public class CompactionManager implements CompactionManagerMBean
         // row header (key or data size) is corrupt. (This means our position in the index file will be one row
         // "ahead" of the data file.)
         final RandomAccessReader dataFile = sstable.openDataReader(true);
+        long rowsRead = 0;
         RandomAccessReader indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
         ScrubInfo scrubInfo = new ScrubInfo(dataFile, sstable);
         executor.beginCompaction(scrubInfo);
@@ -607,6 +608,8 @@ public class CompactionManager implements CompactionManagerMBean
                         badRows++;
                     }
                 }
+                if ((rowsRead++ % 1000) == 0)
+                    controller.mayThrottle(dataFile.getFilePointer());
             }
 
             if (writer.getFilePointer() > 0)
@@ -690,6 +693,7 @@ public class CompactionManager implements CompactionManagerMBean
                 throw new IOException("disk full");
 
             SSTableScanner scanner = sstable.getDirectScanner();
+            long rowsRead = 0;
             Collection<ByteBuffer> indexedColumns = cfs.indexManager.getIndexedColumns();
             List<IColumn> indexedColumnsInRow = null;
 
@@ -749,6 +753,8 @@ public class CompactionManager implements CompactionManagerMBean
                             }
                         }
                     }
+                    if ((rowsRead++ % 1000) == 0)
+                        controller.mayThrottle(scanner.getFilePointer());
                 }
                 if (writer != null)
                     newSstable = writer.closeAndOpenReader(sstable.maxDataAge);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f774b7fc/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index a04bb91..c9fab64 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -164,7 +164,7 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
                 for (SSTableScanner scanner : scanners)
                     n += scanner.getFilePointer();
                 bytesRead = n;
-                throttle.throttle(bytesRead);
+                controller.mayThrottle(bytesRead);
             }
             return compacted;
         }


Mime
View raw message