accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ctubb...@apache.org
Subject [1/3] accumulo git commit: ACCUMULO-4187: Added rate limiting for major compactions.
Date Tue, 19 Apr 2016 17:51:54 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master 4f45908c0 -> c8e6fe749


http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
index 792e199..ce49f36 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mock/MockTableOperationsTest.java
@@ -225,7 +225,7 @@ public class MockTableOperationsTest {
     fs.delete(tempFile, true);
     fs.mkdirs(failures);
     fs.mkdirs(tempFile.getParent());
-    FileSKVWriter writer = FileOperations.getInstance().openWriter(tempFile.toString(), fs, defaultConf, AccumuloConfiguration.getDefaultConfiguration());
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(tempFile.toString(), fs, defaultConf, null, AccumuloConfiguration.getDefaultConfiguration());
     writer.startDefaultLocalityGroup();
     List<Pair<Key,Value>> keyVals = new ArrayList<Pair<Key,Value>>();
     for (int i = 0; i < 5; i++) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
index ca388e5..0fb4bd6 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/BloomFilterLayerLookupTest.java
@@ -80,7 +80,7 @@ public class BloomFilterLayerLookupTest {
     // get output file name
     String suffix = FileOperations.getNewFileExtension(acuconf);
     String fname = new File(tempDir.getRoot(), testName + "." + suffix).getAbsolutePath();
-    FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, acuconf);
+    FileSKVWriter bmfw = FileOperations.getInstance().openWriter(fname, fs, conf, null, acuconf);
 
     // write data to file
     long t1 = System.currentTimeMillis();
@@ -96,7 +96,7 @@ public class BloomFilterLayerLookupTest {
     bmfw.close();
 
     t1 = System.currentTimeMillis();
-    FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, acuconf);
+    FileSKVIterator bmfr = FileOperations.getInstance().openReader(fname, false, fs, conf, null, acuconf);
     t2 = System.currentTimeMillis();
     LOG.debug("Opened " + fname + " in " + (t2 - t1));
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java b/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
index 3fdeb8a..e8ccf35 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/FileOperationsTest.java
@@ -51,7 +51,7 @@ public class FileOperationsTest {
       Configuration conf = new Configuration();
       FileSystem fs = FileSystem.getLocal(conf);
       AccumuloConfiguration acuconf = AccumuloConfiguration.getDefaultConfiguration();
-      writer = fileOperations.openWriter(filename, fs, conf, acuconf);
+      writer = fileOperations.openWriter(filename, fs, conf, null, acuconf);
       writer.close();
     } catch (Exception ex) {
       caughtException = true;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
index 3eadc06..ad29a69 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/CreateCompatTestFile.java
@@ -56,7 +56,7 @@ public class CreateCompatTestFile {
   public static void main(String[] args) throws Exception {
     Configuration conf = new Configuration();
     FileSystem fs = FileSystem.get(conf);
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", conf, AccumuloConfiguration.getDefaultConfiguration());
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(fs, new Path(args[0]), "gz", null, conf, AccumuloConfiguration.getDefaultConfiguration());
     RFile.Writer writer = new RFile.Writer(_cbw, 1000);
 
     writer.startNewLocalityGroup("lg1", ncfs(nf("cf_", 1), nf("cf_", 2)));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
index 66978dd..391bea1 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiLevelIndexTest.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
 import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Writer;
 import org.apache.accumulo.core.file.rfile.RFileTest.SeekableByteArrayInputStream;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -54,7 +55,7 @@ public class MultiLevelIndexTest extends TestCase {
     AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     FSDataOutputStream dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
-    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", CachedConfiguration.getInstance(), aconf);
+    CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz", CachedConfiguration.getInstance(), aconf);
 
     BufferedWriter mliw = new BufferedWriter(new Writer(_cbw, maxBlockSize));
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
index a442347..5f66503 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/RFileTest.java
@@ -86,6 +86,7 @@ import com.google.common.hash.HashCode;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.primitives.Bytes;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
 
 public class RFileTest {
 
@@ -224,7 +225,7 @@ public class RFileTest {
     public void openWriter(boolean startDLG, int blockSize) throws IOException {
       baos = new ByteArrayOutputStream();
       dos = new FSDataOutputStream(baos, new FileSystem.Statistics("a"));
-      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(dos, "gz", conf, accumuloConfiguration);
+      CachableBlockFile.Writer _cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(dos), "gz", conf, accumuloConfiguration);
 
       SamplerConfigurationImpl samplerConfig = SamplerConfigurationImpl.newSamplerConfig(accumuloConfiguration);
       Sampler sampler = null;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java b/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
new file mode 100644
index 0000000..9574d36
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/MockRateLimiter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+
+public class MockRateLimiter implements RateLimiter {
+  private final AtomicLong permitsAcquired = new AtomicLong();
+
+  @Override
+  public long getRate() {
+    return 0;
+  }
+
+  @Override
+  public void acquire(long permits) {
+    permitsAcquired.addAndGet(permits);
+  }
+
+  public long getPermitsAcquired() {
+    return permitsAcquired.get();
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
new file mode 100644
index 0000000..6baff87
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedInputStreamTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+import org.apache.hadoop.fs.Seekable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RateLimitedInputStreamTest {
+
+  @Test
+  public void permitsAreProperlyAcquired() throws Exception {
+    Random randGen = new Random();
+    MockRateLimiter rateLimiter = new MockRateLimiter();
+    long bytesRetrieved = 0;
+    try (InputStream is = new RateLimitedInputStream(new RandomInputStream(), rateLimiter)) {
+      for (int i = 0; i < 100; ++i) {
+        int count = Math.abs(randGen.nextInt()) % 65536;
+        int countRead = is.read(new byte[count]);
+        Assert.assertEquals(count, countRead);
+        bytesRetrieved += count;
+      }
+    }
+    Assert.assertEquals(bytesRetrieved, rateLimiter.getPermitsAcquired());
+  }
+
+  private static class RandomInputStream extends InputStream implements Seekable {
+    private final Random r = new Random();
+
+    @Override
+    public int read() throws IOException {
+      return r.nextInt() & 0xff;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      throw new UnsupportedOperationException("Not supported yet."); // To change body of generated methods, choose Tools | Templates.
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      throw new UnsupportedOperationException("Not supported yet."); // To change body of generated methods, choose Tools | Templates.
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      throw new UnsupportedOperationException("Not supported yet."); // To change body of generated methods, choose Tools | Templates.
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
new file mode 100644
index 0000000..9e12354
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/file/streams/RateLimitedOutputStreamTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.file.streams;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.CountingOutputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.util.Random;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RateLimitedOutputStreamTest {
+
+  @Test
+  public void permitsAreProperlyAcquired() throws Exception {
+    Random randGen = new Random();
+    MockRateLimiter rateLimiter = new MockRateLimiter();
+    long bytesWritten = 0;
+    try (RateLimitedOutputStream os = new RateLimitedOutputStream(new NullOutputStream(), rateLimiter)) {
+      for (int i = 0; i < 100; ++i) {
+        byte[] bytes = new byte[Math.abs(randGen.nextInt() % 65536)];
+        os.write(bytes);
+        bytesWritten += bytes.length;
+      }
+      Assert.assertEquals(bytesWritten, os.position());
+    }
+    Assert.assertEquals(bytesWritten, rateLimiter.getPermitsAcquired());
+  }
+
+  public static class NullOutputStream extends FilterOutputStream implements PositionedOutput {
+    public NullOutputStream() {
+      super(new CountingOutputStream(ByteStreams.nullOutputStream()));
+    }
+
+    @Override
+    public long position() throws IOException {
+      return ((CountingOutputStream) out).getCount();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
index 2656d4b..cf55e35 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java
@@ -642,7 +642,7 @@ public class BulkImporter {
     String filename = file.toString();
     // log.debug(filename + " finding overlapping tablets " + startRow + " -> " + endRow);
     FileSystem fs = vm.getVolumeByPath(file).getFileSystem();
-    FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), context.getConfiguration());
+    FileSKVIterator reader = FileOperations.getInstance().openReader(filename, true, fs, fs.getConf(), null, context.getConfiguration());
     try {
       Text row = startRow;
       if (row == null)

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 4378f2c..e58a79c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -465,7 +465,7 @@ public class Initialize implements KeywordExecutable {
       createEntriesForTablet(sorted, tablet);
     }
     FileSystem fs = volmanager.getVolumeByPath(new Path(fileName)).getFileSystem();
-    FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), AccumuloConfiguration.getDefaultConfiguration());
+    FileSKVWriter tabletWriter = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), null, AccumuloConfiguration.getDefaultConfiguration());
     tabletWriter.startDefaultLocalityGroup();
 
     for (Entry<Key,Value> entry : sorted.entrySet()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
index 04e17d5..9291808 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileUtil.java
@@ -132,7 +132,7 @@ public class FileUtil {
 
       outFiles.add(newMapFile);
       FileSystem ns = fs.getVolumeByPath(new Path(newMapFile)).getFileSystem();
-      FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), acuConf);
+      FileSKVWriter writer = new RFileOperations().openWriter(newMapFile.toString(), ns, ns.getConf(), null, acuConf);
       writer.startDefaultLocalityGroup();
       List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(inFiles.size());
 
@@ -404,7 +404,7 @@ public class FileUtil {
           reader = FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf);
         else
           reader = FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false,
-              ns, ns.getConf(), acuConf);
+              ns, ns.getConf(), null, acuConf);
 
         while (reader.hasTop()) {
           Key key = reader.getTopKey();
@@ -428,7 +428,7 @@ public class FileUtil {
         readers.add(FileOperations.getInstance().openIndex(path.toString(), ns, ns.getConf(), acuConf));
       else
         readers.add(FileOperations.getInstance().openReader(path.toString(), new Range(prevEndRow, false, null, true), LocalityGroupUtil.EMPTY_CF_SET, false,
-            ns, ns.getConf(), acuConf));
+            ns, ns.getConf(), null, acuConf));
 
     }
     return numKeys;
@@ -445,7 +445,7 @@ public class FileUtil {
       FileSKVIterator reader = null;
       FileSystem ns = fs.getVolumeByPath(mapfile.path()).getFileSystem();
       try {
-        reader = FileOperations.getInstance().openReader(mapfile.toString(), false, ns, ns.getConf(), acuConf);
+        reader = FileOperations.getInstance().openReader(mapfile.toString(), false, ns, ns.getConf(), null, acuConf);
 
         Key firstKey = reader.getFirstKey();
         if (firstKey != null) {
@@ -479,7 +479,7 @@ public class FileUtil {
     for (FileRef ref : mapFiles) {
       Path path = ref.path();
       FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
-      FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), acuConf);
+      FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), null, acuConf);
 
       try {
         if (!reader.hasTop())

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
----------------------------------------------------------------------
diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
index 0116101..646e400 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java
@@ -113,7 +113,7 @@ public class BulkImporterTest {
     EasyMock.replay(context);
     String file = "target/testFile.rf";
     fs.delete(new Path(file), true);
-    FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), context.getConfiguration());
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(file, fs, fs.getConf(), null, context.getConfiguration());
     writer.startDefaultLocalityGroup();
     Value empty = new Value(new byte[] {});
     writer.append(new Key("a", "cf", "cq"), empty);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
index 2928418..c28d060 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java
@@ -316,7 +316,7 @@ public class FileManager {
         Path path = new Path(file);
         FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
         // log.debug("Opening "+file + " path " + path);
-        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(),
+        FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), false, ns, ns.getConf(), null,
             context.getServerConfigurationFactory().getTableConfiguration(tablet), dataCache, indexCache);
         reservedFiles.add(reader);
         readersReserved.put(reader, file);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
index ca4719d..5219e33 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/InMemoryMap.java
@@ -632,7 +632,7 @@ public class InMemoryMap {
         Configuration conf = CachedConfiguration.getInstance();
         FileSystem fs = FileSystem.getLocal(conf);
 
-        reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, SiteConfiguration.getInstance());
+        reader = new RFileOperations().openReader(memDumpFile, true, fs, conf, null, SiteConfiguration.getInstance());
         if (iflag != null)
           reader.setInterruptFlag(iflag);
 
@@ -804,7 +804,7 @@ public class InMemoryMap {
           siteConf = createSampleConfig(siteConf);
         }
 
-        FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, siteConf);
+        FileSKVWriter out = new RFileOperations().openWriter(tmpFile, fs, newConf, null, siteConf);
 
         InterruptibleIterator iter = map.skvIterator(null);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index f16f726..1523c55 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -256,8 +256,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.net.HostAndPort;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory;
 
 public class TabletServer extends AccumuloServerContext implements Runnable {
+
   private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
   private static final long MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS = 1000;
   private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000;
@@ -330,7 +333,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     super(confFactory);
     this.confFactory = confFactory;
     this.fs = fs;
-    AccumuloConfiguration aconf = getConfiguration();
+    final AccumuloConfiguration aconf = getConfiguration();
     Instance instance = getInstance();
     log.info("Version " + Constants.VERSION);
     log.info("Instance " + instance.getInstanceID());
@@ -3089,4 +3092,28 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
     bulkImportStatus.removeBulkImportStatus(files);
   }
 
+  private static final String MAJC_READ_LIMITER_KEY = "tserv_majc_read";
+  private static final String MAJC_WRITE_LIMITER_KEY = "tserv_majc_write";
+  private final SharedRateLimiterFactory.RateProvider rateProvider = new SharedRateLimiterFactory.RateProvider() {
+    @Override
+    public long getDesiredRate() {
+      return getConfiguration().getMemoryInBytes(Property.TSERV_MAJC_THROUGHPUT);
+    }
+  };
+
+  /**
+   * Get the {@link RateLimiter} for reads during major compactions on this tserver. All writes performed during major compactions are throttled to conform to
+   * this RateLimiter.
+   */
+  public final RateLimiter getMajorCompactionReadLimiter() {
+    return SharedRateLimiterFactory.getInstance().create(MAJC_READ_LIMITER_KEY, rateProvider);
+  }
+
+  /**
+   * Get the RateLimiter for writes during major compations on this tserver. All reads performed during major compactions are throttled to conform to this
+   * RateLimiter.
+   */
+  public final RateLimiter getMajorCompactionWriteLimiter() {
+    return SharedRateLimiterFactory.getInstance().create(MAJC_WRITE_LIMITER_KEY, rateProvider);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
index 26b1b12..79c0c4f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/MajorCompactionRequest.java
@@ -77,7 +77,7 @@ public class MajorCompactionRequest implements Cloneable {
     // @TODO ensure these files are always closed?
     FileOperations fileFactory = FileOperations.getInstance();
     FileSystem ns = volumeManager.getVolumeByPath(ref.path()).getFileSystem();
-    FileSKVIterator openReader = fileFactory.openReader(ref.path().toString(), true, ns, ns.getConf(), tableConfig);
+    FileSKVIterator openReader = fileFactory.openReader(ref.path().toString(), true, ns, ns.getConf(), null, tableConfig);
     return openReader;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
index 2ef4a34..bde5be0 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Compactor.java
@@ -52,6 +52,7 @@ import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.LocalityGroupUtil;
 import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -81,6 +82,10 @@ public class Compactor implements Callable<CompactionStats> {
     boolean isCompactionEnabled();
 
     IteratorScope getIteratorScope();
+
+    RateLimiter getReadLimiter();
+
+    RateLimiter getWriteLimiter();
   }
 
   private final Map<FileRef,DataFileValue> filesToCompact;
@@ -192,7 +197,7 @@ public class Compactor implements Callable<CompactionStats> {
     try {
       FileOperations fileFactory = FileOperations.getInstance();
       FileSystem ns = this.fs.getVolumeByPath(outputFilePath).getFileSystem();
-      mfw = fileFactory.openWriter(outputFilePathName, ns, ns.getConf(), acuTableConf);
+      mfw = fileFactory.openWriter(outputFilePathName, ns, ns.getConf(), env.getWriteLimiter(), acuTableConf);
 
       Map<String,Set<ByteSequence>> lGroups;
       try {
@@ -280,7 +285,7 @@ public class Compactor implements Callable<CompactionStats> {
         FileSystem fs = this.fs.getVolumeByPath(mapFile.path()).getFileSystem();
         FileSKVIterator reader;
 
-        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), acuTableConf);
+        reader = fileFactory.openReader(mapFile.path().toString(), false, fs, fs.getConf(), env.getReadLimiter(), acuTableConf);
 
         readers.add(reader);
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index e3f8193..6bd2545 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.impl.Tables;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.master.state.tables.TableState;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.problems.ProblemReport;
@@ -69,6 +70,16 @@ public class MinorCompactor extends Compactor {
       public IteratorScope getIteratorScope() {
         return IteratorScope.minc;
       }
+
+      @Override
+      public RateLimiter getReadLimiter() {
+        return null;
+      }
+
+      @Override
+      public RateLimiter getWriteLimiter() {
+        return null;
+      }
     }, Collections.<IteratorSetting> emptyList(), mincReason.ordinal(), tableConfig);
     this.tabletServer = tabletServer;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index cf99dbd..fb47afc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -18,7 +18,6 @@ package org.apache.accumulo.tserver.tablet;
 
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -153,6 +152,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import org.apache.accumulo.core.util.ratelimit.RateLimiter;
+import static java.util.Objects.requireNonNull;
 
 /**
  *
@@ -1583,7 +1584,7 @@ public class Tablet implements TabletCommitter {
     for (Entry<FileRef,DataFileValue> entry : allFiles.entrySet()) {
       FileRef file = entry.getKey();
       FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
-      FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), this.getTableConfiguration());
+      FileSKVIterator openReader = fileFactory.openReader(file.path().toString(), true, ns, ns.getConf(), null, this.getTableConfiguration());
       try {
         Key first = openReader.getFirstKey();
         Key last = openReader.getLastKey();
@@ -1807,8 +1808,8 @@ public class Tablet implements TabletCommitter {
         AccumuloConfiguration tableConf = createTableConfiguration(tableConfiguration, plan);
 
         Span span = Trace.start("compactFiles");
-        try {
 
+        try {
           CompactionEnv cenv = new CompactionEnv() {
             @Override
             public boolean isCompactionEnabled() {
@@ -1819,6 +1820,17 @@ public class Tablet implements TabletCommitter {
             public IteratorScope getIteratorScope() {
               return IteratorScope.majc;
             }
+
+            @Override
+            public RateLimiter getReadLimiter() {
+              return getTabletServer().getMajorCompactionReadLimiter();
+            }
+
+            @Override
+            public RateLimiter getWriteLimiter() {
+              return getTabletServer().getMajorCompactionWriteLimiter();
+            }
+
           };
 
           HashMap<FileRef,DataFileValue> copy = new HashMap<FileRef,DataFileValue>(getDatafileManager().getDatafileSizes());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
index d1cd5ce..d66863e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/TabletData.java
@@ -149,7 +149,7 @@ public class TabletData {
       dataFiles.put(ref, dfv);
 
       FileSystem ns = fs.getVolumeByPath(path).getFileSystem();
-      FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), conf);
+      FileSKVIterator reader = FileOperations.getInstance().openReader(path.toString(), true, ns, ns.getConf(), null, conf);
       long maxTime = -1;
       try {
         while (reader.hasTop()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
index 7ba253f..2f90d7e 100644
--- a/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BulkImportMonitoringIT.java
@@ -98,7 +98,7 @@ public class BulkImportMonitoringIT extends ConfigurableMacBase {
           fs.mkdirs(bulkFailures);
           fs.mkdirs(files);
           for (int i = 0; i < 10; i++) {
-            FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+            FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
                 AccumuloConfiguration.getDefaultConfiguration());
             writer.startDefaultLocalityGroup();
             for (int j = 0x100; j < 0xfff; j += 3) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java b/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java
new file mode 100644
index 0000000..6aa6930
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/CompactionRateLimitingIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.Random;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CompactionRateLimitingIT extends ConfigurableMacBase {
+  public static final long BYTES_TO_WRITE = 10 * 1024 * 1024;
+  public static final long RATE = 1 * 1024 * 1024;
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration fsConf) {
+    cfg.setProperty(Property.TSERV_MAJC_THROUGHPUT, RATE + "B");
+    cfg.setProperty(Property.TABLE_MAJC_RATIO, "20");
+    cfg.setProperty(Property.TABLE_FILE_COMPRESSION_TYPE, "none");
+  }
+
+  @Test
+  public void majorCompactionsAreRateLimited() throws Exception {
+    long bytesWritten = 0;
+    String tableName = getUniqueNames(1)[0];
+    Connector conn = getCluster().getConnector("root", new PasswordToken(ROOT_PASSWORD));
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    try {
+      Random r = new Random();
+      while (bytesWritten < BYTES_TO_WRITE) {
+        byte[] rowKey = new byte[32];
+        r.nextBytes(rowKey);
+
+        byte[] qual = new byte[32];
+        r.nextBytes(qual);
+
+        byte[] value = new byte[1024];
+        r.nextBytes(value);
+
+        Mutation m = new Mutation(rowKey);
+        m.put(new byte[0], qual, value);
+        bw.addMutation(m);
+
+        bytesWritten += rowKey.length + qual.length + value.length;
+      }
+    } finally {
+      bw.close();
+    }
+
+    conn.tableOperations().flush(tableName, null, null, true);
+
+    long compactionStart = System.currentTimeMillis();
+    conn.tableOperations().compact(tableName, null, null, false, true);
+    long duration = System.currentTimeMillis() - compactionStart;
+    Assert.assertTrue(
+        String.format("Expected a compaction rate of no more than %,d bytes/sec, but saw a rate of %,f bytes/sec", RATE, 1000.0 * bytesWritten / duration),
+        duration > 1000L * BYTES_TO_WRITE / RATE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java b/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
index ada8504..3929dc7 100644
--- a/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
+++ b/test/src/main/java/org/apache/accumulo/test/CreateRandomRFile.java
@@ -69,7 +69,7 @@ public class CreateRandomRFile {
     FileSKVWriter mfw;
     try {
       FileSystem fs = FileSystem.get(conf);
-      mfw = new RFileOperations().openWriter(file, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
+      mfw = new RFileOperations().openWriter(file, fs, conf, null, AccumuloConfiguration.getDefaultConfiguration());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java b/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
index 64a1aaf..21b2b78 100644
--- a/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
+++ b/test/src/main/java/org/apache/accumulo/test/GenerateSequentialRFile.java
@@ -58,7 +58,7 @@ public class GenerateSequentialRFile implements Runnable {
       final Configuration conf = new Configuration();
       Path p = new Path(opts.filePath);
       final FileSystem fs = p.getFileSystem(conf);
-      FileSKVWriter writer = FileOperations.getInstance().openWriter(opts.filePath, fs, conf, DefaultConfiguration.getInstance());
+      FileSKVWriter writer = FileOperations.getInstance().openWriter(opts.filePath, fs, conf, null, DefaultConfiguration.getInstance());
 
       writer.startDefaultLocalityGroup();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
index 259a19e..b042d97 100644
--- a/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/GetFileInfoBulkIT.java
@@ -123,7 +123,7 @@ public class GetFileInfoBulkIT extends ConfigurableMacBase {
           fs.mkdirs(bulkFailures);
           fs.mkdirs(files);
           for (int i = 0; i < 100; i++) {
-            FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+            FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
                 AccumuloConfiguration.getDefaultConfiguration());
             writer.startDefaultLocalityGroup();
             for (int j = 0x100; j < 0xfff; j += 3) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
index 28112ac..0f0c292 100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
@@ -1338,9 +1338,9 @@ public class ShellServerIT extends SharedMiniClusterBase {
     assertTrue(errorsDir.mkdir());
     fs.mkdirs(new Path(errorsDir.toString()));
     AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
-    FileSKVWriter evenWriter = FileOperations.getInstance().openWriter(even, fs, conf, aconf);
+    FileSKVWriter evenWriter = FileOperations.getInstance().openWriter(even, fs, conf, null, aconf);
     evenWriter.startDefaultLocalityGroup();
-    FileSKVWriter oddWriter = FileOperations.getInstance().openWriter(odd, fs, conf, aconf);
+    FileSKVWriter oddWriter = FileOperations.getInstance().openWriter(odd, fs, conf, null, aconf);
     oddWriter.startDefaultLocalityGroup();
     long timestamp = System.currentTimeMillis();
     Text cf = new Text("cf");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/TestIngest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/TestIngest.java b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
index 9f79c71..8e196c4 100644
--- a/test/src/main/java/org/apache/accumulo/test/TestIngest.java
+++ b/test/src/main/java/org/apache/accumulo/test/TestIngest.java
@@ -218,7 +218,8 @@ public class TestIngest {
 
     if (opts.outputFile != null) {
       Configuration conf = CachedConfiguration.getInstance();
-      writer = FileOperations.getInstance().openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, AccumuloConfiguration.getDefaultConfiguration());
+      writer = FileOperations.getInstance()
+          .openWriter(opts.outputFile + "." + RFile.EXTENSION, fs, conf, null, AccumuloConfiguration.getDefaultConfiguration());
       writer.startDefaultLocalityGroup();
     } else {
       bw = connector.createBatchWriter(opts.getTableName(), bwOpts.getBatchWriterConfig());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
index 1abafeb..9c2ab55 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java
@@ -74,17 +74,17 @@ public class BulkFileIT extends AccumuloClusterHarness {
 
     fs.delete(new Path(dir), true);
 
-    FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, aconf);
+    FileSKVWriter writer1 = FileOperations.getInstance().openWriter(dir + "/f1." + RFile.EXTENSION, fs, conf, null, aconf);
     writer1.startDefaultLocalityGroup();
     writeData(writer1, 0, 333);
     writer1.close();
 
-    FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, aconf);
+    FileSKVWriter writer2 = FileOperations.getInstance().openWriter(dir + "/f2." + RFile.EXTENSION, fs, conf, null, aconf);
     writer2.startDefaultLocalityGroup();
     writeData(writer2, 334, 999);
     writer2.close();
 
-    FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, aconf);
+    FileSKVWriter writer3 = FileOperations.getInstance().openWriter(dir + "/f3." + RFile.EXTENSION, fs, conf, null, aconf);
     writer3.startDefaultLocalityGroup();
     writeData(writer3, 1000, 1999);
     writer3.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
index fda3713..93ebdb9 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
@@ -193,7 +193,7 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
 
       Configuration conf = CachedConfiguration.getInstance();
       DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
-      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, acuconf)
+      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, null, acuconf)
           .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
       assertNotNull(sample);
     } else {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
index 883a657..2b172c0 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
@@ -205,7 +205,7 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
 
       Configuration conf = CachedConfiguration.getInstance();
       DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
-      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, acuconf)
+      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, null, acuconf)
           .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
       assertNotNull(sample);
     } else {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
index 1c94ff8..9db6dd1 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/metadata/FastBulkImportIT.java
@@ -90,7 +90,7 @@ public class FastBulkImportIT extends ConfigurableMacBase {
     fs.mkdirs(bulkFailures);
     fs.mkdirs(files);
     for (int i = 0; i < 100; i++) {
-      FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(),
+      FileSKVWriter writer = FileOperations.getInstance().openWriter(files.toString() + "/bulk_" + i + "." + RFile.EXTENSION, fs, fs.getConf(), null,
           AccumuloConfiguration.getDefaultConfiguration());
       writer.startDefaultLocalityGroup();
       for (int j = 0x100; j < 0xfff; j += 3) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index f829b73..fac8c16 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -447,7 +447,7 @@ public class CollectTabletStats {
 
     for (FileRef file : files) {
       FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
-      FileSKVIterator reader = FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), aconf);
+      FileSKVIterator reader = FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), null, aconf);
       Range range = new Range(ke.getPrevEndRow(), false, ke.getEndRow(), true);
       reader.seek(range, columnSet, columnSet.size() == 0 ? false : true);
       while (reader.hasTop() && !range.afterEndKey(reader.getTopKey())) {
@@ -477,7 +477,7 @@ public class CollectTabletStats {
 
     for (FileRef file : files) {
       FileSystem ns = fs.getVolumeByPath(file.path()).getFileSystem();
-      readers.add(FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), aconf.getConfiguration()));
+      readers.add(FileOperations.getInstance().openReader(file.path().toString(), false, ns, ns.getConf(), null, aconf.getConfiguration()));
     }
 
     List<IterInfo> emptyIterinfo = Collections.emptyList();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
index ed31f58..3ac5b51 100644
--- a/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/proxy/SimpleProxyBase.java
@@ -2077,7 +2077,7 @@ public abstract class SimpleProxyBase extends SharedMiniClusterBase {
 
     // Write an RFile
     String filename = dir + "/bulk/import/rfile.rf";
-    FileSKVWriter writer = FileOperations.getInstance().openWriter(filename, fs, fs.getConf(), DefaultConfiguration.getInstance());
+    FileSKVWriter writer = FileOperations.getInstance().openWriter(filename, fs, fs.getConf(), null, DefaultConfiguration.getInstance());
     writer.startDefaultLocalityGroup();
     writer.append(new org.apache.accumulo.core.data.Key(new Text("a"), new Text("b"), new Text("c")), new Value("value".getBytes(UTF_8)));
     writer.close();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
index c54a8e7..26ea422 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/BulkPlusOne.java
@@ -83,7 +83,7 @@ public class BulkPlusOne extends BulkImportTest {
 
     for (int i = 0; i < parts; i++) {
       String fileName = dir + "/" + String.format("part_%d.", i) + RFile.EXTENSION;
-      FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), defaultConfiguration);
+      FileSKVWriter f = FileOperations.getInstance().openWriter(fileName, fs, fs.getConf(), null, defaultConfiguration);
       f.startDefaultLocalityGroup();
       int start = rows.get(i);
       int end = rows.get(i + 1);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
index 5af08ec..ec89a5a 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/concurrent/BulkImport.java
@@ -36,6 +36,7 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.blockfile.impl.CachableBlockFile;
 import org.apache.accumulo.core.file.rfile.RFile;
+import org.apache.accumulo.core.file.streams.PositionedOutputs;
 import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.test.randomwalk.Environment;
 import org.apache.accumulo.test.randomwalk.State;
@@ -52,8 +53,8 @@ public class BulkImport extends Test {
 
     public RFileBatchWriter(Configuration conf, FileSystem fs, String file) throws IOException {
       AccumuloConfiguration aconf = AccumuloConfiguration.getDefaultConfiguration();
-      CachableBlockFile.Writer cbw = new CachableBlockFile.Writer(fs.create(new Path(file), false, conf.getInt("io.file.buffer.size", 4096),
-          (short) conf.getInt("dfs.replication", 3), conf.getLong("dfs.block.size", 1 << 26)), "gz", conf, aconf);
+      CachableBlockFile.Writer cbw = new CachableBlockFile.Writer(PositionedOutputs.wrap(fs.create(new Path(file), false,
+          conf.getInt("io.file.buffer.size", 4096), (short) conf.getInt("dfs.replication", 3), conf.getLong("dfs.block.size", 1 << 26))), "gz", conf, aconf);
       writer = new RFile.Writer(cbw, 100000);
       writer.startDefaultLocalityGroup();
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/783314c8/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
index 2612fc9..4a60866 100644
--- a/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
+++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/security/TableOp.java
@@ -203,7 +203,7 @@ public class TableOp extends Test {
         Path dir = new Path("/tmp", "bulk_" + UUID.randomUUID().toString());
         Path fail = new Path(dir.toString() + "_fail");
         FileSystem fs = WalkingSecurity.get(state, env).getFs();
-        FileSKVWriter f = FileOperations.getInstance().openWriter(dir + "/securityBulk." + RFile.EXTENSION, fs, fs.getConf(),
+        FileSKVWriter f = FileOperations.getInstance().openWriter(dir + "/securityBulk." + RFile.EXTENSION, fs, fs.getConf(), null,
             AccumuloConfiguration.getDefaultConfiguration());
         f.startDefaultLocalityGroup();
         fs.mkdirs(fail);


Mime
View raw message