accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject [1/7] accumulo git commit: ACCUMULO-3913 Added per table sampling
Date Mon, 21 Sep 2015 13:51:26 GMT
Repository: accumulo
Updated Branches:
  refs/heads/master fdcc1698c -> 45f18c174


http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
index f183b25..c8b0e11 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/CompactCommand.java
@@ -38,7 +38,7 @@ public class CompactCommand extends TableOperation {
 
   // file selection and file output options
   private Option enameOption, epathOption, sizeLtOption, sizeGtOption, minFilesOption, outBlockSizeOpt, outHdfsBlockSizeOpt, outIndexBlockSizeOpt,
-      outCompressionOpt, outReplication;
+      outCompressionOpt, outReplication, enoSampleOption;
 
   private CompactionConfig compactionConfig = null;
 
@@ -89,6 +89,7 @@ public class CompactCommand extends TableOperation {
   private Map<String,String> getConfigurableCompactionStrategyOpts(CommandLine cl) {
     Map<String,String> opts = new HashMap<>();
 
+    put(cl, opts, enoSampleOption, CompactionSettings.SF_NO_SAMPLE);
     put(cl, opts, enameOption, CompactionSettings.SF_NAME_RE_OPT);
     put(cl, opts, epathOption, CompactionSettings.SF_PATH_RE_OPT);
     put(cl, opts, sizeLtOption, CompactionSettings.SF_LT_ESIZE_OPT);
@@ -190,6 +191,9 @@ public class CompactCommand extends TableOperation {
     cancelOpt = new Option(null, "cancel", false, "cancel user initiated compactions");
     opts.addOption(cancelOpt);
 
+    enoSampleOption = new Option(null, "sf-no-sample", false,
+        "Select files that have no sample data or sample data that differs from the table configuration.");
+    opts.addOption(enoSampleOption);
     enameOption = newLAO("sf-ename", "Select files using regular expression to match file names. Only matches against last part of path.");
     opts.addOption(enameOption);
     epathOption = newLAO("sf-epath", "Select files using regular expression to match file paths to compact. Matches against full path.");

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/shell/src/main/java/org/apache/accumulo/shell/commands/GrepCommand.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/GrepCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/GrepCommand.java
index 97bddc9..44ee93c 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/GrepCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/GrepCommand.java
@@ -61,6 +61,8 @@ public class GrepCommand extends ScanCommand {
 
     scanner.setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS);
 
+    setupSampling(tableName, cl, shellState, scanner);
+
     for (int i = 0; i < cl.getArgs().length; i++) {
       setUpIterator(Integer.MAX_VALUE - cl.getArgs().length + i, "grep" + i, cl.getArgs()[i], scanner, cl);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/shell/src/main/java/org/apache/accumulo/shell/commands/ScanCommand.java
----------------------------------------------------------------------
diff --git a/shell/src/main/java/org/apache/accumulo/shell/commands/ScanCommand.java b/shell/src/main/java/org/apache/accumulo/shell/commands/ScanCommand.java
index 3531fe9..595829b 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/ScanCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/ScanCommand.java
@@ -26,9 +26,11 @@ import java.util.concurrent.TimeUnit;
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.SampleNotPresentException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
@@ -60,6 +62,19 @@ public class ScanCommand extends Command {
   private Option optEndRowExclusive;
   private Option timeoutOption;
   private Option profileOpt;
+  private Option sampleOpt;
+
+  protected void setupSampling(final String tableName, final CommandLine cl, final Shell shellState, ScannerBase scanner) throws TableNotFoundException,
+      AccumuloException, AccumuloSecurityException {
+    if (getUseSample(cl)) {
+      SamplerConfiguration samplerConfig = shellState.getConnector().tableOperations().getSamplerConfiguration(tableName);
+      if (samplerConfig == null) {
+        throw new SampleNotPresentException("Table " + tableName + " does not have sampling configured");
+      }
+      Shell.log.debug("Using sampling configuration : " + samplerConfig);
+      scanner.setSamplerConfiguration(samplerConfig);
+    }
+  }
 
   @Override
   public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws Exception {
@@ -86,6 +101,8 @@ public class ScanCommand extends Command {
     // set timeout
     scanner.setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS);
 
+    setupSampling(tableName, cl, shellState, scanner);
+
     // output the records
     if (cl.hasOption(showFewOpt.getOpt())) {
       final String showLength = cl.getOptionValue(showFewOpt.getOpt());
@@ -112,6 +129,10 @@ public class ScanCommand extends Command {
     return 0;
   }
 
+  protected boolean getUseSample(CommandLine cl) {
+    return cl.hasOption(sampleOpt.getLongOpt());
+  }
+
   protected long getTimeout(final CommandLine cl) {
     if (cl.hasOption(timeoutOption.getLongOpt())) {
       return AccumuloConfiguration.getTimeInMillis(cl.getOptionValue(timeoutOption.getLongOpt()));
@@ -294,6 +315,7 @@ public class ScanCommand extends Command {
     timeoutOption = new Option(null, "timeout", true,
         "time before scan should fail if no data is returned. If no unit is given assumes seconds.  Units d,h,m,s,and ms are supported.  e.g. 30s or 100ms");
     outputFileOpt = new Option("o", "output", true, "local file to write the scan output to");
+    sampleOpt = new Option(null, "sample", false, "Show sample");
 
     scanOptAuths.setArgName("comma-separated-authorizations");
     scanOptRow.setArgName("row");
@@ -324,6 +346,7 @@ public class ScanCommand extends Command {
     o.addOption(timeoutOption);
     o.addOption(outputFileOpt);
     o.addOption(profileOpt);
+    o.addOption(sampleOpt);
 
     return o;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/start/.gitignore
----------------------------------------------------------------------
diff --git a/start/.gitignore b/start/.gitignore
index 56204d2..e7d7fb1 100644
--- a/start/.gitignore
+++ b/start/.gitignore
@@ -23,3 +23,4 @@
 /.pydevproject
 /.idea
 /*.iml
+/target/

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/InMemoryMapMemoryUsageTest.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/InMemoryMapMemoryUsageTest.java b/test/src/main/java/org/apache/accumulo/test/InMemoryMapMemoryUsageTest.java
index fb0050f..05b405e 100644
--- a/test/src/main/java/org/apache/accumulo/test/InMemoryMapMemoryUsageTest.java
+++ b/test/src/main/java/org/apache/accumulo/test/InMemoryMapMemoryUsageTest.java
@@ -18,9 +18,11 @@ package org.apache.accumulo.test;
 
 import java.util.Collections;
 
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.LocalityGroupUtil.LocalityGroupConfigurationError;
 import org.apache.accumulo.tserver.InMemoryMap;
 import org.apache.hadoop.io.Text;
 
@@ -51,7 +53,11 @@ class InMemoryMapMemoryUsageTest extends MemoryUsageTest {
 
   @Override
   void init() {
-    imm = new InMemoryMap(false, "/tmp");
+    try {
+      imm = new InMemoryMap(DefaultConfiguration.getInstance());
+    } catch (LocalityGroupConfigurationError e) {
+      throw new RuntimeException(e);
+    }
     key = new Text();
 
     colf = new Text(String.format("%0" + colFamLen + "d", 0));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/SampleIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/SampleIT.java b/test/src/main/java/org/apache/accumulo/test/SampleIT.java
new file mode 100644
index 0000000..423b955
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/SampleIT.java
@@ -0,0 +1,497 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.ClientSideIteratorScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.SampleNotPresentException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.OfflineScanner;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class SampleIT extends AccumuloClusterHarness {
+
+  private static final Map<String,String> OPTIONS_1 = ImmutableMap.of("hasher", "murmur3_32", "modulus", "1009");
+  private static final Map<String,String> OPTIONS_2 = ImmutableMap.of("hasher", "murmur3_32", "modulus", "997");
+
+  private static final SamplerConfiguration SC1 = new SamplerConfiguration(RowSampler.class.getName()).setOptions(OPTIONS_1);
+  private static final SamplerConfiguration SC2 = new SamplerConfiguration(RowSampler.class.getName()).setOptions(OPTIONS_2);
+
+  public static class IteratorThatUsesSample extends WrappingIterator {
+    private SortedKeyValueIterator<Key,Value> sampleDC;
+    private boolean hasTop;
+
+    @Override
+    public boolean hasTop() {
+      return hasTop && super.hasTop();
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+
+      int sampleCount = 0;
+      sampleDC.seek(range, columnFamilies, inclusive);
+
+      while (sampleDC.hasTop()) {
+        sampleCount++;
+        sampleDC.next();
+      }
+
+      if (sampleCount < 10) {
+        hasTop = true;
+        super.seek(range, columnFamilies, inclusive);
+      } else {
+        // its too much data
+        hasTop = false;
+      }
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+      super.init(source, options, env);
+
+      IteratorEnvironment sampleEnv = env.cloneWithSamplingEnabled();
+
+      sampleDC = source.deepCopy(sampleEnv);
+    }
+  }
+
+  @Test
+  public void testBasic() throws Exception {
+
+    Connector conn = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    String clone = tableName + "_clone";
+
+    conn.tableOperations().create(tableName, new NewTableConfiguration().enableSampling(SC1));
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    String someRow = writeData(bw, SC1, expected);
+
+    Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    Scanner isoScanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY));
+    Scanner csiScanner = new ClientSideIteratorScanner(conn.createScanner(tableName, Authorizations.EMPTY));
+    scanner.setSamplerConfiguration(SC1);
+    csiScanner.setSamplerConfiguration(SC1);
+    isoScanner.setSamplerConfiguration(SC1);
+    isoScanner.setBatchSize(10);
+
+    BatchScanner bScanner = conn.createBatchScanner(tableName, Authorizations.EMPTY, 2);
+    bScanner.setSamplerConfiguration(SC1);
+    bScanner.setRanges(Arrays.asList(new Range()));
+
+    check(expected, scanner, bScanner, isoScanner, csiScanner);
+
+    conn.tableOperations().flush(tableName, null, null, true);
+
+    Scanner oScanner = newOfflineScanner(conn, tableName, clone, SC1);
+    check(expected, scanner, bScanner, isoScanner, csiScanner, oScanner);
+
+    // ensure non sample data can be scanned after scanning sample data
+    for (ScannerBase sb : Arrays.asList(scanner, bScanner, isoScanner, csiScanner, oScanner)) {
+      sb.clearSamplerConfiguration();
+      Assert.assertEquals(20000, Iterables.size(sb));
+      sb.setSamplerConfiguration(SC1);
+    }
+
+    Iterator<Key> it = expected.keySet().iterator();
+    while (it.hasNext()) {
+      Key k = it.next();
+      if (k.getRow().toString().equals(someRow)) {
+        it.remove();
+      }
+    }
+
+    expected.put(new Key(someRow, "cf1", "cq1", 8), new Value("42".getBytes()));
+    expected.put(new Key(someRow, "cf1", "cq3", 8), new Value("suprise".getBytes()));
+
+    Mutation m = new Mutation(someRow);
+
+    m.put("cf1", "cq1", 8, "42");
+    m.putDelete("cf1", "cq2", 8);
+    m.put("cf1", "cq3", 8, "suprise");
+
+    bw.addMutation(m);
+    bw.close();
+
+    check(expected, scanner, bScanner, isoScanner, csiScanner);
+
+    conn.tableOperations().flush(tableName, null, null, true);
+
+    oScanner = newOfflineScanner(conn, tableName, clone, SC1);
+    check(expected, scanner, bScanner, isoScanner, csiScanner, oScanner);
+
+    scanner.setRange(new Range(someRow));
+    isoScanner.setRange(new Range(someRow));
+    csiScanner.setRange(new Range(someRow));
+    oScanner.setRange(new Range(someRow));
+    bScanner.setRanges(Arrays.asList(new Range(someRow)));
+
+    expected.clear();
+
+    expected.put(new Key(someRow, "cf1", "cq1", 8), new Value("42".getBytes()));
+    expected.put(new Key(someRow, "cf1", "cq3", 8), new Value("suprise".getBytes()));
+
+    check(expected, scanner, bScanner, isoScanner, csiScanner, oScanner);
+
+    bScanner.close();
+  }
+
+  private Scanner newOfflineScanner(Connector conn, String tableName, String clone, SamplerConfiguration sc) throws Exception {
+    if (conn.tableOperations().exists(clone)) {
+      conn.tableOperations().delete(clone);
+    }
+    Map<String,String> em = Collections.emptyMap();
+    Set<String> es = Collections.emptySet();
+    conn.tableOperations().clone(tableName, clone, false, em, es);
+    conn.tableOperations().offline(clone, true);
+    String cloneID = conn.tableOperations().tableIdMap().get(clone);
+    OfflineScanner oScanner = new OfflineScanner(conn.getInstance(), new Credentials(getAdminPrincipal(), getAdminToken()), cloneID, Authorizations.EMPTY);
+    if (sc != null) {
+      oScanner.setSamplerConfiguration(sc);
+    }
+    return oScanner;
+  }
+
+  private void updateExpected(SamplerConfiguration sc, TreeMap<Key,Value> expected) {
+    expected.clear();
+
+    RowSampler sampler = new RowSampler();
+    sampler.init(sc);
+
+    for (int i = 0; i < 10000; i++) {
+      String row = String.format("r_%06d", i);
+
+      Key k1 = new Key(row, "cf1", "cq1", 7);
+      if (sampler.accept(k1)) {
+        expected.put(k1, new Value(("" + i).getBytes()));
+      }
+
+      Key k2 = new Key(row, "cf1", "cq2", 7);
+      if (sampler.accept(k2)) {
+        expected.put(k2, new Value(("" + (100000000 - i)).getBytes()));
+      }
+    }
+  }
+
+  private String writeData(BatchWriter bw, SamplerConfiguration sc, TreeMap<Key,Value> expected) throws MutationsRejectedException {
+    int count = 0;
+    String someRow = null;
+
+    RowSampler sampler = new RowSampler();
+    sampler.init(sc);
+
+    for (int i = 0; i < 10000; i++) {
+      String row = String.format("r_%06d", i);
+      Mutation m = new Mutation(row);
+
+      m.put("cf1", "cq1", 7, "" + i);
+      m.put("cf1", "cq2", 7, "" + (100000000 - i));
+
+      bw.addMutation(m);
+
+      Key k1 = new Key(row, "cf1", "cq1", 7);
+      if (sampler.accept(k1)) {
+        expected.put(k1, new Value(("" + i).getBytes()));
+        count++;
+        if (count == 5) {
+          someRow = row;
+        }
+      }
+
+      Key k2 = new Key(row, "cf1", "cq2", 7);
+      if (sampler.accept(k2)) {
+        expected.put(k2, new Value(("" + (100000000 - i)).getBytes()));
+      }
+    }
+
+    bw.flush();
+
+    return someRow;
+  }
+
+  private int countEntries(Iterable<Entry<Key,Value>> scanner) {
+
+    int count = 0;
+    Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+    while (iter.hasNext()) {
+      iter.next();
+      count++;
+    }
+
+    return count;
+  }
+
+  private void setRange(Range range, List<? extends ScannerBase> scanners) {
+    for (ScannerBase s : scanners) {
+      if (s instanceof Scanner) {
+        ((Scanner) s).setRange(range);
+      } else {
+        ((BatchScanner) s).setRanges(Collections.singleton(range));
+      }
+
+    }
+  }
+
+  @Test
+  public void testIterator() throws Exception {
+    Connector conn = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    String clone = tableName + "_clone";
+
+    conn.tableOperations().create(tableName, new NewTableConfiguration().enableSampling(SC1));
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    writeData(bw, SC1, expected);
+
+    ArrayList<Key> keys = new ArrayList<>(expected.keySet());
+
+    Range range1 = new Range(keys.get(6), true, keys.get(11), true);
+
+    Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    Scanner isoScanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY));
+    ClientSideIteratorScanner csiScanner = new ClientSideIteratorScanner(conn.createScanner(tableName, Authorizations.EMPTY));
+    BatchScanner bScanner = conn.createBatchScanner(tableName, Authorizations.EMPTY, 2);
+
+    csiScanner.setIteratorSamplerConfiguration(SC1);
+
+    List<? extends ScannerBase> scanners = Arrays.asList(scanner, isoScanner, bScanner, csiScanner);
+
+    for (ScannerBase s : scanners) {
+      s.addScanIterator(new IteratorSetting(100, IteratorThatUsesSample.class));
+    }
+
+    // the iterator should see less than 10 entries in sample data, and return data
+    setRange(range1, scanners);
+    for (ScannerBase s : scanners) {
+      Assert.assertEquals(2954, countEntries(s));
+    }
+
+    Range range2 = new Range(keys.get(5), true, keys.get(18), true);
+    setRange(range2, scanners);
+
+    // the iterator should see more than 10 entries in sample data, and return no data
+    for (ScannerBase s : scanners) {
+      Assert.assertEquals(0, countEntries(s));
+    }
+
+    // flush and rerun same test against files
+    conn.tableOperations().flush(tableName, null, null, true);
+
+    Scanner oScanner = newOfflineScanner(conn, tableName, clone, null);
+    oScanner.addScanIterator(new IteratorSetting(100, IteratorThatUsesSample.class));
+    scanners = Arrays.asList(scanner, isoScanner, bScanner, csiScanner, oScanner);
+
+    setRange(range1, scanners);
+    for (ScannerBase s : scanners) {
+      Assert.assertEquals(2954, countEntries(s));
+    }
+
+    setRange(range2, scanners);
+    for (ScannerBase s : scanners) {
+      Assert.assertEquals(0, countEntries(s));
+    }
+
+    updateSamplingConfig(conn, tableName, SC2);
+
+    csiScanner.setIteratorSamplerConfiguration(SC2);
+
+    oScanner = newOfflineScanner(conn, tableName, clone, null);
+    oScanner.addScanIterator(new IteratorSetting(100, IteratorThatUsesSample.class));
+    scanners = Arrays.asList(scanner, isoScanner, bScanner, csiScanner, oScanner);
+
+    for (ScannerBase s : scanners) {
+      try {
+        countEntries(s);
+        Assert.fail("Expected SampleNotPresentException, but it did not happen : " + s.getClass().getSimpleName());
+      } catch (SampleNotPresentException e) {
+
+      }
+    }
+  }
+
+  private void setSamplerConfig(SamplerConfiguration sc, ScannerBase... scanners) {
+    for (ScannerBase s : scanners) {
+      s.setSamplerConfiguration(sc);
+    }
+  }
+
+  @Test
+  public void testSampleNotPresent() throws Exception {
+
+    Connector conn = getConnector();
+    String tableName = getUniqueNames(1)[0];
+    String clone = tableName + "_clone";
+
+    conn.tableOperations().create(tableName);
+
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+    TreeMap<Key,Value> expected = new TreeMap<Key,Value>();
+    writeData(bw, SC1, expected);
+
+    Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY);
+    Scanner isoScanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY));
+    isoScanner.setBatchSize(10);
+    Scanner csiScanner = new ClientSideIteratorScanner(conn.createScanner(tableName, Authorizations.EMPTY));
+    BatchScanner bScanner = conn.createBatchScanner(tableName, Authorizations.EMPTY, 2);
+    bScanner.setRanges(Arrays.asList(new Range()));
+
+    // ensure sample not present exception occurs when sampling is not configured
+    assertSampleNotPresent(SC1, scanner, isoScanner, bScanner, csiScanner);
+
+    conn.tableOperations().flush(tableName, null, null, true);
+
+    Scanner oScanner = newOfflineScanner(conn, tableName, clone, SC1);
+    assertSampleNotPresent(SC1, scanner, isoScanner, bScanner, csiScanner, oScanner);
+
+    // configure sampling, however there exists an rfile w/o sample data... so should still see sample not present exception
+
+    updateSamplingConfig(conn, tableName, SC1);
+
+    // create clone with new config
+    oScanner = newOfflineScanner(conn, tableName, clone, SC1);
+
+    assertSampleNotPresent(SC1, scanner, isoScanner, bScanner, csiScanner, oScanner);
+
+    // create rfile with sample data present
+    conn.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+    // should be able to scan sample now
+    oScanner = newOfflineScanner(conn, tableName, clone, SC1);
+    setSamplerConfig(SC1, scanner, csiScanner, isoScanner, bScanner, oScanner);
+    check(expected, scanner, isoScanner, bScanner, csiScanner, oScanner);
+
+    // change sampling config
+    updateSamplingConfig(conn, tableName, SC2);
+
+    // create clone with new config
+    oScanner = newOfflineScanner(conn, tableName, clone, SC2);
+
+    // rfile should have different sample config than table, and scan should not work
+    assertSampleNotPresent(SC2, scanner, isoScanner, bScanner, csiScanner, oScanner);
+
+    // create rfile that has same sample data as table config
+    conn.tableOperations().compact(tableName, new CompactionConfig().setWait(true));
+
+    // should be able to scan sample now
+    updateExpected(SC2, expected);
+    oScanner = newOfflineScanner(conn, tableName, clone, SC2);
+    setSamplerConfig(SC2, scanner, csiScanner, isoScanner, bScanner, oScanner);
+    check(expected, scanner, isoScanner, bScanner, csiScanner, oScanner);
+
+    bScanner.close();
+  }
+
+  private void updateSamplingConfig(Connector conn, String tableName, SamplerConfiguration sc) throws TableNotFoundException, AccumuloException,
+      AccumuloSecurityException {
+    conn.tableOperations().setSamplerConfiguration(tableName, sc);
+    // wait for config change
+    conn.tableOperations().offline(tableName, true);
+    conn.tableOperations().online(tableName, true);
+  }
+
+  private void assertSampleNotPresent(SamplerConfiguration sc, ScannerBase... scanners) {
+
+    for (ScannerBase scanner : scanners) {
+      SamplerConfiguration csc = scanner.getSamplerConfiguration();
+
+      scanner.setSamplerConfiguration(sc);
+
+      try {
+        for (Iterator<Entry<Key,Value>> i = scanner.iterator(); i.hasNext();) {
+          Entry<Key,Value> entry = i.next();
+          entry.getKey();
+        }
+        Assert.fail("Expected SampleNotPresentException, but it did not happen : " + scanner.getClass().getSimpleName());
+      } catch (SampleNotPresentException e) {
+
+      }
+
+      scanner.clearSamplerConfiguration();
+      for (Iterator<Entry<Key,Value>> i = scanner.iterator(); i.hasNext();) {
+        Entry<Key,Value> entry = i.next();
+        entry.getKey();
+      }
+
+      if (csc == null) {
+        scanner.clearSamplerConfiguration();
+      } else {
+        scanner.setSamplerConfiguration(csc);
+      }
+    }
+  }
+
+  private void check(TreeMap<Key,Value> expected, ScannerBase... scanners) {
+    TreeMap<Key,Value> actual = new TreeMap<>();
+    for (ScannerBase s : scanners) {
+      actual.clear();
+      for (Entry<Key,Value> entry : s) {
+        actual.put(entry.getKey(), entry.getValue());
+      }
+      Assert.assertEquals(String.format("Saw %d instead of %d entries using %s", actual.size(), expected.size(), s.getClass().getSimpleName()), expected,
+          actual);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
index e7b5799..ae38fb8 100644
--- a/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ShellServerIT.java
@@ -41,8 +41,6 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
-import jline.console.ConsoleReader;
-
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
@@ -91,6 +89,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
+import jline.console.ConsoleReader;
+
 public class ShellServerIT extends SharedMiniClusterBase {
   public static class TestOutputStream extends OutputStream {
     StringBuilder sb = new StringBuilder();
@@ -975,6 +975,26 @@ public class ShellServerIT extends SharedMiniClusterBase {
     ts.exec("compact -t " + clone + " -w --sf-ename F.* --sf-lt-esize 1K");
 
     assertEquals(3, countFiles(cloneId));
+
+    String clone2 = table + "_clone_2";
+    ts.exec("clonetable -s table.sampler.opt.hasher=murmur3_32,table.sampler.opt.modulus=7,table.sampler=org.apache.accumulo.core.sample.RowSampler " + clone
+        + " " + clone2);
+    String clone2Id = getTableId(clone2);
+
+    assertEquals(3, countFiles(clone2Id));
+
+    ts.exec("table " + clone2);
+    ts.exec("insert v n l o");
+    ts.exec("flush -w");
+
+    ts.exec("insert x n l o");
+    ts.exec("flush -w");
+
+    assertEquals(5, countFiles(clone2Id));
+
+    ts.exec("compact -t " + clone2 + " -w --sf-no-sample");
+
+    assertEquals(3, countFiles(clone2Id));
   }
 
   @Test
@@ -989,6 +1009,54 @@ public class ShellServerIT extends SharedMiniClusterBase {
   }
 
   @Test
+  public void testScanSample() throws Exception {
+    final String table = name.getMethodName();
+
+    // compact
+    ts.exec("createtable " + table);
+
+    ts.exec("insert 9255 doc content 'abcde'");
+    ts.exec("insert 9255 doc url file://foo.txt");
+    ts.exec("insert 8934 doc content 'accumulo scales'");
+    ts.exec("insert 8934 doc url file://accumulo_notes.txt");
+    ts.exec("insert 2317 doc content 'milk, eggs, bread, parmigiano-reggiano'");
+    ts.exec("insert 2317 doc url file://groceries/9.txt");
+    ts.exec("insert 3900 doc content 'EC2 ate my homework'");
+    ts.exec("insert 3900 doc url file://final_project.txt");
+
+    String clone1 = table + "_clone_1";
+    ts.exec("clonetable -s table.sampler.opt.hasher=murmur3_32,table.sampler.opt.modulus=3,table.sampler=org.apache.accumulo.core.sample.RowSampler " + table
+        + " " + clone1);
+
+    ts.exec("compact -t " + clone1 + " -w --sf-no-sample");
+
+    ts.exec("table " + clone1);
+    ts.exec("scan --sample", true, "parmigiano-reggiano", true);
+    ts.exec("grep --sample reg", true, "parmigiano-reggiano", true);
+    ts.exec("scan --sample", true, "accumulo", false);
+    ts.exec("grep --sample acc", true, "accumulo", false);
+
+    // create table where the table's sample config differs from what's in the file
+    String clone2 = table + "_clone_2";
+    ts.exec("clonetable -s table.sampler.opt.hasher=murmur3_32,table.sampler.opt.modulus=2,table.sampler=org.apache.accumulo.core.sample.RowSampler " + clone1
+        + " " + clone2);
+
+    ts.exec("table " + clone2);
+    ts.exec("scan --sample", false, "SampleNotPresentException", true);
+    ts.exec("grep --sample reg", false, "SampleNotPresentException", true);
+
+    ts.exec("compact -t " + clone2 + " -w --sf-no-sample");
+
+    for (String expected : Arrays.asList("2317", "3900", "9255")) {
+      ts.exec("scan --sample", true, expected, true);
+      ts.exec("grep --sample " + expected.substring(0, 2), true, expected, true);
+    }
+
+    ts.exec("scan --sample", true, "8934", false);
+    ts.exec("grep --sample 89", true, "8934", false);
+  }
+
+  @Test
   public void constraint() throws Exception {
     final String table = name.getMethodName();
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java
index 71ddbcd..826907c 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ExamplesIT.java
@@ -17,6 +17,7 @@
 package org.apache.accumulo.test.functional;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -102,7 +103,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Iterators;
-import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 
 public class ExamplesIT extends AccumuloClusterHarness {
   private static final Logger log = LoggerFactory.getLogger(ExamplesIT.class);
@@ -390,7 +390,7 @@ public class ExamplesIT extends AccumuloClusterHarness {
     Index.index(30, src, "\\W+", bw);
     bw.close();
     BatchScanner bs = c.createBatchScanner(shard, Authorizations.EMPTY, 4);
-    List<String> found = Query.query(bs, Arrays.asList("foo", "bar"));
+    List<String> found = Query.query(bs, Arrays.asList("foo", "bar"), null);
     bs.close();
     // should find ourselves
     boolean thisFile = false;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
index 485d6d2..3098251 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java
@@ -430,8 +430,8 @@ public class ReadWriteIT extends AccumuloClusterHarness {
         PrintInfo.main(args.toArray(new String[args.size()]));
         newOut.flush();
         String stdout = baos.toString();
-        assertTrue(stdout.contains("Locality group         : g1"));
-        assertTrue(stdout.contains("families      : [colf]"));
+        assertTrue(stdout.contains("Locality group           : g1"));
+        assertTrue(stdout.contains("families        : [colf]"));
       } finally {
         newOut.close();
         System.setOut(oldOut);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
index 7a4223d..dd085cc 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloFileOutputFormatIT.java
@@ -30,15 +30,23 @@ import java.io.IOException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.client.mapred.AccumuloFileOutputFormat;
 import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.lib.impl.ConfiguratorBase;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -65,6 +73,9 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
   private static AssertionError e1 = null;
   private static AssertionError e2 = null;
 
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption(
+      "modulus", "3");
+
   @Rule
   public TemporaryFolder folder = new TemporaryFolder(new File(System.getProperty("user.dir") + "/target"));
 
@@ -141,6 +152,7 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
       AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
       AccumuloInputFormat.setInputTableName(job, table);
       AccumuloFileOutputFormat.setOutputPath(job, new Path(args[1]));
+      AccumuloFileOutputFormat.setSampler(job, SAMPLER_CONFIG);
 
       job.setMapperClass(BAD_TABLE.equals(table) ? BadKeyMapper.class : IdentityMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -177,6 +189,12 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
     if (content) {
       assertEquals(1, files.length);
       assertTrue(files[0].exists());
+
+      Configuration conf = CachedConfiguration.getInstance();
+      DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
+      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, acuconf)
+          .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+      assertNotNull(sample);
     } else {
       assertEquals(0, files.length);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
index 2cef382..cd80139 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapred/AccumuloInputFormatIT.java
@@ -27,11 +27,14 @@ import java.util.Collections;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.client.mapred.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapred.RangeInputSplit;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.sample.RowSampler;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -60,7 +63,9 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
   }
 
   private static AssertionError e1 = null;
+  private static int e1Count = 0;
   private static AssertionError e2 = null;
+  private static int e2Count = 0;
 
   private static class MRTester extends Configured implements Tool {
     private static class TestMapper implements Mapper<Key,Value,Key,Value> {
@@ -76,6 +81,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
           assertEquals(new String(v.get()), String.format("%09x", count));
         } catch (AssertionError e) {
           e1 = e;
+          e1Count++;
         }
         key = new Key(k);
         count++;
@@ -90,6 +96,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
           assertEquals(100, count);
         } catch (AssertionError e) {
           e2 = e;
+          e2Count++;
         }
       }
 
@@ -98,11 +105,17 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
     @Override
     public int run(String[] args) throws Exception {
 
-      if (args.length != 1) {
-        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table>");
+      if (args.length != 1 && args.length != 3) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table> [<batchScan> <scan sample>]");
       }
 
       String table = args[0];
+      Boolean batchScan = false;
+      boolean sample = false;
+      if (args.length == 3) {
+        batchScan = Boolean.parseBoolean(args[1]);
+        sample = Boolean.parseBoolean(args[2]);
+      }
 
       JobConf job = new JobConf(getConf());
       job.setJarByClass(this.getClass());
@@ -112,6 +125,10 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
       AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
       AccumuloInputFormat.setInputTableName(job, table);
       AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+      AccumuloInputFormat.setBatchScan(job, batchScan);
+      if (sample) {
+        AccumuloInputFormat.setSamplerConfiguration(job, SAMPLER_CONFIG);
+      }
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -143,11 +160,47 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
     }
     bw.close();
 
+    e1 = null;
+    e2 = null;
+
     MRTester.main(table);
     assertNull(e1);
     assertNull(e2);
   }
 
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption(
+      "modulus", "3");
+
+  @Test
+  public void testSample() throws Exception {
+    final String TEST_TABLE_3 = getUniqueNames(1)[0];
+
+    Connector c = getConnector();
+    c.tableOperations().create(TEST_TABLE_3, new NewTableConfiguration().enableSampling(SAMPLER_CONFIG));
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_3, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    MRTester.main(TEST_TABLE_3, "False", "True");
+    Assert.assertEquals(38, e1Count);
+    Assert.assertEquals(1, e2Count);
+
+    e2Count = e1Count = 0;
+    MRTester.main(TEST_TABLE_3, "False", "False");
+    Assert.assertEquals(0, e1Count);
+    Assert.assertEquals(0, e2Count);
+
+    e2Count = e1Count = 0;
+    MRTester.main(TEST_TABLE_3, "True", "True");
+    Assert.assertEquals(38, e1Count);
+    Assert.assertEquals(1, e2Count);
+
+  }
+
   @Test
   public void testCorrectRangeInputSplits() throws Exception {
     JobConf job = new JobConf();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
index 8f53378..d00a9b3 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloFileOutputFormatIT.java
@@ -27,14 +27,22 @@ import java.io.IOException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.file.FileSKVIterator;
+import org.apache.accumulo.core.file.rfile.RFileOperations;
+import org.apache.accumulo.core.sample.RowSampler;
+import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
+import org.apache.accumulo.core.util.CachedConfiguration;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
@@ -55,6 +63,9 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
   private String TEST_TABLE;
   private String EMPTY_TABLE;
 
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption(
+      "modulus", "3");
+
   @Override
   protected int defaultTimeoutSeconds() {
     return 4 * 60;
@@ -152,6 +163,7 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
       AccumuloInputFormat.setInputTableName(job, table);
       AccumuloInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
       AccumuloFileOutputFormat.setOutputPath(job, new Path(args[1]));
+      AccumuloFileOutputFormat.setSampler(job, SAMPLER_CONFIG);
 
       job.setMapperClass(table.endsWith("_mapreduce_bad_table") ? BadKeyMapper.class : Mapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -189,6 +201,12 @@ public class AccumuloFileOutputFormatIT extends AccumuloClusterHarness {
     if (content) {
       assertEquals(1, files.length);
       assertTrue(files[0].exists());
+
+      Configuration conf = CachedConfiguration.getInstance();
+      DefaultConfiguration acuconf = DefaultConfiguration.getInstance();
+      FileSKVIterator sample = RFileOperations.getInstance().openReader(files[0].toString(), false, FileSystem.get(conf), conf, acuconf)
+          .getSample(new SamplerConfigurationImpl(SAMPLER_CONFIG));
+      assertNotNull(sample);
     } else {
       assertEquals(0, files.length);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
index 1ca4f92..0a5bd68 100644
--- a/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/AccumuloInputFormatIT.java
@@ -39,6 +39,8 @@ import org.apache.accumulo.core.client.ClientConfiguration;
 import org.apache.accumulo.core.client.ClientConfiguration.ClientProperty;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.client.admin.SamplerConfiguration;
 import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
 import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
 import org.apache.accumulo.core.client.mapreduce.impl.BatchInputSplit;
@@ -51,6 +53,7 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.sample.RowSampler;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.harness.AccumuloClusterHarness;
@@ -270,15 +273,18 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
     @Override
     public int run(String[] args) throws Exception {
 
-      if (args.length != 2 && args.length != 3) {
-        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table> <inputFormatClass> [<batchScan>]");
+      if (args.length != 2 && args.length != 4) {
+        throw new IllegalArgumentException("Usage : " + MRTester.class.getName() + " <table> <inputFormatClass> [<batchScan> <scan sample>]");
       }
 
       String table = args[0];
       String inputFormatClassName = args[1];
       Boolean batchScan = false;
-      if (args.length == 3)
+      boolean sample = false;
+      if (args.length == 4) {
         batchScan = Boolean.parseBoolean(args[2]);
+        sample = Boolean.parseBoolean(args[3]);
+      }
 
       assertionErrors.put(table + "_map", new AssertionError("Dummy_map"));
       assertionErrors.put(table + "_cleanup", new AssertionError("Dummy_cleanup"));
@@ -296,6 +302,9 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
       AccumuloInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
       AccumuloInputFormat.setInputTableName(job, table);
       AccumuloInputFormat.setBatchScan(job, batchScan);
+      if (sample) {
+        AccumuloInputFormat.setSamplerConfiguration(job, SAMPLER_CONFIG);
+      }
 
       job.setMapperClass(TestMapper.class);
       job.setMapOutputKeyClass(Key.class);
@@ -335,6 +344,38 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
     assertEquals(1, assertionErrors.get(TEST_TABLE_1 + "_cleanup").size());
   }
 
+  private static final SamplerConfiguration SAMPLER_CONFIG = new SamplerConfiguration(RowSampler.class.getName()).addOption("hasher", "murmur3_32").addOption(
+      "modulus", "3");
+
+  @Test
+  public void testSample() throws Exception {
+    final String TEST_TABLE_3 = getUniqueNames(1)[0];
+
+    Connector c = getConnector();
+    c.tableOperations().create(TEST_TABLE_3, new NewTableConfiguration().enableSampling(SAMPLER_CONFIG));
+    BatchWriter bw = c.createBatchWriter(TEST_TABLE_3, new BatchWriterConfig());
+    for (int i = 0; i < 100; i++) {
+      Mutation m = new Mutation(new Text(String.format("%09x", i + 1)));
+      m.put(new Text(), new Text(), new Value(String.format("%09x", i).getBytes()));
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    Assert.assertEquals(0, MRTester.main(new String[] {TEST_TABLE_3, AccumuloInputFormat.class.getName(), "False", "True"}));
+    assertEquals(39, assertionErrors.get(TEST_TABLE_3 + "_map").size());
+    assertEquals(2, assertionErrors.get(TEST_TABLE_3 + "_cleanup").size());
+
+    assertionErrors.clear();
+    Assert.assertEquals(0, MRTester.main(new String[] {TEST_TABLE_3, AccumuloInputFormat.class.getName(), "False", "False"}));
+    assertEquals(1, assertionErrors.get(TEST_TABLE_3 + "_map").size());
+    assertEquals(1, assertionErrors.get(TEST_TABLE_3 + "_cleanup").size());
+
+    assertionErrors.clear();
+    Assert.assertEquals(0, MRTester.main(new String[] {TEST_TABLE_3, AccumuloInputFormat.class.getName(), "True", "True"}));
+    assertEquals(39, assertionErrors.get(TEST_TABLE_3 + "_map").size());
+    assertEquals(2, assertionErrors.get(TEST_TABLE_3 + "_cleanup").size());
+  }
+
   @Test
   public void testMapWithBatchScanner() throws Exception {
     final String TEST_TABLE_2 = getUniqueNames(1)[0];
@@ -349,7 +390,7 @@ public class AccumuloInputFormatIT extends AccumuloClusterHarness {
     }
     bw.close();
 
-    Assert.assertEquals(0, MRTester.main(new String[] {TEST_TABLE_2, AccumuloInputFormat.class.getName(), "True"}));
+    Assert.assertEquals(0, MRTester.main(new String[] {TEST_TABLE_2, AccumuloInputFormat.class.getName(), "True", "False"}));
     assertEquals(1, assertionErrors.get(TEST_TABLE_2 + "_map").size());
     assertEquals(1, assertionErrors.get(TEST_TABLE_2 + "_cleanup").size());
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/45f18c17/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
index ef05f37..559703f 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java
@@ -55,6 +55,7 @@ import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
 import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
 import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
@@ -77,6 +78,7 @@ import org.apache.thrift.TException;
 
 import com.beust.jcommander.Parameter;
 import com.google.common.net.HostAndPort;
+
 import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 
 /**
@@ -136,14 +138,14 @@ public class NullTserver {
 
     @Override
     public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, Map<TKeyExtent,List<TRange>> batch, List<TColumn> columns,
-        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, long batchTimeOut) {
+        List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, TSamplerConfiguration tsc, long batchTimeOut) {
       return null;
     }
 
     @Override
     public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent extent, TRange range, List<TColumn> columns, int batchSize,
         List<IterInfo> ssiList, Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites, boolean isolated,
-        long readaheadThreshold, long batchTimeOut) {
+        long readaheadThreshold, TSamplerConfiguration tsc, long batchTimeOut) {
       return null;
     }
 


Mime
View raw message