accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [2/3] accumulo git commit: ACCUMULO-4187: Added rate limiting for major compactions.
Date Tue, 19 Apr 2016 17:51:55 GMT
ACCUMULO-4187: Added rate limiting for major compactions.

Added configuration property tserver.compaction.major.throughput of type PropertyType.MEMORY with a default of 0B (unlimited).  If another value is specified (e.g. 30M), then all tablet servers will limit the I/O performed during major compaction accordingly (e.g. neither reading nor writing more than 30MiB per second combined over all major compaction threads).


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/783314c8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/783314c8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/783314c8

Branch: refs/heads/master
Commit: 783314c890c988d7e824148fc97384718f1e3561
Parents: 584b812
Author: Shawn Walker <accumulo@shawn-walker.net>
Authored: Wed Apr 6 13:18:09 2016 -0400
Committer: Shawn Walker <accumulo@shawn-walker.net>
Committed: Tue Apr 19 13:18:50 2016 -0400

----------------------------------------------------------------------
 .../core/client/impl/OfflineIterator.java       |   2 +-
 .../client/mapred/AccumuloFileOutputFormat.java |   2 +-
 .../mapreduce/AccumuloFileOutputFormat.java     |   2 +-
 .../core/client/mock/MockTableOperations.java   |   2 +-
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../accumulo/core/file/BloomFilterLayer.java    |   4 +-
 .../core/file/DispatchingFileFactory.java       |  25 +--
 .../accumulo/core/file/FileOperations.java      |  16 +-
 .../file/blockfile/impl/CachableBlockFile.java  |  53 ++++--
 .../core/file/map/MapFileOperations.java        |  20 +--
 .../accumulo/core/file/rfile/CreateEmpty.java   |   4 +-
 .../core/file/rfile/RFileOperations.java        |  31 ++--
 .../accumulo/core/file/rfile/SplitLarge.java    |   4 +-
 .../accumulo/core/file/rfile/bcfile/BCFile.java |  99 +++++-----
 .../bcfile/BoundedRangeFileInputStream.java     | 154 ----------------
 .../streams/BoundedRangeFileInputStream.java    | 152 ++++++++++++++++
 .../streams/PositionedDataOutputStream.java     |  35 ++++
 .../core/file/streams/PositionedOutput.java     |  26 +++
 .../core/file/streams/PositionedOutputs.java    |  68 +++++++
 .../file/streams/RateLimitedInputStream.java    |  69 +++++++
 .../file/streams/RateLimitedOutputStream.java   |  57 ++++++
 .../file/streams/SeekableDataInputStream.java   |  46 +++++
 .../core/util/ratelimit/GuavaRateLimiter.java   |  63 +++++++
 .../core/util/ratelimit/NullRateLimiter.java    |  33 ++++
 .../core/util/ratelimit/RateLimiter.java        |  27 +++
 .../ratelimit/SharedRateLimiterFactory.java     | 180 +++++++++++++++++++
 .../client/mock/MockTableOperationsTest.java    |   2 +-
 .../core/file/BloomFilterLayerLookupTest.java   |   4 +-
 .../accumulo/core/file/FileOperationsTest.java  |   2 +-
 .../core/file/rfile/CreateCompatTestFile.java   |   2 +-
 .../core/file/rfile/MultiLevelIndexTest.java    |   3 +-
 .../accumulo/core/file/rfile/RFileTest.java     |   3 +-
 .../core/file/streams/MockRateLimiter.java      |  38 ++++
 .../streams/RateLimitedInputStreamTest.java     |  69 +++++++
 .../streams/RateLimitedOutputStreamTest.java    |  56 ++++++
 .../accumulo/server/client/BulkImporter.java    |   2 +-
 .../apache/accumulo/server/init/Initialize.java |   2 +-
 .../apache/accumulo/server/util/FileUtil.java   |  10 +-
 .../server/client/BulkImporterTest.java         |   2 +-
 .../apache/accumulo/tserver/FileManager.java    |   2 +-
 .../apache/accumulo/tserver/InMemoryMap.java    |   4 +-
 .../apache/accumulo/tserver/TabletServer.java   |  29 ++-
 .../compaction/MajorCompactionRequest.java      |   2 +-
 .../accumulo/tserver/tablet/Compactor.java      |   9 +-
 .../accumulo/tserver/tablet/MinorCompactor.java |  11 ++
 .../apache/accumulo/tserver/tablet/Tablet.java  |  18 +-
 .../accumulo/tserver/tablet/TabletData.java     |   2 +-
 .../accumulo/test/BulkImportMonitoringIT.java   |   2 +-
 .../accumulo/test/CompactionRateLimitingIT.java |  81 +++++++++
 .../apache/accumulo/test/CreateRandomRFile.java |   2 +-
 .../accumulo/test/GenerateSequentialRFile.java  |   2 +-
 .../apache/accumulo/test/GetFileInfoBulkIT.java |   2 +-
 .../org/apache/accumulo/test/ShellServerIT.java |   4 +-
 .../org/apache/accumulo/test/TestIngest.java    |   3 +-
 .../accumulo/test/functional/BulkFileIT.java    |   6 +-
 .../test/mapred/AccumuloFileOutputFormatIT.java |   2 +-
 .../mapreduce/AccumuloFileOutputFormatIT.java   |   2 +-
 .../performance/metadata/FastBulkImportIT.java  |   2 +-
 .../performance/scan/CollectTabletStats.java    |   4 +-
 .../accumulo/test/proxy/SimpleProxyBase.java    |   2 +-
 .../test/randomwalk/bulk/BulkPlusOne.java       |   2 +-
 .../test/randomwalk/concurrent/BulkImport.java  |   5 +-
 .../test/randomwalk/security/TableOp.java       |   2 +-
 63 files changed, 1253 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
index 487af11..20c53e1 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/OfflineIterator.java
@@ -345,7 +345,7 @@ class OfflineIterator implements Iterator<Entry<Key,Value>> {
     // TODO need to close files - ACCUMULO-1303
     for (String file : absFiles) {
       FileSystem fs = VolumeConfiguration.getVolume(file, conf, config).getFileSystem();
-      FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, acuTableConf, null, null);
+      FileSKVIterator reader = FileOperations.getInstance().openReader(file, false, fs, conf, null, acuTableConf, null, null);
       if (scannerSamplerConfigImpl != null) {
         reader = reader.getSample(scannerSamplerConfigImpl);
         if (reader == null)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
index d636776..8fa5f62 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormat.java
@@ -186,7 +186,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
         }
 
         if (out == null) {
-          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf);
+          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, null, acuConf);
           out.startDefaultLocalityGroup();
         }
         out.append(key, value);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
