accumulo-commits mailing list archives

From els...@apache.org
Subject [2/2] accumulo git commit: ACCUMULO-3231 Initial stub out of some scripts to automate use of the merkle tree + some random ingest
Date Fri, 07 Nov 2014 21:01:25 GMT
ACCUMULO-3231 Initial stub out of some scripts to automate use of the merkle tree + some random ingest


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f6af58de
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f6af58de
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f6af58de

Branch: refs/heads/master
Commit: f6af58de5814aa03dddaff79614e10f245ea124b
Parents: 5fdbb81
Author: Josh Elser <elserj@apache.org>
Authored: Mon Oct 13 23:55:10 2014 -0400
Committer: Josh Elser <elserj@apache.org>
Committed: Fri Nov 7 15:51:53 2014 -0500

----------------------------------------------------------------------
 .../test/replication/merkle/MerkleTree.java     |  92 ++++++
 .../test/replication/merkle/MerkleTreeNode.java | 131 +++++++++
 .../replication/merkle/RangeSerialization.java  |  72 +++++
 .../replication/merkle/cli/CompareTables.java   | 164 +++++++++++
 .../replication/merkle/cli/ComputeRootHash.java | 100 +++++++
 .../replication/merkle/cli/GenerateHashes.java  | 285 +++++++++++++++++++
 .../merkle/cli/ManualComparison.java            |  98 +++++++
 .../merkle/ingest/RandomWorkload.java           | 120 ++++++++
 .../test/replication/merkle/package-info.java   |  38 +++
 .../replication/merkle/skvi/DigestIterator.java | 152 ++++++++++
 .../merkle-replication/configure-replication.sh |  99 +++++++
 test/system/merkle-replication/ingest-data.sh   |  39 +++
 test/system/merkle-replication/merkle-env.sh    |  48 ++++
 test/system/merkle-replication/verify-data.sh   |  91 ++++++
 14 files changed, 1529 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTree.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTree.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTree.java
new file mode 100644
index 0000000..03eb466
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTree.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.accumulo.core.util.Pair;
+
+import com.google.common.collect.Iterables;
+
+/**
+ * Simple implementation of a Merkle tree
+ */
+public class MerkleTree {
+  protected List<MerkleTreeNode> leaves;
+  protected String digestAlgorithm;
+
+  public MerkleTree(List<MerkleTreeNode> leaves, String digestAlgorithm) {
+    this.leaves = leaves;
+    this.digestAlgorithm = digestAlgorithm;
+  }
+
+  public MerkleTreeNode getRootNode() throws NoSuchAlgorithmException {
+    ArrayList<MerkleTreeNode> buffer = new ArrayList<>(leaves.size());
+    buffer.addAll(leaves);
+
+    while (buffer.size() > 1) {
+      // Find two nodes that we want to roll up
+      Pair<Integer,Integer> pairToJoin = findNextPair(buffer);
+
+      // Make a parent node from them
+      MerkleTreeNode parent = new MerkleTreeNode(Arrays.asList(buffer.get(pairToJoin.getFirst()), buffer.get(pairToJoin.getSecond())), digestAlgorithm);
+
+      // Insert it back into the "tree" at the position of the first child
+      buffer.set(pairToJoin.getFirst(), parent);
+
+      // Remove the second child completely
+      buffer.remove(pairToJoin.getSecond().intValue());
+
+      // "recurse"
+    }
+
+    return Iterables.getOnlyElement(buffer);
+  }
+
+  protected Pair<Integer,Integer> findNextPair(List<MerkleTreeNode> nodes) {
+    int i = 0, j = 1;
+    while (i < nodes.size() && j < nodes.size()) {
+      MerkleTreeNode left = nodes.get(i), right = nodes.get(j);
+
+      // At the same level
+      if (left.getLevel() == right.getLevel()) {
+        return new Pair<Integer,Integer>(i, j);
+      }
+
+      // Peek to see if we have another element
+      if (j + 1 < nodes.size()) {
+        // If we do, try to match those
+        i++;
+        j++;
+      } else {
+        // Otherwise, the last two elements must be paired
+        return new Pair<Integer,Integer>(i, j);
+      }
+    }
+
+    if (2 < nodes.size()) {
+      throw new IllegalStateException("Should not have exited loop without pairing two elements when we have at least 3 nodes");
+    } else if (2 == nodes.size()) {
+      return new Pair<Integer,Integer>(0, 1);
+    } else {
+      throw new IllegalStateException("Must have at least two nodes to pair");
+    }
+  }
+}
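
For illustration (an editorial sketch, not part of this commit): building a tiny tree by hand and rolling it up to a root hash. The leaf hashes are hypothetical stand-ins for digests that GenerateHashes, added later in this commit, would normally produce.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.test.replication.merkle.MerkleTree;
    import org.apache.accumulo.test.replication.merkle.MerkleTreeNode;
    import org.apache.commons.codec.binary.Hex;
    import org.apache.hadoop.io.Text;

    public class MerkleTreeExample {
      public static void main(String[] args) throws Exception {
        // Hypothetical leaf hashes; in practice GenerateHashes computes these
        byte[] hashA = Hex.decodeHex("00112233".toCharArray());
        byte[] hashB = Hex.decodeHex("44556677".toCharArray());

        // Two level-0 leaves covering the contiguous ranges (-inf, "m"] and ("m", +inf)
        List<MerkleTreeNode> leaves = Arrays.asList(
            new MerkleTreeNode(new Range(null, false, new Key(new Text("m")), true), 0,
                Collections.<Range> emptyList(), hashA),
            new MerkleTreeNode(new Range(new Key(new Text("m")), false, null, false), 0,
                Collections.<Range> emptyList(), hashB));

        // Pair the leaves up into parents until a single root remains
        MerkleTree tree = new MerkleTree(leaves, "SHA-1");
        System.out.println(Hex.encodeHexString(tree.getRootNode().getHash()));
      }
    }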

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTreeNode.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTreeNode.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTreeNode.java
new file mode 100644
index 0000000..dfde41a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTreeNode.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a node in a Merkle tree: its level (height) within the tree, the ranges it covers, and its hash.
+ */
+public class MerkleTreeNode {
+  private static final Logger log = LoggerFactory.getLogger(MerkleTreeNode.class);
+
+  private Range range;
+  private int level;
+  private List<Range> children;
+  private byte[] hash;
+
+  public MerkleTreeNode(Range range, int level, List<Range> children, byte[] hash) {
+    this.range = range;
+    this.level = level;
+    this.children = children;
+    this.hash = hash;
+  }
+
+  public MerkleTreeNode(Key k, Value v) {
+    range = RangeSerialization.toRange(k);
+    level = 0;
+    children = Collections.emptyList();
+    hash = v.get();
+  }
+
+  public MerkleTreeNode(List<MerkleTreeNode> children, String digestAlgorithm) throws NoSuchAlgorithmException {
+    level = 0;
+    this.children = new ArrayList<Range>(children.size());
+    MessageDigest digest = MessageDigest.getInstance(digestAlgorithm);
+
+    Range childrenRange = null;
+    for (MerkleTreeNode child : children) {
+      this.children.add(child.getRange());
+      level = Math.max(child.getLevel(), level);
+      digest.update(child.getHash());
+
+      if (null == childrenRange) {
+        childrenRange = child.getRange();
+      } else {
+        List<Range> overlappingRanges = Range.mergeOverlapping(Arrays.asList(childrenRange, child.getRange()));
+        if (1 != overlappingRanges.size()) {
+          log.error("Tried to merge non-contiguous ranges: {} {}", childrenRange, child.getRange());
+          throw new IllegalArgumentException("Ranges must be contiguous: " + childrenRange + ", " + child.getRange());
+        }
+
+        childrenRange = overlappingRanges.get(0);
+      }
+    }
+
+    // Our actual level is one more than the highest level of our children
+    level++;
+
+    // Roll the hash up the tree
+    hash = digest.digest();
+
+    // Set the range to be the merged result of the children
+    range = childrenRange;
+  }
+
+  public Range getRange() {
+    return range;
+  }
+
+  public int getLevel() {
+    return level;
+  }
+
+  public List<Range> getChildren() {
+    return children;
+  }
+
+  public byte[] getHash() {
+    return hash;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(32);
+    sb.append("range=").append(range).append(" level=").append(level).append(" hash=").append(Hex.encodeHexString(hash)).append(" children=").append(children);
+    return sb.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof MerkleTreeNode) {
+      MerkleTreeNode other = (MerkleTreeNode) o;
+      return range.equals(other.getRange()) && level == other.getLevel() && children.equals(other.getChildren()) && Arrays.equals(hash, other.getHash());
+    }
+
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hcb = new HashCodeBuilder(1395, 39532);
+    return hcb.append(range).append(level).append(children).append(hash).toHashCode();
+  }
+}
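
To make the roll-up rule concrete, a hedged fragment (again editorial), assuming left and right are adjacent level-0 nodes like the leaves sketched earlier:

    // Join two adjacent level-0 siblings into a level-1 parent
    MerkleTreeNode parent = new MerkleTreeNode(Arrays.asList(left, right), "SHA-1");

    // The parent's hash is the digest of the children's hashes, concatenated in order
    MessageDigest md = MessageDigest.getInstance("SHA-1");
    md.update(left.getHash());
    md.update(right.getHash());
    assert Arrays.equals(parent.getHash(), md.digest());

    // Its level is one above the deepest child; its range is the merge of both children's
    assert parent.getLevel() == 1;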

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/RangeSerialization.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/RangeSerialization.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/RangeSerialization.java
new file mode 100644
index 0000000..62bc800
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/RangeSerialization.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Utility methods for serializing a {@link Range} to and from the {@link Key} / {@link Mutation} layout used by the merkle table.
+ */
+public class RangeSerialization {
+  private static final Text EMPTY = new Text(new byte[0]);
+
+  public static Range toRange(Key key) {
+    Text holder = new Text();
+    key.getRow(holder);
+    Key startKey;
+    if (0 == holder.getLength()) {
+      startKey = null;
+    } else {
+      startKey = new Key(holder);
+    }
+    
+    key.getColumnQualifier(holder);
+    Key endKey;
+    if (0 == holder.getLength()) {
+      endKey = null;
+    } else {
+      endKey = new Key(holder);
+    }
+
+    // A null (infinite) bound must not be marked inclusive
+    return new Range(startKey, startKey != null, endKey, endKey != null);
+  }
+
+  public static Key toKey(Range range) {
+    Text row = getRow(range);
+    return new Key(row, EMPTY, getColumnQualifier(range));
+  }
+
+  public static Mutation toMutation(Range range, Value v) {
+    Text row = getRow(range);
+    Mutation m = new Mutation(row);
+    m.put(EMPTY, getColumnQualifier(range), v);
+    return m;
+  }
+
+  public static Text getRow(Range range) {
+    return range.isInfiniteStartKey() ? EMPTY : range.getStartKey().getRow();
+  }
+
+  public static Text getColumnQualifier(Range range) {
+    return range.isInfiniteStopKey() ? EMPTY : range.getEndKey().getRow();
+  }
+}
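
A sketch of the round trip this class implements (rows hypothetical); note that only the row bounds of the endpoint Keys survive the encoding:

    // A bounded range over rows "a" through "f"
    Range original = new Range(new Key(new Text("a")), true, new Key(new Text("f")), true);

    // Start row is stored in the Key's row, end row in its column qualifier
    Key serialized = RangeSerialization.toKey(original);

    // Recovers the row bounds; other fields of the endpoint Keys are not preserved
    Range roundTripped = RangeSerialization.toRange(serialized);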

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/CompareTables.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/CompareTables.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/CompareTables.java
new file mode 100644
index 0000000..5ac5b68
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/CompareTables.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle.cli;
+
+import java.io.FileNotFoundException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Range;
+import org.apache.commons.codec.binary.Hex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Accepts a set of tables, computes the hashes for each, and prints the top-level hash for each table.
+ * <p>
+ * Automatically creates the output tables that hold the intermediate hashes rather than requiring them to exist beforehand,
+ * and throws an exception if an output table is already present.
+ */
+public class CompareTables {
+  private static final Logger log = LoggerFactory.getLogger(CompareTables.class);
+
+  public static class CompareTablesOpts extends ClientOpts {
+    @Parameter(names={"--tables"}, description = "Tables to compare", variableArity=true)
+    public List<String> tables;
+
+    @Parameter(names = {"-nt", "--numThreads"}, required = false, description = "number of concurrent threads calculating digests")
+    private int numThreads = 4;
+
+    @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use")
+    private String hashName;
+
+    @Parameter(names = {"-iter", "--iterator"}, required = false, description = "Should pushdown digest to iterators")
+    private boolean iteratorPushdown = false;
+
+    @Parameter(names = {"-s", "--splits"}, required = false, description = "File of splits to use for merkle tree")
+    private String splitsFile = null;
+
+    public List<String> getTables() {
+      return this.tables;
+    }
+
+    public void setTables(List<String> tables) {
+      this.tables = tables;
+    }
+
+    public int getNumThreads() {
+      return numThreads;
+    }
+
+    public void setNumThreads(int numThreads) {
+      this.numThreads = numThreads;
+    }
+
+    public String getHashName() {
+      return hashName;
+    }
+
+    public void setHashName(String hashName) {
+      this.hashName = hashName;
+    }
+
+    public boolean isIteratorPushdown() {
+      return iteratorPushdown;
+    }
+
+    public void setIteratorPushdown(boolean iteratorPushdown) {
+      this.iteratorPushdown = iteratorPushdown;
+    }
+
+    public String getSplitsFile() {
+      return splitsFile;
+    }
+
+    public void setSplitsFile(String splitsFile) {
+      this.splitsFile = splitsFile;
+    }
+  }
+
+  private CompareTablesOpts opts;
+
+  protected CompareTables() {}
+
+  public CompareTables(CompareTablesOpts opts) {
+    this.opts = opts;
+  }
+
+  public Map<String,String> computeAllHashes() throws AccumuloException, AccumuloSecurityException, TableExistsException, NoSuchAlgorithmException,
+      TableNotFoundException, FileNotFoundException {
+    final Connector conn = opts.getConnector();
+    final Map<String,String> hashesByTable = new HashMap<>();
+
+    for (String table : opts.getTables()) {
+      final String outputTableName = table + "_merkle";
+
+      if (conn.tableOperations().exists(outputTableName)) {
+        throw new IllegalArgumentException("Expected output table name to not yet exist: " + outputTableName);
+      }
+
+      conn.tableOperations().create(outputTableName);
+
+      GenerateHashes genHashes = new GenerateHashes();
+      Collection<Range> ranges = genHashes.getRanges(conn, table, opts.getSplitsFile());
+
+      try {
+        genHashes.run(conn, table, outputTableName, opts.getHashName(), opts.getNumThreads(), opts.isIteratorPushdown(), ranges);
+      } catch (Exception e) {
+        log.error("Error generating hashes for {}", table, e);
+        throw new RuntimeException(e);
+      }
+
+      ComputeRootHash computeRootHash = new ComputeRootHash();
+      String hash = Hex.encodeHexString(computeRootHash.getHash(conn, outputTableName, opts.getHashName()));
+
+      hashesByTable.put(table, hash);
+    }
+
+    return hashesByTable;
+  }
+
+  public static void main(String[] args) throws Exception {
+    CompareTablesOpts opts = new CompareTablesOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs("CompareTables", args, bwOpts);
+
+    if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) {
+      throw new IllegalArgumentException("Cannot use iterator pushdown with anything other than table split points");
+    }
+
+    CompareTables compareTables = new CompareTables(opts);
+    Map<String,String> tableToHashes = compareTables.computeAllHashes();
+
+    for (Entry<String,String> entry : tableToHashes.entrySet()) {
+      System.out.println(entry.getKey() + " " + entry.getValue());
+    }
+  }
+}
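
The comparison can also be driven programmatically; a sketch assuming the inherited client options (instance, zookeepers, credentials) have been populated elsewhere, with hypothetical table names:

    CompareTablesOpts opts = new CompareTablesOpts();
    opts.setTables(Arrays.asList("source", "replica"));
    opts.setHashName("SHA-1");
    opts.setNumThreads(8);

    // One root hash per input table; equal hashes imply equal table contents
    Map<String,String> rootHashes = new CompareTables(opts).computeAllHashes();
    boolean identical = rootHashes.get("source").equals(rootHashes.get("replica"));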

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ComputeRootHash.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ComputeRootHash.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ComputeRootHash.java
new file mode 100644
index 0000000..ea241a6
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ComputeRootHash.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle.cli;
+
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.test.replication.merkle.MerkleTree;
+import org.apache.accumulo.test.replication.merkle.MerkleTreeNode;
+import org.apache.accumulo.test.replication.merkle.RangeSerialization;
+import org.apache.commons.codec.binary.Hex;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Given a table created by {@link GenerateHashes} which contains the leaves of a Merkle tree, compute the root node of the Merkle tree which can be quickly
+ * compared to the root node of another Merkle tree to ascertain equality.
+ */
+public class ComputeRootHash {
+
+  public static class ComputeRootHashOpts extends ClientOnRequiredTable {
+    @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use")
+    private String hashName;
+
+    public String getHashName() {
+      return hashName;
+    }
+
+    public void setHashName(String hashName) {
+      this.hashName = hashName;
+    }
+  }
+
+  public byte[] getHash(ComputeRootHashOpts opts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, NoSuchAlgorithmException {
+    Connector conn = opts.getConnector();
+    String table = opts.getTableName();
+
+    return getHash(conn, table, opts.getHashName());
+  }
+
+  public byte[] getHash(Connector conn, String table, String hashName) throws TableNotFoundException, NoSuchAlgorithmException {
+    List<MerkleTreeNode> leaves = getLeaves(conn, table);
+
+    MerkleTree tree = new MerkleTree(leaves, hashName);
+
+    return tree.getRootNode().getHash();
+  }
+
+  protected ArrayList<MerkleTreeNode> getLeaves(Connector conn, String tableName) throws TableNotFoundException {
+    // TODO: make this a bit more resilient to very large Merkle trees by lazily reading more data from the table when necessary
+    final Scanner s = conn.createScanner(tableName, Authorizations.EMPTY);
+    final ArrayList<MerkleTreeNode> leaves = new ArrayList<MerkleTreeNode>();
+
+    for (Entry<Key,Value> entry : s) {
+      Range range = RangeSerialization.toRange(entry.getKey());
+      byte[] hash = entry.getValue().get();
+
+      leaves.add(new MerkleTreeNode(range, 0, Collections.<Range> emptyList(), hash));
+    }
+
+    return leaves;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ComputeRootHashOpts opts = new ComputeRootHashOpts();
+    opts.parseArgs("ComputeRootHash", args);
+
+    ComputeRootHash computeRootHash = new ComputeRootHash();
+    byte[] rootHash = computeRootHash.getHash(opts);
+
+    System.out.println(Hex.encodeHexString(rootHash));
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java
new file mode 100644
index 0000000..c2c9a5a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle.cli;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.test.replication.merkle.RangeSerialization;
+import org.apache.accumulo.test.replication.merkle.skvi.DigestIterator;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.Parameter;
+import com.google.common.collect.Iterables;
+
+/**
+ * Read from a table, compute a Merkle tree and output it to a table. Each key-value pair in the destination table is a leaf node of the Merkle tree.
+ */
+public class GenerateHashes {
+  private static final Logger log = LoggerFactory.getLogger(GenerateHashes.class);
+
+  public static class GenerateHashesOpts extends ClientOnRequiredTable {
+    @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use")
+    private String hashName;
+
+    @Parameter(names = {"-o", "--output"}, required = true, description = "output table name, expected to exist and be writable")
+    private String outputTableName;
+
+    @Parameter(names = {"-nt", "--numThreads"}, required = false, description = "number of concurrent threads calculating digests")
+    private int numThreads = 4;
+
+    @Parameter(names = {"-iter", "--iterator"}, required = false, description = "Should we push down logic with an iterator")
+    private boolean iteratorPushdown = false;
+
+    @Parameter(names = {"-s", "--splits"}, required = false, description = "File of splits to use for merkle tree")
+    private String splitsFile = null;
+
+    public String getHashName() {
+      return hashName;
+    }
+
+    public void setHashName(String hashName) {
+      this.hashName = hashName;
+    }
+
+    public String getOutputTableName() {
+      return outputTableName;
+    }
+
+    public void setOutputTableName(String outputTableName) {
+      this.outputTableName = outputTableName;
+    }
+
+    public int getNumThreads() {
+      return numThreads;
+    }
+
+    public void setNumThreads(int numThreads) {
+      this.numThreads = numThreads;
+    }
+
+    public boolean isIteratorPushdown() {
+      return iteratorPushdown;
+    }
+
+    public void setIteratorPushdown(boolean iteratorPushdown) {
+      this.iteratorPushdown = iteratorPushdown;
+    }
+
+    public String getSplitsFile() {
+      return splitsFile;
+    }
+
+    public void setSplitsFile(String splitsFile) {
+      this.splitsFile = splitsFile;
+    }
+  }
+
+  public Collection<Range> getRanges(Connector conn, String tableName, String splitsFile) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, FileNotFoundException {
+    if (null == splitsFile) {
+      log.info("Using table split points");
+      Collection<Text> endRows = conn.tableOperations().listSplits(tableName);
+      return endRowsToRanges(endRows);
+    } else {
+      log.info("Using provided split points");
+      ArrayList<Text> splits = new ArrayList<Text>();
+
+      String line;
+      java.util.Scanner file = new java.util.Scanner(new File(splitsFile), StandardCharsets.UTF_8.name());
+      try {
+        while (file.hasNextLine()) {
+          line = file.nextLine();
+          if (!line.isEmpty()) {
+            splits.add(new Text(line));
+          }
+        }
+      } finally {
+        file.close();
+      }
+
+      Collections.sort(splits);
+      return endRowsToRanges(splits);
+    }
+  }
+
+  public void run(GenerateHashesOpts opts) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, NoSuchAlgorithmException,
+      FileNotFoundException {
+    Collection<Range> ranges = getRanges(opts.getConnector(), opts.getTableName(), opts.getSplitsFile());
+
+    run(opts.getConnector(), opts.getTableName(), opts.getOutputTableName(), opts.getHashName(), opts.getNumThreads(), opts.isIteratorPushdown(), ranges);
+  }
+
+  public void run(final Connector conn, final String inputTableName, final String outputTableName, final String digestName, int numThreads,
+      final boolean iteratorPushdown, final Collection<Range> ranges) throws TableNotFoundException, AccumuloSecurityException, AccumuloException,
+      NoSuchAlgorithmException {
+    if (!conn.tableOperations().exists(outputTableName)) {
+      throw new IllegalArgumentException(outputTableName + " does not exist, please create it");
+    }
+
+    // Get some parallelism
+    ExecutorService svc = Executors.newFixedThreadPool(numThreads);
+    final BatchWriter bw = conn.createBatchWriter(outputTableName, new BatchWriterConfig());
+
+    try {
+      for (final Range range : ranges) {
+        final MessageDigest digest = getDigestAlgorithm(digestName);
+
+        svc.execute(new Runnable() {
+
+          @Override
+          public void run() {
+            Scanner s;
+            try {
+              s = conn.createScanner(inputTableName, Authorizations.EMPTY);
+            } catch (Exception e) {
+              log.error("Could not get scanner for " + inputTableName, e);
+              throw new RuntimeException(e);
+            }
+
+            s.setRange(range);
+
+            Value v = null;
+            Mutation m = null;
+            if (iteratorPushdown) {
+              IteratorSetting cfg = new IteratorSetting(50, DigestIterator.class);
+              cfg.addOption(DigestIterator.HASH_NAME_KEY, digestName);
+              s.addScanIterator(cfg);
+
+              // The scanner should only ever return us one Key-Value, otherwise this approach won't work
+              Entry<Key,Value> entry = Iterables.getOnlyElement(s);
+
+              v = entry.getValue();
+              m = RangeSerialization.toMutation(range, v);
+            } else {
+              ByteArrayOutputStream baos = new ByteArrayOutputStream();
+              for (Entry<Key,Value> entry : s) {
+                DataOutputStream out = new DataOutputStream(baos);
+                try {
+                  entry.getKey().write(out);
+                  entry.getValue().write(out);
+                } catch (Exception e) {
+                  log.error("Error writing {}", entry, e);
+                  throw new RuntimeException(e);
+                }
+
+                digest.update(baos.toByteArray());
+                baos.reset();
+              }
+
+              v = new Value(digest.digest());
+              m = RangeSerialization.toMutation(range, v);
+            }
+
+            // Log some progress
+            log.info("{} computed digest for {} of {}", Thread.currentThread().getName(), range, Hex.encodeHexString(v.get()));
+
+            try {
+              bw.addMutation(m);
+            } catch (MutationsRejectedException e) {
+              log.error("Could not write mutation", e);
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      }
+
+      svc.shutdown();
+
+      // Wait indefinitely for the scans to complete
+      while (!svc.isTerminated()) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          log.error("Interrupted while waiting for executor service to gracefully complete. Exiting now");
+          svc.shutdownNow();
+          return;
+        }
+      }
+    } finally {
+      // We can only safely close this when we're exiting or we've completed all tasks
+      bw.close();
+    }
+  }
+
+  public TreeSet<Range> endRowsToRanges(Collection<Text> endRows) {
+    ArrayList<Text> sortedEndRows = new ArrayList<Text>(endRows);
+    Collections.sort(sortedEndRows);
+
+    Text prevEndRow = null;
+    TreeSet<Range> ranges = new TreeSet<>();
+    for (Text endRow : sortedEndRows) {
+      if (null == prevEndRow) {
+        ranges.add(new Range(null, false, endRow, true));
+      } else {
+        ranges.add(new Range(prevEndRow, false, endRow, true));
+      }
+      prevEndRow = endRow;
+    }
+
+    ranges.add(new Range(prevEndRow, false, null, false));
+
+    return ranges;
+  }
+
+  protected MessageDigest getDigestAlgorithm(String digestName) throws NoSuchAlgorithmException {
+    return MessageDigest.getInstance(digestName);
+  }
+
+  public static void main(String[] args) throws Exception {
+    GenerateHashesOpts opts = new GenerateHashesOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(GenerateHashes.class.getName(), args, bwOpts);
+
+    if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) {
+      throw new IllegalArgumentException("Cannot use iterator pushdown with anything other than table split points");
+    }
+
+    GenerateHashes generate = new GenerateHashes();
+    generate.run(opts);
+  }
+}
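
Programmatic use follows the same shape as main(): derive the leaf ranges, then digest them into an existing output table. A sketch assuming conn is an authenticated Connector and the hypothetical source_merkle table was created beforehand:

    GenerateHashes genHashes = new GenerateHashes();

    // A null splits file means the table's own split points define the leaf ranges
    Collection<Range> ranges = genHashes.getRanges(conn, "source", null);

    // Eight worker threads, digests computed client-side (no iterator pushdown)
    genHashes.run(conn, "source", "source_merkle", "SHA-1", 8, false, ranges);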

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java
new file mode 100644
index 0000000..9f52233
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle.cli;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.cli.ClientOpts;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Accepts two table names and enumerates all key-value pairs in both, checking for equality. All differences between the two tables are printed to the
+ * console.
+ */
+public class ManualComparison {
+
+  public static class ManualComparisonOpts extends ClientOpts {
+    @Parameter(names={"--table1"}, required = true, description = "First table")
+    public String table1;
+
+    @Parameter(names={"--table2"}, required = true, description = "First table")
+    public String table2;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ManualComparisonOpts opts = new ManualComparisonOpts();
+    opts.parseArgs("ManualComparison", args);
+
+    Connector conn = opts.getConnector();
+
+    Scanner s1 = conn.createScanner(opts.table1, Authorizations.EMPTY), s2 = conn.createScanner(opts.table2, Authorizations.EMPTY);
+    Iterator<Entry<Key,Value>> iter1 = s1.iterator(), iter2 = s2.iterator();
+    boolean incrementFirst = true, incrementSecond = true;
+
+    // Fetch lazily inside the loop so the first entry of each table is compared too
+    Entry<Key,Value> entry1 = null, entry2 = null;
+    while (iter1.hasNext() && iter2.hasNext()) {
+      if (incrementFirst) {
+        entry1 = iter1.next();
+      }
+      if (incrementSecond) {
+        entry2 = iter2.next();
+      }
+      incrementFirst = false;
+      incrementSecond = false;
+
+      if (!entry1.equals(entry2)) {
+
+        if (entry1.getKey().compareTo(entry2.getKey()) < 0) {
+          System.out.println("Exist in original " + entry1);
+          incrementFirst = true;
+        } else if (entry2.getKey().compareTo(entry1.getKey()) < 0) {
+          System.out.println("Exist in replica " + entry2);
+          incrementSecond = true;
+        } else {
+          System.out.println("Differ... " + entry1 + " " + entry2);
+          incrementFirst = true;
+          incrementSecond = true;
+        }
+      } else {
+        incrementFirst = true;
+        incrementSecond = true;
+      }
+    }
+
+    System.out.println("\nExtra entries from " + opts.table1);
+    while (iter1.hasNext()) {
+      System.out.println(iter1.next());
+    }
+
+    System.out.println("\nExtra entries from " + opts.table2);
+    while (iter2.hasNext()) {
+      System.out.println(iter2.next());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java
new file mode 100644
index 0000000..5558350
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle.ingest;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+import org.apache.accumulo.core.cli.BatchWriterOpts;
+import org.apache.accumulo.core.cli.ClientOnDefaultTable;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Generates random data, with a configurable percentage of the updates being deletes.
+ */
+public class RandomWorkload {
+  public static final String DEFAULT_TABLE_NAME = "randomWorkload";
+
+  public static class RandomWorkloadOpts extends ClientOnDefaultTable {
+    @Parameter(names = {"-n", "--num"}, required = true, description = "Num records to write")
+    public long numRecords;
+
+    @Parameter(names = {"-r", "--rows"}, required = true, description = "Range of rows that can be generated")
+    public int rowMax;
+
+    @Parameter(names = {"-cf", "--colfams"}, required = true, description = "Range of column families that can be generated")
+    public int cfMax;
+
+    @Parameter(names = {"-cq", "--colquals"}, required = true, description = "Range of column qualifiers that can be generated")
+    public int cqMax;
+
+    @Parameter(names = {"-d", "--deletes"}, required = false, description = "Percentage of updates that should be deletes")
+    public int deletePercent = 5;
+
+    public RandomWorkloadOpts() {
+      super(DEFAULT_TABLE_NAME);
+    }
+
+    public RandomWorkloadOpts(String tableName) {
+      super(tableName);
+    }
+  }
+
+  public void run(RandomWorkloadOpts opts, BatchWriterConfig cfg) throws Exception {
+    run(opts.getConnector(), opts.getTableName(), cfg, opts.numRecords, opts.rowMax, opts.cfMax, opts.cqMax, opts.deletePercent);
+  }
+
+  public void run(final Connector conn, final String tableName, final BatchWriterConfig cfg, final long numRecords, int rowMax, int cfMax, int cqMax,
+      int deletePercent) throws Exception {
+
+    final Random rowRand = new Random(12345);
+    final Random cfRand = new Random(12346);
+    final Random cqRand = new Random(12347);
+    final Random deleteRand = new Random(12348);
+    long valueCounter = 0L;
+
+    if (!conn.tableOperations().exists(tableName)) {
+      conn.tableOperations().create(tableName);
+    }
+
+    BatchWriter bw = conn.createBatchWriter(tableName, cfg);
+    try {
+      final Text row = new Text(), cf = new Text(), cq = new Text();
+      final Value value = new Value();
+      for (long i = 0; i < numRecords; i++) {
+        row.set(Integer.toString(rowRand.nextInt(rowMax)));
+        cf.set(Integer.toString(cfRand.nextInt(cfMax)));
+        cq.set(Integer.toString(cqRand.nextInt(cqMax)));
+
+        Mutation m = new Mutation(row);
+
+        // Choose a random value in [0, 100)
+        int deleteValue = deleteRand.nextInt(100);
+
+        // putDelete if the value we chose is less than our delete percentage
+        if (deleteValue < deletePercent) {
+          m.putDelete(cf, cq);
+        } else {
+          value.set(Long.toString(valueCounter).getBytes(StandardCharsets.UTF_8));
+          m.put(cf, cq, valueCounter, value);
+        }
+
+        bw.addMutation(m);
+
+        valueCounter++;
+      }
+    } finally {
+      bw.close();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    RandomWorkloadOpts opts = new RandomWorkloadOpts();
+    BatchWriterOpts bwOpts = new BatchWriterOpts();
+    opts.parseArgs(RandomWorkload.class.getSimpleName(), args, bwOpts);
+
+    RandomWorkload rw = new RandomWorkload();
+
+    rw.run(opts, bwOpts.getBatchWriterConfig());
+  }
+}
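
The ingest can likewise be embedded in a test; a sketch assuming conn is an authenticated Connector, with hypothetical sizes:

    // 100,000 updates over a 1000 x 10 x 10 keyspace, roughly 5% of them deletes
    RandomWorkload workload = new RandomWorkload();
    workload.run(conn, "randomWorkload", new BatchWriterConfig(), 100000, 1000, 10, 10, 5);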

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
new file mode 100644
index 0000000..6afcdf5
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * A <a href="http://en.wikipedia.org/wiki/Merkle_tree">Merkle tree</a> is a hash tree that can be used to evaluate equality over large
+ * files while also identifying which portions of the files differ. Each leaf of the Merkle tree is some hash of a
+ * portion of the file, with each leaf corresponding to some "range" within the source file. As such, if all leaves are
+ * considered as ranges of the source file, the "sum" of all leaves creates a contiguous range over the entire file.
+ * <P>
+ * The parent of a group of nodes (typically two, forming a binary tree, although this is not required) is the hash of the
+ * concatenation of its children's hashes. We can construct a full tree by walking up the tree, creating parents from
+ * children, until we have a root node. To check the equality of two files that each have a Merkle tree built, we can
+ * simply compare the values at the roots of the two trees to know whether or not the files are the same.
+ * <P>
+ * Additionally, when two files that we expect to be the same are not, we can walk back down the tree, finding subtrees
+ * that are equal and subtrees that are not. Subtrees that are equal correspond to portions of the files which are
+ * identical, while subtrees that are not equal correspond to discrepancies between the two files.
+ * <P>
+ * We can apply this concept to Accumulo, treating a table as a file and ranges within that file as Accumulo Ranges. We can
+ * then compute the hashes over each of these Ranges and compute the entire Merkle tree to determine if two tables are
+ * equivalent.
+ *
+ * @since 1.7.0
+ */
+package org.apache.accumulo.test.replication.merkle;
\ No newline at end of file
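
Tying the package together, table equality reduces to comparing root hashes. A sketch assuming sourceConn and replicaConn are Connectors to the two instances and the hypothetical *_merkle tables were populated by GenerateHashes:

    byte[] sourceRoot = new ComputeRootHash().getHash(sourceConn, "source_merkle", "SHA-1");
    byte[] replicaRoot = new ComputeRootHash().getHash(replicaConn, "replica_merkle", "SHA-1");

    // Equal root hashes imply the two tables hold identical data
    boolean equivalent = Arrays.equals(sourceRoot, replicaRoot);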

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
----------------------------------------------------------------------
diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
new file mode 100644
index 0000000..dcda76a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.replication.merkle.skvi;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link SortedKeyValueIterator} which attempts to compute a hash over some range of Key-Value pairs.
+ * <P>
+ * For the purposes of constructing a Merkle tree, this class will only generate a meaningful result if the (Batch)Scanner computes a single digest over a
+ * Range. If the (Batch)Scanner stops and restarts in the middle of a session, incorrect values will be returned and the resulting Merkle tree will be invalid.
+ */
+public class DigestIterator implements SortedKeyValueIterator<Key,Value> {
+  private static final Logger log = LoggerFactory.getLogger(DigestIterator.class);
+  public static final String HASH_NAME_KEY = "hash.name";
+
+  private MessageDigest digest;
+  private Key topKey;
+  private Value topValue;
+  private SortedKeyValueIterator<Key,Value> source;
+
+  @Override
+  public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException {
+    String hashName = options.get(HASH_NAME_KEY);
+    if (null == hashName) {
+      throw new IOException(HASH_NAME_KEY + " must be provided as option");
+    }
+
+    try {
+      this.digest = MessageDigest.getInstance(hashName);
+    } catch (NoSuchAlgorithmException e) {
+      throw new IOException(e);
+    }
+
+    this.topKey = null;
+    this.topValue = null;
+    this.source = source;
+  }
+
+  @Override
+  public boolean hasTop() {
+    return null != topKey;
+  }
+
+  @Override
+  public void next() throws IOException {
+    // We can't call next() if we already consumed it all
+    if (!this.source.hasTop()) {
+      this.topKey = null;
+      this.topValue = null;
+      return;
+    }
+
+    this.source.next();
+
+    consume();
+  }
+
+  @Override
+  public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
+    this.source.seek(range, columnFamilies, inclusive);
+
+    consume();
+  }
+
+  protected void consume() throws IOException {
+    digest.reset();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+
+    if (!this.source.hasTop()) {
+      this.topKey = null;
+      this.topValue = null;
+
+      return;
+    }
+
+    Key lastKeySeen = null;
+    while (this.source.hasTop()) {
+      baos.reset();
+
+      Key currentKey = this.source.getTopKey();
+      lastKeySeen = currentKey;
+
+      currentKey.write(dos);
+      this.source.getTopValue().write(dos);
+
+      digest.update(baos.toByteArray());
+
+      this.source.next();
+    }
+
+    this.topKey = lastKeySeen;
+    this.topValue = new Value(digest.digest());
+  }
+
+  @Override
+  public Key getTopKey() {
+    return topKey;
+  }
+
+  @Override
+  public Value getTopValue() {
+    return topValue;
+  }
+
+  @Override
+  public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
+    DigestIterator copy = new DigestIterator();
+    try {
+      copy.digest = MessageDigest.getInstance(digest.getAlgorithm());
+    } catch (NoSuchAlgorithmException e) {
+      throw new RuntimeException(e);
+    }
+
+    copy.topKey = this.topKey;
+    copy.topValue = this.topValue;
+    copy.source = this.source.deepCopy(env);
+
+    return copy;
+  }
+
+}
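
Client-side wiring mirrors what GenerateHashes does above: configure the iterator on a Scanner restricted to a single Range and expect exactly one summary entry back. A sketch assuming conn and range are in scope:

    Scanner s = conn.createScanner("source", Authorizations.EMPTY);
    s.setRange(range); // one digest per Range; the scan must not restart mid-session

    // Priority 50 is arbitrary; the option names the MessageDigest algorithm
    IteratorSetting cfg = new IteratorSetting(50, DigestIterator.class);
    cfg.addOption(DigestIterator.HASH_NAME_KEY, "SHA-1");
    s.addScanIterator(cfg);

    // The iterator collapses the whole Range into a single summary Key-Value
    Entry<Key,Value> entry = Iterables.getOnlyElement(s);
    byte[] digest = entry.getValue().get();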

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/system/merkle-replication/configure-replication.sh
----------------------------------------------------------------------
diff --git a/test/system/merkle-replication/configure-replication.sh b/test/system/merkle-replication/configure-replication.sh
new file mode 100755
index 0000000..44ebdd7
--- /dev/null
+++ b/test/system/merkle-replication/configure-replication.sh
@@ -0,0 +1,99 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Start: Resolve Script Directory
+SOURCE="${BASH_SOURCE[0]}"
+while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
+   dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+   SOURCE=$(readlink "${SOURCE}")
+   [[ "${SOURCE}" != /* ]] && SOURCE="${dir}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
+done
+dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+script=$( basename "${SOURCE}" )
+# Stop: Resolve Script Directory
+
+# Guess at ACCUMULO_HOME and ACCUMULO_CONF_DIR if not already defined
+ACCUMULO_HOME=${ACCUMULO_HOME:-"${dir}/../../.."}
+ACCUMULO_CONF_DIR=${ACCUMULO_CONF_DIR:-"$ACCUMULO_HOME/conf"}
+
+# Get the configuration values
+. "${dir}/merkle-env.sh"
+
+tmpdir=$(mktemp -dt "${script}.XXXXXXXXXX")
+
+source_commands="${tmpdir}/source_commands.txt"
+
+echo 'Removing old tables and setting replication name on source'
+
+echo "deletetable -f $SOURCE_TABLE_NAME" >> $source_commands
+echo "createtable $SOURCE_TABLE_NAME" >> $source_commands
+echo "config -s replication.name=source" >> $source_commands
+echo "quit" >> $source_commands
+
+# Source: drop and create tables, configure unique name for replication and grant perms
+echo $SOURCE_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $SOURCE_ACCUMULO_USER -z \
+    $SOURCE_INSTANCE $SOURCE_ZOOKEEPERS -f $source_commands
+
+destination_commands="${tmpdir}/destination_commands.txt"
+
+echo 'Removing old tables and setting replication name on destination'
+
+echo "deletetable -f $DESTINATION_TABLE_NAME" >> $destination_commands
+echo "createtable $DESTINATION_TABLE_NAME" >> $destination_commands
+echo "config -s replication.name=destination" >> $destination_commands
+echo "quit" >> $destination_commands
+
+# Destination: drop and re-create the table, and configure a unique replication name
+echo $DESTINATION_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $DESTINATION_ACCUMULO_USER -z \
+    $DESTINATION_INSTANCE $DESTINATION_ZOOKEEPERS -f $destination_commands
+
+rm $source_commands
+rm $destination_commands
+
+table_id=$(echo $DESTINATION_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $DESTINATION_ACCUMULO_USER -z \
+    $DESTINATION_INSTANCE $DESTINATION_ZOOKEEPERS -e 'tables -l' | grep "${DESTINATION_TABLE_NAME}" \
+    | grep -v "${DESTINATION_MERKLE_TABLE_NAME}" | awk '{print $3}')
+
+echo "Configuring $SOURCE_TABLE_NAME to replicate to $DESTINATION_TABLE_NAME (id=$table_id)"
+
+# Define our peer 'destination' with the ReplicaSystem impl, instance name and ZKs
+echo "config -s replication.peer.destination=org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,$DESTINATION_INSTANCE,$DESTINATION_ZOOKEEPERS" >> $source_commands
+# Username for 'destination'
+echo "config -s replication.peer.user.destination=$DESTINATION_ACCUMULO_USER" >> $source_commands
+# Password for 'destination'
+echo "config -s replication.peer.password.destination=$DESTINATION_ACCUMULO_PASSWORD" >> $source_commands
+# Configure replication to 'destination' for $SOURCE_TABLE_NAME
+echo "config -t $SOURCE_TABLE_NAME -s table.replication.target.destination=$table_id" >> $source_commands
+# Enable replication for the table
+echo "config -t $SOURCE_TABLE_NAME -s table.replication=true" >> $source_commands
+echo "quit" >> $source_commands
+
+# Configure replication from source to destination and then enable it
+echo $SOURCE_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $SOURCE_ACCUMULO_USER -z \
+    $SOURCE_INSTANCE $SOURCE_ZOOKEEPERS -f $source_commands
+
+rm $source_commands
+
+# Add some splits to make ingest faster
+echo 'Adding splits...'
+
+echo $SOURCE_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $SOURCE_ACCUMULO_USER -z \
+    $SOURCE_INSTANCE $SOURCE_ZOOKEEPERS -e "addsplits -t $SOURCE_TABLE_NAME 1 2 3 4 5 6 7 8 9"
+
+echo $DESTINATION_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $DESTINATION_ACCUMULO_USER -z \
+    $DESTINATION_INSTANCE $DESTINATION_ZOOKEEPERS -e "addsplits -t $DESTINATION_TABLE_NAME 1 2 3 4 5 6 7 8 9"
+
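
The 'tables -l' pipeline above scrapes the destination table's internal id out
of the shell output. When scripting against the client API instead, the same
id is available directly from tableIdMap(); a small sketch, again assuming the
merkle-env.sh defaults:

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;

    public class TableIdSketch {
      public static void main(String[] args) throws Exception {
        Connector conn = new ZooKeeperInstance("accumulo", "localhost")
            .getConnector("user", new PasswordToken("password"));

        // Table name -> internal id; the id is the value the script feeds to
        // table.replication.target.destination on the source instance
        String tableId = conn.tableOperations().tableIdMap().get("replicationDestination");
        System.out.println("destination table id: " + tableId);
      }
    }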

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/system/merkle-replication/ingest-data.sh
----------------------------------------------------------------------
diff --git a/test/system/merkle-replication/ingest-data.sh b/test/system/merkle-replication/ingest-data.sh
new file mode 100755
index 0000000..91b8ccc
--- /dev/null
+++ b/test/system/merkle-replication/ingest-data.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Start: Resolve Script Directory
+SOURCE="${BASH_SOURCE[0]}"
+while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
+   dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+   SOURCE=$(readlink "${SOURCE}")
+   [[ "${SOURCE}" != /* ]] && SOURCE="${dir}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
+done
+dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+script=$( basename "${SOURCE}" )
+# Stop: Resolve Script Directory
+
+# Guess at ACCUMULO_HOME and ACCUMULO_CONF_DIR if not already defined
+ACCUMULO_HOME=${ACCUMULO_HOME:-"${dir}/../../.."}
+ACCUMULO_CONF_DIR=${ACCUMULO_CONF_DIR:-"$ACCUMULO_HOME/conf"}
+
+# Get the configuration values
+. "${dir}/merkle-env.sh"
+
+# Ingest data into the source table
+$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.replication.merkle.ingest.RandomWorkload --table $SOURCE_TABLE_NAME \
+    -i $SOURCE_INSTANCE -z $SOURCE_ZOOKEEPERS -u $SOURCE_ACCUMULO_USER -p $SOURCE_ACCUMULO_PASSWORD -d $DELETE_PERCENT \
+    -cf $MAX_CF -cq $MAX_CQ -r $MAX_ROW -n $NUM_RECORDS
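
RandomWorkload itself is added elsewhere in this commit; the loop it drives
looks roughly like the sketch below. The key distribution and value contents
here are assumptions for illustration, not the actual implementation:

    import java.util.Random;

    import org.apache.accumulo.core.client.BatchWriter;
    import org.apache.accumulo.core.client.BatchWriterConfig;
    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.ZooKeeperInstance;
    import org.apache.accumulo.core.client.security.tokens.PasswordToken;
    import org.apache.accumulo.core.data.Mutation;
    import org.apache.accumulo.core.data.Value;

    public class RandomIngestSketch {
      public static void main(String[] args) throws Exception {
        long numRecords = 100000000L;
        int maxRow = 1000000, maxCf = 10, maxCq = 100, deletePercent = 0;

        Connector conn = new ZooKeeperInstance("accumulo", "localhost")
            .getConnector("user", new PasswordToken("password"));
        BatchWriter bw = conn.createBatchWriter("replicationSource", new BatchWriterConfig());
        Random rand = new Random();

        for (long i = 0; i < numRecords; i++) {
          Mutation m = new Mutation(Integer.toString(rand.nextInt(maxRow)));
          String cf = Integer.toString(rand.nextInt(maxCf));
          String cq = Integer.toString(rand.nextInt(maxCq));
          if (rand.nextInt(100) < deletePercent) {
            m.putDelete(cf, cq); // the -d percentage of writes are deletes
          } else {
            m.put(cf, cq, new Value("value".getBytes()));
          }
          bw.addMutation(m);
        }
        bw.close();
      }
    }

A non-zero -d percentage mixes deletes into the stream, which exercises
replication of delete entries as well as inserts.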

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/system/merkle-replication/merkle-env.sh
----------------------------------------------------------------------
diff --git a/test/system/merkle-replication/merkle-env.sh b/test/system/merkle-replication/merkle-env.sh
new file mode 100755
index 0000000..d405394
--- /dev/null
+++ b/test/system/merkle-replication/merkle-env.sh
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Random data will be written to this table
+SOURCE_TABLE_NAME='replicationSource'
+# Then replicated to this table
+DESTINATION_TABLE_NAME='replicationDestination'
+
+# Merkle tree to be stored in this table for the source table
+SOURCE_MERKLE_TABLE_NAME="${SOURCE_TABLE_NAME}_merkle"
+# Merkle tree to be stored in this table for the destination table
+DESTINATION_MERKLE_TABLE_NAME="${DESTINATION_TABLE_NAME}_merkle"
+
+# Connection information to Accumulo
+SOURCE_ACCUMULO_USER="user"
+SOURCE_ACCUMULO_PASSWORD="password"
+
+DESTINATION_ACCUMULO_USER="${SOURCE_ACCUMULO_USER}"
+DESTINATION_ACCUMULO_PASSWORD="${SOURCE_ACCUMULO_PASSWORD}"
+
+SOURCE_INSTANCE="accumulo"
+DESTINATION_INSTANCE="${SOURCE_INSTANCE}"
+
+SOURCE_ZOOKEEPERS="localhost"
+DESTINATION_ZOOKEEPERS="${SOURCE_ZOOKEEPERS}"
+
+# Accumulo user to be configured on the destination instance
+#REPLICATION_USER="${SOURCE_ACCUMULO_USER}"
+#REPLICATION_PASSWORD="${SOURCE_ACCUMULO_PASSWORD}"
+
+# Control amount and distribution of data written
+NUM_RECORDS=100000000
+MAX_ROW=1000000
+MAX_CF=10
+MAX_CQ=100
+DELETE_PERCENT=0
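
A back-of-the-envelope check on these defaults, assuming RandomWorkload draws
keys uniformly at random (an assumption; its source is elsewhere in this
commit): the keyspace holds MAX_ROW * MAX_CF * MAX_CQ = 10^9 distinct keys, so
10^8 records land on roughly 95 million distinct keys and the remainder
overwrite earlier versions:

    public class KeyspaceEstimate {
      public static void main(String[] args) {
        double n = 100000000d;          // NUM_RECORDS
        double k = 1000000d * 10 * 100; // MAX_ROW * MAX_CF * MAX_CQ
        // Expected distinct keys after n uniform draws from a k-key space
        double distinct = k * (1 - Math.exp(-n / k));
        System.out.printf("expected distinct keys: %.0f of %.0f writes%n", distinct, n);
      }
    }

Overwrites are fine for the verification, since both tables converge to the
same final state before any hashes are computed.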

http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/system/merkle-replication/verify-data.sh
----------------------------------------------------------------------
diff --git a/test/system/merkle-replication/verify-data.sh b/test/system/merkle-replication/verify-data.sh
new file mode 100755
index 0000000..225d892
--- /dev/null
+++ b/test/system/merkle-replication/verify-data.sh
@@ -0,0 +1,91 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Start: Resolve Script Directory
+SOURCE="${BASH_SOURCE[0]}"
+while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
+   dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+   SOURCE=$(readlink "${SOURCE}")
+   [[ "${SOURCE}" != /* ]] && SOURCE="${dir}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
+done
+dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+script=$( basename "${SOURCE}" )
+# Stop: Resolve Script Directory
+
+# Guess at ACCUMULO_HOME and ACCUMULO_CONF_DIR if not already defined
+ACCUMULO_HOME=${ACCUMULO_HOME:-"${dir}/../../.."}
+ACCUMULO_CONF_DIR=${ACCUMULO_CONF_DIR:-"$ACCUMULO_HOME/conf"}
+
+# Get the configuration values
+. "${dir}/merkle-env.sh"
+
+tmpdir=$(mktemp -dt "${script}.XXXXXXXXXX")
+
+splits=${tmpdir}/splits
+
+echo 1 >> $splits
+echo 2 >> $splits
+echo 3 >> $splits
+echo 4 >> $splits
+echo 5 >> $splits
+echo 6 >> $splits
+echo 7 >> $splits
+echo 8 >> $splits
+echo 9 >> $splits
+
+commands=${tmpdir}/commands
+
+# Generate leaves of merkle trees for source
+echo "deletetable -f $SOURCE_MERKLE_TABLE_NAME" >> $commands
+echo "createtable $SOURCE_MERKLE_TABLE_NAME" >> $commands
+echo "quit" >> $commands
+
+echo $SOURCE_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $SOURCE_ACCUMULO_USER -z \
+    $SOURCE_INSTANCE $SOURCE_ZOOKEEPERS -f $commands
+
+echo -e "\nGenerating merkle tree hashes for $SOURCE_TABLE_NAME"
+
+$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.replication.merkle.cli.GenerateHashes -t $SOURCE_TABLE_NAME \
+    -o $SOURCE_MERKLE_TABLE_NAME -i $SOURCE_INSTANCE -z $SOURCE_ZOOKEEPERS -u $SOURCE_ACCUMULO_USER \
+    -p $SOURCE_ACCUMULO_PASSWORD -nt 8 -hash MD5 --splits $splits
+
+rm $commands
+
+# Generate leaves of merkle trees for destination
+echo "deletetable -f $DESTINATION_MERKLE_TABLE_NAME" >> $commands
+echo "createtable $DESTINATION_MERKLE_TABLE_NAME" >> $commands
+echo "quit" >> $commands
+
+echo $DESTINATION_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $DESTINATION_ACCUMULO_USER -z \
+    $DESTINATION_INSTANCE $DESTINATION_ZOOKEEPERS -f $commands
+
+echo -e "\nGenerating merkle tree hashes for $DESTINATION_TABLE_NAME"
+
+$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.replication.merkle.cli.GenerateHashes -t $DESTINATION_TABLE_NAME \
+    -o $DESTINATION_MERKLE_TABLE_NAME -i $DESTINATION_INSTANCE -z $DESTINATION_ZOOKEEPERS -u $DESTINATION_ACCUMULO_USER \
+    -p $DESTINATION_ACCUMULO_PASSWORD -nt 8 -hash MD5 --splits $splits
+
+echo -e "\nComputing root hash:"
+
+#Compute root node of merkle tree
+$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.replication.merkle.cli.ComputeRootHash -t $SOURCE_MERKLE_TABLE_NAME \
+    -i $SOURCE_INSTANCE -z $SOURCE_ZOOKEEPERS -u $SOURCE_ACCUMULO_USER -p $SOURCE_ACCUMULO_PASSWORD -hash MD5
+
+$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.replication.merkle.cli.ComputeRootHash -t $DESTINATION_MERKLE_TABLE_NAME \
+    -i $DESTINATION_INSTANCE -z $DESTINATION_ZOOKEEPERS -u $DESTINATION_ACCUMULO_USER -p $DESTINATION_ACCUMULO_PASSWORD -hash MD5
+
+rm -rf $tmpdir
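
ComputeRootHash folds the leaf hashes written by GenerateHashes up into a
single root per table; equal roots mean the two tables hold identical data,
and a mismatch can be localized by walking back down the tree. A condensed
sketch of the fold, assuming unpaired nodes are carried up unchanged (the
actual tree-building code in this commit may handle that case differently):

    import java.security.MessageDigest;
    import java.util.ArrayList;
    import java.util.List;

    public class RootHashSketch {
      // Pairwise-combine leaf digests level by level until one root remains.
      // Assumes at least one leaf is present.
      static byte[] rootHash(List<byte[]> leaves, String algorithm) throws Exception {
        List<byte[]> level = leaves;
        while (level.size() > 1) {
          List<byte[]> parents = new ArrayList<byte[]>();
          for (int i = 0; i + 1 < level.size(); i += 2) {
            MessageDigest digest = MessageDigest.getInstance(algorithm);
            digest.update(level.get(i));
            digest.update(level.get(i + 1));
            parents.add(digest.digest());
          }
          if (level.size() % 2 == 1) {
            parents.add(level.get(level.size() - 1)); // unpaired node carried up
          }
          level = parents;
        }
        return level.get(0);
      }
    }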

