cassandra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbel...@apache.org
Subject [6/6] git commit: ghetto merge from 1.2
Date Tue, 30 Apr 2013 20:51:44 GMT
ghetto merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/74f37b5a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/74f37b5a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/74f37b5a

Branch: refs/heads/trunk
Commit: 74f37b5a398fea0d60c5dc87124208a28580e8c3
Parents: e74c13f
Author: Jonathan Ellis <jbellis@apache.org>
Authored: Tue Apr 30 15:51:34 2013 -0500
Committer: Jonathan Ellis <jbellis@apache.org>
Committed: Tue Apr 30 15:51:34 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../apache/cassandra/cache/IMeasurableMemory.java  |   21 +++++++
 .../db/compaction/AbstractCompactionStrategy.java  |    4 +-
 .../db/compaction/CompactionController.java        |   24 +-------
 .../db/compaction/CompactionIterable.java          |   12 +---
 .../cassandra/db/compaction/CompactionManager.java |   25 +++++++-
 .../db/compaction/LeveledCompactionStrategy.java   |    7 +-
 .../db/compaction/ParallelCompactionIterable.java  |   13 +---
 .../apache/cassandra/db/compaction/Scrubber.java   |    8 +--
 .../io/compress/CompressedRandomAccessReader.java  |    2 +-
 .../io/sstable/SSTableBoundedScanner.java          |    6 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |   24 +++++++-
 .../cassandra/io/sstable/SSTableScanner.java       |    7 ++-
 .../org/apache/cassandra/tools/SSTableExport.java  |   44 +++-----------
 .../org/apache/cassandra/utils/ObjectSizes.java    |   23 +++++++-
 .../org/apache/cassandra/cache/ObjectSizeTest.java |   21 +++++++
 .../LegacyLeveledManifestTestHelper.java           |   21 +++++++
 .../cassandra/io/sstable/SSTableReaderTest.java    |    4 +-
 18 files changed, 171 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 48cd54a..31054e3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -40,6 +40,7 @@
  * Add binary protocol versioning (CASSANDRA-5436)
 
 1.2.5
+ * fix compaction throttling bursty-ness (CASSANDRA-4316)
  * reduce memory consumption of IndexSummary (CASSANDRA-5506)
  * remove per-row column name bloom filters (CASSANDRA-5492)
  * Include fatal errors in trace events (CASSANDRA-5447)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/cache/IMeasurableMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/IMeasurableMemory.java b/src/java/org/apache/cassandra/cache/IMeasurableMemory.java
index 6b5f00e..16ca7c2 100644
--- a/src/java/org/apache/cassandra/cache/IMeasurableMemory.java
+++ b/src/java/org/apache/cassandra/cache/IMeasurableMemory.java
@@ -1,4 +1,25 @@
 package org.apache.cassandra.cache;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 public interface IMeasurableMemory
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index aefacd6..ba89aa3 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@ -21,6 +21,7 @@ import java.util.*;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -207,9 +208,10 @@ public abstract class AbstractCompactionStrategy
      */
     public List<ICompactionScanner> getScanners(Collection<SSTableReader> sstables,
Range<Token> range)
     {
+        RateLimiter limiter = CompactionManager.instance.getRateLimiter();
         ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
         for (SSTableReader sstable : sstables)
-            scanners.add(sstable.getScanner(range));
+            scanners.add(sstable.getScanner(range, limiter));
         return scanners;
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index b81d6cb..997016e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.primitives.Ints;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,6 +41,8 @@ import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.Throttle;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.Throttle;
 
 /**
  * Manage compaction options.
@@ -55,21 +59,6 @@ public class CompactionController
     public final int gcBefore;
     public final int mergeShardBefore;
 
-    private final Throttle throttle = new Throttle("Cassandra_Throttle", new Throttle.ThroughputFunction()
-    {
-        /** @return Instantaneous throughput target in bytes per millisecond. */
-        public int targetThroughput()
-        {
-            if (DatabaseDescriptor.getCompactionThroughputMbPerSec() < 1 || StorageService.instance.isBootstrapMode())
-                // throttling disabled
-                return 0;
-            // total throughput
-            int totalBytesPerMS = DatabaseDescriptor.getCompactionThroughputMbPerSec() *
1024 * 1024 / 1000;
-            // per stream throughput (target bytes per MS)
-            return totalBytesPerMS / Math.max(1, CompactionManager.instance.getActiveCompactions());
-        }
-    });
-
     /**
      * Constructor that subclasses may use when overriding shouldPurge to not need overlappingTree
      */