index b241f33..2d62279 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormat.java
@@ -184,7 +184,7 @@ public class AccumuloFileOutputFormat extends FileOutputFormat<Key,Value> {
         }
 
         if (out == null) {
-          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, acuConf);
+          out = FileOperations.getInstance().openWriter(file.toString(), file.getFileSystem(conf), conf, null, acuConf);
           out.startDefaultLocalityGroup();
         }
         out.append(key, value);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
index d465138..aa64a10 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockTableOperations.java
@@ -289,7 +289,7 @@ class MockTableOperations extends TableOperationsHelper {
      */
     for (FileStatus importStatus : fs.listStatus(importPath)) {
       try {
-        FileSKVIterator importIterator = FileOperations.getInstance().openReader(importStatus.getPath().toString(), true, fs, fs.getConf(),
+        FileSKVIterator importIterator = FileOperations.getInstance().openReader(importStatus.getPath().toString(), true, fs, fs.getConf(), null,
             AccumuloConfiguration.getDefaultConfiguration());
         while (importIterator.hasTop()) {
           Key key = importIterator.getTopKey();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index a2d01e7..bc1e60e 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -293,6 +293,8 @@ public enum Property {
       "The maximum number of concurrent tablet migrations for a tablet server"),
   TSERV_MAJC_MAXCONCURRENT("tserver.compaction.major.concurrent.max", "3", PropertyType.COUNT,
       "The maximum number of concurrent major compactions for a tablet server"),
+  TSERV_MAJC_THROUGHPUT("tserver.compaction.major.throughput", "0B", PropertyType.MEMORY,
+      "Maximum number of bytes to read or write per second over all major compactions on a TabletServer, or 0B for unlimited."),
   TSERV_MINC_MAXCONCURRENT("tserver.compaction.minor.concurrent.max", "4", PropertyType.COUNT,
       "The maximum number of concurrent minor compactions for a tablet server"),
   TSERV_MAJC_TRACE_PERCENT("tserver.compaction.major.trace.percent", "0.1", PropertyType.FRACTION, "The percent of major compactions to trace"),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
index 758df12..c9918bd 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/BloomFilterLayer.java
@@ -458,7 +458,7 @@ public class BloomFilterLayer {
 
     String suffix = FileOperations.getNewFileExtension(acuconf);
     String fname = "/tmp/test." + suffix;
-    FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, acuconf);
+    FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, null, acuconf);
 
     long t1 = System.currentTimeMillis();
 
@@ -477,7 +477,7 @@ public class BloomFilterLayer {
     bmfw.close();
 
     t1 = System.currentTimeMillis();
-    FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, acuconf);
+    FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, null, acuconf);
     t2 = System.currentTimeMillis();
     out.println("Opened " + fname + " in " + (t2 - t1));
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
index 1e7ecc9..9478a29 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/DispatchingFileFactory.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.map.MapFileOperations;
 import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -65,8 +66,9 @@ class DispatchingFileFactory extends FileOperations {
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf) throws IOException {
+    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, null, null);
     if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
       return new BloomFilterLayer.Reader(iter, acuconf);
     }
@@ -74,8 +76,8 @@ class DispatchingFileFactory extends FileOperations {
   }
 
   @Override
-  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, acuconf);
+  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
+    FileSKVWriter writer = findFileFactory(file).openWriter(file, fs, conf, writeLimiter, acuconf);
     if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
       return new BloomFilterLayer.Writer(writer, acuconf);
     }
@@ -89,32 +91,32 @@ class DispatchingFileFactory extends FileOperations {
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf) throws IOException {
-    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, null, null);
+      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
+    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf, null, null);
   }
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
 
     if (!tableConf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
       indexCache = null;
     if (!tableConf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
       dataCache = null;
 
-    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf, dataCache, indexCache);
+    return findFileFactory(file).openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf, dataCache, indexCache);
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
-      BlockCache dataCache, BlockCache indexCache) throws IOException {
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
 
     if (!acuconf.getBoolean(Property.TABLE_INDEXCACHE_ENABLED))
       indexCache = null;
     if (!acuconf.getBoolean(Property.TABLE_BLOCKCACHE_ENABLED))
       dataCache = null;
 
-    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, acuconf, dataCache, indexCache);
+    FileSKVIterator iter = findFileFactory(file).openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, dataCache, indexCache);
     if (acuconf.getBoolean(Property.TABLE_BLOOM_ENABLED)) {
       return new BloomFilterLayer.Reader(iter, acuconf);
     }
@@ -132,5 +134,4 @@ class DispatchingFileFactory extends FileOperations {
 
     return findFileFactory(file).openIndex(file, fs, conf, acuconf, dCache, iCache);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
index 3798453..dc7c646 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/FileOperations.java
@@ -28,6 +28,7 @@ import org.apache.accumulo.core.data.ByteSequence;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 
@@ -55,23 +56,24 @@ public abstract class FileOperations {
    */
 
   public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf) throws IOException;
+      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException;
 
   public abstract FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
+      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException;
 
   /**
    * Open a reader that fully support seeking and also enable any optimizations related to seeking, like bloom filters.
    *
    */
 
-  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf)
-      throws IOException;
+  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf) throws IOException;
 
-  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
-      BlockCache dataCache, BlockCache indexCache) throws IOException;
+  public abstract FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException;
 
