accumulo-commits mailing list archives

From mwa...@apache.org
Subject [2/7] accumulo-examples git commit: ACCUMULO-4511 Adding examples from Accumulo repo
Date Fri, 09 Dec 2016 17:12:15 GMT
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/examples/ExamplesIT.java b/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
new file mode 100644
index 0000000..51f2473
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/examples/ExamplesIT.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.accumulo.cluster.standalone.StandaloneAccumuloCluster;
+import org.apache.accumulo.cluster.standalone.StandaloneClusterControl;
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
+import org.apache.accumulo.core.client.security.tokens.KerberosToken;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.user.AgeOffFilter;
+import org.apache.accumulo.core.iterators.user.SummingCombiner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.examples.client.Flush;
+import org.apache.accumulo.examples.client.RandomBatchScanner;
+import org.apache.accumulo.examples.client.RandomBatchWriter;
+import org.apache.accumulo.examples.client.ReadWriteExample;
+import org.apache.accumulo.examples.client.RowOperations;
+import org.apache.accumulo.examples.client.SequentialBatchWriter;
+import org.apache.accumulo.examples.client.TraceDumpExample;
+import org.apache.accumulo.examples.client.TracingExample;
+import org.apache.accumulo.examples.combiner.StatsCombiner;
+import org.apache.accumulo.examples.constraints.MaxMutationSize;
+import org.apache.accumulo.examples.dirlist.Ingest;
+import org.apache.accumulo.examples.dirlist.QueryUtil;
+import org.apache.accumulo.examples.helloworld.InsertWithBatchWriter;
+import org.apache.accumulo.examples.helloworld.ReadData;
+import org.apache.accumulo.examples.isolation.InterferenceTest;
+import org.apache.accumulo.examples.mapreduce.RegexExample;
+import org.apache.accumulo.examples.mapreduce.RowHash;
+import org.apache.accumulo.examples.mapreduce.TableToFile;
+import org.apache.accumulo.examples.mapreduce.TeraSortIngest;
+import org.apache.accumulo.examples.mapreduce.WordCount;
+import org.apache.accumulo.examples.mapreduce.bulk.BulkIngestExample;
+import org.apache.accumulo.examples.mapreduce.bulk.GenerateTestData;
+import org.apache.accumulo.examples.mapreduce.bulk.SetupTable;
+import org.apache.accumulo.examples.mapreduce.bulk.VerifyIngest;
+import org.apache.accumulo.examples.shard.ContinuousQuery;
+import org.apache.accumulo.examples.shard.Index;
+import org.apache.accumulo.examples.shard.Query;
+import org.apache.accumulo.examples.shard.Reverse;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl.LogWriter;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.start.Main;
+import org.apache.accumulo.test.TestIngest;
+import org.apache.accumulo.tracer.TraceServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.Tool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterators;
+
+public class ExamplesIT extends AccumuloClusterHarness {
+  private static final Logger log = LoggerFactory.getLogger(ExamplesIT.class);
+  private static final BatchWriterOpts bwOpts = new BatchWriterOpts();
+  private static final BatchWriterConfig bwc = new BatchWriterConfig();
+  private static final String visibility = "A|B";
+  private static final String auths = "A,B";
+
+  Connector c;
+  String instance;
+  String keepers;
+  String user;
+  String passwd;
+  String keytab;
+  BatchWriter bw;
+  IteratorSetting is;
+  String dir;
+  FileSystem fs;
+  Authorizations origAuths;
+  boolean saslEnabled;
+
+  @Override
+  public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration hadoopConf) {
+    // Triple the minicluster's default memory (128MB by default); the value is in bytes
+    cfg.setDefaultMemory(cfg.getDefaultMemory() * 3, MemoryUnit.BYTE);
+  }
+
+  @Before
+  public void getClusterInfo() throws Exception {
+    c = getConnector();
+    user = getAdminPrincipal();
+    AuthenticationToken token = getAdminToken();
+    if (token instanceof KerberosToken) {
+      keytab = getAdminUser().getKeytab().getAbsolutePath();
+      saslEnabled = true;
+    } else if (token instanceof PasswordToken) {
+      passwd = new String(((PasswordToken) getAdminToken()).getPassword(), UTF_8);
+      saslEnabled = false;
+    } else {
+      Assert.fail("Unknown token type: " + token);
+    }
+    fs = getCluster().getFileSystem();
+    instance = c.getInstance().getInstanceName();
+    keepers = c.getInstance().getZooKeepers();
+    dir = new Path(cluster.getTemporaryPath(), getClass().getName()).toString();
+
+    origAuths = c.securityOperations().getUserAuthorizations(user);
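+    // Grant the A and B authorizations that the example visibilities below rely on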
+    c.securityOperations().changeUserAuthorizations(user, new Authorizations(auths.split(",")));
+  }
+
+  @After
+  public void resetAuths() throws Exception {
+    if (null != origAuths) {
+      getConnector().securityOperations().changeUserAuthorizations(getAdminPrincipal(), origAuths);
+    }
+  }
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 6 * 60;
+  }
+
+  @Test
+  public void testTrace() throws Exception {
+    Process trace = null;
+    if (ClusterType.MINI == getClusterType()) {
+      MiniAccumuloClusterImpl impl = (MiniAccumuloClusterImpl) cluster;
+      trace = impl.exec(TraceServer.class);
+      while (!c.tableOperations().exists("trace"))
+        sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+    }
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-C", "-D", "-c"};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-C", "-D", "-c"};
+    }
+    Entry<Integer,String> pair = cluster.getClusterControl().execWithStdout(TracingExample.class, args);
+    Assert.assertEquals("Expected return code of zero. STDOUT=" + pair.getValue(), 0, pair.getKey().intValue());
+    String result = pair.getValue();
+    Pattern pattern = Pattern.compile("TraceID: ([0-9a-f]+)");
+    Matcher matcher = pattern.matcher(result);
+    int count = 0;
+    while (matcher.find()) {
+      if (saslEnabled) {
+        args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--traceid", matcher.group(1)};
+      } else {
+        args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--traceid", matcher.group(1)};
+      }
+      pair = cluster.getClusterControl().execWithStdout(TraceDumpExample.class, args);
+      count++;
+    }
+    assertTrue(count > 0);
+    assertTrue("Output did not contain myApp@myHost", pair.getValue().contains("myApp@myHost"));
+    if (ClusterType.MINI == getClusterType() && null != trace) {
+      trace.destroy();
+    }
+  }
+
+  @Test
+  public void testClasspath() throws Exception {
+    Entry<Integer,String> entry = getCluster().getClusterControl().execWithStdout(Main.class, new String[] {"classpath"});
+    assertEquals(0, entry.getKey().intValue());
+    String result = entry.getValue();
+    int level1 = result.indexOf("Level 1");
+    int level2 = result.indexOf("Level 2");
+    int level3 = result.indexOf("Level 3");
+    int level4 = result.indexOf("Level 4");
+    assertTrue("Level 1 classloader not present.", level1 >= 0);
+    assertTrue("Level 2 classloader not present.", level2 > 0);
+    assertTrue("Level 3 classloader not present.", level3 > 0);
+    assertTrue("Level 4 classloader not present.", level4 > 0);
+    assertTrue(level1 < level2);
+    assertTrue(level2 < level3);
+    assertTrue(level3 < level4);
+  }
+
+  @Test
+  public void testDirList() throws Exception {
+    String[] names = getUniqueNames(3);
+    String dirTable = names[0], indexTable = names[1], dataTable = names[2];
+    String[] args;
+    String dirListDirectory;
+    switch (getClusterType()) {
+      case MINI:
+        dirListDirectory = ((MiniAccumuloClusterImpl) getCluster()).getConfig().getDir().getAbsolutePath();
+        break;
+      case STANDALONE:
+        dirListDirectory = ((StandaloneAccumuloCluster) getCluster()).getAccumuloHome();
+        break;
+      default:
+        throw new RuntimeException("Unknown cluster type");
+    }
+    assumeTrue(new File(dirListDirectory).exists());
+    // Index a directory listing of the cluster's own install directory; against a standalone cluster we can't guarantee the Accumulo source tree will be present, hence the assumeTrue above.
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--dirTable", dirTable, "--indexTable", indexTable, "--dataTable",
+          dataTable, "--vis", visibility, "--chunkSize", Integer.toString(10000), dirListDirectory};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--dirTable", dirTable, "--indexTable", indexTable, "--dataTable",
+          dataTable, "--vis", visibility, "--chunkSize", Integer.toString(10000), dirListDirectory};
+    }
+    Entry<Integer,String> entry = getClusterControl().execWithStdout(Ingest.class, args);
+    assertEquals("Got non-zero return code. Stdout=" + entry.getValue(), 0, entry.getKey().intValue());
+
+    String expectedFile;
+    switch (getClusterType()) {
+      case MINI:
+        // Should be present in a minicluster dir
+        expectedFile = "accumulo-site.xml";
+        break;
+      case STANDALONE:
+        // Should be in place on standalone installs (not having to follow symlinks)
+        expectedFile = "LICENSE";
+        break;
+      default:
+        throw new RuntimeException("Unknown cluster type");
+    }
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "--keytab", keytab, "-u", user, "-t", indexTable, "--auths", auths, "--search", "--path",
+          expectedFile};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-p", passwd, "-u", user, "-t", indexTable, "--auths", auths, "--search", "--path", expectedFile};
+    }
+    entry = getClusterControl().execWithStdout(QueryUtil.class, args);
+    if (ClusterType.MINI == getClusterType()) {
+      MiniAccumuloClusterImpl impl = (MiniAccumuloClusterImpl) cluster;
+      for (LogWriter writer : impl.getLogWriters()) {
+        writer.flush();
+      }
+    }
+
+    log.info("result " + entry.getValue());
+    assertEquals(0, entry.getKey().intValue());
+    assertTrue(entry.getValue().contains(expectedFile));
+  }
+
+  @Test
+  public void testAgeoffFilter() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    is = new IteratorSetting(10, AgeOffFilter.class);
+    AgeOffFilter.setTTL(is, 1000L);
+    c.tableOperations().attachIterator(tableName, is);
+    sleepUninterruptibly(500, TimeUnit.MILLISECONDS); // let zookeeper updates propagate.
+    bw = c.createBatchWriter(tableName, bwc);
+    Mutation m = new Mutation("foo");
+    m.put("a", "b", "c");
+    bw.addMutation(m);
+    bw.close();
+    sleepUninterruptibly(1, TimeUnit.SECONDS);
+    assertEquals(0, Iterators.size(c.createScanner(tableName, Authorizations.EMPTY).iterator()));
+  }
+
+  @Test
+  public void testStatsCombiner() throws Exception {
+    String table = getUniqueNames(1)[0];
+    c.tableOperations().create(table);
+    is = new IteratorSetting(10, StatsCombiner.class);
+    StatsCombiner.setCombineAllColumns(is, true);
+
+    c.tableOperations().attachIterator(table, is);
+    bw = c.createBatchWriter(table, bwc);
+    // Write two mutations; otherwise the NativeMap would dedupe them into a single update
+    Mutation m = new Mutation("foo");
+    m.put("a", "b", "1");
+    bw.addMutation(m);
+    m = new Mutation("foo");
+    m.put("a", "b", "3");
+    bw.addMutation(m);
+    bw.flush();
+
+    Iterator<Entry<Key,Value>> iter = c.createScanner(table, Authorizations.EMPTY).iterator();
+    assertTrue("Iterator had no results", iter.hasNext());
+    Entry<Key,Value> e = iter.next();
+    assertEquals("Results ", "1,3,4,2", e.getValue().toString());
+    assertFalse("Iterator had additional results", iter.hasNext());
+
+    m = new Mutation("foo");
+    m.put("a", "b", "0,20,20,2");
+    bw.addMutation(m);
+    bw.close();
+
+    iter = c.createScanner(table, Authorizations.EMPTY).iterator();
+    assertTrue("Iterator had no results", iter.hasNext());
+    e = iter.next();
+    assertEquals("Results ", "0,20,24,4", e.getValue().toString());
+    assertFalse("Iterator had additional results", iter.hasNext());
+  }
+
+  @Test
+  public void testBloomFilters() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().setProperty(tableName, Property.TABLE_BLOOM_ENABLED.getKey(), "true");
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"--seed", "7", "-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--num", "100000", "--min", "0", "--max",
+          "1000000000", "--size", "50", "--batchMemory", "2M", "--batchLatency", "60s", "--batchThreads", "3", "-t", tableName};
+    } else {
+      args = new String[] {"--seed", "7", "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--num", "100000", "--min", "0", "--max", "1000000000",
+          "--size", "50", "--batchMemory", "2M", "--batchLatency", "60s", "--batchThreads", "3", "-t", tableName};
+    }
+    goodExec(RandomBatchWriter.class, args);
+    c.tableOperations().flush(tableName, null, null, true);
+    long diff = 0, diff2 = 0;
+    // try the speed test a couple times in case the system is loaded with other tests
+    for (int i = 0; i < 2; i++) {
+      long now = System.currentTimeMillis();
+      if (saslEnabled) {
+        args = new String[] {"--seed", "7", "-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--num", "10000", "--min", "0", "--max",
+            "1000000000", "--size", "50", "--scanThreads", "4", "-t", tableName};
+      } else {
+        args = new String[] {"--seed", "7", "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--num", "10000", "--min", "0", "--max", "1000000000",
+            "--size", "50", "--scanThreads", "4", "-t", tableName};
+      }
+      goodExec(RandomBatchScanner.class, args);
+      diff = System.currentTimeMillis() - now;
+      now = System.currentTimeMillis();
+      if (saslEnabled) {
+        args = new String[] {"--seed", "8", "-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--num", "10000", "--min", "0", "--max",
+            "1000000000", "--size", "50", "--scanThreads", "4", "-t", tableName};
+      } else {
+        args = new String[] {"--seed", "8", "-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--num", "10000", "--min", "0", "--max", "1000000000",
+            "--size", "50", "--scanThreads", "4", "-t", tableName};
+      }
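+      // seed 8 rows were never written; with bloom filters enabled the scan should come up
+      // empty, exit non-zero, and (being all misses) run faster than the seed 7 scan above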
+      int retCode = getClusterControl().exec(RandomBatchScanner.class, args);
+      assertEquals(1, retCode);
+      diff2 = System.currentTimeMillis() - now;
+      if (diff2 < diff)
+        break;
+    }
+    assertTrue(diff2 < diff);
+  }
+
+  @Test
+  public void testShardedIndex() throws Exception {
+    File src = new File(System.getProperty("user.dir") + "/src");
+    assumeTrue(src.exists());
+    String[] names = getUniqueNames(3);
+    final String shard = names[0], index = names[1];
+    c.tableOperations().create(shard);
+    c.tableOperations().create(index);
+    bw = c.createBatchWriter(shard, bwc);
+    Index.index(30, src, "\\W+", bw);
+    bw.close();
+    BatchScanner bs = c.createBatchScanner(shard, Authorizations.EMPTY, 4);
+    List<String> found = Query.query(bs, Arrays.asList("foo", "bar"), null);
+    bs.close();
+    // should find ourselves
+    boolean thisFile = false;
+    for (String file : found) {
+      if (file.endsWith("/ExamplesIT.java"))
+        thisFile = true;
+    }
+    assertTrue(thisFile);
+
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "--shardTable", shard, "--doc2Term", index, "-u", user, "--keytab", keytab};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "--shardTable", shard, "--doc2Term", index, "-u", getAdminPrincipal(), "-p", passwd};
+    }
+    // create a reverse index
+    goodExec(Reverse.class, args);
+
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "--shardTable", shard, "--doc2Term", index, "-u", user, "--keytab", keytab, "--terms", "5",
+          "--count", "1000"};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "--shardTable", shard, "--doc2Term", index, "-u", user, "-p", passwd, "--terms", "5", "--count",
+          "1000"};
+    }
+    // run some queries
+    goodExec(ContinuousQuery.class, args);
+  }
+
+  @Test
+  public void testMaxMutationConstraint() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    c.tableOperations().addConstraint(tableName, MaxMutationSize.class.getName());
+    TestIngest.Opts opts = new TestIngest.Opts();
+    opts.rows = 1;
+    opts.cols = 1000;
+    opts.setTableName(tableName);
+    if (saslEnabled) {
+      opts.updateKerberosCredentials(cluster.getClientConfig());
+    } else {
+      opts.setPrincipal(getAdminPrincipal());
+    }
+    try {
+      TestIngest.ingest(c, opts, bwOpts);
+      Assert.fail("Expected the MaxMutationSize constraint to reject the ingest");
+    } catch (MutationsRejectedException ex) {
+      assertEquals(1, ex.getConstraintViolationSummaries().size());
+    }
+  }
+
+  @Test
+  public void testBulkIngest() throws Exception {
+    // TODO Figure out a way to run M/R with Kerberos
+    assumeTrue(getAdminToken() instanceof PasswordToken);
+    String tableName = getUniqueNames(1)[0];
+    FileSystem fs = getFileSystem();
+    Path p = new Path(dir, "tmp");
+    if (fs.exists(p)) {
+      fs.delete(p, true);
+    }
+    goodExec(GenerateTestData.class, "--start-row", "0", "--count", "10000", "--output", dir + "/tmp/input/data");
+
+    List<String> commonArgs = new ArrayList<>(Arrays.asList(new String[] {"-i", instance, "-z", keepers, "-u", user, "--table", tableName}));
+    if (saslEnabled) {
+      commonArgs.add("--keytab");
+      commonArgs.add(keytab);
+    } else {
+      commonArgs.add("-p");
+      commonArgs.add(passwd);
+    }
+
+    List<String> args = new ArrayList<>(commonArgs);
+    goodExec(SetupTable.class, args.toArray(new String[0]));
+
+    args = new ArrayList<>(commonArgs);
+    args.addAll(Arrays.asList(new String[] {"--inputDir", dir + "/tmp/input", "--workDir", dir + "/tmp"}));
+    goodExec(BulkIngestExample.class, args.toArray(new String[0]));
+
+    args = new ArrayList<>(commonArgs);
+    args.addAll(Arrays.asList(new String[] {"--start-row", "0", "--count", "10000"}));
+    goodExec(VerifyIngest.class, args.toArray(new String[0]));
+  }
+
+  @Test
+  public void testTeraSortAndRead() throws Exception {
+    // TODO Figure out a way to run M/R with Kerberos
+    assumeTrue(getAdminToken() instanceof PasswordToken);
+    String tableName = getUniqueNames(1)[0];
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"--count", (1000 * 1000) + "", "-nk", "10", "-xk", "10", "-nv", "10", "-xv", "10", "-t", tableName, "-i", instance, "-z", keepers,
+          "-u", user, "--keytab", keytab, "--splits", "4"};
+    } else {
+      args = new String[] {"--count", (1000 * 1000) + "", "-nk", "10", "-xk", "10", "-nv", "10", "-xv", "10", "-t", tableName, "-i", instance, "-z", keepers,
+          "-u", user, "-p", passwd, "--splits", "4"};
+    }
+    goodExec(TeraSortIngest.class, args);
+    Path output = new Path(dir, "tmp/nines");
+    if (fs.exists(output)) {
+      fs.delete(output, true);
+    }
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName, "--rowRegex", ".*999.*", "--output",
+          output.toString()};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--rowRegex", ".*999.*", "--output", output.toString()};
+    }
+    goodExec(RegexExample.class, args);
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName, "--column", "c:"};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--column", "c:"};
+    }
+    goodExec(RowHash.class, args);
+    output = new Path(dir, "tmp/tableFile");
+    if (fs.exists(output)) {
+      fs.delete(output, true);
+    }
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName, "--output", output.toString()};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--output", output.toString()};
+    }
+    goodExec(TableToFile.class, args);
+  }
+
+  @Test
+  public void testWordCount() throws Exception {
+    // TODO Figure out a way to run M/R with Kerberos
+    assumeTrue(getAdminToken() instanceof PasswordToken);
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    is = new IteratorSetting(10, SummingCombiner.class);
+    SummingCombiner.setColumns(is, Collections.singletonList(new IteratorSetting.Column(new Text("count"))));
+    SummingCombiner.setEncodingType(is, SummingCombiner.Type.STRING);
+    c.tableOperations().attachIterator(tableName, is);
+    Path readme = new Path(new Path(System.getProperty("user.dir")).getParent(), "README.md");
+    if (!new File(readme.toString()).exists()) {
+      log.info("Not running test: README.md does not exist)");
+      return;
+    }
+    fs.copyFromLocalFile(readme, new Path(dir + "/tmp/wc/README.md"));
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-u", user, "--keytab", keytab, "-z", keepers, "--input", dir + "/tmp/wc", "-t", tableName};
+    } else {
+      args = new String[] {"-i", instance, "-u", user, "-p", passwd, "-z", keepers, "--input", dir + "/tmp/wc", "-t", tableName};
+    }
+    goodExec(WordCount.class, args);
+  }
+
+  @Test
+  public void testInsertWithBatchWriterAndReadData() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName};
+    }
+    goodExec(InsertWithBatchWriter.class, args);
+    goodExec(ReadData.class, args);
+  }
+
+  @Test
+  public void testIsolatedScansWithInterference() throws Exception {
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", getUniqueNames(1)[0], "--iterations", "100000", "--isolated"};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", getUniqueNames(1)[0], "--iterations", "100000", "--isolated"};
+    }
+    goodExec(InterferenceTest.class, args);
+  }
+
+  @Test
+  public void testScansWithInterference() throws Exception {
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", getUniqueNames(1)[0], "--iterations", "100000"};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", getUniqueNames(1)[0], "--iterations", "100000"};
+    }
+    goodExec(InterferenceTest.class, args);
+  }
+
+  @Test
+  public void testRowOperations() throws Exception {
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd};
+    }
+    goodExec(RowOperations.class, args);
+  }
+
+  @Test
+  public void testBatchWriter() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "-t", tableName, "--start", "0", "--num", "100000", "--size", "50",
+          "--batchMemory", "10000000", "--batchLatency", "1000", "--batchThreads", "4", "--vis", visibility};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "-t", tableName, "--start", "0", "--num", "100000", "--size", "50",
+          "--batchMemory", "10000000", "--batchLatency", "1000", "--batchThreads", "4", "--vis", visibility};
+    }
+    goodExec(SequentialBatchWriter.class, args);
+
+  }
+
+  @Test
+  public void testReadWriteAndDelete() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--auths", auths, "--table", tableName, "--createtable", "-c",
+          "--debug"};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--auths", auths, "--table", tableName, "--createtable", "-c", "--debug"};
+    }
+    goodExec(ReadWriteExample.class, args);
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--auths", auths, "--table", tableName, "-d", "--debug"};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--auths", auths, "--table", tableName, "-d", "--debug"};
+    }
+    goodExec(ReadWriteExample.class, args);
+
+  }
+
+  @Test
+  public void testRandomBatchesAndFlush() throws Exception {
+    String tableName = getUniqueNames(1)[0];
+    c.tableOperations().create(tableName);
+    String[] args;
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--table", tableName, "--num", "100000", "--min", "0", "--max",
+          "100000", "--size", "100", "--batchMemory", "1000000", "--batchLatency", "1000", "--batchThreads", "4", "--vis", visibility};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--table", tableName, "--num", "100000", "--min", "0", "--max", "100000",
+          "--size", "100", "--batchMemory", "1000000", "--batchLatency", "1000", "--batchThreads", "4", "--vis", visibility};
+    }
+    goodExec(RandomBatchWriter.class, args);
+
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--table", tableName, "--num", "10000", "--min", "0", "--max",
+          "100000", "--size", "100", "--scanThreads", "4", "--auths", auths};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--table", tableName, "--num", "10000", "--min", "0", "--max", "100000",
+          "--size", "100", "--scanThreads", "4", "--auths", auths};
+    }
+    goodExec(RandomBatchScanner.class, args);
+
+    if (saslEnabled) {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "--keytab", keytab, "--table", tableName};
+    } else {
+      args = new String[] {"-i", instance, "-z", keepers, "-u", user, "-p", passwd, "--table", tableName};
+    }
+    goodExec(Flush.class, args);
+  }
+
+  private void goodExec(Class<?> theClass, String... args) throws InterruptedException, IOException {
+    Entry<Integer,String> pair;
+    if (Tool.class.isAssignableFrom(theClass) && ClusterType.STANDALONE == getClusterType()) {
+      StandaloneClusterControl control = (StandaloneClusterControl) getClusterControl();
+      pair = control.execMapreduceWithStdout(theClass, args);
+    } else {
+      // We're already slurping stdout into memory (not redirecting to file). Might as well add it to error message.
+      pair = getClusterControl().execWithStdout(theClass, args);
+    }
+    Assert.assertEquals("stdout=" + pair.getValue(), 0, pair.getKey().intValue());
+  }
+}

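Every test above builds its argument array twice, once for Kerberos and once for
password auth. One possible cleanup, not part of this commit, would be a small
helper inside ExamplesIT that centralizes that branch; the method name
clientArgs below is hypothetical:

    // Hypothetical helper (illustrative sketch only, not in this commit):
    // centralizes the "-i/-z/-u" connection arguments plus the credential
    // flags that every test currently duplicates per auth mode.
    private String[] clientArgs(String... extra) {
      List<String> args = new ArrayList<>(Arrays.asList("-i", instance, "-z", keepers, "-u", user));
      if (saslEnabled) {
        args.addAll(Arrays.asList("--keytab", keytab)); // Kerberos credentials
      } else {
        args.addAll(Arrays.asList("-p", passwd));       // password credentials
      }
      args.addAll(Arrays.asList(extra));                // per-test options
      return args.toArray(new String[0]);
    }

With such a helper, testRowOperations for example would reduce to
goodExec(RowOperations.class, clientArgs()).
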
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/test/java/org/apache/accumulo/examples/constraints/AlphaNumKeyConstraintTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/examples/constraints/AlphaNumKeyConstraintTest.java b/src/test/java/org/apache/accumulo/examples/constraints/AlphaNumKeyConstraintTest.java
new file mode 100644
index 0000000..0f4407f
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/examples/constraints/AlphaNumKeyConstraintTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.constraints;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class AlphaNumKeyConstraintTest {
+
+  private AlphaNumKeyConstraint ankc = new AlphaNumKeyConstraint();
+
+  @Test
+  public void test() {
+    Mutation goodMutation = new Mutation(new Text("Row1"));
+    goodMutation.put(new Text("Colf2"), new Text("ColQ3"), new Value("value".getBytes()));
+    assertNull(ankc.check(null, goodMutation));
+
+    // Check that violations are in row, cf, cq order
+    Mutation badMutation = new Mutation(new Text("Row#1"));
+    badMutation.put(new Text("Colf$2"), new Text("Colq%3"), new Value("value".getBytes()));
+    assertEquals(ImmutableList.of(AlphaNumKeyConstraint.NON_ALPHA_NUM_ROW, AlphaNumKeyConstraint.NON_ALPHA_NUM_COLF, AlphaNumKeyConstraint.NON_ALPHA_NUM_COLQ),
+        ankc.check(null, badMutation));
+  }
+
+  @Test
+  public void testGetViolationDescription() {
+    assertEquals(AlphaNumKeyConstraint.ROW_VIOLATION_MESSAGE, ankc.getViolationDescription(AlphaNumKeyConstraint.NON_ALPHA_NUM_ROW));
+    assertEquals(AlphaNumKeyConstraint.COLF_VIOLATION_MESSAGE, ankc.getViolationDescription(AlphaNumKeyConstraint.NON_ALPHA_NUM_COLF));
+    assertEquals(AlphaNumKeyConstraint.COLQ_VIOLATION_MESSAGE, ankc.getViolationDescription(AlphaNumKeyConstraint.NON_ALPHA_NUM_COLQ));
+    assertNull(ankc.getViolationDescription((short) 4));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/test/java/org/apache/accumulo/examples/constraints/NumericValueConstraintTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/examples/constraints/NumericValueConstraintTest.java b/src/test/java/org/apache/accumulo/examples/constraints/NumericValueConstraintTest.java
new file mode 100644
index 0000000..7004710
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/examples/constraints/NumericValueConstraintTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.constraints;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+import com.google.common.collect.Iterables;
+
+public class NumericValueConstraintTest {
+
+  private NumericValueConstraint nvc = new NumericValueConstraint();
+
+  @Test
+  public void testCheck() {
+    Mutation goodMutation = new Mutation(new Text("r"));
+    goodMutation.put(new Text("cf"), new Text("cq"), new Value("1234".getBytes()));
+    assertNull(nvc.check(null, goodMutation));
+
+    // Check that multiple bad mutations result in one violation only
+    Mutation badMutation = new Mutation(new Text("r"));
+    badMutation.put(new Text("cf"), new Text("cq"), new Value("foo1234".getBytes()));
+    badMutation.put(new Text("cf2"), new Text("cq2"), new Value("foo1234".getBytes()));
+    assertEquals(NumericValueConstraint.NON_NUMERIC_VALUE, Iterables.getOnlyElement(nvc.check(null, badMutation)).shortValue());
+  }
+
+  @Test
+  public void testGetViolationDescription() {
+    assertEquals(NumericValueConstraint.VIOLATION_MESSAGE, nvc.getViolationDescription(NumericValueConstraint.NON_NUMERIC_VALUE));
+    assertNull(nvc.getViolationDescription((short) 2));
+  }
+}

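The two constraint tests above exercise the same two-method contract: check()
returns null (or an empty list) for an acceptable mutation and a list of short
violation codes otherwise, and getViolationDescription() translates each code.
A minimal sketch of a custom constraint, assuming the 1.x
org.apache.accumulo.core.constraints.Constraint interface; the class name and
violation code here are illustrative only:

    package org.apache.accumulo.examples.constraints;

    import java.util.Collections;
    import java.util.List;

    import org.apache.accumulo.core.constraints.Constraint;
    import org.apache.accumulo.core.data.Mutation;

    public class NonEmptyRowConstraint implements Constraint {
      static final short EMPTY_ROW = 1; // illustrative violation code

      @Override
      public String getViolationDescription(short violationCode) {
        return violationCode == EMPTY_ROW ? "Row ID was empty" : null;
      }

      @Override
      public List<Short> check(Environment env, Mutation mutation) {
        // null signals no violations
        return mutation.getRow().length == 0 ? Collections.singletonList(EMPTY_ROW) : null;
      }
    }

Attaching it would mirror what ExamplesIT does for MaxMutationSize:
conn.tableOperations().addConstraint(tableName, NonEmptyRowConstraint.class.getName()).
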
http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/test/java/org/apache/accumulo/examples/dirlist/CountIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/examples/dirlist/CountIT.java b/src/test/java/org/apache/accumulo/examples/dirlist/CountIT.java
new file mode 100644
index 0000000..5330fe2
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/examples/dirlist/CountIT.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.dirlist;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.examples.dirlist.FileCount;
+import org.apache.accumulo.examples.dirlist.FileCount.Opts;
+import org.apache.accumulo.examples.dirlist.Ingest;
+import org.apache.accumulo.examples.dirlist.QueryUtil;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CountIT extends ConfigurableMacBase {
+
+  private Connector conn;
+  private String tableName;
+
+  @Before
+  public void setupInstance() throws Exception {
+    tableName = getUniqueNames(1)[0];
+    conn = getConnector();
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    ColumnVisibility cv = new ColumnVisibility();
+    // / has 1 dir
+    // /local has 2 dirs 1 file
+    // /local/user1 has 2 files
+    bw.addMutation(Ingest.buildMutation(cv, "/local", true, false, true, 272, 12345, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/user1", true, false, true, 272, 12345, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/user2", true, false, true, 272, 12345, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 12345, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/file", false, false, false, 1024, 23456, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file1", false, false, false, 2024, 12345, null));
+    bw.addMutation(Ingest.buildMutation(cv, "/local/user1/file2", false, false, false, 1028, 23456, null));
+    bw.close();
+  }
+
+  @Test
+  public void test() throws Exception {
+    Scanner scanner = conn.createScanner(tableName, new Authorizations());
+    scanner.fetchColumn(new Text("dir"), new Text("counts"));
+    assertFalse(scanner.iterator().hasNext());
+
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.instance = conn.getInstance().getInstanceName();
+    opts.zookeepers = conn.getInstance().getZooKeepers();
+    opts.setTableName(tableName);
+    opts.setPrincipal(conn.whoami());
+    opts.setPassword(new Opts.Password(ROOT_PASSWORD));
+    FileCount fc = new FileCount(opts, scanOpts, bwOpts);
+    fc.run();
+
+    ArrayList<Pair<String,String>> expected = new ArrayList<>();
+    expected.add(new Pair<>(QueryUtil.getRow("").toString(), "1,0,3,3"));
+    expected.add(new Pair<>(QueryUtil.getRow("/local").toString(), "2,1,2,3"));
+    expected.add(new Pair<>(QueryUtil.getRow("/local/user1").toString(), "0,2,0,2"));
+    expected.add(new Pair<>(QueryUtil.getRow("/local/user2").toString(), "0,0,0,0"));
+
+    int i = 0;
+    for (Entry<Key,Value> e : scanner) {
+      assertEquals(e.getKey().getRow().toString(), expected.get(i).getFirst());
+      assertEquals(e.getValue().toString(), expected.get(i).getSecond());
+      i++;
+    }
+    assertEquals(i, expected.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/test/java/org/apache/accumulo/examples/filedata/ChunkCombinerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/examples/filedata/ChunkCombinerTest.java b/src/test/java/org/apache/accumulo/examples/filedata/ChunkCombinerTest.java
new file mode 100644
index 0000000..881bbdf
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/examples/filedata/ChunkCombinerTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.filedata;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.PartialKey;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+
+public class ChunkCombinerTest extends TestCase {
+
+  public static class MapIterator implements SortedKeyValueIterator<Key,Value> {
+    private Iterator<Entry<Key,Value>> iter;
+    private Entry<Key,Value> entry;
+    Collection<ByteSequence> columnFamilies;
+    private SortedMap<Key,Value> map;
+    private Range range;
+
+    @Override
+    public MapIterator deepCopy(IteratorEnvironment env) {
+      return new MapIterator(map);
+    }
+
+    private MapIterator(SortedMap<Key,Value> map) {
+      this.map = map;
+      iter = map.entrySet().iterator();
+      this.range = new Range();
+      if (iter.hasNext())
+        entry = iter.next();
+      else
+        entry = null;
+    }
+
+    @Override
+    public Key getTopKey() {
+      return entry.getKey();
+    }
+
+    @Override
+    public Value getTopValue() {
+      return entry.getValue();
+    }
+
+    @Override
+    public boolean hasTop() {
+      return entry != null;
+    }
+
+    @Override
+    public void next() throws IOException {
+      entry = null;
+      while (iter.hasNext()) {
+        entry = iter.next();
+        if (columnFamilies.size() > 0 && !columnFamilies.contains(entry.getKey().getColumnFamilyData())) {
+          entry = null;
+          continue;
+        }
+        if (range.afterEndKey(entry.getKey()))
+          entry = null;
+        break;
+      }
+    }
+
+    @Override
+    public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+      if (!inclusive) {
+        throw new IllegalArgumentException("can only do inclusive colf filtering");
+      }
+      this.columnFamilies = columnFamilies;
+      this.range = range;
+
+      Key key = range.getStartKey();
+      if (key == null) {
+        key = new Key();
+      }
+
+      iter = map.tailMap(key).entrySet().iterator();
+      next();
+      while (hasTop() && range.beforeStartKey(getTopKey())) {
+        next();
+      }
+    }
+
+    @Override
+    public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private TreeMap<Key,Value> row1;
+  private TreeMap<Key,Value> row2;
+  private TreeMap<Key,Value> row3;
+  private TreeMap<Key,Value> allRows;
+
+  private TreeMap<Key,Value> cRow1;
+  private TreeMap<Key,Value> cRow2;
+  private TreeMap<Key,Value> cRow3;
+  private TreeMap<Key,Value> allCRows;
+
+  private TreeMap<Key,Value> cOnlyRow1;
+  private TreeMap<Key,Value> cOnlyRow2;
+  private TreeMap<Key,Value> cOnlyRow3;
+  private TreeMap<Key,Value> allCOnlyRows;
+
+  private TreeMap<Key,Value> badrow;
+
+  @Override
+  protected void setUp() {
+    row1 = new TreeMap<>();
+    row2 = new TreeMap<>();
+    row3 = new TreeMap<>();
+    allRows = new TreeMap<>();
+
+    cRow1 = new TreeMap<>();
+    cRow2 = new TreeMap<>();
+    cRow3 = new TreeMap<>();
+    allCRows = new TreeMap<>();
+
+    cOnlyRow1 = new TreeMap<>();
+    cOnlyRow2 = new TreeMap<>();
+    cOnlyRow3 = new TreeMap<>();
+    allCOnlyRows = new TreeMap<>();
+
+    badrow = new TreeMap<>();
+
+    String refs = FileDataIngest.REFS_CF.toString();
+    String fileext = FileDataIngest.REFS_FILE_EXT;
+    String filename = FileDataIngest.REFS_ORIG_FILE;
+    String chunk_cf = FileDataIngest.CHUNK_CF.toString();
+
+    row1.put(new Key("row1", refs, "hash1\0" + fileext, "C"), new Value("jpg".getBytes()));
+    row1.put(new Key("row1", refs, "hash1\0" + filename, "D"), new Value("foo1.jpg".getBytes()));
+    row1.put(new Key("row1", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
+    row1.put(new Key("row1", chunk_cf, "0000", "B"), new Value("V1".getBytes()));
+    row1.put(new Key("row1", chunk_cf, "0001", "A"), new Value("V2".getBytes()));
+    row1.put(new Key("row1", chunk_cf, "0001", "B"), new Value("V2".getBytes()));
+
+    cRow1.put(new Key("row1", refs, "hash1\0" + fileext, "C"), new Value("jpg".getBytes()));
+    cRow1.put(new Key("row1", refs, "hash1\0" + filename, "D"), new Value("foo1.jpg".getBytes()));
+    cRow1.put(new Key("row1", chunk_cf, "0000", "(C)|(D)"), new Value("V1".getBytes()));
+    cRow1.put(new Key("row1", chunk_cf, "0001", "(C)|(D)"), new Value("V2".getBytes()));
+
+    cOnlyRow1.put(new Key("row1", chunk_cf, "0000", "(C)|(D)"), new Value("V1".getBytes()));
+    cOnlyRow1.put(new Key("row1", chunk_cf, "0001", "(C)|(D)"), new Value("V2".getBytes()));
+
+    row2.put(new Key("row2", refs, "hash1\0" + fileext, "A"), new Value("jpg".getBytes()));
+    row2.put(new Key("row2", refs, "hash1\0" + filename, "B"), new Value("foo1.jpg".getBytes()));
+    row2.put(new Key("row2", chunk_cf, "0000", "A|B"), new Value("V1".getBytes()));
+    row2.put(new Key("row2", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
+    row2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
+    row2.put(new Key("row2a", chunk_cf, "0000", "C"), new Value("V1".getBytes()));
+
+    cRow2.put(new Key("row2", refs, "hash1\0" + fileext, "A"), new Value("jpg".getBytes()));
+    cRow2.put(new Key("row2", refs, "hash1\0" + filename, "B"), new Value("foo1.jpg".getBytes()));
+    cRow2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
+
+    cOnlyRow2.put(new Key("row2", chunk_cf, "0000", "(A)|(B)"), new Value("V1".getBytes()));
+
+    row3.put(new Key("row3", refs, "hash1\0w", "(A&B)|(C&(D|E))"), new Value("".getBytes()));
+    row3.put(new Key("row3", refs, "hash1\0x", "A&B"), new Value("".getBytes()));
+    row3.put(new Key("row3", refs, "hash1\0y", "(A&B)"), new Value("".getBytes()));
+    row3.put(new Key("row3", refs, "hash1\0z", "(F|G)&(D|E)"), new Value("".getBytes()));
+    row3.put(new Key("row3", chunk_cf, "0000", "(A&B)|(C&(D|E))", 10), new Value("V1".getBytes()));
+    row3.put(new Key("row3", chunk_cf, "0000", "A&B", 20), new Value("V1".getBytes()));
+    row3.put(new Key("row3", chunk_cf, "0000", "(A&B)", 10), new Value("V1".getBytes()));
+    row3.put(new Key("row3", chunk_cf, "0000", "(F|G)&(D|E)", 10), new Value("V1".getBytes()));
+
+    cRow3.put(new Key("row3", refs, "hash1\0w", "(A&B)|(C&(D|E))"), new Value("".getBytes()));
+    cRow3.put(new Key("row3", refs, "hash1\0x", "A&B"), new Value("".getBytes()));
+    cRow3.put(new Key("row3", refs, "hash1\0y", "(A&B)"), new Value("".getBytes()));
+    cRow3.put(new Key("row3", refs, "hash1\0z", "(F|G)&(D|E)"), new Value("".getBytes()));
+    cRow3.put(new Key("row3", chunk_cf, "0000", "((F|G)&(D|E))|(A&B)|(C&(D|E))", 20), new Value("V1".getBytes()));
+
+    cOnlyRow3.put(new Key("row3", chunk_cf, "0000", "((F|G)&(D|E))|(A&B)|(C&(D|E))", 20), new Value("V1".getBytes()));
+
+    badrow.put(new Key("row1", chunk_cf, "0000", "A"), new Value("V1".getBytes()));
+    badrow.put(new Key("row1", chunk_cf, "0000", "B"), new Value("V2".getBytes()));
+
+    allRows.putAll(row1);
+    allRows.putAll(row2);
+    allRows.putAll(row3);
+
+    allCRows.putAll(cRow1);
+    allCRows.putAll(cRow2);
+    allCRows.putAll(cRow3);
+
+    allCOnlyRows.putAll(cOnlyRow1);
+    allCOnlyRows.putAll(cOnlyRow2);
+    allCOnlyRows.putAll(cOnlyRow3);
+  }
+
+  private static final Collection<ByteSequence> emptyColfs = new HashSet<>();
+
+  public void test1() throws IOException {
+    runTest(false, allRows, allCRows, emptyColfs);
+    runTest(true, allRows, allCRows, emptyColfs);
+    runTest(false, allRows, allCOnlyRows, Collections.singleton(FileDataIngest.CHUNK_CF_BS));
+    runTest(true, allRows, allCOnlyRows, Collections.singleton(FileDataIngest.CHUNK_CF_BS));
+
+    try {
+      runTest(true, badrow, null, emptyColfs);
+      fail("expected ChunkCombiner to throw on conflicting values for the same chunk");
+    } catch (RuntimeException e) {
+      // expected: badrow has two different values for the same chunk key
+    }
+  }
+
+  private void runTest(boolean reseek, TreeMap<Key,Value> source, TreeMap<Key,Value> result, Collection<ByteSequence> cols) throws IOException {
+    MapIterator src = new MapIterator(source);
+    SortedKeyValueIterator<Key,Value> iter = new ChunkCombiner();
+    iter.init(src, null, null);
+    iter = iter.deepCopy(null);
+    iter.seek(new Range(), cols, true);
+
+    TreeMap<Key,Value> seen = new TreeMap<>();
+
+    while (iter.hasTop()) {
+      assertFalse("already contains " + iter.getTopKey(), seen.containsKey(iter.getTopKey()));
+      seen.put(new Key(iter.getTopKey()), new Value(iter.getTopValue()));
+
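+      // Re-seeking after every key mimics how a scan resumes in batches,
+      // exercising ChunkCombiner's ability to rebuild its state mid-row.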
+      if (reseek)
+        iter.seek(new Range(iter.getTopKey().followingKey(PartialKey.ROW_COLFAM_COLQUAL), true, null, true), cols, true);
+      else
+        iter.next();
+    }
+
+    assertEquals(result, seen);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java
new file mode 100644
index 0000000..8c23ea6
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputFormatIT.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.filedata;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.examples.filedata.ChunkInputFormat;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+public class ChunkInputFormatIT extends AccumuloClusterHarness {
+
+  // track errors in the map reduce job; jobs insert a dummy error for the map and cleanup tasks (to ensure test correctness),
+  // so error tests should check to see if there is at least one error (could be more depending on the test) rather than zero
+  private static Multimap<String,AssertionError> assertionErrors = ArrayListMultimap.create();
+
+  private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
+
+  private static List<Entry<Key,Value>> data;
+  private static List<Entry<Key,Value>> baddata;
+
+  private Connector conn;
+  private String tableName;
+
+  @Before
+  public void setupInstance() throws Exception {
+    conn = getConnector();
+    tableName = getUniqueNames(1)[0];
+    conn.securityOperations().changeUserAuthorizations(conn.whoami(), AUTHS);
+  }
+
+  @BeforeClass
+  public static void setupClass() {
+    System.setProperty("hadoop.tmp.dir", System.getProperty("user.dir") + "/target/hadoop-tmp");
+
+    data = new ArrayList<>();
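+    // Each document carries "refs" metadata entries (ext, name) plus "~chunk" data entries
+    // keyed by chunk size (100) and chunk index, under the listed visibilities.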
+    ChunkInputStreamIT.addData(data, "a", "refs", "ida\0ext", "A&B", "ext");
+    ChunkInputStreamIT.addData(data, "a", "refs", "ida\0name", "A&B", "name");
+    ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    ChunkInputStreamIT.addData(data, "a", "~chunk", 100, 1, "A&B", "");
+    ChunkInputStreamIT.addData(data, "b", "refs", "ida\0ext", "A&B", "ext");
+    ChunkInputStreamIT.addData(data, "b", "refs", "ida\0name", "A&B", "name");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "A&B", "");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "B&C", "");
+    ChunkInputStreamIT.addData(data, "b", "~chunk", 100, 1, "D", "");
+    baddata = new ArrayList<>();
+    ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0ext", "A&B", "ext");
+    ChunkInputStreamIT.addData(baddata, "c", "refs", "ida\0name", "A&B", "name");
+  }
+
+  public static void entryEquals(Entry<Key,Value> e1, Entry<Key,Value> e2) {
+    assertEquals(e1.getKey(), e2.getKey());
+    assertEquals(e1.getValue(), e2.getValue());
+  }
+
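+  // Tool that runs a local MapReduce job over the given table through ChunkInputFormat,
+  // using the mapper class named on the command line.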
+  public static class CIFTester extends Configured implements Tool {
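+    // Expects exactly the two well-formed files ("a" and "b"), checking their key entries
+    // and their full chunked content.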
+    public static class TestMapper extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+      int count = 0;
+
+      @Override
+      protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+
+        byte[] b = new byte[20];
+        int read;
+        try {
+          switch (count) {
+            case 0:
+              assertEquals(key.size(), 2);
+              entryEquals(key.get(0), data.get(0));
+              entryEquals(key.get(1), data.get(1));
+              assertEquals(read = value.read(b), 8);
+              assertEquals(new String(b, 0, read), "asdfjkl;");
+              assertEquals(read = value.read(b), -1);
+              break;
+            case 1:
+              assertEquals(key.size(), 2);
+              entryEquals(key.get(0), data.get(4));
+              entryEquals(key.get(1), data.get(5));
+              assertEquals(read = value.read(b), 10);
+              assertEquals(new String(b, 0, read), "qwertyuiop");
+              assertEquals(read = value.read(b), -1);
+              break;
+            default:
+              fail();
+          }
+        } catch (AssertionError e) {
+          assertionErrors.put(table, e);
+        } finally {
+          value.close();
+        }
+        count++;
+      }
+
+      @Override
+      protected void cleanup(Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+
+        try {
+          assertEquals(2, count);
+        } catch (AssertionError e) {
+          assertionErrors.put(table, e);
+        }
+      }
+    }
+
+    public static class TestNoClose extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+      int count = 0;
+
+      @Override
+      protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+
+        byte[] b = new byte[5];
+        int read;
+        try {
+          switch (count) {
+            case 0:
+              assertEquals(read = value.read(b), 5);
+              assertEquals(new String(b, 0, read), "asdfj");
+              break;
+            default:
+              fail();
+          }
+        } catch (AssertionError e) {
+          assertionErrors.put(table, e);
+        }
+        count++;
+        try {
+          context.nextKeyValue();
+          fail();
+        } catch (IOException ioe) {
+          assertionErrors.put(table + "_map_ioexception", new AssertionError(toString(), ioe));
+        }
+      }
+    }
+
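+    // Maps over refs-only entries (no chunk data); both read() and close() are expected to throw.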
+    public static class TestBadData extends Mapper<List<Entry<Key,Value>>,InputStream,List<Entry<Key,Value>>,InputStream> {
+      @Override
+      protected void map(List<Entry<Key,Value>> key, InputStream value, Context context) throws IOException, InterruptedException {
+        String table = context.getConfiguration().get("MRTester_tableName");
+        assertNotNull(table);
+
+        byte[] b = new byte[20];
+        try {
+          assertEquals(key.size(), 2);
+          entryEquals(key.get(0), baddata.get(0));
+          entryEquals(key.get(1), baddata.get(1));
+        } catch (AssertionError e) {
+          assertionErrors.put(table, e);
+        }
+        // read() should throw since these entries have refs but no chunks; if it returns
+        // instead, the caught fail() records the problem without killing the map task
+        try {
+          assertFalse(value.read(b) > 0);
+          try {
+            fail();
+          } catch (AssertionError e) {
+            assertionErrors.put(table, e);
+          }
+        } catch (Exception e) {
+          // expected, ignore
+        }
+        // close() should likewise throw for the incomplete chunk stream
+        try {
+          value.close();
+          try {
+            fail();
+          } catch (AssertionError e) {
+            assertionErrors.put(table, e);
+          }
+        } catch (Exception e) {
+          // expected, ignore
+        }
+      }
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+      if (args.length != 2) {
+        throw new IllegalArgumentException("Usage: " + CIFTester.class.getName() + " <table> <mapperClass>");
+      }
+
+      String table = args[0];
+      assertionErrors.put(table, new AssertionError("Dummy"));
+      assertionErrors.put(table + "_map_ioexception", new AssertionError("Dummy_ioexception"));
+      getConf().set("MRTester_tableName", table);
+
+      Job job = Job.getInstance(getConf());
+      job.setJobName(this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(ChunkInputFormat.class);
+
+      ChunkInputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());
+      ChunkInputFormat.setConnectorInfo(job, getAdminPrincipal(), getAdminToken());
+      ChunkInputFormat.setInputTableName(job, table);
+      ChunkInputFormat.setScanAuthorizations(job, AUTHS);
+
+      @SuppressWarnings("unchecked")
+      Class<? extends Mapper<?,?,?,?>> forName = (Class<? extends Mapper<?,?,?,?>>) Class.forName(args[1]);
+      job.setMapperClass(forName);
+      job.setMapOutputKeyClass(Key.class);
+      job.setMapOutputValueClass(Value.class);
+      job.setOutputFormatClass(NullOutputFormat.class);
+
+      job.setNumReduceTasks(0);
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    }
+
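+    // Invoked directly by the tests rather than as a JVM entry point; runs the tool against
+    // the local MapReduce framework and returns its exit code.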
+    public static int main(String... args) throws Exception {
+      Configuration conf = new Configuration();
+      conf.set("mapreduce.framework.name", "local");
+      conf.set("mapreduce.cluster.local.dir", new File(System.getProperty("user.dir"), "target/mapreduce-tmp").getAbsolutePath());
+      return ToolRunner.run(conf, new CIFTester(), args);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+    for (Entry<Key,Value> e : data) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    assertEquals(0, CIFTester.main(tableName, CIFTester.TestMapper.class.getName()));
+    assertEquals(1, assertionErrors.get(tableName).size());
+  }
+
+  @Test
+  public void testErrorOnNextWithoutClose() throws Exception {
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+    for (Entry<Key,Value> e : data) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    assertEquals(1, CIFTester.main(tableName, CIFTester.TestNoClose.class.getName()));
+    assertEquals(1, assertionErrors.get(tableName).size());
+    // the real IOException-backed error should exist in addition to the seeded dummy entry
+    assertEquals(2, assertionErrors.get(tableName + "_map_ioexception").size());
+  }
+
+  @Test
+  public void testInfoWithoutChunks() throws Exception {
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+    for (Entry<Key,Value> e : baddata) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), k.getTimestamp(), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    assertEquals(0, CIFTester.main(tableName, CIFTester.TestBadData.class.getName()));
+    assertEquals(1, assertionErrors.get(tableName).size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamIT.java b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamIT.java
new file mode 100644
index 0000000..69b8f48
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamIT.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.examples.filedata;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+
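+// Integration test: writes chunked file entries to a live table and streams them back
+// through ChunkInputStream.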
+public class ChunkInputStreamIT extends AccumuloClusterHarness {
+
+  private static final Authorizations AUTHS = new Authorizations("A", "B", "C", "D");
+
+  private Connector conn;
+  private String tableName;
+  private List<Entry<Key,Value>> data;
+  private List<Entry<Key,Value>> baddata;
+  private List<Entry<Key,Value>> multidata;
+
+  @Before
+  public void setupInstance() throws Exception {
+    conn = getConnector();
+    tableName = getUniqueNames(1)[0];
+    conn.securityOperations().changeUserAuthorizations(conn.whoami(), AUTHS);
+  }
+
+  @Before
+  public void setupData() {
+    data = new ArrayList<>();
+    addData(data, "a", "refs", "id\0ext", "A&B", "ext");
+    addData(data, "a", "refs", "id\0name", "A&B", "name");
+    addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "a", "~chunk", 100, 1, "A&B", "");
+    addData(data, "b", "refs", "id\0ext", "A&B", "ext");
+    addData(data, "b", "refs", "id\0name", "A&B", "name");
+    addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+    addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+    addData(data, "b", "~chunk", 100, 1, "A&B", "");
+    addData(data, "b", "~chunk", 100, 1, "B&C", "");
+    addData(data, "b", "~chunk", 100, 1, "D", "");
+    addData(data, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "c", "~chunk", 100, 1, "A&B", "asdfjkl;");
+    addData(data, "c", "~chunk", 100, 2, "A&B", "");
+    addData(data, "d", "~chunk", 100, 0, "A&B", "");
+    addData(data, "e", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "e", "~chunk", 100, 1, "A&B", "");
+    baddata = new ArrayList<>();
+    addData(baddata, "a", "~chunk", 100, 0, "A", "asdfjkl;");
+    addData(baddata, "b", "~chunk", 100, 0, "B", "asdfjkl;");
+    addData(baddata, "b", "~chunk", 100, 2, "C", "");
+    addData(baddata, "c", "~chunk", 100, 0, "D", "asdfjkl;");
+    addData(baddata, "c", "~chunk", 100, 2, "E", "");
+    addData(baddata, "d", "~chunk", 100, 0, "F", "asdfjkl;");
+    addData(baddata, "d", "~chunk", 100, 1, "G", "");
+    addData(baddata, "d", "~zzzzz", "colq", "H", "");
+    addData(baddata, "e", "~chunk", 100, 0, "I", "asdfjkl;");
+    addData(baddata, "e", "~chunk", 100, 1, "J", "");
+    addData(baddata, "e", "~chunk", 100, 2, "I", "asdfjkl;");
+    addData(baddata, "f", "~chunk", 100, 2, "K", "asdfjkl;");
+    addData(baddata, "g", "~chunk", 100, 0, "L", "");
+    multidata = new ArrayList<>();
+    addData(multidata, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "a", "~chunk", 100, 1, "A&B", "");
+    addData(multidata, "a", "~chunk", 200, 0, "B&C", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 200, 0, "B&C", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 200, 1, "B&C", "asdfjkl;");
+    addData(multidata, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "c", "~chunk", 100, 1, "B&C", "");
+  }
+
+  static void addData(List<Entry<Key,Value>> data, String row, String cf, String cq, String vis, String value) {
+    data.add(new KeyValue(new Key(new Text(row), new Text(cf), new Text(cq), new Text(vis)), value.getBytes(UTF_8)));
+  }
+
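+  // The chunk column qualifier encodes the 4-byte chunk size followed by the 4-byte chunk
+  // index (see FileDataIngest.intToBytes).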
+  static void addData(List<Entry<Key,Value>> data, String row, String cf, int chunkSize, int chunkCount, String vis, String value) {
+    Text chunkCQ = new Text(FileDataIngest.intToBytes(chunkSize));
+    chunkCQ.append(FileDataIngest.intToBytes(chunkCount), 0, 4);
+    data.add(new KeyValue(new Key(new Text(row), new Text(cf), chunkCQ, new Text(vis)), value.getBytes(UTF_8)));
+  }
+
+  @Test
+  public void testWithAccumulo() throws AccumuloException, AccumuloSecurityException, TableExistsException, TableNotFoundException, IOException {
+    conn.tableOperations().create(tableName);
+    BatchWriter bw = conn.createBatchWriter(tableName, new BatchWriterConfig());
+
+    for (Entry<Key,Value> e : data) {
+      Key k = e.getKey();
+      Mutation m = new Mutation(k.getRow());
+      m.put(k.getColumnFamily(), k.getColumnQualifier(), new ColumnVisibility(k.getColumnVisibility()), e.getValue());
+      bw.addMutation(m);
+    }
+    bw.close();
+
+    Scanner scan = conn.createScanner(tableName, AUTHS);
+
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(scan.iterator());
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 10);
+    assertEquals(new String(b, 0, read), "qwertyuiop");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+    cis.close();
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 16);
+    assertEquals(new String(b, 0, read), "asdfjkl;asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+    cis.close();
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+
+    assertFalse(pi.hasNext());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-examples/blob/d96c6d96/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamTest.java b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamTest.java
new file mode 100644
index 0000000..d36e5ce
--- /dev/null
+++ b/src/test/java/org/apache/accumulo/examples/filedata/ChunkInputStreamTest.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.filedata;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.PeekingIterator;
+import org.apache.hadoop.io.Text;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
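+// Unit test for ChunkInputStream over in-memory key/value lists; no cluster is required.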
+public class ChunkInputStreamTest {
+  private static final Logger log = LoggerFactory.getLogger(ChunkInputStreamTest.class);
+  private List<Entry<Key,Value>> data;
+  private List<Entry<Key,Value>> baddata;
+  private List<Entry<Key,Value>> multidata;
+
+  @Before
+  public void setupData() {
+    data = new ArrayList<>();
+    addData(data, "a", "refs", "id\0ext", "A&B", "ext");
+    addData(data, "a", "refs", "id\0name", "A&B", "name");
+    addData(data, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "a", "~chunk", 100, 1, "A&B", "");
+    addData(data, "b", "refs", "id\0ext", "A&B", "ext");
+    addData(data, "b", "refs", "id\0name", "A&B", "name");
+    addData(data, "b", "~chunk", 100, 0, "A&B", "qwertyuiop");
+    addData(data, "b", "~chunk", 100, 0, "B&C", "qwertyuiop");
+    addData(data, "b", "~chunk", 100, 1, "A&B", "");
+    addData(data, "b", "~chunk", 100, 1, "B&C", "");
+    addData(data, "b", "~chunk", 100, 1, "D", "");
+    addData(data, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "c", "~chunk", 100, 1, "A&B", "asdfjkl;");
+    addData(data, "c", "~chunk", 100, 2, "A&B", "");
+    addData(data, "d", "~chunk", 100, 0, "A&B", "");
+    addData(data, "e", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(data, "e", "~chunk", 100, 1, "A&B", "");
+    baddata = new ArrayList<>();
+    addData(baddata, "a", "~chunk", 100, 0, "A", "asdfjkl;");
+    addData(baddata, "b", "~chunk", 100, 0, "B", "asdfjkl;");
+    addData(baddata, "b", "~chunk", 100, 2, "C", "");
+    addData(baddata, "c", "~chunk", 100, 0, "D", "asdfjkl;");
+    addData(baddata, "c", "~chunk", 100, 2, "E", "");
+    addData(baddata, "d", "~chunk", 100, 0, "F", "asdfjkl;");
+    addData(baddata, "d", "~chunk", 100, 1, "G", "");
+    addData(baddata, "d", "~zzzzz", "colq", "H", "");
+    addData(baddata, "e", "~chunk", 100, 0, "I", "asdfjkl;");
+    addData(baddata, "e", "~chunk", 100, 1, "J", "");
+    addData(baddata, "e", "~chunk", 100, 2, "I", "asdfjkl;");
+    addData(baddata, "f", "~chunk", 100, 2, "K", "asdfjkl;");
+    addData(baddata, "g", "~chunk", 100, 0, "L", "");
+    multidata = new ArrayList<>();
+    addData(multidata, "a", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "a", "~chunk", 100, 1, "A&B", "");
+    addData(multidata, "a", "~chunk", 200, 0, "B&C", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 200, 0, "B&C", "asdfjkl;");
+    addData(multidata, "b", "~chunk", 200, 1, "B&C", "asdfjkl;");
+    addData(multidata, "c", "~chunk", 100, 0, "A&B", "asdfjkl;");
+    addData(multidata, "c", "~chunk", 100, 1, "B&C", "");
+  }
+
+  private static void addData(List<Entry<Key,Value>> data, String row, String cf, String cq, String vis, String value) {
+    data.add(new KeyValue(new Key(new Text(row), new Text(cf), new Text(cq), new Text(vis)), value.getBytes(UTF_8)));
+  }
+
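+  // The chunk column qualifier encodes the 4-byte chunk size followed by the 4-byte chunk
+  // index (see FileDataIngest.intToBytes).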
+  private static void addData(List<Entry<Key,Value>> data, String row, String cf, int chunkSize, int chunkCount, String vis, String value) {
+    Text chunkCQ = new Text(FileDataIngest.intToBytes(chunkSize));
+    chunkCQ.append(FileDataIngest.intToBytes(chunkCount), 0, 4);
+    data.add(new KeyValue(new Key(new Text(row), new Text(cf), chunkCQ, new Text(vis)), value.getBytes(UTF_8)));
+  }
+
+  @Test
+  public void testExceptionOnMultipleSetSourceWithoutClose() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(data.iterator());
+    cis.setSource(pi);
+    try {
+      cis.setSource(pi);
+      fail();
+    } catch (IOException e) {
+      /* expected */
+    }
+    cis.close();
+  }
+
+  @Test
+  public void testExceptionOnGetVisBeforeClose() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(data.iterator());
+
+    cis.setSource(pi);
+    try {
+      cis.getVisibilities();
+      fail();
+    } catch (RuntimeException e) {
+      /* expected */
+    }
+    cis.close();
+    cis.getVisibilities();
+  }
+
+  @Test
+  public void testReadIntoBufferSmallerThanChunks() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[5];
+
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(data.iterator());
+
+    cis.setSource(pi);
+    int read;
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "asdfj");
+    assertEquals(read = cis.read(b), 3);
+    assertEquals(new String(b, 0, read), "kl;");
+    assertEquals(read = cis.read(b), -1);
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "qwert");
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "yuiop");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+    cis.close();
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "asdfj");
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "kl;as");
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "dfjkl");
+    assertEquals(read = cis.read(b), 1);
+    assertEquals(new String(b, 0, read), ";");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+    cis.close();
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 5);
+    assertEquals(new String(b, 0, read), "asdfj");
+    assertEquals(read = cis.read(b), 3);
+    assertEquals(new String(b, 0, read), "kl;");
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+
+    assertFalse(pi.hasNext());
+  }
+
+  @Test
+  public void testReadIntoBufferLargerThanChunks() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(data.iterator());
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 10);
+    assertEquals(new String(b, 0, read), "qwertyuiop");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B, B&C, D]");
+    cis.close();
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 16);
+    assertEquals(new String(b, 0, read), "asdfjkl;asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+    cis.close();
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+
+    assertFalse(pi.hasNext());
+  }
+
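+  // Despite the "assume" in the name, this asserts: read() must throw an IOException,
+  // otherwise the test fails.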
+  private static void assumeExceptionOnRead(ChunkInputStream cis, byte[] b) {
+    try {
+      assertEquals(0, cis.read(b));
+      fail();
+    } catch (IOException e) {
+      log.debug("EXCEPTION {}", e.getMessage());
+      // expected, ignore
+    }
+  }
+
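+  // As above: close() must throw an IOException, otherwise the test fails.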
+  private static void assumeExceptionOnClose(ChunkInputStream cis) {
+    try {
+      cis.close();
+      fail();
+    } catch (IOException e) {
+      log.debug("EXCEPTION {}", e.getMessage());
+      // expected, ignore
+    }
+  }
+
+  @Test
+  public void testBadData() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(baddata.iterator());
+
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assumeExceptionOnClose(cis);
+    // can still get visibilities after exception -- bad?
+    assertEquals(cis.getVisibilities().toString(), "[A]");
+
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assumeExceptionOnClose(cis);
+    assertEquals(cis.getVisibilities().toString(), "[B, C]");
+
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assumeExceptionOnClose(cis);
+    assertEquals(cis.getVisibilities().toString(), "[D, E]");
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[F, G]");
+    cis.close();
+
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    cis.close();
+    assertEquals(cis.getVisibilities().toString(), "[I, J]");
+
+    try {
+      cis.setSource(pi);
+      fail();
+    } catch (IOException e) {
+      // expected, ignore
+    }
+    assumeExceptionOnClose(cis);
+    assertEquals(cis.getVisibilities().toString(), "[K]");
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[L]");
+    cis.close();
+
+    assertFalse(pi.hasNext());
+
+    pi = new PeekingIterator<>(baddata.iterator());
+    cis.setSource(pi);
+    assumeExceptionOnClose(cis);
+  }
+
+  @Test
+  public void testBadDataWithoutClosing() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(baddata.iterator());
+
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    // can still get visibilities after exception -- bad?
+    assertEquals(cis.getVisibilities().toString(), "[A]");
+
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assertEquals(cis.getVisibilities().toString(), "[B, C]");
+
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assertEquals(cis.getVisibilities().toString(), "[D, E]");
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[F, G]");
+    cis.close();
+
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assertEquals(cis.getVisibilities().toString(), "[I, J]");
+
+    try {
+      cis.setSource(pi);
+      fail();
+    } catch (IOException e) {
+      // expected, ignore
+    }
+    assertEquals(cis.getVisibilities().toString(), "[K]");
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), -1);
+    assertEquals(cis.getVisibilities().toString(), "[L]");
+    cis.close();
+
+    assertFalse(pi.hasNext());
+
+    pi = new PeekingIterator<>(baddata.iterator());
+    cis.setSource(pi);
+    assumeExceptionOnClose(cis);
+  }
+
+  @Test
+  public void testMultipleChunkSizes() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    byte[] b = new byte[20];
+    int read;
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(multidata.iterator());
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+
+    cis.setSource(pi);
+    assumeExceptionOnRead(cis, b);
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+
+    cis.setSource(pi);
+    assertEquals(read = cis.read(b), 8);
+    assertEquals(new String(b, 0, read), "asdfjkl;");
+    assertEquals(read = cis.read(b), -1);
+    cis.close();
+    assertEquals(cis.getVisibilities().toString(), "[A&B, B&C]");
+
+    assertFalse(pi.hasNext());
+  }
+
+  @Test
+  public void testSingleByteRead() throws IOException {
+    ChunkInputStream cis = new ChunkInputStream();
+    PeekingIterator<Entry<Key,Value>> pi = new PeekingIterator<>(data.iterator());
+
+    cis.setSource(pi);
+    assertEquals((byte) 'a', (byte) cis.read());
+    assertEquals((byte) 's', (byte) cis.read());
+    assertEquals((byte) 'd', (byte) cis.read());
+    assertEquals((byte) 'f', (byte) cis.read());
+    assertEquals((byte) 'j', (byte) cis.read());
+    assertEquals((byte) 'k', (byte) cis.read());
+    assertEquals((byte) 'l', (byte) cis.read());
+    assertEquals((byte) ';', (byte) cis.read());
+    assertEquals(cis.read(), -1);
+    cis.close();
+    assertEquals(cis.getVisibilities().toString(), "[A&B]");
+  }
+}