@@ -237,11 +226,6 @@ public class CompactionController
         return getCompactedRow(Collections.singletonList(row));
     }
 
-    public void mayThrottle(long currentBytes)
-    {
-        throttle.throttle(currentBytes);
-    }
-
     public void close()
     {
         SSTableReader.releaseReferences(overlappingSSTables);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
index 32b4942..3614ed1 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
@@ -78,14 +78,10 @@ public class CompactionIterable extends AbstractCompactionIterable
             finally
             {
                 rows.clear();
-                if ((row++ % 1000) == 0)
-                {
-                    long n = 0;
-                    for (ICompactionScanner scanner : scanners)
-                        n += scanner.getCurrentPosition();
-                    bytesRead = n;
-                    controller.mayThrottle(bytesRead);
-                }
+                long n = 0;
+                for (ICompactionScanner scanner : scanners)
+                    n += scanner.getCurrentPosition();
+                bytesRead = n;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index f62e077..877d349 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -27,6 +27,7 @@ import javax.management.ObjectName;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.*;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,6 +100,26 @@ public class CompactionManager implements CompactionManagerMBean
     private final CompactionMetrics metrics = new CompactionMetrics(executor, validationExecutor);
     private final Multiset<ColumnFamilyStore> compactingCF = ConcurrentHashMultiset.create();
 
+    private final RateLimiter compactionRateLimiter = RateLimiter.create(Double.MAX_VALUE);
+
+    /**
+     * Gets compaction rate limiter. When compaction_throughput_mb_per_sec is 0 or node is
bootstrapping,
+     * this returns rate limiter with the rate of Double.MAX_VALUE bytes per second.
+     * Rate unit is bytes per sec.
+     *
+     * @return RateLimiter with rate limit set
+     */
+    public RateLimiter getRateLimiter()
+    {
+        double currentThroughput = DatabaseDescriptor.getCompactionThroughputMbPerSec() *
1024 * 1024;
+        // if throughput is set to 0, throttling is disabled
+        if (currentThroughput == 0 || StorageService.instance.isBootstrapMode())
+            currentThroughput = Double.MAX_VALUE;
+        if (compactionRateLimiter.getRate() != currentThroughput)
+            compactionRateLimiter.setRate(currentThroughput);
+        return compactionRateLimiter;
+    }
+
     /**
      * Call this whenever a compaction might be needed on the given columnfamily.
      * It's okay to over-call (within reason) since the compactions are single-threaded,
@@ -471,7 +492,7 @@ public class CompactionManager implements CompactionManagerMBean
             if (compactionFileLocation == null)
                 throw new IOException("disk full");
 
-            SSTableScanner scanner = sstable.getScanner();
+            SSTableScanner scanner = sstable.getScanner(getRateLimiter());
             long rowsRead = 0;
             List<Column> indexedColumnsInRow = null;
 
@@ -531,8 +552,6 @@ public class CompactionManager implements CompactionManagerMBean
                             }
                         }
                     }
-                    if ((rowsRead++ % 1000) == 0)
-                        controller.mayThrottle(scanner.getCurrentPosition());
                 }
                 if (writer != null)
                     newSstable = writer.closeAndOpenReader(sstable.maxDataAge);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index 0e8c2a7..66b32f7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.collect.*;
 import com.google.common.primitives.Doubles;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -177,7 +178,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
implem
             {
                 // L0 makes no guarantees about overlapping-ness.  Just create a direct scanner
for each
                 for (SSTableReader sstable : byLevel.get(level))
-                    scanners.add(sstable.getScanner(range));
+                    scanners.add(sstable.getScanner(range, CompactionManager.instance.getRateLimiter()));
             }
             else
             {
@@ -226,7 +227,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
implem
             Collections.sort(this.sstables, SSTable.sstableComparator);
             sstableIterator = this.sstables.iterator();
             assert sstableIterator.hasNext(); // caller should check intersecting first
-            currentScanner = sstableIterator.next().getScanner(range);
+            currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter());
         }
 
         public static List<SSTableReader> intersecting(Collection<SSTableReader>
sstables, Range<Token> range)
@@ -261,7 +262,7 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy
implem
                         currentScanner = null;
                         return endOfData();
                     }
-                    currentScanner = sstableIterator.next().getScanner(range);
+                    currentScanner = sstableIterator.next().getScanner(range, CompactionManager.instance.getRateLimiter());
                 }
             }
             catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
index 225cd4d..718b1e3 100644
--- a/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
+++ b/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
@@ -117,7 +117,6 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
     private class Reducer extends MergeIterator.Reducer<RowContainer, CompactedRowContainer>
     {
         private final List<RowContainer> rows = new ArrayList<RowContainer>();
-        private int row = 0;
 
         private final ThreadPoolExecutor executor = new DebuggableThreadPoolExecutor(FBUtilities.getAvailableProcessors(),
                                                                                      Integer.MAX_VALUE,
@@ -137,14 +136,10 @@ public class ParallelCompactionIterable extends AbstractCompactionIterable
             ParallelCompactionIterable.this.updateCounterFor(rows.size());
             CompactedRowContainer compacted = getCompactedRow(rows);
             rows.clear();
-            if ((row++ % 1000) == 0)
-            {
-                long n = 0;
-                for (ICompactionScanner scanner : scanners)
-                    n += scanner.getCurrentPosition();
-                bytesRead = n;
-                controller.mayThrottle(bytesRead);
-            }
+            long n = 0;
+            for (ICompactionScanner scanner : scanners)
+                n += scanner.getCurrentPosition();
+            bytesRead = n;
             return compacted;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index fa4dd51..2f194f3 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -44,8 +44,6 @@ public class Scrubber implements Closeable
     private final RandomAccessReader indexFile;
     private final ScrubInfo scrubInfo;
 
-    private long rowsRead;
-
     private SSTableWriter writer;
     private SSTableReader newSstable;
     private SSTableReader newInOrderSstable;
@@ -93,7 +91,9 @@ public class Scrubber implements Closeable
         // we'll also loop through the index at the same time, using the position from the
index to recover if the
         // row header (key or data size) is corrupt. (This means our position in the index
file will be one row
         // "ahead" of the data file.)
-        this.dataFile = sstable.openDataReader();
+        this.dataFile = isOffline
+                        ? sstable.openDataReader()
+                        : sstable.openDataReader(CompactionManager.instance.getRateLimiter());
         this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)));
         this.scrubInfo = new ScrubInfo(dataFile, sstable);
     }
@@ -248,8 +248,6 @@ public class Scrubber implements Closeable
                         badRows++;
                     }
                 }
-                if ((rowsRead++ % 1000) == 0)
-                    controller.mayThrottle(dataFile.getFilePointer());
             }
 
             if (writer.getFilePointer() > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
index c0f8820..a8efb56 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
@@ -70,7 +70,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader
     // raw checksum bytes
     private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
 
-    private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata,
PoolingSegmentedFile owner) throws FileNotFoundException
+    protected CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata,
PoolingSegmentedFile owner) throws FileNotFoundException
     {
         super(new File(dataFilePath), metadata.chunkLength(), owner);
         this.metadata = metadata;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index b5ec5e0..4febea6 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.io.sstable;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import com.google.common.util.concurrent.RateLimiter;
+
 import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.utils.Pair;
@@ -32,9 +34,9 @@ public class SSTableBoundedScanner extends SSTableScanner
     private final Iterator<Pair<Long, Long>> rangeIterator;
     private Pair<Long, Long> currentRange;
 
-    SSTableBoundedScanner(SSTableReader sstable, Iterator<Pair<Long, Long>> rangeIterator)
+    SSTableBoundedScanner(SSTableReader sstable, Iterator<Pair<Long, Long>> rangeIterator,
RateLimiter limiter)
     {
-        super(sstable);
+        super(sstable, limiter);
         assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise
         this.rangeIterator = rangeIterator;
         currentRange = rangeIterator.next();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 2d6c1df..33a302d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.primitives.Longs;
+import com.google.common.util.concurrent.RateLimiter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +49,7 @@ import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.index.SecondaryIndex;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.io.compress.CompressedRandomAccessReader;
+import org.apache.cassandra.io.compress.CompressedThrottledReader;
 import org.apache.cassandra.io.compress.CompressionMetadata;
 import org.apache.cassandra.io.util.*;
 import org.apache.cassandra.service.CacheService;
@@ -1001,22 +1003,28 @@ public class SSTableReader extends SSTable
     */
     public SSTableScanner getScanner()
     {
-        return new SSTableScanner(this);
+        return getScanner((RateLimiter) null);
     }
 
+   public SSTableScanner getScanner(RateLimiter limiter)
+   {
+       return new SSTableScanner(this, limiter);
+   }
+
    /**
     * Direct I/O SSTableScanner over a defined range of tokens.
     *
     * @param range the range of keys to cover
     * @return A Scanner for seeking over the rows of the SSTable.
     */
-    public ICompactionScanner getScanner(Range<Token> range)
+    public ICompactionScanner getScanner(Range<Token> range, RateLimiter limiter)
     {
         if (range == null)
-            return getScanner();
+            return getScanner(limiter);
+
         Iterator<Pair<Long, Long>> rangeIterator = getPositionsForRanges(Collections.singletonList(range)).iterator();
         if (rangeIterator.hasNext())
-            return new SSTableBoundedScanner(this, rangeIterator);
+            return new SSTableBoundedScanner(this, rangeIterator, limiter);
         else
             return new EmptyCompactionScanner(getFilename());
     }
@@ -1172,6 +1180,14 @@ public class SSTableReader extends SSTable
         return sstableMetadata;
     }
 
+    public RandomAccessReader openDataReader(RateLimiter limiter)
+    {
+        assert limiter != null;
+        return compression
+               ? CompressedThrottledReader.open(getFilename(), getCompressionMetadata(),
limiter)
+               : ThrottledReader.open(new File(getFilename()), limiter);
+    }
+
     public RandomAccessReader openDataReader()
     {
         return compression

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
index 3089769..38b9e65 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import com.google.common.util.concurrent.RateLimiter;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowPosition;
@@ -45,10 +47,11 @@ public class SSTableScanner implements ICompactionScanner
 
     /**
      * @param sstable SSTable to scan.
+     * @param limiter
      */
-    SSTableScanner(SSTableReader sstable)
+    SSTableScanner(SSTableReader sstable, RateLimiter limiter)
     {
-        this.dfile = sstable.openDataReader();
+        this.dfile = limiter == null ? sstable.openDataReader() : sstable.openDataReader(limiter);
         this.ifile = sstable.openIndexReader();
         this.sstable = sstable;
         this.filter = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index f3ee10b..e6a06bc 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -17,53 +17,29 @@
  */
 package org.apache.cassandra.tools;
 
-import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
-import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
+import java.util.*;
+
+import org.apache.commons.cli.*;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.CounterColumn;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletedColumn;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.ExpiringColumn;
-import org.apache.cassandra.db.Column;
-import org.apache.cassandra.db.OnDiskAtom;
-import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.KeyIterator;
-import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.ObjectMapper;
 
+import static org.apache.cassandra.utils.ByteBufferUtil.bytesToHex;
+import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;
+
 /**
  * Export SSTables to JSON format.
  */
@@ -143,7 +119,7 @@ public class SSTableExport
     /**
      * Serialize columns using given column iterator
      *
-     * @param columns column iterator
+     * @param atoms column iterator
      * @param out output stream
      * @param comparator columns comparator
      * @param cfMetaData Column Family metadata (to get validator)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/src/java/org/apache/cassandra/utils/ObjectSizes.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ObjectSizes.java b/src/java/org/apache/cassandra/utils/ObjectSizes.java
index 6c28389..66ec501 100644
--- a/src/java/org/apache/cassandra/utils/ObjectSizes.java
+++ b/src/java/org/apache/cassandra/utils/ObjectSizes.java
@@ -1,4 +1,25 @@
 package org.apache.cassandra.utils;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.MemoryPoolMXBean;
@@ -201,4 +222,4 @@ public class ObjectSizes
     {
         return meter.measureDeep(pojo);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java b/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
index 4adf4f9..6cafe25 100644
--- a/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
+++ b/test/unit/org/apache/cassandra/cache/ObjectSizeTest.java
@@ -1,4 +1,25 @@
 package org.apache.cassandra.cache;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 import java.nio.ByteBuffer;
 import java.util.UUID;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTestHelper.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTestHelper.java
b/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTestHelper.java
index 9ce60ea..4ee92fe 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTestHelper.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LegacyLeveledManifestTestHelper.java
@@ -1,4 +1,25 @@
 package org.apache.cassandra.db.compaction;
+/*
+ * 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ * 
+ */
+
 
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/74f37b5a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
index 9d95eda..637d60a 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
@@ -301,9 +301,7 @@ public class SSTableReaderTest extends SchemaLoader
         boolean foundScanner = false;
         for (SSTableReader s : store.getSSTables())
         {
-            ICompactionScanner scanner = s.getScanner(new Range<Token>(t(0),
-                                                                       t(1),
-                                                                       s.partitioner));
+            ICompactionScanner scanner = s.getScanner(new Range<Token>(t(0), t(1),
s.partitioner), null);
             scanner.next(); // throws exception pre 5407
             foundScanner = true;
         }


Mime
View raw message