accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [3/9] accumulo git commit: ACCUMULO-4501 ACCUMULO-96 Added Summarization
Date Mon, 20 Mar 2017 14:48:59 GMT
http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java b/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java
new file mode 100644
index 0000000..9da91c0
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/CancelFlagFuture.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.util;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * A simple future wrapper that will set an atomic boolean to true if a future is successfully canceled.
+ * <p>
+ * All {@link Future} methods delegate to the wrapped future. The flag is set only when {@link #cancel(boolean)} is called on this wrapper and the wrapped
+ * future's {@code cancel} returns {@code true}. Canceling the wrapped future directly, or a cancel attempt that fails (for example because the future already
+ * completed), leaves the flag unchanged. This class never resets the flag to {@code false}.
+ *
+ * @param <T>
+ *          the result type of the wrapped future
+ */
+public class CancelFlagFuture<T> implements Future<T> {
+
+  private final Future<T> wrappedFuture;
+  private final AtomicBoolean cancelFlag;
+
+  /**
+   * @param wrappedFuture
+   *          the future every call is delegated to
+   * @param cancelFlag
+   *          set to {@code true} once {@code wrappedFuture} is successfully canceled through this wrapper
+   * @throws NullPointerException
+   *           if either argument is null
+   */
+  public CancelFlagFuture(Future<T> wrappedFuture, AtomicBoolean cancelFlag) {
+    // fail fast here instead of with an NPE at some later delegated call (or, for the flag, only on a successful cancel)
+    this.wrappedFuture = Objects.requireNonNull(wrappedFuture, "wrappedFuture");
+    this.cancelFlag = Objects.requireNonNull(cancelFlag, "cancelFlag");
+  }
+
+  @Override
+  public boolean cancel(boolean mayInterruptIfRunning) {
+    boolean canceled = wrappedFuture.cancel(mayInterruptIfRunning);
+    if (canceled) {
+      // NOTE(review): presumably polled by the work behind wrappedFuture so it can stop early - confirm with callers
+      cancelFlag.set(true);
+    }
+    return canceled;
+  }
+
+  @Override
+  public boolean isCancelled() {
+    return wrappedFuture.isCancelled();
+  }
+
+  @Override
+  public boolean isDone() {
+    return wrappedFuture.isDone();
+  }
+
+  @Override
+  public T get() throws InterruptedException, ExecutionException {
+    return wrappedFuture.get();
+  }
+
+  @Override
+  public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+    return wrappedFuture.get(timeout, unit);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
new file mode 100644
index 0000000..c417b0f
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/util/CompletableFutureUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.util;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
+/**
+ * Helpers for composing {@link CompletableFuture}s.
+ */
+public class CompletableFutureUtil {
+
+  /**
+   * Merges a list of futures into one by building a binary tree of {@link CompletableFuture#thenCombine} operations, where each node merges the results of
+   * its two children once both complete. Within every merge the left operand comes from earlier in the list than the right operand, so list order is
+   * preserved. If any input future completes exceptionally, the returned future completes exceptionally as well (standard {@code thenCombine} behavior).
+   *
+   * @param futures
+   *          the futures to merge; the list itself is not modified
+   * @param mergeFunc
+   *          combines two partial results into one
+   * @param nothing
+   *          supplies the result when {@code futures} is empty
+   * @return a future completing with the merged result of all inputs; when {@code futures} holds a single element, that same future is returned
+   */
+  public static <T> CompletableFuture<T> merge(List<CompletableFuture<T>> futures, BiFunction<T,T,T> mergeFunc, Supplier<T> nothing) {
+    if (futures.isEmpty()) {
+      return CompletableFuture.completedFuture(nothing.get());
+    }
+    while (futures.size() > 1) {
+      // each pass pairs up neighbours, halving the count (rounding up), so the tree has O(log n) levels
+      ArrayList<CompletableFuture<T>> mergedFutures = new ArrayList<>((futures.size() + 1) / 2);
+      for (int i = 0; i < futures.size(); i += 2) {
+        if (i + 1 == futures.size()) {
+          // an odd trailing future is carried up to the next level unchanged
+          mergedFutures.add(futures.get(i));
+        } else {
+          mergedFutures.add(futures.get(i).thenCombine(futures.get(i + 1), mergeFunc));
+        }
+      }
+      futures = mergedFutures;
+    }
+    return futures.get(0);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/thrift/data.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/data.thrift b/core/src/main/thrift/data.thrift
index 3e5c56c..648c37d 100644
--- a/core/src/main/thrift/data.thrift
+++ b/core/src/main/thrift/data.thrift
@@ -149,6 +149,40 @@ struct TConditionalSession {
         3:i64 ttl;
 }
 
+// Wire form of a summarizer configuration: the summarizer class name, its options, and an id.
+// NOTE(review): presumably mirrors the client SummarizerConfiguration; how configId is derived is not visible here - confirm.
+struct TSummarizerConfiguration {
+  1:string classname
+  2:map<string, string> options
+  3:string configId
+}
+
+// Statistics produced by one summarizer configuration.
+struct TSummary {
+   // named statistics (counters) reported by the summarizer
+   1:map<string, i64> summary
+   2:TSummarizerConfiguration config
+   // NOTE(review): the files* counts presumably back the client-side summary file statistics - confirm against the conversion code
+   3:i64 filesContaining
+   4:i64 filesExceeding
+   5:i64 filesLarge
+}
+
+// One batch of summary results returned by the TabletClientService summary calls.
+struct TSummaries {
+   // NOTE(review): presumably false while more results can be fetched with contiuneGetSummaries(sessionId) - confirm
+   1:bool finished
+   2:i64 sessionId
+   3:i64 totalFiles
+   4:i64 deletedFiles
+   5:list<TSummary> summaries
+}
+
+// A range of rows expressed as raw row bytes.
+// NOTE(review): inclusivity of startRow/endRow and the meaning of an unset row are not specified here - confirm with the server side.
+struct TRowRange {
+  1:binary startRow
+  2:binary endRow
+}
+
+// Request for summaries of one table.
+// NOTE(review): bounds presumably limits which rows/files are considered, and summarizerPattern presumably selects
+// summarizers by matching (e.g. a regex) in addition to the explicit summarizers list - confirm.
+struct TSummaryRequest {
+  1:string tableId
+  2:TRowRange bounds
+  3:list<TSummarizerConfiguration> summarizers
+  4:string summarizerPattern
+}
+
 typedef map<TKeyExtent,list<TConditionalMutation>> CMBatch
 
 typedef map<TKeyExtent,list<TMutation>> UpdateBatch

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/main/thrift/tabletserver.thrift
----------------------------------------------------------------------
diff --git a/core/src/main/thrift/tabletserver.thrift b/core/src/main/thrift/tabletserver.thrift
index 7697a2d..b56449f 100644
--- a/core/src/main/thrift/tabletserver.thrift
+++ b/core/src/main/thrift/tabletserver.thrift
@@ -230,6 +230,11 @@ service TabletClientService extends client.ClientService {
   list<ActiveCompaction> getActiveCompactions(2:trace.TInfo tinfo, 1:security.TCredentials credentials) throws (1:client.ThriftSecurityException sec)
   oneway void removeLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:list<string> filenames)
   list<string> getActiveLogs(1:trace.TInfo tinfo, 2:security.TCredentials credentials)
+
+  // Summary retrieval. Each startGetSummaries* call begins a request and returns a first TSummaries batch.
+  // NOTE(review): presumably contiuneGetSummaries is then called with TSummaries.sessionId until finished is true - confirm.
+  // NOTE: "contiune" is a misspelling of "continue". Thrift puts the method name on the wire and in generated code,
+  // so renaming it requires regenerating the bindings and updating every implementation and caller together.
+  data.TSummaries startGetSummaries(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:data.TSummaryRequest request) throws (1:client.ThriftSecurityException sec, 2:client.ThriftTableOperationException tope)
+  data.TSummaries startGetSummariesForPartition(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:data.TSummaryRequest request, 4:i32 modulus, 5:i32 remainder) throws (1:client.ThriftSecurityException sec)
+  data.TSummaries startGetSummariesFromFiles(1:trace.TInfo tinfo, 2:security.TCredentials credentials, 3:data.TSummaryRequest request, 4:map<string,list<data.TRowRange>> files) throws (1:client.ThriftSecurityException sec)
+  data.TSummaries contiuneGetSummaries(1:trace.TInfo tinfo, 2:i64 sessionId) throws (1:NoSuchScanIDException nssi)
 }
 
 typedef i32 TabletID

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
index 1d699c2..496cde3 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TableOperationsHelperTest.java
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.function.Predicate;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -37,8 +38,10 @@ import org.apache.accumulo.core.client.admin.CompactionConfig;
 import org.apache.accumulo.core.client.admin.DiskUsage;
 import org.apache.accumulo.core.client.admin.Locations;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SummaryRetriever;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
 import org.apache.accumulo.core.security.Authorizations;
@@ -249,6 +252,29 @@ public class TableOperationsHelperTest {
     public Locations locate(String tableName, Collection<Range> ranges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public SummaryRetriever summaries(String tableName) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void addSummarizers(String tableName, SummarizerConfiguration... summarizerConf) throws TableNotFoundException, AccumuloException,
+        AccumuloSecurityException {
+      throw new UnsupportedOperationException();
+
+    }
+
+    @Override
+    public void removeSummarizers(String tableName, Predicate<SummarizerConfiguration> predicate) throws AccumuloException, TableNotFoundException,
+        AccumuloSecurityException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<SummarizerConfiguration> listSummarizers(String tableName) throws AccumuloException, TableNotFoundException, AccumuloSecurityException {
+      throw new UnsupportedOperationException();
+    }
   }
 
   protected TableOperationsHelper getHelper() {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
index d85db92..367c833 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapred/AccumuloFileOutputFormatTest.java
@@ -17,12 +17,18 @@
 package org.apache.accumulo.core.client.mapred;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
 import org.apache.accumulo.core.client.sample.RowSampler;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.CountingSummarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -43,6 +49,9 @@ public class AccumuloFileOutputFormatTest {
     samplerConfig.addOption("hasher", "murmur3_32");
     samplerConfig.addOption("modulus", "109");
 
+    SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 2048).build();
+    SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
+
     JobConf job = new JobConf();
     AccumuloFileOutputFormat.setReplication(job, a);
     AccumuloFileOutputFormat.setFileBlockSize(job, b);
@@ -50,6 +59,7 @@ public class AccumuloFileOutputFormatTest {
     AccumuloFileOutputFormat.setIndexBlockSize(job, d);
     AccumuloFileOutputFormat.setCompressionType(job, e);
     AccumuloFileOutputFormat.setSampler(job, samplerConfig);
+    AccumuloFileOutputFormat.setSummarizers(job, sc1, sc2);
 
     AccumuloConfiguration acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job);
 
@@ -60,6 +70,11 @@ public class AccumuloFileOutputFormatTest {
     assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
     assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
 
+    Collection<SummarizerConfiguration> summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf);
+    assertEquals(2, summarizerConfigs.size());
+    assertTrue(summarizerConfigs.contains(sc1));
+    assertTrue(summarizerConfigs.contains(sc2));
+
     a = 17;
     b = 1300l;
     c = 150l;
@@ -85,5 +100,8 @@ public class AccumuloFileOutputFormatTest {
     assertEquals(110l, acuconf.getMemoryInBytes(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE_INDEX));
     assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
     assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
+
+    summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf);
+    assertEquals(0, summarizerConfigs.size());
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
index 39d226b..5143453 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloFileOutputFormatTest.java
@@ -17,12 +17,18 @@
 package org.apache.accumulo.core.client.mapreduce;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import org.apache.accumulo.core.client.mapreduce.lib.impl.FileOutputConfigurator;
 import org.apache.accumulo.core.client.sample.RowSampler;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.CountingSummarizer;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
@@ -43,6 +49,9 @@ public class AccumuloFileOutputFormatTest {
     samplerConfig.addOption("hasher", "murmur3_32");
     samplerConfig.addOption("modulus", "109");
 
+    SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 2048).build();
+    SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).addOption(CountingSummarizer.MAX_COUNTERS_OPT, 256).build();
+
     Job job1 = Job.getInstance();
     AccumuloFileOutputFormat.setReplication(job1, a);
     AccumuloFileOutputFormat.setFileBlockSize(job1, b);
@@ -50,6 +59,7 @@ public class AccumuloFileOutputFormatTest {
     AccumuloFileOutputFormat.setIndexBlockSize(job1, d);
     AccumuloFileOutputFormat.setCompressionType(job1, e);
     AccumuloFileOutputFormat.setSampler(job1, samplerConfig);
+    AccumuloFileOutputFormat.setSummarizers(job1, sc1, sc2);
 
     AccumuloConfiguration acuconf = FileOutputConfigurator.getAccumuloConfiguration(AccumuloFileOutputFormat.class, job1.getConfiguration());
 
@@ -60,6 +70,11 @@ public class AccumuloFileOutputFormatTest {
     assertEquals("snappy", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
     assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
 
+    Collection<SummarizerConfiguration> summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf);
+    assertEquals(2, summarizerConfigs.size());
+    assertTrue(summarizerConfigs.contains(sc1));
+    assertTrue(summarizerConfigs.contains(sc2));
+
     a = 17;
     b = 1300l;
     c = 150l;
@@ -86,5 +101,8 @@ public class AccumuloFileOutputFormatTest {
     assertEquals("lzo", acuconf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
     assertEquals(new SamplerConfigurationImpl(samplerConfig), SamplerConfigurationImpl.newSamplerConfig(acuconf));
 
+    summarizerConfigs = SummarizerConfiguration.fromTableProperties(acuconf);
+    assertEquals(0, summarizerConfigs.size());
+
   }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
index 4993810..3352b0f 100644
--- a/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/client/rfile/RFileTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -36,6 +37,11 @@ import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.client.sample.RowSampler;
 import org.apache.accumulo.core.client.sample.SamplerConfiguration;
+import org.apache.accumulo.core.client.summary.CounterSummary;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.ArrayByteSequence;
@@ -83,6 +89,10 @@ public class RFileTest {
   }
 
   private SortedMap<Key,Value> createTestData(int startRow, int rows, int startFamily, int families, int qualifiers) {
+    return createTestData(startRow, rows, startFamily, families, qualifiers, "");
+  }
+
+  private SortedMap<Key,Value> createTestData(int startRow, int rows, int startFamily, int families, int qualifiers, String... vis) {
     TreeMap<Key,Value> testData = new TreeMap<>();
 
     for (int r = 0; r < rows; r++) {
@@ -91,8 +101,10 @@ public class RFileTest {
         String fam = colStr(f + startFamily);
         for (int q = 0; q < qualifiers; q++) {
           String qual = colStr(q);
-          Key k = new Key(row, fam, qual);
-          testData.put(k, new Value((k.hashCode() + "").getBytes()));
+          for (String v : vis) {
+            Key k = new Key(row, fam, qual, v);
+            testData.put(k, new Value((k.hashCode() + "").getBytes()));
+          }
         }
       }
     }
@@ -498,6 +510,148 @@ public class RFileTest {
     scanner.close();
   }
 
+  @Test
+  public void testSummaries() throws Exception {
+    SummarizerConfiguration sc1 = SummarizerConfiguration.builder(VisibilitySummarizer.class).build();
+    SummarizerConfiguration sc2 = SummarizerConfiguration.builder(FamilySummarizer.class).build();
+
+    LocalFileSystem localFs = FileSystem.getLocal(new Configuration());
+    String testFile = createTmpTestFile();
+
+    SortedMap<Key,Value> testData1 = createTestData(0, 100, 0, 4, 1, "A&B", "A&B&C");
+
+    RFileWriter writer = RFile.newWriter().to(testFile).withFileSystem(localFs).withSummarizers(sc1, sc2).build();
+    writer.append(testData1.entrySet());
+    writer.close();
+
+    // verify summary data
+    Collection<Summary> summaries = RFile.summaries().from(testFile).withFileSystem(localFs).read();
+    Assert.assertEquals(2, summaries.size());
+    for (Summary summary : summaries) {
+      Assert.assertEquals(0, summary.getFileStatistics().getInaccurate());
+      Assert.assertEquals(1, summary.getFileStatistics().getTotal());
+      String className = summary.getSummarizerConfiguration().getClassName();
+      CounterSummary counterSummary = new CounterSummary(summary);
+      if (className.equals(FamilySummarizer.class.getName())) {
+        Map<String,Long> counters = counterSummary.getCounters();
+        Map<String,Long> expected = ImmutableMap.of("0000", 200l, "0001", 200l, "0002", 200l, "0003", 200l);
+        Assert.assertEquals(expected, counters);
+      } else if (className.equals(VisibilitySummarizer.class.getName())) {
+        Map<String,Long> counters = counterSummary.getCounters();
+        Map<String,Long> expected = ImmutableMap.of("A&B", 400l, "A&B&C", 400l);
+        Assert.assertEquals(expected, counters);
+      } else {
+        Assert.fail("Unexpected classname " + className);
+      }
+    }
+
+    // check if writing summary data impacted normal rfile functionality
+    Scanner scanner = RFile.newScanner().from(testFile).withFileSystem(localFs).withAuthorizations(new Authorizations("A", "B", "C")).build();
+    Assert.assertEquals(testData1, toMap(scanner));
+    scanner.close();
+
+    String testFile2 = createTmpTestFile();
+    SortedMap<Key,Value> testData2 = createTestData(100, 100, 0, 4, 1, "A&B", "A&B&C");
+    writer = RFile.newWriter().to(testFile2).withFileSystem(localFs).withSummarizers(sc1, sc2).build();
+    writer.append(testData2.entrySet());
+    writer.close();
+
+    // verify reading summaries from multiple files works
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).read();
+    Assert.assertEquals(2, summaries.size());
+    for (Summary summary : summaries) {
+      Assert.assertEquals(0, summary.getFileStatistics().getInaccurate());
+      Assert.assertEquals(2, summary.getFileStatistics().getTotal());
+      String className = summary.getSummarizerConfiguration().getClassName();
+      CounterSummary counterSummary = new CounterSummary(summary);
+      if (className.equals(FamilySummarizer.class.getName())) {
+        Map<String,Long> counters = counterSummary.getCounters();
+        Map<String,Long> expected = ImmutableMap.of("0000", 400l, "0001", 400l, "0002", 400l, "0003", 400l);
+        Assert.assertEquals(expected, counters);
+      } else if (className.equals(VisibilitySummarizer.class.getName())) {
+        Map<String,Long> counters = counterSummary.getCounters();
+        Map<String,Long> expected = ImmutableMap.of("A&B", 800l, "A&B&C", 800l);
+        Assert.assertEquals(expected, counters);
+      } else {
+        Assert.fail("Unexpected classname " + className);
+      }
+    }
+
+    // verify reading a subset of summaries works
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 0);
+
+    // the following test check boundry conditions for start row and end row
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(99)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 0);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(98)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(0)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow("#").read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 0);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(100)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 1);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(99)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 0);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(100)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(199)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 0);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50))
+        .endRow(rowStr(150)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 2);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(120))
+        .endRow(rowStr(150)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 1);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(50))
+        .endRow(rowStr(199)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow("#").endRow(rowStr(150))
+        .read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 800l, "A&B&C", 800l), 1);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(199)).read();
+    checkSummaries(summaries, ImmutableMap.of(), 0);
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).startRow(rowStr(200)).read();
+    checkSummaries(summaries, ImmutableMap.of(), 0);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow("#").read();
+    checkSummaries(summaries, ImmutableMap.of(), 0);
+
+    summaries = RFile.summaries().from(testFile, testFile2).withFileSystem(localFs).selectSummaries(sc -> sc.equals(sc1)).endRow(rowStr(0)).read();
+    checkSummaries(summaries, ImmutableMap.of("A&B", 400l, "A&B&C", 400l), 1);
+  }
+
+  /**
+   * Asserts that {@code summaries} contains a single visibility summary with the {@code expected} counters, computed over two files of which
+   * {@code extra} extended beyond the requested range (those files are also counted as inaccurate).
+   */
+  private void checkSummaries(Collection<Summary> summaries, Map<String,Long> expected, int extra) {
+    Assert.assertEquals(1, summaries.size());
+    for (Summary s : summaries) {
+      Assert.assertEquals(extra, s.getFileStatistics().getInaccurate());
+      Assert.assertEquals(extra, s.getFileStatistics().getExtra());
+      Assert.assertEquals(2, s.getFileStatistics().getTotal());
+
+      String summarizerClass = s.getSummarizerConfiguration().getClassName();
+      CounterSummary counters = new CounterSummary(s);
+      if (!summarizerClass.equals(VisibilitySummarizer.class.getName())) {
+        Assert.fail("Unexpected classname " + summarizerClass);
+      }
+      Assert.assertEquals(expected, counters.getCounters());
+    }
+  }
+
   @Test(expected = IllegalArgumentException.class)
   public void testOutOfOrder() throws Exception {
     // test that exception declared in API is thrown

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java b/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java
new file mode 100644
index 0000000..06f8d35
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/client/summary/CountingSummarizerTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.client.summary;
+
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.COUNTER_STAT_PREFIX;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.DELETES_IGNORED_STAT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.EMITTED_STAT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.INGNORE_DELETES_OPT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.MAX_COUNTERS_OPT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.MAX_COUNTER_LEN_OPT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.SEEN_STAT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.TOO_LONG_STAT;
+import static org.apache.accumulo.core.client.summary.CountingSummarizer.TOO_MANY_STAT;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.apache.accumulo.core.client.summary.CounterSummary;
+import org.apache.accumulo.core.client.summary.CountingSummarizer;
+import org.apache.accumulo.core.client.summary.Summarizer;
+import org.apache.accumulo.core.client.summary.Summarizer.Collector;
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link CountingSummarizer}: emitting several counters per key/value, the max counter count and max counter length options, merging
+ * summaries with a combiner, and counting deletes when they are not ignored.
+ */
+public class CountingSummarizerTest {
+
+  /**
+   * Emits three counters per key/value: the first two characters of the row, family, and qualifier, prefixed with {@code rp:}, {@code fp:}, and
+   * {@code qp:} respectively.
+   */
+  public static class MultiSummarizer extends CountingSummarizer<String> {
+    @Override
+    protected Converter<String> converter() {
+      return (k, v, c) -> {
+        c.accept("rp:" + k.getRowData().subSequence(0, 2).toString());
+        c.accept("fp:" + k.getColumnFamilyData().subSequence(0, 2).toString());
+        c.accept("qp:" + k.getColumnQualifierData().subSequence(0, 2).toString());
+      };
+    }
+  }
+
+  /** Verifies that a converter emitting several values per key/value is counted once per emitted value. */
+  @Test
+  public void testMultipleEmit() {
+    SummarizerConfiguration sc = SummarizerConfiguration.builder(MultiSummarizer.class).build();
+    MultiSummarizer countSum = new MultiSummarizer();
+
+    Summarizer.Collector collector = countSum.collector(sc);
+
+    Value val = new Value("abc");
+
+    HashMap<String,Long> expected = new HashMap<>();
+
+    for (String row : new String[] {"ask", "asleep", "some", "soul"}) {
+      for (String fam : new String[] {"hop", "hope", "nope", "noop"}) {
+        for (String qual : new String[] {"mad", "lad", "lab", "map"}) {
+          collector.accept(new Key(row, fam, qual), val);
+
+          expected.merge("rp:" + row.substring(0, 2), 1L, Long::sum);
+          expected.merge("fp:" + fam.substring(0, 2), 1L, Long::sum);
+          expected.merge("qp:" + qual.substring(0, 2), 1L, Long::sum);
+        }
+      }
+    }
+
+    HashMap<String,Long> stats = new HashMap<>();
+    collector.summarize(stats::put);
+
+    CounterSummary csum = new CounterSummary(stats);
+    Assert.assertEquals(expected, csum.getCounters());
+    // 4 rows x 4 families x 4 qualifiers were accepted, and each one emits 3 values
+    Assert.assertEquals(64, csum.getSeen());
+    Assert.assertEquals(3 * 64, csum.getEmitted());
+    Assert.assertEquals(0, csum.getIgnored());
+    Assert.assertEquals(0, csum.getDeletesIgnored());
+  }
+
+  /** Verifies the maxCounters and maxCounterLen limits and the default handling of deletes when collecting. */
+  @Test
+  public void testSummarizing() {
+    SummarizerConfiguration sc = SummarizerConfiguration.builder(FamilySummarizer.class).addOptions(MAX_COUNTERS_OPT, "5", MAX_COUNTER_LEN_OPT, "10").build();
+    FamilySummarizer countSum = new FamilySummarizer();
+
+    Value val = new Value("abc");
+
+    Summarizer.Collector collector = countSum.collector(sc);
+    for (String fam : Arrays.asList("f1", "f1", "f1", "f2", "f1", "f70000000000000000000", "f70000000000000000001", "f2", "f3", "f4", "f5", "f6", "f7", "f3",
+        "f7")) {
+      collector.accept(new Key("r", fam), val);
+    }
+
+    // Deletes are ignored by default: this one is expected to count toward seen and deletesIgnored, but not toward f2.
+    Key dk = new Key("r", "f2");
+    dk.setDeleted(true);
+    collector.accept(dk, new Value(""));
+
+    HashMap<String,Long> stats = new HashMap<>();
+    collector.summarize(stats::put);
+
+    String p = COUNTER_STAT_PREFIX;
+
+    // The two long families are expected under tooLong. f6 and f7 first appear after five distinct families are already tracked,
+    // so their three occurrences are expected under tooMany.
+    HashMap<String,Long> expected = new HashMap<>();
+    expected.put(p + "f1", 4L);
+    expected.put(p + "f2", 2L);
+    expected.put(p + "f3", 2L);
+    expected.put(p + "f4", 1L);
+    expected.put(p + "f5", 1L);
+    expected.put(TOO_LONG_STAT, 2L);
+    expected.put(TOO_MANY_STAT, 3L);
+    expected.put(SEEN_STAT, 16L);
+    expected.put(EMITTED_STAT, 15L);
+    expected.put(DELETES_IGNORED_STAT, 1L);
+
+    Assert.assertEquals(expected, stats);
+
+    CounterSummary csum = new CounterSummary(stats);
+    Assert.assertEquals(5, csum.getIgnored());
+    Assert.assertEquals(3, csum.getTooMany());
+    Assert.assertEquals(2, csum.getTooLong());
+    Assert.assertEquals(16, csum.getSeen());
+    Assert.assertEquals(15, csum.getEmitted());
+    Assert.assertEquals(1, csum.getDeletesIgnored());
+
+    expected.clear();
+    expected.put("f1", 4L);
+    expected.put("f2", 2L);
+    expected.put("f3", 2L);
+    expected.put("f4", 1L);
+    expected.put("f5", 1L);
+    Assert.assertEquals(expected, csum.getCounters());
+
+  }
+
+  /** Verifies that the combiner merges into its first argument and enforces maxCounters on the merged result. */
+  @Test
+  public void testMerge() {
+    SummarizerConfiguration sc = SummarizerConfiguration.builder(VisibilitySummarizer.class).addOption(MAX_COUNTERS_OPT, "5").build();
+    VisibilitySummarizer countSum = new VisibilitySummarizer();
+
+    String p = COUNTER_STAT_PREFIX;
+
+    HashMap<String,Long> sm1 = new HashMap<>();
+    sm1.put(p + "f001", 9L);
+    sm1.put(p + "f002", 4L);
+    sm1.put(p + "f003", 2L);
+    sm1.put(p + "f004", 1L);
+    sm1.put(p + "f005", 19L);
+    sm1.put(EMITTED_STAT, 15L);
+    sm1.put(SEEN_STAT, 5L);
+    sm1.put(DELETES_IGNORED_STAT, 1L);
+
+    HashMap<String,Long> sm2 = new HashMap<>();
+    sm2.put(p + "f001", 1L);
+    sm2.put(p + "f002", 2L);
+    sm2.put(p + "f00a", 7L);
+    sm2.put(p + "f00b", 1L);
+    sm2.put(p + "f00c", 17L);
+    sm2.put(EMITTED_STAT, 18L);
+    sm2.put(SEEN_STAT, 6L);
+    sm2.put(DELETES_IGNORED_STAT, 2L);
+
+    countSum.combiner(sc).merge(sm1, sm2);
+
+    // Only the five largest merged counters are expected to remain. The dropped ones (f003=2, f004=1, f00b=1) add 4 to tooMany.
+    HashMap<String,Long> expected = new HashMap<>();
+    expected.put(p + "f001", 10L);
+    expected.put(p + "f002", 6L);
+    expected.put(p + "f005", 19L);
+    expected.put(p + "f00a", 7L);
+    expected.put(p + "f00c", 17L);
+    expected.put(TOO_LONG_STAT, 0L);
+    expected.put(TOO_MANY_STAT, 4L);
+    expected.put(EMITTED_STAT, 18L + 15L);
+    expected.put(SEEN_STAT, 6L + 5L);
+    expected.put(DELETES_IGNORED_STAT, 3L);
+
+    Assert.assertEquals(expected, sm1);
+
+    sm2.clear();
+    sm2.put(p + "f001", 19L);
+    sm2.put(p + "f002", 2L);
+    sm2.put(p + "f003", 3L);
+    sm2.put(p + "f00b", 13L);
+    sm2.put(p + "f00c", 2L);
+    sm2.put(TOO_LONG_STAT, 1L);
+    sm2.put(TOO_MANY_STAT, 3L);
+    sm2.put(EMITTED_STAT, 21L);
+    sm2.put(SEEN_STAT, 7L);
+    sm2.put(DELETES_IGNORED_STAT, 5L);
+
+    countSum.combiner(sc).merge(sm1, sm2);
+
+    // The dropped counters this time are f00a=7 and f003=3, so tooMany = 4 + 3 + (7 + 3) = 17.
+    expected.clear();
+    expected.put(p + "f001", 29L);
+    expected.put(p + "f002", 8L);
+    expected.put(p + "f005", 19L);
+    expected.put(p + "f00b", 13L);
+    expected.put(p + "f00c", 19L);
+    expected.put(TOO_LONG_STAT, 1L);
+    expected.put(TOO_MANY_STAT, 17L);
+    expected.put(EMITTED_STAT, 21L + 18 + 15);
+    expected.put(SEEN_STAT, 7L + 6 + 5);
+    expected.put(DELETES_IGNORED_STAT, 8L);
+
+    // Previously missing: without this assertion the second merge was never checked.
+    Assert.assertEquals(expected, sm1);
+  }
+
+  /** Verifies that delete markers are counted like other entries when deletes are not ignored. */
+  @Test
+  public void testCountDeletes() {
+    SummarizerConfiguration sc = SummarizerConfiguration.builder(FamilySummarizer.class).addOptions(INGNORE_DELETES_OPT, "false").build();
+    FamilySummarizer countSum = new FamilySummarizer();
+
+    Key k1 = new Key("r1", "f1");
+    Key k2 = new Key("r1", "f1");
+    k2.setDeleted(true);
+    Key k3 = new Key("r1", "f2");
+
+    Collector collector = countSum.collector(sc);
+    collector.accept(k1, new Value(""));
+    collector.accept(k2, new Value(""));
+    collector.accept(k3, new Value(""));
+
+    String p = COUNTER_STAT_PREFIX;
+
+    HashMap<String,Long> expected = new HashMap<>();
+    expected.put(p + "f1", 2L);
+    expected.put(p + "f2", 1L);
+    expected.put(TOO_LONG_STAT, 0L);
+    expected.put(TOO_MANY_STAT, 0L);
+    expected.put(SEEN_STAT, 3L);
+    expected.put(EMITTED_STAT, 3L);
+    expected.put(DELETES_IGNORED_STAT, 0L);
+
+    HashMap<String,Long> stats = new HashMap<>();
+    collector.summarize(stats::put);
+    Assert.assertEquals(expected, stats);
+
+    CounterSummary csum = new CounterSummary(stats);
+    Assert.assertEquals(0, csum.getIgnored());
+    Assert.assertEquals(0, csum.getTooMany());
+    Assert.assertEquals(0, csum.getTooLong());
+    Assert.assertEquals(3, csum.getSeen());
+    Assert.assertEquals(3, csum.getEmitted());
+    Assert.assertEquals(0, csum.getDeletesIgnored());
+
+    expected.clear();
+    expected.put("f1", 2L);
+    expected.put("f2", 1L);
+    Assert.assertEquals(expected, csum.getCounters());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java b/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java
new file mode 100644
index 0000000..95702d3
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/summary/SummaryCollectionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.summary;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
+import org.apache.accumulo.core.client.summary.Summary;
+import org.apache.accumulo.core.client.summary.Summary.FileStatistics;
+import org.apache.accumulo.core.client.summary.summarizers.FamilySummarizer;
+import org.apache.accumulo.core.summary.SummaryCollection.FileSummary;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for merging {@link SummaryCollection}s and the file statistics they report. */
+public class SummaryCollectionTest {
+  /**
+   * Merges collections built from two file summaries, an empty collection, and a collection constructed with {@code true} as its second argument
+   * (merged twice). Checks the resulting file statistics on both the merged collection and a copy rebuilt from its Thrift form.
+   */
+  @Test
+  public void testDeleted() {
+    SummarizerConfiguration conf = SummarizerConfiguration.builder(FamilySummarizer.class).build();
+
+    HashMap<String,Long> stats = new HashMap<>();
+    stats.put("c:foo", 9L);
+    FileSummary fs1 = new FileSummary(conf, stats, false);
+    SummaryCollection sc1 = new SummaryCollection(Collections.singleton(fs1));
+
+    stats = new HashMap<>();
+    stats.put("c:foo", 5L);
+    stats.put("c:bar", 3L);
+    FileSummary fs2 = new FileSummary(conf, stats, true);
+    SummaryCollection sc2 = new SummaryCollection(Collections.singleton(fs2));
+
+    SummaryCollection sc3 = new SummaryCollection(Collections.emptyList());
+
+    SummaryCollection sc4 = new SummaryCollection(Collections.emptyList(), true);
+
+    SummarizerFactory factory = new SummarizerFactory();
+    SummaryCollection mergeSc = new SummaryCollection();
+    // sc4 is intentionally merged twice; the deleted count of 2 asserted below depends on it
+    for (SummaryCollection sc : Arrays.asList(sc1, sc2, sc3, sc4, sc4)) {
+      mergeSc.merge(sc, factory);
+    }
+
+    for (SummaryCollection sc : Arrays.asList(mergeSc, new SummaryCollection(mergeSc.toThrift()))) {
+      List<Summary> summaries = sc.getSummaries();
+      Assert.assertEquals(1, summaries.size());
+      Summary summary = summaries.get(0);
+      FileStatistics filestats = summary.getFileStatistics();
+      Assert.assertEquals(5, filestats.getTotal());
+      Assert.assertEquals(1, filestats.getExtra());
+      Assert.assertEquals(0, filestats.getLarge());
+      Assert.assertEquals(1, filestats.getMissing());
+      Assert.assertEquals(2, filestats.getDeleted());
+      Assert.assertEquals(4, filestats.getInaccurate());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
new file mode 100644
index 0000000..9de580d
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/util/CompletableFutureUtilTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.core.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CompletableFutureUtilTest {
+  @Test
+  public void testMerge() throws Exception {
+    ExecutorService es = Executors.newFixedThreadPool(3);
+    try {
+      for (int n : new int[] {1, 2, 3, 997, 1000}) {
+        List<CompletableFuture<Integer>> futures = new ArrayList<>();
+        for (int i = 1; i <= n; i++) {
+          final int num = i;
+          futures.add(CompletableFuture.supplyAsync(() -> num, es));
+        }
+
+        CompletableFuture<Integer> mergedFutures = CompletableFutureUtil.merge(futures, Integer::sum, () -> 0);
+        Assert.assertEquals(n * (n + 1) / 2, mergedFutures.get().intValue());
+      }
+
+      // test zero
+      CompletableFuture<Integer> mergedFutures = CompletableFutureUtil.merge(Collections.emptyList(), Integer::sum, () -> 0);
+      Assert.assertEquals(0, mergedFutures.get().intValue());
+    } finally {
+      es.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
index fe5403c..979d943 100644
--- a/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
+++ b/docs/src/main/asciidoc/accumulo_user_manual.asciidoc
@@ -63,6 +63,8 @@ include::chapters/kerberos.txt[]
 
 include::chapters/sampling.txt[]
 
+include::chapters/summaries.txt[]
+
 include::chapters/administration.txt[]
 
 include::chapters/multivolume.txt[]

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/docs/src/main/asciidoc/chapters/summaries.txt
----------------------------------------------------------------------
diff --git a/docs/src/main/asciidoc/chapters/summaries.txt b/docs/src/main/asciidoc/chapters/summaries.txt
new file mode 100644
index 0000000..08d8011
--- /dev/null
+++ b/docs/src/main/asciidoc/chapters/summaries.txt
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+== Summary Statistics
+
+=== Overview
+
+Accumulo has the ability to generate summary statistics about data in a table
+using user defined functions.  Currently these statistics are only generated for
+data written to files.  Data recently written to Accumulo that is still in
+memory will not contribute to summary statistics.
+
+This feature can be used to inform a user about what data is in their table.
+Summary statistics can also be used by compaction strategies to make decisions
+about which files to compact.  
+
+Summary data is stored in each file Accumulo produces.  Accumulo can gather
+summary information from across a cluster, merging it along the way.  For this
+to be fast, the summary information should fit in cache.  There is a
+dedicated cache for summary data on each tserver with a configurable size.  In
+order for summary data to fit in cache, it should be kept small.
+
+For information on writing a custom summarizer see the javadoc for
++org.apache.accumulo.core.client.summary.Summarizer+.  The package
++org.apache.accumulo.core.client.summary.summarizers+ contains summarizer
+implementations that ship with Accumulo and can be configured for use.
+
+=== Inaccuracies
+
+Summary data can be inaccurate when files are missing summary data or when
+files have extra summary data. Files can contain data outside of a tablet's
+boundaries. This can happen as a result of bulk imported files and tablet splits.
+When this happens, those files could contain extra summary information.
+Accumulo partially offsets this by storing summary information for multiple row
+ranges per file.  However, the ranges are not granular enough to completely
+offset extra data.
+
+Any source of inaccuracies is reported when summary information is requested.
+In the shell examples below this can be seen on the +File Statistics+ line.
+For files missing summary information, the compact command in the shell has a
++--sf-no-summary+ option.  This option compacts files that do not have the
+summary information configured for the table.  The compact command also has the
++--sf-extra-summary+ option which will compact files with extra summary
+information.
+
+=== Configuring
+
+The following tablet server and table properties configure summarization.
+
+* <<appendices/config.txt#_tserver_cache_summary_size>>
+* <<appendices/config.txt#_tserver_summary_partition_threads>>
+* <<appendices/config.txt#_tserver_summary_remote_threads>>
+* <<appendices/config.txt#_tserver_summary_retrieval_threads>>
+* <<appendices/config.txt#TABLE_SUMMARIZER_PREFIX>>
+* <<appendices/config.txt#_table_file_summary_maxsize>>
+
+=== Permissions
+
+Because summary data may be derived from sensitive data, requesting summary data
+requires a special permission.  Users must have the table permission
++GET_SUMMARIES+ in order to retrieve summary data.
+
+
+=== Bulk import
+
+When generating rfiles to bulk import into Accumulo, those rfiles can contain
+summary data.  To use this feature, look at the javadoc on the
++AccumuloFileOutputFormat.setSummarizers(...)+ method.  Also,
++org.apache.accumulo.core.client.rfile.RFile+ has options for creating RFiles
+with embedded summary data.
+
+=== Examples
+
+This example walks through using summarizers in the Accumulo shell.  Below a
+table is created and some data is inserted to summarize.
+
+ root@uno> createtable summary_test
+ root@uno summary_test> setauths -u root -s PI,GEO,TIME
+ root@uno summary_test> insert 3b503bd name last Doe
+ root@uno summary_test> insert 3b503bd name first John
+ root@uno summary_test> insert 3b503bd contact address "123 Park Ave, NY, NY" -l PI&GEO
+ root@uno summary_test> insert 3b503bd date birth "1/11/1942" -l PI&TIME
+ root@uno summary_test> insert 3b503bd date married "5/11/1962" -l PI&TIME
+ root@uno summary_test> insert 3b503bd contact home_phone 1-123-456-7890 -l PI
+ root@uno summary_test> insert d5d18dd contact address "50 Lake Shore Dr, Chicago, IL" -l PI&GEO
+ root@uno summary_test> insert d5d18dd name first Jane
+ root@uno summary_test> insert d5d18dd name last Doe
+ root@uno summary_test> insert d5d18dd date birth 8/15/1969 -l PI&TIME
+ root@uno summary_test> scan -s PI,GEO,TIME
+ 3b503bd contact:address [PI&GEO]    123 Park Ave, NY, NY
+ 3b503bd contact:home_phone [PI]    1-123-456-7890
+ 3b503bd date:birth [PI&TIME]    1/11/1942
+ 3b503bd date:married [PI&TIME]    5/11/1962
+ 3b503bd name:first []    John
+ 3b503bd name:last []    Doe
+ d5d18dd contact:address [PI&GEO]    50 Lake Shore Dr, Chicago, IL
+ d5d18dd date:birth [PI&TIME]    8/15/1969
+ d5d18dd name:first []    Jane
+ d5d18dd name:last []    Doe
+
+After inserting the data, summaries are requested below.  No summaries are returned.
+
+ root@uno summary_test> summaries
+
+The visibility summarizer is configured below and the table is flushed.
+Requesting summaries before the flush returns nothing, because the data is
+still in memory.  Flushing the table writes a file, generating summary data in
+the process.  The summary data returned counts how many times each column
+visibility occurred.  The statistics with a +c:+ prefix are visibilities.  The
+others are generic statistics created by the CountingSummarizer that
+VisibilitySummarizer extends.
+
+ root@uno summary_test> config -t summary_test -s table.summarizer.vis=org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer
+ root@uno summary_test> summaries
+ root@uno summary_test> flush -w
+ 2017-02-24 19:54:46,090 [shell.Shell] INFO : Flush of table summary_test completed.
+ root@uno summary_test> summaries
+  Summarizer         : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {}
+  File Statistics    : [total:1, missing:0, extra:0, large:0]
+  Summary Statistics : 
+     c:                                                           = 4
+     c:PI                                                         = 1
+     c:PI&GEO                                                     = 2
+     c:PI&TIME                                                    = 3
+     emitted                                                      = 10
+     seen                                                         = 10
+     tooLong                                                      = 0
+     tooMany                                                      = 0
+
+VisibilitySummarizer has an option +maxCounters+ that determines the max number
+of column visibilities it will track.  Below this option is set and compaction
+is forced to regenerate summary data.  The new summary data only has three
+visibilities and now the +tooMany+ statistic is 4.  This is the number of
+key/values whose visibility was not counted (here, the 4 entries with the empty
+visibility previously reported as +c:+).
+
+ root@uno summary_test> config -t summary_test -s table.summarizer.vis.opt.maxCounters=3
+ root@uno summary_test> compact -w
+ 2017-02-24 19:54:46,267 [shell.Shell] INFO : Compacting table ...
+ 2017-02-24 19:54:47,127 [shell.Shell] INFO : Compaction of table summary_test completed for given range
+ root@uno summary_test> summaries
+  Summarizer         : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {maxCounters=3}
+  File Statistics    : [total:1, missing:0, extra:0, large:0]
+  Summary Statistics : 
+     c:PI                                                         = 1
+     c:PI&GEO                                                     = 2
+     c:PI&TIME                                                    = 3
+     emitted                                                      = 10
+     seen                                                         = 10
+     tooLong                                                      = 0
+     tooMany                                                      = 4
+
+Another summarizer is configured below that tracks the number of deletes.  Also
+a compaction strategy that uses this summary data is configured.  The
++TooManyDeletesCompactionStrategy+ will force a compaction of the tablet when
+the ratio of deletes to non-deletes is over 25%.  This threshold is
+configurable.  Below a delete is added and it's reflected in the statistics.  In
+this case there is 1 delete and 10 non-deletes, not enough to force a
+compaction of the tablet.
+
+....
+root@uno summary_test> config -t summary_test -s table.summarizer.del=org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer
+root@uno summary_test> compact -w
+2017-02-24 19:54:47,282 [shell.Shell] INFO : Compacting table ...
+2017-02-24 19:54:49,236 [shell.Shell] INFO : Compaction of table summary_test completed for given range
+root@uno summary_test> config -t summary_test -s table.compaction.major.ratio=10
+root@uno summary_test> config -t summary_test -s table.majc.compaction.strategy=org.apache.accumulo.tserver.compaction.strategies.TooManyDeletesCompactionStrategy
+root@uno summary_test> deletemany -r d5d18dd -c date -f
+[DELETED] d5d18dd date:birth [PI&TIME]
+root@uno summary_test> flush -w
+2017-02-24 19:54:49,686 [shell.Shell] INFO : Flush of table summary_test completed.
+root@uno summary_test> summaries
+ Summarizer         : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {maxCounters=3}
+ File Statistics    : [total:2, missing:0, extra:0, large:0]
+ Summary Statistics : 
+    c:PI                                                         = 1
+    c:PI&GEO                                                     = 2
+    c:PI&TIME                                                    = 4
+    emitted                                                      = 11
+    seen                                                         = 11
+    tooLong                                                      = 0
+    tooMany                                                      = 4
+
+ Summarizer         : org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer del {}
+ File Statistics    : [total:2, missing:0, extra:0, large:0]
+ Summary Statistics : 
+    deletes                                                      = 1
+    total                                                        = 11
+....
+
+Some more deletes are added and the table is flushed below.  This results in 4
+deletes and 10 non-deletes, which triggers a full compaction.  A full
+compaction of all files is the only time when delete markers are dropped.  The
+compaction ratio was set to 10 above to show that the number of files did not
+trigger the compaction.  After the compaction there are no deletes and 6 non-deletes.
+
+....
+root@uno summary_test> deletemany -r d5d18dd -f
+[DELETED] d5d18dd contact:address [PI&GEO]
+[DELETED] d5d18dd name:first []
+[DELETED] d5d18dd name:last []
+root@uno summary_test> flush -w
+2017-02-24 19:54:52,800 [shell.Shell] INFO : Flush of table summary_test completed.
+root@uno summary_test> summaries
+ Summarizer         : org.apache.accumulo.core.client.summary.summarizers.VisibilitySummarizer vis {maxCounters=3}
+ File Statistics    : [total:1, missing:0, extra:0, large:0]
+ Summary Statistics : 
+    c:PI                                                         = 1
+    c:PI&GEO                                                     = 1
+    c:PI&TIME                                                    = 2
+    emitted                                                      = 6
+    seen                                                         = 6
+    tooLong                                                      = 0
+    tooMany                                                      = 2
+
+ Summarizer         : org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer del {}
+ File Statistics    : [total:1, missing:0, extra:0, large:0]
+ Summary Statistics : 
+    deletes                                                      = 0
+    total                                                        = 6
+root@uno summary_test>   
+....
+

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
index 3b19aeb..1c670bc 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneAccumuloCluster.java
@@ -142,8 +142,7 @@ public class StandaloneAccumuloCluster implements AccumuloCluster {
 
   @Override
   public StandaloneClusterControl getClusterControl() {
-    return new StandaloneClusterControl(accumuloHome, clientAccumuloConfDir, serverAccumuloConfDir,
-                                        clientCmdPrefix, serverCmdPrefix);
+    return new StandaloneClusterControl(accumuloHome, clientAccumuloConfDir, serverAccumuloConfDir, clientCmdPrefix, serverCmdPrefix);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
index bf1ccc7..45028e9 100644
--- a/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/cluster/standalone/StandaloneClusterControl.java
@@ -63,7 +63,8 @@ public class StandaloneClusterControl implements ClusterControl {
 
   protected String accumuloServicePath, accumuloPath, accumuloUtilPath;
 
-  public StandaloneClusterControl(String accumuloHome, String clientAccumuloConfDir, String serverAccumuloConfDir, String clientCmdPrefix, String serverCmdPrefix) {
+  public StandaloneClusterControl(String accumuloHome, String clientAccumuloConfDir, String serverAccumuloConfDir, String clientCmdPrefix,
+      String serverCmdPrefix) {
     this.options = new RemoteShellOptions();
     this.accumuloHome = accumuloHome;
     this.clientAccumuloConfDir = clientAccumuloConfDir;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
----------------------------------------------------------------------
diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
index 20763bb..2693c05 100644
--- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
+++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloConfigImpl.java
@@ -139,7 +139,8 @@ public class MiniAccumuloConfigImpl {
       mergeProp(Property.TSERV_PORTSEARCH.getKey(), "true");
       mergeProp(Property.TSERV_DATACACHE_SIZE.getKey(), "10M");
       mergeProp(Property.TSERV_INDEXCACHE_SIZE.getKey(), "10M");
-      mergeProp(Property.TSERV_MAXMEM.getKey(), "50M");
+      mergeProp(Property.TSERV_SUMMARYCACHE_SIZE.getKey(), "10M");
+      mergeProp(Property.TSERV_MAXMEM.getKey(), "40M");
       mergeProp(Property.TSERV_WALOG_MAX_SIZE.getKey(), "100M");
       mergeProp(Property.TSERV_NATIVEMAP_ENABLED.getKey(), "false");
       // since there is a small amount of memory, check more frequently for majc... setting may not be needed in 1.5

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index c4edc96..fd7746a 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -845,4 +845,9 @@ public class SecurityOperation {
     authenticate(credentials);
     return hasSystemPermission(credentials, SystemPermission.OBTAIN_DELEGATION_TOKEN, false);
   }
+
+  /**
+   * Checks whether the caller may retrieve summary information for a table.
+   *
+   * @param credentials
+   *          the caller's credentials; these are authenticated before any permission check
+   * @param tableId
+   *          id of the table whose summaries are requested
+   * @param namespaceId
+   *          id of the namespace containing the table
+   * @return the result of checking {@code TablePermission.GET_SUMMARIES} on the table for this caller
+   * @throws ThriftSecurityException
+   *           propagated from authentication or from the table permission check
+   */
+  public boolean canGetSummaries(TCredentials credentials, String tableId, String namespaceId) throws ThriftSecurityException {
+    authenticate(credentials);
+    return hasTablePermission(credentials, tableId, namespaceId, TablePermission.GET_SUMMARIES, false);
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 47a0d18..7187e3d 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -50,6 +50,8 @@ import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -76,7 +78,9 @@ import org.apache.accumulo.core.client.impl.Translator.TKeyExtentTranslator;
 import org.apache.accumulo.core.client.impl.Translator.TRangeTranslator;
 import org.apache.accumulo.core.client.impl.Translators;
 import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
+import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.conf.SiteConfiguration;
@@ -102,7 +106,11 @@ import org.apache.accumulo.core.data.thrift.TKeyExtent;
 import org.apache.accumulo.core.data.thrift.TKeyValue;
 import org.apache.accumulo.core.data.thrift.TMutation;
 import org.apache.accumulo.core.data.thrift.TRange;
+import org.apache.accumulo.core.data.thrift.TRowRange;
+import org.apache.accumulo.core.data.thrift.TSummaries;
+import org.apache.accumulo.core.data.thrift.TSummaryRequest;
 import org.apache.accumulo.core.data.thrift.UpdateErrors;
+import org.apache.accumulo.core.file.blockfile.cache.BlockCache;
 import org.apache.accumulo.core.iterators.IterationInterruptedException;
 import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.core.master.thrift.Compacting;
@@ -119,6 +127,9 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.thrift.TCredentials;
+import org.apache.accumulo.core.summary.Gatherer;
+import org.apache.accumulo.core.summary.Gatherer.FileSystemResolver;
+import org.apache.accumulo.core.summary.SummaryCollection;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
@@ -235,6 +246,7 @@ import org.apache.accumulo.tserver.session.MultiScanSession;
 import org.apache.accumulo.tserver.session.ScanSession;
 import org.apache.accumulo.tserver.session.Session;
 import org.apache.accumulo.tserver.session.SessionManager;
+import org.apache.accumulo.tserver.session.SummarySession;
 import org.apache.accumulo.tserver.session.UpdateSession;
 import org.apache.accumulo.tserver.tablet.BulkImportCacheCleaner;
 import org.apache.accumulo.tserver.tablet.CommitSession;
@@ -1790,6 +1802,109 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       log.warn("Garbage collector is attempting to remove logs through the tablet server");
       log.warn("This is probably because your file Garbage Collector is an older version than your tablet servers.\n" + "Restart your file Garbage Collector.");
     }
+
+    /**
+     * Waits up to {@code MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS} for a summary gather to finish and converts the result to its Thrift form.
+     *
+     * @param future
+     *          the pending summary gather
+     * @return the gathered summaries as {@link TSummaries}
+     * @throws TimeoutException
+     *           if the future is not done within the wait; callers in this class park the future in a session so the client can poll for it
+     */
+    private TSummaries getSummaries(Future<SummaryCollection> future) throws TimeoutException {
+      try {
+        SummaryCollection sc = future.get(MAX_TIME_TO_WAIT_FOR_SCAN_RESULT_MILLIS, TimeUnit.MILLISECONDS);
+        return sc.toThrift();
+      } catch (InterruptedException e) {
+        // restore the interrupt status before surfacing the failure as an unchecked exception
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        // NOTE(review): a failed gather reaches the Thrift layer as an unchecked exception rather than a declared Thrift exception; confirm clients get a usable error
+        throw new RuntimeException(e);
+      }
+    }
+
+    /**
+     * Builds the "still running" reply for a summary request whose result was not ready in time.
+     *
+     * Passes the session and {@code TSERV_CLIENT_TIMEOUT} to {@code removeIfNotAccessed}. Presumably the session is dropped if the client does not poll
+     * again within that timeout; the exact semantics of {@code removeIfNotAccessed} are not shown here.
+     *
+     * @param sessionId
+     *          id of the session holding the pending future
+     * @return a {@link TSummaries} carrying {@code false}, the session id, {@code -1}, {@code -1} and no summaries (NOTE(review): presumably
+     *         "not finished" plus the id to continue with; confirm against the Thrift definition)
+     */
+    private TSummaries handleTimeout(long sessionId) {
+      long timeout = TabletServer.this.getConfiguration().getTimeInMillis(Property.TSERV_CLIENT_TIMEOUT);
+      sessionManager.removeIfNotAccessed(sessionId, timeout);
+      return new TSummaries(false, sessionId, -1, -1, null);
+    }
+
+    /**
+     * Returns the summaries directly if the gather finishes within the wait; otherwise stores the future in a new {@link SummarySession} and returns
+     * a reply the client can use to continue polling.
+     *
+     * @param credentials
+     *          the caller's credentials, recorded in the session
+     * @param future
+     *          the pending summary gather
+     * @return the finished summaries, or the timeout reply from {@code handleTimeout}
+     */
+    private TSummaries startSummaryOperation(TCredentials credentials, Future<SummaryCollection> future) {
+      try {
+        return getSummaries(future);
+      } catch (TimeoutException e) {
+        long sid = sessionManager.createSession(new SummarySession(credentials, future), false);
+        // NOTE(review): session id 0 is never handed out; presumably 0 is a sentinel value for clients. Drop that entry and try again.
+        while (sid == 0) {
+          sessionManager.removeSession(sid);
+          sid = sessionManager.createSession(new SummarySession(credentials, future), false);
+        }
+        return handleTimeout(sid);
+      }
+    }
+
+    /**
+     * Client entry point for summary retrieval. Resolves the table's namespace, checks the caller's GET_SUMMARIES permission, and starts a
+     * {@link Gatherer} on the summary partition executor.
+     *
+     * @throws ThriftTableOperationException
+     *           with type NOTFOUND if the requested table id cannot be resolved to a namespace
+     * @throws ThriftSecurityException
+     *           with PERMISSION_DENIED if {@code canGetSummaries} returns false
+     */
+    @Override
+    public TSummaries startGetSummaries(TInfo tinfo, TCredentials credentials, TSummaryRequest request) throws ThriftSecurityException,
+        ThriftTableOperationException, NoSuchScanIDException, TException {
+      String namespaceId;
+      try {
+        namespaceId = Tables.getNamespaceId(TabletServer.this.getInstance(), request.getTableId());
+      } catch (TableNotFoundException e1) {
+        throw new ThriftTableOperationException(request.getTableId(), null, null, TableOperationExceptionType.NOTFOUND, null);
+      }
+
+      if (!security.canGetSummaries(credentials, request.getTableId(), namespaceId)) {
+        throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+      }
+
+      ServerConfigurationFactory factory = TabletServer.this.getServerConfigurationFactory();
+      ExecutorService es = resourceManager.getSummaryPartitionExecutor();
+      Future<SummaryCollection> future = new Gatherer(TabletServer.this, request, factory.getTableConfiguration(request.getTableId())).gather(es);
+
+      return startSummaryOperation(credentials, future);
+    }
+
+    /**
+     * Gathers summaries for one partition of a request, identified by {@code modulus} and {@code remainder}. The work runs on the summary remote
+     * executor.
+     *
+     * Restricted to callers that pass {@code canPerformSystemActions}.
+     *
+     * @throws ThriftSecurityException
+     *           with PERMISSION_DENIED if the caller lacks system permission
+     */
+    @Override
+    public TSummaries startGetSummariesForPartition(TInfo tinfo, TCredentials credentials, TSummaryRequest request, int modulus, int remainder)
+        throws ThriftSecurityException, NoSuchScanIDException, TException {
+      // do not expect users to call this directly, expect other tservers to call this method
+      if (!security.canPerformSystemActions(credentials)) {
+        throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+      }
+
+      ServerConfigurationFactory factory = TabletServer.this.getServerConfigurationFactory();
+      ExecutorService spe = resourceManager.getSummaryRemoteExecutor();
+      Future<SummaryCollection> future = new Gatherer(TabletServer.this, request, factory.getTableConfiguration(request.getTableId())).processPartition(spe,
+          modulus, remainder);
+
+      return startSummaryOperation(credentials, future);
+    }
+
+    /**
+     * Reads summaries from an explicit set of files, each mapped to a list of row ranges. Uses the tserver's summary and index block caches, and the
+     * work runs on the summary retrieval executor.
+     *
+     * Restricted to callers that pass {@code canPerformSystemActions}.
+     *
+     * @throws ThriftSecurityException
+     *           with PERMISSION_DENIED if the caller lacks system permission
+     */
+    @Override
+    public TSummaries startGetSummariesFromFiles(TInfo tinfo, TCredentials credentials, TSummaryRequest request, Map<String,List<TRowRange>> files)
+        throws ThriftSecurityException, NoSuchScanIDException, TException {
+      // do not expect users to call this directly, expect other tservers to call this method
+      if (!security.canPerformSystemActions(credentials)) {
+        throw new AccumuloSecurityException(credentials.getPrincipal(), SecurityErrorCode.PERMISSION_DENIED).asThriftException();
+      }
+
+      ExecutorService srp = resourceManager.getSummaryRetrievalExecutor();
+      // NOTE(review): this uses the confFactory field, while the sibling start* methods call getServerConfigurationFactory(); confirm they are the same instance
+      TableConfiguration tableCfg = confFactory.getTableConfiguration(request.getTableId());
+      BlockCache summaryCache = resourceManager.getSummaryCache();
+      BlockCache indexCache = resourceManager.getIndexCache();
+      // resolve each file path to the filesystem of the volume that holds it
+      FileSystemResolver volMgr = p -> fs.getVolumeByPath(p).getFileSystem();
+      Future<SummaryCollection> future = new Gatherer(TabletServer.this, request, tableCfg).processFiles(volMgr, files, summaryCache, indexCache, srp);
+
+      return startSummaryOperation(credentials, future);
+    }
+
+    /**
+     * Polls a summary request that previously timed out. When the gather completes, the session is removed and the summaries are returned;
+     * otherwise the timeout reply is returned again.
+     *
+     * NOTE(review): the misspelled method name comes from the overridden interface, so it cannot be renamed here.
+     *
+     * @param sessionId
+     *          id returned by an earlier start* call
+     * @throws NoSuchScanIDException
+     *           if no session exists for {@code sessionId}
+     */
+    @Override
+    public TSummaries contiuneGetSummaries(TInfo tinfo, long sessionId) throws NoSuchScanIDException, TException {
+      // NOTE(review): unchecked cast; an id belonging to a non-summary session fails with ClassCastException instead of NoSuchScanIDException
+      SummarySession session = (SummarySession) sessionManager.getSession(sessionId);
+      if (session == null) {
+        throw new NoSuchScanIDException();
+      }
+
+      Future<SummaryCollection> future = session.getFuture();
+      try {
+        // NOTE(review): if the gather failed, getSummaries throws an unchecked exception and the session is not removed here
+        TSummaries tsums = getSummaries(future);
+        sessionManager.removeSession(sessionId);
+        return tsums;
+      } catch (TimeoutException e) {
+        return handleTimeout(sessionId);
+      }
+    }
   }
 
   private class SplitRunner implements Runnable {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 3cd7bfa..411ffa5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.tserver;
 
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
@@ -65,7 +66,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 
 /**
  * ResourceManager is responsible for managing the resources of all tablets within a tablet server.
@@ -86,12 +86,13 @@ public class TabletServerResourceManager {
   private final ExecutorService assignMetaDataPool;
   private final ExecutorService readAheadThreadPool;
   private final ExecutorService defaultReadAheadThreadPool;
+  private final ExecutorService summaryRetrievalPool;
+  private final ExecutorService summaryParitionPool;
+  private final ExecutorService summaryRemotePool;
   private final Map<String,ExecutorService> threadPools = new TreeMap<>();
 
   private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
 
-  private final VolumeManager fs;
-
   private final FileManager fileManager;
 
   private final MemoryManager memoryManager;
@@ -100,6 +101,7 @@ public class TabletServerResourceManager {
 
   private final BlockCache _dCache;
   private final BlockCache _iCache;
+  private final BlockCache _sCache;
   private final TabletServer tserver;
   private final ServerConfigurationFactory conf;
 
@@ -141,6 +143,14 @@ public class TabletServerResourceManager {
     return createEs(max, name, new LinkedBlockingQueue<Runnable>());
   }
 
+  /**
+   * Creates and registers a fixed-size thread pool whose threads, including core threads, exit after sitting idle for {@code timeout}.
+   *
+   * @param max
+   *          property giving the thread count, used as both core and maximum size
+   * @param name
+   *          name passed to the thread factory and to {@code addEs}
+   * @param timeout
+   *          idle time before a thread is allowed to terminate
+   * @param timeUnit
+   *          unit of {@code timeout}
+   * @return the registered executor; tasks beyond the thread count wait in an unbounded queue
+   */
+  private ExecutorService createIdlingEs(Property max, String name, long timeout, TimeUnit timeUnit) {
+    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+    int maxThreads = conf.getConfiguration().getCount(max);
+    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, timeout, timeUnit, queue, new NamingThreadFactory(name));
+    // let idle core threads exit so the pool costs nothing when summaries are not being requested
+    tp.allowCoreThreadTimeOut(true);
+    return addEs(max, name, tp);
+  }
+
   private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
     int maxThreads = conf.getConfiguration().getCount(max);
     ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
@@ -154,7 +164,6 @@ public class TabletServerResourceManager {
   public TabletServerResourceManager(TabletServer tserver, VolumeManager fs) {
     this.tserver = tserver;
     this.conf = tserver.getServerConfigurationFactory();
-    this.fs = fs;
     final AccumuloConfiguration acuConf = conf.getConfiguration();
 
     long maxMemory = acuConf.getMemoryInBytes(Property.TSERV_MAXMEM);
@@ -163,15 +172,18 @@ public class TabletServerResourceManager {
     long blockSize = acuConf.getMemoryInBytes(Property.TSERV_DEFAULT_BLOCKSIZE);
     long dCacheSize = acuConf.getMemoryInBytes(Property.TSERV_DATACACHE_SIZE);
     long iCacheSize = acuConf.getMemoryInBytes(Property.TSERV_INDEXCACHE_SIZE);
+    long sCacheSize = acuConf.getMemoryInBytes(Property.TSERV_SUMMARYCACHE_SIZE);
     long totalQueueSize = acuConf.getMemoryInBytes(Property.TSERV_TOTAL_MUTATION_QUEUE_MAX);
 
     String policy = acuConf.get(Property.TSERV_CACHE_POLICY);
     if (policy.equalsIgnoreCase("LRU")) {
       _iCache = new LruBlockCache(iCacheSize, blockSize);
       _dCache = new LruBlockCache(dCacheSize, blockSize);
+      _sCache = new LruBlockCache(sCacheSize, blockSize);
     } else if (policy.equalsIgnoreCase("TinyLFU")) {
       _iCache = new TinyLfuBlockCache(iCacheSize, blockSize);
       _dCache = new TinyLfuBlockCache(dCacheSize, blockSize);
+      _sCache = new TinyLfuBlockCache(sCacheSize, blockSize);
     } else {
       throw new IllegalArgumentException("Unknown Block cache policy " + policy);
     }
@@ -179,14 +191,14 @@ public class TabletServerResourceManager {
     Runtime runtime = Runtime.getRuntime();
     if (usingNativeMap) {
       // Still check block cache sizes when using native maps.
-      if (dCacheSize + iCacheSize + totalQueueSize > runtime.maxMemory()) {
+      if (dCacheSize + iCacheSize + sCacheSize + totalQueueSize > runtime.maxMemory()) {
         throw new IllegalArgumentException(String.format("Block cache sizes %,d and mutation queue size %,d is too large for this JVM configuration %,d",
-            dCacheSize + iCacheSize, totalQueueSize, runtime.maxMemory()));
+            dCacheSize + iCacheSize + sCacheSize, totalQueueSize, runtime.maxMemory()));
       }
-    } else if (maxMemory + dCacheSize + iCacheSize + totalQueueSize > runtime.maxMemory()) {
+    } else if (maxMemory + dCacheSize + iCacheSize + sCacheSize + totalQueueSize > runtime.maxMemory()) {
       throw new IllegalArgumentException(String.format(
           "Maximum tablet server map memory %,d block cache sizes %,d and mutation queue size %,d is too large for this JVM configuration %,d", maxMemory,
-          dCacheSize + iCacheSize, totalQueueSize, runtime.maxMemory()));
+          dCacheSize + iCacheSize + sCacheSize, totalQueueSize, runtime.maxMemory()));
     }
     runtime.gc();
 
@@ -222,6 +234,10 @@ public class TabletServerResourceManager {
     readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
     defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead");
 
+    summaryRetrievalPool = createIdlingEs(Property.TSERV_SUMMARY_RETRIEVAL_THREADS, "summary file retriever", 60, TimeUnit.SECONDS);
+    summaryRemotePool = createIdlingEs(Property.TSERV_SUMMARY_REMOTE_THREADS, "summary remote", 60, TimeUnit.SECONDS);
+    summaryParitionPool = createIdlingEs(Property.TSERV_SUMMARY_PARTITION_THREADS, "summary partition", 60, TimeUnit.SECONDS);
+
     int maxOpenFiles = acuConf.getCount(Property.TSERV_SCAN_MAX_OPENFILES);
 
     fileManager = new FileManager(tserver, fs, maxOpenFiles, _dCache, _iCache);
@@ -657,7 +673,7 @@ public class TabletServerResourceManager {
       CompactionStrategy strategy = Property.createTableInstanceFromPropertyName(tableConf, Property.TABLE_COMPACTION_STRATEGY, CompactionStrategy.class,
           new DefaultCompactionStrategy());
       strategy.init(Property.getCompactionStrategyOptions(tableConf));
-      MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, TabletServerResourceManager.this.fs, tableConf);
+      MajorCompactionRequest request = new MajorCompactionRequest(extent, reason, tableConf);
       request.setFiles(tabletFiles);
       try {
         return strategy.shouldCompact(request);
@@ -760,4 +776,19 @@ public class TabletServerResourceManager {
     return _dCache;
   }
 
+  /**
+   * @return the block cache for summary data, sized by {@code TSERV_SUMMARYCACHE_SIZE} and built with the configured cache policy
+   */
+  public BlockCache getSummaryCache() {
+    return _sCache;
+  }
+
+  /**
+   * @return the idling pool ("summary file retriever") sized by {@code TSERV_SUMMARY_RETRIEVAL_THREADS}
+   */
+  public ExecutorService getSummaryRetrievalExecutor() {
+    return summaryRetrievalPool;
+  }
+
+  /**
+   * @return the idling pool ("summary partition") sized by {@code TSERV_SUMMARY_PARTITION_THREADS}
+   */
+  public ExecutorService getSummaryPartitionExecutor() {
+    return summaryParitionPool;
+  }
+
+  /**
+   * @return the idling pool ("summary remote") sized by {@code TSERV_SUMMARY_REMOTE_THREADS}
+   */
+  public ExecutorService getSummaryRemoteExecutor() {
+    return summaryRemotePool;
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/94cdcc4d/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
index 0e0089a..85a4a13 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compaction/CompactionStrategy.java
@@ -32,7 +32,6 @@ import java.util.Map;
  * compaction thread.
  */
 public abstract class CompactionStrategy {
-
   /**
    * The settings for the compaction strategy pulled from zookeeper. The <tt>table.compacations.major.strategy.opts</tt> part of the setting will be removed.
    */


Mime
View raw message