-  public abstract FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException;
+  public abstract FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf)
+      throws IOException;
 
   public abstract FileSKVIterator openIndex(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
index 07ac5af..6a170e8 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/blockfile/impl/CachableBlockFile.java
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.ref.SoftReference;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -33,11 +34,14 @@ import org.apache.accumulo.core.file.blockfile.cache.CacheEntry;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
 import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Writer.BlockAppender;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.file.streams.PositionedOutput;
+import org.apache.accumulo.core.file.streams.RateLimitedInputStream;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Seekable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,20 +59,22 @@ public class CachableBlockFile {
   public static class Writer implements BlockFileWriter {
     private BCFile.Writer _bc;
     private BlockWrite _bw;
-    private final FSDataOutputStream fsout;
+    private final PositionedOutput fsout;
     private long length = 0;
 
-    public Writer(FileSystem fs, Path fName, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      this.fsout = fs.create(fName);
-      init(fsout, compressAlgor, conf, accumuloConfiguration);
+    public Writer(FileSystem fs, Path fName, String compressAlgor, RateLimiter writeLimiter, Configuration conf, AccumuloConfiguration accumuloConfiguration)
+        throws IOException {
+      this(new RateLimitedOutputStream(fs.create(fName), writeLimiter), compressAlgor, conf, accumuloConfiguration);
     }
 
-    public Writer(FSDataOutputStream fsout, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+    public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fsout, String compressAlgor, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
       this.fsout = fsout;
       init(fsout, compressAlgor, conf, accumuloConfiguration);
     }
 
-    private void init(FSDataOutputStream fsout, String compressAlgor, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+    private <OutputStreamT extends OutputStream & PositionedOutput> void init(OutputStreamT fsout, String compressAlgor, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
       _bc = new BCFile.Writer(fsout, compressAlgor, conf, false, accumuloConfiguration);
     }
 
@@ -90,8 +96,8 @@ public class CachableBlockFile {
       _bw.close();
       _bc.close();
 
-      length = this.fsout.getPos();
-      this.fsout.close();
+      length = this.fsout.position();
+      ((OutputStream) this.fsout).close();
     }
 
     @Override
@@ -139,11 +145,12 @@ public class CachableBlockFile {
    *
    */
   public static class Reader implements BlockFileReader {
+    private final RateLimiter readLimiter;
     private BCFile.Reader _bc;
     private String fileName = "not_available";
     private BlockCache _dCache = null;
     private BlockCache _iCache = null;
-    private FSDataInputStream fin = null;
+    private InputStream fin = null;
     private FileSystem fs;
     private Configuration conf;
     private boolean closed = false;
@@ -221,6 +228,11 @@ public class CachableBlockFile {
 
     public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
         throws IOException {
+      this(fs, dataFile, conf, data, index, null, accumuloConfiguration);
+    }
+
+    public Reader(FileSystem fs, Path dataFile, Configuration conf, BlockCache data, BlockCache index, RateLimiter readLimiter,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
 
       /*
        * Grab path create input stream grab len create file
@@ -232,21 +244,25 @@ public class CachableBlockFile {
       this.fs = fs;
       this.conf = conf;
       this.accumuloConfiguration = accumuloConfiguration;
+      this.readLimiter = readLimiter;
     }
 
-    public Reader(FSDataInputStream fsin, long len, Configuration conf, BlockCache data, BlockCache index, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
+    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf, BlockCache data, BlockCache index,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
       this._dCache = data;
       this._iCache = index;
+      this.readLimiter = null;
       init(fsin, len, conf, accumuloConfiguration);
     }
 
-    public Reader(FSDataInputStream fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-      // this.fin = fsin;
+    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fsin, long len, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this.readLimiter = null;
       init(fsin, len, conf, accumuloConfiguration);
     }
 
-    private void init(FSDataInputStream fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
+    private <InputStreamT extends InputStream & Seekable> void init(InputStreamT fsin, long len, Configuration conf, AccumuloConfiguration accumuloConfiguration)
+        throws IOException {
       this._bc = new BCFile.Reader(this, fsin, len, conf, accumuloConfiguration);
     }
 
@@ -257,8 +273,9 @@ public class CachableBlockFile {
       if (_bc == null) {
         // lazily open file if needed
         Path path = new Path(fileName);
-        fin = fs.open(path);
-        init(fin, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
+        RateLimitedInputStream fsIn = new RateLimitedInputStream(fs.open(path), this.readLimiter);
+        fin = fsIn;
+        init(fsIn, fs.getFileStatus(path).getLen(), conf, accumuloConfiguration);
       }
 
       return _bc;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
index 75cfa7e..a72a243 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/map/MapFileOperations.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.apache.accumulo.core.iterators.system.MapFileIterator;
 import org.apache.accumulo.core.iterators.system.SequenceFileIterator;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -141,7 +142,8 @@ public class MapFileOperations extends FileOperations {
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf) throws IOException {
     FileSKVIterator iter = new RangeIterator(new MapFileIterator(acuconf, fs, file, conf));
 
     if (seekToBeginning)
@@ -151,10 +153,8 @@ public class MapFileOperations extends FileOperations {
   }
 
   @Override
-  public FileSKVWriter openWriter(final String file, final FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-
+  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
     throw new UnsupportedOperationException();
-
   }
 
   @Override
@@ -169,7 +169,7 @@ public class MapFileOperations extends FileOperations {
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf) throws IOException {
+      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
     MapFileIterator mfIter = new MapFileIterator(tableConf, fs, file, conf);
 
     FileSKVIterator iter = new RangeIterator(mfIter);
@@ -181,16 +181,16 @@ public class MapFileOperations extends FileOperations {
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
 
-    return openReader(file, range, columnFamilies, inclusive, fs, conf, tableConf);
+    return openReader(file, range, columnFamilies, inclusive, fs, conf, readLimiter, tableConf);
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
-      BlockCache dataCache, BlockCache indexCache) throws IOException {
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
 
-    return openReader(file, seekToBeginning, fs, conf, acuconf);
+    return openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
index 75d5567..045bdbb 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/CreateEmpty.java
@@ -77,8 +77,8 @@ public class CreateEmpty {
     for (String arg : opts.files) {
       Path path = new Path(arg);
       log.info("Writing to file '" + path + "'");
-      FileSKVWriter writer = (new RFileOperations())
-          .openWriter(arg, path.getFileSystem(conf), conf, DefaultConfiguration.getDefaultConfiguration(), opts.codec);
+      FileSKVWriter writer = (new RFileOperations()).openWriter(arg, path.getFileSystem(conf), conf, null, DefaultConfiguration.getDefaultConfiguration(),
+          opts.codec);
       writer.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
index a41785a..730a9d3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/RFileOperations.java
@@ -36,6 +36,8 @@ import org.apache.accumulo.core.file.rfile.RFile.Writer;
 import org.apache.accumulo.core.sample.Sampler;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.sample.impl.SamplerFactory;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.file.streams.RateLimitedOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -68,16 +70,17 @@ public class RFileOperations extends FileOperations {
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    return openReader(file, seekToBeginning, fs, conf, acuconf, null, null);
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf) throws IOException {
+    return openReader(file, seekToBeginning, fs, conf, readLimiter, acuconf, null, null);
   }
 
   @Override
-  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf,
-      BlockCache dataCache, BlockCache indexCache) throws IOException {
+  public FileSKVIterator openReader(String file, boolean seekToBeginning, FileSystem fs, Configuration conf, RateLimiter readLimiter,
+      AccumuloConfiguration acuconf, BlockCache dataCache, BlockCache indexCache) throws IOException {
     Path path = new Path(file);
 
-    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, acuconf);
+    CachableBlockFile.Reader _cbr = new CachableBlockFile.Reader(fs, path, conf, dataCache, indexCache, readLimiter, acuconf);
     Reader iter = new RFile.Reader(_cbr);
 
     if (seekToBeginning) {
@@ -89,26 +92,27 @@ public class RFileOperations extends FileOperations {
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf) throws IOException {
-    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, null, null);
+      RateLimiter readLimiter, AccumuloConfiguration tableConf) throws IOException {
+    FileSKVIterator iter = openReader(file, false, fs, conf, readLimiter, tableConf, null, null);
     iter.seek(range, columnFamilies, inclusive);
     return iter;
   }
 
   @Override
   public FileSKVIterator openReader(String file, Range range, Set<ByteSequence> columnFamilies, boolean inclusive, FileSystem fs, Configuration conf,
-      AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
-    FileSKVIterator iter = openReader(file, false, fs, conf, tableConf, dataCache, indexCache);
+      RateLimiter readLimiter, AccumuloConfiguration tableConf, BlockCache dataCache, BlockCache indexCache) throws IOException {
+    FileSKVIterator iter = openReader(file, false, fs, conf, readLimiter, tableConf, dataCache, indexCache);
     iter.seek(range, columnFamilies, inclusive);
     return iter;
   }
 
   @Override
-  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf) throws IOException {
-    return openWriter(file, fs, conf, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
+  public FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf) throws IOException {
+    return openWriter(file, fs, conf, writeLimiter, acuconf, acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
   }
 
-  FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, AccumuloConfiguration acuconf, String compression) throws IOException {
+  FileSKVWriter openWriter(String file, FileSystem fs, Configuration conf, RateLimiter writeLimiter, AccumuloConfiguration acuconf, String compression)
+      throws IOException {
     int hrep = conf.getInt("dfs.replication", -1);
     int trep = acuconf.getCount(Property.TABLE_FILE_REPLICATION);
     int rep = hrep;
@@ -132,7 +136,8 @@ public class RFileOperations extends FileOperations {
       sampler = SamplerFactory.newSampler(samplerConfig, acuconf);
     }
 
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, bufferSize, (short) rep, block), compression, conf, acuconf);
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(new RateLimitedOutputStream(fs.create(new Path(file), false, bufferSize, (short) rep, block),
+        writeLimiter), compression, conf, acuconf);
     Writer writer = new RFile.Writer(_cbw, (int) blockSize, (int) indexBlockSize, samplerConfig, sampler);
     return writer;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
index 4e5b232..6c3aab3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/SplitLarge.java
@@ -69,8 +69,8 @@ public class SplitLarge {
         String largeName = file.substring(0, file.length() - 3) + "_large.rf";
 
         int blockSize = (int) aconf.getMemoryInBytes(Property.TABLE_FILE_BLOCK_SIZE);
-        try (Writer small = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(smallName), "gz", conf, aconf), blockSize);
-            Writer large = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(largeName), "gz", conf, aconf), blockSize)) {
+        try (Writer small = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(smallName), "gz", null, conf, aconf), blockSize);
+            Writer large = new RFile.Writer(new CachableBlockFile.Writer(fs, new Path(largeName), "gz", null, conf, aconf), blockSize)) {
           small.startDefaultLocalityGroup();
           large.startDefaultLocalityGroup();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
index 3764603..d7632f3 100644
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
+++ b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BCFile.java
@@ -45,11 +45,14 @@ import org.apache.accumulo.core.security.crypto.CryptoModule;
 import org.apache.accumulo.core.security.crypto.CryptoModuleFactory;
 import org.apache.accumulo.core.security.crypto.CryptoModuleParameters;
 import org.apache.accumulo.core.security.crypto.SecretKeyEncryptionStrategy;
+import org.apache.accumulo.core.file.streams.BoundedRangeFileInputStream;
+import org.apache.accumulo.core.file.streams.PositionedDataOutputStream;
+import org.apache.accumulo.core.file.streams.PositionedOutput;
+import org.apache.accumulo.core.file.streams.SeekableDataInputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
@@ -87,7 +90,7 @@ public final class BCFile {
    * BCFile writer, the entry point for creating a new BCFile.
    */
   static public class Writer implements Closeable {
-    private final FSDataOutputStream out;
+    private final PositionedDataOutputStream out;
     private final Configuration conf;
     private final CryptoModule cryptoModule;
     private BCFileCryptoModuleParameters cryptoParams;
@@ -127,7 +130,7 @@ public final class BCFile {
       private final Algorithm compressAlgo;
       private Compressor compressor; // !null only if using native
       // Hadoop compression
-      private final FSDataOutputStream fsOut;
+      private final PositionedDataOutputStream fsOut;
       private final OutputStream cipherOut;
       private final long posStart;
       private final SimpleBufferedOutputStream fsBufferedOutput;
@@ -139,11 +142,11 @@ public final class BCFile {
        * @param cryptoModule
        *          the module to use to obtain cryptographic streams
        */
-      public WBlockState(Algorithm compressionAlgo, FSDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf, CryptoModule cryptoModule,
-          CryptoModuleParameters cryptoParams) throws IOException {
+      public WBlockState(Algorithm compressionAlgo, PositionedDataOutputStream fsOut, BytesWritable fsOutputBuffer, Configuration conf,
+          CryptoModule cryptoModule, CryptoModuleParameters cryptoParams) throws IOException {
         this.compressAlgo = compressionAlgo;
         this.fsOut = fsOut;
-        this.posStart = fsOut.getPos();
+        this.posStart = fsOut.position();
 
         fsOutputBuffer.setCapacity(getFSOutputBufferSize(conf));
 
@@ -211,7 +214,7 @@ public final class BCFile {
        * @return The current byte offset in underlying file.
        */
       long getCurrentPos() throws IOException {
-        return fsOut.getPos() + fsBufferedOutput.size();
+        return fsOut.position() + fsBufferedOutput.size();
       }
 
       long getStartPos() {
@@ -338,18 +341,18 @@ public final class BCFile {
      *          Name of the compression algorithm, which will be used for all data blocks.
      * @see Compression#getSupportedAlgorithms
      */
-    public Writer(FSDataOutputStream fout, String compressionName, Configuration conf, boolean trackDataBlocks, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      if (fout.getPos() != 0) {
+    public <OutputStreamType extends OutputStream & PositionedOutput> Writer(OutputStreamType fout, String compressionName, Configuration conf,
+        boolean trackDataBlocks, AccumuloConfiguration accumuloConfiguration) throws IOException {
+      if (fout.position() != 0) {
         throw new IOException("Output file not at zero offset.");
       }
 
-      this.out = fout;
+      this.out = new PositionedDataOutputStream(fout);
       this.conf = conf;
       dataIndex = new DataIndex(compressionName, trackDataBlocks);
       metaIndex = new MetaIndex();
       fsOutputBuffer = new BytesWritable();
-      Magic.write(fout);
+      Magic.write(this.out);
 
       // Set up crypto-related detail, including secret key generation and encryption
 
@@ -388,14 +391,14 @@ public final class BCFile {
             appender.close();
           }
 
-          long offsetIndexMeta = out.getPos();
+          long offsetIndexMeta = out.position();
           metaIndex.write(out);
 
           if (cryptoParams.getAlgorithmName() == null || cryptoParams.getAlgorithmName().equals(Property.CRYPTO_CIPHER_SUITE.getDefaultValue())) {
             out.writeLong(offsetIndexMeta);
             API_VERSION_1.write(out);
           } else {
-            long offsetCryptoParameters = out.getPos();
+            long offsetCryptoParameters = out.position();
             cryptoParams.write(out);
 
             // Meta Index, crypto params offsets and the trailing section are written out directly.
@@ -594,7 +597,7 @@ public final class BCFile {
   static public class Reader implements Closeable {
     private static final String META_NAME = "BCFile.metaindex";
     private static final String CRYPTO_BLOCK_NAME = "BCFile.cryptoparams";
-    private final FSDataInputStream in;
+    private final SeekableDataInputStream in;
     private final Configuration conf;
     final DataIndex dataIndex;
     // Index for meta blocks
@@ -613,8 +616,8 @@ public final class BCFile {
       private final BlockRegion region;
       private final InputStream in;
 
-      public RBlockState(Algorithm compressionAlgo, FSDataInputStream fsin, BlockRegion region, Configuration conf, CryptoModule cryptoModule,
-          Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
+      public <InputStreamType extends InputStream & Seekable> RBlockState(Algorithm compressionAlgo, InputStreamType fsin, BlockRegion region,
+          Configuration conf, CryptoModule cryptoModule, Version bcFileVersion, CryptoModuleParameters cryptoParams) throws IOException {
         this.compressAlgo = compressionAlgo;
         this.region = region;
         this.decompressor = compressionAlgo.getDecompressor();
@@ -752,15 +755,15 @@ public final class BCFile {
      * @param fileLength
      *          Length of the corresponding file
      */
-    public Reader(FSDataInputStream fin, long fileLength, Configuration conf, AccumuloConfiguration accumuloConfiguration) throws IOException {
-
-      this.in = fin;
+    public <InputStreamType extends InputStream & Seekable> Reader(InputStreamType fin, long fileLength, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this.in = new SeekableDataInputStream(fin);
       this.conf = conf;
 
       // Move the cursor to grab the version and the magic first
-      fin.seek(fileLength - Magic.size() - Version.size());
-      version = new Version(fin);
-      Magic.readAndVerify(fin);
+      this.in.seek(fileLength - Magic.size() - Version.size());
+      version = new Version(this.in);
+      Magic.readAndVerify(this.in);
 
       // Do a version check
       if (!version.compatibleWith(BCFile.API_VERSION) && !version.equals(BCFile.API_VERSION_1)) {
@@ -772,26 +775,26 @@ public final class BCFile {
       long offsetCryptoParameters = 0;
 
       if (version.equals(API_VERSION_1)) {
-        fin.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
-        offsetIndexMeta = fin.readLong();
+        this.in.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
+        offsetIndexMeta = this.in.readLong();
 
       } else {
-        fin.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
-        offsetIndexMeta = fin.readLong();
-        offsetCryptoParameters = fin.readLong();
+        this.in.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
+        offsetIndexMeta = this.in.readLong();
+        offsetCryptoParameters = this.in.readLong();
       }
 
       // read meta index
-      fin.seek(offsetIndexMeta);
-      metaIndex = new MetaIndex(fin);
+      this.in.seek(offsetIndexMeta);
+      metaIndex = new MetaIndex(this.in);
 
       // If they exist, read the crypto parameters
       if (!version.equals(BCFile.API_VERSION_1)) {
 
         // read crypto parameters
-        fin.seek(offsetCryptoParameters);
+        this.in.seek(offsetCryptoParameters);
         cryptoParams = new BCFileCryptoModuleParameters();
-        cryptoParams.read(fin);
+        cryptoParams.read(this.in);
 
         this.cryptoModule = CryptoModuleFactory.getCryptoModule(cryptoParams.getAllOptions().get(Property.CRYPTO_MODULE_CLASS.getKey()));
 
@@ -832,9 +835,9 @@ public final class BCFile {
       }
     }
 
-    public Reader(CachableBlockFile.Reader cache, FSDataInputStream fin, long fileLength, Configuration conf, AccumuloConfiguration accumuloConfiguration)
-        throws IOException {
-      this.in = fin;
+    public <InputStreamType extends InputStream & Seekable> Reader(CachableBlockFile.Reader cache, InputStreamType fin, long fileLength, Configuration conf,
+        AccumuloConfiguration accumuloConfiguration) throws IOException {
+      this.in = new SeekableDataInputStream(fin);
       this.conf = conf;
 
       BlockRead cachedMetaIndex = cache.getCachedMetaBlock(META_NAME);
@@ -845,9 +848,9 @@ public final class BCFile {
         // move the cursor to the beginning of the tail, containing: offset to the
         // meta block index, version and magic
         // Move the cursor to grab the version and the magic first
-        fin.seek(fileLength - Magic.size() - Version.size());
-        version = new Version(fin);
-        Magic.readAndVerify(fin);
+        this.in.seek(fileLength - Magic.size() - Version.size());
+        version = new Version(this.in);
+        Magic.readAndVerify(this.in);
 
         // Do a version check
         if (!version.compatibleWith(BCFile.API_VERSION) && !version.equals(BCFile.API_VERSION_1)) {
@@ -859,26 +862,26 @@ public final class BCFile {
         long offsetCryptoParameters = 0;
 
         if (version.equals(API_VERSION_1)) {
-          fin.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
-          offsetIndexMeta = fin.readLong();
+          this.in.seek(fileLength - Magic.size() - Version.size() - (Long.SIZE / Byte.SIZE));
+          offsetIndexMeta = this.in.readLong();
 
         } else {
-          fin.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
-          offsetIndexMeta = fin.readLong();
-          offsetCryptoParameters = fin.readLong();
+          this.in.seek(fileLength - Magic.size() - Version.size() - (2 * (Long.SIZE / Byte.SIZE)));
+          offsetIndexMeta = this.in.readLong();
+          offsetCryptoParameters = this.in.readLong();
         }
 
         // read meta index
-        fin.seek(offsetIndexMeta);
-        metaIndex = new MetaIndex(fin);
+        this.in.seek(offsetIndexMeta);
+        metaIndex = new MetaIndex(this.in);
 
         // If they exist, read the crypto parameters
         if (!version.equals(BCFile.API_VERSION_1) && cachedCryptoParams == null) {
 
           // read crypto parameters
-          fin.seek(offsetCryptoParameters);
+          this.in.seek(offsetCryptoParameters);
           cryptoParams = new BCFileCryptoModuleParameters();
-          cryptoParams.read(fin);
+          cryptoParams.read(this.in);
 
           if (accumuloConfiguration.getBoolean(Property.CRYPTO_OVERRIDE_KEY_STRATEGY_WITH_CONFIGURED_STRATEGY)) {
             Map<String,String> cryptoConfFromAccumuloConf = accumuloConfiguration.getAllPropertiesWithPrefix(Property.CRYPTO_PREFIX);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
deleted file mode 100644
index f93bb84..0000000
--- a/core/src/main/java/org/apache/accumulo/core/file/rfile/bcfile/BoundedRangeFileInputStream.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.accumulo.core.file.rfile.bcfile;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-
-/**
- * BoundedRangeFIleInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a regular input stream. One can create multiple
- * BoundedRangeFileInputStream on top of the same FSDataInputStream and they would not interfere with each other.
- */
-class BoundedRangeFileInputStream extends InputStream {
-
-  private FSDataInputStream in;
-  private long pos;
-  private long end;
-  private long mark;
-  private final byte[] oneByte = new byte[1];
-
-  /**
-   * Constructor
-   *
-   * @param in
-   *          The FSDataInputStream we connect to.
-   * @param offset
-   *          Beginning offset of the region.
-   * @param length
-   *          Length of the region.
-   *
-   *          The actual length of the region may be smaller if (off_begin + length) goes beyond the end of FS input stream.
-   */
-  public BoundedRangeFileInputStream(FSDataInputStream in, long offset, long length) {
-    if (offset < 0 || length < 0) {
-      throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length);
-    }
-
-    this.in = in;
-    this.pos = offset;
-    this.end = offset + length;
-    this.mark = -1;
-  }
-
-  @Override
-  public int available() throws IOException {
-    int avail = in.available();
-    if (pos + avail > end) {
-      avail = (int) (end - pos);
-    }
-
-    return avail;
-  }
-
-  @Override
-  public int read() throws IOException {
-    int ret = read(oneByte);
-    if (ret == 1)
-      return oneByte[0] & 0xff;
-    return -1;
-  }
-
-  @Override
-  public int read(byte[] b) throws IOException {
-    return read(b, 0, b.length);
-  }
-
-  @Override
-  public int read(final byte[] b, final int off, int len) throws IOException {
-    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
-      throw new IndexOutOfBoundsException();
-    }
-
-    final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
-    if (n == 0)
-      return -1;
-    Integer ret = 0;
-    final FSDataInputStream inLocal = in;
-    synchronized (inLocal) {
-      inLocal.seek(pos);
-      try {
-        ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>() {
-          @Override
-          public Integer run() throws IOException {
-            int ret = 0;
-            ret = inLocal.read(b, off, n);
-            return ret;
-          }
-        });
-      } catch (PrivilegedActionException e) {
-        throw (IOException) e.getException();
-      }
-    }
-    if (ret < 0) {
-      end = pos;
-      return -1;
-    }
-    pos += ret;
-    return ret;
-  }
-
-  @Override
-  /*
-   * We may skip beyond the end of the file.
-   */
-  public long skip(long n) throws IOException {
-    long len = Math.min(n, end - pos);
-    pos += len;
-    return len;
-  }
-
-  @Override
-  public void mark(int readlimit) {
-    mark = pos;
-  }
-
-  @Override
-  public void reset() throws IOException {
-    if (mark < 0)
-      throw new IOException("Resetting to invalid mark");
-    pos = mark;
-  }
-
-  @Override
-  public boolean markSupported() {
-    return true;
-  }
-
-  @Override
-  public void close() {
-    // Invalidate the state of the stream.
-    in = null;
-    pos = end;
-    mark = -1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
new file mode 100644
index 0000000..1c01843
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/BoundedRangeFileInputStream.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * BoundedRangeFIleInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a regular input stream. One can create multiple
+ * BoundedRangeFileInputStream on top of the same FSDataInputStream and they would not interfere with each other.
+ */
+public class BoundedRangeFileInputStream extends InputStream {
+  private InputStream in;
+  private long pos;
+  private long end;
+  private long mark;
+  private final byte[] oneByte = new byte[1];
+
+  /**
+   * Constructor
+   *
+   * @param in
+   *          The FSDataInputStream we connect to.
+   * @param offset
+   *          Beginning offset of the region.
+   * @param length
+   *          Length of the region.
+   *
+   *          The actual length of the region may be smaller if (off_begin + length) goes beyond the end of FS input stream.
+   */
+  public <StreamType extends InputStream & Seekable> BoundedRangeFileInputStream(StreamType in, long offset, long length) {
+    if (offset < 0 || length < 0) {
+      throw new IndexOutOfBoundsException("Invalid offset/length: " + offset + "/" + length);
+    }
+
+    this.in = in;
+    this.pos = offset;
+    this.end = offset + length;
+    this.mark = -1;
+  }
+
+  @Override
+  public int available() throws IOException {
+    int avail = in.available();
+    if (pos + avail > end) {
+      avail = (int) (end - pos);
+    }
+
+    return avail;
+  }
+
+  @Override
+  public int read() throws IOException {
+    int ret = read(oneByte);
+    if (ret == 1)
+      return oneByte[0] & 0xff;
+    return -1;
+  }
+
+  @Override
+  public int read(byte[] b) throws IOException {
+    return read(b, 0, b.length);
+  }
+
+  @Override
+  public int read(final byte[] b, final int off, int len) throws IOException {
+    if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
+      throw new IndexOutOfBoundsException();
+    }
+
+    final int n = (int) Math.min(Integer.MAX_VALUE, Math.min(len, (end - pos)));
+    if (n == 0)
+      return -1;
+    Integer ret = 0;
+    final InputStream inLocal = in;
+    synchronized (inLocal) {
+      ((Seekable) inLocal).seek(pos);
+      try {
+        ret = AccessController.doPrivileged(new PrivilegedExceptionAction<Integer>() {
+          @Override
+          public Integer run() throws IOException {
+            int ret = 0;
+            ret = inLocal.read(b, off, n);
+            return ret;
+          }
+        });
+      } catch (PrivilegedActionException e) {
+        throw (IOException) e.getException();
+      }
+    }
+    if (ret < 0) {
+      end = pos;
+      return -1;
+    }
+    pos += ret;
+    return ret;
+  }
+
+  @Override
+  /*
+   * We may skip beyond the end of the file.
+   */
+  public long skip(long n) throws IOException {
+    long len = Math.min(n, end - pos);
+    pos += len;
+    return len;
+  }
+
+  @Override
+  public void mark(int readlimit) {
+    mark = pos;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    if (mark < 0)
+      throw new IOException("Resetting to invalid mark");
+    pos = mark;
+  }
+
+  @Override
+  public boolean markSupported() {
+    return true;
+  }
+
+  @Override
+  public void close() {
+    // Invalidate the state of the stream.
+    in = null;
+    pos = end;
+    mark = -1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
new file mode 100644
index 0000000..bd18426
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedDataOutputStream.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A filter converting a {@link PositionedOutput} {@code OutputStream} to a {@code PositionedOutput} {@code DataOutputStream}
+ */
+public class PositionedDataOutputStream extends DataOutputStream implements PositionedOutput {
+  public <StreamType extends OutputStream & PositionedOutput> PositionedDataOutputStream(StreamType type) {
+    super(type);
+  }
+
+  @Override
+  public long position() throws IOException {
+    return ((PositionedOutput) out).position();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
new file mode 100644
index 0000000..e5dcba4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutput.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.IOException;
+
+/**
+ * For any byte sink (but especially OutputStream), the ability to report how many bytes have been sunk.
+ */
+public interface PositionedOutput {
+  public long position() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
new file mode 100644
index 0000000..4769818
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/PositionedOutputs.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Objects;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+/**
+ * Utility functions for {@link PositionedOutput}.
+ */
+public class PositionedOutputs {
+  private PositionedOutputs() {}
+
+  /** Convert an {@code OutputStream} into an {@code OutputStream} implementing {@link PositionedOutput}. */
+  public static PositionedOutputStream wrap(final OutputStream fout) {
+    Objects.requireNonNull(fout);
+    if (fout instanceof FSDataOutputStream) {
+      return new PositionedOutputStream(fout) {
+        @Override
+        public long position() throws IOException {
+          return ((FSDataOutputStream) fout).getPos();
+        }
+      };
+    } else if (fout instanceof PositionedOutput) {
+      return new PositionedOutputStream(fout) {
+        @Override
+        public long position() throws IOException {
+          return ((PositionedOutput) fout).position();
+        }
+      };
+    } else {
+      return new PositionedOutputStream(fout) {
+        @Override
+        public long position() throws IOException {
+          throw new UnsupportedOperationException("Underlying stream does not support position()");
+        }
+      };
+    }
+  }
+
+  private static abstract class PositionedOutputStream extends FilterOutputStream implements PositionedOutput {
+    public PositionedOutputStream(OutputStream stream) {
+      super(stream);
+    }
+
+    @Override
+    public void write(byte[] data, int off, int len) throws IOException {
+      out.write(data, off, len);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
new file mode 100644
index 0000000..5254086
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedInputStream.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * A decorator for an {@code InputStream} which limits the rate at which reads are performed.
+ */
+public class RateLimitedInputStream extends FilterInputStream implements Seekable {
+  private final RateLimiter rateLimiter;
+
+  public <StreamType extends InputStream & Seekable> RateLimitedInputStream(StreamType stream, RateLimiter rateLimiter) {
+    super(stream);
+    this.rateLimiter = rateLimiter == null ? NullRateLimiter.INSTANCE : rateLimiter;
+  }
+
+  @Override
+  public int read() throws IOException {
+    int val = in.read();
+    if (val >= 0) {
+      rateLimiter.acquire(1);
+    }
+    return val;
+  }
+
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    int count = in.read(buffer, offset, length);
+    if (count > 0) {
+      rateLimiter.acquire(count);
+    }
+    return count;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    ((Seekable) in).seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return ((Seekable) in).getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return ((Seekable) in).seekToNewSource(targetPos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
new file mode 100644
index 0000000..b426a6b
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStream.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import org.apache.accumulo.core.util.ratelimit.NullRateLimiter;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+
+/**
+ * A decorator for {@code OutputStream} which limits the rate at which data may be written.
+ */
+public class RateLimitedOutputStream extends FilterOutputStream implements PositionedOutput {
+  private final RateLimiter writeLimiter;
+
+  public RateLimitedOutputStream(OutputStream wrappedStream, RateLimiter writeLimiter) {
+    super(PositionedOutputs.wrap(wrappedStream));
+    this.writeLimiter = writeLimiter == null ? NullRateLimiter.INSTANCE : writeLimiter;
+  }
+
+  @Override
+  public void write(int i) throws IOException {
+    writeLimiter.acquire(1);
+    out.write(i);
+  }
+
+  @Override
+  public void write(byte[] buffer, int offset, int length) throws IOException {
+    writeLimiter.acquire(length);
+    out.write(buffer, offset, length);
+  }
+
+  @Override
+  public void close() throws IOException {
+    out.close();
+  }
+
+  @Override
+  public long position() throws IOException {
+    return ((PositionedOutput) out).position();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java b/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
new file mode 100644
index 0000000..09060f5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/file/streams/SeekableDataInputStream.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import org.apache.hadoop.fs.Seekable;
+
+/**
+ * A wrapper converting a {@link Seekable} {@code InputStream} into a {@code Seekable} {@link DataInputStream}
+ */
+public class SeekableDataInputStream extends DataInputStream implements Seekable {
+  public <StreamType extends InputStream & Seekable> SeekableDataInputStream(StreamType stream) {
+    super(stream);
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    ((Seekable) in).seek(pos);
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return ((Seekable) in).getPos();
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return ((Seekable) in).seekToNewSource(targetPos);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
new file mode 100644
index 0000000..6e9781d
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/GuavaRateLimiter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+/** Rate limiter from the Guava library. */
+public class GuavaRateLimiter implements RateLimiter {
+  private final com.google.common.util.concurrent.RateLimiter rateLimiter;
+  private long currentRate;
+
+  /**
+   * Constructor
+   *
+   * @param initialRate
+   *          Count of permits which should be made available per second. A nonpositive rate is taken to indicate there should be no limitation on rate.
+   */
+  public GuavaRateLimiter(long initialRate) {
+    this.currentRate = initialRate;
+    this.rateLimiter = com.google.common.util.concurrent.RateLimiter.create(initialRate > 0 ? initialRate : Long.MAX_VALUE);
+  }
+
+  @Override
+  public long getRate() {
+    return currentRate;
+  }
+
+  /**
+   * Change the rate at which permits are made available.
+   *
+   * @param newRate
+   *          Count of permits which should be made available per second. A nonpositive rate is taken to indicate that there should be no limitation on rate.
+   */
+  public void setRate(long newRate) {
+    this.rateLimiter.setRate(newRate > 0 ? newRate : Long.MAX_VALUE);
+    this.currentRate = newRate;
+  }
+
+  @Override
+  public void acquire(long permits) {
+    if (this.currentRate > 0) {
+      while (permits > Integer.MAX_VALUE) {
+        rateLimiter.acquire(Integer.MAX_VALUE);
+        permits -= Integer.MAX_VALUE;
+      }
+      if (permits > 0) {
+        rateLimiter.acquire((int) permits);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
new file mode 100644
index 0000000..ac746c8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/NullRateLimiter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+/** A rate limiter which doesn't actually limit rates at all. */
+public class NullRateLimiter implements RateLimiter {
+  public static final NullRateLimiter INSTANCE = new NullRateLimiter();
+
+  private NullRateLimiter() {}
+
+  @Override
+  public long getRate() {
+    return 0;
+  }
+
+  @Override
+  public void acquire(long permits) {}
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
new file mode 100644
index 0000000..ff64840
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/RateLimiter.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+public interface RateLimiter {
+  /**
+   * Get current QPS of the rate limiter, with a nonpositive rate indicating no limit.
+   */
+  public long getRate();
+
+  /** Sleep until the specified number of queries are available. */
+  public void acquire(long permits);
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
new file mode 100644
index 0000000..ac1eec9
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/ratelimit/SharedRateLimiterFactory.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.util.ratelimit;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.WeakHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides the ability to retrieve a {@link RateLimiter} keyed to a specific string, which will dynamically update its rate according to a specified callback
+ * function.
+ */
+public class SharedRateLimiterFactory {
+  private static final long REPORT_RATE = 60000;
+  private static final long UPDATE_RATE = 1000;
+  private static SharedRateLimiterFactory instance = null;
+  private final Logger log = LoggerFactory.getLogger(SharedRateLimiterFactory.class);
+  private final WeakHashMap<String,SharedRateLimiter> activeLimiters = new WeakHashMap<>();
+
+  private SharedRateLimiterFactory() {}
+
+  /** Get the singleton instance of the SharedRateLimiterFactory. */
+  public static synchronized SharedRateLimiterFactory getInstance() {
+    if (instance == null) {
+      instance = new SharedRateLimiterFactory();
+
+      Timer timer = new Timer("SharedRateLimiterFactory update/report polling");
+
+      // Update periodically
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          instance.update();
+        }
+      }, UPDATE_RATE, UPDATE_RATE);
+
+      // Report periodically
+      timer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          instance.report();
+        }
+      }, REPORT_RATE, REPORT_RATE);
+    }
+    return instance;
+  }
+
+  /**
+   * A callback which provides the current rate for a {@link RateLimiter}.
+   */
+  public static interface RateProvider {
+    /**
+     * Calculate the current rate for the {@link RateLimiter}.
+     *
+     * @return Count of permits which should be provided per second. A nonpositive count is taken to indicate that no rate limiting should be performed.
+     */
+    public long getDesiredRate();
+  }
+
+  /**
+   * Lookup the RateLimiter associated with the specified name, or create a new one for that name.
+   *
+   * @param name
+   *          key for the rate limiter
+   * @param rateProvider
+   *          a function which can be called to get what the current rate for the rate limiter should be.
+   */
+  public RateLimiter create(String name, RateProvider rateProvider) {
+    synchronized (activeLimiters) {
+      if (activeLimiters.containsKey(name)) {
+        SharedRateLimiter limiter = activeLimiters.get(name);
+        return limiter;
+      } else {
+        long initialRate;
+        initialRate = rateProvider.getDesiredRate();
+        SharedRateLimiter limiter = new SharedRateLimiter(name, rateProvider, initialRate);
+        activeLimiters.put(name, limiter);
+        return limiter;
+      }
+    }
+  }
+
+  /**
+   * Walk through all of the currently active RateLimiters, having each update its current rate. This is called periodically so that we can dynamically update
+   * as configuration changes.
+   */
+  protected void update() {
+    Map<String,SharedRateLimiter> limitersCopy;
+    synchronized (activeLimiters) {
+      limitersCopy = ImmutableMap.copyOf(activeLimiters);
+    }
+    for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
+      try {
+        entry.getValue().update();
+      } catch (Exception ex) {
+        log.error(String.format("Failed to update limiter %s", entry.getKey()), ex);
+      }
+    }
+  }
+
+  /** Walk through all of the currently active RateLimiters, having each report its activity to the debug log. */
+  protected void report() {
+    Map<String,SharedRateLimiter> limitersCopy;
+    synchronized (activeLimiters) {
+      limitersCopy = ImmutableMap.copyOf(activeLimiters);
+    }
+    for (Map.Entry<String,SharedRateLimiter> entry : limitersCopy.entrySet()) {
+      try {
+        entry.getValue().report();
+      } catch (Exception ex) {
+        log.error(String.format("Failed to report limiter %s", entry.getKey()), ex);
+      }
+    }
+  }
+
+  protected class SharedRateLimiter extends GuavaRateLimiter {
+    private volatile long permitsAcquired = 0;
+    private volatile long lastUpdate;
+
+    private final RateProvider rateProvider;
+    private final String name;
+
+    SharedRateLimiter(String name, RateProvider rateProvider, long initialRate) {
+      super(initialRate);
+      this.name = name;
+      this.rateProvider = rateProvider;
+      this.lastUpdate = System.currentTimeMillis();
+    }
+
+    @Override
+    public void acquire(long permits) {
+      super.acquire(permits);
+      permitsAcquired += permits;
+    }
+
+    /** Poll the callback, updating the current rate if necessary. */
+    public void update() {
+      // Reset rate if needed
+      long rate = rateProvider.getDesiredRate();
+      if (rate != getRate()) {
+        setRate(rate);
+      }
+    }
+
+    /** Report the current throughput and usage of this rate limiter to the debug log. */
+    public void report() {
+      if (log.isDebugEnabled()) {
+        long duration = System.currentTimeMillis() - lastUpdate;
+        if (duration == 0)
+          return;
+        lastUpdate = System.currentTimeMillis();
+
+        long sum = permitsAcquired;
+        permitsAcquired = 0;
+
+        if (sum > 0) {
+          log.debug(String.format("RateLimiter '%s': %,d of %,d permits/second", name, sum * 1000L / duration, getRate()));
+        }
+      }
+    }
+  }
+}


Mime
View raw message