accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mwa...@apache.org
Subject [2/3] accumulo-testing git commit: ACCUMULO-4510 Moved continuous ingest code from Accumulo repo
Date Mon, 23 Jan 2017 22:03:57 GMT
http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
new file mode 100644
index 0000000..4681cb8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousIngest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.trace.CountSampler;
+import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.core.util.FastFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+public class ContinuousIngest {
+
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  private static List<ColumnVisibility> visibilities;
+
+  /**
+   * Loads the column visibilities that ingested entries are tagged with.
+   *
+   * <p>
+   * When {@code opts.visFile} is null a single empty visibility is used. Otherwise every line of that file, opened through the default Hadoop
+   * {@link FileSystem}, is parsed as one {@link ColumnVisibility} expression.
+   *
+   * @param opts
+   *          parsed command line options; only {@code visFile} is read
+   * @throws IllegalArgumentException
+   *           if the visibilities file contains no lines
+   * @throws Exception
+   *           if the file cannot be opened or read
+   */
+  private static void initVisibilities(ContinuousOpts opts) throws Exception {
+    if (opts.visFile == null) {
+      visibilities = Collections.singletonList(new ColumnVisibility());
+      return;
+    }
+
+    List<ColumnVisibility> loaded = new ArrayList<>();
+
+    FileSystem fs = FileSystem.get(new Configuration());
+
+    // try-with-resources closes the reader (and the stream it wraps) even when reading or parsing a line fails
+    try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(opts.visFile)), UTF_8))) {
+      String line;
+      while ((line = in.readLine()) != null) {
+        loaded.add(new ColumnVisibility(line));
+      }
+    }
+
+    // getVisibility() calls Random.nextInt(size), which rejects 0; fail here with a message that names the file instead
+    if (loaded.isEmpty()) {
+      throw new IllegalArgumentException("visibilities file " + opts.visFile + " contains no visibilities");
+    }
+
+    visibilities = loaded;
+  }
+
+  /**
+   * Picks one of the loaded visibilities using an index drawn from {@code rand}.
+   */
+  private static ColumnVisibility getVisibility(Random rand) {
+    int choice = rand.nextInt(visibilities.size());
+    return visibilities.get(choice);
+  }
+
+  /**
+   * Writes batches of randomly generated, linked entries to an existing table until at least {@code opts.num} entries have been written.
+   *
+   * <p>
+   * Each pass of the outer loop writes one batch of {@code flushInterval} entries without a previous row, then {@code maxDepth - 1} batches whose entries
+   * reference the row written at the same index in the preceding batch, and finally re-writes the first batch's entries so that they reference rows of the
+   * last batch. The writer is flushed after every batch and a {@code FLUSH} line is printed.
+   *
+   * @param args
+   *          command line arguments, parsed into {@link ContinuousOpts}, {@link BatchWriterOpts} and {@link ClientOnDefaultTable}
+   * @throws IllegalArgumentException
+   *           if {@code min} or {@code max} is negative, or {@code max <= min}
+   * @throws TableNotFoundException
+   *           if the target table does not exist
+   */
+  public static void main(String[] args) throws Exception {
+
+    ContinuousOpts opts = new ContinuousOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
+    clientOpts.parseArgs(ContinuousIngest.class.getName(), args, bwOpts, opts);
+
+    initVisibilities(opts);
+
+    if (opts.min < 0 || opts.max < 0 || opts.max <= opts.min) {
+      throw new IllegalArgumentException("bad min and max");
+    }
+    Connector conn = clientOpts.getConnector();
+
+    if (!conn.tableOperations().exists(clientOpts.getTableName())) {
+      throw new TableNotFoundException(null, clientOpts.getTableName(), "Consult the README and create the table before starting ingest.");
+    }
+
+    // NOTE(review): bw is only closed on the normal exit path below; an exception thrown inside the loop skips bw.close()
+    BatchWriter bw = conn.createBatchWriter(clientOpts.getTableName(), bwOpts.getBatchWriterConfig());
+    bw = Trace.wrapAll(bw, new CountSampler(1024));
+
+    Random r = new Random();
+
+    // identifies this ingest run; it is embedded in every value that is written
+    byte[] ingestInstanceId = UUID.randomUUID().toString().getBytes(UTF_8);
+
+    System.out.printf("UUID %d %s%n", System.currentTimeMillis(), new String(ingestInstanceId, UTF_8));
+
+    long count = 0;
+    final int flushInterval = 1000000;
+    final int maxDepth = 25;
+
+    // Always point back to data that has already been flushed, so that the previous item should
+    // always exist in accumulo when verifying data. To do this, insert N points
+    // back to the row from insert (N - flushInterval). The arrays below are used to keep
+    // track of this.
+    long prevRows[] = new long[flushInterval];
+    long firstRows[] = new long[flushInterval];
+    int firstColFams[] = new int[flushInterval];
+    int firstColQuals[] = new int[flushInterval];
+
+    long lastFlushTime = System.currentTimeMillis();
+
+    out: while (true) {
+      // generate first set of nodes
+      // one visibility is chosen per pass of the outer loop and used for every entry written in that pass
+      ColumnVisibility cv = getVisibility(r);
+
+      for (int index = 0; index < flushInterval; index++) {
+        long rowLong = genLong(opts.min, opts.max, r);
+        prevRows[index] = rowLong;
+        firstRows[index] = rowLong;
+
+        int cf = r.nextInt(opts.maxColF);
+        int cq = r.nextInt(opts.maxColQ);
+
+        // remember the full key of the first set so those entries can be re-written with a link at the end of the pass
+        firstColFams[index] = cf;
+        firstColQuals[index] = cq;
+
+        Mutation m = genMutation(rowLong, cf, cq, cv, ingestInstanceId, count, null, r, opts.checksum);
+        count++;
+        bw.addMutation(m);
+      }
+
+      // the limit is only checked after a whole batch, so more than opts.num entries may be written
+      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+      if (count >= opts.num)
+        break out;
+
+      // generate subsequent sets of nodes that link to previous set of nodes
+      for (int depth = 1; depth < maxDepth; depth++) {
+        for (int index = 0; index < flushInterval; index++) {
+          long rowLong = genLong(opts.min, opts.max, r);
+          byte[] prevRow = genRow(prevRows[index]);
+          prevRows[index] = rowLong;
+          Mutation m = genMutation(rowLong, r.nextInt(opts.maxColF), r.nextInt(opts.maxColQ), cv, ingestInstanceId, count, prevRow, r, opts.checksum);
+          count++;
+          bw.addMutation(m);
+        }
+
+        lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+        if (count >= opts.num)
+          break out;
+      }
+
+      // create one big linked list, this makes all of the first inserts
+      // point to something
+      // the loop stops at flushInterval - 1, so the last first-set entry is not re-written and keeps no previous row
+      for (int index = 0; index < flushInterval - 1; index++) {
+        Mutation m = genMutation(firstRows[index], firstColFams[index], firstColQuals[index], cv, ingestInstanceId, count, genRow(prevRows[index + 1]), r,
+            opts.checksum);
+        count++;
+        bw.addMutation(m);
+      }
+      lastFlushTime = flush(bw, count, flushInterval, lastFlushTime);
+      if (count >= opts.num)
+        break out;
+    }
+
+    bw.close();
+    clientOpts.stopTracing();
+  }
+
+  private static long flush(BatchWriter bw, long count, final int flushInterval, long lastFlushTime) throws MutationsRejectedException {
+    long t1 = System.currentTimeMillis();
+    bw.flush();
+    long t2 = System.currentTimeMillis();
+    System.out.printf("FLUSH %d %d %d %d %d%n", t2, (t2 - lastFlushTime), (t2 - t1), count, flushInterval);
+    lastFlushTime = t2;
+    return lastFlushTime;
+  }
+
+  /**
+   * Builds the single-column mutation for one ingested entry.
+   *
+   * <p>
+   * The row, column family and column qualifier are formatted with {@code FastFormat.toZeroPaddedString}. When {@code checksum} is true a CRC32 over the
+   * row, family, qualifier and visibility expression is started here and completed by {@code createValue}.
+   *
+   * @param rowLong
+   *          numeric row of the entry
+   * @param cfInt
+   *          numeric column family
+   * @param cqInt
+   *          numeric column qualifier
+   * @param cv
+   *          visibility of the entry
+   * @param ingestInstanceId
+   *          identifier of the writer, stored in the value
+   * @param count
+   *          sequence number, stored in the value
+   * @param prevRow
+   *          row this entry links back to, or null
+   * @param r
+   *          not used by this method
+   * @param checksum
+   *          whether to append a checksum to the value
+   * @return the mutation for the entry
+   */
+  public static Mutation genMutation(long rowLong, int cfInt, int cqInt, ColumnVisibility cv, byte[] ingestInstanceId, long count, byte[] prevRow, Random r,
+      boolean checksum) {
+    byte[] rowBytes = genRow(rowLong);
+    byte[] famBytes = FastFormat.toZeroPaddedString(cfInt, 4, 16, EMPTY_BYTES);
+    byte[] qualBytes = FastFormat.toZeroPaddedString(cqInt, 4, 16, EMPTY_BYTES);
+
+    // Adler32 is supposed to be faster, but according to wikipedia it is not good for small data, so CRC32 is used instead
+    CRC32 crc = checksum ? new CRC32() : null;
+    if (crc != null) {
+      crc.update(rowBytes);
+      crc.update(famBytes);
+      crc.update(qualBytes);
+      crc.update(cv.getExpression());
+    }
+
+    Mutation mutation = new Mutation(new Text(rowBytes));
+    Value value = createValue(ingestInstanceId, count, prevRow, crc);
+    mutation.put(new Text(famBytes), new Text(qualBytes), cv, value);
+    return mutation;
+  }
+
+  public static final long genLong(long min, long max, Random r) {
+    return ((r.nextLong() & 0x7fffffffffffffffl) % (max - min)) + min;
+  }
+
+  static final byte[] genRow(long min, long max, Random r) {
+    return genRow(genLong(min, max, r));
+  }
+
+  /**
+   * Formats a numeric row as the byte form used for row ids.
+   *
+   * @param rowLong
+   *          numeric row
+   * @return the bytes produced by {@code FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES)}
+   */
+  static final byte[] genRow(long rowLong) {
+    // NOTE(review): presumably 16 characters wide in radix 16 with an empty prefix -- confirm against FastFormat
+    return FastFormat.toZeroPaddedString(rowLong, 16, 16, EMPTY_BYTES);
+  }
+
+  /**
+   * Assembles the value written with every entry, laid out as {@code <ingestInstanceId>:<count>:<prevRow>:<checksum>}.
+   *
+   * <p>
+   * {@code prevRow} and the checksum are optional, but their separators are always written, so a value with neither of them ends in {@code "::"}.
+   *
+   * @param ingestInstanceId
+   *          bytes identifying the writer; copied to the start of the value
+   * @param count
+   *          sequence number of the entry, formatted into a 16 byte field
+   * @param prevRow
+   *          row this entry links back to, or null when there is none
+   * @param cksum
+   *          checksum the caller has already fed with the key fields, or null to omit the checksum; it is updated with every value byte that precedes the
+   *          checksum field
+   * @return the assembled value
+   * @throws RuntimeException
+   *           if formatting the count does not report exactly 16 bytes
+   */
+  private static Value createValue(byte[] ingestInstanceId, long count, byte[] prevRow, Checksum cksum) {
+    int prevRowLen = prevRow == null ? 0 : prevRow.length;
+
+    // instance id + 16 byte count + optional previous row + three ':' separators (+ 8 byte checksum)
+    int dataLen = ingestInstanceId.length + 16 + prevRowLen + 3;
+    if (cksum != null)
+      dataLen += 8;
+    byte[] val = new byte[dataLen];
+
+    System.arraycopy(ingestInstanceId, 0, val, 0, ingestInstanceId.length);
+    int index = ingestInstanceId.length;
+    val[index++] = ':';
+
+    int added = FastFormat.toZeroPaddedString(val, index, count, 16, 16, EMPTY_BYTES);
+    if (added != 16)
+      throw new RuntimeException("Expected the formatted count to occupy 16 bytes, but FastFormat reported " + added);
+    index += 16;
+    val[index++] = ':';
+
+    if (prevRow != null) {
+      System.arraycopy(prevRow, 0, val, index, prevRow.length);
+      index += prevRow.length;
+    }
+
+    val[index++] = ':';
+
+    if (cksum != null) {
+      // the checksum covers everything written so far, including the trailing separator
+      cksum.update(val, 0, index);
+      FastFormat.toZeroPaddedString(val, index, cksum.getValue(), 8, 16, EMPTY_BYTES);
+    }
+
+    return new Value(val);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java
new file mode 100644
index 0000000..c2902ee
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousMoru.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
+/**
+ * A map only job that reads a table created by continuous ingest and creates doubly linked list. This map reduce job tests the ability of a map only job to
+ * read and write to accumulo at the same time. This map reduce job mutates the table in such a way that it should not create any undefined nodes.
+ *
+ */
+public class ContinuousMoru extends Configured implements Tool {
+  private static final String PREFIX = ContinuousMoru.class.getSimpleName() + ".";
+  private static final String MAX_CQ = PREFIX + "MAX_CQ";
+  private static final String MAX_CF = PREFIX + "MAX_CF";
+  private static final String MAX = PREFIX + "MAX";
+  private static final String MIN = PREFIX + "MIN";
+  private static final String CI_ID = PREFIX + "CI_ID";
+
+  static enum Counts {
+    SELF_READ;
+  }
+
+  public static class CMapper extends Mapper<Key,Value,Text,Mutation> {
+
+    private short max_cf;
+    private short max_cq;
+    private Random random;
+    private String ingestInstanceId;
+    private byte[] iiId;
+    private long count;
+
+    private static final ColumnVisibility EMPTY_VIS = new ColumnVisibility();
+
+    @Override
+    public void setup(Context context) throws IOException, InterruptedException {
+      int max_cf = context.getConfiguration().getInt(MAX_CF, -1);
+      int max_cq = context.getConfiguration().getInt(MAX_CQ, -1);
+
+      if (max_cf > Short.MAX_VALUE || max_cq > Short.MAX_VALUE)
+        throw new IllegalArgumentException();
+
+      this.max_cf = (short) max_cf;
+      this.max_cq = (short) max_cq;
+
+      random = new Random();
+      ingestInstanceId = context.getConfiguration().get(CI_ID);
+      iiId = ingestInstanceId.getBytes(UTF_8);
+
+      count = 0;
+    }
+
+    /**
+     * Validates the entry and, unless this job wrote it, emits a new entry that links the other way.
+     *
+     * <p>
+     * The new entry's row is parsed from the 16 characters found at the previous-row offset of the value, and its own previous row is the row of the entry
+     * that was read. Entries that start with this job's instance id are only counted in {@link Counts#SELF_READ}.
+     */
+    @Override
+    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
+
+      ContinuousWalk.validate(key, data);
+
+      boolean writtenByThisJob = WritableComparator.compareBytes(iiId, 0, iiId.length, data.get(), 0, iiId.length) == 0;
+      if (writtenByThisJob) {
+        context.getCounter(Counts.SELF_READ).increment(1L);
+        return;
+      }
+
+      // only rewrite data not written by this M/R job
+      byte[] val = data.get();
+      int offset = ContinuousWalk.getPrevRowOffset(val);
+      if (offset <= 0)
+        return;
+
+      long rowLong = Long.parseLong(new String(val, offset, 16, UTF_8), 16);
+      int cf = random.nextInt(max_cf);
+      int cq = random.nextInt(max_cq);
+      byte[] readRow = key.getRowData().toArray();
+
+      Mutation m = ContinuousIngest.genMutation(rowLong, cf, cq, EMPTY_VIS, iiId, count++, readRow, random, true);
+      context.write(null, m);
+    }
+  }
+
+  /**
+   * Command line options of the job.
+   *
+   * <p>
+   * {@code --maxColF} and {@code --maxColQ} are inherited from {@link ContinuousOpts}, which declares them with the same names, types and defaults.
+   * Declaring them again here would shadow the inherited fields and register the same option names twice on one options object.
+   */
+  static class Opts extends ContinuousOpts {
+    @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", required = true, validateWith = PositiveInteger.class)
+    int maxMaps = 0;
+  }
+
+  /**
+   * Configures and runs the map-only job that reads the table through {@link AccumuloInputFormat} and writes mutations back through
+   * {@link AccumuloOutputFormat}.
+   *
+   * @param args
+   *          command line arguments, parsed into {@link Opts}, {@link BatchWriterOpts} and {@link MapReduceClientOnDefaultTable}
+   * @return 0 if the job succeeded, 1 otherwise
+   * @throws IOException
+   *           also used to wrap any exception raised while computing the input ranges
+   */
+  @Override
+  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, AccumuloSecurityException {
+    Opts opts = new Opts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    MapReduceClientOnDefaultTable clientOpts = new MapReduceClientOnDefaultTable("ci");
+    clientOpts.parseArgs(ContinuousMoru.class.getName(), args, bwOpts, opts);
+
+    // the job name is the class name plus the current time
+    Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+    job.setJarByClass(this.getClass());
+
+    job.setInputFormatClass(AccumuloInputFormat.class);
+    clientOpts.setAccumuloConfigs(job);
+
+    // set up ranges
+    // the whole table is split into at most opts.maxMaps ranges; auto-adjusting is disabled so the ranges are used as given
+    try {
+      Set<Range> ranges = clientOpts.getConnector().tableOperations().splitRangeByTablets(clientOpts.getTableName(), new Range(), opts.maxMaps);
+      AccumuloInputFormat.setRanges(job, ranges);
+      AccumuloInputFormat.setAutoAdjustRanges(job, false);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    job.setMapperClass(CMapper.class);
+
+    // map-only job: mutations go straight from the mappers to the output format
+    job.setNumReduceTasks(0);
+
+    job.setOutputFormatClass(AccumuloOutputFormat.class);
+    AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig());
+
+    // values handed to the mappers through the job configuration
+    // NOTE(review): CMapper.setup() reads MAX_CF, MAX_CQ and CI_ID only; MIN and MAX are set here but not read by it
+    Configuration conf = job.getConfiguration();
+    conf.setLong(MIN, opts.min);
+    conf.setLong(MAX, opts.max);
+    conf.setInt(MAX_CF, opts.maxColF);
+    conf.setInt(MAX_CQ, opts.maxColQ);
+    conf.set(CI_ID, UUID.randomUUID().toString());
+
+    job.waitForCompletion(true);
+    clientOpts.stopTracing();
+    return job.isSuccessful() ? 0 : 1;
+  }
+
+  /**
+   * Runs the job through {@link ToolRunner} and terminates the JVM with the job's status when that status is not zero.
+   *
+   * @param args
+   *          command line arguments, handed to {@link ToolRunner}
+   */
+  public static void main(String[] args) throws Exception {
+    int exitCode = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousMoru(), args);
+    if (exitCode == 0)
+      return;
+    System.exit(exitCode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousOpts.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousOpts.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousOpts.java
new file mode 100644
index 0000000..3bb11fb
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousOpts.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.Parameter;
+
+/**
+ * Common CLI arguments for the Continuous Ingest suite.
+ */
+public class ContinuousOpts {
+
+  /**
+   * Converter for {@code --debugLog}. Converting the argument has a side effect: the logger for {@code Constants.CORE_PACKAGE_NAME} is set to TRACE, made
+   * non-additive and given a file appender that appends to the named file. The file name itself is returned unchanged.
+   */
+  public static class DebugConverter implements IStringConverter<String> {
+    @Override
+    public String convert(String debugLog) {
+      Logger coreLogger = Logger.getLogger(Constants.CORE_PACKAGE_NAME);
+      coreLogger.setLevel(Level.TRACE);
+      coreLogger.setAdditivity(false);
+
+      PatternLayout layout = new PatternLayout("%d{dd HH:mm:ss,SSS} [%-8c{2}] %-5p: %m%n");
+      FileAppender appender;
+      try {
+        appender = new FileAppender(layout, debugLog, true);
+      } catch (IOException ex) {
+        throw new RuntimeException(ex);
+      }
+      coreLogger.addAppender(appender);
+
+      return debugLog;
+    }
+  }
+
+  public static class ShortConverter implements IStringConverter<Short> {
+    @Override
+    public Short convert(String value) {
+      return Short.valueOf(value);
+    }
+  }
+
+  @Parameter(names = "--min", description = "lowest random row number to use")
+  long min = 0;
+
+  @Parameter(names = "--max", description = "maximum random row number to use")
+  long max = Long.MAX_VALUE;
+
+  @Parameter(names = "--debugLog", description = "file to write debugging output", converter = DebugConverter.class)
+  String debugLog = null;
+
+  @Parameter(names = "--num", description = "the number of entries to ingest")
+  long num = Long.MAX_VALUE;
+
+  @Parameter(names = "--maxColF", description = "maximum column family value to use", converter = ShortConverter.class)
+  short maxColF = Short.MAX_VALUE;
+
+  @Parameter(names = "--maxColQ", description = "maximum column qualifier value to use", converter = ShortConverter.class)
+  short maxColQ = Short.MAX_VALUE;
+
+  @Parameter(names = "--addCheckSum", description = "turn on checksums")
+  boolean checksum = false;
+
+  @Parameter(names = "--visibilities", description = "read the visibilities to ingest with from a file")
+  String visFile = null;
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java
new file mode 100644
index 0000000..8180383
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousQuery.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.cli.ClientOpts.TimeConverter;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+public class ContinuousQuery {
+
+  /**
+   * Options of the query client: everything from {@link ContinuousOpts} plus the pause between queries.
+   */
+  public static class Opts extends ContinuousOpts {
+    // passed to Thread.sleep() after each query, so the value is in milliseconds; no pause when it is not positive
+    @Parameter(names = "--sleep", description = "the time to wait between queries", converter = TimeConverter.class)
+    long sleepTime = 100;
+  }
+
+  /**
+   * Repeatedly looks up one randomly generated row, validates every entry found in it and prints an {@code SRQ} line with the start time, the row, the
+   * elapsed milliseconds and the number of entries. Runs until the process is stopped or an exception is thrown.
+   *
+   * @param args
+   *          command line arguments, parsed into {@link Opts}, {@link ScannerOpts} and {@link ClientOnDefaultTable}
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
+    clientOpts.parseArgs(ContinuousQuery.class.getName(), args, scanOpts, opts);
+
+    Connector conn = clientOpts.getConnector();
+    Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), clientOpts.auths);
+    scanner.setBatchSize(scanOpts.scanBatchSize);
+
+    Random rand = new Random();
+
+    for (;;) {
+      byte[] row = ContinuousIngest.genRow(opts.min, opts.max, rand);
+      int found = 0;
+
+      long started = System.currentTimeMillis();
+      scanner.setRange(new Range(new Text(row)));
+      for (Entry<Key,Value> entry : scanner) {
+        ContinuousWalk.validate(entry.getKey(), entry.getValue());
+        found++;
+      }
+      long elapsed = System.currentTimeMillis() - started;
+
+      System.out.printf("SRQ %d %s %d %d%n", started, new String(row, UTF_8), elapsed, found);
+
+      if (opts.sleepTime > 0)
+        Thread.sleep(opts.sleepTime);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java
new file mode 100644
index 0000000..42e0ea8
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousScanner.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+public class ContinuousScanner {
+
+  /**
+   * Options of the scanner client: everything from {@code ContinuousWalk.Opts} plus the number of entries each scan should return.
+   */
+  static class Opts extends ContinuousWalk.Opts {
+    // target entry count per scan; main() widens or narrows the scanned row range to approach it
+    @Parameter(names = "--numToScan", description = "Number rows to scan between sleeps", required = true, validateWith = PositiveInteger.class)
+    long numToScan = 0;
+  }
+
+  /**
+   * Repeatedly scans a random range of rows, validates every entry and prints an {@code SCN} line with the start time, the first row, the elapsed
+   * milliseconds and the number of entries read.
+   *
+   * <p>
+   * The width of the range ({@code distance}) is adjusted after each scan so that the number of entries read approaches {@code opts.numToScan}. Runs until
+   * the process is stopped or an exception is thrown.
+   *
+   * @param args
+   *          command line arguments, parsed into {@link Opts}, {@link ScannerOpts} and {@link ClientOnDefaultTable}
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
+    clientOpts.parseArgs(ContinuousScanner.class.getName(), args, scanOpts, opts);
+
+    Random r = new Random();
+
+    // initial width of the scanned row range; tuned by the loop below
+    long distance = 1000000000000l;
+
+    Connector conn = clientOpts.getConnector();
+    // the authorizations are picked once, before the loop, and reused for every scan
+    Authorizations auths = opts.randomAuths.getAuths(r);
+    Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), auths);
+    scanner.setBatchSize(scanOpts.scanBatchSize);
+
+    // accepted relative deviation from numToScan: 5% up to 1000 entries, proportionally tighter above that
+    double delta = Math.min(.05, .05 / (opts.numToScan / 1000.0));
+
+    while (true) {
+      // NOTE(review): opts.max - distance is not checked against opts.min; genLong() reduces modulo (max - min), so a
+      // non-positive difference would not yield a value in the intended range -- verify for small --max values
+      long startRow = ContinuousIngest.genLong(opts.min, opts.max - distance, r);
+      byte[] scanStart = ContinuousIngest.genRow(startRow);
+      byte[] scanStop = ContinuousIngest.genRow(startRow + distance);
+
+      scanner.setRange(new Range(new Text(scanStart), new Text(scanStop)));
+
+      int count = 0;
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+
+      long t1 = System.currentTimeMillis();
+
+      while (iter.hasNext()) {
+        Entry<Key,Value> entry = iter.next();
+        ContinuousWalk.validate(entry.getKey(), entry.getValue());
+        count++;
+      }
+
+      long t2 = System.currentTimeMillis();
+
+      // System.out.println("P1 " +count +" "+((1-delta) * numToScan)+" "+((1+delta) * numToScan)+" "+numToScan);
+
+      // adjust the range width only when the count falls outside the tolerated band around numToScan
+      if (count < (1 - delta) * opts.numToScan || count > (1 + delta) * opts.numToScan) {
+        if (count == 0) {
+          // nothing found: widen the range tenfold, falling back to the initial width if the product went negative
+          // NOTE(review): a multiplication that overflows without ending up negative is not detected by this check
+          distance = distance * 10;
+          if (distance < 0)
+            distance = 1000000000000l;
+        } else {
+          double ratio = (double) opts.numToScan / count;
+          // move ratio closer to 1 to make change slower
+          ratio = ratio - (ratio - 1.0) * (2.0 / 3.0);
+          distance = (long) (ratio * distance);
+        }
+
+        // System.out.println("P2 "+delta +" "+numToScan+" "+distance+"  "+((double)numToScan/count ));
+      }
+
+      System.out.printf("SCN %d %s %d %d%n", t1, new String(scanStart, UTF_8), (t2 - t1), count);
+
+      if (opts.sleepTime > 0)
+        sleepUninterruptibly(opts.sleepTime, TimeUnit.MILLISECONDS);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java
new file mode 100644
index 0000000..818e387
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousStatsCollector.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.impl.ClientContext;
+import org.apache.accumulo.core.client.impl.Credentials;
+import org.apache.accumulo.core.client.impl.MasterClient;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.thrift.ThriftNotActiveServiceException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.iterators.ColumnFamilyCounter;
+import org.apache.accumulo.core.master.thrift.MasterClientService;
+import org.apache.accumulo.core.master.thrift.MasterMonitorInfo;
+import org.apache.accumulo.core.master.thrift.TableInfo;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.trace.Tracer;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.core.util.Stat;
+import org.apache.accumulo.server.ServerConstants;
+import org.apache.accumulo.server.cli.ClientOnRequiredTable;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.util.TableInfoUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ContinuousStatsCollector {
+
+  private static final Logger log = LoggerFactory.getLogger(ContinuousStatsCollector.class);
+
+  static class StatsCollectionTask extends TimerTask {
+
+    private final String tableId;
+    private final Opts opts;
+    private final int scanBatchSize;
+
+    /**
+     * Stores the options and scan batch size, looks up the id of the configured table, and prints the column header line to standard out.
+     *
+     * @param opts
+     *          client options supplying the instance and the table name
+     * @param scanBatchSize
+     *          batch size remembered by this task
+     */
+    public StatsCollectionTask(Opts opts, int scanBatchSize) {
+      final String header = "TIME TABLET_SERVERS TOTAL_ENTRIES TOTAL_INGEST TOTAL_QUERY TABLE_RECS TABLE_RECS_IN_MEM TABLE_INGEST TABLE_QUERY TABLE_TABLETS TABLE_TABLETS_ONLINE"
+          + " ACCUMULO_DU ACCUMULO_DIRS ACCUMULO_FILES TABLE_DU TABLE_DIRS TABLE_FILES"
+          + " MAP_TASK MAX_MAP_TASK REDUCE_TASK MAX_REDUCE_TASK TASK_TRACKERS BLACK_LISTED MIN_FILES/TABLET MAX_FILES/TABLET AVG_FILES/TABLET STDDEV_FILES/TABLET";
+
+      this.scanBatchSize = scanBatchSize;
+      this.opts = opts;
+      this.tableId = Tables.getNameToIdMap(opts.getInstance()).get(opts.getTableName());
+      System.out.println(header);
+    }
+
+    /**
+     * Collects the Accumulo, file system, MapReduce and tablet statistics (in that order) and prints them as a single space separated line that starts with
+     * the current time in milliseconds. Any exception is logged and not rethrown.
+     */
+    @Override
+    public void run() {
+      try {
+        final String accumulo = getACUStats();
+        final String fileSystem = getFSStats();
+        final String mapReduce = getMRStats();
+        final String tablets = getTabletStats();
+
+        // the timestamp is taken only after every statistic has been gathered
+        StringBuilder line = new StringBuilder();
+        line.append(System.currentTimeMillis());
+        line.append(' ').append(accumulo);
+        line.append(' ').append(fileSystem);
+        line.append(' ').append(mapReduce);
+        line.append(' ').append(tablets);
+        System.out.println(line.toString());
+      } catch (Exception e) {
+        String message = System.currentTimeMillis() + " - Failed to collect stats";
+        log.error(message, e);
+      }
+    }
+
+    /**
+     * Scans the metadata range of the configured table through a ColumnFamilyCounter iterator and summarizes the numeric values that come back.
+     * Presumably each value is the number of data files of one tablet; confirm against ColumnFamilyCounter.
+     *
+     * @return "min max average stddev" of the scanned values, or "0 0 0 0" when the scan returned nothing
+     */
+    private String getTabletStats() throws Exception {
+      Scanner metaScanner = opts.getConnector().createScanner(MetadataTable.NAME, opts.auths);
+      metaScanner.setBatchSize(scanBatchSize);
+      metaScanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+      metaScanner.addScanIterator(new IteratorSetting(1000, "cfc", ColumnFamilyCounter.class.getName()));
+      metaScanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange());
+
+      Stat summary = new Stat();
+      int seen = 0;
+      for (Entry<Key,Value> entry : metaScanner) {
+        String text = entry.getValue().toString();
+        summary.addStat(Long.parseLong(text));
+        seen++;
+      }
+
+      if (seen <= 0) {
+        return "0 0 0 0";
+      }
+      return String.format("%d %d %.3f %.3f", summary.getMin(), summary.getMax(), summary.getAverage(), summary.getStdDev());
+    }
+
+    /**
+     * Sums the content summaries (length, directory count, file count) of every tables directory and, separately, of the configured table's sub-directory
+     * in each of them.
+     *
+     * @return six space separated numbers: length, directories and files for all tables, followed by the same three for the configured table
+     */
+    private String getFSStats() throws Exception {
+      VolumeManager volumes = VolumeManagerImpl.get();
+
+      long allBytes = 0, allDirs = 0, allFiles = 0;
+      long tableBytes = 0, tableDirs = 0, tableFiles = 0;
+
+      for (String tablesDir : ServerConstants.getTablesDirs()) {
+        ContentSummary everything = volumes.getContentSummary(new Path(tablesDir));
+        allBytes += everything.getLength();
+        allDirs += everything.getDirectoryCount();
+        allFiles += everything.getFileCount();
+
+        ContentSummary thisTable = volumes.getContentSummary(new Path(tablesDir, tableId));
+        tableBytes += thisTable.getLength();
+        tableDirs += thisTable.getDirectoryCount();
+        tableFiles += thisTable.getFileCount();
+      }
+
+      StringBuilder out = new StringBuilder();
+      out.append(allBytes).append(' ').append(allDirs).append(' ').append(allFiles);
+      out.append(' ');
+      out.append(tableBytes).append(' ').append(tableDirs).append(' ').append(tableFiles);
+      return out.toString();
+    }
+
+    /**
+     * Asks the master for its monitor info and aggregates the per-table statistics reported by every tablet server.
+     * <p>
+     * Retries (after a short sleep) for as long as the contacted master answers that it is no longer active.
+     *
+     * @return ten space separated numbers: tablet server count, then records, ingest rate and query rate over all tables, then records, records in memory,
+     *         ingest rate, query rate, tablets and online tablets for the configured table. The table columns are all zero when no tablet server reports
+     *         the table.
+     */
+    private String getACUStats() throws Exception {
+
+      while (true) {
+        // scoped to one attempt so a failed retry never closes a client that was already closed
+        MasterClientService.Iface client = null;
+        try {
+          ClientContext context = new ClientContext(opts.getInstance(), new Credentials(opts.getPrincipal(), opts.getToken()), new ServerConfigurationFactory(
+              opts.getInstance()).getConfiguration());
+          client = MasterClient.getConnectionWithRetry(context);
+          MasterMonitorInfo stats = client.getMasterStats(Tracer.traceInfo(), context.rpcCreds());
+
+          TableInfo all = new TableInfo();
+          Map<String,TableInfo> tableSummaries = new HashMap<>();
+
+          for (TabletServerStatus server : stats.tServerInfo) {
+            for (Entry<String,TableInfo> info : server.tableMap.entrySet()) {
+              TableInfo tableSummary = tableSummaries.get(info.getKey());
+              if (tableSummary == null) {
+                tableSummary = new TableInfo();
+                tableSummaries.put(info.getKey(), tableSummary);
+              }
+              TableInfoUtil.add(tableSummary, info.getValue());
+              TableInfoUtil.add(all, info.getValue());
+            }
+          }
+
+          TableInfo ti = tableSummaries.get(tableId);
+          if (ti == null) {
+            // no tablet server reported this table; report zeros instead of failing with a NullPointerException
+            ti = new TableInfo();
+          }
+
+          return "" + stats.tServerInfo.size() + " " + all.recs + " " + (long) all.ingestRate + " " + (long) all.queryRate + " " + ti.recs + " "
+              + ti.recsInMemory + " " + (long) ti.ingestRate + " " + (long) ti.queryRate + " " + ti.tablets + " " + ti.onlineTablets;
+
+        } catch (ThriftNotActiveServiceException e) {
+          // Let it loop, fetching a new location
+          log.debug("Contacted a Master which is no longer active, retrying");
+          sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+        } finally {
+          if (client != null)
+            MasterClient.close(client);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Queries the MapReduce cluster status through a freshly created JobClient, which is closed again before returning.
+   *
+   * @return six space separated numbers: map tasks, max map tasks, reduce tasks, max reduce tasks, task trackers and blacklisted trackers
+   */
+  private static String getMRStats() throws Exception {
+    Configuration conf = CachedConfiguration.getInstance();
+    // No alternatives for hadoop 20
+    JobClient jc = new JobClient(new org.apache.hadoop.mapred.JobConf(conf));
+    try {
+      ClusterStatus cs = jc.getClusterStatus(false);
+
+      return "" + cs.getMapTasks() + " " + cs.getMaxMapTasks() + " " + cs.getReduceTasks() + " " + cs.getMaxReduceTasks() + " " + cs.getTaskTrackers() + " "
+          + cs.getBlacklistedTrackers();
+    } finally {
+      // this method runs on every timer tick, so the client must not be left open
+      jc.close();
+    }
+
+  }
+
+  /** Command-line options of the stats collector; declares nothing beyond what it inherits from ClientOnRequiredTable. */
+  static class Opts extends ClientOnRequiredTable {}
+
+  /**
+   * Parses the client and scanner options, then schedules a StatsCollectionTask on a timer that fires immediately and every 30 seconds afterwards.
+   *
+   * @param args
+   *          command-line arguments handed to the option parser
+   */
+  public static void main(String[] args) {
+    final long initialDelayMillis = 0;
+    final long periodMillis = 30000;
+
+    Opts opts = new Opts();
+    ScannerOpts scanOpts = new ScannerOpts();
+    opts.parseArgs(ContinuousStatsCollector.class.getName(), args, scanOpts);
+
+    Timer timer = new Timer();
+    StatsCollectionTask task = new StatsCollectionTask(opts, scanOpts.scanBatchSize);
+    timer.schedule(task, initialDelayMillis, periodMillis);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousUtil.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousUtil.java
new file mode 100644
index 0000000..5a67a34
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.security.Authorizations;
+
+/**
+ * Static helpers shared by the Continuous test suite. Not instantiable.
+ */
+final class ContinuousUtil {
+  private ContinuousUtil() {}
+
+  /**
+   * Creates a scanner over a table, refusing to do so when the table is missing.
+   *
+   * @param connector
+   *          connector used both for the existence check and for creating the scanner
+   * @param table
+   *          name of the table to scan
+   * @param auths
+   *          authorizations handed to the scanner
+   * @return a scanner for the requested table
+   * @throws TableNotFoundException
+   *           if the table does not exist
+   */
+  static Scanner createScanner(Connector connector, String table, Authorizations auths) throws TableNotFoundException {
+    final boolean tableExists = connector.tableOperations().exists(table);
+    if (tableExists) {
+      return connector.createScanner(table, auths);
+    }
+    throw new TableNotFoundException(null, table, "Consult the README and create the table before starting test processes.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java
new file mode 100644
index 0000000..64f8a35
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousVerify.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.accumulo.core.cli.MapReduceClientOnDefaultTable;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.accumulo.testing.core.continuous.ContinuousWalk.BadChecksumException;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.VLongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.validators.PositiveInteger;
+
+/**
+ * A map reduce job that verifies a table created by continuous ingest. It verifies that all referenced nodes are defined.
+ */
+
+public class ContinuousVerify extends Configured implements Tool {
+
+  public static final VLongWritable DEF = new VLongWritable(-1);
+
+  /**
+   * Mapper that emits one "defined" marker per valid row and, when the value carries a previous-row reference, one record keyed by the referenced row
+   * whose value is the referencing row. Entries that fail checksum validation are counted and skipped.
+   */
+  public static class CMapper extends Mapper<Key,Value,LongWritable,VLongWritable> {
+
+    private static final Logger log = LoggerFactory.getLogger(CMapper.class);
+    private LongWritable row = new LongWritable();
+    private LongWritable ref = new LongWritable();
+    private VLongWritable vrow = new VLongWritable();
+
+    // number of bad checksums seen by this mapper so far; used to cap the error output
+    private long corrupt = 0;
+
+    @Override
+    public void map(Key key, Value data, Context context) throws IOException, InterruptedException {
+      final long rowId = Long.parseLong(key.getRow().toString(), 16);
+      if (rowId < 0) {
+        throw new IllegalArgumentException();
+      }
+
+      try {
+        ContinuousWalk.validate(key, data);
+      } catch (BadChecksumException bce) {
+        recordCorrupt(key, context);
+        return;
+      }
+
+      // mark this row as defined
+      row.set(rowId);
+      context.write(row, DEF);
+
+      final byte[] bytes = data.get();
+      final int prevRowOffset = ContinuousWalk.getPrevRowOffset(bytes);
+      if (prevRowOffset <= 0) {
+        return;
+      }
+
+      // record that the previous row is referenced by this row
+      ref.set(Long.parseLong(new String(bytes, prevRowOffset, 16, UTF_8), 16));
+      vrow.set(rowId);
+      context.write(ref, vrow);
+    }
+
+    /**
+     * Bumps the CORRUPT counter and reports the key, going quiet after the first 1000 reports.
+     */
+    private void recordCorrupt(Key key, Context context) {
+      context.getCounter(Counts.CORRUPT).increment(1L);
+      if (corrupt < 1000) {
+        log.error("Bad checksum : " + key);
+      } else if (corrupt == 1000) {
+        System.out.println("Too many bad checksums, not printing anymore!");
+      }
+      corrupt++;
+    }
+  }
+
+  /**
+   * Job counters incremented by the mapper (CORRUPT) and the reducer (the remaining three).
+   */
+  public enum Counts {
+    UNREFERENCED,
+    UNDEFINED,
+    REFERENCED,
+    CORRUPT
+  }
+
+  /**
+   * Reducer that classifies every row as undefined (referenced but never defined), unreferenced (defined but never referenced) or referenced, and writes
+   * out each undefined row together with the comma separated rows that reference it.
+   */
+  public static class CReducer extends Reducer<LongWritable,VLongWritable,Text,Text> {
+    // reused across reduce calls; holds the rows referencing the current key
+    private ArrayList<Long> refs = new ArrayList<>();
+
+    @Override
+    public void reduce(LongWritable key, Iterable<VLongWritable> values, Context context) throws IOException, InterruptedException {
+
+      refs.clear();
+      int definitions = 0;
+      for (VLongWritable value : values) {
+        final long v = value.get();
+        if (v == -1) {
+          // -1 is the marker the mapper writes for a defined row
+          definitions++;
+        } else {
+          refs.add(v);
+        }
+      }
+
+      final boolean hasRefs = !refs.isEmpty();
+      final Counts outcome;
+      if (definitions == 0 && hasRefs) {
+        context.write(new Text(ContinuousIngest.genRow(key.get())), new Text(joinReferencingRows()));
+        outcome = Counts.UNDEFINED;
+      } else if (definitions > 0 && !hasRefs) {
+        outcome = Counts.UNREFERENCED;
+      } else {
+        outcome = Counts.REFERENCED;
+      }
+      context.getCounter(outcome).increment(1L);
+
+    }
+
+    /**
+     * Renders the collected referencing rows as a comma separated list of row strings.
+     */
+    private String joinReferencingRows() {
+      StringBuilder joined = new StringBuilder();
+      for (int i = 0; i < refs.size(); i++) {
+        if (i > 0) {
+          joined.append(",");
+        }
+        joined.append(new String(ContinuousIngest.genRow(refs.get(i)), UTF_8));
+      }
+      return joined.toString();
+    }
+  }
+
+  /**
+   * Command-line options of the verification job. Passes "ci" to the superclass constructor (presumably the default table name; confirm against
+   * MapReduceClientOnDefaultTable).
+   */
+  static class Opts extends MapReduceClientOnDefaultTable {
+
+    public Opts() {
+      super("ci");
+    }
+
+    @Parameter(names = "--output", description = "location in HDFS to store the results; must not exist")
+    String outputDir = "/tmp/continuousVerify";
+
+    @Parameter(names = "--maxMappers", description = "the maximum number of mappers to use", validateWith = PositiveInteger.class)
+    int maxMaps = 1;
+
+    @Parameter(names = "--reducers", description = "the number of reducers to use", validateWith = PositiveInteger.class)
+    int reducers = 1;
+
+    // off unless the flag is given
+    @Parameter(names = "--offline", description = "perform the verification directly on the files while the table is offline")
+    boolean scanOffline;
+  }
+
+  /**
+   * Configures and runs the verification MapReduce job.
+   * <p>
+   * With the offline option the table is cloned, the clone is taken offline and the job reads the clone's files. The clone is deleted and tracing is
+   * stopped even when job setup or execution fails.
+   *
+   * @param args
+   *          command-line arguments parsed into {@link Opts}
+   * @return 0 if the job was successful, 1 otherwise
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(this.getClass().getName(), args);
+
+    String clone = opts.getTableName();
+    Connector conn = null;
+    // only true once the clone really exists, so cleanup never deletes anything else
+    boolean cloneCreated = false;
+
+    try {
+      Job job = Job.getInstance(getConf(), this.getClass().getSimpleName() + "_" + System.currentTimeMillis());
+      job.setJarByClass(this.getClass());
+
+      job.setInputFormatClass(AccumuloInputFormat.class);
+      opts.setAccumuloConfigs(job);
+
+      Set<Range> ranges = null;
+
+      if (opts.scanOffline) {
+        Random random = new Random();
+        clone = opts.getTableName() + "_" + String.format("%016x", (random.nextLong() & 0x7fffffffffffffffl));
+        conn = opts.getConnector();
+        conn.tableOperations().clone(opts.getTableName(), clone, true, new HashMap<String,String>(), new HashSet<String>());
+        cloneCreated = true;
+        ranges = conn.tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+        conn.tableOperations().offline(clone);
+        AccumuloInputFormat.setInputTableName(job, clone);
+        AccumuloInputFormat.setOfflineTableScan(job, true);
+      } else {
+        ranges = opts.getConnector().tableOperations().splitRangeByTablets(opts.getTableName(), new Range(), opts.maxMaps);
+      }
+
+      AccumuloInputFormat.setRanges(job, ranges);
+      AccumuloInputFormat.setAutoAdjustRanges(job, false);
+
+      job.setMapperClass(CMapper.class);
+      job.setMapOutputKeyClass(LongWritable.class);
+      job.setMapOutputValueClass(VLongWritable.class);
+
+      job.setReducerClass(CReducer.class);
+      job.setNumReduceTasks(opts.reducers);
+
+      job.setOutputFormatClass(TextOutputFormat.class);
+
+      job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", opts.scanOffline);
+
+      TextOutputFormat.setOutputPath(job, new Path(opts.outputDir));
+
+      job.waitForCompletion(true);
+
+      return job.isSuccessful() ? 0 : 1;
+    } finally {
+      try {
+        if (cloneCreated) {
+          conn.tableOperations().delete(clone);
+        }
+      } finally {
+        opts.stopTracing();
+      }
+    }
+  }
+
+  /**
+   * Runs the verification tool through ToolRunner and exits the JVM with the tool's result when that result is non-zero.
+   *
+   * @param args
+   *          command-line arguments forwarded unchanged to the tool
+   */
+  public static void main(String[] args) throws Exception {
+    final int exitCode = ToolRunner.run(CachedConfiguration.getInstance(), new ContinuousVerify(), args);
+    if (exitCode == 0) {
+      return;
+    }
+    System.exit(exitCode);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java
new file mode 100644
index 0000000..2335fd4
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/ContinuousWalk.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.trace.Span;
+import org.apache.accumulo.core.trace.Trace;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.Parameter;
+
+public class ContinuousWalk {
+
+  /**
+   * Command-line options of the walker: everything from ContinuousQuery.Opts plus an optional file of authorizations to pick from at random.
+   */
+  static public class Opts extends ContinuousQuery.Opts {
+    /**
+     * Turns the value of {@code --authsFile} into a {@link RandomAuths} by reading the named file.
+     * <p>
+     * Declared public and static so the option parser can instantiate it without an enclosing Opts instance.
+     */
+    public static class RandomAuthsConverter implements IStringConverter<RandomAuths> {
+      @Override
+      public RandomAuths convert(String value) {
+        try {
+          return new RandomAuths(value);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+
+    // the converter must be named here, otherwise the option value is never turned into a RandomAuths
+    @Parameter(names = "--authsFile", description = "read the authorities to use from a file", converter = RandomAuthsConverter.class)
+    RandomAuths randomAuths = new RandomAuths();
+  }
+
+  /**
+   * Unchecked exception thrown by {@code validate} when the checksum stored in a value does not match the checksum computed from the entry.
+   */
+  static class BadChecksumException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message
+     *          description of the offending entry
+     */
+    public BadChecksumException(String message) {
+      super(message);
+    }
+
+  }
+
+  /**
+   * A non-empty list of authorizations from which one is picked at random.
+   */
+  static class RandomAuths {
+    private final List<Authorizations> auths;
+
+    /**
+     * Creates a list holding only the empty authorizations.
+     */
+    RandomAuths() {
+      auths = Collections.singletonList(Authorizations.EMPTY);
+    }
+
+    /**
+     * Reads authorizations from a file, one comma separated set per line.
+     *
+     * @param file
+     *          path of the file to read through the Hadoop FileSystem, or null to use only the empty authorizations
+     * @throws IOException
+     *           if the file cannot be read or contains no lines
+     */
+    RandomAuths(String file) throws IOException {
+      if (file == null) {
+        auths = Collections.singletonList(Authorizations.EMPTY);
+        return;
+      }
+
+      List<Authorizations> loaded = new ArrayList<>();
+
+      FileSystem fs = FileSystem.get(new Configuration());
+      try (BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(new Path(file)), UTF_8))) {
+        String line;
+        while ((line = in.readLine()) != null) {
+          loaded.add(new Authorizations(line.split(",")));
+        }
+      }
+
+      if (loaded.isEmpty()) {
+        // fail here with a clear message instead of later inside getAuths, where Random.nextInt(0) would throw
+        throw new IOException("No authorizations found in " + file);
+      }
+      auths = loaded;
+    }
+
+    /**
+     * @param r
+     *          source of randomness
+     * @return one of the held authorizations, chosen using r
+     */
+    Authorizations getAuths(Random r) {
+      return auths.get(r.nextInt(auths.size()));
+    }
+  }
+
+  /**
+   * Walks the table forever: finds a start row, then repeatedly scans the current row, validates every entry and follows the previous-row reference of a
+   * randomly chosen value until a row with no entries is reached. Prints one "SRQ" line per row scan and a "MIS" line (to both standard out and standard
+   * error) when a row has no entries.
+   *
+   * @param args
+   *          command-line arguments for the client and walker options
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    ClientOnDefaultTable clientOpts = new ClientOnDefaultTable("ci");
+    clientOpts.parseArgs(ContinuousWalk.class.getName(), args, opts);
+
+    Connector conn = clientOpts.getConnector();
+    Random r = new Random();
+    List<Value> rowValues = new ArrayList<>();
+
+    for (;;) {
+      // a new scanner, with freshly picked authorizations, for every walk
+      Scanner scanner = ContinuousUtil.createScanner(conn, clientOpts.getTableName(), opts.randomAuths.getAuths(r));
+      String row = findAStartRow(opts.min, opts.max, scanner, r);
+
+      while (row != null) {
+        rowValues.clear();
+
+        final long started = System.currentTimeMillis();
+        Span span = Trace.on("walk");
+        try {
+          scanner.setRange(new Range(new Text(row)));
+          for (Entry<Key,Value> entry : scanner) {
+            Value value = entry.getValue();
+            validate(entry.getKey(), value);
+            rowValues.add(value);
+          }
+        } finally {
+          span.stop();
+        }
+        final long finished = System.currentTimeMillis();
+
+        System.out.printf("SRQ %d %s %d %d%n", started, row, (finished - started), rowValues.size());
+
+        if (rowValues.isEmpty()) {
+          System.out.printf("MIS %d %s%n", started, row);
+          System.err.printf("MIS %d %s%n", started, row);
+          row = null;
+        } else {
+          Value picked = rowValues.get(r.nextInt(rowValues.size()));
+          row = getPrevRow(picked);
+        }
+
+        if (opts.sleepTime > 0) {
+          Thread.sleep(opts.sleepTime);
+        }
+      }
+
+      if (opts.sleepTime > 0) {
+        Thread.sleep(opts.sleepTime);
+      }
+    }
+  }
+
+  /**
+   * Scans forward from a generated row until an entry with a previous-row reference is found, validating every entry on the way, and prints an "FSR"
+   * line with the start time, the generated row, the elapsed milliseconds and the number of entries examined.
+   *
+   * @param min
+   *          lower bound handed to the row generator
+   * @param max
+   *          upper bound handed to the row generator
+   * @param scanner
+   *          scanner to use; its range and batch size are overwritten
+   * @param r
+   *          source of randomness for the row generator
+   * @return the previous row referenced by the first entry that has one, or null if the scan ended without finding any
+   */
+  private static String findAStartRow(long min, long max, Scanner scanner, Random r) {
+
+    final byte[] startRow = ContinuousIngest.genRow(min, max, r);
+    scanner.setRange(new Range(new Text(startRow), null));
+    scanner.setBatchSize(100);
+
+    String found = null;
+    int examined = 0;
+
+    final long began = System.currentTimeMillis();
+
+    for (Entry<Key,Value> entry : scanner) {
+      Value value = entry.getValue();
+      validate(entry.getKey(), value);
+      found = getPrevRow(value);
+      examined++;
+      if (found != null) {
+        break;
+      }
+    }
+
+    final long elapsed = System.currentTimeMillis() - began;
+
+    System.out.printf("FSR %d %s %d %d%n", began, new String(startRow, UTF_8), elapsed, examined);
+
+    return found;
+  }
+
+  /**
+   * Locates the previous-row field inside a value.
+   * <p>
+   * The byte at index 53 must be ':'. If the byte at index 54 is also ':' the field is empty; otherwise the field occupies the 16 bytes starting at 54 and
+   * must be followed by ':' at index 70.
+   *
+   * @param val
+   *          raw bytes of the value
+   * @return 54 when a previous row is present, -1 when the field is empty
+   * @throws IllegalArgumentException
+   *           if the value is empty, too short, or does not have ':' at the expected positions
+   */
+  static int getPrevRowOffset(byte val[]) {
+    final int prevRowStart = 54;
+    final int prevRowEnd = prevRowStart + 16;
+
+    if (val.length == 0)
+      throw new IllegalArgumentException();
+    // a value too short to hold the separators is malformed; report it the same way as a misplaced separator
+    if (val.length <= prevRowStart || val[prevRowStart - 1] != ':')
+      throw new IllegalArgumentException(new String(val, UTF_8));
+
+    // prev row starts at 54
+    if (val[prevRowStart] != ':') {
+      if (val.length <= prevRowEnd || val[prevRowEnd] != ':')
+        throw new IllegalArgumentException(new String(val, UTF_8));
+      return prevRowStart;
+    }
+
+    return -1;
+  }
+
+  /**
+   * Extracts the previous-row reference stored in a value.
+   *
+   * @param value
+   *          value to inspect
+   * @return the 16 character previous row, or null when the value holds none
+   */
+  static String getPrevRow(Value value) {
+    final byte[] raw = value.get();
+    final int start = getPrevRowOffset(raw);
+    return start > 0 ? new String(raw, start, 16, UTF_8) : null;
+  }
+
+  /**
+   * Locates the checksum field at the end of a value.
+   * <p>
+   * A value ending in ':' carries no checksum. Otherwise the last 8 bytes are the checksum and must be preceded by ':'.
+   *
+   * @param val
+   *          raw bytes of the value
+   * @return index of the first checksum byte, or -1 when the value carries no checksum
+   * @throws IllegalArgumentException
+   *           if the value is empty, or is too short or lacks the ':' in front of the checksum
+   */
+  static int getChecksumOffset(byte val[]) {
+    if (val.length == 0)
+      throw new IllegalArgumentException();
+
+    if (val[val.length - 1] == ':')
+      return -1;
+
+    // fewer than 9 bytes cannot hold ':' plus an 8 byte checksum
+    if (val.length < 9 || val[val.length - 9] != ':')
+      throw new IllegalArgumentException(new String(val, UTF_8));
+
+    return val.length - 8;
+  }
+
+  /**
+   * Verifies the CRC32 checksum stored at the end of a value against one computed over the key's row, column family, column qualifier and column
+   * visibility followed by the value bytes that precede the checksum. Values without a checksum are accepted as they are.
+   *
+   * @param key
+   *          key of the entry
+   * @param value
+   *          value of the entry
+   * @throws BadChecksumException
+   *           if the stored checksum is not a hexadecimal number or differs from the computed one
+   */
+  static void validate(Key key, Value value) throws BadChecksumException {
+    final byte[] val = value.get();
+    int ckOff = getChecksumOffset(val);
+    if (ckOff < 0)
+      return;
+
+    long storedCksum;
+    try {
+      storedCksum = Long.parseLong(new String(val, ckOff, 8, UTF_8), 16);
+    } catch (NumberFormatException nfe) {
+      // garbage in the checksum field is a checksum failure too; report it through the type callers handle
+      BadChecksumException bce = new BadChecksumException("Checksum invalid " + key + " " + value);
+      bce.initCause(nfe);
+      throw bce;
+    }
+
+    CRC32 cksum = new CRC32();
+
+    cksum.update(key.getRowData().toArray());
+    cksum.update(key.getColumnFamilyData().toArray());
+    cksum.update(key.getColumnQualifierData().toArray());
+    cksum.update(key.getColumnVisibilityData().toArray());
+    cksum.update(val, 0, ckOff);
+
+    if (cksum.getValue() != storedCksum) {
+      throw new BadChecksumException("Checksum invalid " + key + " " + value);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/GenSplits.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/GenSplits.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/GenSplits.java
new file mode 100644
index 0000000..be9ef7a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/GenSplits.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import java.util.List;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+/**
+ * Command-line tool that prints evenly spaced split points for the row range [min, max], one per line, so that the range is divided into the requested
+ * number of tablets. Each split is the 16 digit hexadecimal row with trailing zeros removed.
+ */
+public class GenSplits {
+
+  static class Opts {
+    @Parameter(names = "--min", description = "minimum row")
+    long minRow = 0;
+
+    @Parameter(names = "--max", description = "maximum row")
+    long maxRow = Long.MAX_VALUE;
+
+    @Parameter(description = "<num tablets>")
+    List<String> args = null;
+  }
+
+  /**
+   * Parses the options, validates them and prints {@code numTablets - 1} split points to standard out. Exits with status -1 on any usage error.
+   *
+   * @param args
+   *          optional --min and --max followed by the number of tablets
+   */
+  public static void main(String[] args) {
+
+    Opts opts = new Opts();
+    JCommander jcommander = new JCommander(opts);
+    jcommander.setProgramName(GenSplits.class.getSimpleName());
+
+    try {
+      jcommander.parse(args);
+    } catch (ParameterException pe) {
+      System.err.println(pe.getMessage());
+      jcommander.usage();
+      System.exit(-1);
+    }
+
+    if (opts.args == null || opts.args.size() != 1) {
+      jcommander.usage();
+      System.exit(-1);
+    }
+
+    int numTablets;
+    try {
+      numTablets = Integer.parseInt(opts.args.get(0));
+    } catch (NumberFormatException nfe) {
+      // report a bad tablet count like every other usage error instead of dying with a stack trace
+      System.err.println("ERROR: <num tablets> is not an integer: " + opts.args.get(0));
+      jcommander.usage();
+      System.exit(-1);
+      return;
+    }
+
+    if (numTablets < 1) {
+      System.err.println("ERROR: numTablets < 1");
+      System.exit(-1);
+    }
+
+    if (opts.minRow >= opts.maxRow) {
+      System.err.println("ERROR: min >= max");
+      System.exit(-1);
+    }
+
+    int numSplits = numTablets - 1;
+    long distance = ((opts.maxRow - opts.minRow) / numTablets) + 1;
+    long split = distance;
+    for (int i = 0; i < numSplits; i++) {
+      System.out.println(formatSplit(split + opts.minRow));
+      split += distance;
+    }
+  }
+
+  /**
+   * Formats a row as 16 hexadecimal digits and strips trailing zeros, always keeping at least one character.
+   */
+  private static String formatSplit(long row) {
+    String s = String.format("%016x", row);
+
+    int end = s.length();
+    // stop at one character so an all-zero row becomes "0" rather than running off the front of the string
+    while (end > 1 && s.charAt(end - 1) == '0') {
+      end--;
+    }
+
+    return s.substring(0, end);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/HistData.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/HistData.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/HistData.java
new file mode 100644
index 0000000..2fff363
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/HistData.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A single histogram bucket: a bin value together with the number of points counted for it.
+ *
+ * <p>
+ * Ordering and equality are defined by {@link #bin} alone, which is cast to {@link Comparable} when two buckets are compared. {@link #count} is mutable and
+ * takes no part in {@link #equals(Object)}, {@link #hashCode()} or {@link #compareTo(HistData)}.
+ *
+ * @param <T>
+ *          type of the bin value
+ */
+class HistData<T> implements Comparable<HistData<T>>, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  T bin;
+  long count;
+
+  /**
+   * Creates a bucket for the given bin with a count of zero.
+   *
+   * @param bin
+   *          value this bucket counts
+   */
+  HistData(T bin) {
+    this.bin = bin;
+    count = 0;
+  }
+
+  /**
+   * Hashes the bin only. The count is left out on purpose: {@link #equals(Object)} ignores it, so including it would give equal buckets different hash codes
+   * and would change a bucket's hash every time its count is incremented.
+   */
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(bin);
+  }
+
+  /**
+   * Two buckets are equal when their bins compare as equal; counts are not considered.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    // instanceof is already false for null, so no separate null check is needed
+    if (!(obj instanceof HistData)) {
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    HistData<T> other = (HistData<T>) obj;
+    return compareTo(other) == 0;
+  }
+
+  /**
+   * Orders buckets by the natural ordering of their bins.
+   *
+   * @throws ClassCastException
+   *           if the bin does not implement {@link Comparable}
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public int compareTo(HistData<T> o) {
+    return ((Comparable<T>) bin).compareTo(o.bin);
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/Histogram.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/Histogram.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/Histogram.java
new file mode 100644
index 0000000..0f1ba05
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/Histogram.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedOutputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Counts occurrences of values ("bins") and reports them as a simple histogram.
+ *
+ * <p>
+ * {@link #print(StringBuilder)} and {@link #save(String)} order the bins with a {@link TreeSet} of {@link HistData}, which compares bins by casting them to
+ * {@link Comparable}; bins that do not implement it make those two methods fail with a {@link ClassCastException}.
+ *
+ * @param <T>
+ *          type of the bin values
+ */
+public class Histogram<T> implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Total of all amounts added through {@link #addPoint(Object, long)}. */
+  protected long sum;
+  /** Per-bin counters, keyed by bin value. */
+  protected HashMap<T,HistData<T>> counts;
+
+  /** Creates an empty histogram. */
+  public Histogram() {
+    sum = 0;
+    counts = new HashMap<>();
+  }
+
+  /**
+   * Adds one occurrence of {@code x}.
+   *
+   * @param x
+   *          bin to increment
+   */
+  public void addPoint(T x) {
+    addPoint(x, 1);
+  }
+
+  /**
+   * Adds {@code y} occurrences of {@code x}, creating the bin on first use.
+   *
+   * @param x
+   *          bin to increment
+   * @param y
+   *          amount added to both the bin count and the overall sum
+   */
+  public void addPoint(T x, long y) {
+
+    HistData<T> hd = counts.get(x);
+    if (hd == null) {
+      hd = new HistData<>(x);
+      counts.put(x, hd);
+    }
+
+    hd.count += y;
+    sum += y;
+  }
+
+  /**
+   * @param x
+   *          bin to look up
+   * @return the count recorded for {@code x}, or 0 if the bin was never added
+   */
+  public long getCount(T x) {
+    HistData<T> hd = counts.get(x);
+    return hd == null ? 0 : hd.count;
+  }
+
+  /**
+   * @param x
+   *          bin to look up
+   * @return the share of the overall sum held by {@code x}, in percent; 0 when the histogram is empty
+   */
+  public double getPercentage(T x) {
+    if (getSum() == 0) {
+      return 0;
+    }
+    return (double) getCount(x) / (double) getSum() * 100.0;
+  }
+
+  /**
+   * @return the total of all amounts added so far
+   */
+  public long getSum() {
+    return sum;
+  }
+
+  /**
+   * @return a new list of all bins, ordered by ascending count
+   */
+  public List<T> getKeysInCountSortedOrder() {
+
+    ArrayList<HistData<T>> sortedCounts = new ArrayList<>(counts.values());
+
+    Collections.sort(sortedCounts, new Comparator<HistData<T>>() {
+      @Override
+      public int compare(HistData<T> o1, HistData<T> o2) {
+        return Long.compare(o1.count, o2.count);
+      }
+    });
+
+    ArrayList<T> sortedKeys = new ArrayList<>(sortedCounts.size());
+    for (HistData<T> hd : sortedCounts) {
+      sortedKeys.add(hd.bin);
+    }
+
+    return sortedKeys;
+  }
+
+  /**
+   * Appends a table with one row per bin (in bin order) showing the bin, its count, its percentage and the running percentage, followed by a TOTAL row.
+   *
+   * @param out
+   *          builder the report is appended to
+   */
+  public void print(StringBuilder out) {
+    TreeSet<HistData<T>> sortedCounts = new TreeSet<>(counts.values());
+
+    // the widest bin label decides the width of the first column
+    int maxValueLen = 0;
+    for (HistData<T> hd : sortedCounts) {
+      maxValueLen = Math.max(maxValueLen, String.valueOf(hd.bin).length());
+    }
+
+    String rowFormat = " %" + (maxValueLen + 1) + "s %,16d %6.2f%s %6.2f%s%n";
+    double psum = 0;
+
+    for (HistData<T> hd : sortedCounts) {
+      double percentage = getPercentage(hd.bin);
+      psum += percentage;
+
+      out.append(String.format(rowFormat, String.valueOf(hd.bin), hd.count, percentage, "%", psum, "%"));
+    }
+    out.append(String.format("%n %" + (maxValueLen + 1) + "s %,16d %n", "TOTAL", sum));
+  }
+
+  /**
+   * Writes one {@code " <bin> <count>"} line per bin, in bin order, to the given file using UTF-8.
+   *
+   * @param file
+   *          path of the file to write
+   * @throws IOException
+   *           if the file cannot be opened or the data cannot be written
+   */
+  public void save(String file) throws IOException {
+
+    // try-with-resources closes the file even when sorting or writing fails part way through
+    try (FileOutputStream fos = new FileOutputStream(file);
+        BufferedOutputStream bos = new BufferedOutputStream(fos);
+        PrintStream ps = new PrintStream(bos, false, UTF_8.name())) {
+
+      TreeSet<HistData<T>> sortedCounts = new TreeSet<>(counts.values());
+      for (HistData<T> hd : sortedCounts) {
+        ps.println(" " + hd.bin + " " + hd.count);
+      }
+
+      // PrintStream never throws on write errors; checkError() flushes and reports them so a failed save is not silent
+      if (ps.checkError()) {
+        throw new IOException("Failed to write histogram to " + file);
+      }
+    }
+  }
+
+  /**
+   * @return the key set of the backing map; this is a live view, not a copy
+   */
+  public Set<T> getKeys() {
+    return counts.keySet();
+  }
+
+  /** Removes all bins and resets the sum to zero. */
+  public void clear() {
+    counts.clear();
+    sum = 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/PrintScanTimeHistogram.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/PrintScanTimeHistogram.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/PrintScanTimeHistogram.java
new file mode 100644
index 0000000..7172f3a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/PrintScanTimeHistogram.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads scan timing lines from standard in and logs two histograms: one for lines whose first token is {@code SRQ} and one for lines whose first token is
+ * {@code FSR}.
+ */
+public class PrintScanTimeHistogram {
+
+  private static final Logger log = LoggerFactory.getLogger(PrintScanTimeHistogram.class);
+
+  public static void main(String[] args) throws Exception {
+    Histogram<String> srqHist = new Histogram<>();
+    Histogram<String> fsrHist = new Histogram<>();
+
+    processFile(System.in, srqHist, fsrHist);
+
+    logReport("Single row queries histogram", srqHist);
+    logReport("Find start rows histogram", fsrHist);
+  }
+
+  /** Logs the given histogram at info level underneath a banner line carrying the title. */
+  private static void logReport(String title, Histogram<String> hist) {
+    StringBuilder report = new StringBuilder();
+    report.append(String.format("%n *** %s *** %n", title));
+    hist.print(report);
+    log.info("{}", report);
+  }
+
+  /**
+   * Reads the stream line by line as UTF-8 and adds one histogram point per {@code SRQ} or {@code FSR} line, using the fourth space-separated token as the
+   * delta. Lines with any other first token are ignored; lines that fail to parse are logged and skipped. The stream is closed when reading ends.
+   *
+   * @param ins
+   *          stream to read
+   * @param srqHist
+   *          histogram receiving {@code SRQ} lines
+   * @param fsrHist
+   *          histogram receiving {@code FSR} lines
+   */
+  private static void processFile(InputStream ins, Histogram<String> srqHist, Histogram<String> fsrHist) throws FileNotFoundException, IOException {
+    // try-with-resources so the reader is closed even if readLine() throws
+    try (BufferedReader in = new BufferedReader(new InputStreamReader(ins, UTF_8))) {
+      String line;
+      while ((line = in.readLine()) != null) {
+
+        try {
+          String[] tokens = line.split(" ");
+
+          String type = tokens[0];
+          Histogram<String> target = null;
+          if (type.equals("SRQ")) {
+            target = srqHist;
+          } else if (type.equals("FSR")) {
+            target = fsrHist;
+          }
+
+          if (target != null) {
+            long delta = Long.parseLong(tokens[3]);
+            target.addPoint(generateHistPoint(delta));
+          }
+        } catch (Exception e) {
+          // one malformed line must not abort the whole run
+          log.error("Failed to process line '{}'.", line, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Maps a delta to a fixed-width bin label. The delta is divided by 1000 (presumably milliseconds to seconds - confirm against the producer of the input);
+   * values below 0.1 keep two decimals, values below 1.0 keep one decimal followed by {@code x}, and everything else is rounded to a whole number followed by
+   * {@code .xx}. A value that rounds up to the next range's lower bound is given that range's label.
+   *
+   * @param delta
+   *          raw delta read from the input line
+   * @return the label of the bin the delta falls into
+   */
+  private static String generateHistPoint(long delta) {
+    double scaled = delta / 1000.0;
+    String point;
+
+    // Locale.ROOT pins the decimal separator to '.', which the literal comparisons below (and the label format itself) rely on
+    if (scaled < .1) {
+      point = String.format(java.util.Locale.ROOT, "%07.2f", scaled);
+      if (point.equals("0000.10")) {
+        point = "0000.1x";
+      }
+    } else if (scaled < 1.0) {
+      point = String.format(java.util.Locale.ROOT, "%06.1fx", scaled);
+      if (point.equals("0001.0x")) {
+        point = "0001.xx";
+      }
+    } else {
+      point = String.format(java.util.Locale.ROOT, "%04.0f.xx", scaled);
+    }
+    return point;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo-testing/blob/b81229d7/core/src/main/java/org/apache/accumulo/testing/core/continuous/TimeBinner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/testing/core/continuous/TimeBinner.java b/core/src/main/java/org/apache/accumulo/testing/core/continuous/TimeBinner.java
new file mode 100644
index 0000000..d43e2e5
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/testing/core/continuous/TimeBinner.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.core.continuous;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.accumulo.core.cli.ClientOpts.TimeConverter;
+import org.apache.accumulo.core.cli.Help;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Reads whitespace-separated records from standard in, groups them into buckets of {@code --period} width based on a time column, aggregates a data column
+ * per bucket and prints one line per bucket in ascending bucket order.
+ */
+public class TimeBinner {
+
+  /** Aggregations selectable through {@code --operation}. */
+  enum Operation {
+    AVG, SUM, MIN, MAX, COUNT, CUMULATIVE, AMM, // avg,min,max
+    AMM_HACK1 // special case
+  }
+
+  /** Mutable double holder so map values can be updated in place. */
+  private static class DoubleWrapper {
+    double d;
+  }
+
+  /**
+   * Returns the wrapper stored under {@code l}, first inserting one initialised to {@code init} when the key is absent.
+   */
+  private static DoubleWrapper get(long l, HashMap<Long,DoubleWrapper> m, double init) {
+    DoubleWrapper dw = m.get(l);
+    if (dw == null) {
+      dw = new DoubleWrapper();
+      dw.d = init;
+      m.put(l, dw);
+    }
+    return dw;
+  }
+
+  static class Opts extends Help {
+    @Parameter(names = "--period", description = "period", converter = TimeConverter.class, required = true)
+    long period = 0;
+    @Parameter(names = "--timeColumn", description = "time column", required = true)
+    int timeColumn = 0;
+    @Parameter(names = "--dataColumn", description = "data column", required = true)
+    int dataColumn = 0;
+    // the help text lists every Operation constant, since all of them are accepted by Operation.valueOf below
+    @Parameter(names = "--operation", description = "one of: AVG, SUM, MIN, MAX, COUNT, CUMULATIVE, AMM, AMM_HACK1", required = true)
+    String operation;
+    @Parameter(names = "--dateFormat", description = "a SimpleDateFormat string that describes the date format")
+    String dateFormat = "MM/dd/yy-HH:mm:ss";
+  }
+
+  /**
+   * Parses the options, aggregates every line of standard in and prints {@code <formatted bucket time> <value>} per bucket. Lines that cannot be processed
+   * are reported on standard error and skipped.
+   *
+   * @param args
+   *          command line arguments
+   */
+  public static void main(String[] args) throws Exception {
+    Opts opts = new Opts();
+    opts.parseArgs(TimeBinner.class.getName(), args);
+
+    Operation operation = Operation.valueOf(opts.operation);
+    SimpleDateFormat sdf = new SimpleDateFormat(opts.dateFormat);
+
+    BufferedReader in = new BufferedReader(new InputStreamReader(System.in, UTF_8));
+
+    String line = null;
+
+    // primary holds the value that drives the output (sum, min, max or count depending on the operation);
+    // counts, mins and maxes are only filled by the operations that need them (AVG, AMM, AMM_HACK1)
+    HashMap<Long,DoubleWrapper> primary = new HashMap<>();
+    HashMap<Long,DoubleWrapper> counts = new HashMap<>();
+    HashMap<Long,DoubleWrapper> mins = new HashMap<>();
+    HashMap<Long,DoubleWrapper> maxes = new HashMap<>();
+
+    while ((line = in.readLine()) != null) {
+
+      try {
+        String[] tokens = line.split("\\s+");
+
+        long time = (long) Double.parseDouble(tokens[opts.timeColumn]);
+        double data = Double.parseDouble(tokens[opts.dataColumn]);
+
+        // integer division truncates the timestamp to a multiple of the period, i.e. the bucket key
+        time = (time / opts.period) * opts.period;
+
+        switch (operation) {
+          case AMM_HACK1: {
+            if (opts.dataColumn < 2) {
+              throw new IllegalArgumentException("--dataColumn must be at least 2");
+            }
+            // in this mode the two columns before the data column carry the min and the max
+            double data_min = Double.parseDouble(tokens[opts.dataColumn - 2]);
+            double data_max = Double.parseDouble(tokens[opts.dataColumn - 1]);
+
+            updateMin(time, mins, data, data_min);
+            updateMax(time, maxes, data, data_max);
+
+            increment(time, primary, data);
+            increment(time, counts, 1);
+            break;
+          }
+          case AMM: {
+            updateMin(time, mins, data, data);
+            updateMax(time, maxes, data, data);
+
+            increment(time, primary, data);
+            increment(time, counts, 1);
+            break;
+          }
+          case AVG: {
+            increment(time, primary, data);
+            increment(time, counts, 1);
+            break;
+          }
+          case MAX: {
+            updateMax(time, primary, data, data);
+            break;
+          }
+          case MIN: {
+            updateMin(time, primary, data, data);
+            break;
+          }
+          case COUNT: {
+            increment(time, primary, 1);
+            break;
+          }
+          case SUM:
+          case CUMULATIVE: {
+            increment(time, primary, data);
+            break;
+          }
+        }
+
+      } catch (Exception e) {
+        System.err.println("Failed to process line : " + line + "  " + e.getMessage());
+      }
+    }
+
+    // a TreeMap copy yields the buckets in ascending time order
+    TreeMap<Long,DoubleWrapper> sorted = new TreeMap<>(primary);
+
+    Set<Entry<Long,DoubleWrapper>> es = sorted.entrySet();
+
+    double cumulative = 0;
+    for (Entry<Long,DoubleWrapper> entry : es) {
+      Long bucket = entry.getKey();
+      double total = entry.getValue().d;
+      String value;
+
+      switch (operation) {
+        case AMM_HACK1:
+        case AMM: {
+          double average = total / counts.get(bucket).d;
+          value = "" + average + " " + mins.get(bucket).d + " " + maxes.get(bucket).d;
+          break;
+        }
+        case AVG: {
+          value = "" + (total / counts.get(bucket).d);
+          break;
+        }
+        case CUMULATIVE: {
+          cumulative += total;
+          value = "" + cumulative;
+          break;
+        }
+        default:
+          value = "" + total;
+      }
+
+      // java.util.Date interprets the bucket key as milliseconds since the epoch
+      System.out.println(sdf.format(new Date(bucket)) + " " + value);
+    }
+
+  }
+
+  /** Adds {@code amount} to the bucket's value, starting from 0 for a new bucket. */
+  private static void increment(long time, HashMap<Long,DoubleWrapper> aggregation, double amount) {
+    get(time, aggregation, 0).d += amount;
+  }
+
+  /**
+   * Stores {@code new_max} for the bucket when {@code data} is greater than the value currently stored.
+   */
+  private static void updateMax(long time, HashMap<Long,DoubleWrapper> aggregation, double data, double new_max) {
+    DoubleWrapper maxdw = get(time, aggregation, Double.NEGATIVE_INFINITY);
+    // NOTE(review): the guard tests data, not new_max; for AMM_HACK1 these differ, so a larger new_max can be missed - confirm this is intended
+    if (data > maxdw.d) {
+      maxdw.d = new_max;
+    }
+  }
+
+  /**
+   * Stores {@code new_min} for the bucket when {@code data} is less than the value currently stored.
+   */
+  private static void updateMin(long time, HashMap<Long,DoubleWrapper> aggregation, double data, double new_min) {
+    DoubleWrapper mindw = get(time, aggregation, Double.POSITIVE_INFINITY);
+    // NOTE(review): the guard tests data, not new_min; for AMM_HACK1 these differ, so a smaller new_min can be missed - confirm this is intended
+    if (data < mindw.d) {
+      mindw.d = new_min;
+    }
+  }
+}


Mime
View raw message